This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a7e0f4c0f9f [HUDI-5321] Prepend partition path for custom partitioner (#8831)
a7e0f4c0f9f is described below

commit a7e0f4c0f9f7e5e4b41205e1e764d406ce578607
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon May 29 14:08:40 2023 +0800

    [HUDI-5321] Prepend partition path for custom partitioner (#8831)
    
    Co-authored-by: Jon Vexler <[email protected]>
---
 .../apache/hudi/table/BulkInsertPartitioner.java   | 30 ++++++++++-
 .../hudi/table/TestBulkInsertPartitioner.java      | 59 ++++++++++++++++++++++
 .../run/strategy/JavaExecutionStrategy.java        |  3 +-
 .../JavaCustomColumnsSortPartitioner.java          |  9 ++--
 .../TestJavaBulkInsertInternalPartitioner.java     |  7 ++-
 .../MultipleSparkJobExecutionStrategy.java         |  5 +-
 .../RDDCustomColumnsSortPartitioner.java           | 10 ++--
 .../RowCustomColumnsSortPartitioner.java           | 14 ++---
 .../SpatialCurveSortPartitionerBase.java           |  7 ++-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  2 +-
 .../TestBulkInsertInternalPartitioner.java         | 19 +++----
 .../TestBulkInsertInternalPartitionerForRows.java  |  7 +--
 .../java/org/apache/hudi/TestDataSourceUtils.java  |  2 +-
 13 files changed, 139 insertions(+), 35 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
index 4239e47234e..6f1efeebf17 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
@@ -19,11 +19,20 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import java.io.Serializable;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 
 /**
  * Partitions the input records for bulk insert operation.
@@ -47,7 +56,7 @@ public interface BulkInsertPartitioner<I> extends Serializable {
   I repartitionRecords(I records, int outputPartitions);
 
   /**
-   * @return {@code true} if the records within a partition are sorted; {@code false} otherwise.
+   * @return {@code true} if the records are sorted by partition-path; {@code false} otherwise.
    */
   boolean arePartitionRecordsSorted();
 
@@ -72,4 +81,23 @@ public interface BulkInsertPartitioner<I> extends Serializable {
     return Option.empty();
   }
 
+  /*
+   * If possible, we want to sort the data by partition path. Doing so will reduce the number of files written.
+   * This will not change the desired sort order, it is just a performance improvement.
+   **/
+  static String[] tryPrependPartitionPathColumns(String[] columnNames, HoodieWriteConfig config) {
+    String partitionPath;
+    if (config.populateMetaFields()) {
+      partitionPath = HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.getFieldName();
+    } else {
+      partitionPath = config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
+    }
+    if (isNullOrEmpty(partitionPath)) {
+      return columnNames;
+    }
+    Set<String> sortCols = new LinkedHashSet<>(StringUtils.split(partitionPath, ","));
+    sortCols.addAll(Arrays.asList(columnNames));
+    return sortCols.toArray(new String[0]);
+  }
+
 }
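
The helper added above keeps the caller's sort order but puts the table's partition path column(s) first, which is what reduces the number of files written. A minimal, self-contained sketch of that ordering behavior (the class name and hard-coded partition path are placeholders for illustration; the actual method reads the partition path from HoodieWriteConfig and uses Hudi's StringUtils):

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.Set;

    public class PrependPartitionPathSketch {
      // Mirrors the LinkedHashSet logic above: partition columns are inserted first,
      // then the caller's columns, with duplicates dropped while preserving order.
      static String[] prepend(String partitionPath, String[] columnNames) {
        Set<String> sortCols = new LinkedHashSet<>(Arrays.asList(partitionPath.split(",")));
        sortCols.addAll(Arrays.asList(columnNames));
        return sortCols.toArray(new String[0]);
      }

      public static void main(String[] args) {
        // Prints [pt1, pt2, col1, col2], matching the last case in the new test below.
        System.out.println(Arrays.toString(prepend("pt1,pt2", new String[]{"col1", "pt1", "col2"})));
      }
    }
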
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestBulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestBulkInsertPartitioner.java
new file mode 100644
index 00000000000..376a944d873
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestBulkInsertPartitioner.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+public class TestBulkInsertPartitioner {
+
+  private static Stream<Arguments> argsForTryPrependPartitionColumns() {
+    return Stream.of(
+        Arguments.of(Arrays.asList("_hoodie_partition_path", "col1", "col2").toArray(), Arrays.asList("col1", "col2").toArray(), true, "pt"),
+        Arguments.of(Arrays.asList("_hoodie_partition_path", "col1", "col2").toArray(), Arrays.asList("col1", "_hoodie_partition_path", "col2").toArray(), true, "pt"),
+        Arguments.of(Arrays.asList("col1", "col2").toArray(), Arrays.asList("col1", "col2").toArray(), false, ""),
+        Arguments.of(Arrays.asList("pt1", "col1", "col2").toArray(), Arrays.asList("col1", "col2").toArray(), false, "pt1"),
+        Arguments.of(Arrays.asList("pt1", "pt2", "col1", "col2").toArray(), Arrays.asList("col1", "col2").toArray(), false, "pt1,pt2"),
+        Arguments.of(Arrays.asList("pt1", "pt2", "col1", "col2").toArray(), Arrays.asList("col1", "pt1", "col2").toArray(), false, "pt1,pt2")
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("argsForTryPrependPartitionColumns")
+  public void testTryPrependPartitionColumns(String[] expectedSortColumns, String[] sortColumns, boolean populateMetaField, String partitionColumnName) {
+    Properties props = new Properties();
+    props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionColumnName);
+    props.setProperty(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaField));
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/").withProperties(props).build();
+    assertArrayEquals(expectedSortColumns, BulkInsertPartitioner.tryPrependPartitionPathColumns(sortColumns, writeConfig));
+  }
+
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index a063ad4654b..dcd88b083fc 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -122,8 +122,7 @@ public abstract class JavaExecutionStrategy<T>
     if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
       return new JavaCustomColumnsSortPartitioner(
           strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
-          HoodieAvroUtils.addMetadataFields(schema),
-          getWriteConfig().isConsistentLogicalTimestampEnabled());
+          HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());
     } else {
       return JavaBulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode());
     }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
index 642cad4e21e..ea0f5247250 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
@@ -22,6 +22,7 @@ package org.apache.hudi.execution.bulkinsert;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.common.util.collection.FlatLists;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
@@ -30,6 +31,8 @@ import org.apache.avro.Schema;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.table.BulkInsertPartitioner.tryPrependPartitionPathColumns;
+
 /**
  * A partitioner that does sorting based on specified column values for Java client.
  *
@@ -42,10 +45,10 @@ public class JavaCustomColumnsSortPartitioner<T>
   private final Schema schema;
   private final boolean consistentLogicalTimestampEnabled;
 
-  public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema, boolean consistentLogicalTimestampEnabled) {
-    this.sortColumnNames = columnNames;
+  public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema, HoodieWriteConfig config) {
+    this.sortColumnNames = tryPrependPartitionPathColumns(columnNames, config);
     this.schema = schema;
-    this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled;
+    this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
   }
 
   @Override
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
index b85565e5027..13600a96f13 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
@@ -25,6 +25,8 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.FlatLists;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.testutils.HoodieJavaClientTestHarness;
 
@@ -65,8 +67,11 @@ public class TestJavaBulkInsertInternalPartitioner extends HoodieJavaClientTestH
         getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, sortColumns);
 
     List<HoodieRecord> records = generateTestRecordsForBulkInsert(1000);
+    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath("basePath").build();
+    cfg.setValue(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME, "partition_path");
+    cfg.setValue(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED, "false");
     testBulkInsertInternalPartitioner(
-        new JavaCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false),
+        new JavaCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, cfg),
         records, true, generatePartitionNumRecords(records), Option.of(columnComparator));
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 9210cd5eac6..540da42fd78 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -202,9 +202,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
               getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema), recordType);
         case LINEAR:
           return isRowPartitioner
-              ? new RowCustomColumnsSortPartitioner(orderByColumns)
-              : new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema),
-              getWriteConfig().isConsistentLogicalTimestampEnabled());
+              ? new RowCustomColumnsSortPartitioner(orderByColumns, getWriteConfig())
+              : new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());
         default:
           throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
       }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
index 3d8c21a75d2..7c0ffac28d3 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
@@ -29,6 +29,8 @@ import org.apache.spark.api.java.JavaRDD;
 
 import java.util.Arrays;
 
+import static org.apache.hudi.table.BulkInsertPartitioner.tryPrependPartitionPathColumns;
+
 /**
  * A partitioner that globally sorts a {@link JavaRDD<HoodieRecord>} based on partition path column and custom columns.
  *
@@ -44,14 +46,14 @@ public class RDDCustomColumnsSortPartitioner<T>
 
   public RDDCustomColumnsSortPartitioner(HoodieWriteConfig config) {
     this.serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema()));
-    this.sortColumnNames = getSortColumnName(config);
+    this.sortColumnNames = tryPrependPartitionPathColumns(getSortColumnName(config), config);
     this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
   }
 
-  public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, boolean consistentLogicalTimestampEnabled) {
-    this.sortColumnNames = columnNames;
+  public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, HoodieWriteConfig config) {
+    this.sortColumnNames = tryPrependPartitionPathColumns(columnNames, config);
     this.serializableSchema = new SerializableSchema(schema);
-    this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled;
+    this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java
index a61e6fd4101..db9cc4d4fbd 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java
@@ -18,15 +18,17 @@
 
 package org.apache.hudi.execution.bulkinsert;
 
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
 import java.util.Arrays;
 
+import static org.apache.hudi.table.BulkInsertPartitioner.tryPrependPartitionPathColumns;
+
 /**
  * A partitioner that globally sorts a {@link Dataset<Row>} based on partition path column and custom columns.
  *
@@ -38,17 +40,17 @@ public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Da
   private final String[] sortColumnNames;
 
   public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
-    this.sortColumnNames = getSortColumnName(config);
+    this.sortColumnNames = tryPrependPartitionPathColumns(getSortColumnName(config), config);
   }
 
-  public RowCustomColumnsSortPartitioner(String[] columnNames) {
-    this.sortColumnNames = columnNames;
+  public RowCustomColumnsSortPartitioner(String[] columnNames, HoodieWriteConfig config) {
+    this.sortColumnNames = tryPrependPartitionPathColumns(columnNames, config);
   }
 
   @Override
   public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
-    final String[] sortColumns = this.sortColumnNames;
-    return records.sort(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns)
+    return records
+        .sort(Arrays.stream(sortColumnNames).map(Column::new).toArray(Column[]::new))
         .coalesce(outputSparkPartitions);
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/SpatialCurveSortPartitionerBase.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/SpatialCurveSortPartitionerBase.java
index 96048f2782b..0f602737be1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/SpatialCurveSortPartitionerBase.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/SpatialCurveSortPartitionerBase.java
@@ -21,6 +21,7 @@ package org.apache.hudi.execution.bulkinsert;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.sort.SpaceCurveSortingHelper;
 import org.apache.hudi.table.BulkInsertPartitioner;
+
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
@@ -76,8 +77,12 @@ public abstract class SpatialCurveSortPartitionerBase<T> implements BulkInsertPa
     }
   }
 
+  /**
+   * The data is sorted using a function that maps multiple columns into a single dimension.
+   * Therefore, it is not sorted by partition.
+   */
   @Override
   public boolean arePartitionRecordsSorted() {
-    return true;
+    return false;
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index f55a64d2bec..6890bc50786 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -929,7 +929,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
       client.startCommitWithTime(commitTime1);
       List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, 100);
       JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 10);
-      BulkInsertPartitioner<JavaRDD<HoodieRecord>> partitioner = new RDDCustomColumnsSortPartitioner(new String[]{"rider"}, HoodieTestDataGenerator.AVRO_SCHEMA, false);
+      BulkInsertPartitioner<JavaRDD<HoodieRecord>> partitioner = new RDDCustomColumnsSortPartitioner(new String[]{"rider"}, HoodieTestDataGenerator.AVRO_SCHEMA, config);
       List<WriteStatus> statuses = client.bulkInsert(insertRecordsRDD1, commitTime1, Option.of(partitioner)).collect();
       assertNoWriteErrors(statuses);
     }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
index c740fb38afc..b59a420379e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
@@ -212,23 +212,24 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl
   @Test
   public void testCustomColumnSortPartitioner() {
     String sortColumnString = "begin_lat";
+    HoodieWriteConfig config = HoodieWriteConfig
+        .newBuilder()
+        .withPath("/")
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
+        .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
+        .build();
     String[] sortColumns = sortColumnString.split(",");
     Comparator<HoodieRecord<? extends HoodieRecordPayload>> columnComparator = getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, sortColumns);
 
     JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
     JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc);
-    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false),
+    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, config),
         records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator), true);
-    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false),
+    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, config),
         records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator), true);
 
-    HoodieWriteConfig config = HoodieWriteConfig
-        .newBuilder()
-        .withPath("/")
-        .withSchema(TRIP_EXAMPLE_SCHEMA)
-        .withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
-        .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
-        .build();
+
     testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config),
         records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator), true);
     testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config),
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
index 03fc7dbca8c..9283dd6883a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
@@ -131,15 +131,16 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa
     String[] sortColumns = sortColumnString.split(",");
     Comparator<Row> comparator = getCustomColumnComparator(sortColumns);
 
-    testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns),
-        records, true, true, true, generateExpectedPartitionNumRecords(records), Option.of(comparator), true);
-
     HoodieWriteConfig config = HoodieWriteConfig
         .newBuilder()
        .withPath("/")
        .withUserDefinedBulkInsertPartitionerClass(RowCustomColumnsSortPartitioner.class.getName())
         .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
         .build();
+
+    testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns, config),
+        records, true, true, true, generateExpectedPartitionNumRecords(records), Option.of(comparator), true);
+
     testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config),
         records, true, true, true, generateExpectedPartitionNumRecords(records), Option.of(comparator), true);
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index d5a99dfc144..fef6e64f143 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -219,7 +219,7 @@ public class TestDataSourceUtils {
             .newBuilder()
            .withPath("/")
            .withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
-            .withUserDefinedBulkInsertPartitionerSortColumns("column1, column2")
+            .withUserDefinedBulkInsertPartitionerSortColumns("column1,column2")
             .withSchema(avroSchemaString)
             .build();
 

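For reference, a short sketch of the new call pattern after this change, mirroring the updated tests in the patch (the "/" path, TRIP_EXAMPLE_SCHEMA, AVRO_SCHEMA, and the "begin_lat" sort column are taken from those tests; a real caller would substitute its own values):

    HoodieWriteConfig config = HoodieWriteConfig
        .newBuilder()
        .withPath("/")
        .withSchema(TRIP_EXAMPLE_SCHEMA)
        .withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
        .withUserDefinedBulkInsertPartitionerSortColumns("begin_lat")
        .build();
    // The partitioners now take the write config instead of a boolean flag, so they can
    // look up the partition-path field (or _hoodie_partition_path when meta fields are
    // populated) and prepend it to the caller-supplied sort columns.
    BulkInsertPartitioner<JavaRDD<HoodieRecord>> partitioner =
        new RDDCustomColumnsSortPartitioner(new String[]{"begin_lat"}, HoodieTestDataGenerator.AVRO_SCHEMA, config);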