This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.10.1-rc1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c39e342b154cd88d1905122d30b8887f8028d204 Author: Y Ethan Guo <[email protected]> AuthorDate: Mon Jan 10 12:31:25 2022 -0800 [HUDI-2735] Allow empty commits in Kafka Connect Sink for Hudi (#4544) --- .../transaction/ConnectTransactionCoordinator.java | 4 +-- .../connect/TestConnectTransactionCoordinator.java | 42 ++++++++++++++++------ 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java index 14fd880..1157b21 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java @@ -294,7 +294,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru long totalRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalRecords).sum(); boolean hasErrors = totalErrorRecords > 0; - if ((!hasErrors || configs.allowCommitOnErrors()) && !allWriteStatuses.isEmpty()) { + if (!hasErrors || configs.allowCommitOnErrors()) { boolean success = transactionServices.endCommit(currentCommitTime, allWriteStatuses, transformKafkaOffsets(currentConsumedKafkaOffsets)); @@ -319,8 +319,6 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value)); } }); - } else { - LOG.warn("Empty write statuses were received from all Participants"); } // Submit the next start commit, that will rollback the current commit. diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java index f003fe9..d939351 100644 --- a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java +++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java @@ -178,27 +178,39 @@ public class TestConnectTransactionCoordinator { List<ControlMessage> controlEvents = new ArrayList<>(); switch (testScenario) { case ALL_CONNECT_TASKS_SUCCESS: - composeControlEvent(message.getCommitTime(), false, kafkaOffsets, controlEvents); + composeControlEvent( + message.getCommitTime(), false, false, kafkaOffsets, controlEvents); + numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS; + // This commit round should succeed, and the kafka offsets getting committed + kafkaOffsetsCommitted.putAll(kafkaOffsets); + expectedMsgType = ControlMessage.EventType.ACK_COMMIT; + break; + case ALL_CONNECT_TASKS_WITH_EMPTY_WRITE_STATUS: + composeControlEvent( + message.getCommitTime(), false, true, kafkaOffsets, controlEvents); numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS; // This commit round should succeed, and the kafka offsets getting committed kafkaOffsetsCommitted.putAll(kafkaOffsets); expectedMsgType = ControlMessage.EventType.ACK_COMMIT; break; case SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED: - composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents); + composeControlEvent( + message.getCommitTime(), true, false, kafkaOffsets, controlEvents); numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS; // Despite error records, this commit round should succeed, and the kafka offsets getting committed kafkaOffsetsCommitted.putAll(kafkaOffsets); expectedMsgType = ControlMessage.EventType.ACK_COMMIT; break; case SUBSET_WRITE_STATUS_FAILED: - composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents); + composeControlEvent( + message.getCommitTime(), true, false, kafkaOffsets, controlEvents); numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS; // This commit round should fail, and a new commit round should start without kafka offsets getting committed expectedMsgType = ControlMessage.EventType.START_COMMIT; break; case SUBSET_CONNECT_TASKS_FAILED: - composeControlEvent(message.getCommitTime(), false, kafkaOffsets, controlEvents); + composeControlEvent( + message.getCommitTime(), false, false, kafkaOffsets, controlEvents); numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS / 2; // This commit round should fail, and a new commit round should start without kafka offsets getting committed expectedMsgType = ControlMessage.EventType.START_COMMIT; @@ -235,10 +247,13 @@ public class TestConnectTransactionCoordinator { SUBSET_CONNECT_TASKS_FAILED, SUBSET_WRITE_STATUS_FAILED, SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED, - ALL_CONNECT_TASKS_SUCCESS + ALL_CONNECT_TASKS_SUCCESS, + ALL_CONNECT_TASKS_WITH_EMPTY_WRITE_STATUS } - private static void composeControlEvent(String commitTime, boolean shouldIncludeFailedRecords, Map<Integer, Long> kafkaOffsets, List<ControlMessage> controlEvents) { + private static void composeControlEvent( + String commitTime, boolean shouldIncludeFailedRecords, boolean useEmptyWriteStatus, + Map<Integer, Long> kafkaOffsets, List<ControlMessage> controlEvents) { // Prepare the WriteStatuses for all partitions for (int i = 1; i <= TOTAL_KAFKA_PARTITIONS; i++) { try { @@ -248,7 +263,8 @@ public class TestConnectTransactionCoordinator { commitTime, new TopicPartition(TOPIC_NAME, i), kafkaOffset, - shouldIncludeFailedRecords); + shouldIncludeFailedRecords, + useEmptyWriteStatus); controlEvents.add(event); } catch (Exception exception) { throw new HoodieException("Fatal error sending control event to Coordinator"); @@ -259,9 +275,13 @@ public class TestConnectTransactionCoordinator { private static ControlMessage composeWriteStatusResponse(String commitTime, TopicPartition partition, long kafkaOffset, - boolean includeFailedRecords) throws Exception { - // send WS - WriteStatus writeStatus = includeFailedRecords ? getSubsetFailedRecordsWriteStatus() : getAllSuccessfulRecordsWriteStatus(); + boolean includeFailedRecords, + boolean useEmptyWriteStatus) throws Exception { + List<WriteStatus> writeStatusList = useEmptyWriteStatus ? Collections.emptyList() + : Collections.singletonList( + includeFailedRecords + ? getSubsetFailedRecordsWriteStatus() + : getAllSuccessfulRecordsWriteStatus()); return ControlMessage.newBuilder() .setType(ControlMessage.EventType.WRITE_STATUS) @@ -273,7 +293,7 @@ public class TestConnectTransactionCoordinator { .setCommitTime(commitTime) .setParticipantInfo( ControlMessage.ParticipantInfo.newBuilder() - .setWriteStatus(KafkaConnectUtils.buildWriteStatuses(Collections.singletonList(writeStatus))) + .setWriteStatus(KafkaConnectUtils.buildWriteStatuses(writeStatusList)) .setKafkaOffset(kafkaOffset) .build() ).build();
