This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 66981715e6 [core] AbstractFileStoreWrite should not provide createWriterContainer with ignorePreviousFiles 66981715e6 is described below commit 66981715e6fccc587cbf26d76fb723bb085465d9 Author: JingsongLi <jingsongl...@gmail.com> AuthorDate: Tue Aug 19 16:53:03 2025 +0800 [core] AbstractFileStoreWrite should not provide createWriterContainer with ignorePreviousFiles --- .../paimon/operation/AbstractFileStoreWrite.java | 14 ++++--- .../apache/paimon/table/sink/TableWriteImpl.java | 4 ++ .../test/java/org/apache/paimon/TestFileStore.java | 5 +-- .../paimon/operation/FileStoreTestUtils.java | 3 +- .../operation/KeyValueFileStoreWriteTest.java | 2 +- .../apache/paimon/operation/TestCommitThread.java | 4 +- .../paimon/table/DynamicBucketTableTest.java | 7 ++-- .../flink/source/TestChangelogDataReadWrite.java | 46 +++++++++++----------- 8 files changed, 44 insertions(+), 41 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index 0b8797cd08..2e61c3d2f1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -401,16 +401,14 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { writers.put(partition.copy(), buckets); } return buckets.computeIfAbsent( - bucket, k -> createWriterContainer(partition.copy(), bucket, ignorePreviousFiles)); + bucket, k -> createWriterContainer(partition.copy(), bucket)); } - private long writerNumber() { - return writers.values().stream().mapToLong(Map::size).sum(); + public RecordWriter<T> createWriter(BinaryRow partition, int bucket) { + return createWriterContainer(partition, bucket).writer; } - @VisibleForTesting - public WriterContainer<T> createWriterContainer( - BinaryRow partition, int bucket, boolean ignorePreviousFiles) { + public WriterContainer<T> createWriterContainer(BinaryRow partition, int bucket) { if (LOG.isDebugEnabled()) { LOG.debug("Creating writer for partition {}, bucket {}", partition, bucket); } @@ -461,6 +459,10 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { previousSnapshot == null ? null : previousSnapshot.id()); } + private long writerNumber() { + return writers.values().stream().mapToLong(Map::size).sum(); + } + @Override public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry) { this.compactionMetrics = new CompactionMetrics(metricRegistry, tableName); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index 9d3f4e8b51..ac1499c75d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -90,6 +90,10 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State this.defaultValueRow = DefaultValueRow.create(rowType); } + public FileStoreWrite<T> fileStoreWrite() { + return write; + } + @Override public InnerTableWrite withWriteRestore(WriteRestore writeRestore) { this.write.withWriteRestore(writeRestore); diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 7d303cad2d..8e4cafc7de 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -321,10 +321,9 @@ public class TestFileStore extends KeyValueFileStore { bucket, (b, w) -> { if (w == null) { + write.withIgnorePreviousFiles(ignorePreviousFiles); RecordWriter<KeyValue> writer = - write.createWriterContainer( - partition, bucket, ignorePreviousFiles) - .writer; + write.createWriterContainer(partition, bucket).writer; ((MemoryOwner) writer) .setMemoryPool( new HeapMemorySegmentPool( diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java index b874f07237..54fef645cb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java @@ -74,8 +74,7 @@ public class FileStoreTestUtils { TestFileStore store, List<KeyValue> keyValues, BinaryRow partition, int bucket) throws Exception { AbstractFileStoreWrite<KeyValue> write = store.newWrite(); - RecordWriter<KeyValue> writer = - write.createWriterContainer(partition, bucket, false).writer; + RecordWriter<KeyValue> writer = write.createWriterContainer(partition, bucket).writer; ((MemoryOwner) writer) .setMemoryPool( new HeapMemorySegmentPool( diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java index baeb6e9b4e..c13fcc1f64 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java @@ -98,7 +98,7 @@ public class KeyValueFileStoreWriteTest { KeyValue keyValue = gen.next(); AbstractFileStoreWrite.WriterContainer<KeyValue> writerContainer = - write.createWriterContainer(gen.getPartition(keyValue), 1, false); + write.createWriterContainer(gen.getPartition(keyValue), 1); MergeTreeWriter writer = (MergeTreeWriter) writerContainer.writer; try (MergeTreeCompactManager compactManager = (MergeTreeCompactManager) writer.compactManager()) { diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java index e1f9cb3994..2f409a9176 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java @@ -289,8 +289,8 @@ public class TestCommitThread extends Thread { } private MergeTreeWriter createWriter(BinaryRow partition, boolean empty) { - MergeTreeWriter writer = - (MergeTreeWriter) write.createWriterContainer(partition, 0, empty).writer; + write.withIgnorePreviousFiles(empty); + MergeTreeWriter writer = (MergeTreeWriter) write.createWriterContainer(partition, 0).writer; writer.setMemoryPool( new HeapMemorySegmentPool( WRITE_BUFFER_SIZE.getBytes(), (int) PAGE_SIZE.getBytes())); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java index 79f2c34100..29e71e2b44 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java @@ -51,11 +51,10 @@ public class DynamicBucketTableTest extends TableTestBase { Table table = getTableDefault(); BatchWriteBuilderImpl builder = (BatchWriteBuilderImpl) table.newBatchWriteBuilder(); TableWriteImpl batchTableWrite = (TableWriteImpl) builder.withOverwrite().newWrite(); + AbstractFileStoreWrite<?> write = (AbstractFileStoreWrite<?>) (batchTableWrite.getWrite()); + write.withIgnorePreviousFiles(true); DynamicBucketIndexMaintainer indexMaintainer = - (DynamicBucketIndexMaintainer) - ((AbstractFileStoreWrite<?>) (batchTableWrite.getWrite())) - .createWriterContainer(BinaryRow.EMPTY_ROW, 0, true) - .dynamicBucketMaintainer; + write.createWriterContainer(BinaryRow.EMPTY_ROW, 0).dynamicBucketMaintainer; assertThat(indexMaintainer.isEmpty()).isTrue(); Pair<InternalRow, Integer> rowWithBucket = data(0); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index 2e9695a525..cb6ef1c40b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -173,30 +173,30 @@ public class TestChangelogDataReadWrite { new CoreOptions(Collections.singletonMap(CoreOptions.FILE_FORMAT.key(), "avro")); SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath); - RecordWriter<KeyValue> writer = + KeyValueFileStoreWrite write = new KeyValueFileStoreWrite( - LocalFileIO.create(), - schemaManager, - schemaManager.schema(0), - commitUser, - PARTITION_TYPE, - KEY_TYPE, - VALUE_TYPE, - () -> COMPARATOR, - () -> null, - () -> EQUALISER, - DeduplicateMergeFunction.factory(), - pathFactory, - (coreOptions, format) -> pathFactory, - snapshotManager, - null, // not used, we only create an empty writer - null, - null, - options, - EXTRACTOR, - tablePath.getName()) - .createWriterContainer(partition, bucket, true) - .writer; + LocalFileIO.create(), + schemaManager, + schemaManager.schema(0), + commitUser, + PARTITION_TYPE, + KEY_TYPE, + VALUE_TYPE, + () -> COMPARATOR, + () -> null, + () -> EQUALISER, + DeduplicateMergeFunction.factory(), + pathFactory, + (coreOptions, format) -> pathFactory, + snapshotManager, + null, // not used, we only create an empty writer + null, + null, + options, + EXTRACTOR, + tablePath.getName()); + write.withIgnorePreviousFiles(true); + RecordWriter<KeyValue> writer = write.createWriterContainer(partition, bucket).writer; ((MemoryOwner) writer) .setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));