This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d1aaf2d103 [HUDI-7684] Sort the records for Flink metadata table bulk_insert (#11116)
9d1aaf2d103 is described below

commit 9d1aaf2d103ba256e0ee1f8737cfe9024aae0782
Author: Danny Chan <[email protected]>
AuthorDate: Tue Apr 30 08:23:34 2024 +0800

    [HUDI-7684] Sort the records for Flink metadata table bulk_insert (#11116)
---
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  2 ++
 .../FlinkHoodieBackedTableMetadataWriter.java      |  2 +-
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 29 ++++++++++++++++++++++
 .../test/java/org/apache/hudi/utils/TestSQL.java   | 12 +++++++++
 4 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index a52e547195f..409dfad159d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -57,6 +57,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -254,6 +255,7 @@ public class HoodieFlinkWriteClient<T> extends
     Map<String, List<HoodieRecord<T>>> preppedRecordsByFileId = preppedRecords.stream().parallel()
         .collect(Collectors.groupingBy(r -> r.getCurrentLocation().getFileId()));
     return preppedRecordsByFileId.values().stream().parallel().map(records -> {
+      records.sort(Comparator.comparing(HoodieRecord::getRecordKey));
       HoodieWriteMetadata<List<WriteStatus>> result;
       records.get(0).getCurrentLocation().setInstantTime("I");
       try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table, true)) {
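
Why the one-line sort above matters: the metadata table's base files use the HFile format, which expects records to be written in ascending key order, so each file group's records are sorted by record key before the write handle consumes them. Below is a minimal, self-contained sketch of the group-then-sort pattern; the Rec type and its accessors are simplified stand-ins for HoodieRecord, not the real Hudi API.

    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    class GroupAndSortSketch {
      // Simplified stand-in for HoodieRecord: a record key plus the file group it routes to.
      record Rec(String fileId, String recordKey) {}

      static Map<String, List<Rec>> groupAndSort(List<Rec> records) {
        // Group records by file id, as the bulk_insert path does above.
        Map<String, List<Rec>> byFileId = records.stream()
            .collect(Collectors.groupingBy(Rec::fileId));
        // Sort each group by record key so the writer sees keys in order.
        byFileId.values().forEach(group ->
            group.sort(Comparator.comparing(Rec::recordKey)));
        return byFileId;
      }
    }
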
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 1459bd1f2e7..fae099e5d3d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -155,7 +155,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     preWrite(instantTime);
 
     List<WriteStatus> statuses = isInitializing
-        ? writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, Option.empty())
+        ? writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, bulkInsertPartitioner)
         : writeClient.upsertPreppedRecords(preppedRecordList, instantTime);
     // Flink does not support auto-commit yet; the auto-commit logic is also not as complete as in BaseHoodieWriteClient.
     writeClient.commit(instantTime, statuses, Option.empty(), HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
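
The writer change threads the configured bulkInsertPartitioner into bulkInsertPreppedRecords during initialization instead of dropping it with Option.empty(), so the records can be repartitioned and sorted before the bulk_insert write. A rough illustration of what a sorting partitioner contributes; the interface below is a simplified stand-in, not Hudi's actual BulkInsertPartitioner signature.

    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    // Simplified stand-in for a bulk-insert partitioner.
    interface SortingPartitioner<R> {
      List<R> repartition(List<R> records);
    }

    class SortByKeySketch {
      // Returns a partitioner that emits records in ascending key order;
      // the String[] records here are toy {recordKey, payload} pairs.
      static SortingPartitioner<String[]> byKey() {
        return records -> records.stream()
            .sorted(Comparator.comparing((String[] r) -> r[0]))
            .collect(Collectors.toList());
      }
    }
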
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index bf050fe399d..5911cf5a9a7 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
 import org.apache.hudi.table.catalog.HoodieHiveCatalog;
 import org.apache.hudi.util.StreamerUtil;
@@ -73,6 +74,7 @@ import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.utils.TestConfigurations.catalog;
@@ -1679,6 +1681,33 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result2, "[+I[id1, Danny, 23, 1970-01-01T00:00:05, par1]]");
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testEnableMetadataTableOnExistingTable(HoodieTableType tableType) {
+    TableEnvironment tableEnv = batchTableEnv;
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.METADATA_ENABLED, false)
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    // upsert 5 times so that there can be multiple files under one partition
+    IntStream.range(0, 5).forEach(i -> execInsertSql(tableEnv, TestSQL.INSERT_T1));
+
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
+
+    // write to the same table path again, this time enabling the metadata table via a SQL hint
+    execInsertSql(tableEnv, TestSQL.insertT1WithSQLHint("/*+options('metadata.enabled'='true')*/"));
+    // check the existence of the metadata table
+    assertTrue(StreamerUtil.tableExists(HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath()), new org.apache.hadoop.conf.Configuration()),
+        "Metadata table should exist");
+    // validate the data set with the table metadata enabled
+    assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testBucketPruning(HoodieTableType tableType) {
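
A note on the new test: the /*+options('metadata.enabled'='true')*/ hint uses Flink's dynamic table options to override the DDL-level metadata.enabled = false for that single INSERT, which forces the metadata table to be initialized from the existing table's five commits and thus exercises the bulk_insert sorting path fixed above. Depending on the Flink version, dynamic table options may need to be switched on via table.dynamic-table-options.enabled.
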
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
index 531847f3c87..70455d94466 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
@@ -76,4 +76,16 @@ public class TestSQL {
       + "('id6','Emma',20,DATE '1970-01-01'),\n"
       + "('id7','Bob',44,DATE '1970-01-01'),\n"
       + "('id8','Han',56,DATE '1970-01-01')";
+
+  public static String insertT1WithSQLHint(String hint) {
+    return "insert into t1" + hint + " values\n"
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+        + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+        + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+        + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+        + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
+        + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+        + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+        + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
+  }
 }
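
The new helper splices the hint string verbatim right after the table name, so the caller must pass the complete /*+ ... */ block, as the test above does:

    // Usage mirroring the new test; the caller supplies the full hint
    // including its /*+ ... */ delimiters.
    String sql = TestSQL.insertT1WithSQLHint("/*+options('metadata.enabled'='true')*/");
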
