This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9259287d926 [MINOR] Fix default config values if not specified (#9625)
9259287d926 is described below
commit 9259287d92634cacc8f5a5ea78d8e8ef27ac0e45
Author: voonhous <[email protected]>
AuthorDate: Fri Sep 22 03:17:17 2023 +0800
[MINOR] Fix default config values if not specified (#9625)
The default values for the configs below are incorrect:
1. hoodie.datasource.write.row.writer.enable
2. hoodie.clustering.preserve.commit.metadata (getPreserveHoodieMetadata)
The default values are not loaded from `#defaultVal` because the configurations
are defined in a module scope that is inaccessible from the current scope. This
is why the config keys are defined as string literals here.
This commit fixes these inconsistencies first. Subsequent refactoring may be
required to move these config keys to a scope that is accessible to all
other (relevant) modules.
**Note:** The existing test coverage does not cover clustering performed
using the RowWriter API. Only RDD API is included as of now.
Co-authored-by: voon <[email protected]>
---
.../MultipleSparkJobExecutionStrategy.java | 4 +--
.../hudi/client/TestHoodieClientMultiWriter.java | 9 ++++-
.../TestMultiWriterWithPreferWriterIngestion.java | 2 ++
.../functional/TestHoodieBackedMetadata.java | 18 +++++++++-
.../TestHoodieClientOnCopyOnWriteStorage.java | 41 +++++++++++++++++-----
.../TestCopyOnWriteRollbackActionExecutor.java | 6 +++-
.../TestSparkConsistentBucketClustering.java | 4 +++
.../deltastreamer/TestHoodieDeltaStreamer.java | 6 ++++
.../offlinejob/TestHoodieClusteringJob.java | 1 +
9 files changed, 77 insertions(+), 14 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index ccdad98fc8e..e2f06de00ea 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -108,7 +108,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final
HoodieClusteringPlan clusteringPlan, final Schema schema, final String
instantTime) {
JavaSparkContext engineContext =
HoodieSparkEngineContext.getSparkContext(getEngineContext());
- boolean shouldPreserveMetadata =
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
+ boolean shouldPreserveMetadata =
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(true);
ExecutorService clusteringExecutorService = Executors.newFixedThreadPool(
Math.min(clusteringPlan.getInputGroups().size(),
writeConfig.getClusteringMaxParallelism()),
new CustomizedThreadFactory("clustering-job-group", true));
@@ -117,7 +117,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
clusteringPlan.getInputGroups().stream()
.map(inputGroup -> {
- if
(getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable",
false)) {
+ if
(getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable",
true)) {
return runClusteringForGroupAsyncAsRow(inputGroup,
clusteringPlan.getStrategy().getStrategyParams(),
shouldPreserveMetadata,
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index e26be8c09a6..7b3e6a80ae3 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -448,6 +448,11 @@ public class TestHoodieClientMultiWriter extends
HoodieClientTestBase {
if (tableType == HoodieTableType.MERGE_ON_READ) {
setUpMORTestTable();
}
+
+ // Use RDD API to perform clustering (TODO: Fix row-writer API)
+ Properties properties = new Properties();
+ properties.put("hoodie.datasource.write.row.writer.enable",
String.valueOf(false));
+
// Disabling embedded timeline server, it doesn't work with multiwriter
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
.withCleanConfig(HoodieCleanConfig.newBuilder()
@@ -466,7 +471,9 @@ public class TestHoodieClientMultiWriter extends
HoodieClientTestBase {
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass)
.withConflictResolutionStrategy(resolutionStrategy)
- .build()).withAutoCommit(false).withProperties(lockProperties);
+ .build()).withAutoCommit(false).withProperties(lockProperties)
+ .withProperties(properties);
+
Set<String> validInstants = new HashSet<>();
// Create the first commit with inserts
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
index 59547cd5b63..bebacd2afaf 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
@@ -201,6 +201,8 @@ public class TestMultiWriterWithPreferWriterIngestion
extends HoodieClientTestBa
setUpMORTestTable();
}
Properties properties = new Properties();
+ // Use RDD API to perform clustering (TODO: Fix row-writer API)
+ properties.put("hoodie.datasource.write.row.writer.enable",
String.valueOf(false));
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
HoodieWriteConfig cfg = getConfigBuilder()
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 089a452304d..b1b3b001312 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -3089,6 +3089,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr,
HoodieIndex.IndexType indexType,
HoodieFailedWritesCleaningPolicy cleaningPolicy) {
+ Properties properties = getDisabledRowWriterProperties();
return
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
.withParallelism(2,
2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
.withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
@@ -3102,7 +3103,8 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withEnableBackupForRemoteFileSystemView(false) // Fail test if
problem connecting to timeline-server
.withRemoteServerPort(timelineServicePort)
-
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build())
+ .withProperties(properties);
}
@Test
@@ -3135,6 +3137,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withInlineClusteringNumCommits(0)
.build())
+ .withProperties(getDisabledRowWriterProperties())
.build();
SparkRDDWriteClient clusteringClient =
getHoodieWriteClient(clusterWriteCfg);
clusteringClient.scheduleTableService("0000003", Option.empty(),
TableServiceType.CLUSTER);
@@ -3193,6 +3196,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withInlineClusteringNumCommits(0)
.build())
+ .withProperties(getDisabledRowWriterProperties())
.build();
SparkRDDWriteClient clusteringClient =
getHoodieWriteClient(clusterWriteCfg);
clusteringClient.scheduleTableService("0000003", Option.empty(),
TableServiceType.CLUSTER);
@@ -3565,4 +3569,16 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
protected HoodieTableType getTableType() {
return tableType;
}
+
+ /**
+ * Disabling row writer here as clustering tests will throw the error below
if it is used.
+ * java.util.concurrent.CompletionException: java.lang.ClassNotFoundException
+ * TODO: Fix this and increase test coverage to include clustering via row
writers
+ * @return
+ */
+ private static Properties getDisabledRowWriterProperties() {
+ Properties properties = new Properties();
+ properties.setProperty("hoodie.datasource.write.row.writer.enable",
String.valueOf(false));
+ return properties;
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index ccab8a004aa..2094ced2cdd 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1460,6 +1460,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
// setup clustering config.
HoodieClusteringConfig clusteringConfig =
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
+ .fromProperties(getDisabledRowWriterProperties())
.build();
testInsertAndClustering(clusteringConfig, populateMetaFields, true, false,
SqlQueryEqualityPreCommitValidator.class.getName(),
COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
@@ -1471,7 +1472,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
// Trigger clustering
HoodieWriteConfig.Builder cfgBuilder =
getConfigBuilder().withEmbeddedTimelineServerEnabled(false).withAutoCommit(false)
-
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(2).build());
+
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(2)
+ .fromProperties(getDisabledRowWriterProperties()).build());
try (SparkRDDWriteClient client =
getHoodieWriteClient(cfgBuilder.build())) {
int numRecords = 200;
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
@@ -1504,6 +1506,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
public void testRollbackOfRegularCommitWithPendingReplaceCommitInTimeline()
throws Exception {
HoodieClusteringConfig clusteringConfig =
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
+ .fromProperties(getDisabledRowWriterProperties())
.build();
// trigger clustering, but do not complete
testInsertAndClustering(clusteringConfig, true, false, false,
SqlQueryEqualityPreCommitValidator.class.getName(),
COUNT_SQL_QUERY_FOR_VALIDATION, "");
@@ -1576,6 +1579,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
HoodieClusteringConfig clusteringConfig =
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringSortColumns(populateMetaFields ? "_hoodie_record_key" :
"_row_key")
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
+ .fromProperties(getDisabledRowWriterProperties())
.build();
testInsertAndClustering(clusteringConfig, populateMetaFields, true, false,
SqlQueryEqualityPreCommitValidator.class.getName(),
COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
@@ -1589,6 +1593,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
.withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName())
.withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName())
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1)
+ .fromProperties(getDisabledRowWriterProperties())
.build();
// note that assertSameFileIds is true for this test because of the plan
and execution strategy
testInsertAndClustering(clusteringConfig, populateMetaFields, true, true,
SqlQueryEqualityPreCommitValidator.class.getName(),
COUNT_SQL_QUERY_FOR_VALIDATION, "");
@@ -1599,7 +1604,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
boolean populateMetaFields = true;
// setup clustering config.
HoodieClusteringConfig clusteringConfig =
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
-
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
+
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
+ .fromProperties(getDisabledRowWriterProperties()).build();
// start clustering, but don't commit
List<HoodieRecord> allRecords = testInsertAndClustering(clusteringConfig,
populateMetaFields, false);
@@ -1660,7 +1666,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
.withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0)
.withClusteringUpdatesStrategy("org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy")
.withRollbackPendingClustering(rollbackPendingClustering)
- .withInlineClustering(true).withInlineClusteringNumCommits(1).build();
+ .withInlineClustering(true).withInlineClusteringNumCommits(1)
+ .fromProperties(getDisabledRowWriterProperties()).build();
// start clustering, but don't commit keep it inflight
List<HoodieRecord> allRecords = testInsertAndClustering(clusteringConfig,
true, false);
@@ -1692,7 +1699,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
// setup clustering config.
HoodieClusteringConfig clusteringConfig =
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringSortColumns("_hoodie_record_key").withInlineClustering(true)
-
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
+ .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1)
+ .fromProperties(getDisabledRowWriterProperties()).build();
try {
testInsertAndClustering(clusteringConfig, true, true, false,
FailingPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
fail("expected pre-commit clustering validation to fail");
@@ -1705,7 +1713,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
public void testClusteringInvalidConfigForSqlQueryValidator() throws
Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig =
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
-
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
+
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
+ .fromProperties(getDisabledRowWriterProperties()).build();
try {
testInsertAndClustering(clusteringConfig, false, true, false,
SqlQueryEqualityPreCommitValidator.class.getName(), "", "");
fail("expected pre-commit clustering validation to fail because sql
query is not configured");
@@ -1718,7 +1727,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
public void testClusteringInvalidConfigForSqlQuerySingleResultValidator()
throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig =
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
-
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
+
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
+ .fromProperties(getDisabledRowWriterProperties()).build();
testInsertAndClustering(clusteringConfig, false, true, false,
SqlQuerySingleResultPreCommitValidator.class.getName(),
"", COUNT_SQL_QUERY_FOR_VALIDATION + "#400");
@@ -1728,7 +1738,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
public void
testClusteringInvalidConfigForSqlQuerySingleResultValidatorFailure() throws
Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig =
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
-
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
+
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
+ .fromProperties(getDisabledRowWriterProperties()).build();
try {
testInsertAndClustering(clusteringConfig, false, true, false,
SqlQuerySingleResultPreCommitValidator.class.getName(),
@@ -2689,7 +2700,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
@Test
public void testClusteringCommitInPresenceOfInflightCommit() throws
Exception {
- Properties properties = new Properties();
+ Properties properties = getDisabledRowWriterProperties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder()
.withLockProvider(FileSystemBasedLockProviderTestClass.class)
@@ -2757,7 +2768,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
@Test
public void testIngestionCommitInPresenceOfCompletedClusteringCommit()
throws Exception {
- Properties properties = new Properties();
+ Properties properties = getDisabledRowWriterProperties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath +
"/.hoodie/.locks");
HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder()
.withLockProvider(FileSystemBasedLockProviderTestClass.class)
@@ -2958,4 +2969,16 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
}
}
+
+ /**
+ * Disabling row writer here as clustering tests will throw the error below
if it is used.
+ * java.util.concurrent.CompletionException: java.lang.ClassNotFoundException
+ * TODO: Fix this and increase test coverage to include clustering via row
writers
+ * @return
+ */
+ private static Properties getDisabledRowWriterProperties() {
+ Properties properties = new Properties();
+ properties.setProperty("hoodie.datasource.write.row.writer.enable",
String.valueOf(false));
+ return properties;
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index 07dc831578c..ca881308fc5 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -59,6 +59,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -429,7 +430,10 @@ public class TestCopyOnWriteRollbackActionExecutor extends
HoodieClientRollbackT
2, true);
// Create completed clustering commit
- SparkRDDWriteClient clusteringClient =
getHoodieWriteClient(ClusteringTestUtils.getClusteringConfig(basePath));
+ Properties properties = new Properties();
+ properties.put("hoodie.datasource.write.row.writer.enable",
String.valueOf(false));
+ SparkRDDWriteClient clusteringClient = getHoodieWriteClient(
+ ClusteringTestUtils.getClusteringConfig(basePath,
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, properties));
// Save an older instant for us to run clustering.
String clusteringInstant1 = HoodieActiveTimeline.createNewInstantTime();
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
index 940f5db9023..ff3dcd47c32 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
@@ -230,6 +230,8 @@ public class TestSparkConsistentBucketClustering extends
HoodieSparkClientTestHa
} else {
options.put(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(),
sortColumn);
}
+ // TODO: row writer does not support sort for consistent hashing index
+ options.put("hoodie.datasource.write.row.writer.enable",
String.valueOf(false));
setup(128 * 1024 * 1024, options);
writeData(HoodieActiveTimeline.createNewInstantTime(), 500, true);
@@ -254,6 +256,8 @@ public class TestSparkConsistentBucketClustering extends
HoodieSparkClientTestHa
throw new HoodieException("Cannot get comparator: unsupported data type,
" + field.schema().getType());
}
+ // Note: If row writer is used, it will throw:
https://github.com/apache/hudi/issues/8838
+ // Use #readRecords() instead if row-writer is used in the future
for (RecordReader recordReader: readers) {
Object key = recordReader.createKey();
ArrayWritable writable = (ArrayWritable) recordReader.createValue();
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 96d888f5f07..5b86f3fc90a 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -783,6 +783,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true",
"2", "", ""));
+ cfg.configs.add(String.format("%s=%s",
"hoodie.datasource.write.row.writer.enable", "false"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
@@ -800,6 +801,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.INSERT);
cfg.continuousMode = false;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+ cfg.configs.add(String.format("%s=%s",
"hoodie.datasource.write.row.writer.enable", "false"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
// assert ingest successful
@@ -885,6 +887,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.configs.add(String.format("%s=%s",
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
cfg.configs.add(String.format("%s=%s",
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
cfg.configs.add(String.format("%s=%s",
HoodieWriteConfig.MARKERS_TYPE.key(), "DIRECT"));
+ cfg.configs.add(String.format("%s=%s",
"hoodie.datasource.write.row.writer.enable", "false"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
@@ -1011,6 +1014,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
if (retryLastFailedClusteringJob != null) {
config.retryLastFailedClusteringJob = retryLastFailedClusteringJob;
}
+ config.configs.add(String.format("%s=%s",
"hoodie.datasource.write.row.writer.enable", "false"));
return config;
}
@@ -1125,6 +1129,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "",
"true", "3"));
+ cfg.configs.add(String.format("%s=%s",
"hoodie.datasource.write.row.writer.enable", "false"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
@@ -1160,6 +1165,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "",
"true", "2"));
+ cfg.configs.add(String.format("%s=%s",
"hoodie.datasource.write.row.writer.enable", "false"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
// when pending clustering overlaps w/ incoming, incoming batch will
fail and hence will result in rollback.
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
index b02ef677d64..6fc86558e22 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
@@ -99,6 +99,7 @@ public class TestHoodieClusteringJob extends
HoodieOfflineJobTestBase {
private HoodieClusteringJob init(String tableBasePath, boolean runSchedule,
String scheduleAndExecute, boolean isAutoClean) {
HoodieClusteringJob.Config clusterConfig =
buildHoodieClusteringUtilConfig(tableBasePath, runSchedule, scheduleAndExecute,
isAutoClean);
+ clusterConfig.configs.add(String.format("%s=%s",
"hoodie.datasource.write.row.writer.enable", "false"));
return new HoodieClusteringJob(jsc, clusterConfig);
}