leesf commented on a change in pull request #1727:
URL: https://github.com/apache/hudi/pull/1727#discussion_r439719359
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -0,0 +1,537 @@
+package org.apache.hudi.client;
+
+import com.codahale.metrics.Timer;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.embedded.AbstractEmbeddedTimelineService;
+import org.apache.hudi.common.HoodieEngineContext;
+import org.apache.hudi.common.model.*;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.*;
+import org.apache.hudi.index.AbstractHoodieIndex;
+import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.table.AbstractHoodieTable;
+import org.apache.hudi.table.HoodieTimelineArchiveLog;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
/**
 * Abstract write client shared by all engine-specific implementations: exposes the
 * record-level write operations (insert/upsert/bulk-insert/delete) as abstract hooks and
 * provides common table services (savepoint, rollback, restore, inline compaction) on top.
 *
 * @author xianghu.wang
 * @since 2020/6/10
 */
+public abstract class AbstractHoodieWriteClient<T extends
HoodieRecordPayload<T>, I, K, O, P> extends AbstractHoodieClient {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractHoodieWriteClient.class);
  private static final long serialVersionUID = 1L;
  // Metrics sink for write/commit/rollback timings; transient: not serializable client state.
  private final transient HoodieMetrics metrics;
  // Index used to locate records in storage; engine-specific implementation injected at construction.
  private final transient AbstractHoodieIndex<T, I, K, O, P> index;
  // Table abstraction this client writes to; injected at construction.
  private final transient AbstractHoodieTable<T, I, K, O, P> table;
  // Timer context for a write in progress; initialized lazily — usage not visible in this chunk.
  private transient Timer.Context writeContext = null;
  // Type of the current write operation (insert/upsert/...); set by callers via setOperationType.
  private transient WriteOperationType operationType;

  // "lookup" label for index metrics — NOTE(review): usage not visible in this chunk, confirm.
  private static final String LOOKUP_STR = "lookup";
  // Whether pending (inflight/failed) commits should be rolled back when the client starts.
  private final boolean rollbackPending;
  // Timer context for an in-flight compaction; presumably started when compaction begins — confirm.
  private transient Timer.Context compactionTimer;
+
+
  /**
   * Records the type of the write operation (insert/upsert/...) currently being performed.
   *
   * @param operationType the current write operation type
   */
  public void setOperationType(WriteOperationType operationType) {
    this.operationType = operationType;
  }

  /**
   * @return the type of the write operation currently being performed
   */
  public WriteOperationType getOperationType() {
    return this.operationType;
  }
+
  /**
   * Creates a write client with no embedded timeline service and no rollback of pending commits.
   *
   * @param context engine-specific execution context
   * @param index index implementation used for record lookups
   * @param table table abstraction the client writes to
   * @param clientConfig write configuration
   */
  public AbstractHoodieWriteClient(HoodieEngineContext context, AbstractHoodieIndex<T, I, K, O, P> index,
                                   AbstractHoodieTable<T, I, K, O, P> table, HoodieWriteConfig clientConfig) {
    this(context, index, table, clientConfig, Option.empty());
  }
+
  /**
   * Creates a write client that does not roll back pending (inflight/failed) commits on startup.
   *
   * @param context engine-specific execution context
   * @param index index implementation used for record lookups
   * @param table table abstraction the client writes to
   * @param clientConfig write configuration
   * @param timelineServer optional embedded timeline service
   */
  protected AbstractHoodieWriteClient(HoodieEngineContext context, AbstractHoodieIndex<T, I, K, O, P> index,
                                      AbstractHoodieTable<T, I, K, O, P> table, HoodieWriteConfig clientConfig,
                                      Option<AbstractEmbeddedTimelineService> timelineServer) {
    this(context, index, table, clientConfig, timelineServer, false);
  }
+
  /**
   * Primary constructor: wires in the index, table, metrics, and rollback behavior.
   *
   * @param context engine-specific execution context
   * @param index index implementation used for record lookups
   * @param table table abstraction the client writes to
   * @param clientConfig write configuration
   * @param timelineServer optional embedded timeline service
   * @param rollbackPending whether pending (inflight/failed) commits should be rolled back on startup
   */
  public AbstractHoodieWriteClient(HoodieEngineContext context, AbstractHoodieIndex<T, I, K, O, P> index,
                                   AbstractHoodieTable<T, I, K, O, P> table,
                                   HoodieWriteConfig clientConfig, Option<AbstractEmbeddedTimelineService> timelineServer,
                                   boolean rollbackPending) {
    super(context, clientConfig, timelineServer);
    this.index = index;
    this.table = table;
    // Metrics are namespaced by table name so multiple clients can report side by side.
    this.metrics = new HoodieMetrics(config, config.getTableName());
    this.rollbackPending = rollbackPending;
  }
+
  /**
   * Filters out HoodieRecords that already exist in storage.
   *
   * @param hoodieRecords input records (engine-specific collection type I)
   * @return the subset of records not already present in the table
   */
  public abstract I filterExists(I hoodieRecords);

  /**
   * Upserts the given records into the table at the given instant time.
   *
   * @param records records to upsert
   * @param instantTime instant time of the write
   * @return engine-specific collection of write statuses (O)
   */
  public abstract O upsert(I records, final String instantTime);

  /**
   * Upserts records that are already prepared (presumably de-duplicated and tagged with
   * locations — NOTE(review): confirm the exact precondition with implementations).
   *
   * @param preppedRecords prepared records
   * @param instantTime instant time of the write
   * @return engine-specific collection of write statuses
   */
  public abstract O upsertPreppedRecords(I preppedRecords, final String instantTime);

  /**
   * Inserts the given records into the table at the given instant time.
   *
   * @param records records to insert
   * @param instantTime instant time of the write
   * @return engine-specific collection of write statuses
   */
  public abstract O insert(I records, final String instantTime);

  /**
   * Inserts records that are already prepared (see {@link #upsertPreppedRecords}).
   *
   * @param preppedRecords prepared records
   * @param instantTime instant time of the write
   * @return engine-specific collection of write statuses
   */
  public abstract O insertPreppedRecords(I preppedRecords, final String instantTime);

  /**
   * Bulk-inserts the given records with no custom partitioner.
   *
   * @param records records to insert
   * @param instantTime instant time of the write
   * @return engine-specific collection of write statuses
   */
  public O bulkInsert(I records, final String instantTime) {
    return bulkInsert(records, instantTime, Option.empty());
  }

  /**
   * Bulk-inserts the given records, optionally using a user-defined partitioner to lay out data.
   *
   * @param records records to insert
   * @param instantTime instant time of the write
   * @param bulkInsertPartitioner optional custom partitioner
   * @return engine-specific collection of write statuses
   */
  public abstract O bulkInsert(I records, final String instantTime,
                               Option<UserDefinedBulkInsertPartitioner<T, I>> bulkInsertPartitioner);

  /**
   * Bulk-inserts records that are already prepared, optionally using a user-defined partitioner.
   *
   * @param preppedRecords prepared records
   * @param instantTime instant time of the write
   * @param bulkInsertPartitioner optional custom partitioner
   * @return engine-specific collection of write statuses
   */
  public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instantTime,
                                             Option<UserDefinedBulkInsertPartitioner<T, I>> bulkInsertPartitioner);

  /**
   * Deletes the records identified by the given keys at the given instant time.
   *
   * @param keys keys of the records to delete (engine-specific collection type K)
   * @param instantTime instant time of the delete
   * @return engine-specific collection of write statuses
   */
  public abstract O delete(K keys, final String instantTime);
+
+ /**
+ * Common method containing steps to be performed after write
(upsert/insert/..) operations including auto-commit.
+ * @param result Commit Action Result
+ * @param instantTime Instant Time
+ * @param hoodieTable Hoodie Table
+ * @return Write Status
+ */
+ private O postWrite(HoodieWriteMetadata<O> result, String instantTime,
AbstractHoodieTable<T,I,K,O,P> hoodieTable) {
+ if (result.getIndexLookupDuration().isPresent()) {
+ metrics.updateIndexMetrics(getOperationType().name(),
result.getIndexUpdateDuration().get().toMillis());
+ }
+ if (result.isCommitted()) {
+ // Perform post commit operations.
+ if (result.getFinalizeDuration().isPresent()) {
+
metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
+ result.getWriteStats().get().size());
+ }
+
+ postCommit(result.getCommitMetadata().get(), instantTime,
Option.empty());
+
+ emitCommitMetrics(instantTime, result.getCommitMetadata().get(),
+ hoodieTable.getMetaClient().getCommitActionType());
+ }
+ return result.getWriteStatuses();
+ }
+
+ protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
+ Option<Map<String, String>> extraMetadata) {
+ try {
+ // Do an inline compaction if enabled
+ if (config.isInlineCompaction()) {
+ metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP,
"true");
+ inlineCompact(extraMetadata);
+ } else {
+ metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP,
"false");
+ }
+ // We cannot have unbounded commit files. Archive commits if we have to
archive
+ HoodieTimelineArchiveLog archiveLog = new
HoodieTimelineArchiveLog(config, createMetaClient(true),table);
+ archiveLog.archiveIfRequired(hadoopConf);
+ if (config.isAutoClean()) {
+ // Call clean to cleanup if there is anything to cleanup after the
commit,
+ LOG.info("Auto cleaning is enabled. Running cleaner now");
+ clean(instantTime);
+ } else {
+ LOG.info("Auto cleaning is not enabled. Not running cleaner now");
+ }
+ } catch (IOException ioe) {
+ throw new HoodieIOException(ioe.getMessage(), ioe);
+ }
+ }
+
+ /**
+ * Create a savepoint based on the latest commit action on the timeline.
+ *
+ * @param user - User creating the savepoint
+ * @param comment - Comment for the savepoint
+ */
+ public void savepoint(String user, String comment) {
+ if (table.getCompletedCommitsTimeline().empty()) {
+ throw new HoodieSavepointException("Could not savepoint. Commit timeline
is empty");
+ }
+
+ String latestCommit =
table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp();
+ LOG.info("Savepointing latest commit " + latestCommit);
+ savepoint(latestCommit, user, comment);
+ }
+
  /**
   * Savepoint a specific commit instant time. The latest version of the data files as of the
   * passed-in instantTime will be referenced in the savepoint and will never be cleaned. The
   * savepointed commit will never be rolled back or archived.
   * <p>
   * This gives an option to roll back the state to the savepoint anytime. Savepoints need to
   * be manually created and deleted.
   * <p>
   * A savepoint should be on a commit that could not have been cleaned.
   *
   * @param instantTime commit that should be savepointed
   * @param user user creating the savepoint
   * @param comment comment for the savepoint
   */
  public void savepoint(String instantTime, String user, String comment) {
    // Delegates entirely to the table-level savepoint action.
    table.savepoint(context, instantTime, user, comment);
  }
+
  /**
   * Delete a savepoint that was created. Once the savepoint is deleted, the commit can be
   * rolled back and the cleaner may clean up its data files.
   * (The previous javadoc documented a boolean return, but this method is {@code void}.)
   *
   * @param savepointTime instant time of the savepoint to delete
   */
  public abstract void deleteSavepoint(String savepointTime);
+
  /**
   * Restore the data to the savepoint.
   *
   * WARNING: This rolls back recent commits and deletes data files, and also pending
   * compactions after the savepoint time. Queries accessing the files will mostly fail.
   * This is expected to be a manual operation and no concurrent write or compaction is
   * expected to be running.
   * (The previous javadoc documented a boolean return, but this method is {@code void}.)
   *
   * @param savepointTime savepoint time to roll back to
   */
  public abstract void restoreToSavepoint(String savepointTime);
+
+ /**
+ * Rollback the inflight record changes with the given commit time.
+ *
+ * @param commitInstantTime Instant time of the commit
+ * @throws HoodieRollbackException if rollback cannot be performed
successfully
+ */
+ public boolean rollback(final String commitInstantTime) throws
HoodieRollbackException {
+ LOG.info("Begin rollback of instant " + commitInstantTime);
+ final String rollbackInstantTime =
HoodieActiveTimeline.createNewInstantTime();
+ final Timer.Context timerContext = this.metrics.getRollbackCtx();
+ try {
+ Option<HoodieInstant> commitInstantOpt =
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
+ .filter(instant ->
HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
+ .findFirst());
+ if (commitInstantOpt.isPresent()) {
+ HoodieRollbackMetadata rollbackMetadata = table.rollback(context,
rollbackInstantTime, commitInstantOpt.get(), true);
+ if (timerContext != null) {
+ long durationInMs = metrics.getDurationInMs(timerContext.stop());
+ metrics.updateRollbackMetrics(durationInMs,
rollbackMetadata.getTotalFilesDeleted());
+ }
+ return true;
+ } else {
+ LOG.warn("Cannot find instant " + commitInstantTime + " in the
timeline, for rollback");
+ return false;
+ }
+ } catch (Exception e) {
+ throw new HoodieRollbackException("Failed to rollback " +
config.getBasePath() + " commits " + commitInstantTime, e);
+ }
+ }
+
+
+ /**
+ * NOTE : This action requires all writers (ingest and compact) to a table
to be stopped before proceeding. Revert
+ * the (inflight/committed) record changes for all commits after the
provided instant time.
+ *
+ * @param instantTime Instant time to which restoration is requested
+ */
+ public HoodieRestoreMetadata restoreToInstant(final String instantTime)
throws HoodieRestoreException {
+ LOG.info("Begin restore to instant " + instantTime);
+ final String restoreInstantTime =
HoodieActiveTimeline.createNewInstantTime();
+ Timer.Context timerContext = metrics.getRollbackCtx();
+ try {
+ HoodieRestoreMetadata restoreMetadata = table.restore(context,
restoreInstantTime, instantTime);
+ if (timerContext != null) {
+ final long durationInMs = metrics.getDurationInMs(timerContext.stop());
+ final long totalFilesDeleted =
restoreMetadata.getHoodieRestoreMetadata().values().stream()
+ .flatMap(Collection::stream)
+ .mapToLong(HoodieRollbackMetadata::getTotalFilesDeleted)
+ .sum();
+ metrics.updateRollbackMetrics(durationInMs, totalFilesDeleted);
+ }
+ return restoreMetadata;
+ } catch (Exception e) {
+ throw new HoodieRestoreException("Failed to restore to " + instantTime,
e);
+ }
+ }
+
+ /**
+ * Performs a compaction operation on a table, serially before or after an
insert/upsert action.
+ */
+ private Option<String> inlineCompact(Option<Map<String, String>>
extraMetadata) {
+ Option<String> compactionInstantTimeOpt =
scheduleCompaction(extraMetadata);
+ compactionInstantTimeOpt.ifPresent(compactionInstantTime -> {
+ // inline compaction should auto commit as the user is never given
control
+ compact(compactionInstantTime, true);
+ });
+ return compactionInstantTimeOpt;
+ }
  /**
   * Ensures the compaction instant is in the expected state and performs compaction for the
   * workload stored at that instant time.
   *
   * @param compactionInstantTime compaction instant time
   * @return engine-specific collection of write statuses produced by the compaction
   */
+ private O compact(String compactionInstantTime, boolean shouldComplete) {
Review comment:
I think we should change it to `public` so that users can call this API directly.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]