This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c39e342b154cd88d1905122d30b8887f8028d204
Author: Y Ethan Guo <[email protected]>
AuthorDate: Mon Jan 10 12:31:25 2022 -0800

    [HUDI-2735] Allow empty commits in Kafka Connect Sink for Hudi (#4544)
---
 .../transaction/ConnectTransactionCoordinator.java |  4 +--
 .../connect/TestConnectTransactionCoordinator.java | 42 ++++++++++++++++------
 2 files changed, 32 insertions(+), 14 deletions(-)

diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
index 14fd880..1157b21 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
@@ -294,7 +294,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
         long totalRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalRecords).sum();
         boolean hasErrors = totalErrorRecords > 0;
 
-        if ((!hasErrors || configs.allowCommitOnErrors()) && !allWriteStatuses.isEmpty()) {
+        if (!hasErrors || configs.allowCommitOnErrors()) {
           boolean success = transactionServices.endCommit(currentCommitTime,
               allWriteStatuses,
               transformKafkaOffsets(currentConsumedKafkaOffsets));
@@ -319,8 +319,6 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
               ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value));
             }
           });
-        } else {
-          LOG.warn("Empty write statuses were received from all Participants");
         }
 
         // Submit the next start commit, that will rollback the current commit.
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
index f003fe9..d939351 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
@@ -178,27 +178,39 @@ public class TestConnectTransactionCoordinator {
           List<ControlMessage> controlEvents = new ArrayList<>();
           switch (testScenario) {
             case ALL_CONNECT_TASKS_SUCCESS:
-              composeControlEvent(message.getCommitTime(), false, kafkaOffsets, controlEvents);
+              composeControlEvent(
+                  message.getCommitTime(), false, false, kafkaOffsets, controlEvents);
+              numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
+              // This commit round should succeed, and the kafka offsets getting committed
+              kafkaOffsetsCommitted.putAll(kafkaOffsets);
+              expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
+              break;
+            case ALL_CONNECT_TASKS_WITH_EMPTY_WRITE_STATUS:
+              composeControlEvent(
+                  message.getCommitTime(), false, true, kafkaOffsets, controlEvents);
               numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
               // This commit round should succeed, and the kafka offsets getting committed
               kafkaOffsetsCommitted.putAll(kafkaOffsets);
               expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
               break;
             case SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED:
-              composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents);
+              composeControlEvent(
+                  message.getCommitTime(), true, false, kafkaOffsets, controlEvents);
               numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
               // Despite error records, this commit round should succeed, and the kafka offsets getting committed
               kafkaOffsetsCommitted.putAll(kafkaOffsets);
               expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
               break;
             case SUBSET_WRITE_STATUS_FAILED:
-              composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents);
+              composeControlEvent(
+                  message.getCommitTime(), true, false, kafkaOffsets, controlEvents);
               numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
               // This commit round should fail, and a new commit round should start without kafka offsets getting committed
               expectedMsgType = ControlMessage.EventType.START_COMMIT;
               break;
             case SUBSET_CONNECT_TASKS_FAILED:
-              composeControlEvent(message.getCommitTime(), false, kafkaOffsets, controlEvents);
+              composeControlEvent(
+                  message.getCommitTime(), false, false, kafkaOffsets, controlEvents);
               numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS / 2;
               // This commit round should fail, and a new commit round should start without kafka offsets getting committed
               expectedMsgType = ControlMessage.EventType.START_COMMIT;
@@ -235,10 +247,13 @@ public class TestConnectTransactionCoordinator {
       SUBSET_CONNECT_TASKS_FAILED,
       SUBSET_WRITE_STATUS_FAILED,
       SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED,
-      ALL_CONNECT_TASKS_SUCCESS
+      ALL_CONNECT_TASKS_SUCCESS,
+      ALL_CONNECT_TASKS_WITH_EMPTY_WRITE_STATUS
     }
 
-    private static void composeControlEvent(String commitTime, boolean shouldIncludeFailedRecords, Map<Integer, Long> kafkaOffsets, List<ControlMessage> controlEvents) {
+    private static void composeControlEvent(
+        String commitTime, boolean shouldIncludeFailedRecords, boolean useEmptyWriteStatus,
+        Map<Integer, Long> kafkaOffsets, List<ControlMessage> controlEvents) {
       // Prepare the WriteStatuses for all partitions
       for (int i = 1; i <= TOTAL_KAFKA_PARTITIONS; i++) {
         try {
@@ -248,7 +263,8 @@ public class TestConnectTransactionCoordinator {
               commitTime,
               new TopicPartition(TOPIC_NAME, i),
               kafkaOffset,
-              shouldIncludeFailedRecords);
+              shouldIncludeFailedRecords,
+              useEmptyWriteStatus);
           controlEvents.add(event);
         } catch (Exception exception) {
           throw new HoodieException("Fatal error sending control event to Coordinator");
@@ -259,9 +275,13 @@ public class TestConnectTransactionCoordinator {
     private static ControlMessage composeWriteStatusResponse(String commitTime,
                                                              TopicPartition partition,
                                                              long kafkaOffset,
-                                                             boolean includeFailedRecords) throws Exception {
-      // send WS
-      WriteStatus writeStatus = includeFailedRecords ? getSubsetFailedRecordsWriteStatus() : getAllSuccessfulRecordsWriteStatus();
+                                                             boolean includeFailedRecords,
+                                                             boolean useEmptyWriteStatus) throws Exception {
+      List<WriteStatus> writeStatusList = useEmptyWriteStatus ? Collections.emptyList()
+          : Collections.singletonList(
+          includeFailedRecords
+              ? getSubsetFailedRecordsWriteStatus()
+              : getAllSuccessfulRecordsWriteStatus());
 
       return ControlMessage.newBuilder()
           .setType(ControlMessage.EventType.WRITE_STATUS)
@@ -273,7 +293,7 @@ public class TestConnectTransactionCoordinator {
           .setCommitTime(commitTime)
           .setParticipantInfo(
               ControlMessage.ParticipantInfo.newBuilder()
-                  .setWriteStatus(KafkaConnectUtils.buildWriteStatuses(Collections.singletonList(writeStatus)))
+                  .setWriteStatus(KafkaConnectUtils.buildWriteStatuses(writeStatusList))
                   .setKafkaOffset(kafkaOffset)
                   .build()
           ).build();

Reply via email to