This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9259287d926 [MINOR] Fix default config values if not specified (#9625)
9259287d926 is described below

commit 9259287d92634cacc8f5a5ea78d8e8ef27ac0e45
Author: voonhous <[email protected]>
AuthorDate: Fri Sep 22 03:17:17 2023 +0800

    [MINOR] Fix default config values if not specified (#9625)
    
    The default values for the configs below are incorrect:
    
    1. hoodie.datasource.write.row.writer.enable
    2. hoodie.clustering.preserve.commit.metadata (getPreserveHoodieMetadata)
    
    The default values cannot be loaded from `#defaultVal` because the 
configurations are defined in a module that is not accessible from the module 
changed here. This is why the config keys are written as string literals here.
    
    This commit fixes these inconsistencies first. Subsequent refactoring might 
be required to move these config-keys to a scope that is accessible by all 
other (relevant) modules.
    
    **Note:** The existing tests do not cover clustering performed 
using the RowWriter API. Only the RDD API is covered as of now.
    
    Co-authored-by: voon <[email protected]>
---
 .../MultipleSparkJobExecutionStrategy.java         |  4 +--
 .../hudi/client/TestHoodieClientMultiWriter.java   |  9 ++++-
 .../TestMultiWriterWithPreferWriterIngestion.java  |  2 ++
 .../functional/TestHoodieBackedMetadata.java       | 18 +++++++++-
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 41 +++++++++++++++++-----
 .../TestCopyOnWriteRollbackActionExecutor.java     |  6 +++-
 .../TestSparkConsistentBucketClustering.java       |  4 +++
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  6 ++++
 .../offlinejob/TestHoodieClusteringJob.java        |  1 +
 9 files changed, 77 insertions(+), 14 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index ccdad98fc8e..e2f06de00ea 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -108,7 +108,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final 
HoodieClusteringPlan clusteringPlan, final Schema schema, final String 
instantTime) {
     JavaSparkContext engineContext = 
HoodieSparkEngineContext.getSparkContext(getEngineContext());
-    boolean shouldPreserveMetadata = 
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
+    boolean shouldPreserveMetadata = 
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(true);
     ExecutorService clusteringExecutorService = Executors.newFixedThreadPool(
         Math.min(clusteringPlan.getInputGroups().size(), 
writeConfig.getClusteringMaxParallelism()),
         new CustomizedThreadFactory("clustering-job-group", true));
@@ -117,7 +117,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
       Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
               clusteringPlan.getInputGroups().stream()
                   .map(inputGroup -> {
-                    if 
(getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable",
 false)) {
+                    if 
(getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable",
 true)) {
                       return runClusteringForGroupAsyncAsRow(inputGroup,
                           clusteringPlan.getStrategy().getStrategyParams(),
                           shouldPreserveMetadata,
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index e26be8c09a6..7b3e6a80ae3 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -448,6 +448,11 @@ public class TestHoodieClientMultiWriter extends 
HoodieClientTestBase {
     if (tableType == HoodieTableType.MERGE_ON_READ) {
       setUpMORTestTable();
     }
+
+    // Use RDD API to perform clustering (TODO: Fix row-writer API)
+    Properties properties = new Properties();
+    properties.put("hoodie.datasource.write.row.writer.enable", 
String.valueOf(false));
+
     // Disabling embedded timeline server, it doesn't work with multiwriter
     HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
         .withCleanConfig(HoodieCleanConfig.newBuilder()
@@ -466,7 +471,9 @@ public class TestHoodieClientMultiWriter extends 
HoodieClientTestBase {
         
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass)
             .withConflictResolutionStrategy(resolutionStrategy)
-            .build()).withAutoCommit(false).withProperties(lockProperties);
+            .build()).withAutoCommit(false).withProperties(lockProperties)
+        .withProperties(properties);
+
     Set<String> validInstants = new HashSet<>();
 
     // Create the first commit with inserts
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
index 59547cd5b63..bebacd2afaf 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
@@ -201,6 +201,8 @@ public class TestMultiWriterWithPreferWriterIngestion 
extends HoodieClientTestBa
       setUpMORTestTable();
     }
     Properties properties = new Properties();
+    // Use RDD API to perform clustering (TODO: Fix row-writer API)
+    properties.put("hoodie.datasource.write.row.writer.enable", 
String.valueOf(false));
     properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + 
"/.hoodie/.locks");
     HoodieWriteConfig cfg = getConfigBuilder()
         
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 089a452304d..b1b3b001312 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -3089,6 +3089,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
   public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, 
HoodieIndex.IndexType indexType,
                                                     
HoodieFailedWritesCleaningPolicy cleaningPolicy) {
+    Properties properties = getDisabledRowWriterProperties();
     return 
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
         .withParallelism(2, 
2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
         .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
@@ -3102,7 +3103,8 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
         
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withEnableBackupForRemoteFileSystemView(false) // Fail test if 
problem connecting to timeline-server
             .withRemoteServerPort(timelineServicePort)
-            
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+            
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build())
+        .withProperties(properties);
   }
 
   @Test
@@ -3135,6 +3137,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
         .withClusteringConfig(HoodieClusteringConfig.newBuilder()
             .withInlineClusteringNumCommits(0)
             .build())
+        .withProperties(getDisabledRowWriterProperties())
         .build();
     SparkRDDWriteClient clusteringClient = 
getHoodieWriteClient(clusterWriteCfg);
     clusteringClient.scheduleTableService("0000003", Option.empty(), 
TableServiceType.CLUSTER);
@@ -3193,6 +3196,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
         .withClusteringConfig(HoodieClusteringConfig.newBuilder()
             .withInlineClusteringNumCommits(0)
             .build())
+        .withProperties(getDisabledRowWriterProperties())
         .build();
     SparkRDDWriteClient clusteringClient = 
getHoodieWriteClient(clusterWriteCfg);
     clusteringClient.scheduleTableService("0000003", Option.empty(), 
TableServiceType.CLUSTER);
@@ -3565,4 +3569,16 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
   protected HoodieTableType getTableType() {
     return tableType;
   }
+
+  /**
+   * Disabling row writer here as clustering tests will throw the error below 
if it is used.
+   * java.util.concurrent.CompletionException: java.lang.ClassNotFoundException
+   * TODO: Fix this and increase test coverage to include clustering via row 
writers
+   * @return properties with hoodie.datasource.write.row.writer.enable set to "false"
+   */
+  private static Properties getDisabledRowWriterProperties() {
+    Properties properties = new Properties();
+    properties.setProperty("hoodie.datasource.write.row.writer.enable", 
String.valueOf(false));
+    return properties;
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index ccab8a004aa..2094ced2cdd 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1460,6 +1460,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = 
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
         
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
+        .fromProperties(getDisabledRowWriterProperties())
         .build();
     testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, 
SqlQueryEqualityPreCommitValidator.class.getName(), 
COUNT_SQL_QUERY_FOR_VALIDATION, "");
   }
@@ -1471,7 +1472,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
 
     // Trigger clustering
     HoodieWriteConfig.Builder cfgBuilder = 
getConfigBuilder().withEmbeddedTimelineServerEnabled(false).withAutoCommit(false)
-        
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(2).build());
+        
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(2)
+            .fromProperties(getDisabledRowWriterProperties()).build());
     try (SparkRDDWriteClient client = 
getHoodieWriteClient(cfgBuilder.build())) {
       int numRecords = 200;
       String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
@@ -1504,6 +1506,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   public void testRollbackOfRegularCommitWithPendingReplaceCommitInTimeline() 
throws Exception {
     HoodieClusteringConfig clusteringConfig = 
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
         
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
+        .fromProperties(getDisabledRowWriterProperties())
         .build();
     // trigger clustering, but do not complete
     testInsertAndClustering(clusteringConfig, true, false, false, 
SqlQueryEqualityPreCommitValidator.class.getName(), 
COUNT_SQL_QUERY_FOR_VALIDATION, "");
@@ -1576,6 +1579,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     HoodieClusteringConfig clusteringConfig = 
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
         .withClusteringSortColumns(populateMetaFields ? "_hoodie_record_key" : 
"_row_key")
         
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
+        .fromProperties(getDisabledRowWriterProperties())
         .build();
     testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, 
SqlQueryEqualityPreCommitValidator.class.getName(), 
COUNT_SQL_QUERY_FOR_VALIDATION, "");
   }
@@ -1589,6 +1593,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
         
.withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName())
         
.withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName())
         .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1)
+        .fromProperties(getDisabledRowWriterProperties())
         .build();
     // note that assertSameFileIds is true for this test because of the plan 
and execution strategy
     testInsertAndClustering(clusteringConfig, populateMetaFields, true, true, 
SqlQueryEqualityPreCommitValidator.class.getName(), 
COUNT_SQL_QUERY_FOR_VALIDATION, "");
@@ -1599,7 +1604,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     boolean populateMetaFields = true;
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = 
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
-        
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
+        
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
+        .fromProperties(getDisabledRowWriterProperties()).build();
 
     // start clustering, but don't commit
     List<HoodieRecord> allRecords = testInsertAndClustering(clusteringConfig, 
populateMetaFields, false);
@@ -1660,7 +1666,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
         .withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0)
         
.withClusteringUpdatesStrategy("org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy")
         .withRollbackPendingClustering(rollbackPendingClustering)
-        .withInlineClustering(true).withInlineClusteringNumCommits(1).build();
+        .withInlineClustering(true).withInlineClusteringNumCommits(1)
+        .fromProperties(getDisabledRowWriterProperties()).build();
 
     // start clustering, but don't commit keep it inflight
     List<HoodieRecord> allRecords = testInsertAndClustering(clusteringConfig, 
true, false);
@@ -1692,7 +1699,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = 
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
         
.withClusteringSortColumns("_hoodie_record_key").withInlineClustering(true)
-        
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
+        .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1)
+        .fromProperties(getDisabledRowWriterProperties()).build();
     try {
       testInsertAndClustering(clusteringConfig, true, true, false, 
FailingPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
       fail("expected pre-commit clustering validation to fail");
@@ -1705,7 +1713,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   public void testClusteringInvalidConfigForSqlQueryValidator() throws 
Exception {
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = 
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
-        
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
+        
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
+        .fromProperties(getDisabledRowWriterProperties()).build();
     try {
       testInsertAndClustering(clusteringConfig, false, true, false, 
SqlQueryEqualityPreCommitValidator.class.getName(), "", "");
       fail("expected pre-commit clustering validation to fail because sql 
query is not configured");
@@ -1718,7 +1727,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   public void testClusteringInvalidConfigForSqlQuerySingleResultValidator() 
throws Exception {
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = 
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
-        
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
+        
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
+        .fromProperties(getDisabledRowWriterProperties()).build();
 
     testInsertAndClustering(clusteringConfig, false, true, false, 
SqlQuerySingleResultPreCommitValidator.class.getName(),
         "", COUNT_SQL_QUERY_FOR_VALIDATION + "#400");
@@ -1728,7 +1738,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   public void 
testClusteringInvalidConfigForSqlQuerySingleResultValidatorFailure() throws 
Exception {
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = 
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
-        
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
+        
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
+        .fromProperties(getDisabledRowWriterProperties()).build();
 
     try {
       testInsertAndClustering(clusteringConfig, false, true, false, 
SqlQuerySingleResultPreCommitValidator.class.getName(),
@@ -2689,7 +2700,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
 
   @Test
   public void testClusteringCommitInPresenceOfInflightCommit() throws 
Exception {
-    Properties properties = new Properties();
+    Properties properties = getDisabledRowWriterProperties();
     properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + 
"/.hoodie/.locks");
     HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder()
         .withLockProvider(FileSystemBasedLockProviderTestClass.class)
@@ -2757,7 +2768,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
 
   @Test
   public void testIngestionCommitInPresenceOfCompletedClusteringCommit() 
throws Exception {
-    Properties properties = new Properties();
+    Properties properties = getDisabledRowWriterProperties();
     properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + 
"/.hoodie/.locks");
     HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder()
         .withLockProvider(FileSystemBasedLockProviderTestClass.class)
@@ -2958,4 +2969,16 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
     }
 
   }
+
+  /**
+   * Disabling row writer here as clustering tests will throw the error below 
if it is used.
+   * java.util.concurrent.CompletionException: java.lang.ClassNotFoundException
+   * TODO: Fix this and increase test coverage to include clustering via row 
writers
+   * @return properties with hoodie.datasource.write.row.writer.enable set to "false"
+   */
+  private static Properties getDisabledRowWriterProperties() {
+    Properties properties = new Properties();
+    properties.setProperty("hoodie.datasource.write.row.writer.enable", 
String.valueOf(false));
+    return properties;
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index 07dc831578c..ca881308fc5 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -59,6 +59,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -429,7 +430,10 @@ public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackT
         2, true);
 
     // Create completed clustering commit
-    SparkRDDWriteClient clusteringClient = 
getHoodieWriteClient(ClusteringTestUtils.getClusteringConfig(basePath));
+    Properties properties = new Properties();
+    properties.put("hoodie.datasource.write.row.writer.enable", 
String.valueOf(false));
+    SparkRDDWriteClient clusteringClient = getHoodieWriteClient(
+        ClusteringTestUtils.getClusteringConfig(basePath, 
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, properties));
 
     // Save an older instant for us to run clustering.
     String clusteringInstant1 = HoodieActiveTimeline.createNewInstantTime();
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
index 940f5db9023..ff3dcd47c32 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
@@ -230,6 +230,8 @@ public class TestSparkConsistentBucketClustering extends 
HoodieSparkClientTestHa
     } else {
       options.put(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), 
sortColumn);
     }
+    // TODO: row writer does not support sort for consistent hashing index
+    options.put("hoodie.datasource.write.row.writer.enable", 
String.valueOf(false));
     setup(128 * 1024 * 1024, options);
 
     writeData(HoodieActiveTimeline.createNewInstantTime(), 500, true);
@@ -254,6 +256,8 @@ public class TestSparkConsistentBucketClustering extends 
HoodieSparkClientTestHa
       throw new HoodieException("Cannot get comparator: unsupported data type, 
" + field.schema().getType());
     }
 
+    // Note: If row writer is used, it will throw: 
https://github.com/apache/hudi/issues/8838
+    // Use #readRecords() instead if row-writer is used in the future
     for (RecordReader recordReader: readers) {
       Object key = recordReader.createKey();
       ArrayWritable writable = (ArrayWritable) recordReader.createValue();
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 96d888f5f07..5b86f3fc90a 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -783,6 +783,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", 
"2", "", ""));
+    cfg.configs.add(String.format("%s=%s", 
"hoodie.datasource.write.row.writer.enable", "false"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
@@ -800,6 +801,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT);
     cfg.continuousMode = false;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+    cfg.configs.add(String.format("%s=%s", 
"hoodie.datasource.write.row.writer.enable", "false"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     ds.sync();
     // assert ingest successful
@@ -885,6 +887,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add(String.format("%s=%s", 
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
     cfg.configs.add(String.format("%s=%s", 
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
     cfg.configs.add(String.format("%s=%s", 
HoodieWriteConfig.MARKERS_TYPE.key(), "DIRECT"));
+    cfg.configs.add(String.format("%s=%s", 
"hoodie.datasource.write.row.writer.enable", "false"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
@@ -1011,6 +1014,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     if (retryLastFailedClusteringJob != null) {
       config.retryLastFailedClusteringJob = retryLastFailedClusteringJob;
     }
+    config.configs.add(String.format("%s=%s", 
"hoodie.datasource.write.row.writer.enable", "false"));
     return config;
   }
 
@@ -1125,6 +1129,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", 
"true", "3"));
+    cfg.configs.add(String.format("%s=%s", 
"hoodie.datasource.write.row.writer.enable", "false"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
@@ -1160,6 +1165,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", 
"true", "2"));
+    cfg.configs.add(String.format("%s=%s", 
"hoodie.datasource.write.row.writer.enable", "false"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       // when pending clustering overlaps w/ incoming, incoming batch will 
fail and hence will result in rollback.
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
index b02ef677d64..6fc86558e22 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
@@ -99,6 +99,7 @@ public class TestHoodieClusteringJob extends 
HoodieOfflineJobTestBase {
 
   private HoodieClusteringJob init(String tableBasePath, boolean runSchedule, 
String scheduleAndExecute, boolean isAutoClean) {
     HoodieClusteringJob.Config clusterConfig = 
buildHoodieClusteringUtilConfig(tableBasePath, runSchedule, scheduleAndExecute, 
isAutoClean);
+    clusterConfig.configs.add(String.format("%s=%s", 
"hoodie.datasource.write.row.writer.enable", "false"));
     return new HoodieClusteringJob(jsc, clusterConfig);
   }
 

Reply via email to