This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new 75a732b4 [FLINK-29295] Clear RecordWriter slower to avoid causing
frequent compaction conflicts
75a732b4 is described below
commit 75a732b40c868bab026f45478ce33530861407d8
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 14 14:26:26 2022 +0800
[FLINK-29295] Clear RecordWriter slower to avoid causing frequent
compaction conflicts
This closes #294
---
.../table/store/file/mergetree/Increment.java | 4 +++
.../table/store/table/sink/AbstractTableWrite.java | 2 +-
.../table/store/table/FileStoreTableTestBase.java | 36 ++++++++++++++++++++++
3 files changed, 41 insertions(+), 1 deletion(-)
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
index 1cf74743..2f65e7ef 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
@@ -75,6 +75,10 @@ public class Increment {
return compactAfter;
}
+ public boolean isEmpty() {
+ return newFiles.isEmpty() && compactBefore.isEmpty() &&
compactAfter.isEmpty();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
index 51b666c7..50eed358 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
@@ -99,7 +99,7 @@ public abstract class AbstractTableWrite<T> implements
TableWrite {
// clear if no update
// we need a mechanism to clear writers, otherwise there will
be more and more
// such as yesterday's partition that no longer needs to be
written.
- if (committable.increment().newFiles().isEmpty()) {
+ if (committable.increment().isEmpty()) {
writer.close();
bucketIter.remove();
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index d54f6703..74b824db 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -31,6 +31,7 @@ import
org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
+import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.Split;
@@ -175,6 +176,41 @@ public abstract class FileStoreTableTestBase {
.hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
}
+ @Test
+ public void testPartitionEmptyWriter() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
+
+ for (int i = 0; i < 4; i++) {
+ // write lots of records so that compaction is slower
+ for (int j = 0; j < 1000; j++) {
+ write.write(GenericRowData.of(1, 10 * i * j, 100L * i * j));
+ }
+ commit.commit(String.valueOf(i), write.prepareCommit(false));
+ }
+
+ write.write(GenericRowData.of(1, 40, 400L));
+ List<FileCommittable> commit4 = write.prepareCommit(false);
+ // trigger compaction, but do not wait for it.
+
+ write.write(GenericRowData.of(2, 20, 200L));
+ List<FileCommittable> commit5 = write.prepareCommit(true);
+ // wait for compaction to finish
+ // commit5 should be a compaction commit
+
+ write.write(GenericRowData.of(1, 60, 600L));
+ List<FileCommittable> commit6 = write.prepareCommit(true);
+ // if the writer is removed too quickly, it will see old files and do another compaction,
+ // which will then cause conflicts
+
+ commit.commit("4", commit4);
+ commit.commit("5", commit5);
+ commit.commit("6", commit6);
+
+ write.close();
+ }
+
protected List<String> getResult(
TableRead read,
List<Split> splits,