This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f6652a84aa [HUDI-5407][HUDI-5408] Fixing rollback in MDT to be eager 
(#7490)
6f6652a84aa is described below

commit 6f6652a84aa5e738cdb6932f41880ae24415b06c
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Jan 20 17:17:26 2023 -0800

    [HUDI-5407][HUDI-5408] Fixing rollback in MDT to be eager (#7490)
    
    Metadata table could deem some invalid data as valid in some rare 
conditions. Especially when there are partially failed commits in MDT and the 
commit that failed refers to compaction or clustering in data table, we might 
see some anomalies.
    
    Scenarios where this could fail w/ inline compaction.
    
    Data table timeline
    t1.dc t2.comp.req. |Crash t3.dc t2.comp.inflight t2.commit
    
    MDT timeline
    t1.dc. t2.comp.inflight |Crash t3.dc t4.rb(t2) t2.dc
    
    The first attempt of t2 in MDT should be rolled back since it crashed 
mid-way. in other words, if there are any log blocks written by t2 in MDT, it 
should be deemed invalid.
    
    But what happens is, here is how the log blocks are laid out.
    
    log1(t1). log2(t2 first attempt) crash.... log3 (t3) log4(t4.rb rolling 
back t2) ... log5 (t2)
    
    So, when we read the log blocks via AbstractLogRecordReader, ideally we 
want to ignore log2. but when we encounter log4 for a rollback block, we only 
check the previous log block for matching commit to rollback. since it does not 
match w/ t2, we assume log4 is a duplicate rollback and hence still deem log2 
as a valid log block.
    
    hence MDT could serve more data files which are not valid from a FS based 
listing standpoint.
    
    Fix: switching failed writes cleaning policy in MDT to EAGER will solve 
this issue. Prior to this patch, rollback was set to lazy and hence happens 
only when we trigger clean at the end after delta commit succeeds in MDT.
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |   4 +-
 .../apache/hudi/client/BaseHoodieWriteClient.java  |   8 ++
 .../metadata/HoodieBackedTableMetadataWriter.java  |   9 +-
 .../rollback/BaseRollbackActionExecutor.java       |   4 +-
 .../SparkHoodieBackedTableMetadataWriter.java      |   9 +-
 .../functional/TestHoodieBackedMetadata.java       | 107 +++++++++++++++++++++
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  34 +++++--
 .../apache/hudi/utilities/HoodieClusteringJob.java |   5 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  26 ++++-
 9 files changed, 189 insertions(+), 17 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 7ea70f63998..ec1041e3ab0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -667,6 +667,7 @@ public abstract class BaseHoodieTableServiceClient<O> 
extends BaseHoodieClient i
 
   /**
    * Rollback all failed writes.
+   * @return true if rollback was triggered. false otherwise.
    */
   protected Boolean rollbackFailedWrites() {
     return rollbackFailedWrites(false);
@@ -675,6 +676,7 @@ public abstract class BaseHoodieTableServiceClient<O> 
extends BaseHoodieClient i
   /**
    * Rollback all failed writes.
    * @param skipLocking if this is triggered by another parent transaction, 
locking can be skipped.
+   * @return true if rollback was triggered. false otherwise.
    */
   protected Boolean rollbackFailedWrites(boolean skipLocking) {
     HoodieTable table = createTable(config, hadoopConf);
@@ -682,7 +684,7 @@ public abstract class BaseHoodieTableServiceClient<O> 
extends BaseHoodieClient i
     Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = 
getPendingRollbackInfos(table.getMetaClient());
     instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, 
Option.empty()));
     rollbackFailedWrites(pendingRollbacks, skipLocking);
-    return true;
+    return !pendingRollbacks.isEmpty();
   }
 
   protected void rollbackFailedWrites(Map<String, 
Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index b361f8918c4..818fa82e568 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1263,6 +1263,14 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     }
   }
 
+  /**
+   * Rollback failed writes if any.
+   * @return true if rollback happened. false otherwise.
+   */
+  public boolean rollbackFailedWrites() {
+    return tableServiceClient.rollbackFailedWrites();
+  }
+
   /**
    * add columns to table.
    *
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index cc06b80aafd..fd83d24ea56 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -268,7 +268,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
             .withAutoClean(false)
             .withCleanerParallelism(parallelism)
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
-            
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
             .retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
             .build())
         // we will trigger archive manually, to ensure only regular writer 
invokes it
@@ -1041,7 +1041,12 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     // delta commits synced over will not have an instant time lesser than the 
last completed instant on the
     // metadata table.
     final String compactionInstantTime = latestDeltaCommitTimeInMetadataTable 
+ METADATA_COMPACTION_TIME_SUFFIX;
-    if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, 
Option.empty())) {
+    // we need to avoid checking compaction w/ same instant again.
+    // lets say we trigger compaction after C5 in MDT and so compaction 
completes with C4001. but C5 crashed before completing in MDT.
+    // and again w/ C6, we will re-attempt compaction at which point latest 
delta commit is C4 in MDT.
+    // and so we try compaction w/ instant C4001. So, we can avoid compaction 
if we already have compaction w/ same instant time.
+    if 
(!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)
+        && writeClient.scheduleCompactionAtInstant(compactionInstantTime, 
Option.empty())) {
       writeClient.compact(compactionInstantTime);
     }
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index b5ae9471e58..b88b332172b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -155,12 +156,13 @@ public abstract class BaseRollbackActionExecutor<T, I, K, 
O> extends BaseActionE
     // since with LAZY rollback we support parallel writing which can allow a 
new inflight while rollback is ongoing
     // Remove this once we support LAZY rollback of failed writes by default 
as parallel writing becomes the default
     // writer mode.
-    if (config.getFailedWritesCleanPolicy().isEager()) {
+    if (config.getFailedWritesCleanPolicy().isEager()  && 
!HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
       final String instantTimeToRollback = instantToRollback.getTimestamp();
       HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
       HoodieTimeline inflightAndRequestedCommitTimeline = 
table.getPendingCommitTimeline();
       // Make sure only the last n commits are being rolled back
       // If there is a commit in-between or after that is not rolled back, 
then abort
+      // this condition may not hold good for metadata table. since the order 
of commits applied to MDT is data table commits and the ordering could be 
different.
       if ((instantTimeToRollback != null) && !commitTimeline.empty()
           && !commitTimeline.findInstantsAfter(instantTimeToRollback, 
Integer.MAX_VALUE).empty()) {
         // check if remnants are from a previous LAZY rollback config, if yes, 
let out of order rollback continue
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 272d3d47985..81526c25bcc 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CommitUtils;
@@ -132,7 +133,11 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
     JavaRDD<HoodieRecord> preppedRecordRDD = 
HoodieJavaRDD.getJavaRDD(preppedRecords);
 
-    try (SparkRDDWriteClient writeClient = new 
SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
+    try (SparkRDDWriteClient writeClient = new 
SparkRDDWriteClient(engineContext, metadataWriteConfig)) {
+      // rollback partially failed writes if any.
+      if (writeClient.rollbackFailedWrites()) {
+        metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+      }
       if (canTriggerTableService) {
         // trigger compaction before doing the delta commit. this is to 
ensure, if this delta commit succeeds in metadata table, but failed in data 
table,
         // we would have compacted metadata table and so could have included 
uncommitted data which will never be ignored while reading from metadata
@@ -162,7 +167,7 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
         // clean plan is the same, so we don't need to delete the requested 
and inflight instant
         // files in the active timeline.
       }
-      
+
       List<WriteStatus> statuses = 
writeClient.upsertPreppedRecords(preppedRecordRDD, instantTime).collect();
       statuses.forEach(writeStatus -> {
         if (writeStatus.hasErrors()) {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 84e6d342883..d46e02cf39f 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -906,6 +906,65 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     }
   }
 
+  @Test
+  public void testMetadataRollbackWithCompaction() throws Exception {
+    HoodieTableType tableType = COPY_ON_WRITE;
+    init(tableType, false);
+    writeConfig = getWriteConfigBuilder(false, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withPopulateMetaFields(true)
+            .build())
+        .build();
+
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
+      // Write 1 (Bulk insert)
+      String newCommitTime1 = "0000001";
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime1, 
100);
+      client.startCommitWithTime(newCommitTime1);
+      JavaRDD writeStatuses = client.insert(jsc.parallelize(records, 1), 
newCommitTime1);
+      client.commit(newCommitTime1, writeStatuses);
+
+      String newCommitTime2 = "0000002";
+      records = dataGen.generateUniqueUpdates(newCommitTime2, 20);
+      client.startCommitWithTime(newCommitTime2);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime2);
+      client.commit(newCommitTime2, writeStatuses);
+
+      String newCommitTime3 = "0000003";
+      records = dataGen.generateUniqueUpdates(newCommitTime3, 20);
+      client.startCommitWithTime(newCommitTime3);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime3);
+      client.commit(newCommitTime3, writeStatuses);
+
+      // rollback "3" so that there is no "3" in data table timeline, but 
there exists a DC "3" in metadata timeline.
+      client.rollback(newCommitTime3);
+
+      // mimicing crash or making an inflight in metadata table.
+      Path toDelete = new Path(metaClient.getMetaPath() + "/metadata/.hoodie/" 
+ newCommitTime2 + "." + HoodieTimeline.DELTA_COMMIT_ACTION);
+      metaClient.getFs().delete(toDelete);
+
+      // re-ingest w/ same commit time.
+      records = dataGen.generateUniqueUpdates(newCommitTime3, 20);
+      client.startCommitWithTime(newCommitTime3);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime3);
+      client.commit(newCommitTime3, writeStatuses);
+
+      // collect all commit meta files from metadata table.
+      FileStatus[] metaFiles = metaClient.getFs().listStatus(new 
Path(metaClient.getMetaPath() + "/metadata/.hoodie"));
+      List<FileStatus> commit3Files = 
Arrays.stream(metaFiles).filter(fileStatus ->
+          fileStatus.getPath().getName().equals(newCommitTime3 + "." + 
HoodieTimeline.DELTA_COMMIT_ACTION)).collect(Collectors.toList());
+      List<FileStatus> rollbackFiles = 
Arrays.stream(metaFiles).filter(fileStatus ->
+          fileStatus.getPath().getName().endsWith("." + 
HoodieTimeline.ROLLBACK_ACTION)).collect(Collectors.toList());
+
+      // ensure commit2's delta commit in MDT has last mod time > the actual 
rollback for previous failed commit i.e. commit2.
+      // if rollback wasn't eager, rollback's last mod time will be lower than 
the commit3'd delta commit last mod time.
+      assertTrue(commit3Files.get(0).getModificationTime() > 
rollbackFiles.get(0).getModificationTime());
+    }
+  }
+
   /**
    * Test arguments - Table type, populate meta fields, exclude key from 
payload.
    */
@@ -1594,6 +1653,54 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     }
   }
 
+  @Test
+  public void testEagerRollbackinMDT() throws IOException {
+    tableType = MERGE_ON_READ;
+    initPath();
+    init(tableType);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig);
+    // Write 1 (Bulk insert)
+    String commit1 = HoodieActiveTimeline.createNewInstantTime();
+    List<HoodieRecord> records = dataGen.generateInserts(commit1, 20);
+    client.startCommitWithTime(commit1);
+    List<WriteStatus> writeStatuses = 
client.bulkInsert(jsc.parallelize(records, 1), commit1).collect();
+    assertNoWriteErrors(writeStatuses);
+
+    // Write 2 (inserts)
+    String commit2 = HoodieActiveTimeline.createNewInstantTime();
+    client.startCommitWithTime(commit2);
+    records = dataGen.generateInserts(commit2, 20);
+    writeStatuses = client.insert(jsc.parallelize(records, 1), 
commit2).collect();
+    assertNoWriteErrors(writeStatuses);
+    // remove latest completed delta commit from MDT.
+    Path toDelete = new Path(metaClient.getMetaPath() + "/metadata/.hoodie/" + 
commit2 + "." + HoodieTimeline.DELTA_COMMIT_ACTION);
+    metaClient.getFs().delete(toDelete);
+
+    // Write 3 (updates)
+    client = new SparkRDDWriteClient(engineContext, writeConfig);
+    String commit3 = HoodieActiveTimeline.createNewInstantTime();
+    client.startCommitWithTime(commit3);
+    records = dataGen.generateUniqueUpdates(commit3, 10);
+    writeStatuses = client.upsert(jsc.parallelize(records, 1), 
commit3).collect();
+    assertNoWriteErrors(writeStatuses);
+
+    // ensure that 000003 is after rollback of the partially failed 2nd commit.
+    HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setBasePath(metaClient.getMetaPath() + 
"/metadata/").setConf(metaClient.getHadoopConf()).build();
+    HoodieInstant rollbackInstant = 
metadataMetaClient.getActiveTimeline().getRollbackTimeline().getInstants().get(0);
+
+    // collect all commit meta files from metadata table.
+    FileStatus[] metaFiles = metaClient.getFs().listStatus(new 
Path(metaClient.getMetaPath() + "/metadata/.hoodie"));
+    List<FileStatus> commit3Files = Arrays.stream(metaFiles).filter(fileStatus 
->
+        fileStatus.getPath().getName().equals(commit3 + "." + 
HoodieTimeline.DELTA_COMMIT_ACTION)).collect(Collectors.toList());
+    List<FileStatus> rollbackFiles = 
Arrays.stream(metaFiles).filter(fileStatus ->
+        fileStatus.getPath().getName().equals(rollbackInstant.getTimestamp() + 
"." + HoodieTimeline.ROLLBACK_ACTION)).collect(Collectors.toList());
+
+    // ensure commit3's delta commit in MDT has last mod time > the actual 
rollback for previous failed commit i.e. commit2.
+    // if rollback wasn't eager, rollback's last mod time will be lower than 
the commit3'd delta commit last mod time.
+    assertTrue(commit3Files.get(0).getModificationTime() > 
rollbackFiles.get(0).getModificationTime());
+  }
+
   /**
    * Test all major table operations with the given table, config and context.
    *
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 38033b9af35..d67e294e8b9 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -28,10 +28,12 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import 
org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
 import 
org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.client.validator.SparkPreCommitValidator;
 import org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator;
 import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator;
 import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -51,6 +53,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.marker.MarkerType;
@@ -81,6 +84,7 @@ import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
@@ -2415,9 +2419,13 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
     testRollbackAfterConsistencyCheckFailureUsingFileList(true, 
enableOptimisticConsistencyGuard, populateMetCols);
   }
 
-  @ParameterizedTest
-  @MethodSource("rollbackFailedCommitsParams")
-  public void testRollbackFailedCommits(HoodieFailedWritesCleaningPolicy 
cleaningPolicy, boolean populateMetaFields) throws Exception {
+  //@ParameterizedTest
+  //@MethodSource("rollbackFailedCommitsParams")
+  @Test
+  public void testRollbackFailedCommits() throws Exception {
+    // HoodieFailedWritesCleaningPolicy cleaningPolicy, boolean 
populateMetaFields
+    HoodieFailedWritesCleaningPolicy cleaningPolicy = 
HoodieFailedWritesCleaningPolicy.NEVER;
+    boolean populateMetaFields = true;
     HoodieTestUtils.init(hadoopConf, basePath);
     SparkRDDWriteClient client = new SparkRDDWriteClient(context, 
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
 
@@ -2476,11 +2484,12 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
               == 0);
       
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants()
 == 3);
     } else if (cleaningPolicy.isNever()) {
+      // never will get translated to Lazy if OCC is enabled.
       assertTrue(
               timeline
                       
.getTimelineOfActions(CollectionUtils.createSet(ROLLBACK_ACTION))
                       .countInstants()
-                      == 0);
+                      == 2);
       // There should be no clean or rollback action on the timeline
       assertTrue(
               timeline
@@ -2546,8 +2555,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     client = new SparkRDDWriteClient(context, 
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
     client.startCommit();
     timeline = metaClient.getActiveTimeline().reload();
+    // since OCC is enabled, hudi auto flips the cleaningPolicy to Lazy.
     assertTrue(timeline.getTimelineOfActions(
-            CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 5);
+            CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
     
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants()
 == 1);
   }
 
@@ -2780,6 +2790,14 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
   }
 
   private HoodieWriteConfig 
getParallelWritingWriteConfig(HoodieFailedWritesCleaningPolicy cleaningPolicy, 
boolean populateMetaFields) {
+    Properties properties = new Properties();
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, 
"3000");
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
 "3000");
+    
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
 "20");
+    if (!populateMetaFields) {
+      getPropertiesForKeyGen(populateMetaFields).entrySet().forEach(kv ->
+          properties.put(kv.getKey(), kv.getValue()));
+    }
     return getConfigBuilder()
         .withEmbeddedTimelineServerEnabled(false)
         .withCleanConfig(HoodieCleanConfig.newBuilder()
@@ -2790,7 +2808,11 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
         .withAutoCommit(false)
-        .withProperties(populateMetaFields ? new Properties() : 
getPropertiesForKeyGen()).build();
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(InProcessLockProvider.class)
+            .build())
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withProperties(properties).build();
   }
 
   public static class FailingPreCommitValidator<T extends HoodieRecordPayload, 
I, K, O extends HoodieData<WriteStatus>> extends SparkPreCommitValidator<T, I, 
K, O> {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 3c97b732eb6..634608e965a 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -54,7 +54,7 @@ public class HoodieClusteringJob {
   private final Config cfg;
   private final TypedProperties props;
   private final JavaSparkContext jsc;
-  private final HoodieTableMetaClient metaClient;
+  private HoodieTableMetaClient metaClient;
 
   public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
     this.cfg = cfg;
@@ -180,6 +180,7 @@ public class HoodieClusteringJob {
   }
 
   private int doCluster(JavaSparkContext jsc) throws Exception {
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
     try (SparkRDDWriteClient<HoodieRecordPayload> client = 
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, 
Option.empty(), props)) {
       if (StringUtils.isNullOrEmpty(cfg.clusteringInstantTime)) {
@@ -208,6 +209,7 @@ public class HoodieClusteringJob {
   }
 
   private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
     try (SparkRDDWriteClient<HoodieRecordPayload> client = 
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, 
Option.empty(), props)) {
       return doSchedule(client);
@@ -224,6 +226,7 @@ public class HoodieClusteringJob {
 
   private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
     LOG.info("Step 1: Do schedule");
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
     try (SparkRDDWriteClient<HoodieRecordPayload> client = 
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, 
Option.empty(), props)) {
       Option<String> instantTime = Option.empty();
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 30ad43f894b..fc59d0ff9e0 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -29,6 +29,7 @@ import 
org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -162,11 +163,17 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   private static final Logger LOG = 
LogManager.getLogger(TestHoodieDeltaStreamer.class);
 
   protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String 
tableBasePath, int totalRecords, String asyncCluster, HoodieRecordType 
recordType) throws IOException {
-    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT);
+    return initialHoodieDeltaStreamer(tableBasePath, totalRecords, 
asyncCluster, recordType, WriteOperationType.INSERT);
+  }
+
+  protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String 
tableBasePath, int totalRecords, String asyncCluster, HoodieRecordType 
recordType,
+                                                             
WriteOperationType writeOperationType) throws IOException {
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
writeOperationType);
     TestHelpers.addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", 
asyncCluster, ""));
+    cfg.configs.addAll(getAllMultiWriterConfigs());
     return new HoodieDeltaStreamer(cfg, jsc);
   }
 
@@ -179,10 +186,11 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   protected HoodieClusteringJob initialHoodieClusteringJob(String 
tableBasePath, String clusteringInstantTime, Boolean runSchedule, String 
scheduleAndExecute,
-      Boolean retryLastFailedClusteringJob, HoodieRecordType recordType) {
+                                                           Boolean 
retryLastFailedClusteringJob, HoodieRecordType recordType) {
     HoodieClusteringJob.Config scheduleClusteringConfig = 
buildHoodieClusteringUtilConfig(tableBasePath,
         clusteringInstantTime, runSchedule, scheduleAndExecute, 
retryLastFailedClusteringJob);
     TestHelpers.addRecordMerger(recordType, scheduleClusteringConfig.configs);
+    scheduleClusteringConfig.configs.addAll(getAllMultiWriterConfigs());
     return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
   }
 
@@ -1099,6 +1107,15 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     return configs;
   }
 
+  private List<String> getAllMultiWriterConfigs() {
+    List<String> configs = new ArrayList<>();
+    configs.add(String.format("%s=%s", 
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), 
InProcessLockProvider.class.getCanonicalName()));
+    configs.add(String.format("%s=%s", 
LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000"));
+    configs.add(String.format("%s=%s", 
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), 
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
+    configs.add(String.format("%s=%s", 
HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), 
HoodieFailedWritesCleaningPolicy.LAZY.name()));
+    return configs;
+  }
+
   private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String 
basePath,
                                                                      String 
clusteringInstantTime,
                                                                      Boolean 
runSchedule) {
@@ -1305,6 +1322,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.continuousMode = false;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "false", 
"0", "false", "0"));
+    cfg.configs.addAll(getAllMultiWriterConfigs());
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     ds.sync();
 
@@ -1345,7 +1363,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   @CsvSource(value = {"execute, AVRO", "schedule, AVRO", "scheduleAndExecute, 
AVRO", "execute, SPARK", "schedule, SPARK", "scheduleAndExecute, SPARK"})
   public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String 
runningMode, HoodieRecordType recordType) throws Exception {
     String tableBasePath = basePath + "/asyncClustering2";
-    HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, 
"false", recordType);
+    HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, 
"false", recordType, WriteOperationType.BULK_INSERT);
     HoodieClusteringJob scheduleClusteringJob = 
initialHoodieClusteringJob(tableBasePath, null, true, runningMode, recordType);
 
     deltaStreamerTestRunner(ds, (r) -> {
@@ -1356,7 +1374,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         if (result == 0) {
           LOG.info("Cluster success");
         } else {
-          LOG.warn("Import failed");
+          LOG.warn("Cluster failed");
           if (!runningMode.toLowerCase().equals(EXECUTE)) {
             return false;
           }

Reply via email to