This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3913dcad6a8 [HUDI-7098] Add max bytes per partition with cloud stores source in DS (#10100)
3913dcad6a8 is described below
commit 3913dcad6a8ac840b5b6709cae3f7a10eb1cfc45
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Nov 18 23:50:37 2023 -0500
[HUDI-7098] Add max bytes per partition with cloud stores source in DS (#10100)
---
.../apache/hudi/utilities/config/CloudSourceConfig.java | 16 ++++++++++++++++
.../sources/helpers/CloudObjectsSelectorCommon.java | 11 +++++++----
.../sources/helpers/CloudStoreIngestionConfig.java | 7 -------
.../hudi/utilities/sources/helpers/QueryRunner.java | 8 +++++++-
4 files changed, 30 insertions(+), 12 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
index 16d9b73c70e..e7b44cf9121 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
@@ -121,4 +121,20 @@ public class CloudSourceConfig extends HoodieConfig {
.sinceVersion("0.14.0")
.withDocumentation("A comma delimited list of path-based partition
fields in the source file structure.");
+ public static final ConfigProperty<Boolean>
SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT = ConfigProperty
+ .key(STREAMER_CONFIG_PREFIX +
"source.cloud.data.reader.comma.separated.path.format")
+ .defaultValue(false)
+ .markAdvanced()
+ .sinceVersion("0.14.1")
+ .withDocumentation("Boolean value for specifying path format in load
args of spark.read.format(\"..\").load(\"a.xml,b.xml,c.xml\"),\n"
+ + " * set true if path format needs to be comma separated string
value, if false it's passed as array of strings like\n"
+ + " * spark.read.format(\"..\").load(new
String[]{a.xml,b.xml,c.xml})");
+
+ public static final ConfigProperty<String> SOURCE_MAX_BYTES_PER_PARTITION =
ConfigProperty
+ .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.partition.max.size")
+ .noDefaultValue()
+ .markAdvanced()
+ .sinceVersion("0.14.1")
+ .withDocumentation("specify this value in bytes, to coalesce partitions
of source dataset not greater than specified limit");
+
}
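
As an aside for reviewers, a minimal sketch of how these two new options could be wired into an ingestion's TypedProperties. The literal key strings assume STREAMER_CONFIG_PREFIX resolves to "hoodie.streamer."; the snippet is illustrative and not part of this commit:

    import org.apache.hudi.common.config.TypedProperties;

    TypedProperties props = new TypedProperties();
    // Pass load paths as one comma separated string rather than a String[].
    props.setProperty("hoodie.streamer.source.cloud.data.reader.comma.separated.path.format", "true");
    // Cap each partition of the source dataset at ~128 MB when coalescing (assumed value).
    props.setProperty("hoodie.streamer.source.cloud.data.partition.max.size", String.valueOf(128 * 1024 * 1024));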
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 19da6aada9b..4098448b793 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -57,7 +57,8 @@ import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.utilities.config.CloudSourceConfig.PATH_BASED_PARTITION_FIELDS;
-import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT;
+import static org.apache.hudi.utilities.config.CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION;
+import static org.apache.hudi.utilities.config.CloudSourceConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT;
import static org.apache.spark.sql.functions.input_file_name;
import static org.apache.spark.sql.functions.split;
@@ -191,9 +192,11 @@ public class CloudObjectsSelectorCommon {
}
// inflate 10% for potential hoodie meta fields
totalSize *= 1.1;
-    long parquetMaxFileSize = props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
-    int numPartitions = (int) Math.max(totalSize / parquetMaxFileSize, 1);
-    boolean isCommaSeparatedPathFormat = props.getBoolean(SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT, false);
+    // if max bytes per partition is provided, give preference to it.
+    long bytesPerPartition = props.containsKey(SOURCE_MAX_BYTES_PER_PARTITION.key()) ? props.getLong(SOURCE_MAX_BYTES_PER_PARTITION.key())
+        : props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
+    int numPartitions = (int) Math.max(Math.ceil((double) totalSize / bytesPerPartition), 1);
+    boolean isCommaSeparatedPathFormat = props.getBoolean(SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT.key(), false);
Dataset<Row> dataset;
if (isCommaSeparatedPathFormat) {
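
To make the sizing arithmetic above concrete, a small self-contained sketch with made-up input sizes:

    // Hypothetical inputs: ~600 MB of source files, 128 MB per-partition cap.
    long totalSize = (long) (600L * 1024 * 1024 * 1.1); // after the 10% meta-field inflation
    long bytesPerPartition = 128L * 1024 * 1024;
    // Casting to double before dividing keeps Math.ceil meaningful; plain long
    // division would truncate before the ceil is applied.
    int numPartitions = (int) Math.max(Math.ceil((double) totalSize / bytesPerPartition), 1);
    // numPartitions == 6: the ~660 MB dataset is read as six partitions of at most ~128 MB.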
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java
index 66b94177b7b..8a1c15c8886 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java
@@ -107,11 +107,4 @@ public class CloudStoreIngestionConfig {
 * A comma delimited list of path-based partition fields in the source file structure
 */
public static final String PATH_BASED_PARTITION_FIELDS = "hoodie.deltastreamer.source.cloud.data.partition.fields.from.path";
-
- /**
- * boolean value for specifying path format in load args of spark.read.format("..").load("a.xml,b.xml,c.xml"),
- * set true if path format needs to be comma separated string value, if false it's passed as array of strings like
- * spark.read.format("..").load(new String[]{a.xml,b.xml,c.xml})
- */
- public static final String SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT = "hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format";
}
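
Upgrade note (illustrative, not part of the diff): the raw string key removed here is superseded by the ConfigProperty of the same name added to CloudSourceConfig above. A hypothetical before/after for job properties, assuming STREAMER_CONFIG_PREFIX resolves to "hoodie.streamer.":

    // Before: raw key from the removed CloudStoreIngestionConfig constant.
    props.setProperty("hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format", "true");
    // After: key built from STREAMER_CONFIG_PREFIX in CloudSourceConfig.
    props.setProperty("hoodie.streamer.source.cloud.data.reader.comma.separated.path.format", "true");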
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
index 761e942549c..597c0195f5e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
@@ -44,12 +44,14 @@ import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
*/
public class QueryRunner {
private final SparkSession sparkSession;
+ private final TypedProperties props;
private final String sourcePath;
private static final Logger LOG = LoggerFactory.getLogger(QueryRunner.class);
public QueryRunner(SparkSession sparkSession, TypedProperties props) {
this.sparkSession = sparkSession;
+ this.props = props;
    checkRequiredConfigProperties(props, Collections.singletonList(HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH));
    this.sourcePath = getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
}
@@ -85,7 +87,11 @@ public class QueryRunner {
return sparkSession.read().format("org.apache.hudi")
        .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType())
        .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryInfo.getPreviousInstant())
-        .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant()).load(sourcePath);
+        .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant())
+        .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
+            props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
+                DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue()))
+        .load(sourcePath);
}
public Dataset<Row> runSnapshotQuery(QueryInfo queryInfo) {