nsivabalan commented on code in PR #10158:
URL: https://github.com/apache/hudi/pull/10158#discussion_r1402927461
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -801,24 +765,25 @@ private HoodieWriteConfig prepareHoodieConfigForRowWriter(Schema writerSchema) {
*
* @param instantTime instant time to use for ingest.
* @param inputBatch input batch that contains the records, checkpoint, and schema provider
- * @param inputIsEmpty true if input batch is empty.
* @param metrics Metrics
* @param overallTimerContext Timer Context
* @return Option Compaction instant if one is scheduled
*/
- private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch, boolean inputIsEmpty,
+ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch,
HoodieIngestionMetrics metrics,
Timer.Context overallTimerContext) {
Option<String> scheduledCompactionInstant = Option.empty();
// write to hudi and fetch result
- Pair<WriteClientWriteResult, Boolean> writeClientWriteResultIsEmptyPair = writeToSink(inputBatch, instantTime, inputIsEmpty);
- JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResultIsEmptyPair.getKey().getWriteStatusRDD();
- Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResultIsEmptyPair.getKey().getPartitionToReplacedFileIds();
- boolean isEmpty = writeClientWriteResultIsEmptyPair.getRight();
+ WriteClientWriteResult writeClientWriteResult = writeToSink(inputBatch, instantTime);
+ JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResult.getWriteStatusRDD();
+ Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResult.getPartitionToReplacedFileIds();
// process write status
long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
+ long totalSuccessfulRecords = totalRecords - totalErrorRecords;
+ LOG.info(String.format("instantTime=%s, totalRecords=%d, totalErrorRecords=%d, totalSuccessfulRecords=%d",
Review Comment:
Can we have an explicit log statement for the case where no records were
ingested but we still had to trigger the commit to record the checkpoint?
I believe that, prior to this patch, we printed "No new data, perform empty
commit." Maybe something similar.
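
Roughly what I have in mind, as a minimal sketch rather than the actual change: `CommitLogMessages` and `describe` are hypothetical names that do not exist in `StreamSync`, and treating `totalRecords == 0` as the "empty commit" condition is an assumption. The real check may need to come from the input batch instead.

```java
import java.util.Locale;

// Hypothetical helper for illustration only; not part of StreamSync.
final class CommitLogMessages {
  private CommitLogMessages() {
  }

  // Builds the summary line for one commit. When nothing was ingested,
  // it says so explicitly instead of printing all-zero counters.
  static String describe(String instantTime, long totalRecords, long totalErrorRecords) {
    if (totalRecords == 0) {
      // Assumption: zero records means the commit exists only to carry the checkpoint.
      return String.format(Locale.ROOT,
          "instantTime=%s, No new data, perform empty commit.", instantTime);
    }
    long totalSuccessfulRecords = totalRecords - totalErrorRecords;
    return String.format(Locale.ROOT,
        "instantTime=%s, totalRecords=%d, totalErrorRecords=%d, totalSuccessfulRecords=%d",
        instantTime, totalRecords, totalErrorRecords, totalSuccessfulRecords);
  }
}
```

The call site would then be something like `LOG.info(CommitLogMessages.describe(instantTime, totalRecords, totalErrorRecords));`, so the empty-commit case stays easy to grep for in the logs.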