This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 98af701675e [HUDI-7452] Repartition row dataset in S3/GCS based on task size (#10777)
98af701675e is described below
commit 98af701675e5a3bd6cfee045fc7b979e9ae1ff46
Author: Vinish Reddy <[email protected]>
AuthorDate: Thu Feb 29 09:01:10 2024 +0530
[HUDI-7452] Repartition row dataset in S3/GCS based on task size (#10777)
---
.../helpers/CloudObjectsSelectorCommon.java | 15 ++++++++++--
.../helpers/TestCloudObjectsSelectorCommon.java | 27 ++++++++++++++++++++--
.../partitioned/country=IND/state=TS/data.json | 1 +
.../data/partitioned/country=US/state=TX/data.json | 1 +
4 files changed, 40 insertions(+), 4 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 750d619258e..5ed7dcae897 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -18,7 +18,6 @@
package org.apache.hudi.utilities.sources.helpers;
-import org.apache.avro.Schema;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
@@ -33,6 +32,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -204,7 +204,6 @@ public class CloudObjectsSelectorCommon {
} else {
dataset = reader.load(paths.toArray(new String[cloudObjectMetadata.size()]));
}
- dataset = dataset.coalesce(numPartitions);
// add partition column from source path if configured
if (containsConfigProperty(props, PATH_BASED_PARTITION_FIELDS)) {
@@ -216,9 +215,21 @@ public class CloudObjectsSelectorCommon {
dataset = dataset.withColumn(partitionKey, split(split(input_file_name(), partitionPathPattern).getItem(1), "/").getItem(0));
}
}
+ dataset = coalesceOrRepartition(dataset, numPartitions);
return Option.of(dataset);
}
+ private static Dataset<Row> coalesceOrRepartition(Dataset dataset, int numPartitions) {
+ int existingNumPartitions = dataset.rdd().getNumPartitions();
+ LOG.info(String.format("existing number of partitions=%d, required number of partitions=%d", existingNumPartitions, numPartitions));
+ if (existingNumPartitions < numPartitions) {
+ dataset = dataset.repartition(numPartitions);
+ } else {
+ dataset = dataset.coalesce(numPartitions);
+ }
+ return dataset;
+ }
+
public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props, String fileFormat) {
return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat, Option.empty());
}
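
Replacing the unconditional coalesce with coalesceOrRepartition matters because Spark's coalesce can only reduce the number of partitions; when the cloud object listing yields fewer partitions than the target parallelism, only repartition (at the cost of a shuffle) can increase it. Below is a minimal standalone sketch of the same pattern, assuming a local SparkSession; the class name, dataset, and partition counts are illustrative and not part of the commit:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class CoalesceOrRepartitionSketch {
      // Mirrors the helper added in this commit: grow the partition count
      // with a shuffle, shrink it without one.
      static Dataset<Row> coalesceOrRepartition(Dataset<Row> dataset, int numPartitions) {
        int existing = dataset.rdd().getNumPartitions();
        return existing < numPartitions
            ? dataset.repartition(numPartitions)  // coalesce cannot increase the count
            : dataset.coalesce(numPartitions);    // avoids a shuffle when shrinking
      }

      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate();
        Dataset<Row> df = spark.range(100).toDF();  // starts with ~2 partitions under local[2]
        System.out.println(coalesceOrRepartition(df, 8).rdd().getNumPartitions());  // 8, via repartition
        System.out.println(coalesceOrRepartition(df, 1).rdd().getNumPartitions());  // 1, via coalesce
        spark.stop();
      }
    }

The asymmetry is the point of the fix: the previous unconditional coalesce silently capped parallelism at however many partitions the file listing happened to produce.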
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index b4b6507e074..b97e2fa80a0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -21,18 +21,19 @@ package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
-
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-
import org.apache.spark.sql.RowFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness {
@@ -104,4 +105,26 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness
Row expected = RowFactory.create("some data", null);
Assertions.assertEquals(Collections.singletonList(expected), result.get().collectAsList());
}
+
+ @Test
+ public void loadDatasetWithSchemaAndRepartition() {
+ TypedProperties props = new TypedProperties();
+ TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc");
+ String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc").getPath();
+ props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
+ props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+ props.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "country,state");
+ // Setting this config so that dataset repartition happens inside `loadAsDataset`
+ props.put("hoodie.streamer.source.cloud.data.partition.max.size", "1");
+ List<CloudObjectMetadata> input = Arrays.asList(
+ new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1000),
+ new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=TX/data.json", 1000),
+ new CloudObjectMetadata("src/test/resources/data/partitioned/country=IND/state=TS/data.json", 1000)
+ );
+ Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, props, "json", Option.of(new FilebasedSchemaProvider(props, jsc)));
+ Assertions.assertTrue(result.isPresent());
+ List<Row> expected = Arrays.asList(RowFactory.create("some data", "US", "CA"), RowFactory.create("some data", "US", "TX"), RowFactory.create("some data", "IND", "TS"));
+ List<Row> actual = result.get().collectAsList();
+ Assertions.assertEquals(new HashSet<>(expected), new HashSet<>(actual));
+ }
}
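
A note on the new test's design: repartitioning shuffles rows, so the order returned by collectAsList() is not deterministic, which is why the assertion compares HashSets rather than ordered lists. If one also wanted to assert the repartitioning directly, a hypothetical extra check (not in the commit, and the exact count depends on the byte-size computation inside CloudObjectsSelectorCommon) could read the partition count off the result:

    // Hypothetical follow-up assertion: with max.size=1 byte and three
    // 1000-byte objects, the loader should end up with at least as many
    // partitions as input files.
    Assertions.assertTrue(result.get().rdd().getNumPartitions() >= 3);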
diff --git a/hudi-utilities/src/test/resources/data/partitioned/country=IND/state=TS/data.json b/hudi-utilities/src/test/resources/data/partitioned/country=IND/state=TS/data.json
new file mode 100644
index 00000000000..9fb29b4dcf4
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/partitioned/country=IND/state=TS/data.json
@@ -0,0 +1 @@
+{"data": "some data"}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/data/partitioned/country=US/state=TX/data.json b/hudi-utilities/src/test/resources/data/partitioned/country=US/state=TX/data.json
new file mode 100644
index 00000000000..9fb29b4dcf4
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/partitioned/country=US/state=TX/data.json
@@ -0,0 +1 @@
+{"data": "some data"}
\ No newline at end of file