This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3913dcad6a8 [HUDI-7098] Add max bytes per partition with cloud stores 
source in DS (#10100)
3913dcad6a8 is described below

commit 3913dcad6a8ac840b5b6709cae3f7a10eb1cfc45
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Nov 18 23:50:37 2023 -0500

    [HUDI-7098] Add max bytes per partition with cloud stores source in DS 
(#10100)
---
 .../apache/hudi/utilities/config/CloudSourceConfig.java  | 16 ++++++++++++++++
 .../sources/helpers/CloudObjectsSelectorCommon.java      | 11 +++++++----
 .../sources/helpers/CloudStoreIngestionConfig.java       |  7 -------
 .../hudi/utilities/sources/helpers/QueryRunner.java      |  8 +++++++-
 4 files changed, 30 insertions(+), 12 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
index 16d9b73c70e..e7b44cf9121 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
@@ -121,4 +121,20 @@ public class CloudSourceConfig extends HoodieConfig {
       .sinceVersion("0.14.0")
       .withDocumentation("A comma delimited list of path-based partition 
fields in the source file structure.");
 
+  public static final ConfigProperty<Boolean> 
SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + 
"source.cloud.data.reader.comma.separated.path.format")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("0.14.1")
+      .withDocumentation("Boolean value for specifying path format in load 
args of spark.read.format(\"..\").load(\"a.xml,b.xml,c.xml\"),\n"
+          + "   * set true if path format needs to be comma separated string 
value, if false it's passed as array of strings like\n"
+          + "   * spark.read.format(\"..\").load(new 
String[]{a.xml,b.xml,c.xml})");
+
+  public static final ConfigProperty<String> SOURCE_MAX_BYTES_PER_PARTITION = 
ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.partition.max.size")
+      .noDefaultValue()
+      .markAdvanced()
+      .sinceVersion("0.14.1")
+      .withDocumentation("specify this value in bytes, to coalesce partitions 
of source dataset not greater than specified limit");
+
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 19da6aada9b..4098448b793 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -57,7 +57,8 @@ import static 
org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static 
org.apache.hudi.utilities.config.CloudSourceConfig.PATH_BASED_PARTITION_FIELDS;
-import static 
org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT;
+import static 
org.apache.hudi.utilities.config.CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION;
+import static 
org.apache.hudi.utilities.config.CloudSourceConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT;
 import static org.apache.spark.sql.functions.input_file_name;
 import static org.apache.spark.sql.functions.split;
 
@@ -191,9 +192,11 @@ public class CloudObjectsSelectorCommon {
     }
     // inflate 10% for potential hoodie meta fields
     totalSize *= 1.1;
-    long parquetMaxFileSize = props.getLong(PARQUET_MAX_FILE_SIZE.key(), 
Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
-    int numPartitions = (int) Math.max(totalSize / parquetMaxFileSize, 1);
-    boolean isCommaSeparatedPathFormat = 
props.getBoolean(SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT, false);
+    // if source bytes are provided, then give preference to that.
+    long bytesPerPartition = 
props.containsKey(SOURCE_MAX_BYTES_PER_PARTITION.key()) ? 
props.getLong(SOURCE_MAX_BYTES_PER_PARTITION.key()) :
+        props.getLong(PARQUET_MAX_FILE_SIZE.key(), 
Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
+    int numPartitions = (int) Math.max(Math.ceil(totalSize / 
bytesPerPartition), 1);
+    boolean isCommaSeparatedPathFormat = 
props.getBoolean(SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT.key(), 
false);
 
     Dataset<Row> dataset;
     if (isCommaSeparatedPathFormat) {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java
index 66b94177b7b..8a1c15c8886 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java
@@ -107,11 +107,4 @@ public class CloudStoreIngestionConfig {
    * A comma delimited list of path-based partition fields in the source file 
structure
    */
   public static final String PATH_BASED_PARTITION_FIELDS = 
"hoodie.deltastreamer.source.cloud.data.partition.fields.from.path";
-
-  /**
-   * boolean value for specifying path format in load args of 
spark.read.format("..").load("a.xml,b.xml,c.xml"),
-   * set true if path format needs to be comma separated string value, if 
false it's passed as array of strings like
-   * spark.read.format("..").load(new String[]{a.xml,b.xml,c.xml})
-   */
-  public static final String 
SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT = 
"hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format";
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
index 761e942549c..597c0195f5e 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
@@ -44,12 +44,14 @@ import static 
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
  */
 public class QueryRunner {
   private final SparkSession sparkSession;
+  private final TypedProperties props;
   private final String sourcePath;
 
   private static final Logger LOG = LoggerFactory.getLogger(QueryRunner.class);
 
   public QueryRunner(SparkSession sparkSession, TypedProperties props) {
     this.sparkSession = sparkSession;
+    this.props = props;
     checkRequiredConfigProperties(props, 
Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH));
     this.sourcePath = getStringWithAltKeys(props, 
HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
   }
@@ -85,7 +87,11 @@ public class QueryRunner {
     return sparkSession.read().format("org.apache.hudi")
         .option(DataSourceReadOptions.QUERY_TYPE().key(), 
queryInfo.getQueryType())
         .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), 
queryInfo.getPreviousInstant())
-        .option(DataSourceReadOptions.END_INSTANTTIME().key(), 
queryInfo.getEndInstant()).load(sourcePath);
+        .option(DataSourceReadOptions.END_INSTANTTIME().key(), 
queryInfo.getEndInstant())
+        
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
+            
props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
+                
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue()))
+        .load(sourcePath);
   }
 
   public Dataset<Row> runSnapshotQuery(QueryInfo queryInfo) {

Reply via email to