This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.8 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit de32dae41a165984d6fe121809e991ab75881ae5 Author: Jingsong Lee <[email protected]> AuthorDate: Thu May 16 17:03:12 2024 +0800 [core] Incremental-between tags should deduplicate records (#3338) --- .../org/apache/paimon/mergetree/MergeSorter.java | 4 + .../paimon/operation/MergeFileSplitRead.java | 109 ++++++++------------- .../apache/paimon/operation/RawFileSplitRead.java | 12 +++ .../org/apache/paimon/operation/SplitRead.java | 46 ++++++++- .../paimon/table/source/KeyValueTableRead.java | 79 +++++++-------- .../IncrementalChangelogReadProvider.java | 96 ++++++++++++++++++ .../splitread/IncrementalDiffReadProvider.java | 65 ++++++++++++ .../splitread/IncrementalDiffSplitRead.java} | 95 ++++++++++++++++-- .../splitread/MergeFileSplitReadProvider.java | 68 +++++++++++++ .../source/splitread/RawFileSplitReadProvider.java | 60 ++++++++++++ .../source/splitread/SplitReadProvider.java} | 21 ++-- .../paimon/operation/MergeFileSplitReadTest.java | 2 +- .../apache/paimon/table/IncrementalTableTest.java | 18 +++- 13 files changed, 541 insertions(+), 134 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java index 0f54b40b6..420613899 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java @@ -95,6 +95,10 @@ public class MergeSorter { return memoryPool; } + public RowType valueType() { + return valueType; + } + public void setIOManager(IOManager ioManager) { this.ioManager = ioManager; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java index d457c9093..8002b62f0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java 
@@ -116,7 +116,16 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> { return this; } - public MergeFileSplitRead withValueProjection(@Nullable int[][] projectedFields) { + public Comparator<InternalRow> keyComparator() { + return keyComparator; + } + + public MergeSorter mergeSorter() { + return mergeSorter; + } + + @Override + public MergeFileSplitRead withProjection(@Nullable int[][] projectedFields) { if (projectedFields == null) { return this; } @@ -160,11 +169,13 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> { return this; } + @Override public MergeFileSplitRead withIOManager(IOManager ioManager) { this.mergeSorter.setIOManager(ioManager); return this; } + @Override public MergeFileSplitRead forceKeepDelete() { this.forceKeepDelete = true; return this; @@ -211,75 +222,28 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> { @Override public RecordReader<KeyValue> createReader(DataSplit split) throws IOException { - RecordReader<KeyValue> reader = createReaderWithoutOuterProjection(split); - if (outerProjection != null) { - ProjectedRow projectedRow = ProjectedRow.from(outerProjection); - reader = reader.transform(kv -> kv.replaceValue(projectedRow.replaceRow(kv.value()))); + if (!split.beforeFiles().isEmpty()) { + throw new IllegalArgumentException("This read cannot accept split with before files."); } - return reader; - } - private RecordReader<KeyValue> createReaderWithoutOuterProjection(DataSplit split) - throws IOException { - if (split.beforeFiles().isEmpty()) { - if (split.isStreaming() || split.convertToRawFiles().isPresent()) { - return noMergeRead( - split.partition(), - split.bucket(), - split.dataFiles(), - split.deletionFiles().orElse(null), - split.isStreaming()); - } else { - return projectKey( - mergeRead( - split.partition(), - split.bucket(), - split.dataFiles(), - null, - forceKeepDelete)); - } - } else if (split.isStreaming()) { - // streaming concat read - return ConcatRecordReader.create( - 
() -> - new ReverseReader( - noMergeRead( - split.partition(), - split.bucket(), - split.beforeFiles(), - split.beforeDeletionFiles().orElse(null), - true)), - () -> - noMergeRead( - split.partition(), - split.bucket(), - split.dataFiles(), - split.deletionFiles().orElse(null), - true)); + if (split.isStreaming()) { + return createNoMergeReader( + split.partition(), + split.bucket(), + split.dataFiles(), + split.deletionFiles().orElse(null), + split.isStreaming()); } else { - // batch diff read - return projectKey( - DiffReader.readDiff( - mergeRead( - split.partition(), - split.bucket(), - split.beforeFiles(), - split.beforeDeletionFiles().orElse(null), - false), - mergeRead( - split.partition(), - split.bucket(), - split.dataFiles(), - split.deletionFiles().orElse(null), - false), - keyComparator, - createUdsComparator(), - mergeSorter, - forceKeepDelete)); + return createMergeReader( + split.partition(), + split.bucket(), + split.dataFiles(), + split.deletionFiles().orElse(null), + forceKeepDelete); } } - private RecordReader<KeyValue> mergeRead( + public RecordReader<KeyValue> createMergeReader( BinaryRow partition, int bucket, List<DataFileMeta> files, @@ -316,10 +280,10 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> { reader = new DropDeleteReader(reader); } - return reader; + return projectOuter(projectKey(reader)); } - private RecordReader<KeyValue> noMergeRead( + public RecordReader<KeyValue> createNoMergeReader( BinaryRow partition, int bucket, List<DataFileMeta> files, @@ -344,7 +308,8 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> { file.schemaId(), fileName, file.fileSize(), file.level()); }); } - return ConcatRecordReader.create(suppliers); + + return projectOuter(ConcatRecordReader.create(suppliers)); } private Optional<String> changelogFile(DataFileMeta fileMeta) { @@ -365,8 +330,16 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> { return reader.transform(kv -> 
kv.replaceKey(projectedRow.replaceRow(kv.key()))); } + private RecordReader<KeyValue> projectOuter(RecordReader<KeyValue> reader) { + if (outerProjection != null) { + ProjectedRow projectedRow = ProjectedRow.from(outerProjection); + reader = reader.transform(kv -> kv.replaceValue(projectedRow.replaceRow(kv.value()))); + } + return reader; + } + @Nullable - private UserDefinedSeqComparator createUdsComparator() { + public UserDefinedSeqComparator createUdsComparator() { return UserDefinedSeqComparator.create( readerFactoryBuilder.projectedValueType(), sequenceFields); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index fb68823ba..083db39cb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; import org.apache.paimon.deletionvectors.DeletionVector; +import org.apache.paimon.disk.IOManager; import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.format.FormatKey; import org.apache.paimon.format.FormatReaderContext; @@ -98,6 +99,17 @@ public class RawFileSplitRead implements SplitRead<InternalRow> { this.projection = Projection.range(0, rowType.getFieldCount()).toNestedIndexes(); } + @Override + public SplitRead<InternalRow> forceKeepDelete() { + return this; + } + + @Override + public SplitRead<InternalRow> withIOManager(@Nullable IOManager ioManager) { + return this; + } + + @Override public RawFileSplitRead withProjection(int[][] projectedFields) { if (projectedFields != null) { projection = projectedFields; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java index a5b05e8f4..c17c0f6b3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java @@ -18,9 +18,13 @@ package org.apache.paimon.operation; +import org.apache.paimon.disk.IOManager; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.utils.IOFunction; + +import javax.annotation.Nullable; import java.io.IOException; @@ -31,8 +35,48 @@ import java.io.IOException; */ public interface SplitRead<T> { - SplitRead<T> withFilter(Predicate predicate); + SplitRead<T> forceKeepDelete(); + + SplitRead<T> withIOManager(@Nullable IOManager ioManager); + + SplitRead<T> withProjection(@Nullable int[][] projectedFields); + + SplitRead<T> withFilter(@Nullable Predicate predicate); /** Create a {@link RecordReader} from split. 
*/ RecordReader<T> createReader(DataSplit split) throws IOException; + + static <L, R> SplitRead<R> convert( + SplitRead<L> read, IOFunction<DataSplit, RecordReader<R>> convertedFactory) { + return new SplitRead<R>() { + @Override + public SplitRead<R> forceKeepDelete() { + read.forceKeepDelete(); + return this; + } + + @Override + public SplitRead<R> withIOManager(@Nullable IOManager ioManager) { + read.withIOManager(ioManager); + return this; + } + + @Override + public SplitRead<R> withProjection(@Nullable int[][] projectedFields) { + read.withProjection(projectedFields); + return this; + } + + @Override + public SplitRead<R> withFilter(@Nullable Predicate predicate) { + read.withFilter(predicate); + return this; + } + + @Override + public RecordReader<R> createReader(DataSplit split) throws IOException { + return convertedFactory.apply(split); + } + }; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java index f3f66c4aa..c674e4792 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java @@ -23,14 +23,22 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.operation.MergeFileSplitRead; import org.apache.paimon.operation.RawFileSplitRead; +import org.apache.paimon.operation.SplitRead; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.utils.LazyField; +import org.apache.paimon.table.source.splitread.IncrementalChangelogReadProvider; +import org.apache.paimon.table.source.splitread.IncrementalDiffReadProvider; +import org.apache.paimon.table.source.splitread.MergeFileSplitReadProvider; +import 
org.apache.paimon.table.source.splitread.RawFileSplitReadProvider; +import org.apache.paimon.table.source.splitread.SplitReadProvider; import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.function.Supplier; /** @@ -38,8 +46,7 @@ import java.util.function.Supplier; */ public final class KeyValueTableRead extends AbstractDataTableRead<KeyValue> { - private final LazyField<MergeFileSplitRead> mergeRead; - private final LazyField<RawFileSplitRead> batchRawRead; + private final List<SplitReadProvider> readProviders; private int[][] projection = null; private boolean forceKeepDelete = false; @@ -51,65 +58,54 @@ public final class KeyValueTableRead extends AbstractDataTableRead<KeyValue> { Supplier<RawFileSplitRead> batchRawReadSupplier, TableSchema schema) { super(schema); - this.mergeRead = new LazyField<>(() -> createMergeRead(mergeReadSupplier)); - this.batchRawRead = new LazyField<>(() -> createBatchRawRead(batchRawReadSupplier)); + this.readProviders = + Arrays.asList( + new RawFileSplitReadProvider(batchRawReadSupplier, this::assignValues), + new MergeFileSplitReadProvider(mergeReadSupplier, this::assignValues), + new IncrementalChangelogReadProvider(mergeReadSupplier, this::assignValues), + new IncrementalDiffReadProvider(mergeReadSupplier, this::assignValues)); } - private MergeFileSplitRead createMergeRead(Supplier<MergeFileSplitRead> readSupplier) { - MergeFileSplitRead read = - readSupplier - .get() - .withKeyProjection(new int[0][]) - .withValueProjection(projection) - .withFilter(predicate) - .withIOManager(ioManager); - if (forceKeepDelete) { - read = read.forceKeepDelete(); + private List<SplitRead<InternalRow>> initialized() { + List<SplitRead<InternalRow>> readers = new ArrayList<>(); + for (SplitReadProvider readProvider : readProviders) { + if (readProvider.initialized()) { + readers.add(readProvider.getOrCreate()); + } } - return read; + return 
readers; } - private RawFileSplitRead createBatchRawRead(Supplier<RawFileSplitRead> readSupplier) { - return readSupplier.get().withProjection(projection).withFilter(predicate); + private void assignValues(SplitRead<InternalRow> read) { + if (forceKeepDelete) { + read = read.forceKeepDelete(); + } + read.withProjection(projection).withFilter(predicate).withIOManager(ioManager); } @Override public void projection(int[][] projection) { - if (mergeRead.initialized()) { - mergeRead.get().withValueProjection(projection); - } - if (batchRawRead.initialized()) { - batchRawRead.get().withProjection(projection); - } + initialized().forEach(r -> r.withProjection(projection)); this.projection = projection; } @Override public InnerTableRead forceKeepDelete() { - if (mergeRead.initialized()) { - mergeRead.get().forceKeepDelete(); - } + initialized().forEach(SplitRead::forceKeepDelete); this.forceKeepDelete = true; return this; } @Override protected InnerTableRead innerWithFilter(Predicate predicate) { - if (mergeRead.initialized()) { - mergeRead.get().withFilter(predicate); - } - if (batchRawRead.initialized()) { - batchRawRead.get().withFilter(predicate); - } + initialized().forEach(r -> r.withFilter(predicate)); this.predicate = predicate; return this; } @Override public TableRead withIOManager(IOManager ioManager) { - if (mergeRead.initialized()) { - mergeRead.get().withIOManager(ioManager); - } + initialized().forEach(r -> r.withIOManager(ioManager)); this.ioManager = ioManager; return this; } @@ -117,11 +113,16 @@ public final class KeyValueTableRead extends AbstractDataTableRead<KeyValue> { @Override public RecordReader<InternalRow> reader(Split split) throws IOException { DataSplit dataSplit = (DataSplit) split; - if (!forceKeepDelete && !dataSplit.isStreaming() && split.convertToRawFiles().isPresent()) { - return batchRawRead.get().createReader(dataSplit); + for (SplitReadProvider readProvider : readProviders) { + if (readProvider.match(dataSplit, forceKeepDelete)) { + 
return readProvider.getOrCreate().createReader(dataSplit); + } } - RecordReader<KeyValue> reader = mergeRead.get().createReader(dataSplit); + throw new RuntimeException("Should not happen."); + } + + public static RecordReader<InternalRow> unwrap(RecordReader<KeyValue> reader) { return new RecordReader<InternalRow>() { @Nullable diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java new file mode 100644 index 000000000..bec95979d --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.table.source.splitread; + +import org.apache.paimon.KeyValue; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.mergetree.compact.ConcatRecordReader; +import org.apache.paimon.operation.MergeFileSplitRead; +import org.apache.paimon.operation.ReverseReader; +import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.utils.IOFunction; +import org.apache.paimon.utils.LazyField; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.apache.paimon.table.source.KeyValueTableRead.unwrap; + +/** A {@link SplitReadProvider} to incremental changelog read. */ +public class IncrementalChangelogReadProvider implements SplitReadProvider { + + private final LazyField<SplitRead<InternalRow>> splitRead; + + public IncrementalChangelogReadProvider( + Supplier<MergeFileSplitRead> supplier, + Consumer<SplitRead<InternalRow>> valuesAssigner) { + this.splitRead = + new LazyField<>( + () -> { + SplitRead<InternalRow> read = create(supplier); + valuesAssigner.accept(read); + return read; + }); + } + + private SplitRead<InternalRow> create(Supplier<MergeFileSplitRead> supplier) { + final MergeFileSplitRead read = supplier.get().withKeyProjection(new int[0][]); + IOFunction<DataSplit, RecordReader<InternalRow>> convertedFactory = + split -> { + RecordReader<KeyValue> reader = + ConcatRecordReader.create( + () -> + new ReverseReader( + read.createNoMergeReader( + split.partition(), + split.bucket(), + split.beforeFiles(), + split.beforeDeletionFiles() + .orElse(null), + true)), + () -> + read.createNoMergeReader( + split.partition(), + split.bucket(), + split.dataFiles(), + split.deletionFiles().orElse(null), + true)); + return unwrap(reader); + }; + + return SplitRead.convert(read, convertedFactory); + } + + @Override + public boolean match(DataSplit split, boolean forceKeepDelete) { + 
return !split.beforeFiles().isEmpty() && split.isStreaming(); + } + + @Override + public boolean initialized() { + return splitRead.initialized(); + } + + @Override + public SplitRead<InternalRow> getOrCreate() { + return splitRead.get(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java new file mode 100644 index 000000000..a335a7c03 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.splitread; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.operation.MergeFileSplitRead; +import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.utils.LazyField; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** A {@link SplitReadProvider} to batch incremental diff read. 
*/ +public class IncrementalDiffReadProvider implements SplitReadProvider { + + private final LazyField<SplitRead<InternalRow>> splitRead; + + public IncrementalDiffReadProvider( + Supplier<MergeFileSplitRead> supplier, + Consumer<SplitRead<InternalRow>> valuesAssigner) { + this.splitRead = + new LazyField<>( + () -> { + SplitRead<InternalRow> read = create(supplier); + valuesAssigner.accept(read); + return read; + }); + } + + private SplitRead<InternalRow> create(Supplier<MergeFileSplitRead> supplier) { + return new IncrementalDiffSplitRead(supplier.get()); + } + + @Override + public boolean match(DataSplit split, boolean forceKeepDelete) { + return !split.beforeFiles().isEmpty() && !split.isStreaming(); + } + + @Override + public boolean initialized() { + return splitRead.initialized(); + } + + @Override + public SplitRead<InternalRow> getOrCreate() { + return splitRead.get(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java similarity index 55% rename from paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java rename to paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java index bc5153600..0519da9fd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DiffReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java @@ -16,15 +16,24 @@ * limitations under the License. 
*/ -package org.apache.paimon.operation; +package org.apache.paimon.table.source.splitread; import org.apache.paimon.KeyValue; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.disk.IOManager; import org.apache.paimon.mergetree.MergeSorter; import org.apache.paimon.mergetree.compact.MergeFunctionWrapper; +import org.apache.paimon.operation.MergeFileSplitRead; +import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.KeyValueTableRead; import org.apache.paimon.types.RowKind; import org.apache.paimon.utils.FieldsComparator; +import org.apache.paimon.utils.ProjectedRow; import javax.annotation.Nullable; @@ -34,13 +43,73 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; -/** A {@link RecordReader} util to read diff between before reader and after reader. */ -public class DiffReader { +/** A {@link SplitRead} for batch incremental diff. 
*/ +public class IncrementalDiffSplitRead implements SplitRead<InternalRow> { private static final int BEFORE_LEVEL = Integer.MIN_VALUE; private static final int AFTER_LEVEL = Integer.MAX_VALUE; - public static RecordReader<KeyValue> readDiff( + private final MergeFileSplitRead mergeRead; + + private boolean forceKeepDelete = false; + @Nullable private int[][] projectedFields; + + public IncrementalDiffSplitRead(MergeFileSplitRead mergeRead) { + this.mergeRead = mergeRead; + } + + @Override + public SplitRead<InternalRow> forceKeepDelete() { + this.forceKeepDelete = true; + return this; + } + + @Override + public SplitRead<InternalRow> withIOManager(@Nullable IOManager ioManager) { + mergeRead.withIOManager(ioManager); + return this; + } + + @Override + public SplitRead<InternalRow> withProjection(@Nullable int[][] projectedFields) { + this.projectedFields = projectedFields; + return this; + } + + @Override + public SplitRead<InternalRow> withFilter(@Nullable Predicate predicate) { + mergeRead.withFilter(predicate); + return this; + } + + @Override + public RecordReader<InternalRow> createReader(DataSplit split) throws IOException { + RecordReader<KeyValue> reader = + readDiff( + mergeRead.createMergeReader( + split.partition(), + split.bucket(), + split.beforeFiles(), + split.beforeDeletionFiles().orElse(null), + false), + mergeRead.createMergeReader( + split.partition(), + split.bucket(), + split.dataFiles(), + split.deletionFiles().orElse(null), + false), + mergeRead.keyComparator(), + mergeRead.createUdsComparator(), + mergeRead.mergeSorter(), + forceKeepDelete); + if (projectedFields != null) { + ProjectedRow projectedRow = ProjectedRow.from(projectedFields); + reader = reader.transform(kv -> kv.replaceValue(projectedRow.replaceRow(kv.value()))); + } + return KeyValueTableRead.unwrap(reader); + } + + private static RecordReader<KeyValue> readDiff( RecordReader<KeyValue> beforeReader, RecordReader<KeyValue> afterReader, Comparator<InternalRow> keyComparator, @@ 
-54,7 +123,7 @@ public class DiffReader { () -> wrapLevelToReader(afterReader, AFTER_LEVEL)), keyComparator, userDefinedSeqComparator, - new DiffMerger(keepDelete)); + new DiffMerger(keepDelete, InternalSerializers.create(sorter.valueType()))); } private static RecordReader<KeyValue> wrapLevelToReader( @@ -96,11 +165,15 @@ public class DiffReader { private static class DiffMerger implements MergeFunctionWrapper<KeyValue> { private final boolean keepDelete; + private final InternalRowSerializer serializer1; + private final InternalRowSerializer serializer2; private final List<KeyValue> kvs = new ArrayList<>(); - public DiffMerger(boolean keepDelete) { + public DiffMerger(boolean keepDelete, InternalRowSerializer serializer) { this.keepDelete = keepDelete; + this.serializer1 = serializer; + this.serializer2 = serializer.duplicate(); } @Override @@ -128,7 +201,9 @@ public class DiffReader { } else if (kvs.size() == 2) { KeyValue latest = kvs.get(1); if (latest.level() == AFTER_LEVEL) { - return latest; + if (!valueEquals()) { + return latest; + } } } else { throw new IllegalArgumentException("Illegal kv number: " + kvs.size()); @@ -136,5 +211,11 @@ public class DiffReader { return null; } + + private boolean valueEquals() { + return serializer1 + .toBinaryRow(kvs.get(0).value()) + .equals(serializer2.toBinaryRow(kvs.get(1).value())); + } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java new file mode 100644 index 000000000..abed0f33c --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.splitread; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.operation.MergeFileSplitRead; +import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.utils.LazyField; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.apache.paimon.table.source.KeyValueTableRead.unwrap; + +/** A {@link SplitReadProvider} to merge files. 
*/ +public class MergeFileSplitReadProvider implements SplitReadProvider { + + private final LazyField<SplitRead<InternalRow>> splitRead; + + public MergeFileSplitReadProvider( + Supplier<MergeFileSplitRead> supplier, + Consumer<SplitRead<InternalRow>> valuesAssigner) { + this.splitRead = + new LazyField<>( + () -> { + SplitRead<InternalRow> read = create(supplier); + valuesAssigner.accept(read); + return read; + }); + } + + private SplitRead<InternalRow> create(Supplier<MergeFileSplitRead> supplier) { + final MergeFileSplitRead read = supplier.get().withKeyProjection(new int[0][]); + return SplitRead.convert(read, split -> unwrap(read.createReader(split))); + } + + @Override + public boolean match(DataSplit split, boolean forceKeepDelete) { + return split.beforeFiles().isEmpty(); + } + + @Override + public boolean initialized() { + return splitRead.initialized(); + } + + @Override + public SplitRead<InternalRow> getOrCreate() { + return splitRead.get(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java new file mode 100644 index 000000000..9959ee555 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.splitread; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.operation.RawFileSplitRead; +import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.utils.LazyField; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** A {@link SplitReadProvider} to create {@link RawFileSplitRead}. */ +public class RawFileSplitReadProvider implements SplitReadProvider { + + private final LazyField<RawFileSplitRead> splitRead; + + public RawFileSplitReadProvider( + Supplier<RawFileSplitRead> supplier, Consumer<SplitRead<InternalRow>> valuesAssigner) { + this.splitRead = + new LazyField<>( + () -> { + RawFileSplitRead read = supplier.get(); + valuesAssigner.accept(read); + return read; + }); + } + + @Override + public boolean match(DataSplit split, boolean forceKeepDelete) { + return !forceKeepDelete && !split.isStreaming() && split.rawConvertible(); + } + + @Override + public boolean initialized() { + return splitRead.initialized(); + } + + @Override + public SplitRead<InternalRow> getOrCreate() { + return splitRead.get(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java similarity index 64% copy from paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java copy to paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java index 
a5b05e8f4..2aaefb322 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java @@ -16,23 +16,18 @@ * limitations under the License. */ -package org.apache.paimon.operation; +package org.apache.paimon.table.source.splitread; -import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.operation.SplitRead; import org.apache.paimon.table.source.DataSplit; -import java.io.IOException; +/** Provider to create {@link SplitRead}. */ +public interface SplitReadProvider { -/** - * Read operation which provides {@link RecordReader} creation. - * - * @param <T> type of record to read. - */ -public interface SplitRead<T> { + /** Returns whether this provider should be used to read {@code split}. {@code forceKeepDelete} presumably requests that delete rows be kept; implementations may ignore it. */ boolean match(DataSplit split, boolean forceKeepDelete); - SplitRead<T> withFilter(Predicate predicate); + /** Returns whether the read handed out by {@link #getOrCreate()} has already been created. */ boolean initialized(); - /** Create a {@link RecordReader} from split.
*/ - RecordReader<T> createReader(DataSplit split) throws IOException; + /** Returns the read for splits this provider matches, creating it if it has not been created yet. */ SplitRead<InternalRow> getOrCreate(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java index 5652fcd43..1794e8aea 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java @@ -231,7 +231,7 @@ public class MergeFileSplitReadTest { read.withKeyProjection(keyProjection); } if (valueProjection != null) { - read.withValueProjection(valueProjection); + read.withProjection(valueProjection); } List<KeyValue> result = new ArrayList<>(); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java index f4dac9d11..b4b905d36 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java @@ -225,6 +225,8 @@ public class IncrementalTableTest extends TableTestBase { GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 1), GenericRow.of(1, 3, 1), + GenericRow.of(1, 4, 1), + GenericRow.of(1, 5, 1), GenericRow.of(2, 1, 1)); // snapshot 2: append @@ -235,7 +237,7 @@ public class IncrementalTableTest extends TableTestBase { // UPDATE GenericRow.of(1, 2, 2), // NEW - GenericRow.of(1, 4, 1)); + GenericRow.of(1, 6, 1)); // snapshot 3: compact compact(table, row(1), 0); @@ -247,24 +249,30 @@ public class IncrementalTableTest extends TableTestBase { // read tag1 tag2 List<InternalRow> result = read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG2")); assertThat(result) - .containsExactlyInAnyOrder(GenericRow.of(1, 2, 2), GenericRow.of(1, 4, 1)); + .containsExactlyInAnyOrder(GenericRow.of(1, 2, 2), GenericRow.of(1, 6, 1)); result = read(auditLog,
Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG2")); assertThat(result) .containsExactlyInAnyOrder( GenericRow.of(fromString("-D"), 1, 1, 1), GenericRow.of(fromString("+I"), 1, 2, 2), - GenericRow.of(fromString("+I"), 1, 4, 1)); + GenericRow.of(fromString("+I"), 1, 6, 1)); // read tag1 tag3 result = read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3")); assertThat(result) - .containsExactlyInAnyOrder(GenericRow.of(1, 2, 2), GenericRow.of(1, 4, 1)); + .containsExactlyInAnyOrder(GenericRow.of(1, 2, 2), GenericRow.of(1, 6, 1)); + + // read tag1 tag3 auditLog result = read(auditLog, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3")); assertThat(result) .containsExactlyInAnyOrder( GenericRow.of(fromString("-D"), 1, 1, 1), GenericRow.of(fromString("+I"), 1, 2, 2), - GenericRow.of(fromString("+I"), 1, 4, 1)); + GenericRow.of(fromString("+I"), 1, 6, 1)); + + // read tag1 tag3 projection + result = read(table, new int[][] {{1}}, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3")); + assertThat(result).containsExactlyInAnyOrder(GenericRow.of(2), GenericRow.of(6)); assertThatThrownBy(() -> read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG2,TAG1"))) .hasMessageContaining(
