This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-paimon-trino.git


The following commit(s) were added to refs/heads/main by this push:
     new 3cbd15d  Implement limit pushdown (#38)
3cbd15d is described below

commit 3cbd15da4a52e3a0bc79d9b00f21d79f0844e496
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Nov 20 12:27:06 2023 +0800

    Implement limit pushdown (#38)
---
 .../org/apache/paimon/trino/TrinoPageSource.java   |  8 +++-
 .../org/apache/paimon/trino/TrinoPageSource.java   |  8 +++-
 .../org/apache/paimon/trino/TrinoPageSource.java   |  7 ++-
 .../org/apache/paimon/trino/TrinoMetadataBase.java | 20 ++++++++
 .../org/apache/paimon/trino/TrinoPageSource.java   |  8 +++-
 .../apache/paimon/trino/TrinoPageSourceBase.java   | 56 +++++++++++++++++-----
 .../paimon/trino/TrinoPageSourceProvider.java      | 13 +++--
 .../org/apache/paimon/trino/TrinoTableHandle.java  | 27 +++++++++--
 .../apache/paimon/trino/SimpleTableTestHelper.java |  9 +---
 .../org/apache/paimon/trino/TestTrinoITCase.java   |  7 +++
 .../apache/paimon/trino/TestTrinoTableHandle.java  |  8 +++-
 11 files changed, 135 insertions(+), 36 deletions(-)

diff --git a/paimon-trino-358/src/main/java/org/apache/paimon/trino/TrinoPageSource.java b/paimon-trino-358/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
index bfdb18f..617c9d0 100644
--- a/paimon-trino-358/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
+++ b/paimon-trino-358/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
@@ -25,12 +25,16 @@ import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ConnectorPageSource;
 
 import java.util.List;
+import java.util.OptionalLong;
 
 /** Trino {@link ConnectorPageSource}. */
 public class TrinoPageSource extends TrinoPageSourceBase {
 
-    public TrinoPageSource(RecordReader<InternalRow> reader, List<ColumnHandle> projectedColumns) {
-        super(reader, projectedColumns);
+    public TrinoPageSource(
+            RecordReader<InternalRow> reader,
+            List<ColumnHandle> projectedColumns,
+            OptionalLong limit) {
+        super(reader, projectedColumns, limit);
     }
 
     @Override
diff --git a/paimon-trino-368/src/main/java/org/apache/paimon/trino/TrinoPageSource.java b/paimon-trino-368/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
index bfdb18f..617c9d0 100644
--- a/paimon-trino-368/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
+++ b/paimon-trino-368/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
@@ -25,12 +25,16 @@ import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ConnectorPageSource;
 
 import java.util.List;
+import java.util.OptionalLong;
 
 /** Trino {@link ConnectorPageSource}. */
 public class TrinoPageSource extends TrinoPageSourceBase {
 
-    public TrinoPageSource(RecordReader<InternalRow> reader, List<ColumnHandle> projectedColumns) {
-        super(reader, projectedColumns);
+    public TrinoPageSource(
+            RecordReader<InternalRow> reader,
+            List<ColumnHandle> projectedColumns,
+            OptionalLong limit) {
+        super(reader, projectedColumns, limit);
     }
 
     @Override
diff --git a/paimon-trino-422/src/main/java/org/apache/paimon/trino/TrinoPageSource.java b/paimon-trino-422/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
index cf6bcfe..75eb2d5 100644
--- a/paimon-trino-422/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
+++ b/paimon-trino-422/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
@@ -51,8 +51,11 @@ import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
 /** Trino {@link ConnectorPageSource}. */
 public class TrinoPageSource extends TrinoPageSourceBase {
 
-    public TrinoPageSource(RecordReader<InternalRow> reader, List<ColumnHandle> projectedColumns) {
-        super(reader, projectedColumns);
+    public TrinoPageSource(
+            RecordReader<InternalRow> reader,
+            List<ColumnHandle> projectedColumns,
+            OptionalLong limit) {
+        super(reader, projectedColumns, limit);
     }
 
     protected void writeBlock(BlockBuilder output, Type type, DataType logicalType, Object value) {
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java
index 641a102..0f6dbc3 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java
@@ -39,6 +39,7 @@ import io.trino.spi.connector.ConnectorTableMetadata;
 import io.trino.spi.connector.ConnectorTableProperties;
 import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.ConstraintApplicationResult;
+import io.trino.spi.connector.LimitApplicationResult;
 import io.trino.spi.connector.ProjectionApplicationResult;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.SchemaTablePrefix;
@@ -55,6 +56,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.function.Function;
 
 import static java.lang.String.format;
@@ -383,4 +385,22 @@ public abstract class TrinoMetadataBase implements ConnectorMetadata {
             List<? extends ColumnHandle> first, List<? extends ColumnHandle> second) {
         return new HashSet<>(first).equals(new HashSet<>(second));
     }
+
+    @Override
+    public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(
+            ConnectorSession session, ConnectorTableHandle handle, long limit) {
+        TrinoTableHandle table = (TrinoTableHandle) handle;
+
+        if (table.getLimit().isPresent() && table.getLimit().getAsLong() <= limit) {
+            return Optional.empty();
+        }
+
+        if (!table.getFilter().isAll()) {
+            return Optional.empty();
+        }
+
+        table = table.copy(OptionalLong.of(limit));
+
+        return Optional.of(new LimitApplicationResult<>(table, false, false));
+    }
 }
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSource.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
index 767ea92..687f409 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
@@ -25,12 +25,16 @@ import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ConnectorPageSource;
 
 import java.util.List;
+import java.util.OptionalLong;
 
 /** Trino {@link ConnectorPageSource}. */
 public class TrinoPageSource extends TrinoPageSourceBase {
 
-    public TrinoPageSource(RecordReader<InternalRow> reader, List<ColumnHandle> projectedColumns) {
-        super(reader, projectedColumns);
+    public TrinoPageSource(
+            RecordReader<InternalRow> reader,
+            List<ColumnHandle> projectedColumns,
+            OptionalLong limit) {
+        super(reader, projectedColumns, limit);
     }
 
     @Override
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceBase.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceBase.java
index f54cf6c..8c724e3 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceBase.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceBase.java
@@ -25,9 +25,9 @@ import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.reader.RecordReader.RecordIterator;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeChecks;
+import org.apache.paimon.utils.CloseableIterator;
 import org.apache.paimon.utils.InternalRowUtils;
 
 import io.airlift.slice.Slice;
@@ -48,11 +48,14 @@ import io.trino.spi.type.Type;
 import io.trino.spi.type.VarbinaryType;
 import io.trino.spi.type.VarcharType;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.OptionalLong;
 
 import static io.airlift.slice.Slices.wrappedBuffer;
 import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
@@ -77,16 +80,23 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 /** Trino {@link ConnectorPageSource}. */
 public abstract class TrinoPageSourceBase implements ConnectorPageSource {
 
-    private final RecordReader<InternalRow> reader;
+    private static final int ROWS_PER_REQUEST = 4096;
+
+    private final CloseableIterator<InternalRow> iterator;
+    private final OptionalLong limit;
     private final PageBuilder pageBuilder;
     private final List<Type> columnTypes;
     private final List<DataType> logicalTypes;
 
     private boolean isFinished = false;
+    private long numReturn = 0;
 
     public TrinoPageSourceBase(
-            RecordReader<InternalRow> reader, List<ColumnHandle> projectedColumns) {
-        this.reader = reader;
+            RecordReader<InternalRow> reader,
+            List<ColumnHandle> projectedColumns,
+            OptionalLong limit) {
+        this.iterator = reader.toCloseableIterator();
+        this.limit = limit;
         this.columnTypes = new ArrayList<>();
         this.logicalTypes = new ArrayList<>();
         for (ColumnHandle handle : projectedColumns) {
@@ -126,15 +136,23 @@ public abstract class TrinoPageSourceBase implements ConnectorPageSource {
                 TrinoPageSourceBase.class.getClassLoader());
     }
 
+    @Nullable
     private Page nextPage() throws IOException {
-        RecordIterator<InternalRow> batch = reader.readBatch();
-        if (batch == null) {
-            isFinished = true;
-            return null;
-        }
-        InternalRow row;
-        while ((row = batch.next()) != null) {
+        int count = 0;
+        while (count < ROWS_PER_REQUEST && !pageBuilder.isFull()) {
+            if (limit.isPresent() && numReturn + count >= limit.getAsLong()) {
+                isFinished = true;
+                return returnPage(count);
+            }
+
+            if (!iterator.hasNext()) {
+                isFinished = true;
+                return returnPage(count);
+            }
+
+            InternalRow row = iterator.next();
             pageBuilder.declarePosition();
+            count++;
             for (int i = 0; i < columnTypes.size(); i++) {
                 BlockBuilder output = pageBuilder.getBlockBuilder(i);
                 appendTo(
@@ -144,7 +162,15 @@ public abstract class TrinoPageSourceBase implements ConnectorPageSource {
                         output);
             }
         }
-        batch.releaseBatch();
+
+        return returnPage(count);
+    }
+
+    private Page returnPage(int count) {
+        if (count == 0) {
+            return null;
+        }
+        numReturn += count;
         Page page = pageBuilder.build();
         pageBuilder.reset();
         return page;
@@ -152,7 +178,11 @@ public abstract class TrinoPageSourceBase implements ConnectorPageSource {
 
     @Override
     public void close() throws IOException {
-        this.reader.close();
+        try {
+            this.iterator.close();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
     }
 
     protected void appendTo(Type type, DataType logicalType, Object value, BlockBuilder output) {
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
index f44b1b0..b8bed14 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
@@ -34,6 +34,7 @@ import io.trino.spi.predicate.TupleDomain;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.OptionalLong;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.trino.ClassLoaderUtils.runWithContextClassLoader;
@@ -54,7 +55,11 @@ public class TrinoPageSourceProvider implements ConnectorPageSourceProvider {
         return runWithContextClassLoader(
                 () ->
                         createPageSource(
-                                table, trinoTableHandle.getFilter(), (TrinoSplit) split, columns),
+                                table,
+                                trinoTableHandle.getFilter(),
+                                (TrinoSplit) split,
+                                columns,
+                                trinoTableHandle.getLimit()),
                 TrinoPageSourceProvider.class.getClassLoader());
     }
 
@@ -62,7 +67,8 @@ public class TrinoPageSourceProvider implements ConnectorPageSourceProvider {
             Table table,
             TupleDomain<TrinoColumnHandle> filter,
             TrinoSplit split,
-            List<ColumnHandle> columns) {
+            List<ColumnHandle> columns,
+            OptionalLong limit) {
         ReadBuilder read = table.newReadBuilder();
         RowType rowType = table.rowType();
         List<String> fieldNames = FieldNameUtils.fieldNames(rowType);
@@ -79,7 +85,8 @@ public class TrinoPageSourceProvider implements ConnectorPageSourceProvider {
         new TrinoFilterConverter(rowType).convert(filter).ifPresent(read::withFilter);
 
         try {
-            return new TrinoPageSource(read.newRead().createReader(split.decodeSplit()), columns);
+            return new TrinoPageSource(
+                    read.newRead().createReader(split.decodeSplit()), columns, limit);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
index 75ac401..1034e26 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.stream.Collectors;
 
 /** Trino {@link ConnectorTableHandle}. */
@@ -53,11 +54,18 @@ public final class TrinoTableHandle implements ConnectorTableHandle {
     private final byte[] serializedTable;
     private final TupleDomain<TrinoColumnHandle> filter;
     private final Optional<List<ColumnHandle>> projectedColumns;
+    private final OptionalLong limit;
 
     private Table lazyTable;
 
     public TrinoTableHandle(String schemaName, String tableName, byte[] serializedTable) {
-        this(schemaName, tableName, serializedTable, TupleDomain.all(), Optional.empty());
+        this(
+                schemaName,
+                tableName,
+                serializedTable,
+                TupleDomain.all(),
+                Optional.empty(),
+                OptionalLong.empty());
     }
 
     @JsonCreator
@@ -66,12 +74,14 @@ public final class TrinoTableHandle implements ConnectorTableHandle {
             @JsonProperty("tableName") String tableName,
             @JsonProperty("serializedTable") byte[] serializedTable,
             @JsonProperty("filter") TupleDomain<TrinoColumnHandle> filter,
-            @JsonProperty("projection") Optional<List<ColumnHandle>> projectedColumns) {
+            @JsonProperty("projection") Optional<List<ColumnHandle>> projectedColumns,
+            @JsonProperty("limit") OptionalLong limit) {
         this.schemaName = schemaName;
         this.tableName = tableName;
         this.serializedTable = serializedTable;
         this.filter = filter;
         this.projectedColumns = projectedColumns;
+        this.limit = limit;
     }
 
     @JsonProperty
@@ -99,14 +109,23 @@ public final class TrinoTableHandle implements ConnectorTableHandle {
         return projectedColumns;
     }
 
+    public OptionalLong getLimit() {
+        return limit;
+    }
+
     public TrinoTableHandle copy(TupleDomain<TrinoColumnHandle> filter) {
         return new TrinoTableHandle(
-                schemaName, tableName, serializedTable, filter, projectedColumns);
+                schemaName, tableName, serializedTable, filter, projectedColumns, limit);
     }
 
     public TrinoTableHandle copy(Optional<List<ColumnHandle>> projectedColumns) {
         return new TrinoTableHandle(
-                schemaName, tableName, serializedTable, filter, projectedColumns);
+                schemaName, tableName, serializedTable, filter, projectedColumns, limit);
+    }
+
+    public TrinoTableHandle copy(OptionalLong limit) {
+        return new TrinoTableHandle(
+                schemaName, tableName, serializedTable, filter, projectedColumns, limit);
     }
 
     public Table tableWithDynamicOptions(ConnectorSession session) {
diff --git a/paimon-trino-common/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java b/paimon-trino-common/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
index 621cfbc..2ef7fc7 100644
--- a/paimon-trino-common/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
+++ b/paimon-trino-common/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
@@ -30,7 +30,6 @@ import org.apache.paimon.table.sink.InnerTableWrite;
 import org.apache.paimon.types.RowType;
 
 import java.util.Collections;
-import java.util.HashMap;
 
 /** A simple table test helper to write and commit. */
 public class SimpleTableTestHelper {
@@ -44,12 +43,8 @@ public class SimpleTableTestHelper {
                         new Schema(
                                 rowType.getFields(),
                                 Collections.emptyList(),
-                                Collections.emptyList(),
-                                new HashMap<>() {
-                                    {
-                                        put("write-mode", "change-log");
-                                    }
-                                },
+                                Collections.singletonList("a"),
+                                Collections.emptyMap(),
                                 ""));
         FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path);
         String user = "user";
diff --git a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
index c9ab033..64140d9 100644
--- a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++ b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
@@ -233,6 +233,13 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
         assertThat(sql("SELECT SUM(b) FROM paimon.default.t1")).isEqualTo("[[8]]");
     }
 
+    @Test
+    public void testLimit() {
+        assertThat(sql("SELECT * FROM paimon.default.t1 LIMIT 1")).isEqualTo("[[1, 2, 1, 1]]");
+        assertThat(sql("SELECT * FROM paimon.default.t1 WHERE a = 5 LIMIT 1"))
+                .isEqualTo("[[5, 6, 3, 3]]");
+    }
+
     @Test
     public void testSystemTable() {
         assertThat(
diff --git a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoTableHandle.java b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoTableHandle.java
index 10f58fc..08986d7 100644
--- a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoTableHandle.java
+++ b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoTableHandle.java
@@ -23,6 +23,7 @@ import io.trino.spi.predicate.TupleDomain;
 import org.junit.jupiter.api.Test;
 
 import java.util.Optional;
+import java.util.OptionalLong;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -36,7 +37,12 @@ public class TestTrinoTableHandle {
         byte[] serializedTable = TrinoTestUtils.getSerializedTable();
         TrinoTableHandle expected =
                 new TrinoTableHandle(
-                        "test", "user", serializedTable, TupleDomain.all(), Optional.empty());
+                        "test",
+                        "user",
+                        serializedTable,
+                        TupleDomain.all(),
+                        Optional.empty(),
+                        OptionalLong.empty());
         testRoundTrip(expected);
     }
 

Reply via email to