This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 3c1d15a [FLINK-26217] Introduce manifest.merge-min-count in commit
3c1d15a is described below
commit 3c1d15a794092c799c5d60a9adc432332d10223d
Author: Shen Zhu <[email protected]>
AuthorDate: Mon Feb 28 19:26:41 2022 -0800
[FLINK-26217] Introduce manifest.merge-min-count in commit
This closes #24
---
.../flink/table/store/file/FileStoreOptions.java | 18 +++++++++--
.../store/file/manifest/ManifestFileMeta.java | 20 +++++++------
.../store/file/operation/FileStoreCommitImpl.java | 3 +-
.../store/file/manifest/ManifestFileMetaTest.java | 35 ++++++++++++++++------
4 files changed, 55 insertions(+), 21 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index 028b678..ea6d0d6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -38,15 +38,29 @@ public class FileStoreOptions {
.defaultValue(MemorySize.ofMebiBytes(8))
.withDescription("Suggested file size of a manifest file.");
+ public static final ConfigOption<Integer> MANIFEST_MERGE_MIN_COUNT =
+ ConfigOptions.key("manifest.merge-min-count")
+ .intType()
+ .defaultValue(30)
+ .withDescription(
+ "To avoid frequent manifest merges, this parameter specifies the minimum number "
+ + "of ManifestFileMeta to merge.");
+
public final int bucket;
public final MemorySize manifestSuggestedSize;
+ public final int manifestMergeMinCount;
- public FileStoreOptions(int bucket, MemorySize manifestSuggestedSize) {
+ public FileStoreOptions(
+ int bucket, MemorySize manifestSuggestedSize, int manifestMergeMinCount) {
this.bucket = bucket;
this.manifestSuggestedSize = manifestSuggestedSize;
+ this.manifestMergeMinCount = manifestMergeMinCount;
}
public FileStoreOptions(ReadableConfig config) {
- this(config.get(BUCKET), config.get(MANIFEST_TARGET_FILE_SIZE));
+ this(
+ config.get(BUCKET),
+ config.get(MANIFEST_TARGET_FILE_SIZE),
+ config.get(MANIFEST_MERGE_MIN_COUNT));
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
index d6deadd..b3ad481 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
@@ -135,19 +135,22 @@ public class ManifestFileMeta {
List<ManifestFileMeta> metas,
List<ManifestEntry> entries,
ManifestFile manifestFile,
- long suggestedMetaSize) {
+ long suggestedMetaSize,
+ int suggestedMinMetaCount) {
List<ManifestFileMeta> result = new ArrayList<>();
// these are the newly created manifest files, clean them up if exception occurs
List<ManifestFileMeta> newMetas = new ArrayList<>();
List<ManifestFileMeta> candidate = new ArrayList<>();
long totalSize = 0;
+ int metaCount = 0;
try {
// merge existing manifests first
for (ManifestFileMeta manifest : metas) {
totalSize += manifest.fileSize;
+ metaCount += 1;
candidate.add(manifest);
- if (totalSize >= suggestedMetaSize) {
+ if (totalSize >= suggestedMetaSize || metaCount >= suggestedMinMetaCount) {
// reach suggested file size, perform merging and produce new file
if (candidate.size() == 1) {
result.add(candidate.get(0));
@@ -162,16 +165,15 @@ public class ManifestFileMeta {
candidate.clear();
totalSize = 0;
+ metaCount = 0;
}
}
- // merge the last bit of metas with entries
- mergeIntoOneFile(candidate, entries, manifestFile)
- .ifPresent(
- merged -> {
- newMetas.add(merged);
- result.add(merged);
- });
+ // both size and count conditions not satisfied, create new file from entries
+ ManifestFileMeta newManifestFileMeta = manifestFile.write(entries);
+ result.addAll(candidate);
+ newMetas.add(newManifestFileMeta);
+ result.add(newManifestFileMeta);
} catch (Throwable e) {
// exception occurs, clean up and rethrow
for (ManifestFileMeta manifest : newMetas) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 15129b3..316b6c4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -307,7 +307,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
oldMetas,
changes,
manifestFile,
- fileStoreOptions.manifestSuggestedSize.getBytes()));
+ fileStoreOptions.manifestSuggestedSize.getBytes(),
+ fileStoreOptions.manifestMergeMinCount));
// prepare snapshot file
manifestListName = manifestList.write(newMetas);
newSnapshot =
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index cced937..b594802 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -77,7 +77,8 @@ public class ManifestFileMetaTest {
List<ManifestFileMeta> expected = new ArrayList<>();
createData(input, entries, expected);
- List<ManifestFileMeta> actual = ManifestFileMeta.merge(input, entries, manifestFile, 500);
+ List<ManifestFileMeta> actual =
+ ManifestFileMeta.merge(input, entries, manifestFile, 500, 3);
assertThat(actual).hasSameSizeAs(expected);
// these three manifest files are merged from the input
@@ -85,9 +86,14 @@ public class ManifestFileMetaTest {
assertSameContent(expected.get(1), actual.get(1), manifestFile);
assertSameContent(expected.get(4), actual.get(4), manifestFile);
- // these two manifest files should be kept without modification
+ // these four manifest files should be kept without modification
assertThat(actual.get(2)).isEqualTo(input.get(5));
assertThat(actual.get(3)).isEqualTo(input.get(6));
+ assertThat(actual.get(5)).isEqualTo(input.get(10));
+ assertThat(actual.get(6)).isEqualTo(input.get(11));
+
+ // this manifest file should be created from entries
+ assertSameContent(expected.get(7), actual.get(7), manifestFile);
}
private void assertSameContent(
@@ -115,7 +121,7 @@ public class ManifestFileMetaTest {
FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
try {
- ManifestFileMeta.merge(input, entries, failingManifestFile, 500);
+ ManifestFileMeta.merge(input, entries, failingManifestFile, 500, 30);
} catch (Throwable e) {
assertThat(e)
.hasRootCauseExactlyInstanceOf(
@@ -150,13 +156,16 @@ public class ManifestFileMetaTest {
List<ManifestFileMeta> input,
List<ManifestEntry> entries,
List<ManifestFileMeta> expected) {
- // suggested size 500
+ // suggested size 500 and suggested count 3
// file sizes:
// 200, 300, -- multiple files exactly the suggested size
// 100, 200, 300, -- multiple files exceeding the suggested size
// 500, -- single file exactly the suggested size
// 600, -- single file exceeding the suggested size
- // 100, 300 -- not enough sizes, but the last bit
+ // 100, 100, 100, -- multiple files exceeding the suggested count
+ // 100, -- the last bit, not enough size or count, won't merge
+ // 300, -- the last bit, not enough size or count, won't merge
+ // 200, -- file created from entries
input.add(makeManifest(makeEntry(true, "A"), makeEntry(true, "B")));
input.add(makeManifest(makeEntry(true, "C"), makeEntry(false, "B"), makeEntry(true, "D")));
@@ -183,10 +192,14 @@ public class ManifestFileMetaTest {
makeEntry(true, "L")));
input.add(makeManifest(makeEntry(true, "M")));
- input.add(makeManifest(makeEntry(false, "M"), makeEntry(true, "N"), makeEntry(true, "O")));
+ input.add(makeManifest(makeEntry(true, "N")));
+ input.add(makeManifest(makeEntry(true, "O")));
+
+ input.add(makeManifest(makeEntry(true, "P")));
+ input.add(makeManifest(makeEntry(false, "Q"), makeEntry(true, "R"), makeEntry(true, "S")));
- entries.add(makeEntry(false, "O"));
- entries.add(makeEntry(true, "P"));
+ entries.add(makeEntry(false, "S"));
+ entries.add(makeEntry(true, "T"));
if (expected == null) {
return;
@@ -197,7 +210,11 @@ public class ManifestFileMetaTest {
expected.add(makeManifest(makeEntry(false, "A"), makeEntry(true, "F")));
expected.add(input.get(5));
expected.add(input.get(6));
- expected.add(makeManifest(makeEntry(true, "N"), makeEntry(true, "P")));
+ expected.add(
+ makeManifest(makeEntry(true, "M"), makeEntry(true, "N"), makeEntry(true, "O")));
+ expected.add(input.get(10));
+ expected.add(input.get(11));
+ expected.add(makeManifest(makeEntry(false, "S"), makeEntry(true, "T")));
}
private ManifestFileMeta makeManifest(ManifestEntry... entries) {