This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d6c5fcba [core] add deletedFiles in NewFilesIncrement (#3117)
3d6c5fcba is described below

commit 3d6c5fcba939757f53c38bf43ffd9cc7d699a65c
Author: Yann Byron <[email protected]>
AuthorDate: Fri Mar 29 14:39:03 2024 +0800

    [core] add deletedFiles in NewFilesIncrement (#3117)
---
 .../java/org/apache/paimon/append/AppendOnlyWriter.java    |  9 ++++++++-
 .../main/java/org/apache/paimon/io/NewFilesIncrement.java  | 14 ++++++++++++--
 .../java/org/apache/paimon/mergetree/MergeTreeWriter.java  |  9 ++++++++-
 .../main/java/org/apache/paimon/migrate/FileMetaUtils.java |  3 ++-
 .../org/apache/paimon/operation/FileStoreCommitImpl.java   |  7 +++++++
 .../apache/paimon/table/sink/CommitMessageSerializer.java  |  2 ++
 .../org/apache/paimon/index/HashBucketAssignerTest.java    |  3 ++-
 .../paimon/manifest/ManifestCommittableSerializerTest.java |  1 +
 .../apache/paimon/flink/sink/CommitterOperatorTest.java    |  6 ++++++
 9 files changed, 48 insertions(+), 6 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index f61083f11..544e766ae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -66,6 +66,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
     private final CompactManager compactManager;
     private final boolean forceCompact;
     private final List<DataFileMeta> newFiles;
+    private final List<DataFileMeta> deletedFiles;
     private final List<DataFileMeta> compactBefore;
     private final List<DataFileMeta> compactAfter;
     private final LongCounter seqNumCounter;
@@ -101,6 +102,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
         this.compactManager = compactManager;
         this.forceCompact = forceCompact;
         this.newFiles = new ArrayList<>();
+        this.deletedFiles = new ArrayList<>();
         this.compactBefore = new ArrayList<>();
         this.compactAfter = new ArrayList<>();
         this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
@@ -113,6 +115,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
 
         if (increment != null) {
             newFiles.addAll(increment.newFilesIncrement().newFiles());
+            deletedFiles.addAll(increment.newFilesIncrement().deletedFiles());
             compactBefore.addAll(increment.compactIncrement().compactBefore());
             compactAfter.addAll(increment.compactIncrement().compactAfter());
         }
@@ -233,7 +236,10 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
 
     private CommitIncrement drainIncrement() {
         NewFilesIncrement newFilesIncrement =
-                new NewFilesIncrement(new ArrayList<>(newFiles), Collections.emptyList());
+                new NewFilesIncrement(
+                        new ArrayList<>(newFiles),
+                        new ArrayList<>(deletedFiles),
+                        Collections.emptyList());
         CompactIncrement compactIncrement =
                 new CompactIncrement(
                         new ArrayList<>(compactBefore),
@@ -241,6 +247,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
                         Collections.emptyList());
 
         newFiles.clear();
+        deletedFiles.clear();
         compactBefore.clear();
         compactAfter.clear();
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/NewFilesIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/NewFilesIncrement.java
index 4e980bd31..b2f63070d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/NewFilesIncrement.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/NewFilesIncrement.java
@@ -27,21 +27,31 @@ import java.util.stream.Collectors;
 public class NewFilesIncrement {
 
     private final List<DataFileMeta> newFiles;
+    private final List<DataFileMeta> deletedFiles;
     private final List<DataFileMeta> changelogFiles;
 
-    public NewFilesIncrement(List<DataFileMeta> newFiles, List<DataFileMeta> changelogFiles) {
+    public NewFilesIncrement(
+            List<DataFileMeta> newFiles,
+            List<DataFileMeta> deletedFiles,
+            List<DataFileMeta> changelogFiles) {
         this.newFiles = newFiles;
+        this.deletedFiles = deletedFiles;
         this.changelogFiles = changelogFiles;
     }
 
     public static NewFilesIncrement emptyIncrement() {
-        return new NewFilesIncrement(Collections.emptyList(), Collections.emptyList());
+        return new NewFilesIncrement(
+                Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
     }
 
     public List<DataFileMeta> newFiles() {
         return newFiles;
     }
 
+    public List<DataFileMeta> deletedFiles() {
+        return deletedFiles;
+    }
+
     public List<DataFileMeta> changelogFiles() {
         return changelogFiles;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index 537d838d0..1c94a2108 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -69,6 +69,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
     @Nullable private final FieldsComparator userDefinedSeqComparator;
 
     private final LinkedHashSet<DataFileMeta> newFiles;
+    private final LinkedHashSet<DataFileMeta> deletedFiles;
     private final LinkedHashSet<DataFileMeta> newFilesChangelog;
     private final LinkedHashMap<String, DataFileMeta> compactBefore;
     private final LinkedHashSet<DataFileMeta> compactAfter;
@@ -107,12 +108,14 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
         this.userDefinedSeqComparator = userDefinedSeqComparator;
 
         this.newFiles = new LinkedHashSet<>();
+        this.deletedFiles = new LinkedHashSet<>();
         this.newFilesChangelog = new LinkedHashSet<>();
         this.compactBefore = new LinkedHashMap<>();
         this.compactAfter = new LinkedHashSet<>();
         this.compactChangelog = new LinkedHashSet<>();
         if (increment != null) {
             newFiles.addAll(increment.newFilesIncrement().newFiles());
+            deletedFiles.addAll(increment.newFilesIncrement().deletedFiles());
             newFilesChangelog.addAll(increment.newFilesIncrement().changelogFiles());
             increment
                     .compactIncrement()
@@ -253,7 +256,9 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
     private CommitIncrement drainIncrement() {
         NewFilesIncrement newFilesIncrement =
                 new NewFilesIncrement(
-                        new ArrayList<>(newFiles), new ArrayList<>(newFilesChangelog));
+                        new ArrayList<>(newFiles),
+                        new ArrayList<>(deletedFiles),
+                        new ArrayList<>(newFilesChangelog));
         CompactIncrement compactIncrement =
                 new CompactIncrement(
                         new ArrayList<>(compactBefore.values()),
@@ -261,6 +266,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
                         new ArrayList<>(compactChangelog));
 
         newFiles.clear();
+        deletedFiles.clear();
         newFilesChangelog.clear();
         compactBefore.clear();
         compactAfter.clear();
@@ -306,6 +312,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
         // delete temporary files
         List<DataFileMeta> delete = new ArrayList<>(newFiles);
         newFiles.clear();
+        deletedFiles.clear();
 
         for (DataFileMeta file : newFilesChangelog) {
             writerFactory.deleteFile(file.fileName(), file.level());
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 2b4612229..ddb11a658 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -86,7 +86,8 @@ public class FileMetaUtils {
         return new CommitMessageImpl(
                 partition,
                 0,
-                new NewFilesIncrement(dataFileMetas, Collections.emptyList()),
+                new NewFilesIncrement(
+                        dataFileMetas, Collections.emptyList(), Collections.emptyList()),
                 new CompactIncrement(
                         Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 5c6343956..3f2ad87ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -572,6 +572,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                     .newFilesIncrement()
                     .newFiles()
                     .forEach(m -> appendTableFiles.add(makeEntry(FileKind.ADD, commitMessage, m)));
+            commitMessage
+                    .newFilesIncrement()
+                    .deletedFiles()
+                    .forEach(
+                            m ->
+                                    appendTableFiles.add(
+                                            makeEntry(FileKind.DELETE, commitMessage, m)));
             commitMessage
                     .newFilesIncrement()
                     .changelogFiles()
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index da70c766d..65a647edb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -75,6 +75,7 @@ public class CommitMessageSerializer implements VersionedSerializer<CommitMessag
         serializeBinaryRow(obj.partition(), view);
         view.writeInt(obj.bucket());
         dataFileSerializer.serializeList(message.newFilesIncrement().newFiles(), view);
+        dataFileSerializer.serializeList(message.newFilesIncrement().deletedFiles(), view);
         dataFileSerializer.serializeList(message.newFilesIncrement().changelogFiles(), view);
         dataFileSerializer.serializeList(message.compactIncrement().compactBefore(), view);
         dataFileSerializer.serializeList(message.compactIncrement().compactAfter(), view);
@@ -116,6 +117,7 @@ public class CommitMessageSerializer implements VersionedSerializer<CommitMessag
                 deserializeBinaryRow(view),
                 view.readInt(),
                 new NewFilesIncrement(
+                        dataFileSerializer.deserializeList(view),
                         dataFileSerializer.deserializeList(view),
                         dataFileSerializer.deserializeList(view)),
                 new CompactIncrement(
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
index 7bb11260a..0ed83d3e0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -111,7 +111,8 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
         return new CommitMessageImpl(
                 partition,
                 bucket,
-                new NewFilesIncrement(Collections.emptyList(), Collections.emptyList()),
+                new NewFilesIncrement(
+                        Collections.emptyList(), Collections.emptyList(), Collections.emptyList()),
                 new CompactIncrement(
                         Collections.emptyList(), Collections.emptyList(), Collections.emptyList()),
                 new IndexIncrement(Collections.singletonList(file)));
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index ee279c097..ae01b5832 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -90,6 +90,7 @@ public class ManifestCommittableSerializerTest {
 
     public static NewFilesIncrement randomNewFilesIncrement() {
         return new NewFilesIncrement(
+                Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)),
                 Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)),
                 Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)));
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 0a3a8fae7..26a771732 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -302,6 +302,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                                                     BinaryRow.EMPTY_ROW,
                                                     0,
                                                     new NewFilesIncrement(
+                                                            Collections.emptyList(),
                                                             Collections.emptyList(),
                                                             Collections.emptyList()),
                                                     new CompactIncrement(
@@ -318,6 +319,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                                                     BinaryRow.EMPTY_ROW,
                                                     0,
                                                     new NewFilesIncrement(
+                                                            Collections.emptyList(),
                                                             Collections.emptyList(),
                                                             Collections.emptyList()),
                                                     new CompactIncrement(
@@ -333,6 +335,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                                                     BinaryRow.EMPTY_ROW,
                                                     0,
                                                     new NewFilesIncrement(
+                                                            Collections.emptyList(),
                                                             Collections.emptyList(),
                                                             Collections.emptyList()),
                                                     new CompactIncrement(
@@ -367,6 +370,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                                                     BinaryRow.EMPTY_ROW,
                                                     0,
                                                     new NewFilesIncrement(
+                                                            Collections.emptyList(),
                                                             Collections.emptyList(),
                                                             Collections.emptyList()),
                                                     new CompactIncrement(
@@ -383,6 +387,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                                                     BinaryRow.EMPTY_ROW,
                                                     0,
                                                     new NewFilesIncrement(
+                                                            Collections.emptyList(),
                                                             Collections.emptyList(),
                                                             Collections.emptyList()),
                                                     new CompactIncrement(
@@ -398,6 +403,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                                                     BinaryRow.EMPTY_ROW,
                                                     0,
                                                     new NewFilesIncrement(
+                                                            Collections.emptyList(),
                                                             Collections.emptyList(),
                                                             Collections.emptyList()),
                                                     new CompactIncrement(

Reply via email to