codope commented on code in PR #10900:
URL: https://github.com/apache/hudi/pull/10900#discussion_r1534235492
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -189,10 +193,18 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
       return Pair.of(Option.empty(), queryInfo.getEndInstant());
     }
+    DataFrameReader reader = sparkSession.read().format("org.apache.hudi");
+    String datasourceOpts = getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_SPARK_DATASOURCE_OPTIONS, true);
+    if (!StringUtils.isNullOrEmpty(datasourceOpts)) {
+      Map<String, String> optionsMap = Arrays.stream(datasourceOpts.split(","))
+          .map(option -> Pair.of(option.split("=")[0], option.split("=")[1]))
+          .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
Review Comment:
Can we reuse `ConfigUtils.toMap` in some way?
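For context, here is a self-contained sketch of the `key=value` parsing that the inline stream code performs and that the reviewer suggests centralizing. The class and method names are hypothetical stand-ins; Hudi's `ConfigUtils.toMap` is the intended shared utility the review points at:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical standalone equivalent of the inline parsing in HoodieIncrSource;
// a shared helper such as ConfigUtils.toMap would be the proper home for this logic.
public class DatasourceOptionsParser {

  // Parses "k1=v1,k2=v2" into an ordered map, trimming whitespace around entries.
  public static Map<String, String> toMap(String opts) {
    Map<String, String> result = new LinkedHashMap<>();
    if (opts == null || opts.trim().isEmpty()) {
      return result;
    }
    for (String entry : opts.split(",")) {
      // Split on the first '=' only, so option values may themselves contain '='.
      String[] kv = entry.trim().split("=", 2);
      if (kv.length != 2) {
        throw new IllegalArgumentException("Malformed option entry: " + entry);
      }
      result.put(kv[0].trim(), kv[1].trim());
    }
    return result;
  }
}
```

Note that the inline code in the diff calls `option.split("=")[1]`, which would drop everything after a second `=` in a value; a shared helper that splits with a limit of 2 avoids that edge case.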
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java:
##########
@@ -333,14 +334,47 @@ public void testHoodieIncrSourceWithPendingTableServices(HoodieTableType tableTy
     }
   }
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testHoodieIncrSourceWithDataSourceOptions(HoodieTableType tableType) throws IOException {
+    this.tableType = tableType;
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+    HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 12).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder()
+                .withScheduleInlineCompaction(true)
+                .withMaxNumDeltaCommitsBeforeCompaction(1)
+                .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
+            .withMetadataIndexColumnStats(true)
+            .withColumnStatsIndexForColumns("_hoodie_commit_time")
+            .build())
+        .build();
+
+    TypedProperties extraProps = new TypedProperties();
+    extraProps.setProperty(HoodieIncrSourceConfig.HOODIE_SPARK_DATASOURCE_OPTIONS.key(), "hoodie.metadata.enable=true,hoodie.enable.data.skipping=true");
Review Comment:
Is there also a test which actually confirms that the metadata table is used and data skipping happens when reading incrementally?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieIncrSourceConfig.java:
##########
@@ -101,4 +101,11 @@ public class HoodieIncrSourceConfig extends HoodieConfig {
       .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.hoodieincr.partition.extractor.class")
       .markAdvanced()
       .withDocumentation("PartitionValueExtractor class to extract partition fields from _hoodie_partition_path");
+
+  public static final ConfigProperty<String> HOODIE_SPARK_DATASOURCE_OPTIONS = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.hoodieincr.data.datasource.options")
+      .noDefaultValue()
+      .markAdvanced()
+      .withDocumentation("A comma separate list of options that can be passed to the spark dataframe reader of a hudi table, "
+          + "eg: hoodie.metadata.enable=true,hoodie.enable.data.skipping=true");
Review Comment:
```suggestion
      .withDocumentation("A comma-separated list of Hudi options that can be passed to the spark dataframe reader of a hudi table, "
          + "eg: `hoodie.metadata.enable=true,hoodie.enable.data.skipping=true`. Used only for incremental source.");
```
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieIncrSourceConfig.java:
##########
@@ -101,4 +101,11 @@ public class HoodieIncrSourceConfig extends HoodieConfig {
       .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.hoodieincr.partition.extractor.class")
       .markAdvanced()
       .withDocumentation("PartitionValueExtractor class to extract partition fields from _hoodie_partition_path");
+
+  public static final ConfigProperty<String> HOODIE_SPARK_DATASOURCE_OPTIONS = ConfigProperty
Review Comment:
Let's rename this to `HOODIE_INCREMENTAL_SPARK_DATASOURCE_OPTIONS` to make it more explicit, matching the config key.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -189,10 +193,18 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
       return Pair.of(Option.empty(), queryInfo.getEndInstant());
     }
+    DataFrameReader reader = sparkSession.read().format("org.apache.hudi");
Review Comment:
```suggestion
    DataFrameReader reader = sparkSession.read().format("hudi");
```
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieIncrSourceConfig.java:
##########
@@ -101,4 +101,11 @@ public class HoodieIncrSourceConfig extends HoodieConfig {
       .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.hoodieincr.partition.extractor.class")
       .markAdvanced()
       .withDocumentation("PartitionValueExtractor class to extract partition fields from _hoodie_partition_path");
+
+  public static final ConfigProperty<String> HOODIE_SPARK_DATASOURCE_OPTIONS = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.hoodieincr.data.datasource.options")
+      .noDefaultValue()
+      .markAdvanced()
+      .withDocumentation("A comma separate list of options that can be passed to the spark dataframe reader of a hudi table, "
+          + "eg: hoodie.metadata.enable=true,hoodie.enable.data.skipping=true");
Review Comment:
Btw, if this config is useful for other sources too, then move it to `HoodieReaderConfig` and rename the config variable and key accordingly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]