This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 558281ed430 [MINOR] Disable reader for test with enum (#10061)
558281ed430 is described below
commit 558281ed4303756ad7a00331e1568dbb107f8571
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Nov 10 19:42:45 2023 -0500
[MINOR] Disable reader for test with enum (#10061)
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/utilities/sources/HoodieIncrSource.java | 20 ++++++++++++++++++++
.../hudi/utilities/sources/TestHoodieIncrSource.java | 3 +++
2 files changed, 23 insertions(+)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index fa316cf806f..aafd4c9e3b5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -19,9 +19,11 @@
package org.apache.hudi.utilities.sources;
import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -38,6 +40,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
@@ -58,6 +63,10 @@ import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHoll
public class HoodieIncrSource extends RowSource {
private static final Logger LOG = LoggerFactory.getLogger(HoodieIncrSource.class);
+ public static final Set<String> HOODIE_INCR_SOURCE_READ_OPT_KEYS =
+ CollectionUtils.createImmutableSet(
+ "hoodie.datasource.read.use.new.parquet.file.format",
+ HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key());
private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
public static class Config {
@@ -128,10 +137,19 @@ public class HoodieIncrSource extends RowSource {
HoodieIncrSourceConfig.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE.defaultValue();
}
+ private final Map<String, String> readOpts = new HashMap<>();
+
public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
    SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
+ for (Object key : props.keySet()) {
+ String keyString = key.toString();
+ if (HOODIE_INCR_SOURCE_READ_OPT_KEYS.contains(keyString)) {
+ readOpts.put(keyString, props.getString(key.toString()));
+ }
+ }
+
this.snapshotLoadQuerySplitter = Option.ofNullable(props.getString(SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, null))
    .map(className -> (SnapshotLoadQuerySplitter) ReflectionUtils.loadClass(className,
        new Class<?>[] {TypedProperties.class}, props));
@@ -181,6 +199,7 @@ public class HoodieIncrSource extends RowSource {
// Do Incr pull. Set end instant if available
if (queryInfo.isIncremental()) {
source = sparkSession.read().format("org.apache.hudi")
+ .options(readOpts)
.option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL())
.option(BEGIN_INSTANTTIME().key(), queryInfo.getStartInstant())
.option(END_INSTANTTIME().key(), queryInfo.getEndInstant())
@@ -192,6 +211,7 @@ public class HoodieIncrSource extends RowSource {
} else {
// if checkpoint is missing from source table, and if strategy is set to READ_UPTO_LATEST_COMMIT, we have to issue snapshot query
Dataset<Row> snapshot = sparkSession.read().format("org.apache.hudi")
+ .options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
.load(srcPath);
if (snapshotLoadQuerySplitter.isPresent()) {
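
For readers skimming the patch, the snippet below is a minimal, self-contained sketch (not part of the commit) of what the new whitelist does: only the two reader-related keys are copied from the incoming TypedProperties into the map that the source later passes to sparkSession.read().options(readOpts). The class name and the sample path are illustrative only.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.CollectionUtils;

public class ReadOptsSketch {
  // Same whitelist the patch introduces on HoodieIncrSource.
  private static final Set<String> READ_OPT_KEYS = CollectionUtils.createImmutableSet(
      "hoodie.datasource.read.use.new.parquet.file.format",
      HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key());

  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.setProperty(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", "/tmp/src_table"); // hypothetical path
    TypedProperties props = new TypedProperties(properties);

    // Mirrors the constructor loop added above: non-whitelisted keys are ignored.
    Map<String, String> readOpts = new HashMap<>();
    for (Object key : props.keySet()) {
      String keyString = key.toString();
      if (READ_OPT_KEYS.contains(keyString)) {
        readOpts.put(keyString, props.getString(keyString));
      }
    }
    // readOpts now holds only the reader flag; this is what feeds
    // sparkSession.read().options(readOpts) in the hunks above.
    System.out.println(readOpts);
  }
}
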
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index d35041592aa..1b534c22c7e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
@@ -332,6 +333,8 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
Properties properties = new Properties();
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path",
basePath());
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
missingCheckpointStrategy.name());
+ // TODO: [HUDI-7081] get rid of this
+ properties.setProperty(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
snapshotCheckPointImplClassOpt.map(className ->
    properties.setProperty(SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, className));
TypedProperties typedProperties = new TypedProperties(properties);
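
As a usage note (not part of the patch): with FILE_GROUP_READER_ENABLED set to "false" above, a HoodieIncrSource built from these test properties picks the flag up through the whitelist added in the first hunk, so both the incremental and snapshot query paths run with the file group reader disabled. The schemaProvider variable and the jsc()/spark() harness accessors below are assumed from the surrounding test class.

// Sketch only: typedProperties comes from the preceding line of the test.
HoodieIncrSource incrSource = new HoodieIncrSource(
    typedProperties, jsc(), spark(), schemaProvider); // schemaProvider: assumed test fixture
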