vinothchandar commented on a change in pull request #2260:
URL: https://github.com/apache/hudi/pull/2260#discussion_r569733140



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -46,6 +47,8 @@
   public static final String INLINE_COMPACT_PROP = "hoodie.compact.inline";
   // Run a compaction every N delta commits
   public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = "hoodie.compact.inline.max.delta.commits";
+  public static final String INLINE_COMPACT_TIME_DELTA_SECONDS_PROP = "hoodie.compact.inline.max.delta.seconds";

Review comment:
       Please add comments/Javadoc explaining what this controls.
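
One possible shape for the requested documentation, as a sketch only. The Javadoc wording is an assumption inferred from the property names in the diff (a time-based counterpart to the delta-commit threshold), and `CompactionConfigSketch` is a hypothetical stand-in for `HoodieCompactionConfig`, not the real class.

```java
/** Hypothetical stand-in for HoodieCompactionConfig; only the two keys from the diff are shown. */
final class CompactionConfigSketch {

  /** Run a compaction every N delta commits. */
  static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = "hoodie.compact.inline.max.delta.commits";

  /**
   * Assumed meaning: run a compaction once this many seconds have elapsed
   * since the last compaction. How it interacts with the delta-commit
   * threshold would depend on the configured trigger strategy.
   */
  static final String INLINE_COMPACT_TIME_DELTA_SECONDS_PROP = "hoodie.compact.inline.max.delta.seconds";

  private CompactionConfigSketch() {
    // constants holder, not meant to be instantiated
  }
}
```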

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionTriggerStrategy.java
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+public enum CompactionTriggerStrategy {
+    NUM, TIME_ELAPSED, NUM_AND_TIME, NUM_OR_TIME

Review comment:
       Rename `NUM` to `NUM_COMMITS` and add a one-line description for each value?
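
A minimal sketch of what the renamed, documented enum might look like. The per-value descriptions are assumptions read off the value names, and `shouldCompact` is a hypothetical helper added only to make the intended semantics concrete; neither is taken from the PR.

```java
/** Sketch of the enum after the suggested rename; descriptions are assumed, not from the PR. */
enum CompactionTriggerStrategySketch {
  /** Trigger once enough delta commits have accumulated since the last compaction. */
  NUM_COMMITS,
  /** Trigger once enough time has elapsed since the last compaction. */
  TIME_ELAPSED,
  /** Trigger only when both the commit-count and the elapsed-time conditions hold. */
  NUM_AND_TIME,
  /** Trigger when either the commit-count or the elapsed-time condition holds. */
  NUM_OR_TIME;

  /** Hypothetical helper: combines the two threshold checks according to the strategy. */
  boolean shouldCompact(boolean enoughCommits, boolean enoughTime) {
    switch (this) {
      case NUM_COMMITS:
        return enoughCommits;
      case TIME_ELAPSED:
        return enoughTime;
      case NUM_AND_TIME:
        return enoughCommits && enoughTime;
      case NUM_OR_TIME:
        return enoughCommits || enoughTime;
      default:
        throw new IllegalStateException("Unknown strategy: " + this);
    }
  }
}
```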

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -58,36 +62,97 @@ public 
SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
   @Override
   protected HoodieCompactionPlan scheduleCompaction() {
     LOG.info("Checking if compaction needs to be run on " + 
config.getBasePath());
+    // judge if we need to compact according to num delta commits and time 
elapsed
+    boolean compactable = 
needCompact(config.getInlineCompactTriggerStrategy());
+    if (compactable) {
+      LOG.info("Generating compaction plan for merge on read table " + 
config.getBasePath());
+      HoodieSparkMergeOnReadTableCompactor compactor = new 
HoodieSparkMergeOnReadTableCompactor();
+      try {
+        SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
table.getSliceView();
+        Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = 
fileSystemView.getPendingCompactionOperations()
+            .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
+            .collect(Collectors.toSet());
+        // exclude files in pending clustering from compaction.
+        
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+        return compactor.generateCompactionPlan(context, table, config, 
instantTime, fgInPendingCompactionAndClustering);
+      } catch (IOException e) {
+        throw new HoodieCompactionException("Could not schedule compaction " + 
config.getBasePath(), e);
+      }
+    }
+
+    return new HoodieCompactionPlan();
+  }
+
+  public Tuple2<Integer, String> 
getLastDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
     Option<HoodieInstant> lastCompaction = 
table.getActiveTimeline().getCommitTimeline()
         .filterCompletedInstants().lastInstant();
-    String lastCompactionTs = "0";
+    HoodieTimeline deltaCommits = 
table.getActiveTimeline().getDeltaCommitTimeline();
+
+    String lastCompactionTs;
+    int deltaCommitsSinceLastCompaction = 0;
     if (lastCompaction.isPresent()) {
       lastCompactionTs = lastCompaction.get().getTimestamp();
+    } else {
+      lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();

Review comment:
       I understand this is how it was, but overloading `lastCompactionTs` with 
the first delta commit and then reusing it is hard to grok. Can we at least 
rename `lastCompactionTs` -> `latestInstantTs` or something more generic?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -58,36 +62,97 @@ public SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
   @Override
   protected HoodieCompactionPlan scheduleCompaction() {
     LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
+    // judge if we need to compact according to num delta commits and time elapsed
+    boolean compactable = needCompact(config.getInlineCompactTriggerStrategy());
+    if (compactable) {
+      LOG.info("Generating compaction plan for merge on read table " + config.getBasePath());

Review comment:
       IIUC this block was just moved, with no changes to the code within the 
`if` block?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -58,36 +62,97 @@ public 
SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
   @Override
   protected HoodieCompactionPlan scheduleCompaction() {
     LOG.info("Checking if compaction needs to be run on " + 
config.getBasePath());
+    // judge if we need to compact according to num delta commits and time 
elapsed
+    boolean compactable = 
needCompact(config.getInlineCompactTriggerStrategy());
+    if (compactable) {
+      LOG.info("Generating compaction plan for merge on read table " + 
config.getBasePath());
+      HoodieSparkMergeOnReadTableCompactor compactor = new 
HoodieSparkMergeOnReadTableCompactor();
+      try {
+        SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
table.getSliceView();
+        Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = 
fileSystemView.getPendingCompactionOperations()
+            .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
+            .collect(Collectors.toSet());
+        // exclude files in pending clustering from compaction.
+        
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+        return compactor.generateCompactionPlan(context, table, config, 
instantTime, fgInPendingCompactionAndClustering);
+      } catch (IOException e) {
+        throw new HoodieCompactionException("Could not schedule compaction " + 
config.getBasePath(), e);
+      }
+    }
+
+    return new HoodieCompactionPlan();
+  }
+
+  public Tuple2<Integer, String> 
getLastDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
     Option<HoodieInstant> lastCompaction = 
table.getActiveTimeline().getCommitTimeline()
         .filterCompletedInstants().lastInstant();
-    String lastCompactionTs = "0";
+    HoodieTimeline deltaCommits = 
table.getActiveTimeline().getDeltaCommitTimeline();
+
+    String lastCompactionTs;
+    int deltaCommitsSinceLastCompaction = 0;
     if (lastCompaction.isPresent()) {
       lastCompactionTs = lastCompaction.get().getTimestamp();
+    } else {
+      lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
     }
+    if (compactionTriggerStrategy != CompactionTriggerStrategy.TIME_ELAPSED) {
+      if (lastCompaction.isPresent()) {
+        deltaCommitsSinceLastCompaction = 
deltaCommits.findInstantsAfter(lastCompactionTs, 
Integer.MAX_VALUE).countInstants();
+      } else {
+        deltaCommitsSinceLastCompaction = 
deltaCommits.findInstantsAfterOrEquals(lastCompactionTs, 
Integer.MAX_VALUE).countInstants();
+      }
+    }
+    return new Tuple2(deltaCommitsSinceLastCompaction, lastCompactionTs);

Review comment:
       Can we use `Pair` instead of `Tuple2`?
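
A rough sketch of the suggested return type. The diff already calls `Pair::getLeft`, so a pair type with `getLeft()` is available in the codebase. `PairSketch` below is a hypothetical, self-contained stand-in for it, and `DeltaCommitInfoExample` is an illustrative name that does not appear in the PR.

```java
/** Hypothetical stand-in for the project's Pair type, kept minimal for illustration. */
final class PairSketch<L, R> {
  private final L left;
  private final R right;

  private PairSketch(L left, R right) {
    this.left = left;
    this.right = right;
  }

  static <L, R> PairSketch<L, R> of(L left, R right) {
    return new PairSketch<>(left, right);
  }

  L getLeft() {
    return left;
  }

  R getRight() {
    return right;
  }
}

final class DeltaCommitInfoExample {
  /** Returns (delta commits since last compaction, reference timestamp) with full generic typing. */
  static PairSketch<Integer, String> lastDeltaCommitInfo(int deltaCommitsSinceLastCompaction, String instantTs) {
    // Unlike the raw `new Tuple2(...)` in the diff, the type arguments are checked by the compiler.
    return PairSketch.of(deltaCommitsSinceLastCompaction, instantTs);
  }
}
```

Callers would then read the two values through `getLeft()` and `getRight()` rather than `_1` and `_2`.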

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
##########
@@ -90,6 +90,20 @@ public static String createNewInstantTime() {
     });
   }
 
+  /**
+   * Returns next instant time that adds milliseconds in the {@link #COMMIT_FORMATTER} format.
+   * Ensures each instant time is atleast 1 second apart since we create instant times at second granularity
+   */
+  public static String createNewInstantTime(long milliseconds) {

Review comment:
       Can this call the method above (or vice versa) to reduce the code 
duplication?
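
One way the duplication could be removed, sketched under assumptions: the no-argument method delegates to the new overload with an offset of zero. The formatter pattern, the `AtomicReference` bookkeeping, and the class name `InstantTimeSketch` are assumptions chosen to keep the example self-contained; this is not the actual `HoodieActiveTimeline` implementation.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the instant-time helpers discussed in the review. */
final class InstantTimeSketch {
  // Assumed second-granularity pattern, matching the Javadoc's "second granularity" remark.
  private static final DateTimeFormatter COMMIT_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

  // Last instant time handed out; the empty string sorts before every formatted timestamp.
  private static final AtomicReference<String> LAST_INSTANT_TIME = new AtomicReference<>("");

  private InstantTimeSketch() {
  }

  /** Existing entry point: delegates with a zero offset instead of duplicating the loop. */
  static String createNewInstantTime() {
    return createNewInstantTime(0);
  }

  /** Returns an instant time offset by the given milliseconds, strictly greater than the previous one. */
  static String createNewInstantTime(long milliseconds) {
    return LAST_INSTANT_TIME.updateAndGet(oldVal -> {
      String candidate;
      do {
        candidate = LocalDateTime.now().plus(milliseconds, ChronoUnit.MILLIS).format(COMMIT_FORMATTER);
        // Fixed-width digit strings compare lexicographically in time order.
      } while (candidate.compareTo(oldVal) <= 0);
      return candidate;
    });
  }
}
```

The loop spins until the formatted value moves past the previous one, so two calls within the same second wait for the clock to tick over.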

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -58,36 +62,97 @@ public 
SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
   @Override
   protected HoodieCompactionPlan scheduleCompaction() {
     LOG.info("Checking if compaction needs to be run on " + 
config.getBasePath());
+    // judge if we need to compact according to num delta commits and time 
elapsed
+    boolean compactable = 
needCompact(config.getInlineCompactTriggerStrategy());
+    if (compactable) {
+      LOG.info("Generating compaction plan for merge on read table " + 
config.getBasePath());
+      HoodieSparkMergeOnReadTableCompactor compactor = new 
HoodieSparkMergeOnReadTableCompactor();
+      try {
+        SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
table.getSliceView();
+        Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = 
fileSystemView.getPendingCompactionOperations()
+            .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
+            .collect(Collectors.toSet());
+        // exclude files in pending clustering from compaction.
+        
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+        return compactor.generateCompactionPlan(context, table, config, 
instantTime, fgInPendingCompactionAndClustering);
+      } catch (IOException e) {
+        throw new HoodieCompactionException("Could not schedule compaction " + 
config.getBasePath(), e);
+      }
+    }
+
+    return new HoodieCompactionPlan();
+  }
+
+  public Tuple2<Integer, String> 
getLastDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
     Option<HoodieInstant> lastCompaction = 
table.getActiveTimeline().getCommitTimeline()
         .filterCompletedInstants().lastInstant();
-    String lastCompactionTs = "0";
+    HoodieTimeline deltaCommits = 
table.getActiveTimeline().getDeltaCommitTimeline();
+
+    String lastCompactionTs;
+    int deltaCommitsSinceLastCompaction = 0;
     if (lastCompaction.isPresent()) {
       lastCompactionTs = lastCompaction.get().getTimestamp();
+    } else {
+      lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
     }
+    if (compactionTriggerStrategy != CompactionTriggerStrategy.TIME_ELAPSED) {
+      if (lastCompaction.isPresent()) {

Review comment:
       Can we always compute `deltaCommitsSinceLastCompaction` regardless of 
strategy? It should be a cheap in-memory operation. Then we can merge these two 
blocks back together.
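
A sketch of what the merged block could look like once the count is computed unconditionally. To stay self-contained it models the delta-commit timeline as a sorted list of timestamp strings and the last compaction as an `Optional`. `DeltaCommitCounterSketch` and its method are hypothetical names, and the real code would go through the `HoodieTimeline` calls shown in the diff instead.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified model of getLastDeltaCommitInfo with the two if-blocks merged. */
final class DeltaCommitCounterSketch {

  private DeltaCommitCounterSketch() {
  }

  /**
   * Returns (delta commits since the reference instant, reference instant timestamp).
   * Assumes sortedDeltaCommitTs is non-empty when no compaction exists, mirroring the
   * diff's unchecked firstInstant().get().
   */
  static Map.Entry<Integer, String> lastDeltaCommitInfo(Optional<String> lastCompactionTs,
                                                        List<String> sortedDeltaCommitTs) {
    String latestInstantTs;
    int deltaCommitsSinceLastCompaction;
    if (lastCompactionTs.isPresent()) {
      // A compaction exists: count delta commits strictly after it.
      latestInstantTs = lastCompactionTs.get();
      final String reference = latestInstantTs;
      deltaCommitsSinceLastCompaction =
          (int) sortedDeltaCommitTs.stream().filter(ts -> ts.compareTo(reference) > 0).count();
    } else {
      // No compaction yet: start from the first delta commit and include it in the count.
      latestInstantTs = sortedDeltaCommitTs.get(0);
      final String reference = latestInstantTs;
      deltaCommitsSinceLastCompaction =
          (int) sortedDeltaCommitTs.stream().filter(ts -> ts.compareTo(reference) >= 0).count();
    }
    return new SimpleImmutableEntry<>(deltaCommitsSinceLastCompaction, latestInstantTs);
  }
}
```

With the strategy check gone, the timestamp and the count are derived in a single branch, and the caller decides which of the two values it needs.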



