This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ca5d4685a00 [HUDI-7300] Merge schema in ParquetDFSSource (#10199)
ca5d4685a00 is described below

commit ca5d4685a002a3b3da917f6b195e27dcb20d7316
Author: Rohit Mittapalli <[email protected]>
AuthorDate: Tue Jan 16 17:52:07 2024 -0800

    [HUDI-7300] Merge schema in ParquetDFSSource (#10199)
---
 .../utilities/config/ParquetDFSSourceConfig.java   | 49 ++++++++++++++++++++++
 .../hudi/utilities/sources/ParquetDFSSource.java   |  6 ++-
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
new file mode 100644
index 00000000000..b3bf5678baf
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ParquetDFSSourceConfig.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX;
+import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
+
+/**
+ * Parquet DFS Source Configs
+ */
+@Immutable
+@ConfigClassProperty(name = "Parquet DFS Source Configs",
+        groupName = ConfigGroups.Names.HUDI_STREAMER,
+        subGroupName = ConfigGroups.SubGroupNames.DELTA_STREAMER_SOURCE,
+        description = "Configurations controlling the behavior of Parquet DFS source in Hudi Streamer.")
+public class ParquetDFSSourceConfig extends HoodieConfig {
+
+  public static final ConfigProperty<Boolean> PARQUET_DFS_MERGE_SCHEMA = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable")
+      .defaultValue(false)
+      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.parquet.dfs.merge_schema.enable")
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Merge schema across parquet files within a single write");
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
index a56a878f1fe..a3ee555ec5a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.config.ParquetDFSSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
 
@@ -29,6 +30,8 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
+
 /**
  * DFS Source that reads parquet data.
  */
@@ -52,6 +55,7 @@ public class ParquetDFSSource extends RowSource {
   }
 
   private Dataset<Row> fromFiles(String pathStr) {
-    return sparkSession.read().parquet(pathStr.split(","));
+    boolean mergeSchemaOption = getBooleanWithAltKeys(this.props, ParquetDFSSourceConfig.PARQUET_DFS_MERGE_SCHEMA);
+    return sparkSession.read().option("mergeSchema", mergeSchemaOption).parquet(pathStr.split(","));
   }
 }

Reply via email to