codope commented on code in PR #10900:
URL: https://github.com/apache/hudi/pull/10900#discussion_r1534235492


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -189,10 +193,18 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
       return Pair.of(Option.empty(), queryInfo.getEndInstant());
     }
 
+    DataFrameReader reader = sparkSession.read().format("org.apache.hudi");
+    String datasourceOpts = getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_SPARK_DATASOURCE_OPTIONS, true);
+    if (!StringUtils.isNullOrEmpty(datasourceOpts)) {
+      Map<String, String> optionsMap = Arrays.stream(datasourceOpts.split(","))
+          .map(option -> Pair.of(option.split("=")[0], option.split("=")[1]))
+          .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));

Review Comment:
   Can we reuse `ConfigUtils.toMap` in some way?
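
   For context, the parsing being questioned is a plain "k1=v1,k2=v2" split. A minimal, self-contained sketch of that logic (class and method names here are illustrative, not Hudi's `ConfigUtils` API; splitting on the first `=` only, so values containing `=` survive):

   ```java
   import java.util.Arrays;
   import java.util.Map;
   import java.util.stream.Collectors;

   public class DataSourceOptionsParser {

     // Parses "k1=v1,k2=v2" into a map, splitting each entry on the
     // first '=' so values that themselves contain '=' are preserved.
     public static Map<String, String> toMap(String keyValues) {
       return Arrays.stream(keyValues.split(","))
           .map(String::trim)
           .filter(entry -> !entry.isEmpty())
           .map(entry -> entry.split("=", 2))
           .collect(Collectors.toMap(kv -> kv[0].trim(), kv -> kv.length > 1 ? kv[1].trim() : ""));
     }
   }
   ```

   Reusing a shared utility for this (as the comment suggests) also centralizes edge cases like trailing commas and whitespace, instead of the diff's `option.split("=")[1]`, which would throw on an entry with no `=`.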



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java:
##########
@@ -333,14 +334,47 @@ public void testHoodieIncrSourceWithPendingTableServices(HoodieTableType tableTy
     }
   }
 
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testHoodieIncrSourceWithDataSourceOptions(HoodieTableType tableType) throws IOException {
+    this.tableType = tableType;
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+    HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 12).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder()
+                .withScheduleInlineCompaction(true)
+                .withMaxNumDeltaCommitsBeforeCompaction(1)
+                .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
+            .withMetadataIndexColumnStats(true)
+            .withColumnStatsIndexForColumns("_hoodie_commit_time")
+            .build())
+        .build();
+
+    TypedProperties extraProps = new TypedProperties();
+    extraProps.setProperty(HoodieIncrSourceConfig.HOODIE_SPARK_DATASOURCE_OPTIONS.key(), "hoodie.metadata.enable=true,hoodie.enable.data.skipping=true");

Review Comment:
   Is there also a test which actually confirms that the metadata table is used and data skipping happens when reading incrementally?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieIncrSourceConfig.java:
##########
@@ -101,4 +101,11 @@ public class HoodieIncrSourceConfig extends HoodieConfig {
      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.hoodieincr.partition.extractor.class")
      .markAdvanced()
      .withDocumentation("PartitionValueExtractor class to extract partition fields from _hoodie_partition_path");
+
+  public static final ConfigProperty<String> HOODIE_SPARK_DATASOURCE_OPTIONS = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.hoodieincr.data.datasource.options")
+      .noDefaultValue()
+      .markAdvanced()
+      .withDocumentation("A comma separate list of options that can be passed to the spark dataframe reader of a hudi table, "
+          + "eg: hoodie.metadata.enable=true,hoodie.enable.data.skipping=true");

Review Comment:
   ```suggestion
      .withDocumentation("A comma-separated list of Hudi options that can be passed to the spark dataframe reader of a hudi table, "
          + "eg: `hoodie.metadata.enable=true,hoodie.enable.data.skipping=true`. Used only for incremental source.");
   ```
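
   For context, a streamer properties file using this option might look like the following sketch (assuming `STREAMER_CONFIG_PREFIX` resolves to `hoodie.streamer.`; the source path key and bucket are illustrative):

   ```properties
   # Hypothetical hoodie-streamer.properties fragment; option key taken from the diff above.
   hoodie.streamer.source.hoodieincr.path=s3://bucket/path/to/source-table
   hoodie.streamer.source.hoodieincr.data.datasource.options=hoodie.metadata.enable=true,hoodie.enable.data.skipping=true
   ```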



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieIncrSourceConfig.java:
##########
@@ -101,4 +101,11 @@ public class HoodieIncrSourceConfig extends HoodieConfig {
      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.hoodieincr.partition.extractor.class")
      .markAdvanced()
      .withDocumentation("PartitionValueExtractor class to extract partition fields from _hoodie_partition_path");
+
+  public static final ConfigProperty<String> HOODIE_SPARK_DATASOURCE_OPTIONS = ConfigProperty

Review Comment:
   Let's rename to `HOODIE_INCREMENTAL_SPARK_DATASOURCE_OPTIONS` to make it 
more explicit like the config key.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -189,10 +193,18 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
       return Pair.of(Option.empty(), queryInfo.getEndInstant());
     }
 
+    DataFrameReader reader = sparkSession.read().format("org.apache.hudi");

Review Comment:
   ```suggestion
       DataFrameReader reader = sparkSession.read().format("hudi");
   ```



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieIncrSourceConfig.java:
##########
@@ -101,4 +101,11 @@ public class HoodieIncrSourceConfig extends HoodieConfig {
      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.hoodieincr.partition.extractor.class")
      .markAdvanced()
      .withDocumentation("PartitionValueExtractor class to extract partition fields from _hoodie_partition_path");
+
+  public static final ConfigProperty<String> HOODIE_SPARK_DATASOURCE_OPTIONS = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.hoodieincr.data.datasource.options")
+      .noDefaultValue()
+      .markAdvanced()
+      .withDocumentation("A comma separate list of options that can be passed to the spark dataframe reader of a hudi table, "
+          + "eg: hoodie.metadata.enable=true,hoodie.enable.data.skipping=true");

Review Comment:
   Btw, if this is a useful config for other sources too, then move to 
`HoodieReaderConfig` and rename config variable and key accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
