This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new eb9fdfb  [FLINK-25770] Delete file is not correct in MergeTreeWriter
eb9fdfb is described below

commit eb9fdfb05cf68ed641124c6b5fe5164d7fcf927f
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jan 25 09:59:07 2022 +0800

    [FLINK-25770] Delete file is not correct in MergeTreeWriter
    
    This closes #13
---
 .../store/file/mergetree/MergeTreeWriter.java      | 41 +++++++++----
 .../table/store/file/mergetree/MergeTreeTest.java  | 69 ++++++++++++++++++----
 .../store/file/operation/TestCommitThread.java     |  3 +-
 3 files changed, 86 insertions(+), 27 deletions(-)

diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 5cb4a3d..0a857f7 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -32,10 +32,13 @@ import org.apache.flink.util.CloseableIterator;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 /** A {@link RecordWriter} to write records and generate {@link Increment}. */
 public class MergeTreeWriter implements RecordWriter {
@@ -56,7 +59,7 @@ public class MergeTreeWriter implements RecordWriter {
 
     private final LinkedHashSet<SstFileMeta> newFiles;
 
-    private final LinkedHashSet<SstFileMeta> compactBefore;
+    private final LinkedHashMap<String, SstFileMeta> compactBefore;
 
     private final LinkedHashSet<SstFileMeta> compactAfter;
 
@@ -80,7 +83,7 @@ public class MergeTreeWriter implements RecordWriter {
         this.sstFile = sstFile;
         this.commitForceCompact = commitForceCompact;
         this.newFiles = new LinkedHashSet<>();
-        this.compactBefore = new LinkedHashSet<>();
+        this.compactBefore = new LinkedHashMap<>();
         this.compactAfter = new LinkedHashSet<>();
     }
 
@@ -137,7 +140,7 @@ public class MergeTreeWriter implements RecordWriter {
         Increment increment =
                 new Increment(
                         new ArrayList<>(newFiles),
-                        new ArrayList<>(compactBefore),
+                        new ArrayList<>(compactBefore.values()),
                         new ArrayList<>(compactAfter));
         newFiles.clear();
         compactBefore.clear();
@@ -146,14 +149,21 @@ public class MergeTreeWriter implements RecordWriter {
     }
 
     private void updateCompactResult(CompactManager.CompactResult result) {
+        Set<String> afterFiles =
+                
result.after().stream().map(SstFileMeta::fileName).collect(Collectors.toSet());
         for (SstFileMeta file : result.before()) {
-            boolean removed = compactAfter.remove(file);
-            if (removed) {
+            if (compactAfter.remove(file)) {
                 // This is an intermediate file (not a new data file), which 
is no longer needed
-                // after compaction and can be deleted directly
-                sstFile.delete(file);
+                // after compaction and can be deleted directly. However, an upgraded file is
+                // required by both the previous and the following snapshot, so we must ensure:
+                // 1. This file is not the output of an upgrade.
+                // 2. This file is not the input of an upgrade.
+                if (!compactBefore.containsKey(file.fileName())
+                        && !afterFiles.contains(file.fileName())) {
+                    sstFile.delete(file);
+                }
             } else {
-                compactBefore.add(file);
+                compactBefore.put(file.fileName(), file);
             }
         }
         compactAfter.addAll(result.after());
@@ -165,16 +175,21 @@ public class MergeTreeWriter implements RecordWriter {
 
     private void finishCompaction() throws ExecutionException, 
InterruptedException {
         Optional<CompactManager.CompactResult> result = 
compactManager.finishCompaction(levels);
-        if (result.isPresent()) {
-            updateCompactResult(result.get());
-        }
+        result.ifPresent(this::updateCompactResult);
     }
 
     @Override
-    public List<SstFileMeta> close() {
+    public List<SstFileMeta> close() throws Exception {
+        sync();
         // delete temporary files
         List<SstFileMeta> delete = new ArrayList<>(newFiles);
-        delete.addAll(compactAfter);
+        for (SstFileMeta file : compactAfter) {
+            // An upgraded file is required by the previous snapshot, so we must ensure that this
+            // file is not the output of an upgrade.
+            if (!compactBefore.containsKey(file.fileName())) {
+                delete.add(file);
+            }
+        }
         for (SstFileMeta file : delete) {
             sstFile.delete(file);
         }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 5c00ccc..74cabd1 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -43,6 +43,8 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -85,9 +87,14 @@ public class MergeTreeTest {
         bucketDir.getFileSystem().mkdirs(bucketDir);
 
         comparator = Comparator.comparingInt(o -> o.getInt(0));
+        recreateWriter(1024 * 1024);
+    }
+
+    private void recreateWriter(long targetFileSize) {
         Configuration configuration = new Configuration();
         configuration.set(MergeTreeOptions.WRITE_BUFFER_SIZE, new 
MemorySize(4096 * 3));
         configuration.set(MergeTreeOptions.PAGE_SIZE, new MemorySize(4096));
+        configuration.set(MergeTreeOptions.TARGET_FILE_SIZE, new 
MemorySize(targetFileSize));
         MergeTreeOptions options = new MergeTreeOptions(configuration);
         sstFile =
                 new SstFile(
@@ -158,6 +165,36 @@ public class MergeTreeTest {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(longs = {1, 1024 * 1024})
+    public void testCloseUpgrade(long targetFileSize) throws Exception {
+        // To generate a large number of upgrade files
+        recreateWriter(targetFileSize);
+
+        List<TestRecord> expected = new ArrayList<>();
+        Random random = new Random();
+        int perBatch = 1_000;
+        Set<String> newFileNames = new HashSet<>();
+        List<SstFileMeta> compactedFiles = new ArrayList<>();
+        for (int i = 0; i < 6; i++) {
+            List<TestRecord> records = new ArrayList<>(perBatch);
+            for (int j = 0; j < perBatch; j++) {
+                records.add(
+                        new TestRecord(
+                                random.nextBoolean() ? ValueKind.ADD : 
ValueKind.DELETE,
+                                random.nextInt(perBatch / 2) - i * (perBatch / 
2),
+                                random.nextInt()));
+            }
+            writeAll(records);
+            expected.addAll(records);
+            Increment increment = writer.prepareCommit();
+            mergeCompacted(newFileNames, compactedFiles, increment);
+        }
+        writer.close();
+
+        assertRecords(expected, compactedFiles, true);
+    }
+
     @Test
     public void testWriteMany() throws Exception {
         doTestWriteRead(3, 20_000);
@@ -183,18 +220,7 @@ public class MergeTreeTest {
 
             Increment increment = writer.prepareCommit();
             newFiles.addAll(increment.newFiles());
-            
increment.newFiles().stream().map(SstFileMeta::fileName).forEach(newFileNames::add);
-
-            // merge compacted
-            compactedFiles.addAll(increment.newFiles());
-            for (SstFileMeta file : increment.compactBefore()) {
-                boolean remove = compactedFiles.remove(file);
-                assertThat(remove).isTrue();
-                if (!newFileNames.contains(file.fileName())) {
-                    sstFile.delete(file);
-                }
-            }
-            compactedFiles.addAll(increment.compactAfter());
+            mergeCompacted(newFileNames, compactedFiles, increment);
         }
 
         // assert records from writer
@@ -218,6 +244,25 @@ public class MergeTreeTest {
         assertThat(files).isEqualTo(Collections.emptySet());
     }
 
+    private void mergeCompacted(
+            Set<String> newFileNames, List<SstFileMeta> compactedFiles, 
Increment increment) {
+        
increment.newFiles().stream().map(SstFileMeta::fileName).forEach(newFileNames::add);
+        compactedFiles.addAll(increment.newFiles());
+        Set<String> afterFiles =
+                increment.compactAfter().stream()
+                        .map(SstFileMeta::fileName)
+                        .collect(Collectors.toSet());
+        for (SstFileMeta file : increment.compactBefore()) {
+            boolean remove = compactedFiles.remove(file);
+            assertThat(remove).isTrue();
+            // See MergeTreeWriter.updateCompactResult
+            if (!newFileNames.contains(file.fileName()) && 
!afterFiles.contains(file.fileName())) {
+                sstFile.delete(file);
+            }
+        }
+        compactedFiles.addAll(increment.compactAfter());
+    }
+
     private List<TestRecord> writeBatch() throws Exception {
         return writeBatch(200);
     }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index f7f4c86..334acfd 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -169,11 +169,10 @@ public class TestCommitThread extends Thread {
 
         for (MergeTreeWriter writer : writers.values()) {
             try {
-                writer.sync();
+                writer.close();
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
-            writer.close();
         }
     }
 

Reply via email to