This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit db7136f5bc3e3ad682d606e5597adfff206a38b8
Author: Vinish Reddy <[email protected]>
AuthorDate: Wed May 15 06:37:25 2024 -0700

    [HUDI-7523] Add HOODIE_SPARK_DATASOURCE_OPTIONS to be used in HoodieIncrSource (#10900)

    Co-authored-by: Y Ethan Guo <[email protected]>
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../org/apache/hudi/common/util/ConfigUtils.java   | 17 +++++-
 .../apache/hudi/common/util/TestConfigUtils.java   | 66 ++++++++++++++++------
 .../utilities/config/HoodieIncrSourceConfig.java   |  8 +++
 .../hudi/utilities/sources/HoodieIncrSource.java   | 17 +++++-
 .../utilities/sources/TestHoodieIncrSource.java    | 39 ++++++++++++-
 5 files changed, 122 insertions(+), 25 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 3866069d437..3426477d90d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -98,7 +98,7 @@ public class ConfigUtils {
   }
 
   /**
-   * Convert the key-value config to a map.The format of the config
+   * Convert the key-value config to a map. The format of the config
    * is a key-value pair just like "k1=v1\nk2=v2\nk3=v3".
    *
    * @param keyValueConfig Key-value configs in properties format, i.e., multiple lines of
@@ -106,10 +106,23 @@ public class ConfigUtils {
    * @return A {@link Map} of key-value configs.
    */
   public static Map<String, String> toMap(String keyValueConfig) {
+    return toMap(keyValueConfig, "\n");
+  }
+
+  /**
+   * Convert the key-value config to a map. The format of the config is a key-value pair
+   * with defined separator. For example, if the separator is a comma, the input is
+   * "k1=v1,k2=v2,k3=v3".
+   *
+   * @param keyValueConfig key-value configs in properties format, with defined separator.
+   * @param separator      the separator.
+   * @return A {@link Map} of key-value configs.
+   */
+  public static Map<String, String> toMap(String keyValueConfig, String separator) {
     if (StringUtils.isNullOrEmpty(keyValueConfig)) {
       return new HashMap<>();
     }
-    String[] keyvalues = keyValueConfig.split("\n");
+    String[] keyvalues = keyValueConfig.split(separator);
     Map<String, String> tableProperties = new HashMap<>();
     for (String keyValue : keyvalues) {
       // Handle multiple new lines and lines that contain only spaces after splitting
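[Editor's note, not part of the commit: a minimal usage sketch of the new overload, assuming the patch above is applied; the demo class name is hypothetical.]

import java.util.Map;

import org.apache.hudi.common.util.ConfigUtils;

public class ConfigUtilsToMapDemo {
  public static void main(String[] args) {
    // Existing single-argument form: entries are separated by newlines.
    Map<String, String> fromNewlines = ConfigUtils.toMap("k1=v1\nk2=v2\nk3=v3");

    // New two-argument form: the separator is caller-defined, e.g. a comma,
    // which is what HoodieIncrSource uses for its datasource options.
    Map<String, String> fromCommas = ConfigUtils.toMap("k1=v1,k2=v2,k3=v3", ",");

    System.out.println(fromNewlines.equals(fromCommas)); // prints: true
  }
}

Note that the separator is handed straight to String.split, so it is interpreted as a regular expression; plain separators such as "," and "\n" are safe, but a metacharacter like "|" would need escaping.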
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
index 5728dd8d36c..3742c961a7d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
@@ -21,10 +21,15 @@ package org.apache.hudi.common.util;
 
 import org.apache.hudi.common.config.ConfigProperty;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -36,43 +41,68 @@ public class TestConfigUtils {
       .withAlternatives("hudi.test.boolean.config")
       .markAdvanced()
       .withDocumentation("Testing boolean config.");
-
-  @Test
-  public void testToMapSucceeds() {
+
+  private static Stream<Arguments> separatorArgs() {
+    List<Option<String>> separatorList = new ArrayList<>();
+    separatorList.add(Option.empty());
+    separatorList.add(Option.of("\n"));
+    separatorList.add(Option.of(","));
+    return separatorList.stream().map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("separatorArgs")
+  public void testToMapSucceeds(Option<String> separator) {
+    String sepString = separator.isPresent() ? separator.get() : "\n";
     Map<String, String> expectedMap = new HashMap<>();
     expectedMap.put("k.1.1.2", "v1");
     expectedMap.put("k.2.1.2", "v2");
     expectedMap.put("k.3.1.2", "v3");
 
     // Test base case
-    String srcKv = "k.1.1.2=v1\nk.2.1.2=v2\nk.3.1.2=v3";
-    Map<String, String> outMap = ConfigUtils.toMap(srcKv);
+    String srcKv = String.format(
+        "k.1.1.2=v1%sk.2.1.2=v2%sk.3.1.2=v3", sepString, sepString);
+    Map<String, String> outMap = toMap(srcKv, separator);
     assertEquals(expectedMap, outMap);
 
     // Test ends with new line
-    srcKv = "k.1.1.2=v1\nk.2.1.2=v2\nk.3.1.2=v3\n";
-    outMap = ConfigUtils.toMap(srcKv);
+    srcKv = String.format(
+        "k.1.1.2=v1%sk.2.1.2=v2%sk.3.1.2=v3%s", sepString, sepString, sepString);
+    outMap = toMap(srcKv, separator);
     assertEquals(expectedMap, outMap);
 
     // Test delimited by multiple new lines
-    srcKv = "k.1.1.2=v1\nk.2.1.2=v2\n\nk.3.1.2=v3";
-    outMap = ConfigUtils.toMap(srcKv);
+    srcKv = String.format(
+        "k.1.1.2=v1%sk.2.1.2=v2%s%sk.3.1.2=v3", sepString, sepString, sepString);
+    outMap = toMap(srcKv, separator);
     assertEquals(expectedMap, outMap);
 
     // Test delimited by multiple new lines with spaces in between
-    srcKv = "k.1.1.2=v1\n \nk.2.1.2=v2\n\nk.3.1.2=v3";
-    outMap = ConfigUtils.toMap(srcKv);
+    srcKv = String.format(
+        "k.1.1.2=v1%s %sk.2.1.2=v2%s%sk.3.1.2=v3", sepString, sepString, sepString, sepString);
+    outMap = toMap(srcKv, separator);
     assertEquals(expectedMap, outMap);
 
     // Test with random spaces if trim works properly
-    srcKv = " k.1.1.2 = v1\n k.2.1.2 = v2 \nk.3.1.2 = v3";
-    outMap = ConfigUtils.toMap(srcKv);
+    srcKv = String.format(
+        " k.1.1.2 = v1%s k.2.1.2 = v2 %sk.3.1.2 = v3", sepString, sepString);
+    outMap = toMap(srcKv, separator);
     assertEquals(expectedMap, outMap);
   }
 
-  @Test
-  public void testToMapThrowError() {
-    String srcKv = "k.1.1.2=v1=v1.1\nk.2.1.2=v2\nk.3.1.2=v3";
-    assertThrows(IllegalArgumentException.class, () -> ConfigUtils.toMap(srcKv));
+  @ParameterizedTest
+  @MethodSource("separatorArgs")
+  public void testToMapThrowError(Option<String> separator) {
+    String sepString = separator.isPresent() ? separator.get() : "\n";
+    String srcKv = String.format(
+        "k.1.1.2=v1=v1.1%sk.2.1.2=v2%sk.3.1.2=v3", sepString, sepString);
+    assertThrows(IllegalArgumentException.class, () -> toMap(srcKv, separator));
+  }
+
+  private Map<String, String> toMap(String config, Option<String> separator) {
+    if (separator.isEmpty()) {
+      return ConfigUtils.toMap(config);
+    }
+    return ConfigUtils.toMap(config, separator.get());
   }
 }
\ No newline at end of file
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieIncrSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieIncrSourceConfig.java
index 63da2358e02..648af1c7615 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieIncrSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieIncrSourceConfig.java
@@ -101,4 +101,12 @@ public class HoodieIncrSourceConfig extends HoodieConfig {
       .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.hoodieincr.partition.extractor.class")
       .markAdvanced()
       .withDocumentation("PartitionValueExtractor class to extract partition fields from _hoodie_partition_path");
+
+  public static final ConfigProperty<String> HOODIE_INCREMENTAL_SPARK_DATASOURCE_OPTIONS = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.hoodieincr.data.datasource.options")
+      .noDefaultValue()
+      .markAdvanced()
+      .sinceVersion("0.15.0")
+      .withDocumentation("A comma-separated list of Hudi options that can be passed to the spark dataframe reader of a hudi table, "
+          + "eg: `hoodie.metadata.enable=true,hoodie.enable.data.skipping=true`. Used only for incremental source.");
 }
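[Editor's note, not part of the commit: a minimal sketch of setting the new property, mirroring the test below; the table path is a placeholder, and the resolved key assumes the STREAMER_CONFIG_PREFIX of "hoodie.streamer." seen elsewhere in this diff.]

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;

public class IncrSourceOptionsDemo {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Source Hudi table the incremental source pulls from (placeholder path).
    props.setProperty("hoodie.streamer.source.hoodieincr.path", "/tmp/hudi/source_table");
    // The new config: comma-separated Hudi read options forwarded to the
    // Spark DataFrameReader when HoodieIncrSource loads the source table;
    // the key resolves to "hoodie.streamer.source.hoodieincr.data.datasource.options".
    props.setProperty(
        HoodieIncrSourceConfig.HOODIE_INCREMENTAL_SPARK_DATASOURCE_OPTIONS.key(),
        "hoodie.metadata.enable=true,hoodie.enable.data.skipping=true");
  }
}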
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index eecab298840..768e4c3c3fc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
@@ -30,13 +31,17 @@ import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
 import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
@@ -172,10 +177,18 @@ public class HoodieIncrSource extends RowSource {
       return Pair.of(Option.empty(), queryInfo.getEndInstant());
     }
 
+    DataFrameReader reader = sparkSession.read().format("hudi");
+    String datasourceOpts = getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_INCREMENTAL_SPARK_DATASOURCE_OPTIONS, true);
+    if (!StringUtils.isNullOrEmpty(datasourceOpts)) {
+      Map<String, String> optionsMap = Arrays.stream(datasourceOpts.split(","))
+          .map(option -> Pair.of(option.split("=")[0], option.split("=")[1]))
+          .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
+      reader = reader.options(optionsMap);
+    }
     Dataset<Row> source;
     // Do Incr pull. Set end instant if available
     if (queryInfo.isIncremental()) {
-      source = sparkSession.read().format("org.apache.hudi")
+      source = reader
           .option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL())
           .option(BEGIN_INSTANTTIME().key(), queryInfo.getStartInstant())
           .option(END_INSTANTTIME().key(), queryInfo.getEndInstant())
@@ -186,7 +199,7 @@ public class HoodieIncrSource extends RowSource {
           .load(srcPath);
     } else {
       // if checkpoint is missing from source table, and if strategy is set to READ_UPTO_LATEST_COMMIT, we have to issue snapshot query
-      Dataset<Row> snapshot = sparkSession.read().format("org.apache.hudi")
+      Dataset<Row> snapshot = reader
           .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
           .load(srcPath);
       if (snapshotLoadQuerySplitter.isPresent()) {
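[Editor's note, not part of the commit: the option parsing added above splits entries on ',' and each entry on '='; a standalone sketch of the same pipeline, with a hypothetical demo class name.]

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hudi.common.util.collection.Pair;

public class DatasourceOptionsParseDemo {
  public static void main(String[] args) {
    String datasourceOpts = "hoodie.metadata.enable=true,hoodie.enable.data.skipping=true";
    // Same stream pipeline as the patch: one Pair per "key=value" entry,
    // collected into the Map handed to DataFrameReader.options(...).
    Map<String, String> optionsMap = Arrays.stream(datasourceOpts.split(","))
        .map(option -> Pair.of(option.split("=")[0], option.split("=")[1]))
        .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
    optionsMap.forEach((k, v) -> System.out.println(k + " -> " + v));
  }
}

One consequence of option.split("=")[1] is that only the segment between the first and second '=' of an entry is kept, so option values themselves cannot contain '=' or ','.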
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index c1e7f9dca49..319aa8540a4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -40,6 +40,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.TestSnapshotQuerySplitterImpl;
@@ -294,7 +295,7 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
         Option.empty(),
         100,
         dataBatches.get(0).getKey(),
-        Option.of(TestSnapshotQuerySplitterImpl.class.getName()));
+        Option.of(TestSnapshotQuerySplitterImpl.class.getName()), new TypedProperties());
 
     // The pending tables services should not block the incremental pulls
     // Reads everything up to latest
@@ -327,8 +328,40 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
     }
   }
 
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testHoodieIncrSourceWithDataSourceOptions(HoodieTableType tableType) throws IOException {
+    this.tableType = tableType;
+    metaClient = getHoodieMetaClient(storageConf(), basePath());
+    HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 12).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder()
+                .withScheduleInlineCompaction(true)
+                .withMaxNumDeltaCommitsBeforeCompaction(1)
+                .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
+            .withMetadataIndexColumnStats(true)
+            .withColumnStatsIndexForColumns("_hoodie_commit_time")
+            .build())
+        .build();
+
+    TypedProperties extraProps = new TypedProperties();
+    extraProps.setProperty(HoodieIncrSourceConfig.HOODIE_INCREMENTAL_SPARK_DATASOURCE_OPTIONS.key(), "hoodie.metadata.enable=true,hoodie.enable.data.skipping=true");
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
+      Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, INSERT, null, "100");
+      Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, INSERT, null, "200");
+      readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+          Option.empty(),
+          100,
+          inserts.getKey(),
+          Option.of(TestSnapshotQuerySplitterImpl.class.getName()), extraProps);
+    }
+  }
+
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, int expectedCount,
-                             String expectedCheckpoint, Option<String> snapshotCheckPointImplClassOpt) {
+                             String expectedCheckpoint, Option<String> snapshotCheckPointImplClassOpt, TypedProperties extraProps) {
     Properties properties = new Properties();
     properties.setProperty("hoodie.streamer.source.hoodieincr.path", basePath());
@@ -351,7 +384,7 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
 
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, int expectedCount,
                              String expectedCheckpoint) {
-    readAndAssert(missingCheckpointStrategy, checkpointToPull, expectedCount, expectedCheckpoint, Option.empty());
+    readAndAssert(missingCheckpointStrategy, checkpointToPull, expectedCount, expectedCheckpoint, Option.empty(), new TypedProperties());
   }
 
   private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient,
