xushiyan commented on a change in pull request #3741:
URL: https://github.com/apache/hudi/pull/3741#discussion_r730342767



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
##########
@@ -19,65 +19,65 @@
 package org.apache.hudi.table.action.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.client.utils.SparkMemoryUtils;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCompactionException;
+import org.apache.hudi.table.HoodieCopyOnWriteTableOperation;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-import org.apache.spark.api.java.JavaRDD;
-
 import java.io.IOException;
 import java.util.List;
 
 @SuppressWarnings("checkstyle:LineLength")
-public class SparkRunCompactionActionExecutor<T extends HoodieRecordPayload> extends
-    BaseActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieWriteMetadata<JavaRDD<WriteStatus>>> {
+public class RunCompactionActionExecutor<T extends HoodieRecordPayload> extends
+    BaseActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> {
+
+  private final AbstractHoodieWriteClient writeClient;
+  private final HoodieCompactor compactor;
+  private final HoodieCopyOnWriteTableOperation copyOnWriteTableOperation;
 
-  public SparkRunCompactionActionExecutor(HoodieSparkEngineContext context,
-                                          HoodieWriteConfig config,
-                                          HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
-                                          String instantTime) {
+  public RunCompactionActionExecutor(HoodieEngineContext context,
+                                     HoodieWriteConfig config,
+                                     HoodieTable table,
+                                     String instantTime,
+                                     AbstractHoodieWriteClient writeClient,
+                                     HoodieCompactor compactor,
+                                     HoodieCopyOnWriteTableOperation copyOnWriteTableOperation) {
     super(context, config, table, instantTime);
+    this.writeClient = writeClient;
+    this.compactor = compactor;
+    this.copyOnWriteTableOperation = copyOnWriteTableOperation;
   }
 
   @Override
-  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(instantTime);
-    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
-    if (!pendingCompactionTimeline.containsInstant(instant)) {
-      throw new IllegalStateException(
-          "No Compaction request available at " + instantTime + " to run 
compaction");
-    }
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    compactor.checkCompactionTimeline(table, instantTime, writeClient);
 
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = new HoodieWriteMetadata<>();
+    HoodieWriteMetadata<HoodieData<WriteStatus>> compactionMetadata = new HoodieWriteMetadata<>();
     try {
-      HoodieActiveTimeline timeline = table.getActiveTimeline();
+      // generate compaction plan
+      // should support configurable commit metadata
       HoodieCompactionPlan compactionPlan =
           CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime);
-      // Mark instant as compaction inflight
-      timeline.transitionCompactionRequestedToInflight(instant);
-      table.getMetaClient().reloadActiveTimeline();
 
-      HoodieSparkMergeOnReadTableCompactor compactor = new HoodieSparkMergeOnReadTableCompactor();
-      JavaRDD<WriteStatus> statuses = compactor.compact(context, compactionPlan, table, config, instantTime);
+      HoodieData<WriteStatus> statuses = compactor.compact(
+          context, compactionPlan, table, config, instantTime, copyOnWriteTableOperation);
 
-      statuses.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));
+      statuses.persist(config.getProps());

Review comment:
       Should this be passing the storage level config value instead? `persist()` takes a string.
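
       For illustration only, a minimal sketch of the suggestion, assuming `persist(String)` expects a Spark storage level name; the config key and default below are inferred from the removed `SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps())` call and are an assumption, not code from this PR:

       ```java
       // Hypothetical sketch: `config` and `statuses` are the variables from the diff above.
       // Read the write-status storage level from the write config and pass it to persist().
       String storageLevel = config.getProps()
           .getProperty("hoodie.write.status.storage.level", "MEMORY_AND_DISK_SER");
       statuses.persist(storageLevel);
       ```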




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

