This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch release-0.10.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b88d4beb23a5ba48fbe642d1f83e55a332b7ce5d
Author: rmahindra123 <[email protected]>
AuthorDate: Fri Dec 3 21:30:32 2021 -0800

    [HUDI-2890] Kafka Connect: Fix failed writes and avoid table service concurrent operations (#4211)
    
    * Fix kafka connect readme
    
    * Fix handling of errors in write records for kafka connect
    
    * By default, ensure we skip error records and keep the pipeline alive
    
    * Fix indentation
    
    Co-authored-by: Rajesh Mahindra <[email protected]>
    (cherry picked from commit 94f45e928c917c87934badb2bb4bc8c767a14e3a)
---
 hudi-kafka-connect/README.md                       |  71 +++++++------
 hudi-kafka-connect/demo/setupKafka.sh              |   2 +-
 .../transaction/ConnectTransactionCoordinator.java |  55 +++++++---
 .../writers/ConnectTransactionServices.java        |   2 +-
 .../hudi/connect/writers/KafkaConnectConfigs.java  |  14 +++
 .../writers/KafkaConnectTransactionServices.java   |  30 +++---
 .../writers/KafkaConnectWriterProvider.java        |  12 ++-
 .../connect/TestConnectTransactionCoordinator.java | 117 +++++++++++++++------
 .../helper/MockConnectTransactionServices.java     |   3 +-
 .../test/resources/log4j-surefire-quiet.properties |  30 ++++++
 .../src/test/resources/log4j-surefire.properties   |  31 ++++++
 11 files changed, 271 insertions(+), 96 deletions(-)
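
The central change in ConnectTransactionCoordinator is the decision whether to
commit once all write statuses have arrived. The following is a minimal,
self-contained sketch of that decision, not the Hudi implementation: `Status`,
`Outcome`, `decide` and the `commitSucceeds` flag are hypothetical stand-ins
for `WriteStatus`, the ACK_COMMIT / START_COMMIT coordinator events, and the
boolean now returned by `endCommit`. It sums error records with `mapToLong`
for simplicity, whereas the patch uses `mapToDouble` and a cast.

```java
import java.util.Collections;
import java.util.List;

public class CommitDecisionSketch {

  /** Hypothetical stand-in for Hudi's WriteStatus, reduced to the counters used here. */
  public static final class Status {
    final long totalRecords;
    final long totalErrorRecords;

    public Status(long totalRecords, long totalErrorRecords) {
      this.totalRecords = totalRecords;
      this.totalErrorRecords = totalErrorRecords;
    }
  }

  /** Hypothetical outcome names: acknowledge the commit, or restart with a new commit. */
  public enum Outcome { ACK_COMMIT, RESTART_COMMIT }

  /**
   * Mirrors the condition in the patch: commit only if there are write statuses,
   * and either no record failed or committing on errors is allowed.
   * commitSucceeds models the boolean returned by endCommit (an assumption here).
   */
  public static Outcome decide(List<Status> statuses, boolean allowCommitOnErrors, boolean commitSucceeds) {
    long totalErrorRecords = statuses.stream().mapToLong(s -> s.totalErrorRecords).sum();
    boolean hasErrors = totalErrorRecords > 0;

    if ((!hasErrors || allowCommitOnErrors) && !statuses.isEmpty() && commitSucceeds) {
      return Outcome.ACK_COMMIT;
    }
    // Errors that are not tolerated, empty statuses, or a failed commit:
    // the coordinator starts a new commit, which rolls back the current one.
    return Outcome.RESTART_COMMIT;
  }

  public static void main(String[] args) {
    List<Status> clean = Collections.singletonList(new Status(1000, 0));
    List<Status> partial = Collections.singletonList(new Status(1000, 100));

    System.out.println(decide(clean, false, true));    // ACK_COMMIT
    System.out.println(decide(partial, true, true));   // ACK_COMMIT
    System.out.println(decide(partial, false, true));  // RESTART_COMMIT
    System.out.println(decide(Collections.<Status>emptyList(), true, true)); // RESTART_COMMIT
  }
}
```

With the default `hoodie.kafka.allow.commit.on.errors=true`, the second case
applies: error records are skipped and the pipeline keeps running. Setting it
to `false` corresponds to the third case, which the new
SUBSET_WRITE_STATUS_FAILED test scenario exercises.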

diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md
index 0bf53d6..9d63f5a 100644
--- a/hudi-kafka-connect/README.md
+++ b/hudi-kafka-connect/README.md
@@ -54,35 +54,17 @@ mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am
 cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/kafka/plugins/lib
 ```
 
-### 2 - Set up the docker containers
-
-To run the connect locally, we need kafka, zookeeper, hdfs, hive etc. To make the setup easier, we use the docker 
-containers from the hudi docker demo. Refer to [this link for the setup](https://hudi.apache.org/docs/docker_demo)
-
-Essentially, follow the steps listed here:
-
-/etc/hosts : The demo references many services running in container by the hostname. Add the following settings to /etc/hosts
-```bash
-127.0.0.1 adhoc-1
-127.0.0.1 adhoc-2
-127.0.0.1 namenode
-127.0.0.1 datanode1
-127.0.0.1 hiveserver
-127.0.0.1 hivemetastore
-127.0.0.1 kafkabroker
-127.0.0.1 sparkmaster
-127.0.0.1 zookeeper
-```
-
-Bring up the docker containers
+Set up a Kafka broker locally. Download the latest apache kafka from [here](https://kafka.apache.org/downloads). 
+Once downloaded and built, run the Zookeeper server and Kafka server using the command line tools.
 ```bash
-cd ${HUDI_DIR}/docker
-./setup_demo.sh
+export KAFKA_HOME=/path/to/kafka_install_dir
+cd $KAFKA_HOME
+./bin/zookeeper-server-start.sh ./config/zookeeper.properties
+./bin/kafka-server-start.sh ./config/server.properties
 ```
+Wait until the kafka cluster is up and running.
 
-The schema registry and kafka connector can be run from host system directly (mac/ linux).
-
-### 3 - Set up the schema registry
+### 2 - Set up the schema registry
 
 Hudi leverages schema registry to obtain the latest schema when writing records. While it supports most popular schema
 registries, we use Confluent schema registry. Download the
@@ -97,7 +79,7 @@ cd $CONFLUENT_DIR
 ./bin/schema-registry-start etc/schema-registry/schema-registry.properties
 ```
 
-### 4 - Create the Hudi Control Topic for Coordination of the transactions
+### 3 - Create the Hudi Control Topic for Coordination of the transactions
 
 The control topic should only have `1` partition, since its used to coordinate the Hudi write transactions across the multiple Connect tasks.
 
@@ -107,7 +89,7 @@ cd $KAFKA_HOME
 ./bin/kafka-topics.sh --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
 ```
 
-### 5 - Create the Hudi Topic for the Sink and insert data into the topic
+### 4 - Create the Hudi Topic for the Sink and insert data into the topic
 
 Open a terminal to execute the following command:
 
@@ -123,7 +105,7 @@ to generate, with each batch containing a number of messages and idle time betwe
 bash setupKafka.sh -n <num_kafka_messages_per_batch> -b <num_batches>
 ```
 
-### 6 - Run the Sink connector worker (multiple workers can be run)
+### 5 - Run the Sink connector worker (multiple workers can be run)
 
 The Kafka connect is a distributed platform, with the ability to run one or more workers (each running multiple tasks) 
 that parallely process the records from the Kafka partitions for the same topic. We provide a properties file with 
@@ -137,7 +119,7 @@ cd ${KAFKA_HOME}
 ./bin/connect-distributed.sh ${HUDI_DIR}/hudi-kafka-connect/demo/connect-distributed.properties
 ```
 
-### 7 - To add the Hudi Sink to the Connector (delete it if you want to re-configure)
+### 6 - To add the Hudi Sink to the Connector (delete it if you want to re-configure)
 
 Once the Connector has started, it will not run the Sink, until the Hudi sink is added using the web api. The following 
 curl APIs can be used to delete and add a new Hudi Sink. Again, a default configuration is provided for the Hudi Sink, 
@@ -188,7 +170,7 @@ total 5168
 -rw-r--r--  1 user  wheel  440214 Sep 13 21:43 E200FA75DCD1CED60BE86BCE6BF5D23A-0_0-0-0_20210913214114.parquet
 ```
 
-### 8 - Run async compaction and clustering if scheduled
+### 7 - Run async compaction and clustering if scheduled
 
 When using Merge-On-Read (MOR) as the table type, async compaction and clustering can be scheduled when the Sink is
 running. Inline compaction and clustering are disabled by default due to performance reason. By default, async
@@ -318,11 +300,36 @@ hoodie.write.concurrency.mode=single_writer
 Note that you don't have to provide the instant time through `--instant-time`. In that case, the earliest scheduled
 clustering is going to be executed.
 
-### 9- Querying via Hive
+### 8- Querying via Hive
 
 In this section we explain how one can test syncing of the Hudi table with Hive server/ Hive Metastore, 
 that enable querying via Hive, Presto etc. 
 
+To ease the deployment of HDFS, Hive Server, Hive Metastore etc. for testing hive sync, we use the docker
+containers from the hudi docker demo. Refer to [this link for the setup](https://hudi.apache.org/docs/docker_demo).
+Additionally, the docker deploys kafka and zookeeper too, so you do not need to run them explicitly in this setup.
+
+Essentially, follow the steps listed here:
+
+/etc/hosts : The demo references many services running in container by the hostname. Add the following settings to /etc/hosts
+```bash
+127.0.0.1 adhoc-1
+127.0.0.1 adhoc-2
+127.0.0.1 namenode
+127.0.0.1 datanode1
+127.0.0.1 hiveserver
+127.0.0.1 hivemetastore
+127.0.0.1 kafkabroker
+127.0.0.1 sparkmaster
+127.0.0.1 zookeeper
+```
+
+Bring up the docker containers
+```bash
+cd ${HUDI_DIR}/docker
+./setup_demo.sh
+```
+
 Firstly, (re)-install a different connector that is configured to write the Hudi table to Hdfs instead of local filesystem.
 
 ```bash
diff --git a/hudi-kafka-connect/demo/setupKafka.sh b/hudi-kafka-connect/demo/setupKafka.sh
index bc615a5..4915564 100755
--- a/hudi-kafka-connect/demo/setupKafka.sh
+++ b/hudi-kafka-connect/demo/setupKafka.sh
@@ -50,7 +50,7 @@ fi
 
 ## defaults
 rawDataFile=${HUDI_DIR}/docker/demo/data/batch_1.json
-kafkaBrokerHostname=kafkabroker
+kafkaBrokerHostname=localhost
 kafkaTopicName=hudi-test-topic
 numKafkaPartitions=4
 recordKey=volume
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
index 7acd875..14fd880 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
@@ -280,26 +280,56 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
 
   private void onReceiveWriteStatus(ControlMessage message) {
     ControlMessage.ParticipantInfo participantInfo = message.getParticipantInfo();
-    int partition = message.getSenderPartition();
-    partitionsWriteStatusReceived.put(partition, KafkaConnectUtils.getWriteStatuses(participantInfo));
-    currentConsumedKafkaOffsets.put(partition, participantInfo.getKafkaOffset());
+    int partitionId = message.getSenderPartition();
+    partitionsWriteStatusReceived.put(partitionId, KafkaConnectUtils.getWriteStatuses(participantInfo));
+    currentConsumedKafkaOffsets.put(partitionId, participantInfo.getKafkaOffset());
     if (partitionsWriteStatusReceived.size() >= numPartitions
         && currentState.equals(State.ENDED_COMMIT)) {
       // Commit the kafka offsets to the commit file
       try {
         List<WriteStatus> allWriteStatuses = new ArrayList<>();
         partitionsWriteStatusReceived.forEach((key, value) -> allWriteStatuses.addAll(value));
-        // Commit the last write in Hudi, along with the latest kafka offset
-        if (!allWriteStatuses.isEmpty()) {
-          transactionServices.endCommit(currentCommitTime,
+
+        long totalErrorRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalErrorRecords).sum();
+        long totalRecords = (long) allWriteStatuses.stream().mapToDouble(WriteStatus::getTotalRecords).sum();
+        boolean hasErrors = totalErrorRecords > 0;
+
+        if ((!hasErrors || configs.allowCommitOnErrors()) && !allWriteStatuses.isEmpty()) {
+          boolean success = transactionServices.endCommit(currentCommitTime,
               allWriteStatuses,
               transformKafkaOffsets(currentConsumedKafkaOffsets));
+
+          if (success) {
+            LOG.info("Commit " + currentCommitTime + " successful!");
+            currentState = State.WRITE_STATUS_RCVD;
+            globalCommittedKafkaOffsets.putAll(currentConsumedKafkaOffsets);
+            submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.ACK_COMMIT,
+                message.getTopicName(),
+                currentCommitTime));
+            return;
+          } else {
+            LOG.error("Commit " + currentCommitTime + " failed!");
+          }
+        } else if (hasErrors) {
+          LOG.error("Coordinator found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
+          LOG.error("Printing out the top 100 errors");
+          allWriteStatuses.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
+            LOG.error("Global error :", ws.getGlobalError());
+            if (ws.getErrors().size() > 0) {
+              ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value));
+            }
+          });
+        } else {
+          LOG.warn("Empty write statuses were received from all Participants");
         }
-        currentState = State.WRITE_STATUS_RCVD;
-        globalCommittedKafkaOffsets.putAll(currentConsumedKafkaOffsets);
-        submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.ACK_COMMIT,
-            message.getTopicName(),
-            currentCommitTime));
+
+        // Submit the next start commit, that will rollback the current commit.
+        currentState = State.FAILED_COMMIT;
+        LOG.warn("Current commit " + currentCommitTime + " failed, so starting a new commit after recovery delay");
+        submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
+                partition.topic(),
+                StringUtils.EMPTY_STRING),
+            RESTART_COMMIT_DELAY_MS, TimeUnit.MILLISECONDS);
       } catch (Exception exception) {
         LOG.error("Fatal error while committing file", exception);
       }
@@ -310,7 +340,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
     // If we are still stuck in ENDED_STATE
     if (currentState.equals(State.ENDED_COMMIT)) {
       currentState = State.WRITE_STATUS_TIMEDOUT;
-      LOG.warn("Did not receive the Write Status from all partitions");
+      LOG.warn("Current commit " + currentCommitTime + " failed after a write status timeout, so starting a new commit after recovery delay");
       // Submit the next start commit
       submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
               partition.topic(),
@@ -365,6 +395,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
     INIT,
     STARTED_COMMIT,
     ENDED_COMMIT,
+    FAILED_COMMIT,
     WRITE_STATUS_RCVD,
     WRITE_STATUS_TIMEDOUT,
     ACKED_COMMIT,
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectTransactionServices.java
index b36e1f1..2ce44ff 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectTransactionServices.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectTransactionServices.java
@@ -32,7 +32,7 @@ public interface ConnectTransactionServices {
 
   String startCommit();
 
-  void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata);
+  boolean endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata);
 
   Map<String, String> fetchLatestExtraCommitMetadata();
 }
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
index 692edf7..ec03451 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
@@ -88,6 +88,11 @@ public class KafkaConnectConfigs extends HoodieConfig {
       .defaultValue(HiveSyncTool.class.getName())
       .withDocumentation("Meta sync client tool, using comma to separate multi tools");
 
+  public static final ConfigProperty<Boolean> ALLOW_COMMIT_ON_ERRORS = ConfigProperty
+      .key("hoodie.kafka.allow.commit.on.errors")
+      .defaultValue(true)
+      .withDocumentation("Commit even when some records failed to be written");
+
   protected KafkaConnectConfigs() {
     super();
   }
@@ -136,6 +141,10 @@ public class KafkaConnectConfigs extends HoodieConfig {
     return getString(META_SYNC_CLASSES);
   }
 
+  public Boolean allowCommitOnErrors() {
+    return getBoolean(ALLOW_COMMIT_ON_ERRORS);
+  }
+
   public static class Builder {
 
     protected final KafkaConnectConfigs connectConfigs = new KafkaConnectConfigs();
@@ -160,6 +169,11 @@ public class KafkaConnectConfigs extends HoodieConfig {
       return this;
     }
 
+    public Builder withAllowCommitOnErrors(Boolean allowCommitOnErrors) {
+      connectConfigs.setValue(ALLOW_COMMIT_ON_ERRORS, String.valueOf(allowCommitOnErrors));
+      return this;
+    }
+
     // Kafka connect task are passed with props with type Map<>
     public Builder withProperties(Map<?, ?> properties) {
       connectConfigs.getProps().putAll(properties);
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
index b18e7c6..cca738a 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
@@ -77,6 +77,7 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
 
   public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throws HoodieException {
     this.connectConfigs = connectConfigs;
+    // This is the writeConfig for the Transaction Coordinator
     this.writeConfig = HoodieWriteConfig.newBuilder()
         .withEngineType(EngineType.JAVA)
         .withProperties(connectConfigs.getProps())
@@ -122,20 +123,23 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
   }
 
   @Override
-  public void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
-    javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
-    LOG.info("Ending Hudi commit " + commitTime);
-
-    // Schedule clustering and compaction as needed.
-    if (writeConfig.isAsyncClusteringEnabled()) {
-      javaClient.scheduleClustering(Option.empty()).ifPresent(
-          instantTs -> LOG.info("Scheduled clustering at instant time:" + instantTs));
-    }
-    if (isAsyncCompactionEnabled()) {
-      javaClient.scheduleCompaction(Option.empty()).ifPresent(
-          instantTs -> LOG.info("Scheduled compaction at instant time:" + instantTs));
+  public boolean endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
+    boolean success = javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
+    if (success) {
+      LOG.info("Ending Hudi commit " + commitTime);
+
+      // Schedule clustering and compaction as needed.
+      if (writeConfig.isAsyncClusteringEnabled()) {
+        javaClient.scheduleClustering(Option.empty()).ifPresent(
+            instantTs -> LOG.info("Scheduled clustering at instant time:" + instantTs));
+      }
+      if (isAsyncCompactionEnabled()) {
+        javaClient.scheduleCompaction(Option.empty()).ifPresent(
+            instantTs -> LOG.info("Scheduled compaction at instant time:" + instantTs));
+      }
+      syncMeta();
     }
-    syncMeta();
+    return success;
   }
 
   @Override
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
index 5bac1f0..6ab0469 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
@@ -27,6 +27,8 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.connect.KafkaConnectFileIdPrefixProvider;
@@ -73,7 +75,7 @@ public class KafkaConnectWriterProvider implements ConnectWriterProvider<WriteSt
       this.keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(
           new TypedProperties(connectConfigs.getProps()));
 
-      // Create the write client to write some records in
+      // This is the writeConfig for the writers for the individual Transaction Coordinators
       writeConfig = HoodieWriteConfig.newBuilder()
           .withEngineType(EngineType.JAVA)
           .withProperties(connectConfigs.getProps())
@@ -84,6 +86,14 @@ public class KafkaConnectWriterProvider implements ConnectWriterProvider<WriteSt
           .withSchema(schemaProvider.getSourceSchema().toString())
           .withAutoCommit(false)
           .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+          // participants should not trigger table services, and leave it to the coordinator
+          .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+              .withAutoClean(false)
+              .withAutoArchive(false)
+              .withInlineCompaction(false).build())
+          .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+              .withInlineClustering(false)
+              .build())
           .build();
 
       context = new HoodieJavaEngineContext(hadoopConf);
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
index 6e049c6..f003fe9 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
@@ -50,7 +50,7 @@ import static org.mockito.Mockito.mock;
 public class TestConnectTransactionCoordinator {
 
   private static final String TOPIC_NAME = "kafka-connect-test-topic";
-  private static final int NUM_PARTITIONS = 4;
+  private static final int TOTAL_KAFKA_PARTITIONS = 4;
   private static final int MAX_COMMIT_ROUNDS = 5;
   private static final int TEST_TIMEOUT_SECS = 60;
 
@@ -63,10 +63,6 @@ public class TestConnectTransactionCoordinator {
   @BeforeEach
   public void setUp() throws Exception {
     transactionServices = new MockConnectTransactionServices();
-    configs = KafkaConnectConfigs.newBuilder()
-        .withCommitIntervalSecs(1L)
-        .withCoordinatorWriteTimeoutSecs(1L)
-        .build();
     latch = new CountDownLatch(1);
   }
 
@@ -77,13 +73,22 @@ public class TestConnectTransactionCoordinator {
     participant = new MockParticipant(kafkaControlAgent, latch, scenario, MAX_COMMIT_ROUNDS);
     participant.start();
 
+    KafkaConnectConfigs.Builder configBuilder = KafkaConnectConfigs.newBuilder()
+        .withCommitIntervalSecs(1L)
+        .withCoordinatorWriteTimeoutSecs(1L);
+
+    if (scenario.equals(MockParticipant.TestScenarios.SUBSET_WRITE_STATUS_FAILED)) {
+      configBuilder.withAllowCommitOnErrors(false);
+    }
+    configs = configBuilder.build();
+
     // Test the coordinator using the mock participant
     TransactionCoordinator coordinator = new ConnectTransactionCoordinator(
         configs,
         new TopicPartition(TOPIC_NAME, 0),
         kafkaControlAgent,
         transactionServices,
-        (bootstrapServers, topicName) -> NUM_PARTITIONS);
+        (bootstrapServers, topicName) -> TOTAL_KAFKA_PARTITIONS);
     coordinator.start();
 
     latch.await(TEST_TIMEOUT_SECS, TimeUnit.SECONDS);
@@ -119,7 +124,7 @@ public class TestConnectTransactionCoordinator {
       this.latch = latch;
       this.testScenario = testScenario;
       this.maxNumberCommitRounds = maxNumberCommitRounds;
-      this.partition = new TopicPartition(TOPIC_NAME, (NUM_PARTITIONS - 1));
+      this.partition = new TopicPartition(TOPIC_NAME, (TOTAL_KAFKA_PARTITIONS - 1));
       this.kafkaOffsetsCommitted = new HashMap<>();
       expectedMsgType = ControlMessage.EventType.START_COMMIT;
       numberCommitRounds = 0;
@@ -162,39 +167,40 @@ public class TestConnectTransactionCoordinator {
 
     private void testScenarios(ControlMessage message) {
       assertEquals(expectedMsgType, message.getType());
-
       switch (message.getType()) {
         case START_COMMIT:
           expectedMsgType = ControlMessage.EventType.END_COMMIT;
           break;
         case END_COMMIT:
           assertEquals(kafkaOffsetsCommitted, message.getCoordinatorInfo().getGlobalKafkaCommitOffsets());
-          int numSuccessPartitions;
+          int numPartitionsThatReportWriteStatus;
           Map<Integer, Long> kafkaOffsets = new HashMap<>();
           List<ControlMessage> controlEvents = new ArrayList<>();
-          // Prepare the WriteStatuses for all partitions
-          for (int i = 1; i <= NUM_PARTITIONS; i++) {
-            try {
-              long kafkaOffset = (long) (Math.random() * 10000);
-              kafkaOffsets.put(i, kafkaOffset);
-              ControlMessage event = successWriteStatus(
-                  message.getCommitTime(),
-                  new TopicPartition(TOPIC_NAME, i),
-                  kafkaOffset);
-              controlEvents.add(event);
-            } catch (Exception exception) {
-              throw new HoodieException("Fatal error sending control event to Coordinator");
-            }
-          }
-
           switch (testScenario) {
             case ALL_CONNECT_TASKS_SUCCESS:
-              numSuccessPartitions = NUM_PARTITIONS;
+              composeControlEvent(message.getCommitTime(), false, kafkaOffsets, controlEvents);
+              numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
+              // This commit round should succeed, and the kafka offsets getting committed
+              kafkaOffsetsCommitted.putAll(kafkaOffsets);
+              expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
+              break;
+            case SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED:
+              composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents);
+              numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
+              // Despite error records, this commit round should succeed, and the kafka offsets getting committed
               kafkaOffsetsCommitted.putAll(kafkaOffsets);
               expectedMsgType = ControlMessage.EventType.ACK_COMMIT;
               break;
+            case SUBSET_WRITE_STATUS_FAILED:
+              composeControlEvent(message.getCommitTime(), true, kafkaOffsets, controlEvents);
+              numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS;
+              // This commit round should fail, and a new commit round should start without kafka offsets getting committed
+              expectedMsgType = ControlMessage.EventType.START_COMMIT;
+              break;
             case SUBSET_CONNECT_TASKS_FAILED:
-              numSuccessPartitions = NUM_PARTITIONS / 2;
+              composeControlEvent(message.getCommitTime(), false, kafkaOffsets, controlEvents);
+              numPartitionsThatReportWriteStatus = TOTAL_KAFKA_PARTITIONS / 2;
+              // This commit round should fail, and a new commit round should start without kafka offsets getting committed
               expectedMsgType = ControlMessage.EventType.START_COMMIT;
               break;
             default:
@@ -202,7 +208,7 @@ public class TestConnectTransactionCoordinator {
           }
 
           // Send events based on test scenario
-          for (int i = 0; i < numSuccessPartitions; i++) {
+          for (int i = 0; i < numPartitionsThatReportWriteStatus; i++) {
             kafkaControlAgent.publishMessage(controlEvents.get(i));
           }
           break;
@@ -227,18 +233,36 @@ public class TestConnectTransactionCoordinator {
 
     public enum TestScenarios {
       SUBSET_CONNECT_TASKS_FAILED,
+      SUBSET_WRITE_STATUS_FAILED,
+      SUBSET_WRITE_STATUS_FAILED_BUT_IGNORED,
       ALL_CONNECT_TASKS_SUCCESS
     }
 
-    private static ControlMessage successWriteStatus(String commitTime,
-                                                     TopicPartition partition,
-                                                     long kafkaOffset) throws Exception {
-      // send WS
-      WriteStatus writeStatus = new WriteStatus();
-      WriteStatus status = new WriteStatus(false, 1.0);
-      for (int i = 0; i < 1000; i++) {
-        status.markSuccess(mock(HoodieRecord.class), Option.empty());
+    private static void composeControlEvent(String commitTime, boolean shouldIncludeFailedRecords, Map<Integer, Long> kafkaOffsets, List<ControlMessage> controlEvents) {
+      // Prepare the WriteStatuses for all partitions
+      for (int i = 1; i <= TOTAL_KAFKA_PARTITIONS; i++) {
+        try {
+          long kafkaOffset = (long) (Math.random() * 10000);
+          kafkaOffsets.put(i, kafkaOffset);
+          ControlMessage event = composeWriteStatusResponse(
+              commitTime,
+              new TopicPartition(TOPIC_NAME, i),
+              kafkaOffset,
+              shouldIncludeFailedRecords);
+          controlEvents.add(event);
+        } catch (Exception exception) {
+          throw new HoodieException("Fatal error sending control event to Coordinator");
+        }
       }
+    }
+
+    private static ControlMessage composeWriteStatusResponse(String commitTime,
+                                                             TopicPartition partition,
+                                                             long kafkaOffset,
+                                                             boolean includeFailedRecords) throws Exception {
+      // send WS
+      WriteStatus writeStatus = includeFailedRecords ? getSubsetFailedRecordsWriteStatus() : getAllSuccessfulRecordsWriteStatus();
+
       return ControlMessage.newBuilder()
           .setType(ControlMessage.EventType.WRITE_STATUS)
           .setTopicName(partition.topic())
@@ -255,4 +279,27 @@ public class TestConnectTransactionCoordinator {
           ).build();
     }
   }
+
+  private static WriteStatus getAllSuccessfulRecordsWriteStatus() {
+    // send WS
+    WriteStatus status = new WriteStatus(false, 0.0);
+    for (int i = 0; i < 1000; i++) {
+      status.markSuccess(mock(HoodieRecord.class), Option.empty());
+    }
+    return status;
+  }
+
+  private static WriteStatus getSubsetFailedRecordsWriteStatus() {
+    // send WS
+    WriteStatus status = new WriteStatus(false, 0.0);
+    for (int i = 0; i < 1000; i++) {
+      if (i % 10 == 0) {
+        status.markFailure(mock(HoodieRecord.class), new Throwable("Error writing record on disk"), Option.empty());
+      } else {
+        status.markSuccess(mock(HoodieRecord.class), Option.empty());
+      }
+    }
+    status.setGlobalError(new Throwable("More than one records failed to be written to storage"));
+    return status;
+  }
 }
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockConnectTransactionServices.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockConnectTransactionServices.java
index 6994c65..b3314ad 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockConnectTransactionServices.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockConnectTransactionServices.java
@@ -46,8 +46,9 @@ public class MockConnectTransactionServices implements ConnectTransactionService
   }
 
   @Override
-  public void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
+  public boolean endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
     assertEquals(String.valueOf(this.commitTime), commitTime);
+    return true;
   }
 
   @Override
diff --git a/hudi-kafka-connect/src/test/resources/log4j-surefire-quiet.properties b/hudi-kafka-connect/src/test/resources/log4j-surefire-quiet.properties
new file mode 100644
index 0000000..ca0a50c
--- /dev/null
+++ b/hudi-kafka-connect/src/test/resources/log4j-surefire-quiet.properties
@@ -0,0 +1,30 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+log4j.rootLogger=WARN, CONSOLE
+log4j.logger.org.apache.hudi=DEBUG
+log4j.logger.org.apache.hadoop.hbase=ERROR
+
+# CONSOLE is set to be a ConsoleAppender.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+# CONSOLE uses PatternLayout.
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c %x - %m%n
+log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
+log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
+log4j.appender.CONSOLE.filter.a.LevelMin=WARN
+log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
diff --git a/hudi-kafka-connect/src/test/resources/log4j-surefire.properties b/hudi-kafka-connect/src/test/resources/log4j-surefire.properties
new file mode 100644
index 0000000..32af462
--- /dev/null
+++ b/hudi-kafka-connect/src/test/resources/log4j-surefire.properties
@@ -0,0 +1,31 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+log4j.rootLogger=WARN, CONSOLE
+log4j.logger.org.apache=INFO
+log4j.logger.org.apache.hudi=DEBUG
+log4j.logger.org.apache.hadoop.hbase=ERROR
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+# A1 uses PatternLayout.
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
+log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
+log4j.appender.CONSOLE.filter.a.LevelMin=WARN
+log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
