nsivabalan commented on code in PR #13976:
URL: https://github.com/apache/hudi/pull/13976#discussion_r2389339506


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java:
##########
@@ -87,12 +95,22 @@ public void commitToMetadataTable(HoodieTable table,
   private HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieData<WriteStatus> dataTableWriteStatuses,
                                                              HoodieTableMetadataWriter metadataWriter,
                                                              HoodieTable table,
-                                                             String instantTime) {
-    HoodieData<WriteStatus> allWriteStatus = dataTableWriteStatuses;
+                                                             String instantTime,
+                                                             boolean enforceCoalesceWithRepartition) {
     HoodieData<WriteStatus> mdtWriteStatuses = metadataWriter.streamWriteToMetadataPartitions(dataTableWriteStatuses, instantTime);
-    allWriteStatus = allWriteStatus.union(mdtWriteStatuses);
-    allWriteStatus.persist("MEMORY_AND_DISK_SER", table.getContext(), HoodieData.HoodieDataCacheKey.of(table.getMetaClient().getBasePath().toString(), instantTime));
-    return allWriteStatus;
+    mdtWriteStatuses.persist("MEMORY_AND_DISK_SER", table.getContext(), HoodieData.HoodieDataCacheKey.of(table.getMetaClient().getBasePath().toString(), instantTime));
+    HoodieData<WriteStatus> coalescedDataWriteStatuses;
+    if (enforceCoalesceWithRepartition) {
+      // with bulk insert and NONE sort mode, simple coalesce on datatable write statuses also impact record key generation stages.
+      // and hence we are adding a partitioner to cut the chain so that coalesce(1) here does not impact record key generation stages.
+      coalescedDataWriteStatuses = HoodieJavaRDD.of(HoodieJavaRDD.getJavaRDD(dataTableWriteStatuses)
+          .mapToPair((PairFunction<WriteStatus, Boolean, WriteStatus>) writeStatus -> new Tuple2(true, writeStatus))
+          .partitionBy(new CoalescingPartitioner())
+          .map((Function<Tuple2<Boolean, WriteStatus>, WriteStatus>) booleanWriteStatusTuple2 -> booleanWriteStatusTuple2._2));
+    } else {
+      coalescedDataWriteStatuses = dataTableWriteStatuses.coalesce(1);

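For context on the `partitionBy` trick in the hunk above: a single-partition partitioner routes every record to partition 0, so the shuffle it introduces cuts the lineage, and the coalesce-to-one that follows does not shrink the parallelism of the upstream record key generation stages. A minimal standalone sketch of that idea (plain Java, mirroring the contract of Spark's `Partitioner` with its `numPartitions()` and `getPartition(Object)` methods; the actual `CoalescingPartitioner` in this PR may differ):

```java
// Hypothetical standalone sketch of a single-partition partitioner.
// The real CoalescingPartitioner would extend org.apache.spark.Partitioner;
// this class only mirrors that contract for illustration.
public class CoalescingPartitionerSketch {

  // Exactly one output partition.
  public int numPartitions() {
    return 1;
  }

  // Every key (here, the constant Boolean key produced by mapToPair)
  // lands in partition 0.
  public int getPartition(Object key) {
    return 0;
  }
}
```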
Review Comment:
   good point. 
   we benchmarked with 100GB of data and ~3GB of memory per executor; ~2000 files touched per commit worked fine with a single executor.
   
   maybe we can allocate 1 partition for every 5k tasks. 
   what do you think? 
   
   ```
   int coalesceParallelism = dataTableWriteStatuses.getNumPartitions() / 5000;
   ``` 
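   One caveat with the division above: for fewer than 5000 upstream tasks it evaluates to zero, so a floor of 1 would probably be needed. A hedged sketch of that heuristic (hypothetical helper name; the real call site would pass in `dataTableWriteStatuses.getNumPartitions()`):
   
   ```java
   public class CoalesceParallelism {
     // Assumed heuristic from the review discussion:
     // one coalesced partition per 5k upstream tasks.
     static final int TASKS_PER_PARTITION = 5000;
   
     // Floored at 1 so integer division never yields zero partitions.
     static int coalesceParallelism(int numPartitions) {
       return Math.max(1, numPartitions / TASKS_PER_PARTITION);
     }
   
     public static void main(String[] args) {
       System.out.println(coalesceParallelism(2000));   // small job -> 1
       System.out.println(coalesceParallelism(12000));  // 12k tasks -> 2
     }
   }
   ```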
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to