vinothchandar commented on a change in pull request #4211:
URL: https://github.com/apache/hudi/pull/4211#discussion_r762349298



##########
File path: hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
##########
@@ -280,26 +280,56 @@ private void endExistingCommit() {
 
   private void onReceiveWriteStatus(ControlMessage message) {
     ControlMessage.ParticipantInfo participantInfo = message.getParticipantInfo();
-    int partition = message.getSenderPartition();
-    partitionsWriteStatusReceived.put(partition, KafkaConnectUtils.getWriteStatuses(participantInfo));
-    currentConsumedKafkaOffsets.put(partition, participantInfo.getKafkaOffset());
+    int partitionId = message.getSenderPartition();
+    partitionsWriteStatusReceived.put(partitionId, KafkaConnectUtils.getWriteStatuses(participantInfo));
+    currentConsumedKafkaOffsets.put(partitionId, participantInfo.getKafkaOffset());
     if (partitionsWriteStatusReceived.size() >= numPartitions
         && currentState.equals(State.ENDED_COMMIT)) {
       // Commit the kafka offsets to the commit file
       try {
         List<WriteStatus> allWriteStatuses = new ArrayList<>();
         partitionsWriteStatusReceived.forEach((key, value) -> allWriteStatuses.addAll(value));
-        // Commit the last write in Hudi, along with the latest kafka offset
-        if (!allWriteStatuses.isEmpty()) {
-          transactionServices.endCommit(currentCommitTime,
+
+        long totalErrorRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalErrorRecords).sum();
+        long totalRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalRecords).sum();
+        boolean hasErrors = totalErrorRecords > 0;
+
+        if (!hasErrors && !allWriteStatuses.isEmpty()) {

Review comment:
       Should we have a mode that, when there are errors, skips the failed 
records, logs them to another topic, and moves on? Could be a follow-on JIRA.
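
       For discussion, here is a minimal sketch of what such a mode could 
look like. Everything in it is hypothetical: `ErrorMode`, `StatusSummary`, 
`shouldCommit`, and the `errorSink` callback are illustrative names, not 
existing Hudi or Kafka Connect APIs. `StatusSummary` stands in for the counts 
a `WriteStatus` carries, and the sink stands in for whatever would publish to 
the separate topic.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch only; none of these names exist in Hudi. */
final class ErrorHandlingSketch {

  /** Hypothetical modes: refuse to commit on errors, or skip them and move on. */
  enum ErrorMode { FAIL_COMMIT, SKIP_AND_LOG }

  /** Stand-in for the per-partition counts carried by a WriteStatus. */
  static final class StatusSummary {
    final int partitionId;
    final long totalRecords;
    final long totalErrorRecords;

    StatusSummary(int partitionId, long totalRecords, long totalErrorRecords) {
      this.partitionId = partitionId;
      this.totalRecords = totalRecords;
      this.totalErrorRecords = totalErrorRecords;
    }
  }

  /**
   * Returns true if the coordinator should go ahead with the commit.
   * In SKIP_AND_LOG mode every partition that reported errors is described
   * to errorSink (assumed to forward to a separate topic) before committing.
   */
  static boolean shouldCommit(List<StatusSummary> statuses,
                              ErrorMode mode,
                              Consumer<String> errorSink) {
    if (statuses.isEmpty()) {
      return false; // nothing to commit, as in the isEmpty() check in the diff
    }
    long totalErrors = statuses.stream().mapToLong(s -> s.totalErrorRecords).sum();
    if (totalErrors == 0) {
      return true;
    }
    if (mode == ErrorMode.FAIL_COMMIT) {
      return false;
    }
    for (StatusSummary s : statuses) {
      if (s.totalErrorRecords > 0) {
        errorSink.accept("partition=" + s.partitionId
            + " errors=" + s.totalErrorRecords + " of " + s.totalRecords);
      }
    }
    return true;
  }
}
```

       The sketch sums with `mapToLong` so the counts stay integral; whether 
the skipped records themselves should be forwarded, rather than just a 
summary, is something the JIRA would need to decide.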

##########
File path: hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
##########
@@ -84,6 +86,14 @@ public KafkaConnectWriterProvider(
           .withSchema(schemaProvider.getSourceSchema().toString())
           .withAutoCommit(false)
           
           .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+          // participants should not trigger table services, and leave it to the coordinator
+          .withCompactionConfig(HoodieCompactionConfig.newBuilder()

Review comment:
       Let's file a JIRA to have a single config that turns off all table services?
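
       As a rough illustration of the idea, a single switch could simply 
override the per-service flags when it is set to false. This is a sketch under 
assumptions: the key `example.table.services.enabled` is hypothetical, and the 
per-service keys listed are illustrative stand-ins for whichever flags control 
compaction, cleaning, and clustering.

```java
import java.util.Properties;

/** Hypothetical sketch of a single "table services off" switch. */
final class TableServiceSwitchSketch {

  // Hypothetical key name, not an existing config.
  static final String MASTER_SWITCH = "example.table.services.enabled";

  // Illustrative per-service flags that the switch would override.
  static final String[] PER_SERVICE_KEYS = {
      "hoodie.compact.inline",
      "hoodie.clean.automatic",
      "hoodie.clustering.inline"
  };

  /**
   * Returns a copy of userProps in which every per-service flag is forced
   * to "false" when the master switch is explicitly disabled. When the
   * switch is absent or true, the user's settings are left untouched.
   */
  static Properties resolve(Properties userProps) {
    Properties resolved = new Properties();
    resolved.putAll(userProps);
    boolean enabled = Boolean.parseBoolean(userProps.getProperty(MASTER_SWITCH, "true"));
    if (!enabled) {
      for (String key : PER_SERVICE_KEYS) {
        resolved.setProperty(key, "false");
      }
    }
    return resolved;
  }
}
```

       With something like this, the participant-side writer config would 
set one flag instead of disabling each service through its own builder.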



