This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 43ff95f  Upgrade to Kafka 2.4 and fix flaky tests (#1581)
43ff95f is described below

commit 43ff95f58f193766b8adb6b85e1d45b078c43721
Author: jia-gao <[email protected]>
AuthorDate: Thu Feb 3 17:02:26 2022 -0800

    Upgrade to Kafka 2.4 and fix flaky tests (#1581)
    
    * Upgrade to Kafka 2.4
    
    fix consumer group rebalance issue
    
    Fix flaky TestZkLocalApplicationRunner
    
    * Add tearDown() to StreamApplicationIntegrationTestHarness to clean up 
topicConsumerMap
    
    Co-authored-by: Jiazhou Gao <[email protected]>
---
 gradle/dependency-versions.gradle                  |  2 +-
 .../CheckpointVersionIntegrationTest.java          |  6 +--
 .../kv/TransactionalStateIntegrationTest.java      |  6 +--
 ...ransactionalStateMultiStoreIntegrationTest.java |  6 +--
 .../StreamApplicationIntegrationTestHarness.java   | 46 +++++++++++++++-------
 ...estStreamApplicationIntegrationTestHarness.java |  3 +-
 .../operator/TestRepartitionJoinWindowApp.java     |  5 +--
 .../processor/TestZkLocalApplicationRunner.java    | 12 ++++--
 8 files changed, 53 insertions(+), 33 deletions(-)

diff --git a/gradle/dependency-versions.gradle 
b/gradle/dependency-versions.gradle
index ff99eb6..7d2b019 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -36,7 +36,7 @@
   jodaTimeVersion = "2.10.10"
   joptSimpleVersion = "5.0.4"
   junitVersion = "4.12"
-  kafkaVersion = "2.3.1"
+  kafkaVersion = "2.4.1"
   log4jVersion = "1.2.17"
   log4j2Version = "2.16.0"
   metricsVersion = "2.2.0"
diff --git 
a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
index 2b6f045..ef8409c 100644
--- 
a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
@@ -128,7 +128,7 @@ public class CheckpointVersionIntegrationTest extends 
StreamApplicationIntegrati
     // verify that the input messages were produced successfully
     if (inputMessages.size() > 0) {
       List<ConsumerRecord<String, String>> inputRecords =
-          consumeMessages(Collections.singletonList(INPUT_TOPIC), 
inputMessages.size());
+          consumeMessages(INPUT_TOPIC, inputMessages.size());
       List<String> readInputMessages = 
inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
       Assert.assertEquals(expectedInputTopicMessages, readInputMessages);
     }
@@ -143,7 +143,7 @@ public class CheckpointVersionIntegrationTest extends 
StreamApplicationIntegrati
     // consume and verify the changelog messages
     if (expectedChangelogMessages.size() > 0) {
       List<ConsumerRecord<String, String>> changelogRecords =
-          consumeMessages(Collections.singletonList(CHANGELOG_TOPIC), 
expectedChangelogMessages.size());
+          consumeMessages(CHANGELOG_TOPIC, expectedChangelogMessages.size());
       List<String> changelogMessages = 
changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
       Assert.assertEquals(expectedChangelogMessages, changelogMessages);
     }
@@ -172,7 +172,7 @@ public class CheckpointVersionIntegrationTest extends 
StreamApplicationIntegrati
 
     // consume and verify any additional changelog messages
     List<ConsumerRecord<String, String>> changelogRecords =
-        consumeMessages(Collections.singletonList(changelogTopic), 
expectedChangelogMessages.size());
+        consumeMessages(changelogTopic, expectedChangelogMessages.size());
     List<String> changelogMessages = 
changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
     Assert.assertEquals(expectedChangelogMessages, changelogMessages);
   }
diff --git 
a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
index 6f9427c..771f93d 100644
--- 
a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
@@ -131,7 +131,7 @@ public class TransactionalStateIntegrationTest extends 
StreamApplicationIntegrat
     // verify that the input messages were produced successfully
     if (inputMessages.size() > 0) {
       List<ConsumerRecord<String, String>> inputRecords =
-          consumeMessages(Collections.singletonList(INPUT_TOPIC), 
inputMessages.size());
+          consumeMessages(INPUT_TOPIC, inputMessages.size());
       List<String> readInputMessages = 
inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
       Assert.assertEquals(inputMessages, readInputMessages);
     }
@@ -147,7 +147,7 @@ public class TransactionalStateIntegrationTest extends 
StreamApplicationIntegrat
     // consume and verify the changelog messages
     if (expectedChangelogMessages.size() > 0) {
       List<ConsumerRecord<String, String>> changelogRecords =
-          consumeMessages(Collections.singletonList(CHANGELOG_TOPIC), 
expectedChangelogMessages.size());
+          consumeMessages(CHANGELOG_TOPIC, expectedChangelogMessages.size());
       List<String> changelogMessages = 
changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
       Assert.assertEquals(expectedChangelogMessages, changelogMessages);
     }
@@ -177,7 +177,7 @@ public class TransactionalStateIntegrationTest extends 
StreamApplicationIntegrat
 
     // consume and verify any additional changelog messages
     List<ConsumerRecord<String, String>> changelogRecords =
-        consumeMessages(Collections.singletonList(changelogTopic), 
expectedChangelogMessages.size());
+        consumeMessages(changelogTopic, expectedChangelogMessages.size());
     List<String> changelogMessages = 
changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
     Assert.assertEquals(expectedChangelogMessages, changelogMessages);
 
diff --git 
a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
index e0fbaca..765bd45 100644
--- 
a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
@@ -139,7 +139,7 @@ public class TransactionalStateMultiStoreIntegrationTest 
extends StreamApplicati
     // verify that the input messages were produced successfully
     if (inputMessages.size() > 0) {
       List<ConsumerRecord<String, String>> inputRecords =
-          consumeMessages(Collections.singletonList(INPUT_TOPIC), 
inputMessages.size());
+          consumeMessages(INPUT_TOPIC, inputMessages.size());
       List<String> readInputMessages = 
inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
       Assert.assertEquals(inputMessages, readInputMessages);
     }
@@ -156,7 +156,7 @@ public class TransactionalStateMultiStoreIntegrationTest 
extends StreamApplicati
     // consume and verify the changelog messages
     if (expectedChangelogMessages.size() > 0) {
       List<ConsumerRecord<String, String>> changelogRecords =
-          consumeMessages(Collections.singletonList(STORE_1_CHANGELOG), 
expectedChangelogMessages.size());
+          consumeMessages(STORE_1_CHANGELOG, expectedChangelogMessages.size());
       List<String> changelogMessages = 
changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
       Assert.assertEquals(expectedChangelogMessages, changelogMessages);
     }
@@ -190,7 +190,7 @@ public class TransactionalStateMultiStoreIntegrationTest 
extends StreamApplicati
 
     // consume and verify any additional changelog messages
     List<ConsumerRecord<String, String>> changelogRecords =
-        consumeMessages(Collections.singletonList(changelogTopic), 
expectedChangelogMessages.size());
+        consumeMessages(changelogTopic, expectedChangelogMessages.size());
     List<String> changelogMessages = 
changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
     Assert.assertEquals(expectedChangelogMessages, changelogMessages);
 
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
index 012eece..2960e9c 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
@@ -20,15 +20,15 @@ package org.apache.samza.test.framework;
 
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.config.ApplicationConfig;
@@ -39,6 +39,7 @@ import org.apache.samza.execution.TestStreamManager;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.junit.After;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +76,7 @@ import static 
org.apache.kafka.clients.producer.ProducerConfig.*;
  *
  * Execution model: {@link SamzaApplication}s are run as their own {@link 
org.apache.samza.job.local.ThreadJob}s.
  * Similarly, embedded Kafka servers and Zookeeper servers are run as their 
own threads.
- * {@link #produceMessage(String, int, String, String)} and {@link 
#consumeMessages(Collection, int)} are blocking calls.
+ * {@link #produceMessage(String, int, String, String)} and {@link 
#consumeMessages(String, int)} are blocking calls.
  *
  * <h3>Usage Example</h3>
  * Here is an actual test that publishes a message into Kafka, runs an 
application, and verifies consumption
@@ -100,6 +101,11 @@ public class StreamApplicationIntegrationTestHarness 
extends IntegrationTestHarn
   private static final Duration POLL_TIMEOUT_MS = Duration.ofSeconds(20);
   private static final int DEFAULT_REPLICATION_FACTOR = 1;
   private int numEmptyPolls = 3;
+  /**
+   A new Kafka consumer will be created to consume messages from every new 
topic to avoid consumer group rebalance
+   Note that the consumers in this map will not be initialized in {@link 
#setUp()} but will be shutdown in {@link #tearDown()}
+   */
+  private Map<String, KafkaConsumer> topicConsumerMap = new HashMap<>();
 
   /**
    * Creates a kafka topic with the provided name and the number of partitions
@@ -128,7 +134,6 @@ public class StreamApplicationIntegrationTestHarness 
extends IntegrationTestHarn
     return Integer.parseInt(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR());
   }
 
-
   /**
    * Read messages from the provided list of topics until {@param threshold} 
messages have been read or until
    * {@link #numEmptyPolls} polls return no messages.
@@ -136,24 +141,25 @@ public class StreamApplicationIntegrationTestHarness 
extends IntegrationTestHarn
    * The default poll time out is determined by {@link #POLL_TIMEOUT_MS} and 
the number of empty polls are
    * determined by {@link #numEmptyPolls}
    *
-   * @param topics the list of topics to consume from
+   * @param topic the topic to consume from
    * @param threshold the number of messages to consume
   * @return the list of {@link ConsumerRecord}s whose size can be at most 
{@param threshold}
    */
-  public List<ConsumerRecord<String, String>> 
consumeMessages(Collection<String> topics, int threshold) {
+  public List<ConsumerRecord<String, String>> consumeMessages(String topic, 
int threshold) {
     int emptyPollCount = 0;
     List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
-    consumer.subscribe(topics);
-
+    KafkaConsumer kafkaConsumer =
+        topicConsumerMap.computeIfAbsent(topic, t -> new 
KafkaConsumer<>(createConsumerConfigs()));
+    kafkaConsumer.subscribe(Collections.singletonList(topic));
     while (emptyPollCount < numEmptyPolls && recordList.size() < threshold) {
-      ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
-      LOG.info("Read {} messages from topics: {}", records.count(), 
StringUtils.join(topics, ","));
+      ConsumerRecords<String, String> records = 
kafkaConsumer.poll(POLL_TIMEOUT_MS);
+      LOG.info("Read {} messages from topic: {}", records.count(), topic);
       if (!records.isEmpty()) {
         Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
         while (iterator.hasNext() && recordList.size() < threshold) {
           ConsumerRecord record = iterator.next();
-          LOG.info("Read key: {} val: {} from topic: {} on partition: {}",
-              record.key(), record.value(), record.topic(), 
record.partition());
+          LOG.info("Read key: {} val: {} from topic: {} on partition: {}", 
record.key(), record.value(), record.topic(),
+              record.partition());
           recordList.add(record);
           emptyPollCount = 0;
         }
@@ -173,8 +179,7 @@ public class StreamApplicationIntegrationTestHarness 
extends IntegrationTestHarn
    * @return RunApplicationContext which contains objects created within 
runApplication, to be used for verification
    * if necessary
    */
-  protected RunApplicationContext runApplication(SamzaApplication 
streamApplication,
-      String appName,
+  protected RunApplicationContext runApplication(SamzaApplication 
streamApplication, String appName,
       Map<String, String> overriddenConfigs) {
     Map<String, String> configMap = new HashMap<>();
     configMap.put(ApplicationConfig.APP_RUNNER_CLASS, 
"org.apache.samza.runtime.LocalApplicationRunner");
@@ -263,6 +268,19 @@ public class StreamApplicationIntegrationTestHarness 
extends IntegrationTestHarn
      */
     consumerProps.setProperty(KEY_DESERIALIZER_CLASS_CONFIG, 
STRING_DESERIALIZER);
     consumerProps.setProperty(VALUE_DESERIALIZER_CLASS_CONFIG, 
STRING_DESERIALIZER);
+
+    // for every topic create a new consumer group to avoid group rebalance
+    consumerProps.setProperty(GROUP_ID_CONFIG, "group" + 
topicConsumerMap.size());
     return consumerProps;
   }
+
+  @After
+  @Override
+  public void tearDown() {
+    topicConsumerMap.values().forEach(c -> {
+      c.unsubscribe();
+      c.close();
+    });
+    super.tearDown();
+  }
 }
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/TestStreamApplicationIntegrationTestHarness.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/TestStreamApplicationIntegrationTestHarness.java
index 4aeaf6a..f3ef0a1 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/framework/TestStreamApplicationIntegrationTestHarness.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/TestStreamApplicationIntegrationTestHarness.java
@@ -20,7 +20,6 @@
 package org.apache.samza.test.framework;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -45,7 +44,7 @@ public class TestStreamApplicationIntegrationTestHarness 
extends StreamApplicati
     // verify that the input messages were produced successfully
     if (inputMessages.size() > 0) {
       List<ConsumerRecord<String, String>> inputRecords =
-          consumeMessages(Collections.singletonList(INPUT_TOPIC), 
inputMessages.size());
+          consumeMessages(INPUT_TOPIC, inputMessages.size());
       List<String> readInputMessages = 
inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
       Assert.assertEquals(inputMessages, readInputMessages);
     }
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
index ecd6c04..5e6b342 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -19,7 +19,6 @@
 package org.apache.samza.test.operator;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -92,7 +91,7 @@ public class TestRepartitionJoinWindowApp extends 
StreamApplicationIntegrationTe
     runApplication(app, appName, configs);
 
     // consume and validate result
-    List<ConsumerRecord<String, String>> messages = 
consumeMessages(Collections.singletonList(outputTopicName), 2);
+    List<ConsumerRecord<String, String>> messages = 
consumeMessages(outputTopicName, 2);
     assertEquals(2, messages.size());
 
     Assert.assertFalse(KafkaSystemAdmin.deleteMessageCalled);
@@ -121,7 +120,7 @@ public class TestRepartitionJoinWindowApp extends 
StreamApplicationIntegrationTe
     runApplication(app, appName, configs);
 
     // consume and validate result
-    List<ConsumerRecord<String, String>> messages = 
consumeMessages(Collections.singletonList(outputTopicName), 2);
+    List<ConsumerRecord<String, String>> messages = 
consumeMessages(outputTopicName, 2);
     assertEquals(2, messages.size());
 
     for (ConsumerRecord<String, String> message : messages) {
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 33c5a65..23821cc 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -125,6 +125,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
   private static final String JOB_DEBOUNCE_TIME_MS = "10000";
   private static final String BARRIER_TIMEOUT_MS = "10000";
   private static final String[] PROCESSOR_IDS = new String[] {"0000000000", 
"0000000001", "0000000002"};
+  private static final int NUM_PARTITION = 5;
 
   private String inputKafkaTopic;
   private String outputKafkaTopic;
@@ -137,6 +138,7 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
   private String testStreamAppName;
   private String testStreamAppId;
   private MetadataStore zkMetadataStore;
+  private Map<String, Integer> topicToPartitionCount;
 
   @Rule
   public Timeout testTimeOutInMillis = new Timeout(150000);
@@ -171,11 +173,11 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
     zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, 
ZK_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
     zkUtils.connect();
 
-    ImmutableMap<String, Integer> topicToPartitionCount = ImmutableMap.of(
+    topicToPartitionCount = ImmutableMap.of(
         inputSinglePartitionKafkaTopic, 1,
         outputSinglePartitionKafkaTopic, 1,
-        inputKafkaTopic, 5,
-        outputKafkaTopic, 5);
+        inputKafkaTopic, NUM_PARTITION,
+        outputKafkaTopic, NUM_PARTITION);
 
     List<NewTopic> newTopics =
         ImmutableList.of(inputKafkaTopic, outputKafkaTopic, 
inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic)
@@ -197,10 +199,12 @@ public class TestZkLocalApplicationRunner extends 
IntegrationTestHarness {
   }
 
   private void publishKafkaEvents(String topic, int startIndex, int endIndex, 
String streamProcessorId) {
+    int partitionCount = topicToPartitionCount.getOrDefault(topic, 1);
     for (int eventIndex = startIndex; eventIndex < endIndex; eventIndex++) {
       try {
         LOGGER.info("Publish kafka event with index : {} for stream processor: 
{}.", eventIndex, streamProcessorId);
-        producer.send(new ProducerRecord(topic, new 
TestKafkaEvent(streamProcessorId, 
String.valueOf(eventIndex)).toString().getBytes()));
+        producer.send(new ProducerRecord(topic, eventIndex % partitionCount, 
null,
+            new TestKafkaEvent(streamProcessorId, 
String.valueOf(eventIndex)).toString().getBytes()));
       } catch (Exception  e) {
         LOGGER.error("Publishing to kafka topic: {} resulted in exception: 
{}.", new Object[]{topic, e});
         throw new SamzaException(e);

Reply via email to