This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new de94787  [HUDI-2345] Hoodie columns sort partitioner for bulk insert (#3523)
de94787 is described below

commit de94787a85b272f79181dff73907b0f20855ee78
Author: zhangyue19921010 <69956021+zhangyue19921...@users.noreply.github.com>
AuthorDate: Tue Aug 24 21:45:17 2021 +0800

    [HUDI-2345] Hoodie columns sort partitioner for bulk insert (#3523)

    Co-authored-by: yuezhang <yuezh...@freewheel.tv>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 18 +++++++-
 .../RDDCustomColumnsSortPartitioner.java           | 10 ++++
 .../TestBulkInsertInternalPartitioner.java         | 18 +++++++-
 .../main/java/org/apache/hudi/DataSourceUtils.java |  4 +-
 .../java/org/apache/hudi/TestDataSourceUtils.java  | 54 +++++++++++++++-------
 5 files changed, 83 insertions(+), 21 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d16d417..04660fc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -158,11 +158,18 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("For large initial imports using bulk_insert operation, controls the parallelism to use for sort modes or custom partitioning done"
           + "before writing records to the table.");
 
+  public static final ConfigProperty<String> BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS = ConfigProperty
+      .key("hoodie.bulkinsert.user.defined.partitioner.sort.columns")
+      .noDefaultValue()
+      .withDocumentation("Columns to sort the data by when using org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner as the user-defined partitioner during bulk_insert. "
+          + "For example 'column1,column2'");
+
   public static final ConfigProperty<String> BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME = ConfigProperty
      .key("hoodie.bulkinsert.user.defined.partitioner.class")
      .noDefaultValue()
      .withDocumentation("If specified, this class will be used to re-partition records before they are bulk inserted. This can be used to sort, pack, cluster data"
-          + " optimally for common query patterns.");
+          + " optimally for common query patterns. For now we ship one built-in user-defined bulk_insert partitioner, org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner,"
+          + " which sorts records by the column values set via " + BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS.key());
 
   public static final ConfigProperty<String> UPSERT_PARALLELISM_VALUE = ConfigProperty
       .key("hoodie.upsert.shuffle.parallelism")
@@ -894,6 +901,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME);
   }
 
+  public String getUserDefinedBulkInsertPartitionerSortColumns() {
+    return getString(BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS);
+  }
+
   public int getInsertShuffleParallelism() {
     return getInt(INSERT_PARALLELISM_VALUE);
   }
@@ -1832,6 +1843,11 @@ public class HoodieWriteConfig extends HoodieConfig {
     return this;
   }
 
+  public Builder withUserDefinedBulkInsertPartitionerSortColumns(String columns) {
+    writeConfig.setValue(BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS, columns);
+    return this;
+  }
+
   public Builder withDeleteParallelism(int parallelism) {
     writeConfig.setValue(DELETE_PARALLELISM_VALUE, String.valueOf(parallelism));
     return this;
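As a hedged usage sketch, the two builder methods above can be wired together as
follows; the base path and the "rider,driver" column list are illustrative
placeholders, not values mandated by this commit:

    // Minimal sketch. withUserDefinedBulkInsertPartitionerClass and
    // withUserDefinedBulkInsertPartitionerSortColumns are the builder methods
    // exercised by the tests in this commit; path and schema are placeholders.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")
        .withSchema(TRIP_EXAMPLE_SCHEMA)
        .withUserDefinedBulkInsertPartitionerClass(
            "org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner")
        .withUserDefinedBulkInsertPartitionerSortColumns("rider,driver")
        .build();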
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
index 209531d..fb3c5ec 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.spark.api.java.JavaRDD;
@@ -41,6 +42,11 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
   private final String[] sortColumnNames;
   private final SerializableSchema serializableSchema;
 
+  public RDDCustomColumnsSortPartitioner(HoodieWriteConfig config) {
+    this.serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema()));
+    this.sortColumnNames = getSortColumnName(config);
+  }
+
   public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema) {
     this.sortColumnNames = columnNames;
     this.serializableSchema = new SerializableSchema(schema);
   }
@@ -79,4 +85,8 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
       throw new HoodieIOException("Unable to read record with key:" + record.getKey(), e);
     }
   }
+
+  private String[] getSortColumnName(HoodieWriteConfig config) {
+    return config.getUserDefinedBulkInsertPartitionerSortColumns().split(",");
+  }
 }
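One behavioral note on the config-driven constructor, illustrated under the
assumption that the property is set as in the documentation example:
getSortColumnName() is a plain String.split(","), so whitespace around a column
name is preserved rather than trimmed.

    // Sketch of the parsing above: "column1, column2" (with a space) yields
    // "column1" and " column2" -- the second name keeps its leading space.
    String[] sortColumnNames = "column1, column2".split(",");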
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
index 81effaa..f4c5622 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.testutils.HoodieClientTestBase;
@@ -43,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
@@ -139,7 +141,8 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
 
   @Test
   public void testCustomColumnSortPartitioner() throws Exception {
-    String[] sortColumns = new String[] {"rider"};
+    String sortColumnString = "rider";
+    String[] sortColumns = sortColumnString.split(",");
     Comparator<HoodieRecord<? extends HoodieRecordPayload>> columnComparator = getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, sortColumns);
 
     JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
@@ -148,6 +151,19 @@
         records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
     testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA),
         records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
+
+    HoodieWriteConfig config = HoodieWriteConfig
+        .newBuilder()
+        .withPath("/")
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
+        .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
+        .build();
+    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config),
+        records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
+    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config),
+        records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
+
   }
 
   private Comparator<HoodieRecord<? extends HoodieRecordPayload>> getCustomColumnComparator(Schema schema, String[] sortColumns) {
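getCustomColumnComparator is referenced but not shown in these hunks. A
hypothetical sketch of such a comparator, simplified to top-level Avro field
access rather than Hudi's nested-field helpers, might look like:

    // Hypothetical helper (not the test's actual implementation): order Avro
    // records by the concatenated string values of the configured sort columns.
    // Needs: java.util.Arrays, java.util.Comparator, java.util.stream.Collectors,
    // org.apache.avro.generic.GenericRecord.
    private static Comparator<GenericRecord> byColumns(String[] sortColumns) {
      return Comparator.comparing((GenericRecord r) ->
          Arrays.stream(sortColumns)
              .map(c -> String.valueOf(r.get(c)))
              .collect(Collectors.joining("+")));
    }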
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 7e043eb..0dafba4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -96,7 +96,7 @@ public class DataSourceUtils {
     try {
       return StringUtils.isNullOrEmpty(bulkInsertPartitionerClass)
           ? Option.empty() :
-          Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass));
+          Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass, config));
     } catch (Throwable e) {
       throw new HoodieException("Could not create UserDefinedBulkInsertPartitioner class " + bulkInsertPartitionerClass, e);
     }
@@ -115,7 +115,7 @@ public class DataSourceUtils {
     try {
       return StringUtils.isNullOrEmpty(bulkInsertPartitionerClass)
           ? Option.empty() :
-          Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass));
+          Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass, config));
     } catch (Throwable e) {
       throw new HoodieException("Could not create UserDefinedBulkInsertPartitionerRows class " + bulkInsertPartitionerClass, e);
     }
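Passing config into ReflectionUtils.loadClass means a user-defined partitioner
must now expose a constructor that accepts a HoodieWriteConfig; that is why the
no-op test partitioners below gain one. A rough approximation of what such a
reflective load has to do (not Hudi's actual ReflectionUtils internals):

    // Illustrative only; reflective exceptions are assumed to be handled by a
    // surrounding try/catch, as in the DataSourceUtils methods above.
    Class<?> klass = Class.forName(bulkInsertPartitionerClass);
    BulkInsertPartitioner partitioner =
        (BulkInsertPartitioner) klass.getConstructor(HoodieWriteConfig.class).newInstance(config);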
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index 94f1a69..081a8e4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.avro.Conversions;
@@ -74,6 +75,24 @@ public class TestDataSourceUtils {
   private ArgumentCaptor<Option> optionCaptor;
   private HoodieWriteConfig config;
 
+  // There are fields event_date1, event_date2, event_date3 with logical type as Date. event_date1 & event_date3 are
+  // of UNION schema type, which is a union of null and date type in different orders. event_date2 is non-union
+  // date type. event_cost1, event_cost2, event_cost3 are decimal logical types with UNION schema, which is similar to
+  // the event_date.
+  private String avroSchemaString = "{\"type\": \"record\"," + "\"name\": \"events\"," + "\"fields\": [ "
+      + "{\"name\": \"event_date1\", \"type\" : [{\"type\" : \"int\", \"logicalType\" : \"date\"}, \"null\"]},"
+      + "{\"name\": \"event_date2\", \"type\" : {\"type\": \"int\", \"logicalType\" : \"date\"}},"
+      + "{\"name\": \"event_date3\", \"type\" : [\"null\", {\"type\" : \"int\", \"logicalType\" : \"date\"}]},"
+      + "{\"name\": \"event_name\", \"type\": \"string\"},"
+      + "{\"name\": \"event_organizer\", \"type\": \"string\"},"
+      + "{\"name\": \"event_cost1\", \"type\": "
+      + "[{\"type\": \"fixed\", \"name\": \"dc\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}, \"null\"]},"
+      + "{\"name\": \"event_cost2\", \"type\": "
+      + "{\"type\": \"fixed\", \"name\": \"ef\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}},"
+      + "{\"name\": \"event_cost3\", \"type\": "
+      + "[\"null\", {\"type\": \"fixed\", \"name\": \"fg\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}]}"
+      + "]}";
+
   @BeforeEach
   public void setUp() {
     config = HoodieWriteConfig.newBuilder().withPath("/").build();
@@ -81,23 +100,6 @@ public class TestDataSourceUtils {
 
   @Test
   public void testAvroRecordsFieldConversion() {
-    // There are fields event_date1, event_date2, event_date3 with logical type as Date. event_date1 & event_date3 are
-    // of UNION schema type, which is a union of null and date type in different orders. event_date2 is non-union
-    // date type. event_cost1, event_cost2, event3 are decimal logical types with UNION schema, which is similar to
-    // the event_date.
-    String avroSchemaString = "{\"type\": \"record\"," + "\"name\": \"events\"," + "\"fields\": [ "
-        + "{\"name\": \"event_date1\", \"type\" : [{\"type\" : \"int\", \"logicalType\" : \"date\"}, \"null\"]},"
-        + "{\"name\": \"event_date2\", \"type\" : {\"type\": \"int\", \"logicalType\" : \"date\"}},"
-        + "{\"name\": \"event_date3\", \"type\" : [\"null\", {\"type\" : \"int\", \"logicalType\" : \"date\"}]},"
-        + "{\"name\": \"event_name\", \"type\": \"string\"},"
-        + "{\"name\": \"event_organizer\", \"type\": \"string\"},"
-        + "{\"name\": \"event_cost1\", \"type\": "
-        + "[{\"type\": \"fixed\", \"name\": \"dc\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}, \"null\"]},"
-        + "{\"name\": \"event_cost2\", \"type\": "
-        + "{\"type\": \"fixed\", \"name\": \"ef\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}},"
-        + "{\"name\": \"event_cost3\", \"type\": "
-        + "[\"null\", {\"type\": \"fixed\", \"name\": \"fg\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}]}"
-        + "]}";
     Schema avroSchema = new Schema.Parser().parse(avroSchemaString);
     GenericRecord record = new GenericData.Record(avroSchema);
@@ -183,6 +185,20 @@
     assertThat(partitioner.isPresent(), is(true));
   }
 
+  @Test
+  public void testCreateRDDCustomColumnsSortPartitionerWithValidPartitioner() throws HoodieException {
+    config = HoodieWriteConfig
+        .newBuilder()
+        .withPath("/")
+        .withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
+        .withUserDefinedBulkInsertPartitionerSortColumns("column1, column2")
+        .withSchema(avroSchemaString)
+        .build();
+
+    Option<BulkInsertPartitioner<Dataset<Row>>> partitioner = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(config);
+    assertThat(partitioner.isPresent(), is(true));
+  }
+
   private void setAndVerifyHoodieWriteClientWith(final String partitionerClassName) {
     config = HoodieWriteConfig.newBuilder().withPath(config.getBasePath())
         .withUserDefinedBulkInsertPartitionerClass(partitionerClassName)
@@ -195,6 +211,8 @@
   public static class NoOpBulkInsertPartitioner<T extends HoodieRecordPayload>
       implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
 
+    public NoOpBulkInsertPartitioner(HoodieWriteConfig config) {}
+
     @Override
     public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
       return records;
@@ -209,6 +227,8 @@
   public static class NoOpBulkInsertPartitionerRows implements BulkInsertPartitioner<Dataset<Row>> {
 
+    public NoOpBulkInsertPartitionerRows(HoodieWriteConfig config) {}
+
     @Override
     public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
       return records;
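Taken together, an end-to-end use of this feature from the Spark datasource
might look like the hedged sketch below. The two partitioner option keys come
from this commit; "hoodie.table.name" and "hoodie.datasource.write.operation"
are standard Hudi write options, and df, basePath, and the column names are
placeholders:

    // Hedged sketch: bulk_insert with records sorted by two custom columns.
    df.write().format("hudi")
        .option("hoodie.table.name", "events")
        .option("hoodie.datasource.write.operation", "bulk_insert")
        .option("hoodie.bulkinsert.user.defined.partitioner.class",
            "org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner")
        .option("hoodie.bulkinsert.user.defined.partitioner.sort.columns", "column1,column2")
        .mode(SaveMode.Append)
        .save(basePath);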