This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 01bc12a0b [core] Rename emptyWriter to ignorePreviousFiles (#1381)
01bc12a0b is described below

commit 01bc12a0b40287f41bfa25e0ff45ed84cc476d78
Author: dohongdayi <[email protected]>
AuthorDate: Thu Jun 15 13:58:34 2023 +0800

    [core] Rename emptyWriter to ignorePreviousFiles (#1381)
---
 .../org/apache/paimon/operation/AbstractFileStoreWrite.java  | 12 ++++++------
 .../java/org/apache/paimon/operation/FileStoreWrite.java     |  6 +++---
 .../org/apache/paimon/table/AppendOnlyFileStoreTable.java    |  2 +-
 .../org/apache/paimon/table/sink/BatchWriteBuilderImpl.java  |  2 +-
 .../java/org/apache/paimon/table/sink/InnerTableWrite.java   |  2 +-
 .../java/org/apache/paimon/table/sink/TableWriteImpl.java    |  4 ++--
 .../src/test/java/org/apache/paimon/TestFileStore.java       |  7 ++++---
 .../java/org/apache/paimon/table/FileStoreTableTestBase.java |  4 ++--
 .../snapshot/ContinuousCompactorFollowUpScannerTest.java     |  2 +-
 .../main/java/org/apache/paimon/flink/sink/FlinkSink.java    | 10 +++++-----
 .../paimon/flink/sink/GlobalFullCompactionSinkWrite.java     |  4 ++--
 .../org/apache/paimon/flink/sink/StoreSinkWriteImpl.java     |  8 ++++----
 12 files changed, 32 insertions(+), 31 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 093b96fe7..c66a1d1ae 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -70,7 +70,7 @@ public abstract class AbstractFileStoreWrite<T>
     protected final Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers;
 
     private ExecutorService lazyCompactExecutor;
-    private boolean emptyWriter = false;
+    private boolean ignorePreviousFiles = false;
 
     protected AbstractFileStoreWrite(
             String commitUser,
@@ -97,8 +97,8 @@ public abstract class AbstractFileStoreWrite<T>
     }
 
     @Override
-    public void fromEmptyWriter(boolean emptyWriter) {
-        this.emptyWriter = emptyWriter;
+    public void withIgnorePreviousFiles(boolean ignorePreviousFiles) {
+        this.ignorePreviousFiles = ignorePreviousFiles;
     }
 
     @Override
@@ -303,19 +303,19 @@ public abstract class AbstractFileStoreWrite<T>
             writers.put(partition.copy(), buckets);
         }
         return buckets.computeIfAbsent(
-                bucket, k -> createWriterContainer(partition.copy(), bucket, 
emptyWriter));
+                bucket, k -> createWriterContainer(partition.copy(), bucket, 
ignorePreviousFiles));
     }
 
     @VisibleForTesting
     public WriterContainer<T> createWriterContainer(
-            BinaryRow partition, int bucket, boolean emptyWriter) {
+            BinaryRow partition, int bucket, boolean ignorePreviousFiles) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Creating writer for partition {}, bucket {}", 
partition, bucket);
         }
 
         Long latestSnapshotId = snapshotManager.latestSnapshotId();
         List<DataFileMeta> restoreFiles = new ArrayList<>();
-        if (!emptyWriter && latestSnapshotId != null) {
+        if (!ignorePreviousFiles && latestSnapshotId != null) {
             restoreFiles = scanExistingFileMetas(latestSnapshotId, partition, 
bucket);
         }
         IndexMaintainer<T> indexMaintainer =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index 1e921186c..d7cd64250 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -47,11 +47,11 @@ public interface FileStoreWrite<T> {
     FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool);
 
     /**
-     * Set writer to be empty, the writer will not search restored files.
+     * Set whether the write operation should ignore previously stored files.
      *
-     * @param emptyWriter set flag to tag the writer is empty.
+     * @param ignorePreviousFiles whether the write operation should ignore 
previously stored files.
      */
-    void fromEmptyWriter(boolean emptyWriter);
+    void withIgnorePreviousFiles(boolean ignorePreviousFiles);
 
     /**
      * Write the data to the store according to the partition and bucket.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index cd342db5f..c5ceb8eab 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -134,7 +134,7 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
         // if this table is non-bucket table, we skip compaction and restored 
files searching
         if (bucketMode() == BucketMode.UNAWARE) {
             writer.skipCompaction();
-            writer.fromEmptyWriter(true);
+            writer.withIgnorePreviousFiles(true);
         }
         return new TableWriteImpl<>(
                 writer,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index 437a6561d..4acd6be33 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -58,7 +58,7 @@ public class BatchWriteBuilderImpl implements 
BatchWriteBuilder {
 
     @Override
     public BatchTableWrite newWrite() {
-        return table.newWrite(commitUser).fromEmptyWriter(staticPartition != 
null);
+        return 
table.newWrite(commitUser).withIgnorePreviousFiles(staticPartition != null);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
index 87aca40c1..69eb61f97 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
@@ -21,5 +21,5 @@ package org.apache.paimon.table.sink;
 /** Inner {@link TableWrite} contains overwrite setter. */
 public interface InnerTableWrite extends StreamTableWrite, BatchTableWrite {
 
-    InnerTableWrite fromEmptyWriter(boolean emptyWriter);
+    InnerTableWrite withIgnorePreviousFiles(boolean ignorePreviousFiles);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index e3b524b9b..1103509b2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -56,8 +56,8 @@ public class TableWriteImpl<T>
     }
 
     @Override
-    public TableWriteImpl<T> fromEmptyWriter(boolean emptyWriter) {
-        write.fromEmptyWriter(emptyWriter);
+    public TableWriteImpl<T> withIgnorePreviousFiles(boolean 
ignorePreviousFiles) {
+        write.withIgnorePreviousFiles(ignorePreviousFiles);
         return this;
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 68ce6e190..3ab8227c3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -239,7 +239,7 @@ public class TestFileStore extends KeyValueFileStore {
             List<KeyValue> kvs,
             Function<KeyValue, BinaryRow> partitionCalculator,
             Function<KeyValue, Integer> bucketCalculator,
-            boolean emptyWriter,
+            boolean ignorePreviousFiles,
             Long identifier,
             Long watermark,
             List<IndexFileMeta> indexFiles,
@@ -257,7 +257,7 @@ public class TestFileStore extends KeyValueFileStore {
                                 if (w == null) {
                                     RecordWriter<KeyValue> writer =
                                             write.createWriterContainer(
-                                                            partition, bucket, 
emptyWriter)
+                                                            partition, bucket, 
ignorePreviousFiles)
                                                     .writer;
                                     ((MemoryOwner) writer)
                                             .setMemoryPool(
@@ -280,7 +280,8 @@ public class TestFileStore extends KeyValueFileStore {
                 writers.entrySet()) {
             for (Map.Entry<Integer, RecordWriter<KeyValue>> entryWithBucket :
                     entryWithPartition.getValue().entrySet()) {
-                CommitIncrement increment = 
entryWithBucket.getValue().prepareCommit(emptyWriter);
+                CommitIncrement increment =
+                        
entryWithBucket.getValue().prepareCommit(ignorePreviousFiles);
                 committable.addFileCommittable(
                         new CommitMessageImpl(
                                 entryWithPartition.getKey(),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index f180e7e79..610f8cc6b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -282,7 +282,7 @@ public abstract class FileStoreTableTestBase {
         }
 
         // overwrite data
-        try (StreamTableWrite write = 
table.newWrite(commitUser).fromEmptyWriter(true);
+        try (StreamTableWrite write = 
table.newWrite(commitUser).withIgnorePreviousFiles(true);
                 InnerTableCommit commit = table.newCommit(commitUser)) {
             for (InternalRow row : overwriteData) {
                 write.write(row);
@@ -314,7 +314,7 @@ public abstract class FileStoreTableTestBase {
         commit.commit(0, write.prepareCommit(true, 0));
         write.close();
 
-        write = table.newWrite(commitUser).fromEmptyWriter(true);
+        write = table.newWrite(commitUser).withIgnorePreviousFiles(true);
         commit = table.newCommit(commitUser);
         write.write(rowData(2, 21, 201L));
         Map<String, String> overwritePartition = new HashMap<>();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
index 58928b243..9e4ac18f4 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
@@ -67,7 +67,7 @@ public class ContinuousCompactorFollowUpScannerTest extends 
ScannerTestBase {
 
         Map<String, String> overwritePartition = new HashMap<>();
         overwritePartition.put("pt", "1");
-        write = table.newWrite(commitUser).fromEmptyWriter(true);
+        write = table.newWrite(commitUser).withIgnorePreviousFiles(true);
         commit = table.newCommit(commitUser).withOverwrite(overwritePartition);
         write.write(rowData(1, 10, 101L));
         write.write(rowData(1, 20, 201L));
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index a88523219..37b1f16ed 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -59,11 +59,11 @@ public abstract class FlinkSink<T> implements Serializable {
     private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
 
     protected final FileStoreTable table;
-    private final boolean emptyWriter;
+    private final boolean ignorePreviousFiles;
 
-    public FlinkSink(FileStoreTable table, boolean emptyWriter) {
+    public FlinkSink(FileStoreTable table, boolean ignorePreviousFiles) {
         this.table = table;
-        this.emptyWriter = emptyWriter;
+        this.ignorePreviousFiles = ignorePreviousFiles;
     }
 
     private StoreSinkWrite.Provider createWriteProvider(CheckpointConfig 
checkpointConfig) {
@@ -97,7 +97,7 @@ public abstract class FlinkSink<T> implements Serializable {
                                 commitUser,
                                 state,
                                 ioManager,
-                                emptyWriter,
+                                ignorePreviousFiles,
                                 waitCompaction,
                                 finalDeltaCommits,
                                 memoryPool);
@@ -110,7 +110,7 @@ public abstract class FlinkSink<T> implements Serializable {
                         commitUser,
                         state,
                         ioManager,
-                        emptyWriter,
+                        ignorePreviousFiles,
                         waitCompaction,
                         memoryPool);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
index 8ed101b8f..fd568ae05 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
@@ -69,11 +69,11 @@ public class GlobalFullCompactionSinkWrite extends 
StoreSinkWriteImpl {
             String commitUser,
             StoreSinkWriteState state,
             IOManager ioManager,
-            boolean emptyWriter,
+            boolean ignorePreviousFiles,
             boolean waitCompaction,
             int deltaCommits,
             @Nullable MemorySegmentPool memoryPool) {
-        super(table, commitUser, state, ioManager, emptyWriter, 
waitCompaction, memoryPool);
+        super(table, commitUser, state, ioManager, ignorePreviousFiles, 
waitCompaction, memoryPool);
 
         this.deltaCommits = deltaCommits;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 3b2baf0b3..3b376d707 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -48,7 +48,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
     protected final String commitUser;
     protected final StoreSinkWriteState state;
     private final IOManager ioManager;
-    private final boolean emptyWriter;
+    private final boolean ignorePreviousFiles;
     private final boolean waitCompaction;
     @Nullable private final MemorySegmentPool memoryPool;
 
@@ -59,13 +59,13 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
             String commitUser,
             StoreSinkWriteState state,
             IOManager ioManager,
-            boolean emptyWriter,
+            boolean ignorePreviousFiles,
             boolean waitCompaction,
             @Nullable MemorySegmentPool memoryPool) {
         this.commitUser = commitUser;
         this.state = state;
         this.ioManager = ioManager;
-        this.emptyWriter = emptyWriter;
+        this.ignorePreviousFiles = ignorePreviousFiles;
         this.waitCompaction = waitCompaction;
         this.memoryPool = memoryPool;
         this.write = newTableWrite(table);
@@ -83,7 +83,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
                                 : new HeapMemorySegmentPool(
                                         table.coreOptions().writeBufferSize(),
                                         table.coreOptions().pageSize()))
-                .fromEmptyWriter(emptyWriter);
+                .withIgnorePreviousFiles(ignorePreviousFiles);
     }
 
     @Override

Reply via email to