This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit db7136f5bc3e3ad682d606e5597adfff206a38b8
Author: Vinish Reddy <[email protected]>
AuthorDate: Wed May 15 06:37:25 2024 -0700

    [HUDI-7523] Add HOODIE_SPARK_DATASOURCE_OPTIONS to be used in HoodieIncrSource (#10900)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../org/apache/hudi/common/util/ConfigUtils.java   | 17 +++++-
 .../apache/hudi/common/util/TestConfigUtils.java   | 66 ++++++++++++++++------
 .../utilities/config/HoodieIncrSourceConfig.java   |  8 +++
 .../hudi/utilities/sources/HoodieIncrSource.java   | 17 +++++-
 .../utilities/sources/TestHoodieIncrSource.java    | 39 ++++++++++++-
 5 files changed, 122 insertions(+), 25 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 3866069d437..3426477d90d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -98,7 +98,7 @@ public class ConfigUtils {
   }
 
   /**
-   * Convert the key-value config to a map.The format of the config
+   * Convert the key-value config to a map.  The format of the config
    * is a key-value pair just like "k1=v1\nk2=v2\nk3=v3".
    *
   * @param keyValueConfig Key-value configs in properties format, i.e., multiple lines of
@@ -106,10 +106,23 @@ public class ConfigUtils {
    * @return A {@link Map} of key-value configs.
    */
   public static Map<String, String> toMap(String keyValueConfig) {
+    return toMap(keyValueConfig, "\n");
+  }
+
+  /**
+   * Convert the key-value config to a map. The format of the config is a key-value pair
+   * with defined separator.  For example, if the separator is a comma, the input is
+   * "k1=v1,k2=v2,k3=v3".
+   *
+   * @param keyValueConfig key-value configs in properties format, with defined separator.
+   * @param separator      the separator.
+   * @return A {@link Map} of key-value configs.
+   */
+  public static Map<String, String> toMap(String keyValueConfig, String separator) {
     if (StringUtils.isNullOrEmpty(keyValueConfig)) {
       return new HashMap<>();
     }
-    String[] keyvalues = keyValueConfig.split("\n");
+    String[] keyvalues = keyValueConfig.split(separator);
     Map<String, String> tableProperties = new HashMap<>();
     for (String keyValue : keyvalues) {
      // Handle multiple new lines and lines that contain only spaces after splitting
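
For orientation, the widened API can be exercised as in this minimal standalone sketch (the demo class name is invented for illustration; behavior follows the hunk above):

    import java.util.Map;

    import org.apache.hudi.common.util.ConfigUtils;

    public class ToMapDemo {
      public static void main(String[] args) {
        // Existing behavior: the 1-arg overload splits on newlines
        Map<String, String> fromLines = ConfigUtils.toMap("k1=v1\nk2=v2\nk3=v3");

        // New behavior: the 2-arg overload splits on the given separator
        Map<String, String> fromCsv = ConfigUtils.toMap(
            "hoodie.metadata.enable=true,hoodie.enable.data.skipping=true", ",");

        // Both maps contain the parsed pairs (HashMap, so iteration order may vary)
        System.out.println(fromLines);
        System.out.println(fromCsv);
      }
    }

Note that the separator is handed to String.split, so it is interpreted as a regular expression; plain separators such as "\n" and "," behave as expected.
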
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
index 5728dd8d36c..3742c961a7d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
@@ -21,10 +21,15 @@ package org.apache.hudi.common.util;
 
 import org.apache.hudi.common.config.ConfigProperty;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -36,43 +41,68 @@ public class TestConfigUtils {
       .withAlternatives("hudi.test.boolean.config")
       .markAdvanced()
       .withDocumentation("Testing boolean config.");
-  
-  @Test
-  public void testToMapSucceeds() {
+
+  private static Stream<Arguments> separatorArgs() {
+    List<Option<String>> separatorList = new ArrayList<>();
+    separatorList.add(Option.empty());
+    separatorList.add(Option.of("\n"));
+    separatorList.add(Option.of(","));
+    return separatorList.stream().map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("separatorArgs")
+  public void testToMapSucceeds(Option<String> separator) {
+    String sepString = separator.isPresent() ? separator.get() : "\n";
     Map<String, String> expectedMap = new HashMap<>();
     expectedMap.put("k.1.1.2", "v1");
     expectedMap.put("k.2.1.2", "v2");
     expectedMap.put("k.3.1.2", "v3");
 
     // Test base case
-    String srcKv = "k.1.1.2=v1\nk.2.1.2=v2\nk.3.1.2=v3";
-    Map<String, String> outMap = ConfigUtils.toMap(srcKv);
+    String srcKv = String.format(
+        "k.1.1.2=v1%sk.2.1.2=v2%sk.3.1.2=v3", sepString, sepString);
+    Map<String, String> outMap = toMap(srcKv, separator);
     assertEquals(expectedMap, outMap);
 
     // Test ends with new line
-    srcKv = "k.1.1.2=v1\nk.2.1.2=v2\nk.3.1.2=v3\n";
-    outMap = ConfigUtils.toMap(srcKv);
+    srcKv = String.format(
+        "k.1.1.2=v1%sk.2.1.2=v2%sk.3.1.2=v3%s", sepString, sepString, 
sepString);
+    outMap = toMap(srcKv, separator);
     assertEquals(expectedMap, outMap);
 
     // Test delimited by multiple new lines
-    srcKv = "k.1.1.2=v1\nk.2.1.2=v2\n\nk.3.1.2=v3";
-    outMap = ConfigUtils.toMap(srcKv);
+    srcKv = String.format(
+        "k.1.1.2=v1%sk.2.1.2=v2%s%sk.3.1.2=v3", sepString, sepString, 
sepString);
+    outMap = toMap(srcKv, separator);
     assertEquals(expectedMap, outMap);
 
     // Test delimited by multiple new lines with spaces in between
-    srcKv = "k.1.1.2=v1\n  \nk.2.1.2=v2\n\nk.3.1.2=v3";
-    outMap = ConfigUtils.toMap(srcKv);
+    srcKv = String.format(
+        "k.1.1.2=v1%s  %sk.2.1.2=v2%s%sk.3.1.2=v3", sepString, sepString, 
sepString, sepString);
+    outMap = toMap(srcKv, separator);
     assertEquals(expectedMap, outMap);
 
     // Test with random spaces if trim works properly
-    srcKv = " k.1.1.2 =   v1\n k.2.1.2 = v2 \nk.3.1.2 = v3";
-    outMap = ConfigUtils.toMap(srcKv);
+    srcKv = String.format(
+        " k.1.1.2 =   v1%s k.2.1.2 = v2 %sk.3.1.2 = v3", sepString, sepString);
+    outMap = toMap(srcKv, separator);
     assertEquals(expectedMap, outMap);
   }
 
-  @Test
-  public void testToMapThrowError() {
-    String srcKv = "k.1.1.2=v1=v1.1\nk.2.1.2=v2\nk.3.1.2=v3";
-    assertThrows(IllegalArgumentException.class, () -> ConfigUtils.toMap(srcKv));
+  @ParameterizedTest
+  @MethodSource("separatorArgs")
+  public void testToMapThrowError(Option<String> separator) {
+    String sepString = separator.isPresent() ? separator.get() : "\n";
+    String srcKv = String.format(
+        "k.1.1.2=v1=v1.1%sk.2.1.2=v2%sk.3.1.2=v3", sepString, sepString);
+    assertThrows(IllegalArgumentException.class, () -> toMap(srcKv, separator));
+  }
+
+  private Map<String, String> toMap(String config, Option<String> separator) {
+    if (separator.isEmpty()) {
+      return ConfigUtils.toMap(config);
+    }
+    return ConfigUtils.toMap(config, separator.get());
   }
 }
\ No newline at end of file
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieIncrSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieIncrSourceConfig.java
index 63da2358e02..648af1c7615 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieIncrSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieIncrSourceConfig.java
@@ -101,4 +101,12 @@ public class HoodieIncrSourceConfig extends HoodieConfig {
      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.hoodieincr.partition.extractor.class")
       .markAdvanced()
       .withDocumentation("PartitionValueExtractor class to extract partition 
fields from _hoodie_partition_path");
+
+  public static final ConfigProperty<String> HOODIE_INCREMENTAL_SPARK_DATASOURCE_OPTIONS = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.hoodieincr.data.datasource.options")
+      .noDefaultValue()
+      .markAdvanced()
+      .sinceVersion("0.15.0")
+      .withDocumentation("A comma-separated list of Hudi options that can be 
passed to the spark dataframe reader of a hudi table, "
+          + "eg: 
`hoodie.metadata.enable=true,hoodie.enable.data.skipping=true`. Used only for 
incremental source.");
 }
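
Assuming STREAMER_CONFIG_PREFIX resolves to "hoodie.streamer." (consistent with the "hoodie.streamer.source.hoodieincr.path" key used in the tests below), a minimal sketch of supplying the new option follows; the demo class and table path are illustrative only:

    import org.apache.hudi.common.config.TypedProperties;

    public class IncrSourceOptionsDemo {
      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();
        // Source Hudi table to pull from (path is illustrative)
        props.setProperty("hoodie.streamer.source.hoodieincr.path", "/tmp/src_table");
        // Comma-separated Hudi read options, forwarded verbatim to the
        // Spark DataFrame reader that HoodieIncrSource builds
        props.setProperty(
            "hoodie.streamer.source.hoodieincr.data.datasource.options",
            "hoodie.metadata.enable=true,hoodie.enable.data.skipping=true");
      }
    }
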
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index eecab298840..768e4c3c3fc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
@@ -30,13 +31,17 @@ import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
 import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
@@ -172,10 +177,18 @@ public class HoodieIncrSource extends RowSource {
       return Pair.of(Option.empty(), queryInfo.getEndInstant());
     }
 
+    DataFrameReader reader = sparkSession.read().format("hudi");
+    String datasourceOpts = getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_INCREMENTAL_SPARK_DATASOURCE_OPTIONS, true);
+    if (!StringUtils.isNullOrEmpty(datasourceOpts)) {
+      Map<String, String> optionsMap = Arrays.stream(datasourceOpts.split(","))
+          .map(option -> Pair.of(option.split("=")[0], option.split("=")[1]))
+          .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
+      reader = reader.options(optionsMap);
+    }
     Dataset<Row> source;
     // Do Incr pull. Set end instant if available
     if (queryInfo.isIncremental()) {
-      source = sparkSession.read().format("org.apache.hudi")
+      source = reader
           .option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL())
           .option(BEGIN_INSTANTTIME().key(), queryInfo.getStartInstant())
           .option(END_INSTANTTIME().key(), queryInfo.getEndInstant())
@@ -186,7 +199,7 @@ public class HoodieIncrSource extends RowSource {
           .load(srcPath);
     } else {
      // if checkpoint is missing from source table, and if strategy is set to READ_UPTO_LATEST_COMMIT, we have to issue snapshot query
-      Dataset<Row> snapshot = sparkSession.read().format("org.apache.hudi")
+      Dataset<Row> snapshot = reader
          .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
           .load(srcPath);
       if (snapshotLoadQuerySplitter.isPresent()) {
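
For intuition, the option parsing in the hunks above reduces to this standalone sketch (java.util's SimpleEntry stands in for Hudi's Pair; the demo class name is invented):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Arrays;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class ParseDatasourceOptionsDemo {
      public static void main(String[] args) {
        String datasourceOpts = "hoodie.metadata.enable=true,hoodie.enable.data.skipping=true";
        // Split "k1=v1,k2=v2" into entries; option.split("=")[1] keeps only the
        // first '='-delimited value, mirroring the parsing in HoodieIncrSource
        Map<String, String> optionsMap = Arrays.stream(datasourceOpts.split(","))
            .map(option -> new SimpleEntry<>(option.split("=")[0], option.split("=")[1]))
            .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
        System.out.println(optionsMap);
      }
    }

The resulting map is applied once via reader.options(optionsMap), so the same reader, and thus the same extra options, backs both the incremental and the snapshot query paths.
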
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index c1e7f9dca49..319aa8540a4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -40,6 +40,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.TestSnapshotQuerySplitterImpl;
@@ -294,7 +295,7 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
           Option.empty(),
           100,
           dataBatches.get(0).getKey(),
-          Option.of(TestSnapshotQuerySplitterImpl.class.getName()));
+          Option.of(TestSnapshotQuerySplitterImpl.class.getName()), new TypedProperties());
 
       // The pending tables services should not block the incremental pulls
       // Reads everything up to latest
@@ -327,8 +328,40 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
     }
   }
 
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testHoodieIncrSourceWithDataSourceOptions(HoodieTableType tableType) throws IOException {
+    this.tableType = tableType;
+    metaClient = getHoodieMetaClient(storageConf(), basePath());
+    HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 12).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder()
+                .withScheduleInlineCompaction(true)
+                .withMaxNumDeltaCommitsBeforeCompaction(1)
+                .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
+            .withMetadataIndexColumnStats(true)
+            .withColumnStatsIndexForColumns("_hoodie_commit_time")
+            .build())
+        .build();
+
+    TypedProperties extraProps = new TypedProperties();
+    extraProps.setProperty(HoodieIncrSourceConfig.HOODIE_INCREMENTAL_SPARK_DATASOURCE_OPTIONS.key(), "hoodie.metadata.enable=true,hoodie.enable.data.skipping=true");
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
+      Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, INSERT, null, "100");
+      Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, INSERT, null, "200");
+      readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+          Option.empty(),
+          100,
+          inserts.getKey(),
+          Option.of(TestSnapshotQuerySplitterImpl.class.getName()), extraProps);
+    }
+  }
+
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, int expectedCount,
-                             String expectedCheckpoint, Option<String> snapshotCheckPointImplClassOpt) {
+                             String expectedCheckpoint, Option<String> snapshotCheckPointImplClassOpt, TypedProperties extraProps) {
 
     Properties properties = new Properties();
    properties.setProperty("hoodie.streamer.source.hoodieincr.path", basePath());
@@ -351,7 +384,7 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
 
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull,
                              int expectedCount, String expectedCheckpoint) {
-    readAndAssert(missingCheckpointStrategy, checkpointToPull, expectedCount, expectedCheckpoint, Option.empty());
+    readAndAssert(missingCheckpointStrategy, checkpointToPull, expectedCount, expectedCheckpoint, Option.empty(), new TypedProperties());
   }
 
   private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient,
