This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c1d15a  [FLINK-26217] Introduce manifest.merge-min-count in commit
3c1d15a is described below

commit 3c1d15a794092c799c5d60a9adc432332d10223d
Author: Shen Zhu <[email protected]>
AuthorDate: Mon Feb 28 19:26:41 2022 -0800

    [FLINK-26217] Introduce manifest.merge-min-count in commit
    
    This closes #24
---
 .../flink/table/store/file/FileStoreOptions.java   | 18 +++++++++--
 .../store/file/manifest/ManifestFileMeta.java      | 20 +++++++------
 .../store/file/operation/FileStoreCommitImpl.java  |  3 +-
 .../store/file/manifest/ManifestFileMetaTest.java  | 35 ++++++++++++++++------
 4 files changed, 55 insertions(+), 21 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index 028b678..ea6d0d6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -38,15 +38,29 @@ public class FileStoreOptions {
                     .defaultValue(MemorySize.ofMebiBytes(8))
                     .withDescription("Suggested file size of a manifest file.");
 
+    public static final ConfigOption<Integer> MANIFEST_MERGE_MIN_COUNT =
+            ConfigOptions.key("manifest.merge-min-count")
+                    .intType()
+                    .defaultValue(30)
+                    .withDescription(
+                            "To avoid frequent manifest merges, this parameter specifies the minimum number "
+                                    + "of ManifestFileMeta to merge.");
+
     public final int bucket;
     public final MemorySize manifestSuggestedSize;
+    public final int manifestMergeMinCount;
 
-    public FileStoreOptions(int bucket, MemorySize manifestSuggestedSize) {
+    public FileStoreOptions(
+            int bucket, MemorySize manifestSuggestedSize, int manifestMergeMinCount) {
         this.bucket = bucket;
         this.manifestSuggestedSize = manifestSuggestedSize;
+        this.manifestMergeMinCount = manifestMergeMinCount;
     }
 
     public FileStoreOptions(ReadableConfig config) {
-        this(config.get(BUCKET), config.get(MANIFEST_TARGET_FILE_SIZE));
+        this(
+                config.get(BUCKET),
+                config.get(MANIFEST_TARGET_FILE_SIZE),
+                config.get(MANIFEST_MERGE_MIN_COUNT));
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
index d6deadd..b3ad481 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
@@ -135,19 +135,22 @@ public class ManifestFileMeta {
             List<ManifestFileMeta> metas,
             List<ManifestEntry> entries,
             ManifestFile manifestFile,
-            long suggestedMetaSize) {
+            long suggestedMetaSize,
+            int suggestedMinMetaCount) {
         List<ManifestFileMeta> result = new ArrayList<>();
         // these are the newly created manifest files, clean them up if exception occurs
         List<ManifestFileMeta> newMetas = new ArrayList<>();
         List<ManifestFileMeta> candidate = new ArrayList<>();
         long totalSize = 0;
+        int metaCount = 0;
 
         try {
             // merge existing manifests first
             for (ManifestFileMeta manifest : metas) {
                 totalSize += manifest.fileSize;
+                metaCount += 1;
                 candidate.add(manifest);
-                if (totalSize >= suggestedMetaSize) {
+                if (totalSize >= suggestedMetaSize || metaCount >= suggestedMinMetaCount) {
                     // reach suggested file size, perform merging and produce new file
                     if (candidate.size() == 1) {
                         result.add(candidate.get(0));
@@ -162,16 +165,15 @@ public class ManifestFileMeta {
 
                     candidate.clear();
                     totalSize = 0;
+                    metaCount = 0;
                 }
             }
 
-            // merge the last bit of metas with entries
-            mergeIntoOneFile(candidate, entries, manifestFile)
-                    .ifPresent(
-                            merged -> {
-                                newMetas.add(merged);
-                                result.add(merged);
-                            });
+            // both size and count conditions not satisfied, create new file from entries
+            ManifestFileMeta newManifestFileMeta = manifestFile.write(entries);
+            result.addAll(candidate);
+            newMetas.add(newManifestFileMeta);
+            result.add(newManifestFileMeta);
         } catch (Throwable e) {
             // exception occurs, clean up and rethrow
             for (ManifestFileMeta manifest : newMetas) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 15129b3..316b6c4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -307,7 +307,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                             oldMetas,
                             changes,
                             manifestFile,
-                            fileStoreOptions.manifestSuggestedSize.getBytes()));
+                            fileStoreOptions.manifestSuggestedSize.getBytes(),
+                            fileStoreOptions.manifestMergeMinCount));
             // prepare snapshot file
             manifestListName = manifestList.write(newMetas);
             newSnapshot =
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index cced937..b594802 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -77,7 +77,8 @@ public class ManifestFileMetaTest {
         List<ManifestFileMeta> expected = new ArrayList<>();
         createData(input, entries, expected);
 
-        List<ManifestFileMeta> actual = ManifestFileMeta.merge(input, entries, manifestFile, 500);
+        List<ManifestFileMeta> actual =
+                ManifestFileMeta.merge(input, entries, manifestFile, 500, 3);
         assertThat(actual).hasSameSizeAs(expected);
 
         // these three manifest files are merged from the input
@@ -85,9 +86,14 @@ public class ManifestFileMetaTest {
         assertSameContent(expected.get(1), actual.get(1), manifestFile);
         assertSameContent(expected.get(4), actual.get(4), manifestFile);
 
-        // these two manifest files should be kept without modification
+        // these four manifest files should be kept without modification
         assertThat(actual.get(2)).isEqualTo(input.get(5));
         assertThat(actual.get(3)).isEqualTo(input.get(6));
+        assertThat(actual.get(5)).isEqualTo(input.get(10));
+        assertThat(actual.get(6)).isEqualTo(input.get(11));
+
+        // this manifest file should be created from entries
+        assertSameContent(expected.get(7), actual.get(7), manifestFile);
     }
 
     private void assertSameContent(
@@ -115,7 +121,7 @@ public class ManifestFileMetaTest {
                         FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
 
         try {
-            ManifestFileMeta.merge(input, entries, failingManifestFile, 500);
+            ManifestFileMeta.merge(input, entries, failingManifestFile, 500, 30);
         } catch (Throwable e) {
             assertThat(e)
                     .hasRootCauseExactlyInstanceOf(
@@ -150,13 +156,16 @@ public class ManifestFileMetaTest {
             List<ManifestFileMeta> input,
             List<ManifestEntry> entries,
             List<ManifestFileMeta> expected) {
-        // suggested size 500
+        // suggested size 500 and suggested count 3
         // file sizes:
         // 200, 300, -- multiple files exactly the suggested size
         // 100, 200, 300, -- multiple files exceeding the suggested size
         // 500, -- single file exactly the suggested size
         // 600, -- single file exceeding the suggested size
-        // 100, 300 -- not enough sizes, but the last bit
+        // 100, 100, 100, -- multiple files exceeding the suggested count
+        // 100, -- the last bit, not enough size or count, won't merge
+        // 300, -- the last bit, not enough size or count, won't merge
+        // 200, -- file created from entries
 
         input.add(makeManifest(makeEntry(true, "A"), makeEntry(true, "B")));
         input.add(makeManifest(makeEntry(true, "C"), makeEntry(false, "B"), makeEntry(true, "D")));
@@ -183,10 +192,14 @@ public class ManifestFileMetaTest {
                         makeEntry(true, "L")));
 
         input.add(makeManifest(makeEntry(true, "M")));
-        input.add(makeManifest(makeEntry(false, "M"), makeEntry(true, "N"), makeEntry(true, "O")));
+        input.add(makeManifest(makeEntry(true, "N")));
+        input.add(makeManifest(makeEntry(true, "O")));
+
+        input.add(makeManifest(makeEntry(true, "P")));
+        input.add(makeManifest(makeEntry(false, "Q"), makeEntry(true, "R"), makeEntry(true, "S")));
 
-        entries.add(makeEntry(false, "O"));
-        entries.add(makeEntry(true, "P"));
+        entries.add(makeEntry(false, "S"));
+        entries.add(makeEntry(true, "T"));
 
         if (expected == null) {
             return;
@@ -197,7 +210,11 @@ public class ManifestFileMetaTest {
         expected.add(makeManifest(makeEntry(false, "A"), makeEntry(true, "F")));
         expected.add(input.get(5));
         expected.add(input.get(6));
-        expected.add(makeManifest(makeEntry(true, "N"), makeEntry(true, "P")));
+        expected.add(
+                makeManifest(makeEntry(true, "M"), makeEntry(true, "N"), makeEntry(true, "O")));
+        expected.add(input.get(10));
+        expected.add(input.get(11));
+        expected.add(makeManifest(makeEntry(false, "S"), makeEntry(true, "T")));
     }
 
     private ManifestFileMeta makeManifest(ManifestEntry... entries) {

Reply via email to