This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit eccd183a3d3f5bbf02e32032b49cfbb6aa5201a3
Author: Vinish Reddy <[email protected]>
AuthorDate: Thu Feb 29 09:01:10 2024 +0530

    [HUDI-7452] Repartition row dataset in S3/GCS based on task size (#10777)
---
 .../helpers/CloudObjectsSelectorCommon.java        | 15 ++++++++++--
 .../helpers/TestCloudObjectsSelectorCommon.java    | 27 ++++++++++++++++++++--
 .../partitioned/country=IND/state=TS/data.json     |  1 +
 .../data/partitioned/country=US/state=TX/data.json |  1 +
 4 files changed, 40 insertions(+), 4 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 750d619258e..5ed7dcae897 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.utilities.sources.helpers;
 
-import org.apache.avro.Schema;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
@@ -33,6 +32,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.InputBatch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -204,7 +204,6 @@ public class CloudObjectsSelectorCommon {
     } else {
       dataset = reader.load(paths.toArray(new 
String[cloudObjectMetadata.size()]));
     }
-    dataset = dataset.coalesce(numPartitions);
 
     // add partition column from source path if configured
     if (containsConfigProperty(props, PATH_BASED_PARTITION_FIELDS)) {
@@ -216,9 +215,21 @@ public class CloudObjectsSelectorCommon {
         dataset = dataset.withColumn(partitionKey, 
split(split(input_file_name(), partitionPathPattern).getItem(1), 
"/").getItem(0));
       }
     }
+    dataset = coalesceOrRepartition(dataset, numPartitions);
     return Option.of(dataset);
   }
 
+  private static Dataset<Row> coalesceOrRepartition(Dataset dataset, int 
numPartitions) {
+    int existingNumPartitions = dataset.rdd().getNumPartitions();
+    LOG.info(String.format("existing number of partitions=%d, required number 
of partitions=%d", existingNumPartitions, numPartitions));
+    if (existingNumPartitions < numPartitions) {
+      dataset = dataset.repartition(numPartitions);
+    } else {
+      dataset = dataset.coalesce(numPartitions);
+    }
+    return dataset;
+  }
+
   public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, 
List<CloudObjectMetadata> cloudObjectMetadata, TypedProperties props, String 
fileFormat) {
     return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat, 
Option.empty());
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index b4b6507e074..b97e2fa80a0 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -21,18 +21,19 @@ package org.apache.hudi.utilities.sources.helpers;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
-
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-
 import org.apache.spark.sql.RowFactory;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 
 public class TestCloudObjectsSelectorCommon extends 
HoodieSparkClientTestHarness {
@@ -104,4 +105,26 @@ public class TestCloudObjectsSelectorCommon extends 
HoodieSparkClientTestHarness
     Row expected = RowFactory.create("some data", null);
     Assertions.assertEquals(Collections.singletonList(expected), 
result.get().collectAsList());
   }
+
+  @Test
+  public void loadDatasetWithSchemaAndRepartition() {
+    TypedProperties props = new TypedProperties();
+    
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc");
+    String schemaFilePath = 
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc").getPath();
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", 
schemaFilePath);
+    props.put("hoodie.deltastreamer.schema.provider.class.name", 
FilebasedSchemaProvider.class.getName());
+    
props.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", 
"country,state");
+    // Setting this config so that dataset repartition happens inside 
`loadAsDataset`
+    props.put("hoodie.streamer.source.cloud.data.partition.max.size", "1");
+    List<CloudObjectMetadata> input = Arrays.asList(
+        new 
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
 1000),
+        new 
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=TX/data.json",
 1000),
+        new 
CloudObjectMetadata("src/test/resources/data/partitioned/country=IND/state=TS/data.json",
 1000)
+    );
+    Option<Dataset<Row>> result = 
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, props, "json", 
Option.of(new FilebasedSchemaProvider(props, jsc)));
+    Assertions.assertTrue(result.isPresent());
+    List<Row> expected = Arrays.asList(RowFactory.create("some data", "US", 
"CA"), RowFactory.create("some data", "US", "TX"), RowFactory.create("some 
data", "IND", "TS"));
+    List<Row> actual = result.get().collectAsList();
+    Assertions.assertEquals(new HashSet<>(expected), new HashSet<>(actual));
+  }
 }
diff --git 
a/hudi-utilities/src/test/resources/data/partitioned/country=IND/state=TS/data.json
 
b/hudi-utilities/src/test/resources/data/partitioned/country=IND/state=TS/data.json
new file mode 100644
index 00000000000..9fb29b4dcf4
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/data/partitioned/country=IND/state=TS/data.json
@@ -0,0 +1 @@
+{"data": "some data"}
\ No newline at end of file
diff --git 
a/hudi-utilities/src/test/resources/data/partitioned/country=US/state=TX/data.json
 
b/hudi-utilities/src/test/resources/data/partitioned/country=US/state=TX/data.json
new file mode 100644
index 00000000000..9fb29b4dcf4
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/data/partitioned/country=US/state=TX/data.json
@@ -0,0 +1 @@
+{"data": "some data"}
\ No newline at end of file

Reply via email to