leesf commented on a change in pull request #1727:
URL: https://github.com/apache/hudi/pull/1727#discussion_r439719359



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -0,0 +1,537 @@
+package org.apache.hudi.client;
+
+import com.codahale.metrics.Timer;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.embedded.AbstractEmbeddedTimelineService;
+import org.apache.hudi.common.HoodieEngineContext;
+import org.apache.hudi.common.model.*;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.*;
+import org.apache.hudi.index.AbstractHoodieIndex;
+import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.table.AbstractHoodieTable;
+import org.apache.hudi.table.HoodieTimelineArchiveLog;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract base class for engine-agnostic Hoodie write clients, providing the
+ * common write, savepoint, rollback, restore and compaction plumbing shared by
+ * the concrete (Spark/Flink/...) client implementations.
+ */
+public abstract class AbstractHoodieWriteClient<T extends 
HoodieRecordPayload<T>, I, K, O, P> extends AbstractHoodieClient {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractHoodieWriteClient.class);
+  private static final long serialVersionUID = 1L;
+  private final transient HoodieMetrics metrics;
+  private final transient AbstractHoodieIndex<T, I, K, O, P> index;
+  private final transient AbstractHoodieTable<T, I, K, O, P> table;
+  private transient Timer.Context writeContext = null;
+  private transient WriteOperationType operationType;
+
+  private static final String LOOKUP_STR = "lookup";
+  private final boolean rollbackPending;
+  private transient Timer.Context compactionTimer;
+
+
+  public void setOperationType(WriteOperationType operationType) {
+    this.operationType = operationType;
+  }
+
+  public WriteOperationType getOperationType() {
+    return this.operationType;
+  }
+
+  public AbstractHoodieWriteClient(HoodieEngineContext context, 
AbstractHoodieIndex<T, I, K, O, P> index,
+                                   AbstractHoodieTable<T, I, K, O, P> 
table,HoodieWriteConfig clientConfig) {
+    this(context, index, table,clientConfig, Option.empty());
+  }
+
+  protected AbstractHoodieWriteClient(HoodieEngineContext context, 
AbstractHoodieIndex<T, I, K, O, P> index,
+                                      AbstractHoodieTable<T, I, K, O, P> 
table,HoodieWriteConfig clientConfig,
+                                      Option<AbstractEmbeddedTimelineService> 
timelineServer) {
+    this(context, index, table,clientConfig, timelineServer, false);
+  }
+
+  public AbstractHoodieWriteClient(HoodieEngineContext context, 
AbstractHoodieIndex<T, I, K, O, P> index,AbstractHoodieTable<T, I, K, O, P> 
table,
+                                   HoodieWriteConfig clientConfig, 
Option<AbstractEmbeddedTimelineService> timelineServer, boolean 
rollbackPending) {
+    super(context, clientConfig, timelineServer);
+    this.index = index;
+    this.table = table;
+    this.metrics = new HoodieMetrics(config, config.getTableName());
+    this.rollbackPending = rollbackPending;
+  }
+
+  public abstract I filterExists(I hoodieRecords);
+
+  public abstract O upsert(I records, final String instantTime);
+
+  public abstract O upsertPreppedRecords(I preppedRecords, final String 
instantTime);
+
+  public abstract O insert(I records, final String instantTime);
+
+  public abstract O insertPreppedRecords(I preppedRecords, final String 
instantTime);
+
+  public O bulkInsert(I records, final String instantTime) {
+    return bulkInsert(records, instantTime, Option.empty());
+  }
+
+  public abstract O bulkInsert(I records, final String instantTime,
+                                         
Option<UserDefinedBulkInsertPartitioner<T,I>> bulkInsertPartitioner);
+
+  public abstract O bulkInsertPreppedRecords(I preppedRecords, final String 
instantTime,
+                                                       
Option<UserDefinedBulkInsertPartitioner<T,I>> bulkInsertPartitioner);
+
+  public abstract O delete(K keys, final String instantTime);
+
+  /**
+   * Common method containing steps to be performed after write 
(upsert/insert/..) operations including auto-commit.
+   * @param result  Commit Action Result
+   * @param instantTime Instant Time
+   * @param hoodieTable Hoodie Table
+   * @return Write Status
+   */
+  private O postWrite(HoodieWriteMetadata<O> result, String instantTime, 
AbstractHoodieTable<T,I,K,O,P> hoodieTable) {
+    if (result.getIndexLookupDuration().isPresent()) {
+      metrics.updateIndexMetrics(getOperationType().name(), 
result.getIndexUpdateDuration().get().toMillis());
+    }
+    if (result.isCommitted()) {
+      // Perform post commit operations.
+      if (result.getFinalizeDuration().isPresent()) {
+        
metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
+            result.getWriteStats().get().size());
+      }
+
+      postCommit(result.getCommitMetadata().get(), instantTime, 
Option.empty());
+
+      emitCommitMetrics(instantTime, result.getCommitMetadata().get(),
+          hoodieTable.getMetaClient().getCommitActionType());
+    }
+    return result.getWriteStatuses();
+  }
+
+  protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
+                            Option<Map<String, String>> extraMetadata) {
+    try {
+      // Do an inline compaction if enabled
+      if (config.isInlineCompaction()) {
+        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, 
"true");
+        inlineCompact(extraMetadata);
+      } else {
+        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, 
"false");
+      }
+      // We cannot have unbounded commit files. Archive commits if we have to 
archive
+      HoodieTimelineArchiveLog archiveLog = new 
HoodieTimelineArchiveLog(config, createMetaClient(true),table);
+      archiveLog.archiveIfRequired(hadoopConf);
+      if (config.isAutoClean()) {
+        // Call clean to cleanup if there is anything to cleanup after the 
commit,
+        LOG.info("Auto cleaning is enabled. Running cleaner now");
+        clean(instantTime);
+      } else {
+        LOG.info("Auto cleaning is not enabled. Not running cleaner now");
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  /**
+   * Create a savepoint based on the latest commit action on the timeline.
+   *
+   * @param user - User creating the savepoint
+   * @param comment - Comment for the savepoint
+   */
+  public void savepoint(String user, String comment) {
+    if (table.getCompletedCommitsTimeline().empty()) {
+      throw new HoodieSavepointException("Could not savepoint. Commit timeline 
is empty");
+    }
+
+    String latestCommit = 
table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp();
+    LOG.info("Savepointing latest commit " + latestCommit);
+    savepoint(latestCommit, user, comment);
+  }
+
+  /**
+   * Savepoint a specific commit instant time. Latest version of data files as 
of the passed in instantTime
+   * will be referenced in the savepoint and will never be cleaned. The 
savepointed commit will never be rolledback or archived.
+   * <p>
+   * This gives an option to rollback the state to the savepoint anytime. 
Savepoint needs to be manually created and
+   * deleted.
+   * <p>
+   * Savepoint should be on a commit that could not have been cleaned.
+   *
+   * @param instantTime - commit that should be savepointed
+   * @param user - User creating the savepoint
+   * @param comment - Comment for the savepoint
+   */
+  public void savepoint(String instantTime, String user, String comment) {
+    table.savepoint(context, instantTime, user, comment);
+  }
+
+  /**
+   * Delete a savepoint that was created. Once the savepoint is deleted, the 
commit can be rolledback and cleaner may
+   * clean up data files.
+   *
+   * @param savepointTime - delete the savepoint
+   * @return true if the savepoint was deleted successfully
+   */
+  public abstract void deleteSavepoint(String savepointTime);
+
+  /**
+   * Restore the data to the savepoint.
+   *
+   * WARNING: This rolls back recent commits and deleted data files and also 
pending compactions after savepoint time.
+   * Queries accessing the files will mostly fail. This is expected to be a 
manual operation and no concurrent write or
+   * compaction is expected to be running
+   *
+   * @param savepointTime - savepoint time to rollback to
+   * @return true if the savepoint was restored to successfully
+   */
+  public abstract void restoreToSavepoint(String savepointTime);
+
+  /**
+   * Rollback the inflight record changes with the given commit time.
+   *
+   * @param commitInstantTime Instant time of the commit
+   * @throws HoodieRollbackException if rollback cannot be performed 
successfully
+   */
+  public boolean rollback(final String commitInstantTime) throws 
HoodieRollbackException {
+    LOG.info("Begin rollback of instant " + commitInstantTime);
+    final String rollbackInstantTime = 
HoodieActiveTimeline.createNewInstantTime();
+    final Timer.Context timerContext = this.metrics.getRollbackCtx();
+    try {
+      Option<HoodieInstant> commitInstantOpt = 
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
+          .filter(instant -> 
HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
+          .findFirst());
+      if (commitInstantOpt.isPresent()) {
+        HoodieRollbackMetadata rollbackMetadata = table.rollback(context, 
rollbackInstantTime, commitInstantOpt.get(), true);
+        if (timerContext != null) {
+          long durationInMs = metrics.getDurationInMs(timerContext.stop());
+          metrics.updateRollbackMetrics(durationInMs, 
rollbackMetadata.getTotalFilesDeleted());
+        }
+        return true;
+      } else {
+        LOG.warn("Cannot find instant " + commitInstantTime + " in the 
timeline, for rollback");
+        return false;
+      }
+    } catch (Exception e) {
+      throw new HoodieRollbackException("Failed to rollback " + 
config.getBasePath() + " commits " + commitInstantTime, e);
+    }
+  }
+
+
+  /**
+   * NOTE : This action requires all writers (ingest and compact) to a table 
to be stopped before proceeding. Revert
+   * the (inflight/committed) record changes for all commits after the 
provided instant time.
+   *
+   * @param instantTime Instant time to which restoration is requested
+   */
+  public HoodieRestoreMetadata restoreToInstant(final String instantTime) 
throws HoodieRestoreException {
+    LOG.info("Begin restore to instant " + instantTime);
+    final String restoreInstantTime = 
HoodieActiveTimeline.createNewInstantTime();
+    Timer.Context timerContext = metrics.getRollbackCtx();
+    try {
+      HoodieRestoreMetadata restoreMetadata = table.restore(context, 
restoreInstantTime, instantTime);
+      if (timerContext != null) {
+        final long durationInMs = metrics.getDurationInMs(timerContext.stop());
+        final long totalFilesDeleted = 
restoreMetadata.getHoodieRestoreMetadata().values().stream()
+            .flatMap(Collection::stream)
+            .mapToLong(HoodieRollbackMetadata::getTotalFilesDeleted)
+            .sum();
+        metrics.updateRollbackMetrics(durationInMs, totalFilesDeleted);
+      }
+      return restoreMetadata;
+    } catch (Exception e) {
+      throw new HoodieRestoreException("Failed to restore to " + instantTime, 
e);
+    }
+  }
+
+  /**
+   * Performs a compaction operation on a table, serially before or after an 
insert/upsert action.
+   */
+  private Option<String> inlineCompact(Option<Map<String, String>> 
extraMetadata) {
+    Option<String> compactionInstantTimeOpt = 
scheduleCompaction(extraMetadata);
+    compactionInstantTimeOpt.ifPresent(compactionInstantTime -> {
+      // inline compaction should auto commit as the user is never given 
control
+      compact(compactionInstantTime, true);
+    });
+    return compactionInstantTimeOpt;
+  }
+  /**
+   * Ensures compaction instant is in expected state and performs Compaction 
+   * for the workload stored in instant-time.
+   *
+   * @param compactionInstantTime Compaction Instant Time
+   * @return Write statuses of the compaction (engine-specific collection type {@code O})
+   */
+  private O compact(String compactionInstantTime, boolean shouldComplete) {

Review comment:
       I think we should change it to public so that users can call the API directly




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to