wangxianghu commented on a change in pull request #1827:
URL: https://github.com/apache/hudi/pull/1827#discussion_r484931707



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -716,32 +674,97 @@ private void rollbackPendingCommits() {
    * @param compactionInstantTime Compaction Instant Time
    * @return RDD of Write Status
    */
-  private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean 
shouldComplete) {
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
-    HoodieTimeline pendingCompactionTimeline = 
table.getActiveTimeline().filterPendingCompactionTimeline();
-    HoodieInstant inflightInstant = 
HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
-    if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
-      rollbackInflightCompaction(inflightInstant, table);
-      table.getMetaClient().reloadActiveTimeline();
-    }
-    compactionTimer = metrics.getCompactionCtx();
-    HoodieWriteMetadata compactionMetadata = table.compact(jsc, 
compactionInstantTime);
-    JavaRDD<WriteStatus> statuses = compactionMetadata.getWriteStatuses();
-    if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
-      completeCompaction(compactionMetadata.getCommitMetadata().get(), 
statuses, table, compactionInstantTime);
-    }
-    return statuses;
-  }
+  protected abstract O compact(String compactionInstantTime, boolean 
shouldComplete);
 
   /**
    * Performs a compaction operation on a table, serially before or after an 
insert/upsert action.
    */
-  private Option<String> inlineCompact(Option<Map<String, String>> 
extraMetadata) {
+  protected Option<String> inlineCompact(Option<Map<String, String>> 
extraMetadata) {
     Option<String> compactionInstantTimeOpt = 
scheduleCompaction(extraMetadata);
     compactionInstantTimeOpt.ifPresent(compactionInstantTime -> {
       // inline compaction should auto commit as the user is never given 
control
       compact(compactionInstantTime, true);
     });
     return compactionInstantTimeOpt;
   }
+
+  /**
+   * Finalize Write operation.
+   *
+   * @param table       HoodieTable
+   * @param instantTime Instant Time
+   * @param stats       Hoodie Write Stat
+   */
+  protected void finalizeWrite(HoodieTable<T, I, K, O, P> table, String 
instantTime, List<HoodieWriteStat> stats) {
+    try {
+      final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
+      table.finalizeWrite(context, instantTime, stats);
+      if (finalizeCtx != null) {
+        Option<Long> durationInMs = 
Option.of(metrics.getDurationInMs(finalizeCtx.stop()));
+        durationInMs.ifPresent(duration -> {
+          LOG.info("Finalize write elapsed time (milliseconds): " + duration);
+          metrics.updateFinalizeWriteMetrics(duration, stats.size());
+        });
+      }
+    } catch (HoodieIOException ioe) {
+      throw new HoodieCommitException("Failed to complete commit " + 
instantTime + " due to finalize errors.", ioe);
+    }
+  }
+
+  public HoodieMetrics getMetrics() {
+    return metrics;
+  }
+
+  public HoodieIndex<T, I, K, O, P> getIndex() {
+    return index;
+  }
+
+  /**
+   * Get HoodieTable and init {@link Timer.Context}.
+   *
+   * @param operationType write operation type
+   * @param instantTime   current inflight instant time
+   * @return HoodieTable
+   */
+  protected abstract HoodieTable<T, I, K, O, P> 
getTableAndInitCtx(WriteOperationType operationType, String instantTime);
+
+  /**
+   * Sets write schema from last instant since deletes may not have schema set 
in the config.
+   */
+  protected void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) {
+    try {
+      HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+      Option<HoodieInstant> lastInstant =
+          activeTimeline.filterCompletedInstants().filter(s -> 
s.getAction().equals(metaClient.getCommitActionType()))
+              .lastInstant();
+      if (lastInstant.isPresent()) {
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+            activeTimeline.getInstantDetails(lastInstant.get()).get(), 
HoodieCommitMetadata.class);
+        if 
(commitMetadata.getExtraMetadata().containsKey(HoodieCommitMetadata.SCHEMA_KEY))
 {
+          
config.setSchema(commitMetadata.getExtraMetadata().get(HoodieCommitMetadata.SCHEMA_KEY));
+        } else {
+          throw new HoodieIOException("Latest commit does not have any schema 
in commit metadata");
+        }
+      } else {
+        throw new HoodieIOException("Deletes issued without any prior 
commits");
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("IOException thrown while reading last 
commit metadata", e);
+    }
+  }
+
+  public abstract AsyncCleanerService 
startAsyncCleaningIfEnabled(AbstractHoodieWriteClient<T, I, K, O, P> client, 
String instantTime);
+
+  @Override
+  public void close() {

Review comment:
       > need to ensure the ordering of closing resources is the same as before
   
   Yes, they are the same.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to