This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 457d345f44592aea2d67e500fdcd0414e3ceca55 Author: Vinish Reddy <[email protected]> AuthorDate: Tue May 14 12:32:12 2024 +0530 [HUDI-7617] Fix issues for bulk insert user defined partitioner in StreamSync (#11014) Co-authored-by: sivabalan <[email protected]> --- .../apache/hudi/table/BulkInsertPartitioner.java | 7 +++ .../hudi/table/TestBulkInsertPartitioner.java | 20 -------- .../JavaCustomColumnsSortPartitioner.java | 10 ++-- .../RDDCustomColumnsSortPartitioner.java | 16 +++---- .../TestBulkInsertInternalPartitioner.java | 7 ++- .../main/java/org/apache/hudi/DataSourceUtils.java | 2 +- .../apache/hudi/utilities/streamer/StreamSync.java | 3 +- .../deltastreamer/TestHoodieDeltaStreamer.java | 55 ++++++++++++++++++++++ 8 files changed, 81 insertions(+), 39 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java index 6f1efeebf17..816741108e6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java @@ -100,4 +100,11 @@ public interface BulkInsertPartitioner<I> extends Serializable { return sortCols.toArray(new String[0]); } + static Object[] prependPartitionPath(String partitionPath, Object[] columnValues) { + Object[] prependColumnValues = new Object[columnValues.length + 1]; + System.arraycopy(columnValues, 0, prependColumnValues, 1, columnValues.length); + prependColumnValues[0] = partitionPath; + return prependColumnValues; + } + } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestBulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestBulkInsertPartitioner.java index 376a944d873..abdf0adc345 100644 --- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestBulkInsertPartitioner.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestBulkInsertPartitioner.java @@ -19,20 +19,11 @@ package org.apache.hudi.table; -import org.apache.hudi.common.table.HoodieTableConfig; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.keygen.constant.KeyGeneratorOptions; - -import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; import java.util.Arrays; -import java.util.Properties; import java.util.stream.Stream; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; - public class TestBulkInsertPartitioner { private static Stream<Arguments> argsForTryPrependPartitionColumns() { @@ -45,15 +36,4 @@ public class TestBulkInsertPartitioner { Arguments.of(Arrays.asList("pt1", "pt2", "col1", "col2").toArray(), Arrays.asList("col1", "pt1", "col2").toArray(), false, "pt1,pt2") ); } - - @ParameterizedTest - @MethodSource("argsForTryPrependPartitionColumns") - public void testTryPrependPartitionColumns(String[] expectedSortColumns, String[] sortColumns, boolean populateMetaField, String partitionColumnName) { - Properties props = new Properties(); - props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionColumnName); - props.setProperty(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaField)); - HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/").withProperties(props).build(); - assertArrayEquals(expectedSortColumns, BulkInsertPartitioner.tryPrependPartitionPathColumns(sortColumns, writeConfig)); - } - } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java index ea0f5247250..ae6842c242c 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java @@ -22,8 +22,8 @@ package org.apache.hudi.execution.bulkinsert; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.common.util.collection.FlatLists; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.avro.Schema; @@ -31,8 +31,6 @@ import org.apache.avro.Schema; import java.util.List; import java.util.stream.Collectors; -import static org.apache.hudi.table.BulkInsertPartitioner.tryPrependPartitionPathColumns; - /** * A partitioner that does sorting based on specified column values for Java client. 
* @@ -46,7 +44,7 @@ public class JavaCustomColumnsSortPartitioner<T> private final boolean consistentLogicalTimestampEnabled; public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema, HoodieWriteConfig config) { - this.sortColumnNames = tryPrependPartitionPathColumns(columnNames, config); + this.sortColumnNames = columnNames; this.schema = schema; this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled(); } @@ -56,10 +54,10 @@ public class JavaCustomColumnsSortPartitioner<T> List<HoodieRecord<T>> records, int outputPartitions) { return records.stream().sorted((o1, o2) -> { FlatLists.ComparableList<Comparable> values1 = FlatLists.ofComparableArray( - HoodieAvroUtils.getRecordColumnValues((HoodieAvroRecord) o1, sortColumnNames, schema, consistentLogicalTimestampEnabled) + BulkInsertPartitioner.prependPartitionPath(o1.getPartitionPath(), HoodieAvroUtils.getRecordColumnValues((HoodieAvroRecord) o1, sortColumnNames, schema, consistentLogicalTimestampEnabled)) ); FlatLists.ComparableList<Comparable> values2 = FlatLists.ofComparableArray( - HoodieAvroUtils.getRecordColumnValues((HoodieAvroRecord) o2, sortColumnNames, schema, consistentLogicalTimestampEnabled) + BulkInsertPartitioner.prependPartitionPath(o2.getPartitionPath(), HoodieAvroUtils.getRecordColumnValues((HoodieAvroRecord) o2, sortColumnNames, schema, consistentLogicalTimestampEnabled)) ); return values1.compareTo(values2); }).collect(Collectors.toList()); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java index 7c0ffac28d3..092c78d39e7 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java +++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java @@ -29,8 +29,6 @@ import org.apache.spark.api.java.JavaRDD; import java.util.Arrays; -import static org.apache.hudi.table.BulkInsertPartitioner.tryPrependPartitionPathColumns; - /** * A partitioner that globally sorts a {@link JavaRDD<HoodieRecord>} based on partition path column and custom columns. * @@ -46,12 +44,12 @@ public class RDDCustomColumnsSortPartitioner<T> public RDDCustomColumnsSortPartitioner(HoodieWriteConfig config) { this.serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema())); - this.sortColumnNames = tryPrependPartitionPathColumns(getSortColumnName(config), config); + this.sortColumnNames = getSortColumnName(config); this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled(); } public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, HoodieWriteConfig config) { - this.sortColumnNames = tryPrependPartitionPathColumns(columnNames, config); + this.sortColumnNames = columnNames; this.serializableSchema = new SerializableSchema(schema); this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled(); } @@ -63,11 +61,11 @@ public class RDDCustomColumnsSortPartitioner<T> final SerializableSchema schema = this.serializableSchema; final boolean consistentLogicalTimestampEnabled = this.consistentLogicalTimestampEnabled; return records.sortBy( - record -> { - Object[] columnValues = record.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled); - return FlatLists.ofComparableArray(columnValues); - }, - true, outputSparkPartitions); + record -> FlatLists.ofComparableArray( + BulkInsertPartitioner.prependPartitionPath( + record.getPartitionPath(), + record.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled)) + ), true, outputSparkPartitions); } @Override diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java index b59a420379e..45fb48316d5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java @@ -220,7 +220,7 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString) .build(); String[] sortColumns = sortColumnString.split(","); - Comparator<HoodieRecord<? extends HoodieRecordPayload>> columnComparator = getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, sortColumns); + Comparator<HoodieRecord<? extends HoodieRecordPayload>> columnComparator = getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, true, sortColumns); JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc); JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc); @@ -236,11 +236,14 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator), true); } - private Comparator<HoodieRecord<? extends HoodieRecordPayload>> getCustomColumnComparator(Schema schema, String[] sortColumns) { + private Comparator<HoodieRecord<? extends HoodieRecordPayload>> getCustomColumnComparator(Schema schema, boolean prependPartitionPath, String[] sortColumns) { Comparator<HoodieRecord<? 
extends HoodieRecordPayload>> comparator = Comparator.comparing(record -> { try { GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema).get(); List<Object> keys = new ArrayList<>(); + if (prependPartitionPath) { + keys.add(record.getPartitionPath()); + } for (String col : sortColumns) { keys.add(genericRecord.get(col)); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index 04c7ea0d6c4..47f12218b1e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -96,7 +96,7 @@ public class DataSourceUtils { * * @see HoodieWriteConfig#getUserDefinedBulkInsertPartitionerClass() */ - private static Option<BulkInsertPartitioner> createUserDefinedBulkInsertPartitioner(HoodieWriteConfig config) + public static Option<BulkInsertPartitioner> createUserDefinedBulkInsertPartitioner(HoodieWriteConfig config) throws HoodieException { String bulkInsertPartitionerClass = config.getUserDefinedBulkInsertPartitionerClass(); try { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index 3bc937836f2..20e530c2ee7 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -134,6 +134,7 @@ import java.util.stream.Collectors; import scala.Tuple2; +import static org.apache.hudi.DataSourceUtils.createUserDefinedBulkInsertPartitioner; import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName; import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER; import static 
org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE; @@ -988,7 +989,7 @@ public class StreamSync implements Serializable, Closeable { writeClientWriteResult = new WriteClientWriteResult(writeClient.upsert(records, instantTime)); break; case BULK_INSERT: - writeClientWriteResult = new WriteClientWriteResult(writeClient.bulkInsert(records, instantTime)); + writeClientWriteResult = new WriteClientWriteResult(writeClient.bulkInsert(records, instantTime, createUserDefinedBulkInsertPartitioner(writeClient.getConfig()))); break; case INSERT_OVERWRITE: writeResult = writeClient.insertOverwrite(records, instantTime); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 94c51be0274..9831ec060a8 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -31,6 +31,8 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; @@ -53,6 +55,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.config.HoodieArchivalConfig; import 
org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.config.HoodieCompactionConfig; @@ -65,11 +68,14 @@ import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncClient; +import org.apache.hudi.io.hadoop.HoodieAvroParquetReader; import org.apache.hudi.keygen.ComplexKeyGenerator; import org.apache.hudi.keygen.NonpartitionedKeyGenerator; import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.metadata.HoodieMetadataFileSystemView; import org.apache.hudi.metrics.Metrics; import org.apache.hudi.metrics.MetricsReporterType; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; import org.apache.hudi.utilities.DummySchemaProvider; @@ -100,6 +106,7 @@ import org.apache.hudi.utilities.transform.Transformer; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -2886,6 +2893,54 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); } + @Test + public void testBulkInsertWithUserDefinedPartitioner() throws Exception { + String tableBasePath = basePath + "/test_table_bulk_insert"; + String sortColumn = "weight"; + TypedProperties bulkInsertProps = + new DFSPropertiesConfiguration(fs.getConf(), new StoragePath(basePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps(); + bulkInsertProps.setProperty("hoodie.bulkinsert.shuffle.parallelism", "1"); + bulkInsertProps.setProperty("hoodie.bulkinsert.user.defined.partitioner.class", "org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner"); + 
bulkInsertProps.setProperty("hoodie.bulkinsert.user.defined.partitioner.sort.columns", sortColumn); + String bulkInsertPropsFileName = "bulk_insert_override.properties"; + UtilitiesTestBase.Helpers.savePropsToDFS(bulkInsertProps, storage, basePath + "/" + bulkInsertPropsFileName); + // Initial bulk insert + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, + Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()), bulkInsertPropsFileName, false); + syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1); + + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(HoodieTestUtils.getDefaultStorageConf()).build(); + List<String> partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getStorageConf()), metaClient.getBasePath(), false); + StorageConfiguration hadoopConf = metaClient.getStorageConf(); + HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf); + HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(engContext, metaClient, + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), + HoodieMetadataConfig.newBuilder().enable(false).build()); + List<String> baseFiles = partitions.parallelStream().flatMap(partition -> fsView.getLatestBaseFiles(partition).map(HoodieBaseFile::getPath)).collect(Collectors.toList()); + // Verify each partition has one base file because parallelism is 1. + assertEquals(baseFiles.size(), partitions.size()); + // Verify if each parquet file is actually sorted by sortColumn. 
+ for (String filePath : baseFiles) { + try (HoodieAvroParquetReader parquetReader = new HoodieAvroParquetReader(HoodieTestUtils.getDefaultStorageConf(), new StoragePath(filePath))) { + ClosableIterator<HoodieRecord<IndexedRecord>> iterator = parquetReader.getRecordIterator(); + List<Float> sortColumnValues = new ArrayList<>(); + while (iterator.hasNext()) { + IndexedRecord indexedRecord = iterator.next().getData(); + List<Schema.Field> fields = indexedRecord.getSchema().getFields(); + for (int i = 0; i < fields.size(); i++) { + if (fields.get(i).name().equals(sortColumn)) { + sortColumnValues.add((Float) indexedRecord.get(i)); + } + } + } + // Assert whether records read are same as the sorted records. + List<Float> actualSortColumnValues = new ArrayList<>(sortColumnValues); + Collections.sort(sortColumnValues); + assertEquals(sortColumnValues, actualSortColumnValues); + } + } + } + private Set<String> getAllFileIDsInTable(String tableBasePath, Option<String> partition) { HoodieTableMetaClient metaClient = createMetaClient(jsc, tableBasePath); final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
