nsivabalan commented on code in PR #13402:
URL: https://github.com/apache/hudi/pull/13402#discussion_r2146295795


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##########
@@ -36,17 +39,58 @@
 import org.apache.spark.api.java.JavaRDD;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class SparkRDDTableServiceClient<T> extends 
BaseHoodieTableServiceClient<HoodieData<HoodieRecord<T>>, 
HoodieData<WriteStatus>, JavaRDD<WriteStatus>> {
+
+  private HoodieMetadataWriteWrapper metadataWriterWrapper = new 
HoodieMetadataWriteWrapper();
   protected SparkRDDTableServiceClient(HoodieEngineContext context,
                                        HoodieWriteConfig clientConfig,
                                        Option<EmbeddedTimelineService> 
timelineService) {
     super(context, clientConfig, timelineService);
   }
 
   @Override
-  protected List<HoodieWriteStat> 
triggerWritesAndFetchWriteStats(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
writeMetadata) {
-    return writeMetadata.getWriteStatuses().map(writeStatus -> 
writeStatus.getStat()).collect();
+  protected Pair<List<HoodieWriteStat>, List<HoodieWriteStat>> 
triggerWritesAndFetchWriteStats(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
tableServiceWriteMetadata) {
+    // Triggering the dag for writes.
+    // If streaming writes are enabled, writes to both the data table and the metadata 
table get triggered at this juncture.
+    // If not, writes to the data table get triggered here.
+    // When streaming writes are enabled, data table's WriteStatus is expected 
to contain all stats required to generate metadata table records and so each 
object will be larger.
+    // So, here we are dropping all additional stats and error records to 
retain only the required information and prevent collecting large objects on 
the driver.
+    List<SparkRDDWriteClient.SlimWriteStats> writeStatusMetadataTrackerList = 
tableServiceWriteMetadata.getWriteStatuses()
+        .map(writeStatus -> new 
SparkRDDWriteClient.SlimWriteStats(writeStatus.isMetadataTable(), 
writeStatus.getTotalRecords(), writeStatus.getTotalErrorRecords(),

Review Comment:
   sure, good suggestion



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to