This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a5a34c  Support skip by file index. (#70)
6a5a34c is described below

commit 6a5a34cf940f4077689f9fe1d5ef69d577a01aaa
Author: YeJunHao <[email protected]>
AuthorDate: Mon Apr 29 15:15:07 2024 +0800

    Support skip by file index. (#70)
    
    Co-authored-by: yejunhao <[email protected]>
---
 .../paimon/trino/TrinoPageSourceProvider.java      | 23 +++++++++-
 .../org/apache/paimon/trino/TestTrinoITCase.java   | 53 ++++++++++++++++++++--
 2 files changed, 72 insertions(+), 4 deletions(-)

diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
index e2244b5..7917588 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
@@ -20,10 +20,14 @@ package org.apache.paimon.trino;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.fileindex.FileIndexPredicate;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.table.source.IndexFile;
 import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
@@ -128,12 +132,14 @@ public class TrinoPageSourceProvider implements ConnectorPageSourceProvider {
                         .map(TrinoColumnHandle::getColumnName)
                         .toList();
         TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+        Optional<Predicate> paimonFilter = new 
TrinoFilterConverter(rowType).convert(filter);
 
         try {
             Split paimonSplit = split.decodeSplit();
             Optional<List<RawFile>> optionalRawFiles = 
paimonSplit.convertToRawFiles();
             if (checkRawFile(optionalRawFiles)) {
                 Optional<List<DeletionFile>> deletionFiles = 
paimonSplit.deletionFiles();
+                Optional<List<IndexFile>> indexFiles = 
paimonSplit.indexFiles();
 
                 FileStoreTable fileStoreTable = (FileStoreTable) table;
                 SchemaManager schemaManager =
@@ -148,8 +154,23 @@ public class TrinoPageSourceProvider implements ConnectorPageSourceProvider {
                     List<RawFile> files = optionalRawFiles.orElseThrow();
                     LinkedList<ConnectorPageSource> sources = new 
LinkedList<>();
 
+                    // if file index exists, do the filter.
                     for (int i = 0; i < files.size(); i++) {
                         RawFile rawFile = files.get(i);
+                        if (indexFiles.isPresent()) {
+                            IndexFile indexFile = indexFiles.get().get(i);
+                            if (indexFile != null && paimonFilter.isPresent()) 
{
+                                try (FileIndexPredicate fileIndexPredicate =
+                                        new FileIndexPredicate(
+                                                new Path(indexFile.path()),
+                                                ((FileStoreTable) 
table).fileIO(),
+                                                rowType)) {
+                                    if 
(!fileIndexPredicate.testPredicate(paimonFilter.get())) {
+                                        continue;
+                                    }
+                                }
+                            }
+                        }
                         ConnectorPageSource source =
                                 createDataPageSource(
                                         rawFile.format(),
@@ -200,7 +221,7 @@ public class TrinoPageSourceProvider implements ConnectorPageSourceProvider {
 
                 // old read way
                 ReadBuilder read = table.newReadBuilder();
-                new 
TrinoFilterConverter(rowType).convert(filter).ifPresent(read::withFilter);
+                paimonFilter.ifPresent(read::withFilter);
 
                 if (!fieldNames.equals(projectedFields)) {
                     read.withProjection(columnIndex);
diff --git a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
index c9ae2f1..99f54d4 100644
--- a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++ b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
@@ -447,6 +447,46 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
             commit.commit(1, writer.prepareCommit(true, 1));
         }
 
+        {
+            Path tablePath = new Path(warehouse, "default.db/t102");
+            RowType rowType =
+                    new RowType(
+                            Arrays.asList(
+                                    new DataField(0, "a", DataTypes.STRING()),
+                                    new DataField(1, "b", DataTypes.INT()),
+                                    new DataField(2, "c", DataTypes.INT())));
+            new SchemaManager(LocalFileIO.create(), tablePath)
+                    .createTable(
+                            new Schema(
+                                    rowType.getFields(),
+                                    Collections.emptyList(),
+                                    Collections.emptyList(),
+                                    new HashMap<>() {
+                                        {
+                                            
put("file-index.bloom-filter.columns", "a,b,c");
+                                        }
+                                    },
+                                    ""));
+            FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
+            InnerTableWrite writer = table.newWrite("user");
+            writer.withIOManager(new IOManagerImpl("/tmp"));
+            InnerTableCommit commit = table.newCommit("user");
+            for (int i = 0; i < 100; i = i + 3) {
+                writer.write(GenericRow.of(BinaryString.fromString("a" + i), 
i, i));
+            }
+            commit.commit(0, writer.prepareCommit(true, 0));
+
+            for (int i = 1; i < 100; i = i + 3) {
+                writer.write(GenericRow.of(BinaryString.fromString("a" + i), 
i, i));
+            }
+            commit.commit(1, writer.prepareCommit(true, 1));
+
+            for (int i = 2; i < 100; i = i + 3) {
+                writer.write(GenericRow.of(BinaryString.fromString("a" + i), 
i, i));
+            }
+            commit.commit(2, writer.prepareCommit(true, 2));
+        }
+
         DistributedQueryRunner queryRunner = null;
         try {
             queryRunner =
@@ -579,7 +619,8 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
                         + "changelog_producer = 'input'"
                         + ")");
         assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[empty_t], [orders], [t1], [t100], [t101], [t2], 
[t3], [t4], [t99]]");
+                .isEqualTo(
+                        "[[empty_t], [orders], [t1], [t100], [t101], [t102], 
[t2], [t3], [t4], [t99]]");
         sql("DROP TABLE IF EXISTS paimon.default.orders");
     }
 
@@ -602,7 +643,8 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
                         + ")");
         sql("ALTER TABLE paimon.default.t5 RENAME TO t6");
         assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[empty_t], [t1], [t100], [t101], [t2], [t3], 
[t4], [t6], [t99]]");
+                .isEqualTo(
+                        "[[empty_t], [t1], [t100], [t101], [t102], [t2], [t3], 
[t4], [t6], [t99]]");
         sql("DROP TABLE IF EXISTS paimon.default.t6");
     }
 
@@ -625,7 +667,7 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
                         + ")");
         sql("DROP TABLE IF EXISTS paimon.default.t5");
         assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[empty_t], [t1], [t100], [t101], [t2], [t3], 
[t4], [t99]]");
+                .isEqualTo("[[empty_t], [t1], [t100], [t101], [t102], [t2], 
[t3], [t4], [t99]]");
     }
 
     @Test
@@ -775,6 +817,11 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
                         "[[a1, 1, 1], [a2, 2, 2], [a3, 3, 3], [a4, 4, 4], [a5, 
5, 5], [a6, 6, 6], [a7, 7, 7], [a8, 8, 8], [a9, 9, 9]]");
     }
 
+    @Test
+    public void testFileIndex() {
+        assertThat(sql("SELECT * FROM paimon.default.t102 where c = 
2")).isEqualTo("[[a2, 2, 2]]");
+    }
+
     protected String sql(String sql) {
         MaterializedResult result = getQueryRunner().execute(sql);
         return result.getMaterializedRows().toString();

Reply via email to