This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b5dc97  [hotfix] Add logs to file store compact for better tuning
1b5dc97 is described below

commit 1b5dc9784bf647252cf5cb0f283e22a075be695c
Author: tsreaper <[email protected]>
AuthorDate: Mon Apr 11 15:35:18 2022 +0800

    [hotfix] Add logs to file store compact for better tuning
    
    This closes #88
---
 .../file/mergetree/compact/CompactManager.java     | 75 +++++++++++++++++++++-
 .../mergetree/compact/UniversalCompaction.java     | 14 ++++
 2 files changed, 87 insertions(+), 2 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
index 4487d3f..0d1c1d3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
@@ -23,6 +23,9 @@ import org.apache.flink.table.store.file.mergetree.Levels;
 import org.apache.flink.table.store.file.mergetree.SortedRun;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
@@ -31,12 +34,15 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.singletonList;
 
 /** Manager to submit compaction task. */
 public class CompactManager {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(CompactManager.class);
+
     private final ExecutorService executor;
 
     private final CompactStrategy strategy;
@@ -84,6 +90,31 @@ public class CompactManager {
                             boolean dropDelete =
                                     unit.outputLevel() != 0
                                             && unit.outputLevel() >= 
levels.nonEmptyHighestLevel();
+
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug(
+                                        "Submit compaction with files (level, 
size): "
+                                                + 
levels.levelSortedRuns().stream()
+                                                        .flatMap(lsr -> 
lsr.run().files().stream())
+                                                        .map(
+                                                                file ->
+                                                                        
String.format(
+                                                                               
 "(%d, %d)",
+                                                                               
 file.level(),
+                                                                               
 file.fileSize()))
+                                                        
.collect(Collectors.joining(", ")));
+                                LOG.debug(
+                                        "Pick these files (level, size) for 
compaction: "
+                                                + unit.files().stream()
+                                                        .map(
+                                                                file ->
+                                                                        
String.format(
+                                                                               
 "(%d, %d)",
+                                                                               
 file.level(),
+                                                                               
 file.fileSize()))
+                                                        
.collect(Collectors.joining(", ")));
+                            }
+
                             CompactTask task = new CompactTask(unit, 
dropDelete);
                             taskFuture = executor.submit(task);
                         });
@@ -131,10 +162,21 @@ public class CompactManager {
 
         private final boolean dropDelete;
 
+        // metrics
+        private long rewriteInputSize;
+        private long rewriteOutputSize;
+        private int rewriteFilesNum;
+        private int upgradeFilesNum;
+
         private CompactTask(CompactUnit unit, boolean dropDelete) {
             this.outputLevel = unit.outputLevel();
             this.partitioned = new IntervalPartition(unit.files(), 
keyComparator).partition();
             this.dropDelete = dropDelete;
+
+            this.rewriteInputSize = 0;
+            this.rewriteOutputSize = 0;
+            this.rewriteFilesNum = 0;
+            this.upgradeFilesNum = 0;
         }
 
         @Override
@@ -143,6 +185,8 @@ public class CompactManager {
         }
 
         private CompactResult compact() throws Exception {
+            long startMillis = System.currentTimeMillis();
+
             List<List<SortedRun>> candidate = new ArrayList<>();
             List<SstFileMeta> before = new ArrayList<>();
             List<SstFileMeta> after = new ArrayList<>();
@@ -172,6 +216,20 @@ public class CompactManager {
                 }
             }
             rewrite(candidate, before, after);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Done compacting {} files to {} files in {}ms. "
+                                + "Rewrite input size = {}, output size = {}, 
rewrite file num = {}, upgrade file num = {}",
+                        before.size(),
+                        after.size(),
+                        System.currentTimeMillis() - startMillis,
+                        rewriteInputSize,
+                        rewriteOutputSize,
+                        rewriteFilesNum,
+                        upgradeFilesNum);
+            }
+
             return result(before, after);
         }
 
@@ -179,6 +237,7 @@ public class CompactManager {
             if (file.level() != outputLevel) {
                 before.add(file);
                 after.add(file.upgrade(outputLevel));
+                upgradeFilesNum++;
             }
         }
 
@@ -200,8 +259,20 @@ public class CompactManager {
                     return;
                 }
             }
-            candidate.forEach(runs -> runs.forEach(run -> 
before.addAll(run.files())));
-            after.addAll(rewriter.rewrite(outputLevel, dropDelete, candidate));
+            candidate.forEach(
+                    runs ->
+                            runs.forEach(
+                                    run -> {
+                                        before.addAll(run.files());
+                                        rewriteInputSize +=
+                                                run.files().stream()
+                                                        
.mapToLong(SstFileMeta::fileSize)
+                                                        .sum();
+                                        rewriteFilesNum += run.files().size();
+                                    }));
+            List<SstFileMeta> result = rewriter.rewrite(outputLevel, 
dropDelete, candidate);
+            after.addAll(result);
+            rewriteOutputSize += 
result.stream().mapToLong(SstFileMeta::fileSize).sum();
             candidate.clear();
         }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
index d37c899..d9e2a4e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
@@ -22,6 +22,9 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
 import org.apache.flink.table.store.file.mergetree.SortedRun;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 import java.util.Optional;
 
@@ -34,6 +37,8 @@ import java.util.Optional;
  */
 public class UniversalCompaction implements CompactStrategy {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(UniversalCompaction.class);
+
     private final int maxSizeAmp;
     private final int sizeRatio;
     private final int maxRunNum;
@@ -51,12 +56,18 @@ public class UniversalCompaction implements CompactStrategy {
         // 1 checking for reducing size amplification
         CompactUnit unit = pickForSizeAmp(maxLevel, runs);
         if (unit != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Universal compaction due to size amplification");
+            }
             return Optional.of(unit);
         }
 
         // 2 checking for size ratio
         unit = pickForSizeRatio(maxLevel, runs);
         if (unit != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Universal compaction due to size ratio");
+            }
             return Optional.of(unit);
         }
 
@@ -64,6 +75,9 @@ public class UniversalCompaction implements CompactStrategy {
         if (runs.size() > maxRunNum) {
             // compacting for file num
             int candidateCount = runs.size() - maxRunNum + 1;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Universal compaction due to file num");
+            }
             return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, 
candidateCount));
         }
 

Reply via email to