This is an automated email from the ASF dual-hosted git repository. yuzhaojing pushed a commit to branch release-0.12.1-rc1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 9a3fcc8456796add306f6d3d2756afadf830f41a Author: Jon Vexler <[email protected]> AuthorDate: Wed Sep 28 17:12:27 2022 -0400 [HUDI-4734] Deltastreamer table config change validation (#6753) Co-authored-by: sivabalan <[email protected]> --- .../deltastreamer/HoodieDeltaStreamer.java | 5 ++ .../functional/TestHoodieDeltaStreamer.java | 58 ++++++++++++++++------ 2 files changed, 49 insertions(+), 14 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 867aa05b30..74cb3e31df 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -19,6 +19,7 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.HoodieWriterUtils; import org.apache.hudi.async.AsyncClusteringService; import org.apache.hudi.async.AsyncCompactService; import org.apache.hudi.async.HoodieAsyncService; @@ -76,6 +77,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -651,6 +653,9 @@ public class HoodieDeltaStreamer implements Serializable { + cfg.baseFileFormat); cfg.baseFileFormat = baseFileFormat; this.cfg.baseFileFormat = baseFileFormat; + Map<String,String> propsToValidate = new HashMap<>(); + properties.get().forEach((k,v) -> propsToValidate.put(k.toString(),v.toString())); + HoodieWriterUtils.validateTableConfig(this.sparkSession, org.apache.hudi.HoodieConversionUtils.mapAsScalaImmutableMap(propsToValidate), meta.getTableConfig()); } else { tableType = HoodieTableType.valueOf(cfg.tableType); if (cfg.baseFileFormat == null) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index d94ff1477a..12c4c6fe0e 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -611,25 +611,16 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT); - new HoodieDeltaStreamer(cfg, jsc).sync(); - TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext); - TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1); + syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1); // No new data => no commits. cfg.sourceLimit = 0; - new HoodieDeltaStreamer(cfg, jsc).sync(); - TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext); - TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1); + syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1); // upsert() #1 cfg.sourceLimit = 2000; cfg.operation = WriteOperationType.UPSERT; - new HoodieDeltaStreamer(cfg, jsc).sync(); - TestHelpers.assertRecordCount(1950, tableBasePath, sqlContext); - TestHelpers.assertDistanceCount(1950, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2); + syncAndAssertRecordCount(cfg,1950, tableBasePath, "00001", 2); List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext); assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); @@ -663,6 +654,43 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { assertTrue(fieldNames.containsAll(expectedFieldNames)); } + @Test + public void testModifiedTableConfigs() throws Exception { + String tableBasePath = dfsBasePath + "/test_table_modified_configs"; + + // Initial bulk insert + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT); + syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1); + + // No new data => no commits. + cfg.sourceLimit = 0; + syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1); + + // add disallowed config update to recordkey field. An exception should be thrown + cfg.sourceLimit = 2000; + cfg.operation = WriteOperationType.UPSERT; + cfg.configs.add(HoodieTableConfig.RECORDKEY_FIELDS.key() + "=differentval"); + assertThrows(HoodieException.class, () -> syncAndAssertRecordCount(cfg,1000,tableBasePath,"00000",1)); + List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext); + assertEquals(1000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); + + + //perform the upsert and now with the original config, the commit should go through + HoodieDeltaStreamer.Config newCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT); + newCfg.sourceLimit = 2000; + newCfg.operation = WriteOperationType.UPSERT; + syncAndAssertRecordCount(newCfg, 1950, tableBasePath, "00001", 2); + List<Row> counts2 = TestHelpers.countsPerCommit(tableBasePath, sqlContext); + assertEquals(1950, counts2.stream().mapToLong(entry -> entry.getLong(1)).sum()); + } + + private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config cfg, Integer expected, String tableBasePath, String metadata, Integer totalCommits) throws Exception { + new HoodieDeltaStreamer(cfg, jsc).sync(); + TestHelpers.assertRecordCount(expected, tableBasePath, sqlContext); + TestHelpers.assertDistanceCount(expected, tableBasePath, sqlContext); + TestHelpers.assertCommitMetadata(metadata, tableBasePath, dfs, totalCommits); + } + @ParameterizedTest @MethodSource("schemaEvolArgs") public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, boolean useSchemaPostProcessor) throws Exception { @@ -1418,7 +1446,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { @Test public void testPayloadClassUpdate() throws Exception { - String dataSetBasePath = dfsBasePath + "/test_dataset_mor"; + String dataSetBasePath = dfsBasePath + "/test_dataset_mor_payload_class_update"; HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false, true, false, null, "MERGE_ON_READ"); @@ -1572,6 +1600,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { populateCommonProps(parquetProps, dfsBasePath); } + parquetProps.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); + parquetProps.setProperty("include", "base.properties"); parquetProps.setProperty("hoodie.embed.timeline.server", "false"); parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); @@ -2122,7 +2152,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // No records should match the HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION. TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); } - + void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType) throws Exception { // Initial insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
