yanghua commented on a change in pull request #2260:
URL: https://github.com/apache/hudi/pull/2260#discussion_r559350017



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.config;
 
+import org.apache.hadoop.hbase.io.compress.Compression;

Review comment:
       Please revert this change.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -468,7 +477,7 @@ public String getClusteringExecutionStrategyClass() {
   public long getClusteringMaxBytesInGroup() {
     return 
Long.parseLong(props.getProperty(HoodieClusteringConfig.CLUSTERING_MAX_BYTES_PER_GROUP));
   }
-  

Review comment:
       Please revert this change.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -58,36 +62,90 @@ public 
SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
   @Override
   protected HoodieCompactionPlan scheduleCompaction() {
     LOG.info("Checking if compaction needs to be run on " + 
config.getBasePath());
+    // judge if we need to compact according to num delta commits and time 
elapsed
+    boolean compactable = needCompact(config.getInlineCompactType());
+    if (compactable) {
+      LOG.info("Generating compaction plan for merge on read table " + 
config.getBasePath());
+      HoodieSparkMergeOnReadTableCompactor compactor = new 
HoodieSparkMergeOnReadTableCompactor();
+      try {
+        SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
table.getSliceView();
+        Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = 
fileSystemView.getPendingCompactionOperations()
+            .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
+            .collect(Collectors.toSet());
+        // exclude files in pending clustering from compaction.
+        
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+        return compactor.generateCompactionPlan(context, table, config, 
instantTime, fgInPendingCompactionAndClustering);
+      } catch (IOException e) {
+        throw new HoodieCompactionException("Could not schedule compaction " + 
config.getBasePath(), e);
+      }
+    }
+
+    return new HoodieCompactionPlan();
+  }
+
+  public Tuple2<Integer, String> checkCompact(CompactType compactType) {
     Option<HoodieInstant> lastCompaction = 
table.getActiveTimeline().getCommitTimeline()
-        .filterCompletedInstants().lastInstant();
-    String lastCompactionTs = "0";
+            .filterCompletedInstants().lastInstant();
+    HoodieTimeline deltaCommits = 
table.getActiveTimeline().getDeltaCommitTimeline();
+
+    String lastCompactionTs;
+    int deltaCommitsSinceLastCompaction = 0;
     if (lastCompaction.isPresent()) {
       lastCompactionTs = lastCompaction.get().getTimestamp();
+    } else {
+      lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
     }
-
-    int deltaCommitsSinceLastCompaction = 
table.getActiveTimeline().getDeltaCommitTimeline()
-        .findInstantsAfter(lastCompactionTs, 
Integer.MAX_VALUE).countInstants();
-    if (config.getInlineCompactDeltaCommitMax() > 
deltaCommitsSinceLastCompaction) {
-      LOG.info("Not scheduling compaction as only " + 
deltaCommitsSinceLastCompaction
-          + " delta commits was found since last compaction " + 
lastCompactionTs + ". Waiting for "
-          + config.getInlineCompactDeltaCommitMax());
-      return new HoodieCompactionPlan();
+    if (compactType != CompactType.TIME_ELAPSED) {
+      if (lastCompaction.isPresent()) {
+        deltaCommitsSinceLastCompaction = 
deltaCommits.findInstantsAfter(lastCompactionTs, 
Integer.MAX_VALUE).countInstants();
+      } else {
+        deltaCommitsSinceLastCompaction = 
deltaCommits.findInstantsAfterOrEquals(lastCompactionTs, 
Integer.MAX_VALUE).countInstants();
+      }
     }
+    return new Tuple2(deltaCommitsSinceLastCompaction, lastCompactionTs);
+  }
 
-    LOG.info("Generating compaction plan for merge on read table " + 
config.getBasePath());
-    HoodieSparkMergeOnReadTableCompactor compactor = new 
HoodieSparkMergeOnReadTableCompactor();
-    try {
-      SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
table.getSliceView();
-      Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = 
fileSystemView.getPendingCompactionOperations()
-          .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
-          .collect(Collectors.toSet());
-      // exclude files in pending clustering from compaction.
-      
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
-      return compactor.generateCompactionPlan(context, table, config, 
instantTime, fgInPendingCompactionAndClustering);
+  public boolean needCompact(CompactType compactType) {
+    boolean compactable;
+    // return deltaCommitsSinceLastCompaction and lastCompactionTs
+    Tuple2<Integer, String> threshold = checkCompact(compactType);
+    switch (compactType) {
+      case COMMIT_NUM:
+        compactable = config.getInlineCompactDeltaCommitMax() <= threshold._1;
+        break;
+      case TIME_ELAPSED:
+        compactable = parseToTimestamp(threshold._2) + 
config.getInlineCompactDeltaElapsedTimeMax() <= parseToTimestamp(instantTime);
+        break;
+      case NUM_OR_TIME:
+        compactable = config.getInlineCompactDeltaCommitMax() <= threshold._1
+            || parseToTimestamp(threshold._2) + 
config.getInlineCompactDeltaElapsedTimeMax() <= parseToTimestamp(instantTime);
+        break;
+      case NUM_AND_TIME:
+        compactable = config.getInlineCompactDeltaCommitMax() <= threshold._1
+            && parseToTimestamp(threshold._2) + 
config.getInlineCompactDeltaElapsedTimeMax() <= parseToTimestamp(instantTime);
+        break;
+      default:
+        throw new HoodieCompactionException("Unsupported compact type: " + 
config.getInlineCompactType());
+    }
 
-    } catch (IOException e) {
-      throw new HoodieCompactionException("Could not schedule compaction " + 
config.getBasePath(), e);
+    if (compactable) {
+      LOG.info(String.format("Scheduling compaction: %s. Delta commits found: 
%s times, and last compaction time is %s.",
+              compactType.name(), threshold._1, threshold._2));
+    } else {
+      LOG.info(String.format("Not scheduling compaction as only %s delta 
commits was found since last compaction %s."

Review comment:
       We may not need to log why compaction was not triggered. It means nothing 
happened, right?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -60,17 +63,32 @@ protected HoodieCompactionPlan scheduleCompaction() {
     LOG.info("Checking if compaction needs to be run on " + 
config.getBasePath());
     Option<HoodieInstant> lastCompaction = 
table.getActiveTimeline().getCommitTimeline()
         .filterCompletedInstants().lastInstant();
-    String lastCompactionTs = "0";
+    HoodieTimeline deltaCommits = 
table.getActiveTimeline().getDeltaCommitTimeline();
+    String lastCompactionTs;
+    int deltaCommitsSinceLastCompaction;
     if (lastCompaction.isPresent()) {
       lastCompactionTs = lastCompaction.get().getTimestamp();
+      deltaCommitsSinceLastCompaction = 
deltaCommits.findInstantsAfter(lastCompactionTs, 
Integer.MAX_VALUE).countInstants();
+    } else {
+      lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
+      deltaCommitsSinceLastCompaction = 
deltaCommits.findInstantsAfterOrEquals(lastCompactionTs, 
Integer.MAX_VALUE).countInstants();
     }
-
-    int deltaCommitsSinceLastCompaction = 
table.getActiveTimeline().getDeltaCommitTimeline()
-        .findInstantsAfter(lastCompactionTs, 
Integer.MAX_VALUE).countInstants();
-    if (config.getInlineCompactDeltaCommitMax() > 
deltaCommitsSinceLastCompaction) {
-      LOG.info("Not scheduling compaction as only " + 
deltaCommitsSinceLastCompaction
-          + " delta commits was found since last compaction " + 
lastCompactionTs + ". Waiting for "
-          + config.getInlineCompactDeltaCommitMax());
+    // judge if we need to compact according to num delta commits and time 
elapsed
+    boolean numCommitEnabled = config.getInlineCompactDeltaNumCommitEnabled();
+    boolean timeEnabled = config.getInlineCompactDeltaElapsedEnabled();
+    boolean compactable;
+    if (numCommitEnabled && !timeEnabled) {
+      compactable = config.getInlineCompactDeltaCommitMax() > 
deltaCommitsSinceLastCompaction;
+    } else if (!numCommitEnabled && timeEnabled) {
+      compactable = parseToTimestamp(lastCompactionTs) + 
config.getInlineCompactDeltaElapsedTimeMax() > parseToTimestamp(instantTime);
+    } else {
+      compactable = config.getInlineCompactDeltaCommitMax() > 
deltaCommitsSinceLastCompaction
+          && parseToTimestamp(lastCompactionTs) + 
config.getInlineCompactDeltaElapsedTimeMax() > parseToTimestamp(instantTime);
+    }

Review comment:
       > if we call this function. it means we turn 
`ASYNC_COMPACT_ENABLE_OPT_KEY` on. Do we still need to check first?
   
   The `ASYNC_COMPACT_ENABLE_OPT_KEY` switch must be the precondition.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -58,36 +62,90 @@ public 
SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
   @Override
   protected HoodieCompactionPlan scheduleCompaction() {
     LOG.info("Checking if compaction needs to be run on " + 
config.getBasePath());
+    // judge if we need to compact according to num delta commits and time 
elapsed
+    boolean compactable = needCompact(config.getInlineCompactType());
+    if (compactable) {
+      LOG.info("Generating compaction plan for merge on read table " + 
config.getBasePath());
+      HoodieSparkMergeOnReadTableCompactor compactor = new 
HoodieSparkMergeOnReadTableCompactor();
+      try {
+        SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
table.getSliceView();
+        Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = 
fileSystemView.getPendingCompactionOperations()
+            .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
+            .collect(Collectors.toSet());
+        // exclude files in pending clustering from compaction.
+        
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+        return compactor.generateCompactionPlan(context, table, config, 
instantTime, fgInPendingCompactionAndClustering);
+      } catch (IOException e) {
+        throw new HoodieCompactionException("Could not schedule compaction " + 
config.getBasePath(), e);
+      }
+    }
+
+    return new HoodieCompactionPlan();
+  }
+
+  public Tuple2<Integer, String> checkCompact(CompactType compactType) {
     Option<HoodieInstant> lastCompaction = 
table.getActiveTimeline().getCommitTimeline()
-        .filterCompletedInstants().lastInstant();
-    String lastCompactionTs = "0";
+            .filterCompletedInstants().lastInstant();
+    HoodieTimeline deltaCommits = 
table.getActiveTimeline().getDeltaCommitTimeline();
+
+    String lastCompactionTs;
+    int deltaCommitsSinceLastCompaction = 0;
     if (lastCompaction.isPresent()) {
       lastCompactionTs = lastCompaction.get().getTimestamp();
+    } else {
+      lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
     }
-
-    int deltaCommitsSinceLastCompaction = 
table.getActiveTimeline().getDeltaCommitTimeline()
-        .findInstantsAfter(lastCompactionTs, 
Integer.MAX_VALUE).countInstants();
-    if (config.getInlineCompactDeltaCommitMax() > 
deltaCommitsSinceLastCompaction) {
-      LOG.info("Not scheduling compaction as only " + 
deltaCommitsSinceLastCompaction
-          + " delta commits was found since last compaction " + 
lastCompactionTs + ". Waiting for "
-          + config.getInlineCompactDeltaCommitMax());
-      return new HoodieCompactionPlan();
+    if (compactType != CompactType.TIME_ELAPSED) {
+      if (lastCompaction.isPresent()) {
+        deltaCommitsSinceLastCompaction = 
deltaCommits.findInstantsAfter(lastCompactionTs, 
Integer.MAX_VALUE).countInstants();
+      } else {
+        deltaCommitsSinceLastCompaction = 
deltaCommits.findInstantsAfterOrEquals(lastCompactionTs, 
Integer.MAX_VALUE).countInstants();
+      }
     }
+    return new Tuple2(deltaCommitsSinceLastCompaction, lastCompactionTs);
+  }
 
-    LOG.info("Generating compaction plan for merge on read table " + 
config.getBasePath());
-    HoodieSparkMergeOnReadTableCompactor compactor = new 
HoodieSparkMergeOnReadTableCompactor();
-    try {
-      SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
table.getSliceView();
-      Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = 
fileSystemView.getPendingCompactionOperations()
-          .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
-          .collect(Collectors.toSet());
-      // exclude files in pending clustering from compaction.
-      
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
-      return compactor.generateCompactionPlan(context, table, config, 
instantTime, fgInPendingCompactionAndClustering);
+  public boolean needCompact(CompactType compactType) {
+    boolean compactable;
+    // return deltaCommitsSinceLastCompaction and lastCompactionTs
+    Tuple2<Integer, String> threshold = checkCompact(compactType);
+    switch (compactType) {
+      case COMMIT_NUM:
+        compactable = config.getInlineCompactDeltaCommitMax() <= threshold._1;
+        break;
+      case TIME_ELAPSED:
+        compactable = parseToTimestamp(threshold._2) + 
config.getInlineCompactDeltaElapsedTimeMax() <= parseToTimestamp(instantTime);
+        break;
+      case NUM_OR_TIME:
+        compactable = config.getInlineCompactDeltaCommitMax() <= threshold._1
+            || parseToTimestamp(threshold._2) + 
config.getInlineCompactDeltaElapsedTimeMax() <= parseToTimestamp(instantTime);
+        break;
+      case NUM_AND_TIME:
+        compactable = config.getInlineCompactDeltaCommitMax() <= threshold._1
+            && parseToTimestamp(threshold._2) + 
config.getInlineCompactDeltaElapsedTimeMax() <= parseToTimestamp(instantTime);
+        break;
+      default:
+        throw new HoodieCompactionException("Unsupported compact type: " + 
config.getInlineCompactType());
+    }
 
-    } catch (IOException e) {
-      throw new HoodieCompactionException("Could not schedule compaction " + 
config.getBasePath(), e);
+    if (compactable) {
+      LOG.info(String.format("Scheduling compaction: %s. Delta commits found: 
%s times, and last compaction time is %s.",

Review comment:
       We need to describe why compaction was triggered, not just log some runtime 
information. E.g., add the log into the case statement?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -484,7 +493,7 @@ public long getClusteringTargetFileMaxBytes() {
   public int getTargetPartitionsForClustering() {
     return 
Integer.parseInt(props.getProperty(HoodieClusteringConfig.CLUSTERING_TARGET_PARTITIONS));
   }
-  

Review comment:
       ditto




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to