This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-paimon-trino.git
The following commit(s) were added to refs/heads/main by this push:
new 04058be Support deletion file for primary key table (#59)
04058be is described below
commit 04058be38fbbfdfbe22c6589ca30ae02032a41e4
Author: YeJunHao <[email protected]>
AuthorDate: Mon Mar 18 20:17:31 2024 +0800
Support deletion file for primary key table (#59)
---
.../paimon/trino/TrinoPageSourceProvider.java | 141 ++++++++++++---------
.../paimon/trino/TrinoPageSourceWrapper.java | 126 ++++++++++++++++++
.../org/apache/paimon/trino/TestTrinoITCase.java | 49 ++++++-
3 files changed, 255 insertions(+), 61 deletions(-)
diff --git
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
index 15fad2b..5510128 100644
---
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
+++
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java
@@ -19,9 +19,11 @@
package org.apache.paimon.trino;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
@@ -57,6 +59,7 @@ import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.Type;
import org.joda.time.DateTimeZone;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
@@ -64,13 +67,11 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
-import java.util.stream.Collector;
import java.util.stream.Collectors;
import static
io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.orc.OrcReader.INITIAL_BATCH_SIZE;
import static java.util.Objects.requireNonNull;
-import static org.apache.paimon.schema.SchemaEvolutionUtil.createIndexMapping;
import static
org.apache.paimon.trino.ClassLoaderUtils.runWithContextClassLoader;
/** Trino {@link ConnectorPageSourceProvider}. */
@@ -126,60 +127,77 @@ public class TrinoPageSourceProvider implements
ConnectorPageSourceProvider {
.map(TrinoColumnHandle.class::cast)
.map(TrinoColumnHandle::getColumnName)
.toList();
- int[] columnIndex =
- // the column index, very important
-
projectedFields.stream().mapToInt(fieldNames::indexOf).toArray();
-
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
try {
Split paimonSplit = split.decodeSplit();
Optional<List<RawFile>> optionalRawFiles =
paimonSplit.convertToRawFiles();
if (checkRawFile(optionalRawFiles)) {
+ Optional<List<DeletionFile>> deletionFiles =
paimonSplit.deletionFiles();
+
FileStoreTable fileStoreTable = (FileStoreTable) table;
SchemaManager schemaManager =
new SchemaManager(fileStoreTable.fileIO(),
fileStoreTable.location());
+
List<Type> type =
columns.stream()
.map(s -> ((TrinoColumnHandle)
s).getTrinoType())
.collect(Collectors.toList());
+
try {
- return new DirectTrinoPageSource(
- optionalRawFiles.orElseThrow().stream()
- .map(
- rawFile ->
- createDataPageSource(
- rawFile.format(),
-
fileSystem.newInputFile(
-
Location.of(rawFile.path())),
-
fileStoreTable.coreOptions(),
- // map table
column index to data column
- // index, if
column does not exist in
- // data columns,
set it to -1
- // columns those
set to -1 will generate
- // a null vector
in orc page
- mapping(
-
columnIndex,
-
rowType.getFields(),
-
schemaManager
-
.schema(
-
rawFile
-
.schemaId())
-
.fields()),
- type,
-
orderDomains(projectedFields, filter)))
- .collect(
- Collector.of(
- LinkedList::new,
- List::add,
- (left, right) -> {
- left.addAll(right);
- return left;
- })));
+ List<RawFile> files = optionalRawFiles.orElseThrow();
+ LinkedList<ConnectorPageSource> sources = new
LinkedList<>();
+
+ for (int i = 0; i < files.size(); i++) {
+ RawFile rawFile = files.get(i);
+ ConnectorPageSource source =
+ createDataPageSource(
+ rawFile.format(),
+
fileSystem.newInputFile(Location.of(rawFile.path())),
+ fileStoreTable.coreOptions(),
+ // map table column name to data column
+ // name, if column does not exist in
+ // data columns, set it to null
+ // columns those set to null will
generate
+ // a null vector in orc page
+ fileStoreTable.schema().id() ==
rawFile.schemaId()
+ ? projectedFields
+ : schemaEvolutionFieldNames(
+ projectedFields,
+ rowType.getFields(),
+ schemaManager
+
.schema(rawFile.schemaId())
+ .fields()),
+ type,
+ orderDomains(projectedFields, filter));
+
+ if (deletionFiles.isPresent()) {
+ source =
+ TrinoPageSourceWrapper.wrap(
+ source,
+
Optional.ofNullable(deletionFiles.get().get(i))
+ .map(
+ deletionFile -> {
+ try {
+ return
DeletionVector.read(
+
fileStoreTable.fileIO(),
+
deletionFile);
+ } catch
(IOException e) {
+ throw new
RuntimeException(e);
+ }
+ }));
+ }
+ sources.add(source);
+ }
+
+ return new DirectTrinoPageSource(sources);
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
+ int[] columnIndex =
+
projectedFields.stream().mapToInt(fieldNames::indexOf).toArray();
+
// old read way
ReadBuilder read = table.newReadBuilder();
new
TrinoFilterConverter(rowType).convert(filter).ifPresent(read::withFilter);
@@ -226,19 +244,20 @@ public class TrinoPageSourceProvider implements
ConnectorPageSourceProvider {
return true;
}
- // map the table schema columnsIndex to data schema columnsIndex
- private int[] mapping(
- int[] tableSchemaColumnIndex, List<DataField> tableFields,
List<DataField> dataFields) {
+ // map the table schema column names to data schema column names
+ private List<String> schemaEvolutionFieldNames(
+ List<String> fieldNames, List<DataField> tableFields,
List<DataField> dataFields) {
- int[] mapping = createIndexMapping(tableFields, dataFields);
- if (mapping == null) {
- return tableSchemaColumnIndex;
- }
- int[] result = new int[tableSchemaColumnIndex.length];
+ Map<String, Integer> fieldNameToId = new HashMap<>();
+ Map<Integer, String> idToFieldName = new HashMap<>();
+ List<String> result = new ArrayList<>();
+
+ tableFields.forEach(field -> fieldNameToId.put(field.name(),
field.id()));
+ dataFields.forEach(field -> idToFieldName.put(field.id(),
field.name()));
- for (int i = 0; i < tableSchemaColumnIndex.length; i++) {
- int po = tableSchemaColumnIndex[i];
- result[i] = mapping[po];
+ for (String fieldName : fieldNames) {
+ Integer id = fieldNameToId.get(fieldName);
+ result.add(idToFieldName.getOrDefault(id, null));
}
return result;
}
@@ -248,7 +267,7 @@ public class TrinoPageSourceProvider implements
ConnectorPageSourceProvider {
TrinoInputFile inputFile,
// TODO construct read option by core-options
CoreOptions coreOptions,
- int[] columns,
+ List<String> columns,
List<Type> types,
List<Domain> domains) {
switch (format) {
@@ -287,7 +306,7 @@ public class TrinoPageSourceProvider implements
ConnectorPageSourceProvider {
private ConnectorPageSource createOrcDataPageSource(
TrinoInputFile inputFile,
OrcReaderOptions options,
- int[] columns,
+ List<String> columns,
List<Type> types,
List<Domain> domains) {
try {
@@ -297,22 +316,28 @@ public class TrinoPageSourceProvider implements
ConnectorPageSourceProvider {
.orElseThrow(() -> new RuntimeException("ORC file
is zero length"));
List<OrcColumn> fileColumns =
reader.getRootColumn().getNestedColumns();
+ Map<String, OrcColumn> fieldsMap = new HashMap<>();
+ fileColumns.forEach(column ->
fieldsMap.put(column.getColumnName(), column));
TupleDomainOrcPredicate.TupleDomainOrcPredicateBuilder
predicateBuilder =
TupleDomainOrcPredicate.builder();
List<OrcPageSource.ColumnAdaptation> columnAdaptations = new
ArrayList<>();
- List<OrcColumn> fileReadColumns = new ArrayList<>(columns.length);
- List<Type> fileReadTypes = new ArrayList<>(columns.length);
+ List<OrcColumn> fileReadColumns = new ArrayList<>(columns.size());
+ List<Type> fileReadTypes = new ArrayList<>(columns.size());
- for (int i = 0; i < columns.length; i++) {
- if (columns[i] >= 0) {
+ for (int i = 0; i < columns.size(); i++) {
+ if (columns.get(i) != null) {
// column exists
columnAdaptations.add(
OrcPageSource.ColumnAdaptation.sourceColumn(fileReadColumns.size()));
- fileReadColumns.add(fileColumns.get(columns[i]));
+ OrcColumn orcColumn = fieldsMap.get(columns.get(i));
+ if (orcColumn == null) {
+ throw new RuntimeException(
+ "Column " + columns.get(i) + " does not exist
in orc file.");
+ }
+ fileReadColumns.add(orcColumn);
fileReadTypes.add(types.get(i));
if (domains.get(i) != null) {
- predicateBuilder.addColumn(
- fileColumns.get(columns[i]).getColumnId(),
domains.get(i));
+ predicateBuilder.addColumn(orcColumn.getColumnId(),
domains.get(i));
}
} else {
columnAdaptations.add(OrcPageSource.ColumnAdaptation.nullColumn(types.get(i)));
diff --git
a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceWrapper.java
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceWrapper.java
new file mode 100644
index 0000000..2b03285
--- /dev/null
+++
b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoPageSourceWrapper.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.trino;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.deletionvectors.DeletionVector;
+
+import io.trino.spi.Page;
+import io.trino.spi.connector.ConnectorPageSource;
+import io.trino.spi.metrics.Metrics;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Wraps a {@link ConnectorPageSource} and drops the rows that an optional {@link DeletionVector}
+ * marks as deleted.
+ *
+ * <p>Each row of a page is looked up in the deletion vector at offset {@code completed + i}, where
+ * {@code completed} is the delegate's {@link ConnectorPageSource#getCompletedPositions()} taken
+ * just before the page is read and {@code i} is the row's index inside the page. All statistics
+ * and lifecycle calls are forwarded to the delegate unchanged (so completed positions count rows
+ * read, not rows retained).
+ */
+public class TrinoPageSourceWrapper implements ConnectorPageSource {
+
+    private final ConnectorPageSource source;
+
+    /** Deletion vector of the wrapped file; empty means every row is kept. */
+    private final Optional<DeletionVector> deletionVector;
+
+    /**
+     * Creates a wrapper around {@code source}.
+     *
+     * @param source the page source reading a single data file
+     * @param deletionVector rows of {@code source} to drop; empty to pass pages through unchanged
+     */
+    public TrinoPageSourceWrapper(
+            ConnectorPageSource source, Optional<DeletionVector> deletionVector) {
+        this.source = source;
+        this.deletionVector = deletionVector;
+    }
+
+    @Override
+    public long getCompletedBytes() {
+        return source.getCompletedBytes();
+    }
+
+    @Override
+    public OptionalLong getCompletedPositions() {
+        return source.getCompletedPositions();
+    }
+
+    @Override
+    public long getReadTimeNanos() {
+        return source.getReadTimeNanos();
+    }
+
+    @Override
+    public boolean isFinished() {
+        return source.isFinished();
+    }
+
+    /**
+     * Returns the delegate's next page with deleted rows removed, or {@code null} if the delegate
+     * has no page available.
+     *
+     * @throws IllegalStateException if a deletion vector is present but the delegate does not
+     *     report completed positions, so rows cannot be matched against the vector
+     */
+    @Override
+    public Page getNextPage() {
+        if (deletionVector.isEmpty()) {
+            // Nothing to filter: do not require the delegate to report positions.
+            return source.getNextPage();
+        }
+
+        // NOTE(review): this assumes the delegate's completed-position counter equals the data-file
+        // row index of the first row of the next page. If the underlying reader skips rows without
+        // counting them (e.g. row groups pruned by predicate pushdown), rows would be checked
+        // against the wrong deletion-vector positions — confirm for every reader used here.
+        long startPosition =
+                source.getCompletedPositions()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Deletion vector requires the underlying page source to report completed positions"));
+        Page next = source.getNextPage();
+        if (next == null) {
+            return null;
+        }
+
+        return convertToRetained(
+                next, deletionVector.get(), startPosition, next.getPositionCount());
+    }
+
+    /**
+     * Removes the rows of {@code page} that {@code deletionVector} marks as deleted.
+     *
+     * @param page the page to filter
+     * @param deletionVector the deletion vector of the file the page was read from
+     * @param startPosition position of the page's first row, as passed to {@link
+     *     DeletionVector#isDeleted}; a {@code long} so large files are not truncated
+     * @param pageCount number of rows in {@code page} to inspect
+     * @return {@code page} itself if no row is deleted, otherwise a page of the retained rows
+     */
+    @VisibleForTesting
+    Page convertToRetained(
+            Page page, DeletionVector deletionVector, long startPosition, int pageCount) {
+        int[] retained = new int[pageCount];
+        int retainedLength = 0;
+        for (int pagePosition = 0; pagePosition < pageCount; pagePosition++) {
+            if (!deletionVector.isDeleted(startPosition + pagePosition)) {
+                retained[retainedLength++] = pagePosition;
+            }
+        }
+        if (retainedLength == pageCount) {
+            // Fast path: nothing deleted, avoid copying the page.
+            return page;
+        }
+
+        return page.getPositions(retained, 0, retainedLength);
+    }
+
+    @Override
+    public long getMemoryUsage() {
+        return source.getMemoryUsage();
+    }
+
+    @Override
+    public void close() throws IOException {
+        source.close();
+    }
+
+    @Override
+    public CompletableFuture<?> isBlocked() {
+        return source.isBlocked();
+    }
+
+    @Override
+    public Metrics getMetrics() {
+        return source.getMetrics();
+    }
+
+    /**
+     * Static factory equivalent to {@link #TrinoPageSourceWrapper(ConnectorPageSource, Optional)}.
+     *
+     * @param connectorPageSource the page source reading a single data file
+     * @param deletionVector rows to drop; empty to pass pages through unchanged
+     * @return the wrapping page source
+     */
+    public static ConnectorPageSource wrap(
+            ConnectorPageSource connectorPageSource, Optional<DeletionVector> deletionVector) {
+        return new TrinoPageSourceWrapper(connectorPageSource, deletionVector);
+    }
+}
diff --git
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
index 14cdeea..78fb313 100644
---
a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++
b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
@@ -18,12 +18,14 @@
package org.apache.paimon.trino;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
@@ -411,6 +413,40 @@ public abstract class TestTrinoITCase extends
AbstractTestQueryFramework {
commit.commit(1, writer.prepareCommit(true, 1));
}
+ {
+ Path tablePath6 = new Path(warehouse, "default.db/t101");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "a", DataTypes.STRING()),
+ new DataField(1, "b", DataTypes.INT()),
+ new DataField(2, "c", DataTypes.INT())));
+ new SchemaManager(LocalFileIO.create(), tablePath6)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ List.of("a"),
+ new HashMap<>() {
+ {
+ put(CoreOptions.BUCKET.key(), "1");
+
put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
+ }
+ },
+ ""));
+ FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), tablePath6);
+ InnerTableWrite writer = table.newWrite("user");
+ writer.withIOManager(new IOManagerImpl("/tmp"));
+ InnerTableCommit commit = table.newCommit("user");
+ for (int i = 0; i < 10; i++) {
+ writer.write(GenericRow.of(BinaryString.fromString("a" + i),
i, i));
+ }
+ commit.commit(0, writer.prepareCommit(true, 0));
+
+ writer.write(GenericRow.ofKind(RowKind.DELETE,
BinaryString.fromString("a0"), 0, 0));
+ commit.commit(1, writer.prepareCommit(true, 1));
+ }
+
DistributedQueryRunner queryRunner = null;
try {
queryRunner =
@@ -543,7 +579,7 @@ public abstract class TestTrinoITCase extends
AbstractTestQueryFramework {
+ "changelog_producer = 'input'"
+ ")");
assertThat(sql("SHOW TABLES FROM paimon.default"))
- .isEqualTo("[[empty_t], [orders], [t1], [t100], [t2], [t3],
[t4], [t99]]");
+ .isEqualTo("[[empty_t], [orders], [t1], [t100], [t101], [t2],
[t3], [t4], [t99]]");
sql("DROP TABLE IF EXISTS paimon.default.orders");
}
@@ -566,7 +602,7 @@ public abstract class TestTrinoITCase extends
AbstractTestQueryFramework {
+ ")");
sql("ALTER TABLE paimon.default.t5 RENAME TO t6");
assertThat(sql("SHOW TABLES FROM paimon.default"))
- .isEqualTo("[[empty_t], [t1], [t100], [t2], [t3], [t4], [t6],
[t99]]");
+ .isEqualTo("[[empty_t], [t1], [t100], [t101], [t2], [t3],
[t4], [t6], [t99]]");
sql("DROP TABLE IF EXISTS paimon.default.t6");
}
@@ -589,7 +625,7 @@ public abstract class TestTrinoITCase extends
AbstractTestQueryFramework {
+ ")");
sql("DROP TABLE IF EXISTS paimon.default.t5");
assertThat(sql("SHOW TABLES FROM paimon.default"))
- .isEqualTo("[[empty_t], [t1], [t100], [t2], [t3], [t4],
[t99]]");
+ .isEqualTo("[[empty_t], [t1], [t100], [t101], [t2], [t3],
[t4], [t99]]");
}
@Test
@@ -732,6 +768,13 @@ public abstract class TestTrinoITCase extends
AbstractTestQueryFramework {
+ "[true, 1, 1, 1, 1, 1.0, 1.0, char1,
varchar1, 1970-01-01, 2023-09-12T07:54:48, 2023-09-12T07:54:48.001,
2023-09-12T07:54:48.001001, 0.10000, 010203, [1, 1, 1], {1=1}, [1, 1]]]");
}
+ @Test
+ public void testDeletionFile() {
+ assertThat(sql("SELECT * FROM paimon.default.t101"))
+ .isEqualTo(
+ "[[a1, 1, 1], [a2, 2, 2], [a3, 3, 3], [a4, 4, 4], [a5,
5, 5], [a6, 6, 6], [a7, 7, 7], [a8, 8, 8], [a9, 9, 9]]");
+ }
+
protected String sql(String sql) {
MaterializedResult result = getQueryRunner().execute(sql);
return result.getMaterializedRows().toString();