xushiyan commented on a change in pull request #4847:
URL: https://github.com/apache/hudi/pull/4847#discussion_r809894233



##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
##########
@@ -18,111 +18,48 @@
 
 package org.apache.hudi.table.action.cluster;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.WriteStatus;
-import 
org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ClusteringUtils;
-import org.apache.hudi.common.util.CommitUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import 
org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
 import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
 
-import org.apache.avro.Schema;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 public class SparkExecuteClusteringCommitActionExecutor<T extends 
HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
 
-  private static final Logger LOG = 
LogManager.getLogger(SparkExecuteClusteringCommitActionExecutor.class);
   private final HoodieClusteringPlan clusteringPlan;
 
   public SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext 
context,
                                                     HoodieWriteConfig config, 
HoodieTable table,
                                                     String instantTime) {
     super(context, config, table, instantTime, WriteOperationType.CLUSTER);
-    this.clusteringPlan = 
ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
-      .map(Pair::getRight).orElseThrow(() -> new 
HoodieClusteringException("Unable to read clustering plan for instant: " + 
instantTime));
+    this.clusteringPlan = ClusteringUtils.getClusteringPlan(
+        table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
+        .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
+            "Unable to read clustering plan for instant: " + instantTime));
   }
 
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    HoodieInstant instant = 
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
-    // Mark instant as clustering inflight
-    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, 
Option.empty());
-    table.getMetaClient().reloadActiveTimeline();
-
-    final Schema schema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()));
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = 
((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends 
HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
-        ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
-            new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, 
HoodieWriteConfig.class}, table, context, config))
-        .performClustering(clusteringPlan, schema, instantTime);
-    JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
-    JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
-    writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
-    
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
-    commitOnAutoCommit(writeMetadata);
-    if (!writeMetadata.getCommitMetadata().isPresent()) {
-      HoodieCommitMetadata commitMetadata = 
CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), 
writeMetadata.getPartitionToReplaceFileIds(),
-          extraMetadata, operationType, getSchemaToStoreInCommit(), 
getCommitActionType());
-      writeMetadata.setCommitMetadata(Option.of(commitMetadata));
-    }
-    return writeMetadata;

Review comment:
       extracted to `executeClustering()` in BaseCommitActionExecutor.java

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
##########
@@ -18,111 +18,48 @@
 
 package org.apache.hudi.table.action.cluster;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.WriteStatus;
-import 
org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ClusteringUtils;
-import org.apache.hudi.common.util.CommitUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import 
org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
 import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
 
-import org.apache.avro.Schema;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 public class SparkExecuteClusteringCommitActionExecutor<T extends 
HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
 
-  private static final Logger LOG = 
LogManager.getLogger(SparkExecuteClusteringCommitActionExecutor.class);
   private final HoodieClusteringPlan clusteringPlan;
 
   public SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext 
context,
                                                     HoodieWriteConfig config, 
HoodieTable table,
                                                     String instantTime) {
     super(context, config, table, instantTime, WriteOperationType.CLUSTER);
-    this.clusteringPlan = 
ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
-      .map(Pair::getRight).orElseThrow(() -> new 
HoodieClusteringException("Unable to read clustering plan for instant: " + 
instantTime));
+    this.clusteringPlan = ClusteringUtils.getClusteringPlan(
+        table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
+        .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
+            "Unable to read clustering plan for instant: " + instantTime));
   }
 
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    HoodieInstant instant = 
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
-    // Mark instant as clustering inflight
-    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, 
Option.empty());
-    table.getMetaClient().reloadActiveTimeline();
-
-    final Schema schema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()));
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = 
((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends 
HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
-        ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
-            new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, 
HoodieWriteConfig.class}, table, context, config))
-        .performClustering(clusteringPlan, schema, instantTime);
-    JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
-    JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
-    writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
-    
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
-    commitOnAutoCommit(writeMetadata);
-    if (!writeMetadata.getCommitMetadata().isPresent()) {
-      HoodieCommitMetadata commitMetadata = 
CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), 
writeMetadata.getPartitionToReplaceFileIds(),
-          extraMetadata, operationType, getSchemaToStoreInCommit(), 
getCommitActionType());
-      writeMetadata.setCommitMetadata(Option.of(commitMetadata));
-    }
-    return writeMetadata;
-  }
-
-  /**
-   * Validate actions taken by clustering. In the first implementation, we 
validate at least one new file is written.
-   * But we can extend this to add more validation. E.g. number of records 
read = number of records written etc.
-   * We can also make these validations in BaseCommitActionExecutor to reuse 
pre-commit hooks for multiple actions.
-   */
-  private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
writeMetadata) {
-    if (writeMetadata.getWriteStatuses().isEmpty()) {
-      throw new HoodieClusteringException("Clustering plan produced 0 
WriteStatus for " + instantTime
-          + " #groups: " + clusteringPlan.getInputGroups().size() + " expected 
at least "
-          + 
clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
-          + " write statuses");
-    }
+    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = 
executeClustering(clusteringPlan);
+    JavaRDD<WriteStatus> transformedWriteStatuses = 
HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses());
+    return writeMetadata.clone(transformedWriteStatuses);
   }
 
   @Override
   protected String getCommitActionType() {
     return HoodieTimeline.REPLACE_COMMIT_ACTION;
   }
-
-  @Override
-  protected Map<String, List<String>> 
getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
writeMetadata) {
-    Set<HoodieFileGroupId> newFilesWritten = 
writeMetadata.getWriteStats().get().stream()
-        .map(s -> new HoodieFileGroupId(s.getPartitionPath(), 
s.getFileId())).collect(Collectors.toSet());
-    // for the below execution strategy, new file group id would be same as 
old file group id
-    if 
(SparkSingleFileSortExecutionStrategy.class.getName().equals(config.getClusteringExecutionStrategyClass()))
 {
-      return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
-          .collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), 
Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
-    }
-    return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
-        .filter(fg -> !newFilesWritten.contains(fg))
-        .collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), 
Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
-  }

Review comment:
       extracted to BaseCommitActionExecutor.java

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
##########
@@ -18,111 +18,48 @@
 
 package org.apache.hudi.table.action.cluster;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.WriteStatus;
-import 
org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ClusteringUtils;
-import org.apache.hudi.common.util.CommitUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import 
org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
 import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
 
-import org.apache.avro.Schema;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 public class SparkExecuteClusteringCommitActionExecutor<T extends 
HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
 
-  private static final Logger LOG = 
LogManager.getLogger(SparkExecuteClusteringCommitActionExecutor.class);
   private final HoodieClusteringPlan clusteringPlan;
 
   public SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext 
context,
                                                     HoodieWriteConfig config, 
HoodieTable table,
                                                     String instantTime) {
     super(context, config, table, instantTime, WriteOperationType.CLUSTER);
-    this.clusteringPlan = 
ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
-      .map(Pair::getRight).orElseThrow(() -> new 
HoodieClusteringException("Unable to read clustering plan for instant: " + 
instantTime));
+    this.clusteringPlan = ClusteringUtils.getClusteringPlan(
+        table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
+        .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
+            "Unable to read clustering plan for instant: " + instantTime));
   }
 
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    HoodieInstant instant = 
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
-    // Mark instant as clustering inflight
-    table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, 
Option.empty());
-    table.getMetaClient().reloadActiveTimeline();
-
-    final Schema schema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()));
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = 
((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends 
HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
-        ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
-            new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, 
HoodieWriteConfig.class}, table, context, config))
-        .performClustering(clusteringPlan, schema, instantTime);
-    JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
-    JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
-    writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
-    
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
-    commitOnAutoCommit(writeMetadata);
-    if (!writeMetadata.getCommitMetadata().isPresent()) {
-      HoodieCommitMetadata commitMetadata = 
CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), 
writeMetadata.getPartitionToReplaceFileIds(),
-          extraMetadata, operationType, getSchemaToStoreInCommit(), 
getCommitActionType());
-      writeMetadata.setCommitMetadata(Option.of(commitMetadata));
-    }
-    return writeMetadata;
-  }
-
-  /**
-   * Validate actions taken by clustering. In the first implementation, we 
validate at least one new file is written.
-   * But we can extend this to add more validation. E.g. number of records 
read = number of records written etc.
-   * We can also make these validations in BaseCommitActionExecutor to reuse 
pre-commit hooks for multiple actions.
-   */
-  private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
writeMetadata) {
-    if (writeMetadata.getWriteStatuses().isEmpty()) {
-      throw new HoodieClusteringException("Clustering plan produced 0 
WriteStatus for " + instantTime
-          + " #groups: " + clusteringPlan.getInputGroups().size() + " expected 
at least "
-          + 
clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
-          + " write statuses");
-    }

Review comment:
       extracted to BaseCommitActionExecutor.java




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to