the-other-tim-brown commented on code in PR #13976:
URL: https://github.com/apache/hudi/pull/13976#discussion_r2389351347


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java:
##########
@@ -87,12 +95,22 @@ public void commitToMetadataTable(HoodieTable table,
   private HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieData<WriteStatus> dataTableWriteStatuses,
                                                              HoodieTableMetadataWriter metadataWriter,
                                                              HoodieTable table,
-                                                             String instantTime) {
-    HoodieData<WriteStatus> allWriteStatus = dataTableWriteStatuses;
+                                                             String instantTime,
+                                                             boolean enforceCoalesceWithRepartition) {
     HoodieData<WriteStatus> mdtWriteStatuses = metadataWriter.streamWriteToMetadataPartitions(dataTableWriteStatuses, instantTime);
-    allWriteStatus = allWriteStatus.union(mdtWriteStatuses);
-    allWriteStatus.persist("MEMORY_AND_DISK_SER", table.getContext(), HoodieData.HoodieDataCacheKey.of(table.getMetaClient().getBasePath().toString(), instantTime));
-    return allWriteStatus;
+    mdtWriteStatuses.persist("MEMORY_AND_DISK_SER", table.getContext(), HoodieData.HoodieDataCacheKey.of(table.getMetaClient().getBasePath().toString(), instantTime));
+    HoodieData<WriteStatus> coalescedDataWriteStatuses;
+    if (enforceCoalesceWithRepartition) {
+      // with bulk insert and NONE sort mode, simple coalesce on datatable write statuses also impact record key generation stages.
+      // and hence we are adding a partitioner to cut the chain so that coalesce(1) here does not impact record key generation stages.
+      coalescedDataWriteStatuses = HoodieJavaRDD.of(HoodieJavaRDD.getJavaRDD(dataTableWriteStatuses)
+          .mapToPair((PairFunction<WriteStatus, Boolean, WriteStatus>) writeStatus -> new Tuple2(true, writeStatus))
+          .partitionBy(new CoalescingPartitioner())
+          .map((Function<Tuple2<Boolean, WriteStatus>, WriteStatus>) booleanWriteStatusTuple2 -> booleanWriteStatusTuple2._2));
+    } else {
+      coalescedDataWriteStatuses = dataTableWriteStatuses.coalesce(1);

Review Comment:
   Sounds good as a default. Can we make it configurable, just in case?
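
One way to read this suggestion is to make the coalesce target configurable while keeping 1 as the default. Below is a minimal sketch of that idea, not a proposed patch. It assumes a hypothetical config key (`hoodie.metadata.streaming.write.coalesce.parallelism` is invented here for illustration and is not an existing Hudi option) and uses plain `java.util.Properties` in place of Hudi's write config classes.

```java
import java.util.Properties;

/** Sketch only: resolves a coalesce target from properties, defaulting to 1. */
final class CoalesceConfigSketch {

  // Hypothetical key, invented for this sketch; not an existing Hudi config.
  static final String COALESCE_PARALLELISM_KEY =
      "hoodie.metadata.streaming.write.coalesce.parallelism";

  // Mirrors the hard-coded coalesce(1) in the diff above.
  static final int DEFAULT_COALESCE_PARALLELISM = 1;

  static int coalesceParallelism(Properties props) {
    String raw = props.getProperty(COALESCE_PARALLELISM_KEY);
    if (raw == null || raw.trim().isEmpty()) {
      // Key not set: fall back to the current behavior.
      return DEFAULT_COALESCE_PARALLELISM;
    }
    // Integer.parseInt throws NumberFormatException on non-numeric input.
    int parsed = Integer.parseInt(raw.trim());
    if (parsed < 1) {
      throw new IllegalArgumentException(
          COALESCE_PARALLELISM_KEY + " must be >= 1, got " + parsed);
    }
    return parsed;
  }
}
```

With something like this in place, the `else` branch could pass the resolved value to `dataTableWriteStatuses.coalesce(...)` instead of the literal `1`. Whether `CoalescingPartitioner` should honor the same value is a separate question this sketch does not answer.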



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.