This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 971d0b89247c353742b745a11fa8cbc6f2ff3f7c Author: Danny Chan <[email protected]> AuthorDate: Tue Apr 30 08:23:34 2024 +0800 [HUDI-7684] Sort the records for Flink metadata table bulk_insert (#11116) --- .../apache/hudi/client/HoodieFlinkWriteClient.java | 2 ++ .../FlinkHoodieBackedTableMetadataWriter.java | 2 +- .../apache/hudi/table/ITTestHoodieDataSource.java | 29 ++++++++++++++++++++++ .../test/java/org/apache/hudi/utils/TestSQL.java | 12 +++++++++ 4 files changed, 44 insertions(+), 1 deletion(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index ed1a3408f67..30dc4b842be 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -57,6 +57,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -254,6 +255,7 @@ public class HoodieFlinkWriteClient<T> extends Map<String, List<HoodieRecord<T>>> preppedRecordsByFileId = preppedRecords.stream().parallel() .collect(Collectors.groupingBy(r -> r.getCurrentLocation().getFileId())); return preppedRecordsByFileId.values().stream().parallel().map(records -> { + records.sort(Comparator.comparing(HoodieRecord::getRecordKey)); HoodieWriteMetadata<List<WriteStatus>> result; records.get(0).getCurrentLocation().setInstantTime("I"); try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table, true)) { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java 
index bafee7295c3..10de70bfb5a 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -151,7 +151,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad preWrite(instantTime); List<WriteStatus> statuses = isInitializing - ? writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, Option.empty()) + ? writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, bulkInsertPartitioner) : writeClient.upsertPreppedRecords(preppedRecordList, instantTime); // flink does not support auto-commit yet, also the auto commit logic is not complete as BaseHoodieWriteClient now. writeClient.commit(instantTime, statuses, Option.empty(), HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap()); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index bc6a250eb8c..689d5a3de7b 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.table.catalog.HoodieCatalogTestUtils; import org.apache.hudi.table.catalog.HoodieHiveCatalog; import org.apache.hudi.util.StreamerUtil; @@ -72,6 +73,7 @@ import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import 
java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import static org.apache.hudi.utils.TestConfigurations.catalog; @@ -1677,6 +1679,33 @@ public class ITTestHoodieDataSource { assertRowsEquals(result2, "[+I[id1, Danny, 23, 1970-01-01T00:00:05, par1]]"); } + @ParameterizedTest + @EnumSource(value = HoodieTableType.class) + void testEnableMetadataTableOnExistingTable(HoodieTableType tableType) { + TableEnvironment tableEnv = batchTableEnv; + String hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.METADATA_ENABLED, false) + .option(FlinkOptions.TABLE_TYPE, tableType) + .end(); + tableEnv.executeSql(hoodieTableDDL); + + // upsert 5 times so there could be multiple files under one partition + IntStream.range(0, 5).forEach(i -> execInsertSql(tableEnv, TestSQL.INSERT_T1)); + + List<Row> result1 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT); + + // upsert again on the existing table, enabling the metadata table through a dynamic table options hint + execInsertSql(tableEnv, TestSQL.insertT1WithSQLHint("/*+options('metadata.enabled'='true')*/")); + // check the existence of metadata table + assertTrue(StreamerUtil.tableExists(HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath()), new org.apache.hadoop.conf.Configuration()), + "Metadata table should exist"); + List<Row> resultWithMetadata = CollectionUtil.iterableToList(() -> tableEnv.sqlQuery("select * from t1/*+options('metadata.enabled'='true')*/").execute().collect()); + assertRowsEquals(resultWithMetadata, TestData.DATA_SET_SOURCE_INSERT); + } + @ParameterizedTest @EnumSource(value = HoodieTableType.class) void testBucketPruning(HoodieTableType tableType) { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java index 531847f3c87..70455d94466 100644 --- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java @@ -76,4 +76,16 @@ public class TestSQL { + "('id6','Emma',20,DATE '1970-01-01'),\n" + "('id7','Bob',44,DATE '1970-01-01'),\n" + "('id8','Han',56,DATE '1970-01-01')"; + + public static String insertT1WithSQLHint(String hint) { + return "insert into t1" + hint + " values\n" + + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n" + + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n" + + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n" + + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n" + + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n" + + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n" + + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n" + + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')"; + } }
