This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 66981715e6 [core] AbstractFileStoreWrite should not provide createWriterContainer with ignorePreviousFiles
66981715e6 is described below

commit 66981715e6fccc587cbf26d76fb723bb085465d9
Author: JingsongLi <jingsongl...@gmail.com>
AuthorDate: Tue Aug 19 16:53:03 2025 +0800

    [core] AbstractFileStoreWrite should not provide createWriterContainer with ignorePreviousFiles
---
 .../paimon/operation/AbstractFileStoreWrite.java   | 14 ++++---
 .../apache/paimon/table/sink/TableWriteImpl.java   |  4 ++
 .../test/java/org/apache/paimon/TestFileStore.java |  5 +--
 .../paimon/operation/FileStoreTestUtils.java       |  3 +-
 .../operation/KeyValueFileStoreWriteTest.java      |  2 +-
 .../apache/paimon/operation/TestCommitThread.java  |  4 +-
 .../paimon/table/DynamicBucketTableTest.java       |  7 ++--
 .../flink/source/TestChangelogDataReadWrite.java   | 46 +++++++++++-----------
 8 files changed, 44 insertions(+), 41 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 0b8797cd08..2e61c3d2f1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -401,16 +401,14 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
             writers.put(partition.copy(), buckets);
         }
         return buckets.computeIfAbsent(
-                bucket, k -> createWriterContainer(partition.copy(), bucket, ignorePreviousFiles));
+                bucket, k -> createWriterContainer(partition.copy(), bucket));
     }
 
-    private long writerNumber() {
-        return writers.values().stream().mapToLong(Map::size).sum();
+    public RecordWriter<T> createWriter(BinaryRow partition, int bucket) {
+        return createWriterContainer(partition, bucket).writer;
     }
 
-    @VisibleForTesting
-    public WriterContainer<T> createWriterContainer(
-            BinaryRow partition, int bucket, boolean ignorePreviousFiles) {
+    public WriterContainer<T> createWriterContainer(BinaryRow partition, int bucket) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Creating writer for partition {}, bucket {}", partition, bucket);
         }
@@ -461,6 +459,10 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
                 previousSnapshot == null ? null : previousSnapshot.id());
     }
 
+    private long writerNumber() {
+        return writers.values().stream().mapToLong(Map::size).sum();
+    }
+
     @Override
     public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry) {
         this.compactionMetrics = new CompactionMetrics(metricRegistry, tableName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 9d3f4e8b51..ac1499c75d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -90,6 +90,10 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
         this.defaultValueRow = DefaultValueRow.create(rowType);
     }
 
+    public FileStoreWrite<T> fileStoreWrite() {
+        return write;
+    }
+
     @Override
     public InnerTableWrite withWriteRestore(WriteRestore writeRestore) {
         this.write.withWriteRestore(writeRestore);
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 7d303cad2d..8e4cafc7de 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -321,10 +321,9 @@ public class TestFileStore extends KeyValueFileStore {
                             bucket,
                             (b, w) -> {
                                 if (w == null) {
+                                    write.withIgnorePreviousFiles(ignorePreviousFiles);
                                     RecordWriter<KeyValue> writer =
-                                            write.createWriterContainer(
-                                                            partition, bucket, ignorePreviousFiles)
-                                                    .writer;
+                                            write.createWriterContainer(partition, bucket).writer;
                                     ((MemoryOwner) writer)
                                             .setMemoryPool(
                                                     new HeapMemorySegmentPool(
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
index b874f07237..54fef645cb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
@@ -74,8 +74,7 @@ public class FileStoreTestUtils {
             TestFileStore store, List<KeyValue> keyValues, BinaryRow partition, int bucket)
             throws Exception {
         AbstractFileStoreWrite<KeyValue> write = store.newWrite();
-        RecordWriter<KeyValue> writer =
-                write.createWriterContainer(partition, bucket, false).writer;
+        RecordWriter<KeyValue> writer = write.createWriterContainer(partition, bucket).writer;
         ((MemoryOwner) writer)
                 .setMemoryPool(
                         new HeapMemorySegmentPool(
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
index baeb6e9b4e..c13fcc1f64 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
@@ -98,7 +98,7 @@ public class KeyValueFileStoreWriteTest {
 
         KeyValue keyValue = gen.next();
         AbstractFileStoreWrite.WriterContainer<KeyValue> writerContainer =
-                write.createWriterContainer(gen.getPartition(keyValue), 1, false);
+                write.createWriterContainer(gen.getPartition(keyValue), 1);
         MergeTreeWriter writer = (MergeTreeWriter) writerContainer.writer;
         try (MergeTreeCompactManager compactManager =
                 (MergeTreeCompactManager) writer.compactManager()) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
index e1f9cb3994..2f409a9176 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
@@ -289,8 +289,8 @@ public class TestCommitThread extends Thread {
     }
 
     private MergeTreeWriter createWriter(BinaryRow partition, boolean empty) {
-        MergeTreeWriter writer =
-                (MergeTreeWriter) write.createWriterContainer(partition, 0, empty).writer;
+        write.withIgnorePreviousFiles(empty);
+        MergeTreeWriter writer = (MergeTreeWriter) write.createWriterContainer(partition, 0).writer;
         writer.setMemoryPool(
                 new HeapMemorySegmentPool(
                         WRITE_BUFFER_SIZE.getBytes(), (int) PAGE_SIZE.getBytes()));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
index 79f2c34100..29e71e2b44 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
@@ -51,11 +51,10 @@ public class DynamicBucketTableTest extends TableTestBase {
         Table table = getTableDefault();
         BatchWriteBuilderImpl builder = (BatchWriteBuilderImpl) table.newBatchWriteBuilder();
         TableWriteImpl batchTableWrite = (TableWriteImpl) builder.withOverwrite().newWrite();
+        AbstractFileStoreWrite<?> write = (AbstractFileStoreWrite<?>) (batchTableWrite.getWrite());
+        write.withIgnorePreviousFiles(true);
         DynamicBucketIndexMaintainer indexMaintainer =
-                (DynamicBucketIndexMaintainer)
-                        ((AbstractFileStoreWrite<?>) (batchTableWrite.getWrite()))
-                                .createWriterContainer(BinaryRow.EMPTY_ROW, 0, true)
-                                .dynamicBucketMaintainer;
+                write.createWriterContainer(BinaryRow.EMPTY_ROW, 0).dynamicBucketMaintainer;
 
         assertThat(indexMaintainer.isEmpty()).isTrue();
         Pair<InternalRow, Integer> rowWithBucket = data(0);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 2e9695a525..cb6ef1c40b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -173,30 +173,30 @@ public class TestChangelogDataReadWrite {
                 new CoreOptions(Collections.singletonMap(CoreOptions.FILE_FORMAT.key(), "avro"));
 
         SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
-        RecordWriter<KeyValue> writer =
+        KeyValueFileStoreWrite write =
                 new KeyValueFileStoreWrite(
-                                LocalFileIO.create(),
-                                schemaManager,
-                                schemaManager.schema(0),
-                                commitUser,
-                                PARTITION_TYPE,
-                                KEY_TYPE,
-                                VALUE_TYPE,
-                                () -> COMPARATOR,
-                                () -> null,
-                                () -> EQUALISER,
-                                DeduplicateMergeFunction.factory(),
-                                pathFactory,
-                                (coreOptions, format) -> pathFactory,
-                                snapshotManager,
-                                null, // not used, we only create an empty writer
-                                null,
-                                null,
-                                options,
-                                EXTRACTOR,
-                                tablePath.getName())
-                        .createWriterContainer(partition, bucket, true)
-                        .writer;
+                        LocalFileIO.create(),
+                        schemaManager,
+                        schemaManager.schema(0),
+                        commitUser,
+                        PARTITION_TYPE,
+                        KEY_TYPE,
+                        VALUE_TYPE,
+                        () -> COMPARATOR,
+                        () -> null,
+                        () -> EQUALISER,
+                        DeduplicateMergeFunction.factory(),
+                        pathFactory,
+                        (coreOptions, format) -> pathFactory,
+                        snapshotManager,
+                        null, // not used, we only create an empty writer
+                        null,
+                        null,
+                        options,
+                        EXTRACTOR,
+                        tablePath.getName());
+        write.withIgnorePreviousFiles(true);
+        RecordWriter<KeyValue> writer = write.createWriterContainer(partition, bucket).writer;
         ((MemoryOwner) writer)
                 .setMemoryPool(
                         new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));

Reply via email to