This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 457d345f44592aea2d67e500fdcd0414e3ceca55
Author: Vinish Reddy <[email protected]>
AuthorDate: Tue May 14 12:32:12 2024 +0530

    [HUDI-7617] Fix issues for bulk insert user defined partitioner in StreamSync (#11014)
    
    Co-authored-by: sivabalan <[email protected]>
---
 .../apache/hudi/table/BulkInsertPartitioner.java   |  7 +++
 .../hudi/table/TestBulkInsertPartitioner.java      | 20 --------
 .../JavaCustomColumnsSortPartitioner.java          | 10 ++--
 .../RDDCustomColumnsSortPartitioner.java           | 16 +++----
 .../TestBulkInsertInternalPartitioner.java         |  7 ++-
 .../main/java/org/apache/hudi/DataSourceUtils.java |  2 +-
 .../apache/hudi/utilities/streamer/StreamSync.java |  3 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 55 ++++++++++++++++++++++
 8 files changed, 81 insertions(+), 39 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
index 6f1efeebf17..816741108e6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
@@ -100,4 +100,11 @@ public interface BulkInsertPartitioner<I> extends Serializable {
     return sortCols.toArray(new String[0]);
   }
 
+  static Object[] prependPartitionPath(String partitionPath, Object[] columnValues) {
+    Object[] prependColumnValues = new Object[columnValues.length + 1];
+    System.arraycopy(columnValues, 0, prependColumnValues, 1, columnValues.length);
+    prependColumnValues[0] = partitionPath;
+    return prependColumnValues;
+  }
+
 }
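
Note: the new helper shifts the user-specified sort values right by one slot and places the record's partition path first, so the partition path leads every comparison key. A minimal usage sketch (values are illustrative, not from the commit):

    Object[] key = BulkInsertPartitioner.prependPartitionPath(
        "2020/01/01", new Object[] {"rider-1", 27.7f});
    // key is {"2020/01/01", "rider-1", 27.7f}
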
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestBulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestBulkInsertPartitioner.java
index 376a944d873..abdf0adc345 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestBulkInsertPartitioner.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestBulkInsertPartitioner.java
@@ -19,20 +19,11 @@
 
 package org.apache.hudi.table;
 
-import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-
-import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
-import java.util.Properties;
 import java.util.stream.Stream;
 
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-
 public class TestBulkInsertPartitioner {
 
   private static Stream<Arguments> argsForTryPrependPartitionColumns() {
@@ -45,15 +36,4 @@ public class TestBulkInsertPartitioner {
        Arguments.of(Arrays.asList("pt1", "pt2", "col1", "col2").toArray(), Arrays.asList("col1", "pt1", "col2").toArray(), false, "pt1,pt2")
     );
   }
-
-  @ParameterizedTest
-  @MethodSource("argsForTryPrependPartitionColumns")
-  public void testTryPrependPartitionColumns(String[] expectedSortColumns, String[] sortColumns, boolean populateMetaField, String partitionColumnName) {
-    Properties props = new Properties();
-    props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionColumnName);
-    props.setProperty(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaField));
-    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/").withProperties(props).build();
-    assertArrayEquals(expectedSortColumns, BulkInsertPartitioner.tryPrependPartitionPathColumns(sortColumns, writeConfig));
-  }
-
 }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
index ea0f5247250..ae6842c242c 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
@@ -22,8 +22,8 @@ package org.apache.hudi.execution.bulkinsert;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.common.util.collection.FlatLists;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.avro.Schema;
@@ -31,8 +31,6 @@ import org.apache.avro.Schema;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.table.BulkInsertPartitioner.tryPrependPartitionPathColumns;
-
 /**
  * A partitioner that does sorting based on specified column values for Java client.
  *
@@ -46,7 +44,7 @@ public class JavaCustomColumnsSortPartitioner<T>
   private final boolean consistentLogicalTimestampEnabled;
 
   public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema, HoodieWriteConfig config) {
-    this.sortColumnNames = tryPrependPartitionPathColumns(columnNames, config);
+    this.sortColumnNames = columnNames;
     this.schema = schema;
     this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
   }
@@ -56,10 +54,10 @@ public class JavaCustomColumnsSortPartitioner<T>
       List<HoodieRecord<T>> records, int outputPartitions) {
     return records.stream().sorted((o1, o2) -> {
       FlatLists.ComparableList<Comparable> values1 = FlatLists.ofComparableArray(
-          HoodieAvroUtils.getRecordColumnValues((HoodieAvroRecord) o1, sortColumnNames, schema, consistentLogicalTimestampEnabled)
+          BulkInsertPartitioner.prependPartitionPath(o1.getPartitionPath(), HoodieAvroUtils.getRecordColumnValues((HoodieAvroRecord) o1, sortColumnNames, schema, consistentLogicalTimestampEnabled))
       );
       FlatLists.ComparableList<Comparable> values2 = FlatLists.ofComparableArray(
-          HoodieAvroUtils.getRecordColumnValues((HoodieAvroRecord) o2, sortColumnNames, schema, consistentLogicalTimestampEnabled)
+          BulkInsertPartitioner.prependPartitionPath(o2.getPartitionPath(), HoodieAvroUtils.getRecordColumnValues((HoodieAvroRecord) o2, sortColumnNames, schema, consistentLogicalTimestampEnabled))
       );
       );
       return values1.compareTo(values2);
     }).collect(Collectors.toList());
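
Note: the partitioner no longer rewrites sortColumnNames via tryPrependPartitionPathColumns; the partition path is prepended to each record's comparison key at sort time instead. A sketch of the resulting ordering (illustrative values; FlatLists compares element-wise):

    FlatLists.ComparableList<Comparable> a = FlatLists.ofComparableArray(
        new Comparable[] {"2020/01/01", "rider-2"});
    FlatLists.ComparableList<Comparable> b = FlatLists.ofComparableArray(
        new Comparable[] {"2020/01/02", "rider-1"});
    // a.compareTo(b) < 0: records group by partition path first,
    // then order by the user's sort columns within a partition.

The Spark-side RDDCustomColumnsSortPartitioner below applies the same keying.
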
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
index 7c0ffac28d3..092c78d39e7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
@@ -29,8 +29,6 @@ import org.apache.spark.api.java.JavaRDD;
 
 import java.util.Arrays;
 
-import static org.apache.hudi.table.BulkInsertPartitioner.tryPrependPartitionPathColumns;
-
 /**
  * A partitioner that globally sorts a {@link JavaRDD<HoodieRecord>} based on partition path column and custom columns.
  *
@@ -46,12 +44,12 @@ public class RDDCustomColumnsSortPartitioner<T>
 
   public RDDCustomColumnsSortPartitioner(HoodieWriteConfig config) {
     this.serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema()));
-    this.sortColumnNames = tryPrependPartitionPathColumns(getSortColumnName(config), config);
+    this.sortColumnNames = getSortColumnName(config);
     this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
   }
 
   public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, HoodieWriteConfig config) {
-    this.sortColumnNames = tryPrependPartitionPathColumns(columnNames, config);
+    this.sortColumnNames = columnNames;
     this.serializableSchema = new SerializableSchema(schema);
     this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
   }
@@ -63,11 +61,11 @@ public class RDDCustomColumnsSortPartitioner<T>
     final SerializableSchema schema = this.serializableSchema;
     final boolean consistentLogicalTimestampEnabled = this.consistentLogicalTimestampEnabled;
     return records.sortBy(
-        record -> {
-          Object[] columnValues = record.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
-          return FlatLists.ofComparableArray(columnValues);
-        },
-        true, outputSparkPartitions);
+        record -> FlatLists.ofComparableArray(
+            BulkInsertPartitioner.prependPartitionPath(
+                record.getPartitionPath(),
+                record.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled))
+        ), true, outputSparkPartitions);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
index b59a420379e..45fb48316d5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
@@ -220,7 +220,7 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl
         .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
         .build();
     String[] sortColumns = sortColumnString.split(",");
-    Comparator<HoodieRecord<? extends HoodieRecordPayload>> columnComparator = getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, sortColumns);
+    Comparator<HoodieRecord<? extends HoodieRecordPayload>> columnComparator = getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, true, sortColumns);
 
     JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
     JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc);
@@ -236,11 +236,14 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl
         records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator), true);
   }
 
-  private Comparator<HoodieRecord<? extends HoodieRecordPayload>> getCustomColumnComparator(Schema schema, String[] sortColumns) {
+  private Comparator<HoodieRecord<? extends HoodieRecordPayload>> getCustomColumnComparator(Schema schema, boolean prependPartitionPath, String[] sortColumns) {
     Comparator<HoodieRecord<? extends HoodieRecordPayload>> comparator = Comparator.comparing(record -> {
       try {
         GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
         List<Object> keys = new ArrayList<>();
+        if (prependPartitionPath) {
+          keys.add(record.getPartitionPath());
+        }
         for (String col : sortColumns) {
           keys.add(genericRecord.get(col));
         }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 04c7ea0d6c4..47f12218b1e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -96,7 +96,7 @@ public class DataSourceUtils {
    *
    * @see HoodieWriteConfig#getUserDefinedBulkInsertPartitionerClass()
    */
-  private static Option<BulkInsertPartitioner> createUserDefinedBulkInsertPartitioner(HoodieWriteConfig config)
+  public static Option<BulkInsertPartitioner> createUserDefinedBulkInsertPartitioner(HoodieWriteConfig config)
       throws HoodieException {
     String bulkInsertPartitionerClass = config.getUserDefinedBulkInsertPartitionerClass();
     try {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 3bc937836f2..20e530c2ee7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -134,6 +134,7 @@ import java.util.stream.Collectors;
 
 import scala.Tuple2;
 
+import static org.apache.hudi.DataSourceUtils.createUserDefinedBulkInsertPartitioner;
 import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
 import static org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE;
@@ -988,7 +989,7 @@ public class StreamSync implements Serializable, Closeable {
           writeClientWriteResult = new WriteClientWriteResult(writeClient.upsert(records, instantTime));
           break;
         case BULK_INSERT:
-          writeClientWriteResult = new WriteClientWriteResult(writeClient.bulkInsert(records, instantTime));
+          writeClientWriteResult = new WriteClientWriteResult(writeClient.bulkInsert(records, instantTime, createUserDefinedBulkInsertPartitioner(writeClient.getConfig())));
           break;
         case INSERT_OVERWRITE:
           writeResult = writeClient.insertOverwrite(records, instantTime);
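
Note: before this change the BULK_INSERT branch used the two-argument writeClient.bulkInsert(records, instantTime), so a partitioner configured via hoodie.bulkinsert.user.defined.partitioner.class never reached the write path. The wiring now resolves it from the write config, roughly (a sketch using names from this diff):

    Option<BulkInsertPartitioner> partitioner =
        createUserDefinedBulkInsertPartitioner(writeClient.getConfig()); // Option.empty() when unset
    writeClientWriteResult = new WriteClientWriteResult(
        writeClient.bulkInsert(records, instantTime, partitioner));
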
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 94c51be0274..9831ec060a8 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -31,6 +31,8 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -53,6 +55,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -65,11 +68,14 @@ import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncClient;
+import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
 import org.apache.hudi.keygen.ComplexKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
 import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.metrics.MetricsReporterType;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.utilities.DummySchemaProvider;
@@ -100,6 +106,7 @@ import org.apache.hudi.utilities.transform.Transformer;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -2886,6 +2893,54 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
+  @Test
+  public void testBulkInsertWithUserDefinedPartitioner() throws Exception {
+    String tableBasePath = basePath + "/test_table_bulk_insert";
+    String sortColumn = "weight";
+    TypedProperties bulkInsertProps =
+        new DFSPropertiesConfiguration(fs.getConf(), new StoragePath(basePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
+    bulkInsertProps.setProperty("hoodie.bulkinsert.shuffle.parallelism", "1");
+    bulkInsertProps.setProperty("hoodie.bulkinsert.user.defined.partitioner.class", "org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner");
+    bulkInsertProps.setProperty("hoodie.bulkinsert.user.defined.partitioner.sort.columns", sortColumn);
+    String bulkInsertPropsFileName = "bulk_insert_override.properties";
+    UtilitiesTestBase.Helpers.savePropsToDFS(bulkInsertProps, storage, basePath + "/" + bulkInsertPropsFileName);
+    // Initial bulk insert
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
+        Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()), bulkInsertPropsFileName, false);
+    syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
+
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(HoodieTestUtils.getDefaultStorageConf()).build();
+    List<String> partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getStorageConf()), metaClient.getBasePath(), false);
+    StorageConfiguration hadoopConf = metaClient.getStorageConf();
+    HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf);
+    HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(engContext, metaClient,
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+        HoodieMetadataConfig.newBuilder().enable(false).build());
+    List<String> baseFiles = partitions.parallelStream().flatMap(partition -> fsView.getLatestBaseFiles(partition).map(HoodieBaseFile::getPath)).collect(Collectors.toList());
+    // Verify each partition has one base file because parallelism is 1.
+    assertEquals(baseFiles.size(), partitions.size());
+    // Verify if each parquet file is actually sorted by sortColumn.
+    for (String filePath : baseFiles) {
+      try (HoodieAvroParquetReader parquetReader = new HoodieAvroParquetReader(HoodieTestUtils.getDefaultStorageConf(), new StoragePath(filePath))) {
+        ClosableIterator<HoodieRecord<IndexedRecord>> iterator = parquetReader.getRecordIterator();
+        List<Float> sortColumnValues = new ArrayList<>();
+        while (iterator.hasNext()) {
+          IndexedRecord indexedRecord = iterator.next().getData();
+          List<Schema.Field> fields = indexedRecord.getSchema().getFields();
+          for (int i = 0; i < fields.size(); i++) {
+            if (fields.get(i).name().equals(sortColumn)) {
+              sortColumnValues.add((Float) indexedRecord.get(i));
+            }
+          }
+        }
+        // Assert whether records read are same as the sorted records.
+        List<Float> actualSortColumnValues = new ArrayList<>(sortColumnValues);
+        Collections.sort(sortColumnValues);
+        assertEquals(sortColumnValues, actualSortColumnValues);
+      }
+    }
+  }
+
   private Set<String> getAllFileIDsInTable(String tableBasePath, Option<String> partition) {
     HoodieTableMetaClient metaClient = createMetaClient(jsc, tableBasePath);
     final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
