rmahindra123 commented on a change in pull request #4211:
URL: https://github.com/apache/hudi/pull/4211#discussion_r762354174



##########
File path: hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
##########
@@ -280,26 +280,56 @@ private void endExistingCommit() {
 
   private void onReceiveWriteStatus(ControlMessage message) {
     ControlMessage.ParticipantInfo participantInfo = message.getParticipantInfo();
-    int partition = message.getSenderPartition();
-    partitionsWriteStatusReceived.put(partition, KafkaConnectUtils.getWriteStatuses(participantInfo));
-    currentConsumedKafkaOffsets.put(partition, participantInfo.getKafkaOffset());
+    int partitionId = message.getSenderPartition();
+    partitionsWriteStatusReceived.put(partitionId, KafkaConnectUtils.getWriteStatuses(participantInfo));
+    currentConsumedKafkaOffsets.put(partitionId, participantInfo.getKafkaOffset());
     if (partitionsWriteStatusReceived.size() >= numPartitions
         && currentState.equals(State.ENDED_COMMIT)) {
       // Commit the kafka offsets to the commit file
       try {
         List<WriteStatus> allWriteStatuses = new ArrayList<>();
         partitionsWriteStatusReceived.forEach((key, value) -> allWriteStatuses.addAll(value));
-        // Commit the last write in Hudi, along with the latest kafka offset
-        if (!allWriteStatuses.isEmpty()) {
-          transactionServices.endCommit(currentCommitTime,
+
+        long totalErrorRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalErrorRecords).sum();
+        long totalRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalRecords).sum();
+        boolean hasErrors = totalErrorRecords > 0;
+
+        if (!hasErrors && !allWriteStatuses.isEmpty()) {

Review comment:
       I have added a config, and it is enabled by default. Users can disable 
it if they want a strict check for write record failures.
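
A minimal sketch of how such a config could gate the commit decision. The comment does not name the config or spell out its semantics, so the flag name `allowCommitOnErrors` and the behaviour shown here (enabled: commit even when some records failed; disabled: any failed record blocks the commit) are assumptions for illustration. `StatusCounts` is a hypothetical stand-in for the two counters read from `WriteStatus` in the diff, and `mapToLong` is used so the totals stay in `long` without a round trip through `double`.

```java
import java.util.Arrays;
import java.util.List;

final class CommitDecisionSketch {

  /** Hypothetical stand-in for WriteStatus; holds only the two counters used in the diff. */
  static final class StatusCounts {
    final long totalRecords;
    final long totalErrorRecords;

    StatusCounts(long totalRecords, long totalErrorRecords) {
      this.totalRecords = totalRecords;
      this.totalErrorRecords = totalErrorRecords;
    }
  }

  /**
   * Decides whether the coordinator should commit.
   *
   * @param statuses            write statuses collected from all partitions
   * @param allowCommitOnErrors assumed config flag; true is the lenient default,
   *                            false is the strict check for write record failures
   */
  static boolean shouldCommit(List<StatusCounts> statuses, boolean allowCommitOnErrors) {
    if (statuses.isEmpty()) {
      // Nothing was written, so there is nothing to commit.
      return false;
    }
    long totalErrorRecords = statuses.stream().mapToLong(s -> s.totalErrorRecords).sum();
    boolean hasErrors = totalErrorRecords > 0;
    // Strict mode refuses to commit when any record failed.
    return !hasErrors || allowCommitOnErrors;
  }

  public static void main(String[] args) {
    List<StatusCounts> withErrors =
        Arrays.asList(new StatusCounts(100, 0), new StatusCounts(50, 3));

    System.out.println(shouldCommit(withErrors, true));  // prints "true"
    System.out.println(shouldCommit(withErrors, false)); // prints "false"
  }
}
```

With the flag left at its default, a batch containing failed records still commits; switching it off turns the same batch into a rejected commit, which is the strict check described above.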



