This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit eccd183a3d3f5bbf02e32032b49cfbb6aa5201a3
Author: Vinish Reddy <[email protected]>
AuthorDate: Thu Feb 29 09:01:10 2024 +0530

    [HUDI-7452] Repartition row dataset in S3/GCS based on task size (#10777)
---
 .../helpers/CloudObjectsSelectorCommon.java         | 15 ++++++++++--
 .../helpers/TestCloudObjectsSelectorCommon.java     | 27 ++++++++++++++++++++--
 .../partitioned/country=IND/state=TS/data.json      |  1 +
 .../data/partitioned/country=US/state=TX/data.json  |  1 +
 4 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 750d619258e..5ed7dcae897 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.utilities.sources.helpers;
 
-import org.apache.avro.Schema;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
@@ -33,6 +32,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.InputBatch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -204,7 +204,6 @@ public class CloudObjectsSelectorCommon {
     } else {
       dataset = reader.load(paths.toArray(new String[cloudObjectMetadata.size()]));
     }
-    dataset = dataset.coalesce(numPartitions);
 
     // add partition column from source path if configured
    if (containsConfigProperty(props, PATH_BASED_PARTITION_FIELDS)) {
@@ -216,9 +215,21 @@ public class CloudObjectsSelectorCommon {
         dataset = dataset.withColumn(partitionKey, split(split(input_file_name(), partitionPathPattern).getItem(1), "/").getItem(0));
       }
     }
+    dataset = coalesceOrRepartition(dataset, numPartitions);
     return Option.of(dataset);
   }
 
+  private static Dataset<Row> coalesceOrRepartition(Dataset dataset, int numPartitions) {
+    int existingNumPartitions = dataset.rdd().getNumPartitions();
+    LOG.info(String.format("existing number of partitions=%d, required number of partitions=%d", existingNumPartitions, numPartitions));
+    if (existingNumPartitions < numPartitions) {
+      dataset = dataset.repartition(numPartitions);
+    } else {
+      dataset = dataset.coalesce(numPartitions);
+    }
+    return dataset;
+  }
+
   public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props, String fileFormat) {
     return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat, Option.empty());
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index b4b6507e074..b97e2fa80a0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -21,18 +21,19 @@ package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
-
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-
 import org.apache.spark.sql.RowFactory;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 
 public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness {
@@ -104,4 +105,26 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness
     Row expected = RowFactory.create("some data", null);
     Assertions.assertEquals(Collections.singletonList(expected), result.get().collectAsList());
   }
+
+  @Test
+  public void loadDatasetWithSchemaAndRepartition() {
+    TypedProperties props = new TypedProperties();
+    TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc");
+    String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc").getPath();
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
+    props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+    props.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "country,state");
+    // Setting this config so that dataset repartition happens inside `loadAsDataset`
+    props.put("hoodie.streamer.source.cloud.data.partition.max.size", "1");
+    List<CloudObjectMetadata> input = Arrays.asList(
+        new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1000),
+        new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=TX/data.json", 1000),
+        new CloudObjectMetadata("src/test/resources/data/partitioned/country=IND/state=TS/data.json", 1000)
+    );
+    Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, props, "json", Option.of(new FilebasedSchemaProvider(props, jsc)));
+    Assertions.assertTrue(result.isPresent());
+    List<Row> expected = Arrays.asList(RowFactory.create("some data", "US", "CA"), RowFactory.create("some data", "US", "TX"), RowFactory.create("some data", "IND", "TS"));
+    List<Row> actual = result.get().collectAsList();
+    Assertions.assertEquals(new HashSet<>(expected), new HashSet<>(actual));
+  }
 }
diff --git a/hudi-utilities/src/test/resources/data/partitioned/country=IND/state=TS/data.json b/hudi-utilities/src/test/resources/data/partitioned/country=IND/state=TS/data.json
new file mode 100644
index 00000000000..9fb29b4dcf4
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/partitioned/country=IND/state=TS/data.json
@@ -0,0 +1 @@
+{"data": "some data"}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/data/partitioned/country=US/state=TX/data.json b/hudi-utilities/src/test/resources/data/partitioned/country=US/state=TX/data.json
new file mode 100644
index 00000000000..9fb29b4dcf4
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/partitioned/country=US/state=TX/data.json
@@ -0,0 +1 @@
+{"data": "some data"}
\ No newline at end of file
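
Note on the change above: coalesce() can only reduce a Dataset's partition count without a shuffle, so the old unconditional dataset.coalesce(numPartitions) silently capped parallelism whenever more partitions were requested than the reader produced. The new coalesceOrRepartition helper uses repartition() (a shuffle) when the count must grow and coalesce() otherwise. Below is a minimal standalone sketch of that pattern; the class name CoalesceOrRepartitionSketch and the local[2] session setup are illustrative only and not part of the commit.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CoalesceOrRepartitionSketch {

  // Same decision rule as CloudObjectsSelectorCommon#coalesceOrRepartition:
  // grow with repartition() (shuffle), shrink with coalesce() (no shuffle).
  static Dataset<Row> coalesceOrRepartition(Dataset<Row> dataset, int numPartitions) {
    int existing = dataset.rdd().getNumPartitions();
    return existing < numPartitions
        ? dataset.repartition(numPartitions)
        : dataset.coalesce(numPartitions);
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("coalesce-or-repartition-sketch")
        .master("local[2]")
        .getOrCreate();

    // range(start, end, step, numPartitions) -> a Dataset with 2 partitions.
    Dataset<Row> df = spark.range(0, 100, 1, 2).toDF();

    // Growing 2 -> 8 must go through repartition(); coalesce(8) would stay at 2.
    System.out.println(coalesceOrRepartition(df, 8).rdd().getNumPartitions()); // 8

    // Shrinking 2 -> 1 uses coalesce() and avoids a shuffle.
    System.out.println(coalesceOrRepartition(df, 1).rdd().getNumPartitions()); // 1

    spark.stop();
  }
}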
