This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3d6c5fcba [core] add deletedFiles in NewFilesIncrement (#3117)
3d6c5fcba is described below
commit 3d6c5fcba939757f53c38bf43ffd9cc7d699a65c
Author: Yann Byron <[email protected]>
AuthorDate: Fri Mar 29 14:39:03 2024 +0800
[core] add deletedFiles in NewFilesIncrement (#3117)
---
.../java/org/apache/paimon/append/AppendOnlyWriter.java | 9 ++++++++-
.../main/java/org/apache/paimon/io/NewFilesIncrement.java | 14 ++++++++++++--
.../java/org/apache/paimon/mergetree/MergeTreeWriter.java | 9 ++++++++-
.../main/java/org/apache/paimon/migrate/FileMetaUtils.java | 3 ++-
.../org/apache/paimon/operation/FileStoreCommitImpl.java | 7 +++++++
.../apache/paimon/table/sink/CommitMessageSerializer.java | 2 ++
.../org/apache/paimon/index/HashBucketAssignerTest.java | 3 ++-
.../paimon/manifest/ManifestCommittableSerializerTest.java | 1 +
.../apache/paimon/flink/sink/CommitterOperatorTest.java | 6 ++++++
9 files changed, 48 insertions(+), 6 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index f61083f11..544e766ae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -66,6 +66,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final CompactManager compactManager;
private final boolean forceCompact;
private final List<DataFileMeta> newFiles;
+ private final List<DataFileMeta> deletedFiles;
private final List<DataFileMeta> compactBefore;
private final List<DataFileMeta> compactAfter;
private final LongCounter seqNumCounter;
@@ -101,6 +102,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
this.compactManager = compactManager;
this.forceCompact = forceCompact;
this.newFiles = new ArrayList<>();
+ this.deletedFiles = new ArrayList<>();
this.compactBefore = new ArrayList<>();
this.compactAfter = new ArrayList<>();
this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
@@ -113,6 +115,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
if (increment != null) {
newFiles.addAll(increment.newFilesIncrement().newFiles());
+ deletedFiles.addAll(increment.newFilesIncrement().deletedFiles());
compactBefore.addAll(increment.compactIncrement().compactBefore());
compactAfter.addAll(increment.compactIncrement().compactAfter());
}
@@ -233,7 +236,10 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private CommitIncrement drainIncrement() {
NewFilesIncrement newFilesIncrement =
- new NewFilesIncrement(new ArrayList<>(newFiles),
Collections.emptyList());
+ new NewFilesIncrement(
+ new ArrayList<>(newFiles),
+ new ArrayList<>(deletedFiles),
+ Collections.emptyList());
CompactIncrement compactIncrement =
new CompactIncrement(
new ArrayList<>(compactBefore),
@@ -241,6 +247,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
Collections.emptyList());
newFiles.clear();
+ deletedFiles.clear();
compactBefore.clear();
compactAfter.clear();
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/NewFilesIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/NewFilesIncrement.java
index 4e980bd31..b2f63070d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/NewFilesIncrement.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/NewFilesIncrement.java
@@ -27,21 +27,31 @@ import java.util.stream.Collectors;
public class NewFilesIncrement {
private final List<DataFileMeta> newFiles;
+ private final List<DataFileMeta> deletedFiles;
private final List<DataFileMeta> changelogFiles;
- public NewFilesIncrement(List<DataFileMeta> newFiles, List<DataFileMeta>
changelogFiles) {
+ public NewFilesIncrement(
+ List<DataFileMeta> newFiles,
+ List<DataFileMeta> deletedFiles,
+ List<DataFileMeta> changelogFiles) {
this.newFiles = newFiles;
+ this.deletedFiles = deletedFiles;
this.changelogFiles = changelogFiles;
}
public static NewFilesIncrement emptyIncrement() {
- return new NewFilesIncrement(Collections.emptyList(),
Collections.emptyList());
+ return new NewFilesIncrement(
+ Collections.emptyList(), Collections.emptyList(),
Collections.emptyList());
}
public List<DataFileMeta> newFiles() {
return newFiles;
}
+ public List<DataFileMeta> deletedFiles() {
+ return deletedFiles;
+ }
+
public List<DataFileMeta> changelogFiles() {
return changelogFiles;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index 537d838d0..1c94a2108 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -69,6 +69,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
@Nullable private final FieldsComparator userDefinedSeqComparator;
private final LinkedHashSet<DataFileMeta> newFiles;
+ private final LinkedHashSet<DataFileMeta> deletedFiles;
private final LinkedHashSet<DataFileMeta> newFilesChangelog;
private final LinkedHashMap<String, DataFileMeta> compactBefore;
private final LinkedHashSet<DataFileMeta> compactAfter;
@@ -107,12 +108,14 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
this.userDefinedSeqComparator = userDefinedSeqComparator;
this.newFiles = new LinkedHashSet<>();
+ this.deletedFiles = new LinkedHashSet<>();
this.newFilesChangelog = new LinkedHashSet<>();
this.compactBefore = new LinkedHashMap<>();
this.compactAfter = new LinkedHashSet<>();
this.compactChangelog = new LinkedHashSet<>();
if (increment != null) {
newFiles.addAll(increment.newFilesIncrement().newFiles());
+ deletedFiles.addAll(increment.newFilesIncrement().deletedFiles());
newFilesChangelog.addAll(increment.newFilesIncrement().changelogFiles());
increment
.compactIncrement()
@@ -253,7 +256,9 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
private CommitIncrement drainIncrement() {
NewFilesIncrement newFilesIncrement =
new NewFilesIncrement(
- new ArrayList<>(newFiles), new
ArrayList<>(newFilesChangelog));
+ new ArrayList<>(newFiles),
+ new ArrayList<>(deletedFiles),
+ new ArrayList<>(newFilesChangelog));
CompactIncrement compactIncrement =
new CompactIncrement(
new ArrayList<>(compactBefore.values()),
@@ -261,6 +266,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
new ArrayList<>(compactChangelog));
newFiles.clear();
+ deletedFiles.clear();
newFilesChangelog.clear();
compactBefore.clear();
compactAfter.clear();
@@ -306,6 +312,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
// delete temporary files
List<DataFileMeta> delete = new ArrayList<>(newFiles);
newFiles.clear();
+ deletedFiles.clear();
for (DataFileMeta file : newFilesChangelog) {
writerFactory.deleteFile(file.fileName(), file.level());
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 2b4612229..ddb11a658 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -86,7 +86,8 @@ public class FileMetaUtils {
return new CommitMessageImpl(
partition,
0,
- new NewFilesIncrement(dataFileMetas, Collections.emptyList()),
+ new NewFilesIncrement(
+ dataFileMetas, Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 5c6343956..3f2ad87ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -572,6 +572,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
.newFilesIncrement()
.newFiles()
.forEach(m -> appendTableFiles.add(makeEntry(FileKind.ADD,
commitMessage, m)));
+ commitMessage
+ .newFilesIncrement()
+ .deletedFiles()
+ .forEach(
+ m ->
+ appendTableFiles.add(
+ makeEntry(FileKind.DELETE,
commitMessage, m)));
commitMessage
.newFilesIncrement()
.changelogFiles()
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index da70c766d..65a647edb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -75,6 +75,7 @@ public class CommitMessageSerializer implements VersionedSerializer<CommitMessag
serializeBinaryRow(obj.partition(), view);
view.writeInt(obj.bucket());
dataFileSerializer.serializeList(message.newFilesIncrement().newFiles(), view);
+
dataFileSerializer.serializeList(message.newFilesIncrement().deletedFiles(),
view);
dataFileSerializer.serializeList(message.newFilesIncrement().changelogFiles(),
view);
dataFileSerializer.serializeList(message.compactIncrement().compactBefore(),
view);
dataFileSerializer.serializeList(message.compactIncrement().compactAfter(),
view);
@@ -116,6 +117,7 @@ public class CommitMessageSerializer implements VersionedSerializer<CommitMessag
deserializeBinaryRow(view),
view.readInt(),
new NewFilesIncrement(
+ dataFileSerializer.deserializeList(view),
dataFileSerializer.deserializeList(view),
dataFileSerializer.deserializeList(view)),
new CompactIncrement(
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
index 7bb11260a..0ed83d3e0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -111,7 +111,8 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
return new CommitMessageImpl(
partition,
bucket,
- new NewFilesIncrement(Collections.emptyList(),
Collections.emptyList()),
+ new NewFilesIncrement(
+ Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()),
new IndexIncrement(Collections.singletonList(file)));
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index ee279c097..ae01b5832 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -90,6 +90,7 @@ public class ManifestCommittableSerializerTest {
public static NewFilesIncrement randomNewFilesIncrement() {
return new NewFilesIncrement(
+ Arrays.asList(newFile(ID.incrementAndGet(), 0),
newFile(ID.incrementAndGet(), 0)),
Arrays.asList(newFile(ID.incrementAndGet(), 0),
newFile(ID.incrementAndGet(), 0)),
Arrays.asList(newFile(ID.incrementAndGet(), 0),
newFile(ID.incrementAndGet(), 0)));
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 0a3a8fae7..26a771732 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -302,6 +302,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
+
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
@@ -318,6 +319,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
+
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
@@ -333,6 +335,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
+
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
@@ -367,6 +370,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
+
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
@@ -383,6 +387,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
+
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(
@@ -398,6 +403,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
BinaryRow.EMPTY_ROW,
0,
new NewFilesIncrement(
+
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()),
new CompactIncrement(