This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git
The following commit(s) were added to refs/heads/main by this push:
new 6a5a34c Support skip by file index. (#70)
6a5a34c is described below
commit 6a5a34cf940f4077689f9fe1d5ef69d577a01aaa
Author: YeJunHao <[email protected]>
AuthorDate: Mon Apr 29 15:15:07 2024 +0800
Support skip by file index. (#70)
Co-authored-by: yejunhao <[email protected]>
---
.../paimon/trino/TrinoPageSourceProvider.java | 23 +++++++++-
.../org/apache/paimon/trino/TestTrinoITCase.java | 53 ++++++++++++++++++++--
2 files changed, 72 insertions(+), 4 deletions(-)
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
index e2244b5..7917588 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
@@ -20,10 +20,14 @@ package org.apache.paimon.trino;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.fileindex.FileIndexPredicate;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.table.source.IndexFile;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
@@ -128,12 +132,14 @@ public class TrinoPageSourceProvider implements ConnectorPageSourceProvider {
.map(TrinoColumnHandle::getColumnName)
.toList();
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+ Optional<Predicate> paimonFilter = new
TrinoFilterConverter(rowType).convert(filter);
try {
Split paimonSplit = split.decodeSplit();
Optional<List<RawFile>> optionalRawFiles =
paimonSplit.convertToRawFiles();
if (checkRawFile(optionalRawFiles)) {
Optional<List<DeletionFile>> deletionFiles =
paimonSplit.deletionFiles();
+ Optional<List<IndexFile>> indexFiles =
paimonSplit.indexFiles();
FileStoreTable fileStoreTable = (FileStoreTable) table;
SchemaManager schemaManager =
@@ -148,8 +154,23 @@ public class TrinoPageSourceProvider implements ConnectorPageSourceProvider {
List<RawFile> files = optionalRawFiles.orElseThrow();
LinkedList<ConnectorPageSource> sources = new
LinkedList<>();
+ // if file index exists, do the filter.
for (int i = 0; i < files.size(); i++) {
RawFile rawFile = files.get(i);
+ if (indexFiles.isPresent()) {
+ IndexFile indexFile = indexFiles.get().get(i);
+ if (indexFile != null && paimonFilter.isPresent())
{
+ try (FileIndexPredicate fileIndexPredicate =
+ new FileIndexPredicate(
+ new Path(indexFile.path()),
+ ((FileStoreTable)
table).fileIO(),
+ rowType)) {
+ if
(!fileIndexPredicate.testPredicate(paimonFilter.get())) {
+ continue;
+ }
+ }
+ }
+ }
ConnectorPageSource source =
createDataPageSource(
rawFile.format(),
@@ -200,7 +221,7 @@ public class TrinoPageSourceProvider implements ConnectorPageSourceProvider {
// old read way
ReadBuilder read = table.newReadBuilder();
- new
TrinoFilterConverter(rowType).convert(filter).ifPresent(read::withFilter);
+ paimonFilter.ifPresent(read::withFilter);
if (!fieldNames.equals(projectedFields)) {
read.withProjection(columnIndex);
diff --git a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
index c9ae2f1..99f54d4 100644
--- a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++ b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
@@ -447,6 +447,46 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
commit.commit(1, writer.prepareCommit(true, 1));
}
+ {
+ Path tablePath = new Path(warehouse, "default.db/t102");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "a", DataTypes.STRING()),
+ new DataField(1, "b", DataTypes.INT()),
+ new DataField(2, "c", DataTypes.INT())));
+ new SchemaManager(LocalFileIO.create(), tablePath)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>() {
+ {
+
put("file-index.bloom-filter.columns", "a,b,c");
+ }
+ },
+ ""));
+ FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
+ InnerTableWrite writer = table.newWrite("user");
+ writer.withIOManager(new IOManagerImpl("/tmp"));
+ InnerTableCommit commit = table.newCommit("user");
+ for (int i = 0; i < 100; i = i + 3) {
+ writer.write(GenericRow.of(BinaryString.fromString("a" + i),
i, i));
+ }
+ commit.commit(0, writer.prepareCommit(true, 0));
+
+ for (int i = 1; i < 100; i = i + 3) {
+ writer.write(GenericRow.of(BinaryString.fromString("a" + i),
i, i));
+ }
+ commit.commit(1, writer.prepareCommit(true, 1));
+
+ for (int i = 2; i < 100; i = i + 3) {
+ writer.write(GenericRow.of(BinaryString.fromString("a" + i),
i, i));
+ }
+ commit.commit(2, writer.prepareCommit(true, 2));
+ }
+
DistributedQueryRunner queryRunner = null;
try {
queryRunner =
@@ -579,7 +619,8 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
+ "changelog_producer = 'input'"
+ ")");
assertThat(sql("SHOW TABLES FROM paimon.default"))
- .isEqualTo("[[empty_t], [orders], [t1], [t100], [t101], [t2], [t3], [t4], [t99]]");
+ .isEqualTo(
+ "[[empty_t], [orders], [t1], [t100], [t101], [t102], [t2], [t3], [t4], [t99]]");
sql("DROP TABLE IF EXISTS paimon.default.orders");
}
@@ -602,7 +643,8 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
+ ")");
sql("ALTER TABLE paimon.default.t5 RENAME TO t6");
assertThat(sql("SHOW TABLES FROM paimon.default"))
- .isEqualTo("[[empty_t], [t1], [t100], [t101], [t2], [t3], [t4], [t6], [t99]]");
+ .isEqualTo(
+ "[[empty_t], [t1], [t100], [t101], [t102], [t2], [t3], [t4], [t6], [t99]]");
sql("DROP TABLE IF EXISTS paimon.default.t6");
}
@@ -625,7 +667,7 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
+ ")");
sql("DROP TABLE IF EXISTS paimon.default.t5");
assertThat(sql("SHOW TABLES FROM paimon.default"))
- .isEqualTo("[[empty_t], [t1], [t100], [t101], [t2], [t3], [t4], [t99]]");
+ .isEqualTo("[[empty_t], [t1], [t100], [t101], [t102], [t2], [t3], [t4], [t99]]");
}
@Test
@@ -775,6 +817,11 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
"[[a1, 1, 1], [a2, 2, 2], [a3, 3, 3], [a4, 4, 4], [a5, 5, 5], [a6, 6, 6], [a7, 7, 7], [a8, 8, 8], [a9, 9, 9]]");
}
+ @Test
+ public void testFileIndex() {
+ assertThat(sql("SELECT * FROM paimon.default.t102 where c = 2")).isEqualTo("[[a2, 2, 2]]");
+ }
+
protected String sql(String sql) {
MaterializedResult result = getQueryRunner().execute(sql);
return result.getMaterializedRows().toString();