This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 2db2b8fa13 [core] Code Refactor for Data Evolution
2db2b8fa13 is described below

commit 2db2b8fa136c806ae716a60f723630613093c15e
Author: JingsongLi <jingsongl...@gmail.com>
AuthorDate: Sat Aug 9 10:56:18 2025 +0800

    [core] Code Refactor for Data Evolution
---
 .../main/java/org/apache/paimon/CoreOptions.java   |   2 +-
 .../java/org/apache/paimon/schema/TableSchema.java |  16 +--
 ...ileReader.java => DataEvolutionFileReader.java} |  33 +++---
 .../org/apache/paimon/AppendOnlyFileStore.java     |   8 +-
 .../org/apache/paimon/append/AppendOnlyWriter.java |   8 +-
 .../paimon/operation/BaseAppendFileStoreWrite.java |  13 +-
 ...eSplitRead.java => DataEvolutionSplitRead.java} |  12 +-
 .../org/apache/paimon/schema/SchemaValidation.java |   2 +-
 .../paimon/table/AppendOnlyFileStoreTable.java     |  25 +++-
 .../apache/paimon/table/sink/BatchTableWrite.java  |   5 +-
 .../paimon/table/source/AppendTableRead.java       |  36 ++----
 .../paimon/table/source/KeyValueTableRead.java     |  16 +--
 .../AppendTableRawFileSplitReadProvider.java       |   7 +-
 ...er.java => DataEvolutionSplitReadProvider.java} |  54 +++++----
 .../IncrementalChangelogReadProvider.java          |  15 +--
 .../splitread/IncrementalDiffReadProvider.java     |  15 +--
 .../splitread/MergeFieldSplitReadProvider.java     |  78 ------------
 .../splitread/MergeFileSplitReadProvider.java      |  15 +--
 .../PrimaryKeyTableRawFileSplitReadProvider.java   |   7 +-
 .../source/splitread/RawFileSplitReadProvider.java |  16 +--
 ...SplitReadProvider.java => SplitReadConfig.java} |  11 +-
 .../table/source/splitread/SplitReadProvider.java  |   5 +-
 .../apache/paimon/append/AppendOnlyWriterTest.java |   2 +-
 .../apache/paimon/format/FileFormatSuffixTest.java |   2 +-
 ...eFieldTest.java => DataEvolutionTableTest.java} |  13 +-
 .../DataEvolutionSplitReadProviderTest.java        | 132 +++++++++++++++++++++
 .../AppendPreCommitCompactWorkerOperator.java      |   2 +-
 27 files changed, 288 insertions(+), 262 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index a464e6dba6..32eff542a4 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2901,7 +2901,7 @@ public class CoreOptions implements Serializable {
         return options.get(ROW_TRACKING_ENABLED);
     }
 
-    public boolean dataElolutionEnabled() {
+    public boolean dataEvolutionEnabled() {
         return options.get(DATA_EVOLUTION_ENABLED);
     }
 
diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
index 0680a65675..6e012c016b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -274,23 +274,11 @@ public class TableSchema implements Serializable {
         if (writeCols == null || writeCols.isEmpty()) {
             return this;
        }
-        Map<String, DataField> fieldMap =
-                fields.stream()
-                        .collect(Collectors.toMap(DataField::name, field -> field, (a, b) -> a));
-        List<DataField> fields = new ArrayList<>();
-        for (String fieldId : writeCols) {
-            DataField dataField = fieldMap.get(fieldId);
-            if (dataField == null) {
-                throw new RuntimeException(
-                        String.format(
-                                "Projecting field %s, but not found in schema %s.", fieldId, this));
-            }
-            fields.add(dataField);
-        }
+
         return new TableSchema(
                 version,
                 id,
-                fields,
+                new RowType(fields).project(writeCols).getFields(),
                 highestFieldId,
                 partitionKeys,
primaryKeys, diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/CompoundFileReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java similarity index 89% rename from paimon-common/src/main/java/org/apache/paimon/reader/CompoundFileReader.java rename to paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java index 65748f5161..9c75f0e114 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/CompoundFileReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java @@ -59,13 +59,13 @@ import static org.apache.paimon.utils.Preconditions.checkArgument; * * </pre> */ -public class CompoundFileReader implements RecordReader<InternalRow> { +public class DataEvolutionFileReader implements RecordReader<InternalRow> { private final int[] rowOffsets; private final int[] fieldOffsets; private final RecordReader<InternalRow>[] innerReaders; - public CompoundFileReader( + public DataEvolutionFileReader( int[] rowOffsets, int[] fieldOffsets, RecordReader<InternalRow>[] readers) { checkArgument(rowOffsets != null, "Row offsets must not be null"); checkArgument(fieldOffsets != null, "Field offsets must not be null"); @@ -82,17 +82,17 @@ public class CompoundFileReader implements RecordReader<InternalRow> { @Override @Nullable public RecordIterator<InternalRow> readBatch() throws IOException { - CompoundRecordIterator compundFileRecordIterator = - new CompoundRecordIterator(innerReaders.length, rowOffsets, fieldOffsets); + DataEvolutionIterator iterator = + new DataEvolutionIterator(innerReaders.length, rowOffsets, fieldOffsets); for (int i = 0; i < innerReaders.length; i++) { RecordIterator<InternalRow> batch = innerReaders[i].readBatch(); if (batch == null && !(innerReaders[i] instanceof EmptyFileRecordReader)) { return null; } - compundFileRecordIterator.compound(i, batch); + iterator.set(i, batch); } - return compundFileRecordIterator; + return iterator; } @Override @@ -105,20 +105,21 @@ public class CompoundFileReader implements RecordReader<InternalRow> { } /** The batch which is made up by several batches. */ - public static class CompoundRecordIterator implements RecordIterator<InternalRow> { + private static class DataEvolutionIterator implements RecordIterator<InternalRow> { - private final CompundInternalRow compundInternalRow; + private final DataEvolutionRow dataEvolutionRow; private final RecordIterator<InternalRow>[] iterators; - public CompoundRecordIterator( + private DataEvolutionIterator( int rowNumber, int[] rowOffsets, int[] fieldOffsets) { // Initialize with empty arrays, will be set later - this.compundInternalRow = new CompundInternalRow(rowNumber, rowOffsets, fieldOffsets); + this.dataEvolutionRow = new DataEvolutionRow(rowNumber, rowOffsets, fieldOffsets); + //noinspection unchecked this.iterators = new RecordIterator[rowNumber]; } - public void compound(int i, RecordIterator<InternalRow> iterator) { + public void set(int i, RecordIterator<InternalRow> iterator) { iterators[i] = iterator; } @@ -131,10 +132,10 @@ public class CompoundFileReader implements RecordReader<InternalRow> { if (next == null) { return null; } - compundInternalRow.setRow(i, next); + dataEvolutionRow.setRow(i, next); } } - return compundInternalRow; + return dataEvolutionRow; } @Override @@ -148,19 +149,19 @@ public class CompoundFileReader implements RecordReader<InternalRow> { } /** The row which is made up by several rows. 
*/ - public static class CompundInternalRow implements InternalRow { + private static class DataEvolutionRow implements InternalRow { private final InternalRow[] rows; private final int[] rowOffsets; private final int[] fieldOffsets; - public CompundInternalRow(int rowNumber, int[] rowOffsets, int[] fieldOffsets) { + private DataEvolutionRow(int rowNumber, int[] rowOffsets, int[] fieldOffsets) { this.rows = new InternalRow[rowNumber]; this.rowOffsets = rowOffsets; this.fieldOffsets = fieldOffsets; } - public void setRow(int pos, InternalRow row) { + private void setRow(int pos, InternalRow row) { if (pos >= rows.length) { throw new IndexOutOfBoundsException( "Position " + pos + " is out of bounds for rows size " + rows.length); diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index 7f0a6b5f19..be3d1662c1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -27,7 +27,7 @@ import org.apache.paimon.operation.AppendOnlyFileStoreScan; import org.apache.paimon.operation.BaseAppendFileStoreWrite; import org.apache.paimon.operation.BucketSelectConverter; import org.apache.paimon.operation.BucketedAppendFileStoreWrite; -import org.apache.paimon.operation.FieldMergeSplitRead; +import org.apache.paimon.operation.DataEvolutionSplitRead; import org.apache.paimon.operation.RawFileSplitRead; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.SchemaManager; @@ -86,12 +86,12 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> { options.rowTrackingEnabled()); } - public FieldMergeSplitRead newFieldMergeRead() { - if (!options.dataElolutionEnabled()) { + public DataEvolutionSplitRead newDataEvolutionRead() { + if (!options.dataEvolutionEnabled()) { throw new IllegalStateException( "Field merge read is only supported when data-evolution.enabled is true."); } - return new FieldMergeSplitRead( + return new DataEvolutionSplitRead( fileIO, schemaManager, schema, diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 646ef382a4..f8a074cab8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -69,8 +69,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { private final long schemaId; private final FileFormat fileFormat; private final long targetFileSize; - private final RowType rowType; private final RowType writeSchema; + @Nullable private final List<String> writeCols; private final DataFilePathFactory pathFactory; private final CompactManager compactManager; private final IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> dataFileRead; @@ -99,8 +99,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { long schemaId, FileFormat fileFormat, long targetFileSize, - RowType rowType, RowType writeSchema, + @Nullable List<String> writeCols, long maxSequenceNumber, CompactManager compactManager, IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> dataFileRead, @@ -120,8 +120,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { this.schemaId = schemaId; this.fileFormat = fileFormat; this.targetFileSize = targetFileSize; - this.rowType = 
rowType; this.writeSchema = writeSchema; + this.writeCols = writeCols; this.pathFactory = pathFactory; this.compactManager = compactManager; this.dataFileRead = dataFileRead; @@ -304,7 +304,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { FileSource.APPEND, asyncFileWrite, statsDenseStore, - writeSchema.equals(rowType) ? null : writeSchema.getFieldNames()); + writeCols); } private void trySyncLatestCompaction(boolean blocking) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java index eb2f1f669c..dca05a6522 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java @@ -75,6 +75,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte private final RowType rowType; private RowType writeType; + private @Nullable List<String> writeCols; private boolean forceBufferSpill = false; public BaseAppendFileStoreWrite( @@ -95,6 +96,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte this.schemaId = schemaId; this.rowType = rowType; this.writeType = rowType; + this.writeCols = null; this.fileFormat = fileFormat(options); this.pathFactory = pathFactory; @@ -116,8 +118,8 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte schemaId, fileFormat, options.targetFileSize(false), - rowType, writeType, + writeCols, restoredMaxSeqNumber, getCompactManager(partition, bucket, restoredFiles, compactExecutor, dvMaintainer), // it is only for new files, no dv @@ -139,6 +141,15 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte @Override public void withWriteType(RowType writeType) { this.writeType = writeType; + int fullCount = rowType.getFieldCount(); + List<String> fullNames = rowType.getFieldNames(); + this.writeCols = writeType.getFieldNames(); + // optimize writeCols to null in following cases: + // 1. writeType contains all columns + // 2. 
writeType contains all columns and append _ROW_ID cols + if (writeCols.size() >= fullCount && writeCols.subList(0, fullCount).equals(fullNames)) { + writeCols = null; + } } private SimpleColStatsCollector.Factory[] statsCollectors() { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FieldMergeSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java similarity index 96% rename from paimon-core/src/main/java/org/apache/paimon/operation/FieldMergeSplitRead.java rename to paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java index 94b0d339ec..af1e5b767f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FieldMergeSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java @@ -27,7 +27,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.mergetree.compact.ConcatRecordReader; -import org.apache.paimon.reader.CompoundFileReader; +import org.apache.paimon.reader.DataEvolutionFileReader; import org.apache.paimon.reader.EmptyFileRecordReader; import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; @@ -56,9 +56,9 @@ import static java.lang.String.format; import static org.apache.paimon.utils.Preconditions.checkArgument; /** A {@link SplitRead} to read raw file directly from {@link DataSplit}. */ -public class FieldMergeSplitRead extends RawFileSplitRead { +public class DataEvolutionSplitRead extends RawFileSplitRead { - public FieldMergeSplitRead( + public DataEvolutionSplitRead( FileIO fileIO, SchemaManager schemaManager, TableSchema schema, @@ -106,7 +106,7 @@ public class FieldMergeSplitRead extends RawFileSplitRead { } else { suppliers.add( () -> - createCompoundFileReader( + createFileReader( needMergeFiles, partition, dataFilePathFactory, @@ -118,7 +118,7 @@ public class FieldMergeSplitRead extends RawFileSplitRead { return ConcatRecordReader.create(suppliers); } - private CompoundFileReader createCompoundFileReader( + private DataEvolutionFileReader createFileReader( List<DataFileMeta> needMergeFiles, BinaryRow partition, DataFilePathFactory dataFilePathFactory, @@ -222,6 +222,6 @@ public class FieldMergeSplitRead extends RawFileSplitRead { allReadFields.get(i))); } } - return new CompoundFileReader(rowOffsets, fieldOffsets, fileRecordReaders); + return new DataEvolutionFileReader(rowOffsets, fieldOffsets, fileRecordReaders); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 82e62896df..06d7d4117f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -650,7 +650,7 @@ public class SchemaValidation { } private static void validateDataEvolution(CoreOptions options) { - if (options.dataElolutionEnabled()) { + if (options.dataEvolutionEnabled()) { checkArgument( options.rowTrackingEnabled(), "Data evolution config must enabled with row-tracking.enabled"); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 9f59da690e..367eeb714b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -35,11 +35,18 @@ import org.apache.paimon.table.source.AppendTableRead; import org.apache.paimon.table.source.DataEvolutionSplitGenerator; import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.SplitGenerator; +import org.apache.paimon.table.source.splitread.AppendTableRawFileSplitReadProvider; +import org.apache.paimon.table.source.splitread.DataEvolutionSplitReadProvider; +import org.apache.paimon.table.source.splitread.SplitReadConfig; +import org.apache.paimon.table.source.splitread.SplitReadProvider; import org.apache.paimon.utils.Preconditions; import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; import java.util.function.BiConsumer; +import java.util.function.Function; /** {@link FileStoreTable} for append table. */ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable { @@ -82,7 +89,7 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable { protected SplitGenerator splitGenerator() { long targetSplitSize = store().options().splitTargetSize(); long openFileCost = store().options().splitOpenFileCost(); - return coreOptions().dataElolutionEnabled() + return coreOptions().dataEvolutionEnabled() ? new DataEvolutionSplitGenerator(targetSplitSize, openFileCost) : new AppendOnlySplitGenerator(targetSplitSize, openFileCost, bucketMode()); } @@ -99,11 +106,17 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable { @Override public InnerTableRead newRead() { - return new AppendTableRead( - () -> store().newRead(), - () -> store().newFieldMergeRead(), - schema(), - coreOptions()); + List<Function<SplitReadConfig, SplitReadProvider>> providerFactories = new ArrayList<>(); + if (coreOptions().dataEvolutionEnabled()) { + // add data evolution first + providerFactories.add( + config -> + new DataEvolutionSplitReadProvider( + () -> store().newDataEvolutionRead(), config)); + } + providerFactories.add( + config -> new AppendTableRawFileSplitReadProvider(() -> store().newRead(), config)); + return new AppendTableRead(providerFactories, schema()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java index d8616d27f5..416bb58a30 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableWrite.java @@ -38,6 +38,9 @@ public interface BatchTableWrite extends TableWrite { */ List<CommitMessage> prepareCommit() throws Exception; - /** Specified the write rowType. */ + /** + * Specify the write rowType; currently this only works for tables without primary keys and with row + * tracking enabled.
+ */ BatchTableWrite withWriteType(RowType writeType); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java index 063f09e841..83e26301ea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java @@ -18,17 +18,13 @@ package org.apache.paimon.table.source; -import org.apache.paimon.CoreOptions; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.operation.FieldMergeSplitRead; import org.apache.paimon.operation.MergeFileSplitRead; -import org.apache.paimon.operation.RawFileSplitRead; import org.apache.paimon.operation.SplitRead; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.source.splitread.AppendTableRawFileSplitReadProvider; -import org.apache.paimon.table.source.splitread.MergeFieldSplitReadProvider; +import org.apache.paimon.table.source.splitread.SplitReadConfig; import org.apache.paimon.table.source.splitread.SplitReadProvider; import org.apache.paimon.types.RowType; @@ -37,7 +33,8 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.function.Supplier; +import java.util.function.Function; +import java.util.stream.Collectors; /** * An abstraction layer above {@link MergeFileSplitRead} to provide reading of {@link InternalRow}. @@ -50,33 +47,26 @@ public final class AppendTableRead extends AbstractDataTableRead { private Predicate predicate = null; public AppendTableRead( - Supplier<RawFileSplitRead> batchRawReadSupplier, - Supplier<FieldMergeSplitRead> fieldMergeSplitReadSupplier, - TableSchema schema, - CoreOptions coreOptions) { + List<Function<SplitReadConfig, SplitReadProvider>> providerFactories, + TableSchema schema) { super(schema); - this.readProviders = new ArrayList<>(); - if (coreOptions.dataElolutionEnabled()) { - // MergeFieldSplitReadProvider is used to read the field merge split - readProviders.add( - new MergeFieldSplitReadProvider( - fieldMergeSplitReadSupplier, this::assignValues)); - } - readProviders.add( - new AppendTableRawFileSplitReadProvider(batchRawReadSupplier, this::assignValues)); + this.readProviders = + providerFactories.stream() + .map(factory -> factory.apply(this::config)) + .collect(Collectors.toList()); } private List<SplitRead<InternalRow>> initialized() { List<SplitRead<InternalRow>> readers = new ArrayList<>(); for (SplitReadProvider readProvider : readProviders) { - if (readProvider.initialized()) { - readers.add(readProvider.getOrCreate()); + if (readProvider.get().initialized()) { + readers.add(readProvider.get().get()); } } return readers; } - private void assignValues(SplitRead<InternalRow> read) { + private void config(SplitRead<InternalRow> read) { if (readType != null) { read = read.withReadType(readType); } @@ -101,7 +91,7 @@ public final class AppendTableRead extends AbstractDataTableRead { DataSplit dataSplit = (DataSplit) split; for (SplitReadProvider readProvider : readProviders) { if (readProvider.match(dataSplit, false)) { - return readProvider.getOrCreate().createReader(dataSplit); + return readProvider.get().get().createReader(dataSplit); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java index 4f6c6515f7..c4bede61d8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java @@ -63,23 +63,23 @@ public final class KeyValueTableRead extends AbstractDataTableRead { this.readProviders = Arrays.asList( new PrimaryKeyTableRawFileSplitReadProvider( - batchRawReadSupplier, this::assignValues), - new MergeFileSplitReadProvider(mergeReadSupplier, this::assignValues), - new IncrementalChangelogReadProvider(mergeReadSupplier, this::assignValues), - new IncrementalDiffReadProvider(mergeReadSupplier, this::assignValues)); + batchRawReadSupplier, this::config), + new MergeFileSplitReadProvider(mergeReadSupplier, this::config), + new IncrementalChangelogReadProvider(mergeReadSupplier, this::config), + new IncrementalDiffReadProvider(mergeReadSupplier, this::config)); } private List<SplitRead<InternalRow>> initialized() { List<SplitRead<InternalRow>> readers = new ArrayList<>(); for (SplitReadProvider readProvider : readProviders) { - if (readProvider.initialized()) { - readers.add(readProvider.getOrCreate()); + if (readProvider.get().initialized()) { + readers.add(readProvider.get().get()); } } return readers; } - private void assignValues(SplitRead<InternalRow> read) { + private void config(SplitRead<InternalRow> read) { if (forceKeepDelete) { read = read.forceKeepDelete(); } @@ -121,7 +121,7 @@ public final class KeyValueTableRead extends AbstractDataTableRead { DataSplit dataSplit = (DataSplit) split; for (SplitReadProvider readProvider : readProviders) { if (readProvider.match(dataSplit, forceKeepDelete)) { - return readProvider.getOrCreate().createReader(dataSplit); + return readProvider.get().get().createReader(dataSplit); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java index 4dd85e839d..4c6bf86008 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/AppendTableRawFileSplitReadProvider.java @@ -18,20 +18,17 @@ package org.apache.paimon.table.source.splitread; -import org.apache.paimon.data.InternalRow; import org.apache.paimon.operation.RawFileSplitRead; -import org.apache.paimon.operation.SplitRead; import org.apache.paimon.table.source.DataSplit; -import java.util.function.Consumer; import java.util.function.Supplier; /** Raw file split read for all append table. 
*/ public class AppendTableRawFileSplitReadProvider extends RawFileSplitReadProvider { public AppendTableRawFileSplitReadProvider( - Supplier<RawFileSplitRead> supplier, Consumer<SplitRead<InternalRow>> valuesAssigner) { - super(supplier, valuesAssigner); + Supplier<RawFileSplitRead> supplier, SplitReadConfig splitReadConfig) { + super(supplier, splitReadConfig); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java similarity index 51% copy from paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java copy to paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java index a335a7c030..bcf2e9fa1b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProvider.java @@ -18,48 +18,56 @@ package org.apache.paimon.table.source.splitread; -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.operation.MergeFileSplitRead; -import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.operation.DataEvolutionSplitRead; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.utils.LazyField; -import java.util.function.Consumer; +import java.util.List; import java.util.function.Supplier; -/** A {@link SplitReadProvider} to batch incremental diff read. */ -public class IncrementalDiffReadProvider implements SplitReadProvider { +/** A {@link SplitReadProvider} to create {@link DataEvolutionSplitRead}. 
*/ +public class DataEvolutionSplitReadProvider implements SplitReadProvider { - private final LazyField<SplitRead<InternalRow>> splitRead; + private final LazyField<DataEvolutionSplitRead> splitRead; - public IncrementalDiffReadProvider( - Supplier<MergeFileSplitRead> supplier, - Consumer<SplitRead<InternalRow>> valuesAssigner) { + public DataEvolutionSplitReadProvider( + Supplier<DataEvolutionSplitRead> supplier, SplitReadConfig splitReadConfig) { this.splitRead = new LazyField<>( () -> { - SplitRead<InternalRow> read = create(supplier); - valuesAssigner.accept(read); + DataEvolutionSplitRead read = supplier.get(); + splitReadConfig.config(read); return read; }); } - private SplitRead<InternalRow> create(Supplier<MergeFileSplitRead> supplier) { - return new IncrementalDiffSplitRead(supplier.get()); - } - @Override public boolean match(DataSplit split, boolean forceKeepDelete) { - return !split.beforeFiles().isEmpty() && !split.isStreaming(); - } + List<DataFileMeta> files = split.dataFiles(); + if (files.size() < 2) { + return false; + } - @Override - public boolean initialized() { - return splitRead.initialized(); + Long firstRowId = null; + for (DataFileMeta file : files) { + Long current = file.firstRowId(); + if (current == null) { + return false; + } + + if (firstRowId == null) { + firstRowId = current; + } else if (!firstRowId.equals(current)) { + return false; + } + } + + return true; } @Override - public SplitRead<InternalRow> getOrCreate() { - return splitRead.get(); + public LazyField<DataEvolutionSplitRead> get() { + return splitRead; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java index eb41d02669..19d95001ba 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java @@ -30,7 +30,6 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.IOFunction; import org.apache.paimon.utils.LazyField; -import java.util.function.Consumer; import java.util.function.Supplier; import static org.apache.paimon.table.source.KeyValueTableRead.unwrap; @@ -41,13 +40,12 @@ public class IncrementalChangelogReadProvider implements SplitReadProvider { private final LazyField<SplitRead<InternalRow>> splitRead; public IncrementalChangelogReadProvider( - Supplier<MergeFileSplitRead> supplier, - Consumer<SplitRead<InternalRow>> valuesAssigner) { + Supplier<MergeFileSplitRead> supplier, SplitReadConfig splitReadConfig) { this.splitRead = new LazyField<>( () -> { SplitRead<InternalRow> read = create(supplier); - valuesAssigner.accept(read); + splitReadConfig.config(read); return read; }); } @@ -86,12 +84,7 @@ public class IncrementalChangelogReadProvider implements SplitReadProvider { } @Override - public boolean initialized() { - return splitRead.initialized(); - } - - @Override - public SplitRead<InternalRow> getOrCreate() { - return splitRead.get(); + public LazyField<SplitRead<InternalRow>> get() { + return splitRead; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java index a335a7c030..6d45aeed7e 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffReadProvider.java @@ -24,7 +24,6 @@ import org.apache.paimon.operation.SplitRead; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.utils.LazyField; -import java.util.function.Consumer; import java.util.function.Supplier; /** A {@link SplitReadProvider} to batch incremental diff read. */ @@ -33,13 +32,12 @@ public class IncrementalDiffReadProvider implements SplitReadProvider { private final LazyField<SplitRead<InternalRow>> splitRead; public IncrementalDiffReadProvider( - Supplier<MergeFileSplitRead> supplier, - Consumer<SplitRead<InternalRow>> valuesAssigner) { + Supplier<MergeFileSplitRead> supplier, SplitReadConfig splitReadConfig) { this.splitRead = new LazyField<>( () -> { SplitRead<InternalRow> read = create(supplier); - valuesAssigner.accept(read); + splitReadConfig.config(read); return read; }); } @@ -54,12 +52,7 @@ public class IncrementalDiffReadProvider implements SplitReadProvider { } @Override - public boolean initialized() { - return splitRead.initialized(); - } - - @Override - public SplitRead<InternalRow> getOrCreate() { - return splitRead.get(); + public LazyField<SplitRead<InternalRow>> get() { + return splitRead; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFieldSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFieldSplitReadProvider.java deleted file mode 100644 index 59a1f31b2a..0000000000 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFieldSplitReadProvider.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.table.source.splitread; - -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.manifest.FileSource; -import org.apache.paimon.operation.FieldMergeSplitRead; -import org.apache.paimon.operation.RawFileSplitRead; -import org.apache.paimon.operation.SplitRead; -import org.apache.paimon.table.source.DataSplit; -import org.apache.paimon.utils.LazyField; - -import java.util.List; -import java.util.function.Consumer; -import java.util.function.Supplier; - -/** A {@link SplitReadProvider} to create {@link MergeFieldSplitReadProvider}. 
*/ -public class MergeFieldSplitReadProvider implements SplitReadProvider { - - private final LazyField<FieldMergeSplitRead> splitRead; - - public MergeFieldSplitReadProvider( - Supplier<FieldMergeSplitRead> supplier, - Consumer<SplitRead<InternalRow>> valuesAssigner) { - this.splitRead = - new LazyField<>( - () -> { - FieldMergeSplitRead read = supplier.get(); - valuesAssigner.accept(read); - return read; - }); - } - - @Override - public boolean match(DataSplit split, boolean forceKeepDelete) { - List<DataFileMeta> files = split.dataFiles(); - boolean onlyAppendFiles = - files.stream() - .allMatch( - f -> - f.fileSource().isPresent() - && f.fileSource().get() == FileSource.APPEND - && f.firstRowId() != null); - if (onlyAppendFiles) { - // contains same first row id, need merge fields - return files.stream().mapToLong(DataFileMeta::firstRowId).distinct().count() - != files.size(); - } - return false; - } - - @Override - public boolean initialized() { - return splitRead.initialized(); - } - - @Override - public RawFileSplitRead getOrCreate() { - return splitRead.get(); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java index a748828912..5aa0a8d62e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java @@ -25,7 +25,6 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.LazyField; -import java.util.function.Consumer; import java.util.function.Supplier; import static org.apache.paimon.table.source.KeyValueTableRead.unwrap; @@ -36,13 +35,12 @@ public class MergeFileSplitReadProvider implements SplitReadProvider { private final LazyField<SplitRead<InternalRow>> splitRead; public MergeFileSplitReadProvider( - Supplier<MergeFileSplitRead> supplier, - Consumer<SplitRead<InternalRow>> valuesAssigner) { + Supplier<MergeFileSplitRead> supplier, SplitReadConfig splitReadConfig) { this.splitRead = new LazyField<>( () -> { SplitRead<InternalRow> read = create(supplier); - valuesAssigner.accept(read); + splitReadConfig.config(read); return read; }); } @@ -58,12 +56,7 @@ public class MergeFileSplitReadProvider implements SplitReadProvider { } @Override - public boolean initialized() { - return splitRead.initialized(); - } - - @Override - public SplitRead<InternalRow> getOrCreate() { - return splitRead.get(); + public LazyField<SplitRead<InternalRow>> get() { + return splitRead; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/PrimaryKeyTableRawFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/PrimaryKeyTableRawFileSplitReadProvider.java index d52e2a902b..ab8671c966 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/PrimaryKeyTableRawFileSplitReadProvider.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/PrimaryKeyTableRawFileSplitReadProvider.java @@ -18,21 +18,18 @@ package org.apache.paimon.table.source.splitread; -import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.operation.RawFileSplitRead; -import org.apache.paimon.operation.SplitRead; import org.apache.paimon.table.source.DataSplit; -import 
java.util.function.Consumer; import java.util.function.Supplier; /** Raw file split read for all primary key table. */ public class PrimaryKeyTableRawFileSplitReadProvider extends RawFileSplitReadProvider { public PrimaryKeyTableRawFileSplitReadProvider( - Supplier<RawFileSplitRead> supplier, Consumer<SplitRead<InternalRow>> valuesAssigner) { - super(supplier, valuesAssigner); + Supplier<RawFileSplitRead> supplier, SplitReadConfig splitReadConfig) { + super(supplier, splitReadConfig); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java index bba0b557ec..75ab25ca91 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/RawFileSplitReadProvider.java @@ -18,12 +18,9 @@ package org.apache.paimon.table.source.splitread; -import org.apache.paimon.data.InternalRow; import org.apache.paimon.operation.RawFileSplitRead; -import org.apache.paimon.operation.SplitRead; import org.apache.paimon.utils.LazyField; -import java.util.function.Consumer; import java.util.function.Supplier; /** A {@link SplitReadProvider} to create {@link RawFileSplitRead}. */ @@ -32,23 +29,18 @@ public abstract class RawFileSplitReadProvider implements SplitReadProvider { private final LazyField<RawFileSplitRead> splitRead; public RawFileSplitReadProvider( - Supplier<RawFileSplitRead> supplier, Consumer<SplitRead<InternalRow>> valuesAssigner) { + Supplier<RawFileSplitRead> supplier, SplitReadConfig splitReadConfig) { this.splitRead = new LazyField<>( () -> { RawFileSplitRead read = supplier.get(); - valuesAssigner.accept(read); + splitReadConfig.config(read); return read; }); } @Override - public boolean initialized() { - return splitRead.initialized(); - } - - @Override - public SplitRead<InternalRow> getOrCreate() { - return splitRead.get(); + public LazyField<RawFileSplitRead> get() { + return splitRead; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadConfig.java similarity index 78% copy from paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java copy to paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadConfig.java index 2aaefb322f..83ebea0686 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadConfig.java @@ -20,14 +20,9 @@ package org.apache.paimon.table.source.splitread; import org.apache.paimon.data.InternalRow; import org.apache.paimon.operation.SplitRead; -import org.apache.paimon.table.source.DataSplit; -/** Provider to create {@link SplitRead}. */ -public interface SplitReadProvider { +/** Config {@link SplitRead} with projection, filter and others. 
*/ +public interface SplitReadConfig { - boolean match(DataSplit split, boolean forceKeepDelete); - - boolean initialized(); - - SplitRead<InternalRow> getOrCreate(); + void config(SplitRead<InternalRow> read); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java index 2aaefb322f..dfc32501d6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/SplitReadProvider.java @@ -21,13 +21,12 @@ package org.apache.paimon.table.source.splitread; import org.apache.paimon.data.InternalRow; import org.apache.paimon.operation.SplitRead; import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.utils.LazyField; /** Provider to create {@link SplitRead}. */ public interface SplitReadProvider { boolean match(DataSplit split, boolean forceKeepDelete); - boolean initialized(); - - SplitRead<InternalRow> getOrCreate(); + LazyField<? extends SplitRead<InternalRow>> get(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index bf4952bcd7..ad91a3c70a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -622,7 +622,7 @@ public class AppendOnlyWriterTest { fileFormat, targetFileSize, AppendOnlyWriterTest.SCHEMA, - AppendOnlyWriterTest.SCHEMA, + null, getMaxSequenceNumber(toCompact), compactManager, null, diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java index 55595d13d3..ca0b65b2ee 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java @@ -85,7 +85,7 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest { fileFormat, 10, SCHEMA, - SCHEMA, + null, 0, new BucketedAppendCompactManager( null, toCompact, null, 4, 10, false, null, null), // not used diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendMergeFieldTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java similarity index 97% rename from paimon-core/src/test/java/org/apache/paimon/table/AppendMergeFieldTest.java rename to paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java index cbf0fa31df..8cbc0413ff 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendMergeFieldTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java @@ -23,8 +23,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.operation.MergeFileSplitRead; -import org.apache.paimon.reader.CompoundFileReader; +import org.apache.paimon.reader.DataEvolutionFileReader; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.sink.BatchTableCommit; @@ -47,8 +46,8 @@ import java.util.stream.Collectors; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -/** Test for {@link 
MergeFileSplitRead}. */ -public class AppendMergeFieldTest extends TableTestBase { +/** Test for table with data evolution. */ +public class DataEvolutionTableTest extends TableTestBase { @Test public void testBasic() throws Exception { @@ -76,7 +75,7 @@ public class AppendMergeFieldTest extends TableTestBase { ReadBuilder readBuilder = getTableDefault().newReadBuilder(); RecordReader<InternalRow> reader = readBuilder.newRead().createReader(readBuilder.newScan().plan()); - assertThat(reader).isInstanceOf(CompoundFileReader.class); + assertThat(reader).isInstanceOf(DataEvolutionFileReader.class); reader.forEachRemaining( r -> { assertThat(r.getInt(0)).isEqualTo(1); @@ -203,7 +202,7 @@ public class AppendMergeFieldTest extends TableTestBase { ReadBuilder readBuilder = getTableDefault().newReadBuilder(); RecordReader<InternalRow> reader = readBuilder.newRead().createReader(readBuilder.newScan().plan()); - assertThat(reader).isInstanceOf(CompoundFileReader.class); + assertThat(reader).isInstanceOf(DataEvolutionFileReader.class); reader.forEachRemaining( r -> { @@ -375,7 +374,7 @@ public class AppendMergeFieldTest extends TableTestBase { ReadBuilder readBuilder = getTableDefault().newReadBuilder(); RecordReader<InternalRow> reader = readBuilder.newRead().createReader(readBuilder.newScan().plan()); - assertThat(reader).isInstanceOf(CompoundFileReader.class); + assertThat(reader).isInstanceOf(DataEvolutionFileReader.class); AtomicInteger i = new AtomicInteger(0); reader.forEachRemaining( r -> { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java new file mode 100644 index 0000000000..b53a33481a --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/splitread/DataEvolutionSplitReadProviderTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.splitread; + +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.operation.DataEvolutionSplitRead; +import org.apache.paimon.table.source.DataSplit; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.function.Supplier; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** Tests for {@link DataEvolutionSplitReadProvider}. 
*/ +public class DataEvolutionSplitReadProviderTest { + + private Supplier<DataEvolutionSplitRead> mockSupplier; + private SplitReadConfig mockSplitReadConfig; + private DataEvolutionSplitRead mockSplitRead; + private DataEvolutionSplitReadProvider provider; + + @SuppressWarnings("unchecked") + @BeforeEach + public void setUp() { + mockSupplier = (Supplier<DataEvolutionSplitRead>) mock(Supplier.class); + mockSplitReadConfig = mock(SplitReadConfig.class); + mockSplitRead = mock(DataEvolutionSplitRead.class); + when(mockSupplier.get()).thenReturn(mockSplitRead); + + provider = new DataEvolutionSplitReadProvider(mockSupplier, mockSplitReadConfig); + } + + @Test + public void testGetAndInitialization() { + // Supplier should not be called yet due to lazy initialization + verify(mockSupplier, times(0)).get(); + + // First access, should trigger initialization + DataEvolutionSplitRead read = provider.get().get(); + + // Verify supplier and config were called + verify(mockSupplier, times(1)).get(); + verify(mockSplitReadConfig, times(1)).config(mockSplitRead); + assertThat(read).isSameAs(mockSplitRead); + + // Second access, should return cached instance without re-initializing + DataEvolutionSplitRead read2 = provider.get().get(); + verify(mockSupplier, times(1)).get(); + verify(mockSplitReadConfig, times(1)).config(mockSplitRead); + assertThat(read2).isSameAs(mockSplitRead); + } + + @Test + public void testMatchWithNoFiles() { + DataSplit split = mock(DataSplit.class); + when(split.dataFiles()).thenReturn(Collections.emptyList()); + assertThat(provider.match(split, false)).isFalse(); + } + + @Test + public void testMatchWithOneFile() { + DataSplit split = mock(DataSplit.class); + DataFileMeta file1 = mock(DataFileMeta.class); + when(split.dataFiles()).thenReturn(Collections.singletonList(file1)); + assertThat(provider.match(split, false)).isFalse(); + } + + @Test + public void testMatchWithNullFirstRowId() { + DataSplit split = mock(DataSplit.class); + DataFileMeta file1 = mock(DataFileMeta.class); + DataFileMeta file2 = mock(DataFileMeta.class); + + when(file1.firstRowId()).thenReturn(1L); + when(file2.firstRowId()).thenReturn(null); + when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2)); + + assertThat(provider.match(split, false)).isFalse(); + } + + @Test + public void testMatchWithDifferentFirstRowIds() { + DataSplit split = mock(DataSplit.class); + DataFileMeta file1 = mock(DataFileMeta.class); + DataFileMeta file2 = mock(DataFileMeta.class); + + when(file1.firstRowId()).thenReturn(1L); + when(file2.firstRowId()).thenReturn(2L); + when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2)); + + assertThat(provider.match(split, false)).isFalse(); + } + + @Test + public void testMatchSuccess() { + DataSplit split = mock(DataSplit.class); + DataFileMeta file1 = mock(DataFileMeta.class); + DataFileMeta file2 = mock(DataFileMeta.class); + + when(file1.firstRowId()).thenReturn(100L); + when(file2.firstRowId()).thenReturn(100L); + when(split.dataFiles()).thenReturn(Arrays.asList(file1, file2)); + + // The forceKeepDelete parameter is not used in match, so test both values + assertThat(provider.match(split, true)).isTrue(); + assertThat(provider.match(split, false)).isTrue(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java index 8648bf734e..8d8a668f79 100644 --- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactWorkerOperator.java @@ -70,7 +70,7 @@ public class AppendPreCommitCompactWorkerOperator extends AbstractStreamOperator this.write = (AppendFileStoreWrite) table.store().newWrite(null); if (coreOptions.rowTrackingEnabled()) { checkArgument( - !coreOptions.dataElolutionEnabled(), + !coreOptions.dataEvolutionEnabled(), "Data evolution enabled table should not invoke compact yet."); this.write.withWriteType(SpecialFields.rowTypeWithRowLineage(table.rowType())); }
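Note on the provider refactor above: every SplitReadProvider now hands out its lazily created SplitRead through a single get() returning a LazyField, and the one-time configuration (projection, filter) is folded into the supplier, replacing the old initialized()/getOrCreate() pair and the valuesAssigner callback. The following standalone sketch mirrors that shape; LazyRef, Reader, and ReaderConfig are simplified stand-ins for Paimon's LazyField, SplitRead, and SplitReadConfig, not the actual classes.

import java.util.function.Supplier;

public class LazyProviderSketch {

    /** Simplified stand-in for org.apache.paimon.utils.LazyField: caches the supplier's value. */
    static final class LazyRef<T> {
        private final Supplier<T> supplier;
        private T value;
        private boolean initialized;

        LazyRef(Supplier<T> supplier) {
            this.supplier = supplier;
        }

        boolean initialized() {
            return initialized;
        }

        T get() {
            if (!initialized) {
                value = supplier.get();
                initialized = true;
            }
            return value;
        }
    }

    interface Reader {} // stand-in for SplitRead<InternalRow>

    interface ReaderConfig { // stand-in for SplitReadConfig
        void config(Reader read);
    }

    /** Mirrors the refactored providers: creation and one-time config are fused in the supplier. */
    static final class Provider {
        private final LazyRef<Reader> read;

        Provider(Supplier<Reader> supplier, ReaderConfig config) {
            this.read =
                    new LazyRef<>(
                            () -> {
                                Reader r = supplier.get();
                                config.config(r); // applied exactly once, on first access
                                return r;
                            });
        }

        LazyRef<Reader> get() {
            return read;
        }
    }

    public static void main(String[] args) {
        Provider provider = new Provider(() -> new Reader() {}, r -> {});
        System.out.println(provider.get().initialized()); // false: nothing created yet
        provider.get().get(); // first access creates and configures the reader
        System.out.println(provider.get().initialized()); // true: cached from now on
    }
}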
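For intuition about the renamed DataEvolutionFileReader and its DataEvolutionRow: a logical row is assembled from several physical rows (one per inner reader), where output field i comes from inner row rowOffsets[i] at position fieldOffsets[i]. The sketch below is a hedged illustration of just that indexing scheme, using plain Object[] rows in place of Paimon's InternalRow.

import java.util.Arrays;

public class RowStitchSketch {

    /** Assembles field i of the logical row from rows[rowOffsets[i]][fieldOffsets[i]]. */
    static Object[] stitch(Object[][] rows, int[] rowOffsets, int[] fieldOffsets) {
        Object[] out = new Object[rowOffsets.length];
        for (int i = 0; i < out.length; i++) {
            out[i] = rows[rowOffsets[i]][fieldOffsets[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        // File A holds (id, name); a later column addition wrote file B holding only (price).
        Object[] fromFileA = {1, "apple"};
        Object[] fromFileB = {9.5};
        // Logical schema (id, name, price): fields 0 and 1 from row 0, field 2 from row 1.
        int[] rowOffsets = {0, 0, 1};
        int[] fieldOffsets = {0, 1, 0};
        Object[] stitched =
                stitch(new Object[][] {fromFileA, fromFileB}, rowOffsets, fieldOffsets);
        System.out.println(Arrays.toString(stitched)); // [1, apple, 9.5]
    }
}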
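The withWriteType() change in BaseAppendFileStoreWrite normalizes writeCols to null whenever the write type covers the full schema, possibly with appended system columns such as _ROW_ID, so no redundant write-column list is recorded. The check below is taken verbatim from the diff; the wrapping method and example names are illustrative only.

import java.util.Arrays;
import java.util.List;

public class WriteColsSketch {

    /** Returns null when writeCols is the full column list, optionally with extra appended columns. */
    static List<String> normalize(List<String> fullNames, List<String> writeCols) {
        int fullCount = fullNames.size();
        if (writeCols.size() >= fullCount && writeCols.subList(0, fullCount).equals(fullNames)) {
            return null; // full schema (plus e.g. _ROW_ID) -> no explicit write columns needed
        }
        return writeCols;
    }

    public static void main(String[] args) {
        List<String> full = Arrays.asList("id", "name", "price");
        System.out.println(normalize(full, Arrays.asList("id", "name", "price"))); // null
        System.out.println(normalize(full, Arrays.asList("id", "name", "price", "_ROW_ID"))); // null
        System.out.println(normalize(full, Arrays.asList("id", "price"))); // [id, price]
    }
}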