This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-paimon-trino.git


The following commit(s) were added to refs/heads/main by this push:
     new 04058be  Support deletion file for primary key table (#59)
04058be is described below

commit 04058be38fbbfdfbe22c6589ca30ae02032a41e4
Author: YeJunHao <[email protected]>
AuthorDate: Mon Mar 18 20:17:31 2024 +0800

    Support deletion file for primary key table (#59)
---
 .../paimon/trino/TrinoPageSourceProvider.java      | 141 ++++++++++++---------
 .../paimon/trino/TrinoPageSourceWrapper.java       | 126 ++++++++++++++++++
 .../org/apache/paimon/trino/TestTrinoITCase.java   |  49 ++++++-
 3 files changed, 255 insertions(+), 61 deletions(-)

diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
index 15fad2b..5510128 100644
--- 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
@@ -19,9 +19,11 @@
 package org.apache.paimon.trino;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
@@ -57,6 +59,7 @@ import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.Type;
 import org.joda.time.DateTimeZone;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -64,13 +67,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
-import java.util.stream.Collector;
 import java.util.stream.Collectors;
 
 import static 
io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
 import static io.trino.orc.OrcReader.INITIAL_BATCH_SIZE;
 import static java.util.Objects.requireNonNull;
-import static org.apache.paimon.schema.SchemaEvolutionUtil.createIndexMapping;
 import static 
org.apache.paimon.trino.ClassLoaderUtils.runWithContextClassLoader;
 
 /** Trino {@link ConnectorPageSourceProvider}. */
@@ -126,60 +127,77 @@ public class TrinoPageSourceProvider implements 
ConnectorPageSourceProvider {
                         .map(TrinoColumnHandle.class::cast)
                         .map(TrinoColumnHandle::getColumnName)
                         .toList();
-        int[] columnIndex =
-                // the column index, very important
-                
projectedFields.stream().mapToInt(fieldNames::indexOf).toArray();
-
         TrinoFileSystem fileSystem = fileSystemFactory.create(session);
 
         try {
             Split paimonSplit = split.decodeSplit();
             Optional<List<RawFile>> optionalRawFiles = 
paimonSplit.convertToRawFiles();
             if (checkRawFile(optionalRawFiles)) {
+                Optional<List<DeletionFile>> deletionFiles = 
paimonSplit.deletionFiles();
+
                 FileStoreTable fileStoreTable = (FileStoreTable) table;
                 SchemaManager schemaManager =
                         new SchemaManager(fileStoreTable.fileIO(), 
fileStoreTable.location());
+
                 List<Type> type =
                         columns.stream()
                                 .map(s -> ((TrinoColumnHandle) 
s).getTrinoType())
                                 .collect(Collectors.toList());
+
                 try {
-                    return new DirectTrinoPageSource(
-                            optionalRawFiles.orElseThrow().stream()
-                                    .map(
-                                            rawFile ->
-                                                    createDataPageSource(
-                                                            rawFile.format(),
-                                                            
fileSystem.newInputFile(
-                                                                    
Location.of(rawFile.path())),
-                                                            
fileStoreTable.coreOptions(),
-                                                            // map table 
column index to data column
-                                                            // index, if 
column does not exist in
-                                                            // data columns, 
set it to -1
-                                                            // columns those 
set to -1 will generate
-                                                            // a null vector 
in orc page
-                                                            mapping(
-                                                                    
columnIndex,
-                                                                    
rowType.getFields(),
-                                                                    
schemaManager
-                                                                            
.schema(
-                                                                               
     rawFile
-                                                                               
             .schemaId())
-                                                                            
.fields()),
-                                                            type,
-                                                            
orderDomains(projectedFields, filter)))
-                                    .collect(
-                                            Collector.of(
-                                                    LinkedList::new,
-                                                    List::add,
-                                                    (left, right) -> {
-                                                        left.addAll(right);
-                                                        return left;
-                                                    })));
+                    List<RawFile> files = optionalRawFiles.orElseThrow();
+                    LinkedList<ConnectorPageSource> sources = new 
LinkedList<>();
+
+                    for (int i = 0; i < files.size(); i++) {
+                        RawFile rawFile = files.get(i);
+                        ConnectorPageSource source =
+                                createDataPageSource(
+                                        rawFile.format(),
+                                        
fileSystem.newInputFile(Location.of(rawFile.path())),
+                                        fileStoreTable.coreOptions(),
+                                        // map each table column name to its data
+                                        // column name; if a column does not exist in
+                                        // the data columns, map it to null;
+                                        // columns that are mapped to null generate
+                                        // a null vector in the ORC page
+                                        fileStoreTable.schema().id() == 
rawFile.schemaId()
+                                                ? projectedFields
+                                                : schemaEvolutionFieldNames(
+                                                        projectedFields,
+                                                        rowType.getFields(),
+                                                        schemaManager
+                                                                
.schema(rawFile.schemaId())
+                                                                .fields()),
+                                        type,
+                                        orderDomains(projectedFields, filter));
+
+                        if (deletionFiles.isPresent()) {
+                            source =
+                                    TrinoPageSourceWrapper.wrap(
+                                            source,
+                                            
Optional.ofNullable(deletionFiles.get().get(i))
+                                                    .map(
+                                                            deletionFile -> {
+                                                                try {
+                                                                    return 
DeletionVector.read(
+                                                                            
fileStoreTable.fileIO(),
+                                                                            
deletionFile);
+                                                                } catch 
(IOException e) {
+                                                                    throw new 
RuntimeException(e);
+                                                                }
+                                                            }));
+                        }
+                        sources.add(source);
+                    }
+
+                    return new DirectTrinoPageSource(sources);
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
             } else {
+                int[] columnIndex =
+                        
projectedFields.stream().mapToInt(fieldNames::indexOf).toArray();
+
                 // old read way
                 ReadBuilder read = table.newReadBuilder();
                 new 
TrinoFilterConverter(rowType).convert(filter).ifPresent(read::withFilter);
@@ -226,19 +244,20 @@ public class TrinoPageSourceProvider implements 
ConnectorPageSourceProvider {
         return true;
     }
 
-    // map the table schema columnsIndex to data schema columnsIndex
-    private int[] mapping(
-            int[] tableSchemaColumnIndex, List<DataField> tableFields, 
List<DataField> dataFields) {
+    // map the table schema column names to data schema column names
+    private List<String> schemaEvolutionFieldNames(
+            List<String> fieldNames, List<DataField> tableFields, 
List<DataField> dataFields) {
 
-        int[] mapping = createIndexMapping(tableFields, dataFields);
-        if (mapping == null) {
-            return tableSchemaColumnIndex;
-        }
-        int[] result = new int[tableSchemaColumnIndex.length];
+        Map<String, Integer> fieldNameToId = new HashMap<>();
+        Map<Integer, String> idToFieldName = new HashMap<>();
+        List<String> result = new ArrayList<>();
+
+        tableFields.forEach(field -> fieldNameToId.put(field.name(), 
field.id()));
+        dataFields.forEach(field -> idToFieldName.put(field.id(), 
field.name()));
 
-        for (int i = 0; i < tableSchemaColumnIndex.length; i++) {
-            int po = tableSchemaColumnIndex[i];
-            result[i] = mapping[po];
+        for (String fieldName : fieldNames) {
+            Integer id = fieldNameToId.get(fieldName);
+            result.add(idToFieldName.getOrDefault(id, null));
         }
         return result;
     }
@@ -248,7 +267,7 @@ public class TrinoPageSourceProvider implements 
ConnectorPageSourceProvider {
             TrinoInputFile inputFile,
             // TODO construct read option by core-options
             CoreOptions coreOptions,
-            int[] columns,
+            List<String> columns,
             List<Type> types,
             List<Domain> domains) {
         switch (format) {
@@ -287,7 +306,7 @@ public class TrinoPageSourceProvider implements 
ConnectorPageSourceProvider {
     private ConnectorPageSource createOrcDataPageSource(
             TrinoInputFile inputFile,
             OrcReaderOptions options,
-            int[] columns,
+            List<String> columns,
             List<Type> types,
             List<Domain> domains) {
         try {
@@ -297,22 +316,28 @@ public class TrinoPageSourceProvider implements 
ConnectorPageSourceProvider {
                             .orElseThrow(() -> new RuntimeException("ORC file 
is zero length"));
 
             List<OrcColumn> fileColumns = 
reader.getRootColumn().getNestedColumns();
+            Map<String, OrcColumn> fieldsMap = new HashMap<>();
+            fileColumns.forEach(column -> 
fieldsMap.put(column.getColumnName(), column));
             TupleDomainOrcPredicate.TupleDomainOrcPredicateBuilder 
predicateBuilder =
                     TupleDomainOrcPredicate.builder();
             List<OrcPageSource.ColumnAdaptation> columnAdaptations = new 
ArrayList<>();
-            List<OrcColumn> fileReadColumns = new ArrayList<>(columns.length);
-            List<Type> fileReadTypes = new ArrayList<>(columns.length);
+            List<OrcColumn> fileReadColumns = new ArrayList<>(columns.size());
+            List<Type> fileReadTypes = new ArrayList<>(columns.size());
 
-            for (int i = 0; i < columns.length; i++) {
-                if (columns[i] >= 0) {
+            for (int i = 0; i < columns.size(); i++) {
+                if (columns.get(i) != null) {
                     // column exists
                     columnAdaptations.add(
                             
OrcPageSource.ColumnAdaptation.sourceColumn(fileReadColumns.size()));
-                    fileReadColumns.add(fileColumns.get(columns[i]));
+                    OrcColumn orcColumn = fieldsMap.get(columns.get(i));
+                    if (orcColumn == null) {
+                        throw new RuntimeException(
+                                "Column " + columns.get(i) + " does not exist 
in orc file.");
+                    }
+                    fileReadColumns.add(orcColumn);
                     fileReadTypes.add(types.get(i));
                     if (domains.get(i) != null) {
-                        predicateBuilder.addColumn(
-                                fileColumns.get(columns[i]).getColumnId(), 
domains.get(i));
+                        predicateBuilder.addColumn(orcColumn.getColumnId(), 
domains.get(i));
                     }
                 } else {
                     
columnAdaptations.add(OrcPageSource.ColumnAdaptation.nullColumn(types.get(i)));
diff --git 
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceWrapper.java
 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceWrapper.java
new file mode 100644
index 0000000..2b03285
--- /dev/null
+++ 
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceWrapper.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.deletionvectors.DeletionVector;
+
+import io.trino.spi.Page;
+import io.trino.spi.connector.ConnectorPageSource;
+import io.trino.spi.metrics.Metrics;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+
+/** Wrap {@link ConnectorPageSource} using deletion vector. */
+public class TrinoPageSourceWrapper implements ConnectorPageSource {
+
+    private final ConnectorPageSource source;
+
+    private final Optional<DeletionVector> deletionVector;
+
+    public TrinoPageSourceWrapper(
+            ConnectorPageSource source, Optional<DeletionVector> 
deletionVector) {
+        this.source = source;
+        this.deletionVector = deletionVector;
+    }
+
+    @Override
+    public long getCompletedBytes() {
+        return source.getCompletedBytes();
+    }
+
+    @Override
+    public OptionalLong getCompletedPositions() {
+        return source.getCompletedPositions();
+    }
+
+    @Override
+    public long getReadTimeNanos() {
+        return source.getReadTimeNanos();
+    }
+
+    @Override
+    public boolean isFinished() {
+        return source.isFinished();
+    }
+
+    @Override
+    public Page getNextPage() {
+        int startPosition = (int) source.getCompletedPositions().orElseThrow();
+        Page next = source.getNextPage();
+        if (next == null) {
+            return next;
+        }
+
+        int pageCount = next.getPositionCount();
+
+        return deletionVector
+                .map(
+                        deletionVector ->
+                                convertToRetained(next, deletionVector, 
startPosition, pageCount))
+                .orElse(next);
+    }
+
+    @VisibleForTesting
+    Page convertToRetained(
+            Page page, DeletionVector deletionVector, int startPosition, int 
pageCount) {
+        int[] retained = new int[pageCount];
+        int retainedLength = 0;
+        for (int pagePosition = 0; pagePosition < pageCount; pagePosition++) {
+            if (!deletionVector.isDeleted(startPosition + pagePosition)) {
+                retained[retainedLength++] = pagePosition;
+            }
+        }
+        if (retainedLength == pageCount) {
+            return page;
+        }
+
+        return page.getPositions(retained, 0, retainedLength);
+    }
+
+    @Override
+    public long getMemoryUsage() {
+        return source.getMemoryUsage();
+    }
+
+    @Override
+    public void close() throws IOException {
+        source.close();
+    }
+
+    @Override
+    public CompletableFuture<?> isBlocked() {
+        return source.isBlocked();
+    }
+
+    @Override
+    public Metrics getMetrics() {
+        return source.getMetrics();
+    }
+
+    public static ConnectorPageSource wrap(
+            ConnectorPageSource connectorPageSource, Optional<DeletionVector> 
deletionVector) {
+        return new TrinoPageSourceWrapper(connectorPageSource, deletionVector);
+    }
+}
diff --git 
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
 
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
index 14cdeea..78fb313 100644
--- 
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++ 
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
@@ -18,12 +18,14 @@
 
 package org.apache.paimon.trino;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericMap;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.schema.Schema;
@@ -411,6 +413,40 @@ public abstract class TestTrinoITCase extends 
AbstractTestQueryFramework {
             commit.commit(1, writer.prepareCommit(true, 1));
         }
 
+        {
+            Path tablePath6 = new Path(warehouse, "default.db/t101");
+            RowType rowType =
+                    new RowType(
+                            Arrays.asList(
+                                    new DataField(0, "a", DataTypes.STRING()),
+                                    new DataField(1, "b", DataTypes.INT()),
+                                    new DataField(2, "c", DataTypes.INT())));
+            new SchemaManager(LocalFileIO.create(), tablePath6)
+                    .createTable(
+                            new Schema(
+                                    rowType.getFields(),
+                                    Collections.emptyList(),
+                                    List.of("a"),
+                                    new HashMap<>() {
+                                        {
+                                            put(CoreOptions.BUCKET.key(), "1");
+                                            
put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
+                                        }
+                                    },
+                                    ""));
+            FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath6);
+            InnerTableWrite writer = table.newWrite("user");
+            writer.withIOManager(new IOManagerImpl("/tmp"));
+            InnerTableCommit commit = table.newCommit("user");
+            for (int i = 0; i < 10; i++) {
+                writer.write(GenericRow.of(BinaryString.fromString("a" + i), 
i, i));
+            }
+            commit.commit(0, writer.prepareCommit(true, 0));
+
+            writer.write(GenericRow.ofKind(RowKind.DELETE, 
BinaryString.fromString("a0"), 0, 0));
+            commit.commit(1, writer.prepareCommit(true, 1));
+        }
+
         DistributedQueryRunner queryRunner = null;
         try {
             queryRunner =
@@ -543,7 +579,7 @@ public abstract class TestTrinoITCase extends 
AbstractTestQueryFramework {
                         + "changelog_producer = 'input'"
                         + ")");
         assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[empty_t], [orders], [t1], [t100], [t2], [t3], 
[t4], [t99]]");
+                .isEqualTo("[[empty_t], [orders], [t1], [t100], [t101], [t2], 
[t3], [t4], [t99]]");
         sql("DROP TABLE IF EXISTS paimon.default.orders");
     }
 
@@ -566,7 +602,7 @@ public abstract class TestTrinoITCase extends 
AbstractTestQueryFramework {
                         + ")");
         sql("ALTER TABLE paimon.default.t5 RENAME TO t6");
         assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[empty_t], [t1], [t100], [t2], [t3], [t4], [t6], 
[t99]]");
+                .isEqualTo("[[empty_t], [t1], [t100], [t101], [t2], [t3], 
[t4], [t6], [t99]]");
         sql("DROP TABLE IF EXISTS paimon.default.t6");
     }
 
@@ -589,7 +625,7 @@ public abstract class TestTrinoITCase extends 
AbstractTestQueryFramework {
                         + ")");
         sql("DROP TABLE IF EXISTS paimon.default.t5");
         assertThat(sql("SHOW TABLES FROM paimon.default"))
-                .isEqualTo("[[empty_t], [t1], [t100], [t2], [t3], [t4], 
[t99]]");
+                .isEqualTo("[[empty_t], [t1], [t100], [t101], [t2], [t3], 
[t4], [t99]]");
     }
 
     @Test
@@ -732,6 +768,13 @@ public abstract class TestTrinoITCase extends 
AbstractTestQueryFramework {
                                 + "[true, 1, 1, 1, 1, 1.0, 1.0, char1, 
varchar1, 1970-01-01, 2023-09-12T07:54:48, 2023-09-12T07:54:48.001, 
2023-09-12T07:54:48.001001, 0.10000, 010203, [1, 1, 1], {1=1}, [1, 1]]]");
     }
 
+    /**
+     * Checks that rows removed through a deletion file are not returned by a query.
+     *
+     * <p>Table {@code t101} is created in the test setup with deletion vectors enabled; rows
+     * {@code a0} to {@code a9} are written and committed, then a {@code DELETE} for {@code a0} is
+     * committed. The query is therefore expected to return {@code a1} to {@code a9} only.
+     */
+    @Test
+    public void testDeletionFile() {
+        assertThat(sql("SELECT * FROM paimon.default.t101"))
+                .isEqualTo(
+                        "[[a1, 1, 1], [a2, 2, 2], [a3, 3, 3], [a4, 4, 4], [a5, 
5, 5], [a6, 6, 6], [a7, 7, 7], [a8, 8, 8], [a9, 9, 9]]");
+    }
+
     protected String sql(String sql) {
         MaterializedResult result = getQueryRunner().execute(sql);
         return result.getMaterializedRows().toString();

Reply via email to