This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 1b5dc97 [hotfix] Add logs to file store compact for better tuning
1b5dc97 is described below
commit 1b5dc9784bf647252cf5cb0f283e22a075be695c
Author: tsreaper <[email protected]>
AuthorDate: Mon Apr 11 15:35:18 2022 +0800
[hotfix] Add logs to file store compact for better tuning
This closes #88
---
.../file/mergetree/compact/CompactManager.java | 75 +++++++++++++++++++++-
.../mergetree/compact/UniversalCompaction.java | 14 ++++
2 files changed, 87 insertions(+), 2 deletions(-)
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
index 4487d3f..0d1c1d3 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
@@ -23,6 +23,9 @@ import org.apache.flink.table.store.file.mergetree.Levels;
import org.apache.flink.table.store.file.mergetree.SortedRun;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@@ -31,12 +34,15 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.stream.Collectors;
import static java.util.Collections.singletonList;
/** Manager to submit compaction task. */
public class CompactManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(CompactManager.class);
+
private final ExecutorService executor;
private final CompactStrategy strategy;
@@ -84,6 +90,31 @@ public class CompactManager {
boolean dropDelete =
unit.outputLevel() != 0
&& unit.outputLevel() >=
levels.nonEmptyHighestLevel();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Submit compaction with files (level,
size): "
+ +
levels.levelSortedRuns().stream()
+ .flatMap(lsr ->
lsr.run().files().stream())
+ .map(
+ file ->
+
String.format(
+
"(%d, %d)",
+
file.level(),
+
file.fileSize()))
+
.collect(Collectors.joining(", ")));
+ LOG.debug(
+ "Pick these files (level, size) for
compaction: "
+ + unit.files().stream()
+ .map(
+ file ->
+
String.format(
+
"(%d, %d)",
+
file.level(),
+
file.fileSize()))
+
.collect(Collectors.joining(", ")));
+ }
+
CompactTask task = new CompactTask(unit,
dropDelete);
taskFuture = executor.submit(task);
});
@@ -131,10 +162,21 @@ public class CompactManager {
private final boolean dropDelete;
+ // metrics
+ private long rewriteInputSize;
+ private long rewriteOutputSize;
+ private int rewriteFilesNum;
+ private int upgradeFilesNum;
+
private CompactTask(CompactUnit unit, boolean dropDelete) {
this.outputLevel = unit.outputLevel();
this.partitioned = new IntervalPartition(unit.files(),
keyComparator).partition();
this.dropDelete = dropDelete;
+
+ this.rewriteInputSize = 0;
+ this.rewriteOutputSize = 0;
+ this.rewriteFilesNum = 0;
+ this.upgradeFilesNum = 0;
}
@Override
@@ -143,6 +185,8 @@ public class CompactManager {
}
private CompactResult compact() throws Exception {
+ long startMillis = System.currentTimeMillis();
+
List<List<SortedRun>> candidate = new ArrayList<>();
List<SstFileMeta> before = new ArrayList<>();
List<SstFileMeta> after = new ArrayList<>();
@@ -172,6 +216,20 @@ public class CompactManager {
}
}
rewrite(candidate, before, after);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Done compacting {} files to {} files in {}ms. "
+ + "Rewrite input size = {}, output size = {},
rewrite file num = {}, upgrade file num = {}",
+ before.size(),
+ after.size(),
+ System.currentTimeMillis() - startMillis,
+ rewriteInputSize,
+ rewriteOutputSize,
+ rewriteFilesNum,
+ upgradeFilesNum);
+ }
+
return result(before, after);
}
@@ -179,6 +237,7 @@ public class CompactManager {
if (file.level() != outputLevel) {
before.add(file);
after.add(file.upgrade(outputLevel));
+ upgradeFilesNum++;
}
}
@@ -200,8 +259,20 @@ public class CompactManager {
return;
}
}
- candidate.forEach(runs -> runs.forEach(run ->
before.addAll(run.files())));
- after.addAll(rewriter.rewrite(outputLevel, dropDelete, candidate));
+ candidate.forEach(
+ runs ->
+ runs.forEach(
+ run -> {
+ before.addAll(run.files());
+ rewriteInputSize +=
+ run.files().stream()
+
.mapToLong(SstFileMeta::fileSize)
+ .sum();
+ rewriteFilesNum += run.files().size();
+ }));
+ List<SstFileMeta> result = rewriter.rewrite(outputLevel,
dropDelete, candidate);
+ after.addAll(result);
+ rewriteOutputSize +=
result.stream().mapToLong(SstFileMeta::fileSize).sum();
candidate.clear();
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
index d37c899..d9e2a4e 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
@@ -22,6 +22,9 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
import org.apache.flink.table.store.file.mergetree.SortedRun;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.List;
import java.util.Optional;
@@ -34,6 +37,8 @@ import java.util.Optional;
*/
public class UniversalCompaction implements CompactStrategy {
+ private static final Logger LOG =
LoggerFactory.getLogger(UniversalCompaction.class);
+
private final int maxSizeAmp;
private final int sizeRatio;
private final int maxRunNum;
@@ -51,12 +56,18 @@ public class UniversalCompaction implements CompactStrategy
{
// 1 checking for reducing size amplification
CompactUnit unit = pickForSizeAmp(maxLevel, runs);
if (unit != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Universal compaction due to size amplification");
+ }
return Optional.of(unit);
}
// 2 checking for size ratio
unit = pickForSizeRatio(maxLevel, runs);
if (unit != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Universal compaction due to size ratio");
+ }
return Optional.of(unit);
}
@@ -64,6 +75,9 @@ public class UniversalCompaction implements CompactStrategy {
if (runs.size() > maxRunNum) {
// compacting for file num
int candidateCount = runs.size() - maxRunNum + 1;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Universal compaction due to file num");
+ }
return Optional.ofNullable(pickForSizeRatio(maxLevel, runs,
candidateCount));
}