vinothchandar commented on a change in pull request #4037:
URL: https://github.com/apache/hudi/pull/4037#discussion_r756334432



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
##########
@@ -255,30 +255,18 @@ private void rollBackIndex() {
 
   protected void finishRollback(HoodieInstant inflightInstant, 
HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException {
     try {
-      writeToMetadata(rollbackMetadata);
+      if (!skipLocking && config.isMetadataTableEnabled()) {
+        this.txnManager.beginTransaction(Option.empty(), Option.empty());
+      }
+      writeTableMetadata(rollbackMetadata);
       
table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant,

Review comment:
       Yeah, it's probably good to fence the entire timeline update as well.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
##########
@@ -255,30 +255,18 @@ private void rollBackIndex() {
 
   protected void finishRollback(HoodieInstant inflightInstant, 
HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException {
     try {
-      writeToMetadata(rollbackMetadata);
+      if (!skipLocking && config.isMetadataTableEnabled()) {
+        this.txnManager.beginTransaction(Option.empty(), Option.empty());
+      }
+      writeTableMetadata(rollbackMetadata);
       
table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant,
           TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata));
       LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() 
+ " is complete");
     } catch (IOException e) {
       throw new HoodieIOException("Error executing rollback at instant " + 
instantTime, e);
-    }
-  }
-
-  /**
-   * Update metadata table if available. Any update to metadata table happens 
within data table lock.
-   * @param rollbackMetadata instance of {@link HoodieRollbackMetadata} to be 
applied to metadata.
-   */
-  private void writeToMetadata(HoodieRollbackMetadata rollbackMetadata) {
-    if (config.isMetadataTableEnabled()) {
-      try {
-        if (!skipLocking) {
-          this.txnManager.beginTransaction(Option.empty(), Option.empty());
-        }
-        writeTableMetadata(rollbackMetadata);
-      } finally {
-        if (!skipLocking) {
-          this.txnManager.endTransaction();
-        }
+    } finally {
+      if (!skipLocking && config.isMetadataTableEnabled()) {

Review comment:
       Same here.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
##########
@@ -205,31 +205,19 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, 
O> table, HoodieInstan
           Option.of(timer.endTimer()),
           cleanStats
       );
-      writeMetadata(metadata);
+      if (!skipLocking && config.isMetadataTableEnabled()) {

Review comment:
       Can we have a `needsLocking` member, initialized as 
`needsLocking = !skipLocking && config.isMetadataTableEnabled()`, and just reuse 
it here and below?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -314,11 +314,21 @@ protected void completeCompaction(HoodieCommitMetadata 
metadata, JavaRDD<WriteSt
                                     String compactionCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect 
compaction write status and commit compaction");
     List<HoodieWriteStat> writeStats = 
writeStatuses.map(WriteStatus::getStat).collect();
-    writeTableMetadata(table, metadata, new 
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, 
compactionCommitTime));
-    // commit to data table after committing to metadata table.
-    finalizeWrite(table, compactionCommitTime, writeStats);
-    LOG.info("Committing Compaction " + compactionCommitTime + ". Finished 
with result " + metadata);
-    CompactHelpers.getInstance().completeInflightCompaction(table, 
compactionCommitTime, metadata);
+    try {
+      HoodieInstant compactionInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, 
compactionCommitTime);
+      if (config.isMetadataTableEnabled()) { // when metadata is not enabled, 
we don't want to take a lock here even with multi-writers.
+        this.txnManager.beginTransaction(Option.of(compactionInstant), 
Option.empty());
+      }
+      writeTableMetadata(table, metadata, compactionInstant);
+      // commit to data table after committing to metadata table.
+      finalizeWrite(table, compactionCommitTime, writeStats);
+      LOG.info("Committing Compaction " + compactionCommitTime + ". Finished 
with result " + metadata);
+      CompactHelpers.getInstance().completeInflightCompaction(table, 
compactionCommitTime, metadata);
+    } finally {
+      if (config.isMetadataTableEnabled()) {

Review comment:
       Somehow I don't like tying locking to the metadata table. What if we 
are just running a single-writer process that calls these APIs — why would we 
need a lock then? Can't we always take locks, with the expectation that for 
multi-writer setups, users configure a `LockProvider`?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -386,9 +396,13 @@ private void 
completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<Wr
       throw new HoodieClusteringException("Clustering failed to write to 
files:"
           + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 
0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
     }
-    writeTableMetadata(table, metadata, new 
HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime));
-    finalizeWrite(table, clusteringCommitTime, writeStats);
     try {
+      HoodieInstant clusteringInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
+      if (config.isMetadataTableEnabled()) { // when metadata is not enabled, 
we don't want to take a lock here even with multi-writers.
+        this.txnManager.beginTransaction(Option.of(clusteringInstant), 
Option.empty());

Review comment:
       Kind of surprised that this much logic sits in `SparkRDDWriteClient` and not 
in the abstract write client.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to