nsivabalan commented on code in PR #10158:
URL: https://github.com/apache/hudi/pull/10158#discussion_r1402927461


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -801,24 +765,25 @@ private HoodieWriteConfig prepareHoodieConfigForRowWriter(Schema writerSchema) {
    *
    * @param instantTime         instant time to use for ingest.
     * @param inputBatch          input batch that contains the records, checkpoint, and schema provider
-   * @param inputIsEmpty             true if input batch is empty.
    * @param metrics             Metrics
    * @param overallTimerContext Timer Context
    * @return Option Compaction instant if one is scheduled
    */
-  private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch, boolean inputIsEmpty,
+  private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch,
                                                                                HoodieIngestionMetrics metrics,
                                                                                Timer.Context overallTimerContext) {
     Option<String> scheduledCompactionInstant = Option.empty();
     // write to hudi and fetch result
-    Pair<WriteClientWriteResult, Boolean>  writeClientWriteResultIsEmptyPair = writeToSink(inputBatch, instantTime, inputIsEmpty);
-    JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResultIsEmptyPair.getKey().getWriteStatusRDD();
-    Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResultIsEmptyPair.getKey().getPartitionToReplacedFileIds();
-    boolean isEmpty = writeClientWriteResultIsEmptyPair.getRight();
+    WriteClientWriteResult  writeClientWriteResult = writeToSink(inputBatch, instantTime);
+    JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResult.getWriteStatusRDD();
+    Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResult.getPartitionToReplacedFileIds();
 
     // process write status
     long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
     long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
+    long totalSuccessfulRecords = totalRecords - totalErrorRecords;
+    LOG.info(String.format("instantTime=%s, totalRecords=%d, totalErrorRecords=%d, totalSuccessfulRecords=%d",

Review Comment:
   Can we have an explicit log statement for the case where no records were ingested but we still had to trigger the commit for checkpointing purposes? I believe that prior to this patch we printed "No new data, perform empty commit.". Maybe something similar.
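A minimal sketch of what the suggested log could look like, for illustration only. It assumes that `totalRecords == 0` (computed from the write statuses, as in the hunk above) is a usable signal for a checkpoint-only commit; StreamSync may instead decide emptiness from the input batch. The class `EmptyCommitLogSketch` and method `summaryMessage` are hypothetical, and `java.util.logging` stands in for the `LOG` used in StreamSync so the snippet is self-contained.

```java
import java.util.logging.Logger;

// Hypothetical sketch: not Hudi code. Distinguishes a checkpoint-only (empty) commit
// from a normal ingest round when building the per-instant log message.
final class EmptyCommitLogSketch {

  private static final Logger LOG = Logger.getLogger(EmptyCommitLogSketch.class.getName());

  // Assumption: totalRecords == 0 means nothing was ingested and the commit only
  // advances the checkpoint.
  static String summaryMessage(String instantTime, long totalRecords, long totalErrorRecords) {
    if (totalRecords == 0) {
      // Reuses the pre-patch wording recalled in the review comment.
      return String.format("No new data, perform empty commit. instantTime=%s", instantTime);
    }
    long totalSuccessfulRecords = totalRecords - totalErrorRecords;
    return String.format("instantTime=%s, totalRecords=%d, totalErrorRecords=%d, totalSuccessfulRecords=%d",
        instantTime, totalRecords, totalErrorRecords, totalSuccessfulRecords);
  }

  public static void main(String[] args) {
    // Checkpoint-only round: explicit "empty commit" message.
    LOG.info(summaryMessage("20231122101500000", 0, 0));
    // Normal round: record counts as in the patch.
    LOG.info(summaryMessage("20231122101600000", 100, 3));
  }
}
```

Keeping the message construction in a separate method makes the empty-commit branch easy to unit test without a Spark context.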
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.