This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new 75a732b4 [FLINK-29295] Clear RecordWriter slower to avoid causing 
frequent compaction conflicts
75a732b4 is described below

commit 75a732b40c868bab026f45478ce33530861407d8
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 14 14:26:26 2022 +0800

    [FLINK-29295] Clear RecordWriter slower to avoid causing frequent 
compaction conflicts
    
    This closes #294
---
 .../table/store/file/mergetree/Increment.java      |  4 +++
 .../table/store/table/sink/AbstractTableWrite.java |  2 +-
 .../table/store/table/FileStoreTableTestBase.java  | 36 ++++++++++++++++++++++
 3 files changed, 41 insertions(+), 1 deletion(-)

diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
index 1cf74743..2f65e7ef 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
@@ -75,6 +75,10 @@ public class Increment {
         return compactAfter;
     }
 
+    public boolean isEmpty() {
+        return newFiles.isEmpty() && compactBefore.isEmpty() && 
compactAfter.isEmpty();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
index 51b666c7..50eed358 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
@@ -99,7 +99,7 @@ public abstract class AbstractTableWrite<T> implements 
TableWrite {
                 // clear if no update
                 // we need a mechanism to clear writers, otherwise there will 
be more and more
                 // such as yesterday's partition that no longer needs to be 
written.
-                if (committable.increment().newFiles().isEmpty()) {
+                if (committable.increment().isEmpty()) {
                     writer.close();
                     bucketIter.remove();
                 }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index d54f6703..74b824db 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
+import org.apache.flink.table.store.table.sink.FileCommittable;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.Split;
@@ -175,6 +176,41 @@ public abstract class FileStoreTableTestBase {
                 .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
     }
 
+    @Test
+    public void testPartitionEmptyWriter() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
+
+        for (int i = 0; i < 4; i++) {
+            // write lots of records, let compaction be slower
+            for (int j = 0; j < 1000; j++) {
+                write.write(GenericRowData.of(1, 10 * i * j, 100L * i * j));
+            }
+            commit.commit(String.valueOf(i), write.prepareCommit(false));
+        }
+
+        write.write(GenericRowData.of(1, 40, 400L));
+        List<FileCommittable> commit4 = write.prepareCommit(false);
+        // trigger compaction, but not wait it.
+
+        write.write(GenericRowData.of(2, 20, 200L));
+        List<FileCommittable> commit5 = write.prepareCommit(true);
+        // wait compaction finish
+        // commit5 should be a compaction commit
+
+        write.write(GenericRowData.of(1, 60, 600L));
+        List<FileCommittable> commit6 = write.prepareCommit(true);
+        // if remove writer too fast, will see old files, do another compaction
+        // then will be conflicts
+
+        commit.commit("4", commit4);
+        commit.commit("5", commit5);
+        commit.commit("6", commit6);
+
+        write.close();
+    }
+
     protected List<String> getResult(
             TableRead read,
             List<Split> splits,

Reply via email to