This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new eb9fdfb [FLINK-25770] Delete file is not correct in MergeTreeWriter
eb9fdfb is described below
commit eb9fdfb05cf68ed641124c6b5fe5164d7fcf927f
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jan 25 09:59:07 2022 +0800
[FLINK-25770] Delete file is not correct in MergeTreeWriter
This closes #13
---
.../store/file/mergetree/MergeTreeWriter.java | 41 +++++++++----
.../table/store/file/mergetree/MergeTreeTest.java | 69 ++++++++++++++++++----
.../store/file/operation/TestCommitThread.java | 3 +-
3 files changed, 86 insertions(+), 27 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 5cb4a3d..0a857f7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -32,10 +32,13 @@ import org.apache.flink.util.CloseableIterator;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
/** A {@link RecordWriter} to write records and generate {@link Increment}. */
public class MergeTreeWriter implements RecordWriter {
@@ -56,7 +59,7 @@ public class MergeTreeWriter implements RecordWriter {
private final LinkedHashSet<SstFileMeta> newFiles;
- private final LinkedHashSet<SstFileMeta> compactBefore;
+ private final LinkedHashMap<String, SstFileMeta> compactBefore;
private final LinkedHashSet<SstFileMeta> compactAfter;
@@ -80,7 +83,7 @@ public class MergeTreeWriter implements RecordWriter {
this.sstFile = sstFile;
this.commitForceCompact = commitForceCompact;
this.newFiles = new LinkedHashSet<>();
- this.compactBefore = new LinkedHashSet<>();
+ this.compactBefore = new LinkedHashMap<>();
this.compactAfter = new LinkedHashSet<>();
}
@@ -137,7 +140,7 @@ public class MergeTreeWriter implements RecordWriter {
Increment increment =
new Increment(
new ArrayList<>(newFiles),
- new ArrayList<>(compactBefore),
+ new ArrayList<>(compactBefore.values()),
new ArrayList<>(compactAfter));
newFiles.clear();
compactBefore.clear();
@@ -146,14 +149,21 @@ public class MergeTreeWriter implements RecordWriter {
}
private void updateCompactResult(CompactManager.CompactResult result) {
+ Set<String> afterFiles =
+ result.after().stream().map(SstFileMeta::fileName).collect(Collectors.toSet());
for (SstFileMeta file : result.before()) {
- boolean removed = compactAfter.remove(file);
- if (removed) {
+ if (compactAfter.remove(file)) {
// This is an intermediate file (not a new data file), which is no longer needed
- // after compaction and can be deleted directly
- sstFile.delete(file);
+ // after compaction and can be deleted directly, but upgrade file is required by
+ // previous snapshot and following snapshot, so we should ensure:
+ // 1. This file is not the output of upgraded.
+ // 2. This file is not the input of upgraded.
+ if (!compactBefore.containsKey(file.fileName())
+ && !afterFiles.contains(file.fileName())) {
+ sstFile.delete(file);
+ }
} else {
- compactBefore.add(file);
+ compactBefore.put(file.fileName(), file);
}
}
compactAfter.addAll(result.after());
@@ -165,16 +175,21 @@ public class MergeTreeWriter implements RecordWriter {
private void finishCompaction() throws ExecutionException, InterruptedException {
Optional<CompactManager.CompactResult> result = compactManager.finishCompaction(levels);
- if (result.isPresent()) {
- updateCompactResult(result.get());
- }
+ result.ifPresent(this::updateCompactResult);
}
@Override
- public List<SstFileMeta> close() {
+ public List<SstFileMeta> close() throws Exception {
+ sync();
// delete temporary files
List<SstFileMeta> delete = new ArrayList<>(newFiles);
- delete.addAll(compactAfter);
+ for (SstFileMeta file : compactAfter) {
+ // upgrade file is required by previous snapshot, so we should ensure that this file is
+ // not the output of upgraded.
+ if (!compactBefore.containsKey(file.fileName())) {
+ delete.add(file);
+ }
+ }
for (SstFileMeta file : delete) {
sstFile.delete(file);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 5c00ccc..74cabd1 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -43,6 +43,8 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
@@ -85,9 +87,14 @@ public class MergeTreeTest {
bucketDir.getFileSystem().mkdirs(bucketDir);
comparator = Comparator.comparingInt(o -> o.getInt(0));
+ recreateWriter(1024 * 1024);
+ }
+
+ private void recreateWriter(long targetFileSize) {
Configuration configuration = new Configuration();
configuration.set(MergeTreeOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
configuration.set(MergeTreeOptions.PAGE_SIZE, new MemorySize(4096));
+ configuration.set(MergeTreeOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize));
MergeTreeOptions options = new MergeTreeOptions(configuration);
sstFile =
new SstFile(
@@ -158,6 +165,36 @@ public class MergeTreeTest {
}
}
+ @ParameterizedTest
+ @ValueSource(longs = {1, 1024 * 1024})
+ public void testCloseUpgrade(long targetFileSize) throws Exception {
+ // To generate a large number of upgrade files
+ recreateWriter(targetFileSize);
+
+ List<TestRecord> expected = new ArrayList<>();
+ Random random = new Random();
+ int perBatch = 1_000;
+ Set<String> newFileNames = new HashSet<>();
+ List<SstFileMeta> compactedFiles = new ArrayList<>();
+ for (int i = 0; i < 6; i++) {
+ List<TestRecord> records = new ArrayList<>(perBatch);
+ for (int j = 0; j < perBatch; j++) {
+ records.add(
+ new TestRecord(
+ random.nextBoolean() ? ValueKind.ADD : ValueKind.DELETE,
+ random.nextInt(perBatch / 2) - i * (perBatch / 2),
+ random.nextInt()));
+ }
+ writeAll(records);
+ expected.addAll(records);
+ Increment increment = writer.prepareCommit();
+ mergeCompacted(newFileNames, compactedFiles, increment);
+ }
+ writer.close();
+
+ assertRecords(expected, compactedFiles, true);
+ }
+
@Test
public void testWriteMany() throws Exception {
doTestWriteRead(3, 20_000);
@@ -183,18 +220,7 @@ public class MergeTreeTest {
Increment increment = writer.prepareCommit();
newFiles.addAll(increment.newFiles());
- increment.newFiles().stream().map(SstFileMeta::fileName).forEach(newFileNames::add);
-
- // merge compacted
- compactedFiles.addAll(increment.newFiles());
- for (SstFileMeta file : increment.compactBefore()) {
- boolean remove = compactedFiles.remove(file);
- assertThat(remove).isTrue();
- if (!newFileNames.contains(file.fileName())) {
- sstFile.delete(file);
- }
- }
- compactedFiles.addAll(increment.compactAfter());
+ mergeCompacted(newFileNames, compactedFiles, increment);
}
// assert records from writer
@@ -218,6 +244,25 @@ public class MergeTreeTest {
assertThat(files).isEqualTo(Collections.emptySet());
}
+ private void mergeCompacted(
+ Set<String> newFileNames, List<SstFileMeta> compactedFiles, Increment increment) {
+ increment.newFiles().stream().map(SstFileMeta::fileName).forEach(newFileNames::add);
+ compactedFiles.addAll(increment.newFiles());
+ Set<String> afterFiles =
+ increment.compactAfter().stream()
+ .map(SstFileMeta::fileName)
+ .collect(Collectors.toSet());
+ for (SstFileMeta file : increment.compactBefore()) {
+ boolean remove = compactedFiles.remove(file);
+ assertThat(remove).isTrue();
+ // See MergeTreeWriter.updateCompactResult
+ if (!newFileNames.contains(file.fileName()) && !afterFiles.contains(file.fileName())) {
+ sstFile.delete(file);
+ }
+ }
+ compactedFiles.addAll(increment.compactAfter());
+ }
+
private List<TestRecord> writeBatch() throws Exception {
return writeBatch(200);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index f7f4c86..334acfd 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -169,11 +169,10 @@ public class TestCommitThread extends Thread {
for (MergeTreeWriter writer : writers.values()) {
try {
- writer.sync();
+ writer.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
- writer.close();
}
}