This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-paimon-trino.git
The following commit(s) were added to refs/heads/main by this push:
new 3cbd15d Implement limit pushdown (#38)
3cbd15d is described below
commit 3cbd15da4a52e3a0bc79d9b00f21d79f0844e496
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Nov 20 12:27:06 2023 +0800
Implement limit pushdown (#38)
---
.../org/apache/paimon/trino/TrinoPageSource.java | 8 +++-
.../org/apache/paimon/trino/TrinoPageSource.java | 8 +++-
.../org/apache/paimon/trino/TrinoPageSource.java | 7 ++-
.../org/apache/paimon/trino/TrinoMetadataBase.java | 20 ++++++++
.../org/apache/paimon/trino/TrinoPageSource.java | 8 +++-
.../apache/paimon/trino/TrinoPageSourceBase.java | 56 +++++++++++++++++-----
.../paimon/trino/TrinoPageSourceProvider.java | 13 +++--
.../org/apache/paimon/trino/TrinoTableHandle.java | 27 +++++++++--
.../apache/paimon/trino/SimpleTableTestHelper.java | 9 +---
.../org/apache/paimon/trino/TestTrinoITCase.java | 7 +++
.../apache/paimon/trino/TestTrinoTableHandle.java | 8 +++-
11 files changed, 135 insertions(+), 36 deletions(-)
diff --git
a/paimon-trino-358/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
b/paimon-trino-358/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
index bfdb18f..617c9d0 100644
---
a/paimon-trino-358/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
+++
b/paimon-trino-358/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
@@ -25,12 +25,16 @@ import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import java.util.List;
+import java.util.OptionalLong;
/** Trino {@link ConnectorPageSource}. */
public class TrinoPageSource extends TrinoPageSourceBase {
- public TrinoPageSource(RecordReader<InternalRow> reader,
List<ColumnHandle> projectedColumns) {
- super(reader, projectedColumns);
+ public TrinoPageSource(
+ RecordReader<InternalRow> reader,
+ List<ColumnHandle> projectedColumns,
+ OptionalLong limit) {
+ super(reader, projectedColumns, limit);
}
@Override
diff --git
a/paimon-trino-368/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
b/paimon-trino-368/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
index bfdb18f..617c9d0 100644
---
a/paimon-trino-368/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
+++
b/paimon-trino-368/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
@@ -25,12 +25,16 @@ import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import java.util.List;
+import java.util.OptionalLong;
/** Trino {@link ConnectorPageSource}. */
public class TrinoPageSource extends TrinoPageSourceBase {
- public TrinoPageSource(RecordReader<InternalRow> reader,
List<ColumnHandle> projectedColumns) {
- super(reader, projectedColumns);
+ public TrinoPageSource(
+ RecordReader<InternalRow> reader,
+ List<ColumnHandle> projectedColumns,
+ OptionalLong limit) {
+ super(reader, projectedColumns, limit);
}
@Override
diff --git
a/paimon-trino-422/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
b/paimon-trino-422/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
index cf6bcfe..75eb2d5 100644
---
a/paimon-trino-422/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
+++
b/paimon-trino-422/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
@@ -51,8 +51,11 @@ import static
io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
/** Trino {@link ConnectorPageSource}. */
public class TrinoPageSource extends TrinoPageSourceBase {
- public TrinoPageSource(RecordReader<InternalRow> reader,
List<ColumnHandle> projectedColumns) {
- super(reader, projectedColumns);
+ public TrinoPageSource(
+ RecordReader<InternalRow> reader,
+ List<ColumnHandle> projectedColumns,
+ OptionalLong limit) {
+ super(reader, projectedColumns, limit);
}
protected void writeBlock(BlockBuilder output, Type type, DataType
logicalType, Object value) {
diff --git
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java
index 641a102..0f6dbc3 100644
---
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java
+++
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadataBase.java
@@ -39,6 +39,7 @@ import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
+import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
@@ -55,6 +56,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.function.Function;
import static java.lang.String.format;
@@ -383,4 +385,22 @@ public abstract class TrinoMetadataBase implements
ConnectorMetadata {
List<? extends ColumnHandle> first, List<? extends ColumnHandle>
second) {
return new HashSet<>(first).equals(new HashSet<>(second));
}
+
+ @Override
+ public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(
+ ConnectorSession session, ConnectorTableHandle handle, long limit)
{
+ TrinoTableHandle table = (TrinoTableHandle) handle;
+
+ if (table.getLimit().isPresent() && table.getLimit().getAsLong() <=
limit) {
+ return Optional.empty();
+ }
+
+ if (!table.getFilter().isAll()) {
+ return Optional.empty();
+ }
+
+ table = table.copy(OptionalLong.of(limit));
+
+ return Optional.of(new LimitApplicationResult<>(table, false, false));
+ }
}
diff --git
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
index 767ea92..687f409 100644
---
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
+++
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSource.java
@@ -25,12 +25,16 @@ import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import java.util.List;
+import java.util.OptionalLong;
/** Trino {@link ConnectorPageSource}. */
public class TrinoPageSource extends TrinoPageSourceBase {
- public TrinoPageSource(RecordReader<InternalRow> reader,
List<ColumnHandle> projectedColumns) {
- super(reader, projectedColumns);
+ public TrinoPageSource(
+ RecordReader<InternalRow> reader,
+ List<ColumnHandle> projectedColumns,
+ OptionalLong limit) {
+ super(reader, projectedColumns, limit);
}
@Override
diff --git
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceBase.java
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceBase.java
index f54cf6c..8c724e3 100644
---
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceBase.java
+++
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceBase.java
@@ -25,9 +25,9 @@ import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.reader.RecordReader.RecordIterator;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
+import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.InternalRowUtils;
import io.airlift.slice.Slice;
@@ -48,11 +48,14 @@ import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
+import java.util.OptionalLong;
import static io.airlift.slice.Slices.wrappedBuffer;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
@@ -77,16 +80,23 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
/** Trino {@link ConnectorPageSource}. */
public abstract class TrinoPageSourceBase implements ConnectorPageSource {
- private final RecordReader<InternalRow> reader;
+ private static final int ROWS_PER_REQUEST = 4096;
+
+ private final CloseableIterator<InternalRow> iterator;
+ private final OptionalLong limit;
private final PageBuilder pageBuilder;
private final List<Type> columnTypes;
private final List<DataType> logicalTypes;
private boolean isFinished = false;
+ private long numReturn = 0;
public TrinoPageSourceBase(
- RecordReader<InternalRow> reader, List<ColumnHandle>
projectedColumns) {
- this.reader = reader;
+ RecordReader<InternalRow> reader,
+ List<ColumnHandle> projectedColumns,
+ OptionalLong limit) {
+ this.iterator = reader.toCloseableIterator();
+ this.limit = limit;
this.columnTypes = new ArrayList<>();
this.logicalTypes = new ArrayList<>();
for (ColumnHandle handle : projectedColumns) {
@@ -126,15 +136,23 @@ public abstract class TrinoPageSourceBase implements
ConnectorPageSource {
TrinoPageSourceBase.class.getClassLoader());
}
+ @Nullable
private Page nextPage() throws IOException {
- RecordIterator<InternalRow> batch = reader.readBatch();
- if (batch == null) {
- isFinished = true;
- return null;
- }
- InternalRow row;
- while ((row = batch.next()) != null) {
+ int count = 0;
+ while (count < ROWS_PER_REQUEST && !pageBuilder.isFull()) {
+ if (limit.isPresent() && numReturn + count >= limit.getAsLong()) {
+ isFinished = true;
+ return returnPage(count);
+ }
+
+ if (!iterator.hasNext()) {
+ isFinished = true;
+ return returnPage(count);
+ }
+
+ InternalRow row = iterator.next();
pageBuilder.declarePosition();
+ count++;
for (int i = 0; i < columnTypes.size(); i++) {
BlockBuilder output = pageBuilder.getBlockBuilder(i);
appendTo(
@@ -144,7 +162,15 @@ public abstract class TrinoPageSourceBase implements
ConnectorPageSource {
output);
}
}
- batch.releaseBatch();
+
+ return returnPage(count);
+ }
+
+ private Page returnPage(int count) {
+ if (count == 0) {
+ return null;
+ }
+ numReturn += count;
Page page = pageBuilder.build();
pageBuilder.reset();
return page;
@@ -152,7 +178,14 @@ public abstract class TrinoPageSourceBase implements
ConnectorPageSource {
@Override
public void close() throws IOException {
- this.reader.close();
+ try {
+ this.iterator.close();
+ } catch (IOException e) {
+ // Already the declared type: rethrow as-is instead of double-wrapping.
+ throw e;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
protected void appendTo(Type type, DataType logicalType, Object value,
BlockBuilder output) {
diff --git
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
index f44b1b0..b8bed14 100644
---
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
+++
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
@@ -34,6 +34,7 @@ import io.trino.spi.predicate.TupleDomain;
import java.io.IOException;
import java.util.List;
+import java.util.OptionalLong;
import java.util.stream.Collectors;
import static
org.apache.paimon.trino.ClassLoaderUtils.runWithContextClassLoader;
@@ -54,7 +55,11 @@ public class TrinoPageSourceProvider implements
ConnectorPageSourceProvider {
return runWithContextClassLoader(
() ->
createPageSource(
- table, trinoTableHandle.getFilter(),
(TrinoSplit) split, columns),
+ table,
+ trinoTableHandle.getFilter(),
+ (TrinoSplit) split,
+ columns,
+ trinoTableHandle.getLimit()),
TrinoPageSourceProvider.class.getClassLoader());
}
@@ -62,7 +67,8 @@ public class TrinoPageSourceProvider implements
ConnectorPageSourceProvider {
Table table,
TupleDomain<TrinoColumnHandle> filter,
TrinoSplit split,
- List<ColumnHandle> columns) {
+ List<ColumnHandle> columns,
+ OptionalLong limit) {
ReadBuilder read = table.newReadBuilder();
RowType rowType = table.rowType();
List<String> fieldNames = FieldNameUtils.fieldNames(rowType);
@@ -79,7 +85,8 @@ public class TrinoPageSourceProvider implements
ConnectorPageSourceProvider {
new
TrinoFilterConverter(rowType).convert(filter).ifPresent(read::withFilter);
try {
- return new
TrinoPageSource(read.newRead().createReader(split.decodeSplit()), columns);
+ return new TrinoPageSource(
+ read.newRead().createReader(split.decodeSplit()), columns,
limit);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
index 75ac401..1034e26 100644
---
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
+++
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoTableHandle.java
@@ -40,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.stream.Collectors;
/** Trino {@link ConnectorTableHandle}. */
@@ -53,11 +54,18 @@ public final class TrinoTableHandle implements
ConnectorTableHandle {
private final byte[] serializedTable;
private final TupleDomain<TrinoColumnHandle> filter;
private final Optional<List<ColumnHandle>> projectedColumns;
+ private final OptionalLong limit;
private Table lazyTable;
public TrinoTableHandle(String schemaName, String tableName, byte[]
serializedTable) {
- this(schemaName, tableName, serializedTable, TupleDomain.all(),
Optional.empty());
+ this(
+ schemaName,
+ tableName,
+ serializedTable,
+ TupleDomain.all(),
+ Optional.empty(),
+ OptionalLong.empty());
}
@JsonCreator
@@ -66,12 +74,14 @@ public final class TrinoTableHandle implements
ConnectorTableHandle {
@JsonProperty("tableName") String tableName,
@JsonProperty("serializedTable") byte[] serializedTable,
@JsonProperty("filter") TupleDomain<TrinoColumnHandle> filter,
- @JsonProperty("projection") Optional<List<ColumnHandle>>
projectedColumns) {
+ @JsonProperty("projection") Optional<List<ColumnHandle>>
projectedColumns,
+ @JsonProperty("limit") OptionalLong limit) {
this.schemaName = schemaName;
this.tableName = tableName;
this.serializedTable = serializedTable;
this.filter = filter;
this.projectedColumns = projectedColumns;
+ this.limit = limit;
}
@JsonProperty
@@ -99,14 +109,25 @@ public final class TrinoTableHandle implements
ConnectorTableHandle {
return projectedColumns;
}
+ /** Row limit pushed down into this handle, if any; passed on to the page source. */
+ @JsonProperty
+ public OptionalLong getLimit() {
+ return limit;
+ }
+
public TrinoTableHandle copy(TupleDomain<TrinoColumnHandle> filter) {
return new TrinoTableHandle(
- schemaName, tableName, serializedTable, filter,
projectedColumns);
+ schemaName, tableName, serializedTable, filter,
projectedColumns, limit);
}
public TrinoTableHandle copy(Optional<List<ColumnHandle>>
projectedColumns) {
return new TrinoTableHandle(
- schemaName, tableName, serializedTable, filter,
projectedColumns);
+ schemaName, tableName, serializedTable, filter,
projectedColumns, limit);
+ }
+
+ public TrinoTableHandle copy(OptionalLong limit) {
+ return new TrinoTableHandle(
+ schemaName, tableName, serializedTable, filter,
projectedColumns, limit);
}
public Table tableWithDynamicOptions(ConnectorSession session) {
diff --git
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
index 621cfbc..2ef7fc7 100644
---
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
+++
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
@@ -30,7 +30,6 @@ import org.apache.paimon.table.sink.InnerTableWrite;
import org.apache.paimon.types.RowType;
import java.util.Collections;
-import java.util.HashMap;
/** A simple table test helper to write and commit. */
public class SimpleTableTestHelper {
@@ -44,12 +43,8 @@ public class SimpleTableTestHelper {
new Schema(
rowType.getFields(),
Collections.emptyList(),
- Collections.emptyList(),
- new HashMap<>() {
- {
- put("write-mode", "change-log");
- }
- },
+ Collections.singletonList("a"),
+ Collections.emptyMap(),
""));
FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), path);
String user = "user";
diff --git
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
index c9ab033..64140d9 100644
---
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
@@ -233,6 +233,13 @@ public abstract class TestTrinoITCase extends
AbstractTestQueryFramework {
assertThat(sql("SELECT SUM(b) FROM
paimon.default.t1")).isEqualTo("[[8]]");
}
+ @Test
+ public void testLimit() {
+ assertThat(sql("SELECT * FROM paimon.default.t1 LIMIT
1")).isEqualTo("[[1, 2, 1, 1]]");
+ assertThat(sql("SELECT * FROM paimon.default.t1 WHERE a = 5 LIMIT 1"))
+ .isEqualTo("[[5, 6, 3, 3]]");
+ }
+
@Test
public void testSystemTable() {
assertThat(
diff --git
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoTableHandle.java
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoTableHandle.java
index 10f58fc..08986d7 100644
---
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoTableHandle.java
+++
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoTableHandle.java
@@ -23,6 +23,7 @@ import io.trino.spi.predicate.TupleDomain;
import org.junit.jupiter.api.Test;
import java.util.Optional;
+import java.util.OptionalLong;
import static org.assertj.core.api.Assertions.assertThat;
@@ -36,7 +37,12 @@ public class TestTrinoTableHandle {
byte[] serializedTable = TrinoTestUtils.getSerializedTable();
TrinoTableHandle expected =
new TrinoTableHandle(
- "test", "user", serializedTable, TupleDomain.all(),
Optional.empty());
+ "test",
+ "user",
+ serializedTable,
+ TupleDomain.all(),
+ Optional.empty(),
+ OptionalLong.empty());
testRoundTrip(expected);
}