This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new def40238f42 [MINOR] Rename a conflict resolution strategy class (#9239)
def40238f42 is described below

commit def40238f421de7a701009ff8d4430721faae6b3
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Jul 21 08:31:52 2023 +0530

    [MINOR] Rename a conflict resolution strategy class (#9239)
    
    * Rename to PreferWriterConflictResolutionStrategy
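    
    As a reference for downstream users, a minimal, illustrative sketch of how a
    writer could opt into the renamed strategy is shown below. The builder calls
    mirror the test configuration touched in this patch; building the config
    directly via HoodieWriteConfig.newBuilder() (rather than the test helper) and
    the in-process lock provider are assumptions for the example, not part of this
    change.
    
        HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder()
            // multi-writer mode; conflicts are resolved by the configured strategy
            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
            .withLockConfig(HoodieLockConfig.newBuilder()
                .withLockProvider(InProcessLockProvider.class)
                // previously: new IngestionPrimaryWriterBasedConflictResolutionStrategy()
                .withConflictResolutionStrategy(new PreferWriterConflictResolutionStrategy())
                .build());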
---
 ...=> PreferWriterConflictResolutionStrategy.java} |  11 +-
 ...estPreferWriterConflictResolutionStrategy.java} |  28 +--
 .../hudi/client/TestHoodieClientMultiWriter.java   |  71 ++++----
 ... TestMultiWriterWithPreferWriterIngestion.java} |  11 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 199 ++++++++++-----------
 5 files changed, 159 insertions(+), 161 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/IngestionPrimaryWriterBasedConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
similarity index 92%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/IngestionPrimaryWriterBasedConflictResolutionStrategy.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
index 1c71be9f70b..f95e7b078a6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/IngestionPrimaryWriterBasedConflictResolutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,12 +40,12 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMI
 
 /**
  * This class extends the base implementation of conflict resolution strategy.
- * It gives preference to ingestion writers compared to table services.
+ * It gives preference to non-blocking ingestion over table services in case of conflicts.
  */
-public class IngestionPrimaryWriterBasedConflictResolutionStrategy
+public class PreferWriterConflictResolutionStrategy
     extends SimpleConcurrentFileWritesConflictResolutionStrategy {
 
-  private static final Logger LOG = LoggerFactory.getLogger(IngestionPrimaryWriterBasedConflictResolutionStrategy.class);
+  private static final Logger LOG = LoggerFactory.getLogger(PreferWriterConflictResolutionStrategy.class);
 
   /**
    * For tableservices like replacecommit and compaction commits this method also returns ingestion inflight commits.
@@ -54,7 +55,7 @@ public class IngestionPrimaryWriterBasedConflictResolutionStrategy
                                                      Option<HoodieInstant> lastSuccessfulInstant) {
     HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline();
     if ((REPLACE_COMMIT_ACTION.equals(currentInstant.getAction())
-          && ClusteringUtils.isClusteringCommit(metaClient, currentInstant))
+        && ClusteringUtils.isClusteringCommit(metaClient, currentInstant))
         || COMPACTION_ACTION.equals(currentInstant.getAction())) {
       return getCandidateInstantsForTableServicesCommits(activeTimeline, currentInstant);
     } else {
@@ -64,7 +65,7 @@ public class IngestionPrimaryWriterBasedConflictResolutionStrategy
 
   private Stream<HoodieInstant> getCandidateInstantsForNonTableServicesCommits(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant) {
 
-    // To findout which instants are conflicting, we apply the following logic
+    // To find out which instants are conflicting, we apply the following logic
     // Get all the completed instants timeline only for commits that have happened
     // since the last successful write based on the transition times.
     // We need to check for write conflicts since they may have mutated the same files
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestIngestionPrimaryWriterBasedConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
similarity index 90%
rename from hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestIngestionPrimaryWriterBasedConflictResolutionStrategy.java
rename to hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
index 966da46690e..a2cf9103284 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestIngestionPrimaryWriterBasedConflictResolutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestPreferWriterConflictResolutionStrategy.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieWriteConflictException;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -43,7 +44,7 @@ import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyU
 import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplaceInflight;
 import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplaceRequested;
 
-public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends HoodieCommonTestHarness {
+public class TestPreferWriterConflictResolutionStrategy extends HoodieCommonTestHarness {
 
   @BeforeEach
   public void init() throws IOException {
@@ -64,7 +65,7 @@ public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends H
     createCompactionRequested(newInstantTime, metaClient);
 
     Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
-    IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy();
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
     List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect(
         Collectors.toList());
     // writer 1 does not have a conflict with scheduled compaction plan 1
@@ -84,12 +85,12 @@ public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends H
     // compaction 1 gets scheduled and finishes
     String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
     // TODO: Remove sleep stmt once the modified times issue is fixed.
-    // Sleep thread for atleast 1sec for consecutive commits that way they do not have two commits modified times falls on the same millisecond.
+    // Sleep thread for at least 1sec for consecutive commits that way they do not have two commits modified times falls on the same millisecond.
     Thread.sleep(1000);
     createCompaction(newInstantTime, metaClient);
 
     Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
-    IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy();
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
     HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant);
     List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect(
         Collectors.toList());
@@ -121,7 +122,7 @@ public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends H
     createCompactionRequested(newInstantTime, metaClient);
 
     Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, newInstantTime));
-    IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy();
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
     // TODO Create method to create compactCommitMetadata
     //    HoodieCommitMetadata currentMetadata = createCommitMetadata(newInstantTime);
     List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect(
@@ -149,7 +150,7 @@ public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends H
     createInflightCommit(currentWriterInstant, metaClient);
 
     Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
-    IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy();
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
     List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect(
         Collectors.toList());
     // writer 1 should not conflict with an earlier scheduled compaction 1 with the same file ids
@@ -174,16 +175,16 @@ public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends H
     createReplaceInflight(newInstantTime, WriteOperationType.CLUSTER, metaClient);
 
     Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
-    IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy();
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
     List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect(
         Collectors.toList());
-    // Since we give preference to ingestion over clustering, there wont be a conflict with replacecommit.
+    // Since we give preference to ingestion over clustering, there won't be a conflict with replacecommit.
     Assertions.assertEquals(0, candidateInstants.size());
   }
 
   /**
    * This method confirms ingestion commit failing due to already present replacecommit.
-   * Here the replacecommit is allowed to commit. Ideally replacecommit cannot be committed when there is a ingestion inflight.
+   * Here the replacecommit is allowed to commit. Ideally replacecommit cannot be committed when there is an ingestion inflight.
    * The following case can occur, during transition phase of ingestion commit from Requested to Inflight,
    * during that time replacecommit can be completed.
    */
@@ -197,14 +198,14 @@ public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends H
     String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
     createInflightCommit(currentWriterInstant, metaClient);
     // TODO: Remove sleep stmt once the modified times issue is fixed.
-    // Sleep thread for atleast 1sec for consecutive commits that way they do not have two commits modified times falls on the same millisecond.
+    // Sleep thread for at least 1sec for consecutive commits that way they do not have two commits modified times falls on the same millisecond.
     Thread.sleep(1000);
     // clustering writer starts and complete before ingestion commit.
     String replaceWriterInstant = HoodieActiveTimeline.createNewInstantTime();
     createReplace(replaceWriterInstant, WriteOperationType.CLUSTER, metaClient);
 
     Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
-    IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy();
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
     metaClient.reloadActiveTimeline();
     List<HoodieInstant> candidateInstants = strategy
         .getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant)
@@ -233,14 +234,14 @@ public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends H
     String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
     createInflightCommit(currentWriterInstant, metaClient);
     // TODO: Remove sleep stmt once the modified times issue is fixed.
-    // Sleep thread for atleast 1sec for consecutive commits that way they do not have two commits modified times falls on the same millisecond.
+    // Sleep thread for at least 1sec for consecutive commits that way they do not have two commits modified times falls on the same millisecond.
     Thread.sleep(1000);
     // replace 1 gets scheduled and finished
     String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
     createReplace(newInstantTime, WriteOperationType.INSERT_OVERWRITE, metaClient);
 
     Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
-    IngestionPrimaryWriterBasedConflictResolutionStrategy strategy = new IngestionPrimaryWriterBasedConflictResolutionStrategy();
+    PreferWriterConflictResolutionStrategy strategy = new PreferWriterConflictResolutionStrategy();
     HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant);
     List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient, currentInstant.get(), lastSuccessfulInstant).collect(
         Collectors.toList());
@@ -256,5 +257,4 @@ public class TestIngestionPrimaryWriterBasedConflictResolutionStrategy extends H
       // expected
     }
   }
-
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 0984f9436bf..c2de0595dc7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
-import org.apache.hudi.client.transaction.IngestionPrimaryWriterBasedConflictResolutionStrategy;
+import org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
@@ -143,12 +143,12 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
 
   private static final List<ConflictResolutionStrategy> CONFLICT_RESOLUTION_STRATEGY_CLASSES = Arrays.asList(
       new SimpleConcurrentFileWritesConflictResolutionStrategy(),
-      new IngestionPrimaryWriterBasedConflictResolutionStrategy());
+      new PreferWriterConflictResolutionStrategy());
 
   private static Iterable<Object[]> providerClassResolutionStrategyAndTableType() {
     List<Object[]> opts = new ArrayList<>();
     for (Object providerClass : LOCK_PROVIDER_CLASSES) {
-      for (ConflictResolutionStrategy resolutionStrategy: CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
+      for (ConflictResolutionStrategy resolutionStrategy : CONFLICT_RESOLUTION_STRATEGY_CLASSES) {
         opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass, resolutionStrategy});
         opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass, resolutionStrategy});
       }
@@ -158,19 +158,20 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
 
   /**
    * Test multi-writers with early conflict detect enable, including
-   *    1. MOR + Direct marker
-   *    2. COW + Direct marker
-   *    3. MOR + Timeline server based marker
-   *    4. COW + Timeline server based marker
-   *
-   *                                    |---------------------- 003 heartBeat expired -------------------|
-   *
-   *  ---|---------|--------------------|--------------------------------------|-------------------------|-------------------------> time
+   * 1. MOR + Direct marker
+   * 2. COW + Direct marker
+   * 3. MOR + Timeline server based marker
+   * 4. COW + Timeline server based marker
+   * <p>
+   * |---------------------- 003 heartBeat expired -------------------|
+   * <p>
+   * ---|---------|--------------------|--------------------------------------|-------------------------|-------------------------> time
    * init 001
-   *               002 start writing
-   *                                    003 start which has conflict with 002
-   *                                    and failed soon
-   *                                                                           002 commit successfully       004 write successfully
+   * 002 start writing
+   * 003 start which has conflict with 002
+   * and failed soon
+   * 002 commit successfully       004 write successfully
+   *
    * @param tableType
    * @param markerType
    * @throws Exception
@@ -527,7 +528,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
 
     String pendingCompactionTime = (tableType == HoodieTableType.MERGE_ON_READ)
         ? metaClient.reloadActiveTimeline().filterPendingCompactionTimeline()
-          .firstInstant().get().getTimestamp()
+        .firstInstant().get().getTimestamp()
         : "";
     Option<HoodieInstant> pendingCleanInstantOp = metaClient.reloadActiveTimeline().getCleanerTimeline().filterInflightsAndRequested()
         .firstInstant();
@@ -551,7 +552,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
       latchCountDownAndWait(runCountDownLatch, 30000);
       if (tableType == HoodieTableType.MERGE_ON_READ) {
         assertDoesNotThrow(() -> {
-          HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata =  client2.compact(pendingCompactionTime);
+          HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client2.compact(pendingCompactionTime);
           client2.commitCompaction(pendingCompactionTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
           validInstants.add(pendingCompactionTime);
         });
@@ -587,22 +588,22 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
     }
     // Disabling embedded timeline server, it doesn't work with multiwriter
     HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
-            .withCleanConfig(HoodieCleanConfig.newBuilder()
-                    .withAutoClean(false)
-                    .withAsyncClean(true)
-                    .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-                    .withInlineCompaction(false)
-                    .withMaxNumDeltaCommitsBeforeCompaction(2).build())
-            .withEmbeddedTimelineServerEnabled(false)
-            // Timeline-server-based markers are not used for multi-writer tests
-            .withMarkersType(MarkerType.DIRECT.name())
-            .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
-                    FileSystemViewStorageType.MEMORY).build())
-            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
-            // Set the config so that heartbeat will expire in 1 second without update
-            .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
-                    .build()).withAutoCommit(false).withProperties(lockProperties);
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withAutoClean(false)
+            .withAsyncClean(true)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(2).build())
+        .withEmbeddedTimelineServerEnabled(false)
+        // Timeline-server-based markers are not used for multi-writer tests
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
+            FileSystemViewStorageType.MEMORY).build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        // Set the config so that heartbeat will expire in 1 second without update
+        .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
+            .build()).withAutoCommit(false).withProperties(lockProperties);
     Set<String> validInstants = new HashSet<>();
     // Create the first commit with inserts
     HoodieWriteConfig cfg = writeConfigBuilder.build();
@@ -855,8 +856,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
   }
 
   private JavaRDD<WriteStatus> createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client,
-                                       String prevCommitTime, String newCommitTime, int numRecords,
-                                       boolean doCommit) throws Exception {
+                                                       String prevCommitTime, String newCommitTime, int numRecords,
+                                                       boolean doCommit) throws Exception {
     // Finish first base commit
     JavaRDD<WriteStatus> result = insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, SparkRDDWriteClient::bulkInsert,
         false, false, numRecords);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiwriterWithIngestionAsPrimaryWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
similarity index 96%
rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiwriterWithIngestionAsPrimaryWriter.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
index 6d2e2182106..59547cd5b63 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiwriterWithIngestionAsPrimaryWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
@@ -18,9 +18,8 @@
 
 package org.apache.hudi.client;
 
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
-import org.apache.hudi.client.transaction.IngestionPrimaryWriterBasedConflictResolutionStrategy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -41,6 +40,8 @@ import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -61,7 +62,7 @@ import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PA
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class TestMultiwriterWithIngestionAsPrimaryWriter extends HoodieClientTestBase {
+public class TestMultiWriterWithPreferWriterIngestion extends HoodieClientTestBase {
 
   public void setUpMORTestTable() throws IOException {
     cleanupResources();
@@ -102,7 +103,7 @@ public class TestMultiwriterWithIngestionAsPrimaryWriter extends HoodieClientTes
         .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClusteringNumCommits(1).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
-            .withConflictResolutionStrategy(new IngestionPrimaryWriterBasedConflictResolutionStrategy())
+            .withConflictResolutionStrategy(new PreferWriterConflictResolutionStrategy())
             .build()).withAutoCommit(false).withProperties(properties).build();
     Set<String> validInstants = new HashSet<>();
     // Create the first commit with inserts
@@ -207,7 +208,7 @@ public class TestMultiwriterWithIngestionAsPrimaryWriter extends HoodieClientTes
         .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClusteringNumCommits(1).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
-            .withConflictResolutionStrategy(new IngestionPrimaryWriterBasedConflictResolutionStrategy())
+            .withConflictResolutionStrategy(new PreferWriterConflictResolutionStrategy())
             .build()).withAutoCommit(false).withProperties(properties).build();
     // Create the first commit
     String instant1 = HoodieActiveTimeline.createNewInstantTime();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 75023f98b5a..946c4115ab9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -30,7 +30,7 @@ import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanSt
 import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
 import org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy;
 import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
-import org.apache.hudi.client.transaction.IngestionPrimaryWriterBasedConflictResolutionStrategy;
+import org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.client.validator.SparkPreCommitValidator;
@@ -135,8 +135,6 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -191,7 +189,7 @@ import static org.mockito.Mockito.when;
 @Tag("functional")
 public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestHoodieClientOnCopyOnWriteStorage.class);
+  private static final String CLUSTERING_FAILURE = "CLUSTERING FAILURE";
   private static final Map<String, String> STRATEGY_PARAMS = new HashMap<String, String>() {
     {
       put("sortColumn", "record_key");
@@ -289,13 +287,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   }
 
   /**
-   * Test auto-commit by applying write function.
+   * Test auto-commit by applying the write function.
    *
    * @param writeFn One of HoodieWriteClient Write API
    * @throws Exception in case of failure
    */
   private void testAutoCommit(Function3<JavaRDD<WriteStatus>, 
SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
-      boolean isPrepped, boolean populateMetaFields) throws Exception {
+                              boolean isPrepped, boolean populateMetaFields) 
throws Exception {
     // Set autoCommit false
     HoodieWriteConfig.Builder cfgBuilder = 
getConfigBuilder().withAutoCommit(false);
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
@@ -405,7 +403,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     assertTrue(testTable.inflightCommitExists(instant1));
     assertTrue(testTable.commitExists(instant2));
   }
-  
+
   private void insertWithConfig(HoodieWriteConfig config, int numRecords, 
String instant) throws Exception {
     try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
       Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, 
JavaRDD<HoodieRecord>, String> writeFn = (writeClient, recordRDD, instantTime) 
->
@@ -513,7 +511,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
       List<WriteStatus> statuses = writeFn.apply(client, recordList, 
newCommitTime).collect();
       assertNoWriteErrors(statuses);
       assertEquals(2, statuses.size());
-      assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream)
+      assertNoDuplicatesInPartition(statuses.stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream)
           .collect(Collectors.toList()));
     }
   }
@@ -566,7 +564,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
       List<WriteStatus> statuses = writeFn.apply(client, recordList, 
newCommitTime).collect();
       assertNoWriteErrors(statuses);
       assertEquals(2, statuses.size());
-      assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream)
+      assertNoDuplicatesInPartition(statuses.stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream)
           .collect(Collectors.toList()));
     }
   }
@@ -576,7 +574,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
    *
    * @param recordDelegates List of Hoodie record delegates
    */
-  void assertNodupesInPartition(List<HoodieRecordDelegate> recordDelegates) {
+  void assertNoDuplicatesInPartition(List<HoodieRecordDelegate> recordDelegates) {
     Map<String, Set<String>> partitionToKeys = new HashMap<>();
     for (HoodieRecordDelegate r : recordDelegates) {
       String recordKey = r.getRecordKey();
@@ -614,22 +612,22 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   /**
    * Test one of HoodieWriteClient upsert(Prepped) APIs.
    *
-   * @param config Write Config
+   * @param config  Write Config
    * @param writeFn One of Hoodie Write Function API
    * @throws Exception in case of error
    */
   private void testUpsertsInternal(HoodieWriteConfig config,
-      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, 
JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped)
+                                   Function3<JavaRDD<WriteStatus>, 
SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped)
       throws Exception {
     // Force using older timeline layout
     HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
         .withRollbackUsingMarkers(true)
         .withProps(config.getProps()).withTimelineLayoutVersion(
-        VERSION_0).build();
+            VERSION_0).build();
 
     HoodieTableMetaClient.withPropertyBuilder()
-      .fromMetaClient(metaClient)
-      .setTimelineLayoutVersion(VERSION_0)
+        .fromMetaClient(metaClient)
+        .setTimelineLayoutVersion(VERSION_0)
         .setPopulateMetaFields(config.populateMetaFields())
         .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
 
@@ -664,7 +662,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
         TimelineLayoutVersion.CURR_VERSION).build();
     client = getHoodieWriteClient(newConfig);
 
-    client.savepoint("004", "user1","comment1");
+    client.savepoint("004", "user1", "comment1");
 
     client.restoreToInstant("004", config.isMetadataTableEnabled());
 
@@ -714,7 +712,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     jsc.parallelize(Arrays.asList(1)).map(e -> {
       HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
           .fromBytes(metaClient.getActiveTimeline().getInstantDetails(
-              
metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get()).get(),
+                  
metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get()).get(),
               HoodieCommitMetadata.class);
       String filePath = 
commitMetadata.getPartitionToWriteStats().values().stream()
           .flatMap(w -> w.stream()).filter(s -> 
s.getPath().endsWith(extension)).findAny()
@@ -801,7 +799,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
         0, 150, config.populateMetaFields());
 
     HoodieWriteConfig newConfig = 
getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion(
-        TimelineLayoutVersion.CURR_VERSION)
+            TimelineLayoutVersion.CURR_VERSION)
         
.withArchivalConfig(HoodieArchivalConfig.newBuilder().withArchiveBeyondSavepoint(true).build()).build();
     client = getHoodieWriteClient(newConfig);
 
@@ -833,7 +831,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   public void testInsertsPreppedWithHoodieConcatHandle(boolean 
populateMetaFields) throws Exception {
     HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder();
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
-    testHoodieConcatHandle(cfgBuilder.build(),  true);
+    testHoodieConcatHandle(cfgBuilder.build(), true);
   }
 
   /**
@@ -849,9 +847,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
         
.withProps(config.getProps()).withMergeAllowDuplicateOnInserts(true).withTimelineLayoutVersion(
             VERSION_0).build();
     HoodieTableMetaClient.withPropertyBuilder()
-      .fromMetaClient(metaClient)
-      .setTimelineLayoutVersion(VERSION_0)
-      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
+        .fromMetaClient(metaClient)
+        .setTimelineLayoutVersion(VERSION_0)
+        .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
 
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
 
@@ -931,7 +929,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
       client.startCommitWithTime(commitTime1);
       List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, 100);
       JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 10);
-      BulkInsertPartitioner<JavaRDD<HoodieRecord>> partitioner = new 
RDDCustomColumnsSortPartitioner(new String[]{"rider"}, 
HoodieTestDataGenerator.AVRO_SCHEMA, config);
+      BulkInsertPartitioner<JavaRDD<HoodieRecord>> partitioner = new 
RDDCustomColumnsSortPartitioner(new String[] {"rider"}, 
HoodieTestDataGenerator.AVRO_SCHEMA, config);
       List<WriteStatus> statuses = client.bulkInsert(insertRecordsRDD1, 
commitTime1, Option.of(partitioner)).collect();
       assertNoWriteErrors(statuses);
     }
@@ -951,7 +949,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
       assertNoWriteErrors(statuses);
 
       // inject a pending restore
-      client.savepoint("001", "user1","comment1");
+      client.savepoint("001", "user1", "comment1");
 
       client.restoreToInstant("001", false);
       // remove completed restore instant from timeline to mimic pending 
restore.
@@ -1035,6 +1033,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   /**
    * When records getting inserted are deleted in the same write batch, hudi 
should have deleted those records and
    * not be available in read path.
+   *
    * @throws Exception
    */
   @ParameterizedTest
@@ -1099,7 +1098,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   }
 
   private void 
assertActualAndExpectedPartitionPathRecordKeyMatches(List<Pair<String, String>> 
expectedPartitionPathRecKeyPairs,
-      List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
+                                                                    
List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
     // verify all partitionpath, record key matches
     assertEquals(expectedPartitionPathRecKeyPairs.size(), 
actualPartitionPathRecKeyPairs.size());
     for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
@@ -1146,7 +1145,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     String commitTime2 = "002";
     List<List<FileSlice>> firstInsertFileSlicesList = 
table.getFileSystemView().getAllFileGroups(testPartitionPath)
         .map(fileGroup -> 
fileGroup.getAllFileSlices().collect(Collectors.toList())).collect(Collectors.toList());
-    List<FileSlice>[] fileSlices = 
(List<FileSlice>[])firstInsertFileSlicesList.toArray(new 
List[firstInsertFileSlicesList.size()]);
+    List<FileSlice>[] fileSlices = (List<FileSlice>[]) 
firstInsertFileSlicesList.toArray(new List[firstInsertFileSlicesList.size()]);
     createRequestedReplaceInstant(this.metaClient, commitTime2, fileSlices);
 
     // 3. insert one record with no updating reject exception, and not merge 
the small file, just generate a new file group
@@ -1164,7 +1163,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     String assertMsg = String.format("Not allowed to update the clustering 
files in partition: %s "
         + "For pending clustering operations, we are not going to support 
update for now.", testPartitionPath);
     assertThrows(HoodieUpsertException.class, () -> {
-      client.upsert(jsc.parallelize(insertsAndUpdates3, 1), 
commitTime4).collect(); }, assertMsg);
+      client.upsert(jsc.parallelize(insertsAndUpdates3, 1), 
commitTime4).collect();
+    }, assertMsg);
 
     // 5. insert one record with no updating reject exception, will merge the 
small file
     String commitTime5 = "005";
@@ -1479,7 +1479,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
 
       List<String> filesFromReplaceCommit = new ArrayList<>();
       replaceCommitMetadata.getPartitionToWriteStats()
-              .forEach((k,v) -> v.forEach(entry -> 
filesFromReplaceCommit.add(entry.getPath())));
+          .forEach((k, v) -> v.forEach(entry -> 
filesFromReplaceCommit.add(entry.getPath())));
 
       // find all parquet files created as part of clustering. Verify it 
matches w/ what is found in replace commit metadata.
       FileStatus[] fileStatuses = fs.listStatus(new Path(basePath + "/" + 
partitionPath));
@@ -1490,7 +1490,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   }
 
   @Test
-  public void testRolblackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception {
+  public void testRollbackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception {
     HoodieClusteringConfig clusteringConfig = 
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
         
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
         .build();
@@ -1728,13 +1728,13 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
     }
   }
 
-  private List<HoodieRecord>  testInsertAndClustering(HoodieClusteringConfig 
clusteringConfig, boolean populateMetaFields, boolean completeClustering) 
throws Exception {
+  private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig 
clusteringConfig, boolean populateMetaFields, boolean completeClustering) 
throws Exception {
     return testInsertAndClustering(clusteringConfig, populateMetaFields, 
completeClustering, false, "", "", "");
   }
-  
+
   private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig 
clusteringConfig, boolean populateMetaFields,
-                                       boolean completeClustering, boolean 
assertSameFileIds, String validatorClasses,
-                                       String sqlQueryForEqualityValidation, 
String sqlQueryForSingleResultValidation) throws Exception {
+                                                     boolean 
completeClustering, boolean assertSameFileIds, String validatorClasses,
+                                                     String 
sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation) throws 
Exception {
     Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> 
allRecords = testInsertTwoBatches(populateMetaFields);
     testClustering(clusteringConfig, populateMetaFields, completeClustering, 
assertSameFileIds, validatorClasses, sqlQueryForEqualityValidation, 
sqlQueryForSingleResultValidation, allRecords);
     return allRecords.getLeft().getLeft();
@@ -1805,7 +1805,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     assertEquals(0, fileIdIntersection.size());
     return Pair.of(Pair.of(Stream.concat(records1.stream(), 
records2.stream()).collect(Collectors.toList()), Arrays.asList(commitTime1, 
commitTime2)), fileIdsUnion);
   }
-  
+
   private void testClustering(HoodieClusteringConfig clusteringConfig, boolean 
populateMetaFields, boolean completeClustering, boolean assertSameFileIds,
                               String validatorClasses, String 
sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation,
                               Pair<Pair<List<HoodieRecord>, List<String>>, 
Set<HoodieFileGroupId>> allRecords) throws IOException {
@@ -1817,7 +1817,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
         performClustering(clusteringConfig, populateMetaFields, 
completeClustering, validatorClasses, sqlQueryForEqualityValidation, 
sqlQueryForSingleResultValidation, allRecords.getLeft());
     if (assertSameFileIds) {
       Set<HoodieFileGroupId> replacedFileIds = 
clusterMetadata.getWriteStats().get().stream()
-          .map(s -> new 
HoodieFileGroupId(s.getPartitionPath(),s.getFileId())).collect(Collectors.toSet());
+          .map(s -> new HoodieFileGroupId(s.getPartitionPath(), 
s.getFileId())).collect(Collectors.toSet());
       Set<HoodieFileGroupId> insertedFileIds = allRecords.getRight();
       assertEquals(insertedFileIds, replacedFileIds);
     }
@@ -1827,7 +1827,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
       verifyRecordsWritten(clusteringCommitTime, populateMetaFields, 
allRecords.getLeft().getLeft(), clusterMetadata.getWriteStatuses().collect(), 
config);
     }
   }
-  
+
   private HoodieWriteMetadata<JavaRDD<WriteStatus>> 
performClustering(HoodieClusteringConfig clusteringConfig,
                                                                       boolean 
populateMetaFields,
                                                                       boolean 
completeClustering,
@@ -1839,7 +1839,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
         
.withPrecommitValidatorEqualitySqlQueries(sqlQueryForEqualityValidation)
         
.withPrecommitValidatorSingleResultSqlQueries(sqlQueryForSingleResultValidation)
         .build();
-    
+
     HoodieWriteConfig config = getConfigBuilder().withAutoCommit(false)
         .withPreCommitValidatorConfig(validatorConfig)
         .withProps(populateMetaFields ? new Properties() : 
getPropertiesForKeyGen())
@@ -1894,10 +1894,10 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
   }
 
   /**
-   *  1) Do write1 (upsert) with 'batch1RecordsCount' number of records.
-   *  2) Do write2 (insert overwrite) with 'batch2RecordsCount' number of records.
-   *
-   *  Verify that all records in step1 are overwritten
+   * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records.
+   * 2) Do write2 (insert overwrite) with 'batch2RecordsCount' number of records.
+   * <p>
+   * Verify that all records in step1 are overwritten
    */
   private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, 
int batch2RecordsCount, boolean populateMetaFields) throws Exception {
     final String testPartitionPath = "americas";
@@ -1932,7 +1932,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   }
 
   /**
-   * Test scenario of writing fewer file groups for first partition than second an third partition.
+   * Test scenario of writing fewer file groups for first partition than second and third partition.
    */
   @ParameterizedTest
   @MethodSource("populateMetaFieldsParams")
@@ -1950,7 +1950,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   }
 
   /**
-   * Test scenario of writing more file groups for first partition than second an third partition.
+   * Test scenario of writing more file groups for first partition than second and third partition.
    */
   @ParameterizedTest
   @MethodSource("populateMetaFieldsParams")
@@ -1979,12 +1979,11 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
   }
 
   /**
-   *  1) Do write1 (upsert) with 'batch1RecordsCount' number of records for first partition.
-   *  2) Do write2 (upsert) with 'batch2RecordsCount' number of records for second partition.
-   *  3) Do write3 (upsert) with 'batch3RecordsCount' number of records for third partition.
-   *  4) delete first partition and check result.
-   *  5) delete second and third partition and check result.
-   *
+   * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records for first partition.
+   * 2) Do write2 (upsert) with 'batch2RecordsCount' number of records for second partition.
+   * 3) Do write3 (upsert) with 'batch3RecordsCount' number of records for third partition.
+   * 4) delete first partition and check result.
+   * 5) delete second and third partition and check result.
    */
   private void verifyDeletePartitionsHandling(int batch1RecordsCount, int 
batch2RecordsCount, int batch3RecordsCount,
                                               boolean populateMetaFields) 
throws Exception {
@@ -2111,7 +2110,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   }
 
   private Pair<Set<String>, List<HoodieRecord>> testUpdates(String 
instantTime, SparkRDDWriteClient client,
-      int sizeToInsertAndUpdate, int expectedTotalRecords)
+                                                            int 
sizeToInsertAndUpdate, int expectedTotalRecords)
       throws IOException {
     client.startCommitWithTime(instantTime);
     List<HoodieRecord> inserts = dataGen.generateInserts(instantTime, 
sizeToInsertAndUpdate);
@@ -2136,7 +2135,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   }
 
   private void testDeletes(SparkRDDWriteClient client, List<HoodieRecord> 
previousRecords, int sizeToDelete,
-      String existingFile, String instantTime, int expectedRecords, 
List<String> keys) {
+                           String existingFile, String instantTime, int 
expectedRecords, List<String> keys) {
     client.startCommitWithTime(instantTime);
 
     List<HoodieKey> hoodieKeysToDelete = 
randomSelectAsHoodieKeys(previousRecords, sizeToDelete);
@@ -2407,25 +2406,25 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
 
     // Perform 2 failed writes to table
     writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "100",
-            100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, 
false, 100, 300,
-            0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 
100, 300,
+        0, false);
     client.close();
     client = new SparkRDDWriteClient(context, 
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
     writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
-            100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, 
false, 100, 300,
-            0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 
100, 300,
+        0, false);
     client.close();
     // refresh data generator to delete records generated from failed commits
     dataGen = new HoodieTestDataGenerator();
     // Perform 1 successful write
     client = new SparkRDDWriteClient(context, 
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
     writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
-            100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, 
false, 100, 300,
-            0, true);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 
100, 300,
+        0, true);
     HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
 
     assertTrue(metaClient.getActiveTimeline().getTimelineOfActions(
-            CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 0);
+        CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 0);
     
assertTrue(metaClient.getActiveTimeline().filterInflights().countInstants() == 
2);
     
assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants()
 == 2);
     // Await till enough time passes such that the first 2 failed commits 
heartbeats are expired
@@ -2437,36 +2436,36 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
     client = new SparkRDDWriteClient(context, 
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
     // Perform 1 successful write
     writeBatch(client, "500", "400", Option.of(Arrays.asList("500")), "500",
-            100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, 
false, 100, 300,
-            0, true);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 
100, 300,
+        0, true);
     client.clean();
     HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
     if (cleaningPolicy.isLazy()) {
       assertTrue(
           timeline
-                  
.getTimelineOfActions(CollectionUtils.createSet(ROLLBACK_ACTION))
-                  .countInstants()
+              .getTimelineOfActions(CollectionUtils.createSet(ROLLBACK_ACTION))
+              .countInstants()
               == 2);
       // Since we write rollbacks not clean, there should be no clean action 
on the timeline
       assertTrue(
           timeline
-                  
.getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION))
-                  .countInstants()
+              .getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION))
+              .countInstants()
               == 0);
       
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants()
 == 3);
     } else if (cleaningPolicy.isNever()) {
       // never will get translated to Lazy if OCC is enabled.
       assertTrue(
-              timeline
-                      
.getTimelineOfActions(CollectionUtils.createSet(ROLLBACK_ACTION))
-                      .countInstants()
-                      == 2);
+          timeline
+              .getTimelineOfActions(CollectionUtils.createSet(ROLLBACK_ACTION))
+              .countInstants()
+              == 2);
       // There should be no clean or rollback action on the timeline
       assertTrue(
-              timeline
-                      
.getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION))
-                      .countInstants()
-                      == 0);
+          timeline
+              .getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION))
+              .countInstants()
+              == 0);
       
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants()
 == 3);
     }
   }
@@ -2484,21 +2483,21 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
 
     // Perform 1 failed writes to table
     writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
-            100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, 
false, 100, 300,
-            0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 
100, 300,
+        0, false);
     client.close();
     // Toggle cleaning policy to LAZY
     cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
     // Perform 2 failed writes to table
     client = new SparkRDDWriteClient(context, 
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
     writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
-            100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, 
false, 100, 300,
-            0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 
100, 300,
+        0, false);
     client.close();
     client = new SparkRDDWriteClient(context, 
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
     writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
-            100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, 
false, 100, 300,
-            0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 
100, 300,
+        0, false);
     client.close();
     // Await till enough time passes such that the 2 failed commits heartbeats 
are expired
     boolean conditionMet = false;
@@ -2509,17 +2508,17 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
     client.clean();
     HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
     assertTrue(timeline.getTimelineOfActions(
-            CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
+        CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
     // Perform 2 failed commits
     client = new SparkRDDWriteClient(context, 
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
     writeBatch(client, "500", "400", Option.of(Arrays.asList("300")), "300",
-            100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, 
false, 100, 300,
-            0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 
100, 300,
+        0, false);
     client.close();
     client = new SparkRDDWriteClient(context, 
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
     writeBatch(client, "600", "500", Option.of(Arrays.asList("400")), "400",
-            100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, 
false, 100, 300,
-            0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 
100, 300,
+        0, false);
     client.close();
     // Toggle cleaning policy to EAGER
     cleaningPolicy = EAGER;
@@ -2528,7 +2527,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     timeline = metaClient.getActiveTimeline().reload();
     // since OCC is enabled, hudi auto flips the cleaningPolicy to Lazy.
     assertTrue(timeline.getTimelineOfActions(
-            CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
+        CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
     assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 1);
   }
 
@@ -2545,20 +2544,20 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
 
     // Perform 2 failed writes to table
     writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
-            100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100,
-            0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100,
+        0, false);
     client.close();
     client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
     writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
-            100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100,
-            0, false);
+        100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100,
+        0, false);
     client.close();
     // refresh data generator to delete records generated from failed commits
     dataGen = new HoodieTestDataGenerator();
     // Create a successful commit
     Future<JavaRDD<WriteStatus>> commit3 = service.submit(() -> writeBatch(new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)),
-            "400", "300", Option.of(Arrays.asList("400")), "300", 100, dataGen::generateInserts,
-            SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
+        "400", "300", Option.of(Arrays.asList("400")), "300", 100, dataGen::generateInserts,
+        SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
     commit3.get();
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
 
@@ -2574,8 +2573,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
       Thread.sleep(2000);
     }
     Future<JavaRDD<WriteStatus>> commit4 = service.submit(() -> writeBatch(new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)),
-            "500", "400", Option.of(Arrays.asList("500")), "500", 100, dataGen::generateInserts,
-            SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
+        "500", "400", Option.of(Arrays.asList("500")), "500", 100, dataGen::generateInserts,
+        SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
     Future<HoodieCleanMetadata> clean1 = service.submit(() -> new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)).clean());
     commit4.get();
     clean1.get();
@@ -2626,8 +2625,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         .create(partitionPath,
             FSUtils.makeBaseFileName(instantTime, "1-0-1", UUID.randomUUID().toString()),
             IOType.MERGE);
-    LOG.info("Created a dummy marker path=" + markerFilePath.get());
-
     if (!enableOptimisticConsistencyGuard) {
       Exception e = assertThrows(HoodieCommitException.class, () -> {
         client.commit(instantTime, result);
@@ -2686,7 +2683,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
     HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder()
         .withLockProvider(FileSystemBasedLockProviderTestClass.class)
-        .withConflictResolutionStrategy(new IngestionPrimaryWriterBasedConflictResolutionStrategy())
+        .withConflictResolutionStrategy(new PreferWriterConflictResolutionStrategy())
         .build();
     HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
         .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build();
@@ -2702,7 +2699,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     int numRecords = 200;
     String firstCommit = HoodieActiveTimeline.createNewInstantTime();
     String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionStr});
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionStr});
     writeBatch(client, firstCommit, "000", Option.of(Arrays.asList("000")), "000",
         numRecords, dataGenerator::generateInserts, SparkRDDWriteClient::insert, true, numRecords, numRecords,
         1, true);
@@ -2754,7 +2751,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
     HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder()
         .withLockProvider(FileSystemBasedLockProviderTestClass.class)
-        .withConflictResolutionStrategy(new IngestionPrimaryWriterBasedConflictResolutionStrategy())
+        .withConflictResolutionStrategy(new PreferWriterConflictResolutionStrategy())
         .build();
     HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
         .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build();
@@ -2770,7 +2767,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     int numRecords = 200;
     String firstCommit = HoodieActiveTimeline.createNewInstantTime();
     String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionStr});
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionStr});
     writeBatch(client, firstCommit, "000", Option.of(Arrays.asList("000")), "000",
         numRecords, dataGenerator::generateInserts, SparkRDDWriteClient::insert, true, numRecords, numRecords,
         1, true);
@@ -2868,7 +2865,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts,
-      boolean populateMetaFields, Properties props) {
+                                                      boolean populateMetaFields, Properties props) {
     HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr);
     if (!populateMetaFields) {
       builder.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.SIMPLE).build());
@@ -2937,7 +2934,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     }
   }
 
-  public static class WriteClientBrokenClustering<T extends HoodieRecordPayload> extends org.apache.hudi.client.SparkRDDWriteClient<T>   {
+  public static class WriteClientBrokenClustering<T extends HoodieRecordPayload> extends org.apache.hudi.client.SparkRDDWriteClient<T> {
 
     public WriteClientBrokenClustering(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
       super(context, clientConfig);
@@ -2951,6 +2948,4 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     }
 
   }
-
-  public static  String CLUSTERING_FAILURE = "CLUSTERING FAILURE";
 }
