This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a7e0f4c0f9f [HUDI-5321] Prepend partition path for custom partitioner
(#8831)
a7e0f4c0f9f is described below
commit a7e0f4c0f9f7e5e4b41205e1e764d406ce578607
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon May 29 14:08:40 2023 +0800
[HUDI-5321] Prepend partition path for custom partitioner (#8831)
Co-authored-by: Jon Vexler <[email protected]>
---
.../apache/hudi/table/BulkInsertPartitioner.java | 30 ++++++++++-
.../hudi/table/TestBulkInsertPartitioner.java | 59 ++++++++++++++++++++++
.../run/strategy/JavaExecutionStrategy.java | 3 +-
.../JavaCustomColumnsSortPartitioner.java | 9 ++--
.../TestJavaBulkInsertInternalPartitioner.java | 7 ++-
.../MultipleSparkJobExecutionStrategy.java | 5 +-
.../RDDCustomColumnsSortPartitioner.java | 10 ++--
.../RowCustomColumnsSortPartitioner.java | 14 ++---
.../SpatialCurveSortPartitionerBase.java | 7 ++-
.../TestHoodieClientOnCopyOnWriteStorage.java | 2 +-
.../TestBulkInsertInternalPartitioner.java | 19 +++----
.../TestBulkInsertInternalPartitionerForRows.java | 7 +--
.../java/org/apache/hudi/TestDataSourceUtils.java | 2 +-
13 files changed, 139 insertions(+), 35 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
index 4239e47234e..6f1efeebf17 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
@@ -19,11 +19,20 @@
package org.apache.hudi.table;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import java.io.Serializable;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
/**
* Partitions the input records for bulk insert operation.
@@ -47,7 +56,7 @@ public interface BulkInsertPartitioner<I> extends
Serializable {
I repartitionRecords(I records, int outputPartitions);
/**
- * @return {@code true} if the records within a partition are sorted; {@code
false} otherwise.
+ * @return {@code true} if the records are sorted by partition-path; {@code
false} otherwise.
*/
boolean arePartitionRecordsSorted();
@@ -72,4 +81,23 @@ public interface BulkInsertPartitioner<I> extends
Serializable {
return Option.empty();
}
+ /*
+ * If possible, we want to sort the data by partition path. Doing so will
reduce the number of files written.
+ * This will not change the desired sort order, it is just a performance
improvement.
+ */
+ static String[] tryPrependPartitionPathColumns(String[] columnNames,
HoodieWriteConfig config) {
+ String partitionPath;
+ if (config.populateMetaFields()) {
+ partitionPath =
HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.getFieldName();
+ } else {
+ partitionPath =
config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
+ }
+ if (isNullOrEmpty(partitionPath)) {
+ return columnNames;
+ }
+ Set<String> sortCols = new
LinkedHashSet<>(StringUtils.split(partitionPath, ","));
+ sortCols.addAll(Arrays.asList(columnNames));
+ return sortCols.toArray(new String[0]);
+ }
+
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestBulkInsertPartitioner.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestBulkInsertPartitioner.java
new file mode 100644
index 00000000000..376a944d873
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestBulkInsertPartitioner.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+public class TestBulkInsertPartitioner {
+
+ private static Stream<Arguments> argsForTryPrependPartitionColumns() {
+ return Stream.of(
+ Arguments.of(Arrays.asList("_hoodie_partition_path", "col1",
"col2").toArray(), Arrays.asList("col1", "col2").toArray(), true, "pt"),
+ Arguments.of(Arrays.asList("_hoodie_partition_path", "col1",
"col2").toArray(), Arrays.asList("col1", "_hoodie_partition_path",
"col2").toArray(), true, "pt"),
+ Arguments.of(Arrays.asList("col1", "col2").toArray(),
Arrays.asList("col1", "col2").toArray(), false, ""),
+ Arguments.of(Arrays.asList("pt1", "col1", "col2").toArray(),
Arrays.asList("col1", "col2").toArray(), false, "pt1"),
+ Arguments.of(Arrays.asList("pt1", "pt2", "col1", "col2").toArray(),
Arrays.asList("col1", "col2").toArray(), false, "pt1,pt2"),
+ Arguments.of(Arrays.asList("pt1", "pt2", "col1", "col2").toArray(),
Arrays.asList("col1", "pt1", "col2").toArray(), false, "pt1,pt2")
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("argsForTryPrependPartitionColumns")
+ public void testTryPrependPartitionColumns(String[] expectedSortColumns,
String[] sortColumns, boolean populateMetaField, String partitionColumnName) {
+ Properties props = new Properties();
+ props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
partitionColumnName);
+ props.setProperty(HoodieTableConfig.POPULATE_META_FIELDS.key(),
String.valueOf(populateMetaField));
+ HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder().withPath("/").withProperties(props).build();
+ assertArrayEquals(expectedSortColumns,
BulkInsertPartitioner.tryPrependPartitionPathColumns(sortColumns, writeConfig));
+ }
+
+}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index a063ad4654b..dcd88b083fc 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -122,8 +122,7 @@ public abstract class JavaExecutionStrategy<T>
if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
return new JavaCustomColumnsSortPartitioner(
strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
- HoodieAvroUtils.addMetadataFields(schema),
- getWriteConfig().isConsistentLogicalTimestampEnabled());
+ HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());
} else {
return
JavaBulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode());
}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
index 642cad4e21e..ea0f5247250 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
@@ -22,6 +22,7 @@ package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.common.util.collection.FlatLists;
import org.apache.hudi.table.BulkInsertPartitioner;
@@ -30,6 +31,8 @@ import org.apache.avro.Schema;
import java.util.List;
import java.util.stream.Collectors;
+import static
org.apache.hudi.table.BulkInsertPartitioner.tryPrependPartitionPathColumns;
+
/**
* A partitioner that does sorting based on specified column values for Java
client.
*
@@ -42,10 +45,10 @@ public class JavaCustomColumnsSortPartitioner<T>
private final Schema schema;
private final boolean consistentLogicalTimestampEnabled;
- public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema,
boolean consistentLogicalTimestampEnabled) {
- this.sortColumnNames = columnNames;
+ public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema,
HoodieWriteConfig config) {
+ this.sortColumnNames = tryPrependPartitionPathColumns(columnNames, config);
this.schema = schema;
- this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled;
+ this.consistentLogicalTimestampEnabled =
config.isConsistentLogicalTimestampEnabled();
}
@Override
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
index b85565e5027..13600a96f13 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
@@ -25,6 +25,8 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.FlatLists;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.testutils.HoodieJavaClientTestHarness;
@@ -65,8 +67,11 @@ public class TestJavaBulkInsertInternalPartitioner extends
HoodieJavaClientTestH
getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA,
sortColumns);
List<HoodieRecord> records = generateTestRecordsForBulkInsert(1000);
+ HoodieWriteConfig cfg =
HoodieWriteConfig.newBuilder().withPath("basePath").build();
+ cfg.setValue(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME,
"partition_path");
+
cfg.setValue(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED,
"false");
testBulkInsertInternalPartitioner(
- new JavaCustomColumnsSortPartitioner(sortColumns,
HoodieTestDataGenerator.AVRO_SCHEMA, false),
+ new JavaCustomColumnsSortPartitioner(sortColumns,
HoodieTestDataGenerator.AVRO_SCHEMA, cfg),
records, true, generatePartitionNumRecords(records),
Option.of(columnComparator));
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 9210cd5eac6..540da42fd78 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -202,9 +202,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
getWriteConfig().getLayoutOptimizationCurveBuildMethod(),
HoodieAvroUtils.addMetadataFields(schema), recordType);
case LINEAR:
return isRowPartitioner
- ? new RowCustomColumnsSortPartitioner(orderByColumns)
- : new RDDCustomColumnsSortPartitioner(orderByColumns,
HoodieAvroUtils.addMetadataFields(schema),
- getWriteConfig().isConsistentLogicalTimestampEnabled());
+ ? new RowCustomColumnsSortPartitioner(orderByColumns,
getWriteConfig())
+ : new RDDCustomColumnsSortPartitioner(orderByColumns,
HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());
default:
throw new UnsupportedOperationException(String.format("Layout
optimization strategy '%s' is not supported", layoutOptStrategy));
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
index 3d8c21a75d2..7c0ffac28d3 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
@@ -29,6 +29,8 @@ import org.apache.spark.api.java.JavaRDD;
import java.util.Arrays;
+import static
org.apache.hudi.table.BulkInsertPartitioner.tryPrependPartitionPathColumns;
+
/**
* A partitioner that globally sorts a {@link JavaRDD<HoodieRecord>} based on
partition path column and custom columns.
*
@@ -44,14 +46,14 @@ public class RDDCustomColumnsSortPartitioner<T>
public RDDCustomColumnsSortPartitioner(HoodieWriteConfig config) {
this.serializableSchema = new SerializableSchema(new
Schema.Parser().parse(config.getSchema()));
- this.sortColumnNames = getSortColumnName(config);
+ this.sortColumnNames =
tryPrependPartitionPathColumns(getSortColumnName(config), config);
this.consistentLogicalTimestampEnabled =
config.isConsistentLogicalTimestampEnabled();
}
- public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema,
boolean consistentLogicalTimestampEnabled) {
- this.sortColumnNames = columnNames;
+ public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema,
HoodieWriteConfig config) {
+ this.sortColumnNames = tryPrependPartitionPathColumns(columnNames, config);
this.serializableSchema = new SerializableSchema(schema);
- this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled;
+ this.consistentLogicalTimestampEnabled =
config.isConsistentLogicalTimestampEnabled();
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java
index a61e6fd4101..db9cc4d4fbd 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java
@@ -18,15 +18,17 @@
package org.apache.hudi.execution.bulkinsert;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.Arrays;
+import static
org.apache.hudi.table.BulkInsertPartitioner.tryPrependPartitionPathColumns;
+
/**
* A partitioner that globally sorts a {@link Dataset<Row>} based on partition
path column and custom columns.
*
@@ -38,17 +40,17 @@ public class RowCustomColumnsSortPartitioner implements
BulkInsertPartitioner<Da
private final String[] sortColumnNames;
public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
- this.sortColumnNames = getSortColumnName(config);
+ this.sortColumnNames =
tryPrependPartitionPathColumns(getSortColumnName(config), config);
}
- public RowCustomColumnsSortPartitioner(String[] columnNames) {
- this.sortColumnNames = columnNames;
+ public RowCustomColumnsSortPartitioner(String[] columnNames,
HoodieWriteConfig config) {
+ this.sortColumnNames = tryPrependPartitionPathColumns(columnNames, config);
}
@Override
public Dataset<Row> repartitionRecords(Dataset<Row> records, int
outputSparkPartitions) {
- final String[] sortColumns = this.sortColumnNames;
- return records.sort(HoodieRecord.PARTITION_PATH_METADATA_FIELD,
sortColumns)
+ return records
+
.sort(Arrays.stream(sortColumnNames).map(Column::new).toArray(Column[]::new))
.coalesce(outputSparkPartitions);
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/SpatialCurveSortPartitionerBase.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/SpatialCurveSortPartitionerBase.java
index 96048f2782b..0f602737be1 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/SpatialCurveSortPartitionerBase.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/SpatialCurveSortPartitionerBase.java
@@ -21,6 +21,7 @@ package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.sort.SpaceCurveSortingHelper;
import org.apache.hudi.table.BulkInsertPartitioner;
+
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -76,8 +77,12 @@ public abstract class SpatialCurveSortPartitionerBase<T>
implements BulkInsertPa
}
}
+ /**
+ * The data is sorted using a function that maps multiple columns into a
single dimension.
+ * Therefore, it is not sorted by partition.
+ */
@Override
public boolean arePartitionRecordsSorted() {
- return true;
+ return false;
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index f55a64d2bec..6890bc50786 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -929,7 +929,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
client.startCommitWithTime(commitTime1);
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, 100);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 10);
- BulkInsertPartitioner<JavaRDD<HoodieRecord>> partitioner = new
RDDCustomColumnsSortPartitioner(new String[]{"rider"},
HoodieTestDataGenerator.AVRO_SCHEMA, false);
+ BulkInsertPartitioner<JavaRDD<HoodieRecord>> partitioner = new
RDDCustomColumnsSortPartitioner(new String[]{"rider"},
HoodieTestDataGenerator.AVRO_SCHEMA, config);
List<WriteStatus> statuses = client.bulkInsert(insertRecordsRDD1,
commitTime1, Option.of(partitioner)).collect();
assertNoWriteErrors(statuses);
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
index c740fb38afc..b59a420379e 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
@@ -212,23 +212,24 @@ public class TestBulkInsertInternalPartitioner extends
HoodieClientTestBase impl
@Test
public void testCustomColumnSortPartitioner() {
String sortColumnString = "begin_lat";
+ HoodieWriteConfig config = HoodieWriteConfig
+ .newBuilder()
+ .withPath("/")
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+
.withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
+ .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
+ .build();
String[] sortColumns = sortColumnString.split(",");
Comparator<HoodieRecord<? extends HoodieRecordPayload>> columnComparator =
getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, sortColumns);
JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
JavaRDD<HoodieRecord> records2 =
generateTripleTestRecordsForBulkInsert(jsc);
- testBulkInsertInternalPartitioner(new
RDDCustomColumnsSortPartitioner(sortColumns,
HoodieTestDataGenerator.AVRO_SCHEMA, false),
+ testBulkInsertInternalPartitioner(new
RDDCustomColumnsSortPartitioner(sortColumns,
HoodieTestDataGenerator.AVRO_SCHEMA, config),
records1, true, true, true,
generateExpectedPartitionNumRecords(records1), Option.of(columnComparator),
true);
- testBulkInsertInternalPartitioner(new
RDDCustomColumnsSortPartitioner(sortColumns,
HoodieTestDataGenerator.AVRO_SCHEMA, false),
+ testBulkInsertInternalPartitioner(new
RDDCustomColumnsSortPartitioner(sortColumns,
HoodieTestDataGenerator.AVRO_SCHEMA, config),
records2, true, true, true,
generateExpectedPartitionNumRecords(records2), Option.of(columnComparator),
true);
- HoodieWriteConfig config = HoodieWriteConfig
- .newBuilder()
- .withPath("/")
- .withSchema(TRIP_EXAMPLE_SCHEMA)
-
.withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
- .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
- .build();
+
testBulkInsertInternalPartitioner(new
RDDCustomColumnsSortPartitioner(config),
records1, true, true, true,
generateExpectedPartitionNumRecords(records1), Option.of(columnComparator),
true);
testBulkInsertInternalPartitioner(new
RDDCustomColumnsSortPartitioner(config),
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
index 03fc7dbca8c..9283dd6883a 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
@@ -131,15 +131,16 @@ public class TestBulkInsertInternalPartitionerForRows
extends HoodieClientTestHa
String[] sortColumns = sortColumnString.split(",");
Comparator<Row> comparator = getCustomColumnComparator(sortColumns);
- testBulkInsertInternalPartitioner(new
RowCustomColumnsSortPartitioner(sortColumns),
- records, true, true, true,
generateExpectedPartitionNumRecords(records), Option.of(comparator), true);
-
HoodieWriteConfig config = HoodieWriteConfig
.newBuilder()
.withPath("/")
.withUserDefinedBulkInsertPartitionerClass(RowCustomColumnsSortPartitioner.class.getName())
.withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
.build();
+
+ testBulkInsertInternalPartitioner(new
RowCustomColumnsSortPartitioner(sortColumns, config),
+ records, true, true, true,
generateExpectedPartitionNumRecords(records), Option.of(comparator), true);
+
testBulkInsertInternalPartitioner(new
RowCustomColumnsSortPartitioner(config),
records, true, true, true,
generateExpectedPartitionNumRecords(records), Option.of(comparator), true);
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index d5a99dfc144..fef6e64f143 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -219,7 +219,7 @@ public class TestDataSourceUtils {
.newBuilder()
.withPath("/")
.withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
- .withUserDefinedBulkInsertPartitionerSortColumns("column1,
column2")
+ .withUserDefinedBulkInsertPartitionerSortColumns("column1,column2")
.withSchema(avroSchemaString)
.build();