yuzhaojing commented on code in PR #6732:
URL: https://github.com/apache/hudi/pull/6732#discussion_r1062133114


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -352,182 +303,23 @@ public void commitLogCompaction(String 
logCompactionInstantTime, HoodieCommitMet
   protected void completeLogCompaction(HoodieCommitMetadata metadata,
                                        HoodieTable table,
                                        String logCompactionCommitTime) {
-    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect log 
compaction write status and commit compaction");
-    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
     final HoodieInstant logCompactionInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionCommitTime);
-    try {
-      this.txnManager.beginTransaction(Option.of(logCompactionInstant), 
Option.empty());
-      preCommit(logCompactionInstant, metadata);
-      finalizeWrite(table, logCompactionCommitTime, writeStats);
-      // commit to data table after committing to metadata table.
-      updateTableMetadata(table, metadata, logCompactionInstant);
-      LOG.info("Committing Log Compaction " + logCompactionCommitTime + ". 
Finished with result " + metadata);
-      CompactHelpers.getInstance().completeInflightLogCompaction(table, 
logCompactionCommitTime, metadata);
-    } finally {
-      this.txnManager.endTransaction(Option.of(logCompactionInstant));
-    }
-    WriteMarkersFactory.get(config.getMarkersType(), table, 
logCompactionCommitTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    if (compactionTimer != null) {
-      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
-      
HoodieActiveTimeline.parseDateFromInstantTimeSafely(logCompactionCommitTime).ifPresent(parsedInstant
 ->
-          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, 
metadata, HoodieActiveTimeline.LOG_COMPACTION_ACTION)
-      );
-    }
-    LOG.info("Log Compacted successfully on commit " + 
logCompactionCommitTime);
+    preCommit(logCompactionInstant, metadata);
+    tableServiceClient.completeLogCompaction(metadata, table, 
logCompactionCommitTime);
   }
 
   @Override
   protected HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompact(String 
logCompactionInstantTime, boolean shouldComplete) {
     HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
     preWrite(logCompactionInstantTime, WriteOperationType.LOG_COMPACT, 
table.getMetaClient());
-    HoodieTimeline pendingLogCompactionTimeline = 
table.getActiveTimeline().filterPendingLogCompactionTimeline();
-    HoodieInstant inflightInstant = 
HoodieTimeline.getLogCompactionInflightInstant(logCompactionInstantTime);
-    if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
-      LOG.info("Found Log compaction inflight file. Rolling back the commit 
and exiting.");
-      table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
-      table.getMetaClient().reloadActiveTimeline();
-      throw new HoodieException("Inflight logcompaction file exists");
-    }
-    logCompactionTimer = metrics.getLogCompactionCtx();
-    WriteMarkersFactory.get(config.getMarkersType(), table, 
logCompactionInstantTime);
-    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = 
table.logCompact(context, logCompactionInstantTime);
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompactionMetadata = 
writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
-    if (shouldComplete && 
logCompactionMetadata.getCommitMetadata().isPresent()) {
-      completeTableService(TableServiceType.LOG_COMPACT, 
logCompactionMetadata.getCommitMetadata().get(), table, 
logCompactionInstantTime);
-    }
-    return logCompactionMetadata;
+    return tableServiceClient.logCompact(logCompactionInstantTime, 
shouldComplete);
   }
 
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String 
clusteringInstant, boolean shouldComplete) {
     HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
     preWrite(clusteringInstant, WriteOperationType.CLUSTER, 
table.getMetaClient());
-    HoodieTimeline pendingClusteringTimeline = 
table.getActiveTimeline().filterPendingReplaceTimeline();
-    HoodieInstant inflightInstant = 
HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
-    if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
-      table.rollbackInflightClustering(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
-      table.getMetaClient().reloadActiveTimeline();
-    }
-    clusteringTimer = metrics.getClusteringCtx();
-    LOG.info("Starting clustering at " + clusteringInstant);
-    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = 
table.cluster(context, clusteringInstant);
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = 
writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
-    // Validation has to be done after cloning. if not, it could result in 
dereferencing the write status twice which means clustering could get executed 
twice.
-    validateClusteringCommit(clusteringMetadata, clusteringInstant, table);
-    // TODO : Where is shouldComplete used ?
-    if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
-      completeTableService(TableServiceType.CLUSTER, 
clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
-    }
-    return clusteringMetadata;
-  }
-
-  private void completeClustering(HoodieReplaceCommitMetadata metadata,
-                                  HoodieTable table,
-                                  String clusteringCommitTime) {
-    List<HoodieWriteStat> writeStats = 
metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
-        e.getValue().stream()).collect(Collectors.toList());
-
-    if 
(writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) 
{
-      throw new HoodieClusteringException("Clustering failed to write to 
files:"
-          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 
0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
-    }
-
-    final HoodieInstant clusteringInstant = 
HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
-    try {
-      this.txnManager.beginTransaction(Option.of(clusteringInstant), 
Option.empty());
-
-      finalizeWrite(table, clusteringCommitTime, writeStats);
-      // Update table's metadata (table)
-      updateTableMetadata(table, metadata, clusteringInstant);
-
-      LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished 
with result " + metadata);
-
-      table.getActiveTimeline().transitionReplaceInflightToComplete(
-          clusteringInstant,
-          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-    } catch (Exception e) {
-      throw new HoodieClusteringException("unable to transition clustering 
inflight to complete: " + clusteringCommitTime, e);
-    } finally {
-      this.txnManager.endTransaction(Option.of(clusteringInstant));
-    }
-    WriteMarkersFactory.get(config.getMarkersType(), table, 
clusteringCommitTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    if (clusteringTimer != null) {
-      long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
-      
HoodieActiveTimeline.parseDateFromInstantTimeSafely(clusteringCommitTime).ifPresent(parsedInstant
 ->
-          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, 
metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
-      );
-    }
-    LOG.info("Clustering successfully on commit " + clusteringCommitTime);
-  }
-
-  private void 
validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
-    if (clusteringMetadata.getWriteStatuses().isEmpty()) {
-      HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
-              table.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(clusteringCommitTime))
-          .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
-              "Unable to read clustering plan for instant: " + 
clusteringCommitTime));
-      throw new HoodieClusteringException("Clustering plan produced 0 
WriteStatus for " + clusteringCommitTime
-          + " #groups: " + clusteringPlan.getInputGroups().size() + " expected 
at least "
-          + 
clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
-          + " write statuses");
-    }
-  }
-
-  private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata 
commitMetadata,
-                                   HoodieInstant hoodieInstant) {
-    boolean isTableServiceAction = 
table.isTableServiceAction(hoodieInstant.getAction(), 
hoodieInstant.getTimestamp());
-    // Do not do any conflict resolution here as we do with regular writes. We 
take the lock here to ensure all writes to metadata table happens within a
-    // single lock (single writer). Because more than one write to metadata 
table will result in conflicts since all of them updates the same partition.
-    table.getMetadataWriter(hoodieInstant.getTimestamp())
-        .ifPresent(writer -> ((HoodieTableMetadataWriter) 
writer).update(commitMetadata, hoodieInstant.getTimestamp(), 
isTableServiceAction));
-  }
-
-  @Override
-  protected void initMetadataTable(Option<String> instantTime) {
-    // Initialize Metadata Table to make sure it's bootstrapped _before_ the 
operation,
-    // if it didn't exist before
-    // See https://issues.apache.org/jira/browse/HUDI-3343 for more details
-    initializeMetadataTable(instantTime);
-  }
-
-  /**
-   * Initialize the metadata table if needed. Creating the metadata table 
writer
-   * will trigger the initial bootstrapping from the data table.
-   *
-   * @param inFlightInstantTimestamp - The in-flight action responsible for 
the metadata table initialization
-   */
-  private void initializeMetadataTable(Option<String> 
inFlightInstantTimestamp) {
-    if (config.isMetadataTableEnabled()) {
-      HoodieTableMetadataWriter writer = 
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), 
config,
-          context, Option.empty(), inFlightInstantTimestamp);
-      try {
-        writer.close();
-      } catch (Exception e) {
-        throw new HoodieException("Failed to instantiate Metadata table ", e);
-      }
-    }
-  }

Review Comment:
   This is a problem and I'm going to fix it: in the refactored `completeLogCompaction`, `preCommit(logCompactionInstant, metadata)` is now invoked before delegating to `tableServiceClient.completeLogCompaction(...)`, whereas previously it ran inside the `txnManager.beginTransaction(...)` / `endTransaction(...)` scope — so the pre-commit check appears to execute outside the transaction lock now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to