This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 9d1aaf2d103 [HUDI-7684] Sort the records for Flink metadata table bulk_insert (#11116)
9d1aaf2d103 is described below
commit 9d1aaf2d103ba256e0ee1f8737cfe9024aae0782
Author: Danny Chan <[email protected]>
AuthorDate: Tue Apr 30 08:23:34 2024 +0800
[HUDI-7684] Sort the records for Flink metadata table bulk_insert (#11116)
---
.../apache/hudi/client/HoodieFlinkWriteClient.java | 2 ++
.../FlinkHoodieBackedTableMetadataWriter.java | 2 +-
.../apache/hudi/table/ITTestHoodieDataSource.java | 29 ++++++++++++++++++++++
.../test/java/org/apache/hudi/utils/TestSQL.java | 12 +++++++++
4 files changed, 44 insertions(+), 1 deletion(-)
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index a52e547195f..409dfad159d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -57,6 +57,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -254,6 +255,7 @@ public class HoodieFlinkWriteClient<T> extends
Map<String, List<HoodieRecord<T>>> preppedRecordsByFileId =
preppedRecords.stream().parallel()
         .collect(Collectors.groupingBy(r -> r.getCurrentLocation().getFileId()));
return preppedRecordsByFileId.values().stream().parallel().map(records -> {
+ records.sort(Comparator.comparing(HoodieRecord::getRecordKey));
HoodieWriteMetadata<List<WriteStatus>> result;
records.get(0).getCurrentLocation().setInstantTime("I");
       try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table, true)) {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 1459bd1f2e7..fae099e5d3d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -155,7 +155,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
preWrite(instantTime);
List<WriteStatus> statuses = isInitializing
-        ? writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, Option.empty())
+        ? writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, bulkInsertPartitioner)
: writeClient.upsertPreppedRecords(preppedRecordList, instantTime);
     // flink does not support auto-commit yet, also the auto commit logic is not complete as BaseHoodieWriteClient now.
     writeClient.commit(instantTime, statuses, Option.empty(), HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index bf050fe399d..5911cf5a9a7 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
import org.apache.hudi.table.catalog.HoodieHiveCatalog;
import org.apache.hudi.util.StreamerUtil;
@@ -73,6 +74,7 @@ import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.hudi.utils.TestConfigurations.catalog;
@@ -1679,6 +1681,33 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result2, "[+I[id1, Danny, 23, 1970-01-01T00:00:05, par1]]");
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testEnableMetadataTableOnExistingTable(HoodieTableType tableType) {
+ TableEnvironment tableEnv = batchTableEnv;
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.METADATA_ENABLED, false)
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ // upsert 5 times so there could be multiple files under one partition
+    IntStream.range(0, 5).forEach(i -> execInsertSql(tableEnv, TestSQL.INSERT_T1));
+
+ List<Row> result1 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
+
+ // defines another table with the same path but enables the metadata table
+    execInsertSql(tableEnv, TestSQL.insertT1WithSQLHint("/*+options('metadata.enabled'='true')*/"));
+ // check the existence of metadata table
+    assertTrue(StreamerUtil.tableExists(HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath()), new org.apache.hadoop.conf.Configuration()),
+        "Metadata table should exist");
+ // validate the data set with table metadata
+ assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
+ }
+
@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testBucketPruning(HoodieTableType tableType) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
index 531847f3c87..70455d94466 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
@@ -76,4 +76,16 @@ public class TestSQL {
+ "('id6','Emma',20,DATE '1970-01-01'),\n"
+ "('id7','Bob',44,DATE '1970-01-01'),\n"
+ "('id8','Han',56,DATE '1970-01-01')";
+
+ public static String insertT1WithSQLHint(String hint) {
+ return "insert into t1" + hint + " values\n"
+ + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+ + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+ + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+ + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+ + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
+ + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+ + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+ + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
+ }
}