This is an automated email from the ASF dual-hosted git repository.
dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 43ff95f Upgrade to Kafka 2.4 and fix flaky tests (#1581)
43ff95f is described below
commit 43ff95f58f193766b8adb6b85e1d45b078c43721
Author: jia-gao <[email protected]>
AuthorDate: Thu Feb 3 17:02:26 2022 -0800
Upgrade to Kafka 2.4 and fix flaky tests (#1581)
* Upgrade to Kafka 2.4
fix consumer group rebalance issue
Fix flaky TestZkLocalApplicationRunner
* Add tearDown() to StreamApplicationIntegrationTestHarness to clean up
topicConsumerMap
Co-authored-by: Jiazhou Gao <[email protected]>
---
gradle/dependency-versions.gradle | 2 +-
.../CheckpointVersionIntegrationTest.java | 6 +--
.../kv/TransactionalStateIntegrationTest.java | 6 +--
...ransactionalStateMultiStoreIntegrationTest.java | 6 +--
.../StreamApplicationIntegrationTestHarness.java | 46 +++++++++++++++-------
...estStreamApplicationIntegrationTestHarness.java | 3 +-
.../operator/TestRepartitionJoinWindowApp.java | 5 +--
.../processor/TestZkLocalApplicationRunner.java | 12 ++++--
8 files changed, 53 insertions(+), 33 deletions(-)
diff --git a/gradle/dependency-versions.gradle
b/gradle/dependency-versions.gradle
index ff99eb6..7d2b019 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -36,7 +36,7 @@
jodaTimeVersion = "2.10.10"
joptSimpleVersion = "5.0.4"
junitVersion = "4.12"
- kafkaVersion = "2.3.1"
+ kafkaVersion = "2.4.1"
log4jVersion = "1.2.17"
log4j2Version = "2.16.0"
metricsVersion = "2.2.0"
diff --git
a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
index 2b6f045..ef8409c 100644
---
a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
+++
b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
@@ -128,7 +128,7 @@ public class CheckpointVersionIntegrationTest extends
StreamApplicationIntegrati
// verify that the input messages were produced successfully
if (inputMessages.size() > 0) {
List<ConsumerRecord<String, String>> inputRecords =
- consumeMessages(Collections.singletonList(INPUT_TOPIC),
inputMessages.size());
+ consumeMessages(INPUT_TOPIC, inputMessages.size());
List<String> readInputMessages =
inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
Assert.assertEquals(expectedInputTopicMessages, readInputMessages);
}
@@ -143,7 +143,7 @@ public class CheckpointVersionIntegrationTest extends
StreamApplicationIntegrati
// consume and verify the changelog messages
if (expectedChangelogMessages.size() > 0) {
List<ConsumerRecord<String, String>> changelogRecords =
- consumeMessages(Collections.singletonList(CHANGELOG_TOPIC),
expectedChangelogMessages.size());
+ consumeMessages(CHANGELOG_TOPIC, expectedChangelogMessages.size());
List<String> changelogMessages =
changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
Assert.assertEquals(expectedChangelogMessages, changelogMessages);
}
@@ -172,7 +172,7 @@ public class CheckpointVersionIntegrationTest extends
StreamApplicationIntegrati
// consume and verify any additional changelog messages
List<ConsumerRecord<String, String>> changelogRecords =
- consumeMessages(Collections.singletonList(changelogTopic),
expectedChangelogMessages.size());
+ consumeMessages(changelogTopic, expectedChangelogMessages.size());
List<String> changelogMessages =
changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
Assert.assertEquals(expectedChangelogMessages, changelogMessages);
}
diff --git
a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
index 6f9427c..771f93d 100644
---
a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
+++
b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
@@ -131,7 +131,7 @@ public class TransactionalStateIntegrationTest extends
StreamApplicationIntegrat
// verify that the input messages were produced successfully
if (inputMessages.size() > 0) {
List<ConsumerRecord<String, String>> inputRecords =
- consumeMessages(Collections.singletonList(INPUT_TOPIC),
inputMessages.size());
+ consumeMessages(INPUT_TOPIC, inputMessages.size());
List<String> readInputMessages =
inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
Assert.assertEquals(inputMessages, readInputMessages);
}
@@ -147,7 +147,7 @@ public class TransactionalStateIntegrationTest extends
StreamApplicationIntegrat
// consume and verify the changelog messages
if (expectedChangelogMessages.size() > 0) {
List<ConsumerRecord<String, String>> changelogRecords =
- consumeMessages(Collections.singletonList(CHANGELOG_TOPIC),
expectedChangelogMessages.size());
+ consumeMessages(CHANGELOG_TOPIC, expectedChangelogMessages.size());
List<String> changelogMessages =
changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
Assert.assertEquals(expectedChangelogMessages, changelogMessages);
}
@@ -177,7 +177,7 @@ public class TransactionalStateIntegrationTest extends
StreamApplicationIntegrat
// consume and verify any additional changelog messages
List<ConsumerRecord<String, String>> changelogRecords =
- consumeMessages(Collections.singletonList(changelogTopic),
expectedChangelogMessages.size());
+ consumeMessages(changelogTopic, expectedChangelogMessages.size());
List<String> changelogMessages =
changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
Assert.assertEquals(expectedChangelogMessages, changelogMessages);
diff --git
a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
index e0fbaca..765bd45 100644
---
a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
+++
b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
@@ -139,7 +139,7 @@ public class TransactionalStateMultiStoreIntegrationTest
extends StreamApplicati
// verify that the input messages were produced successfully
if (inputMessages.size() > 0) {
List<ConsumerRecord<String, String>> inputRecords =
- consumeMessages(Collections.singletonList(INPUT_TOPIC),
inputMessages.size());
+ consumeMessages(INPUT_TOPIC, inputMessages.size());
List<String> readInputMessages =
inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
Assert.assertEquals(inputMessages, readInputMessages);
}
@@ -156,7 +156,7 @@ public class TransactionalStateMultiStoreIntegrationTest
extends StreamApplicati
// consume and verify the changelog messages
if (expectedChangelogMessages.size() > 0) {
List<ConsumerRecord<String, String>> changelogRecords =
- consumeMessages(Collections.singletonList(STORE_1_CHANGELOG),
expectedChangelogMessages.size());
+ consumeMessages(STORE_1_CHANGELOG, expectedChangelogMessages.size());
List<String> changelogMessages =
changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
Assert.assertEquals(expectedChangelogMessages, changelogMessages);
}
@@ -190,7 +190,7 @@ public class TransactionalStateMultiStoreIntegrationTest
extends StreamApplicati
// consume and verify any additional changelog messages
List<ConsumerRecord<String, String>> changelogRecords =
- consumeMessages(Collections.singletonList(changelogTopic),
expectedChangelogMessages.size());
+ consumeMessages(changelogTopic, expectedChangelogMessages.size());
List<String> changelogMessages =
changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
Assert.assertEquals(expectedChangelogMessages, changelogMessages);
diff --git
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
index 012eece..2960e9c 100644
---
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
+++
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
@@ -20,15 +20,15 @@ package org.apache.samza.test.framework;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.samza.application.SamzaApplication;
import org.apache.samza.config.ApplicationConfig;
@@ -39,6 +39,7 @@ import org.apache.samza.execution.TestStreamManager;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.junit.After;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +76,7 @@ import static
org.apache.kafka.clients.producer.ProducerConfig.*;
*
* Execution model: {@link SamzaApplication}s are run as their own {@link
org.apache.samza.job.local.ThreadJob}s.
* Similarly, embedded Kafka servers and Zookeeper servers are run as their
own threads.
- * {@link #produceMessage(String, int, String, String)} and {@link
#consumeMessages(Collection, int)} are blocking calls.
+ * {@link #produceMessage(String, int, String, String)} and {@link
#consumeMessages(String, int)} are blocking calls.
*
* <h3>Usage Example</h3>
* Here is an actual test that publishes a message into Kafka, runs an
application, and verifies consumption
@@ -100,6 +101,11 @@ public class StreamApplicationIntegrationTestHarness
extends IntegrationTestHarn
private static final Duration POLL_TIMEOUT_MS = Duration.ofSeconds(20);
private static final int DEFAULT_REPLICATION_FACTOR = 1;
private int numEmptyPolls = 3;
+ /**
+ A new Kafka consumer will be created to consume messages from every new
topic to avoid consumer group rebalance
+ Note that the consumers in this map will not be initialized in {@link
#setUp()} but will be shut down in {@link #tearDown()}
+ */
+ private Map<String, KafkaConsumer> topicConsumerMap = new HashMap<>();
/**
* Creates a kafka topic with the provided name and the number of partitions
@@ -128,7 +134,6 @@ public class StreamApplicationIntegrationTestHarness
extends IntegrationTestHarn
return Integer.parseInt(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR());
}
-
/**
* Read messages from the provided list of topics until {@param threshold}
messages have been read or until
* {@link #numEmptyPolls} polls return no messages.
@@ -136,24 +141,25 @@ public class StreamApplicationIntegrationTestHarness
extends IntegrationTestHarn
* The default poll time out is determined by {@link #POLL_TIMEOUT_MS} and
the number of empty polls are
* determined by {@link #numEmptyPolls}
*
- * @param topics the list of topics to consume from
+ * @param topic the topic to consume from
* @param threshold the number of messages to consume
* @return the list of {@link ConsumerRecord}s whose size can be atmost
{@param threshold}
*/
- public List<ConsumerRecord<String, String>>
consumeMessages(Collection<String> topics, int threshold) {
+ public List<ConsumerRecord<String, String>> consumeMessages(String topic,
int threshold) {
int emptyPollCount = 0;
List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
- consumer.subscribe(topics);
-
+ KafkaConsumer kafkaConsumer =
+ topicConsumerMap.computeIfAbsent(topic, t -> new
KafkaConsumer<>(createConsumerConfigs()));
+ kafkaConsumer.subscribe(Collections.singletonList(topic));
while (emptyPollCount < numEmptyPolls && recordList.size() < threshold) {
- ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
- LOG.info("Read {} messages from topics: {}", records.count(),
StringUtils.join(topics, ","));
+ ConsumerRecords<String, String> records =
kafkaConsumer.poll(POLL_TIMEOUT_MS);
+ LOG.info("Read {} messages from topic: {}", records.count(), topic);
if (!records.isEmpty()) {
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
while (iterator.hasNext() && recordList.size() < threshold) {
ConsumerRecord record = iterator.next();
- LOG.info("Read key: {} val: {} from topic: {} on partition: {}",
- record.key(), record.value(), record.topic(),
record.partition());
+ LOG.info("Read key: {} val: {} from topic: {} on partition: {}",
record.key(), record.value(), record.topic(),
+ record.partition());
recordList.add(record);
emptyPollCount = 0;
}
@@ -173,8 +179,7 @@ public class StreamApplicationIntegrationTestHarness
extends IntegrationTestHarn
* @return RunApplicationContext which contains objects created within
runApplication, to be used for verification
* if necessary
*/
- protected RunApplicationContext runApplication(SamzaApplication
streamApplication,
- String appName,
+ protected RunApplicationContext runApplication(SamzaApplication
streamApplication, String appName,
Map<String, String> overriddenConfigs) {
Map<String, String> configMap = new HashMap<>();
configMap.put(ApplicationConfig.APP_RUNNER_CLASS,
"org.apache.samza.runtime.LocalApplicationRunner");
@@ -263,6 +268,19 @@ public class StreamApplicationIntegrationTestHarness
extends IntegrationTestHarn
*/
consumerProps.setProperty(KEY_DESERIALIZER_CLASS_CONFIG,
STRING_DESERIALIZER);
consumerProps.setProperty(VALUE_DESERIALIZER_CLASS_CONFIG,
STRING_DESERIALIZER);
+
+ // for every topic create a new consumer group to avoid group rebalance
+ consumerProps.setProperty(GROUP_ID_CONFIG, "group" +
topicConsumerMap.size());
return consumerProps;
}
+
+ @After
+ @Override
+ public void tearDown() {
+ topicConsumerMap.values().forEach(c -> {
+ c.unsubscribe();
+ c.close();
+ });
+ super.tearDown();
+ }
}
diff --git
a/samza-test/src/test/java/org/apache/samza/test/framework/TestStreamApplicationIntegrationTestHarness.java
b/samza-test/src/test/java/org/apache/samza/test/framework/TestStreamApplicationIntegrationTestHarness.java
index 4aeaf6a..f3ef0a1 100644
---
a/samza-test/src/test/java/org/apache/samza/test/framework/TestStreamApplicationIntegrationTestHarness.java
+++
b/samza-test/src/test/java/org/apache/samza/test/framework/TestStreamApplicationIntegrationTestHarness.java
@@ -20,7 +20,6 @@
package org.apache.samza.test.framework;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -45,7 +44,7 @@ public class TestStreamApplicationIntegrationTestHarness
extends StreamApplicati
// verify that the input messages were produced successfully
if (inputMessages.size() > 0) {
List<ConsumerRecord<String, String>> inputRecords =
- consumeMessages(Collections.singletonList(INPUT_TOPIC),
inputMessages.size());
+ consumeMessages(INPUT_TOPIC, inputMessages.size());
List<String> readInputMessages =
inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
Assert.assertEquals(inputMessages, readInputMessages);
}
diff --git
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
index ecd6c04..5e6b342 100644
---
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
+++
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -19,7 +19,6 @@
package org.apache.samza.test.operator;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -92,7 +91,7 @@ public class TestRepartitionJoinWindowApp extends
StreamApplicationIntegrationTe
runApplication(app, appName, configs);
// consume and validate result
- List<ConsumerRecord<String, String>> messages =
consumeMessages(Collections.singletonList(outputTopicName), 2);
+ List<ConsumerRecord<String, String>> messages =
consumeMessages(outputTopicName, 2);
assertEquals(2, messages.size());
Assert.assertFalse(KafkaSystemAdmin.deleteMessageCalled);
@@ -121,7 +120,7 @@ public class TestRepartitionJoinWindowApp extends
StreamApplicationIntegrationTe
runApplication(app, appName, configs);
// consume and validate result
- List<ConsumerRecord<String, String>> messages =
consumeMessages(Collections.singletonList(outputTopicName), 2);
+ List<ConsumerRecord<String, String>> messages =
consumeMessages(outputTopicName, 2);
assertEquals(2, messages.size());
for (ConsumerRecord<String, String> message : messages) {
diff --git
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 33c5a65..23821cc 100644
---
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -125,6 +125,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
private static final String JOB_DEBOUNCE_TIME_MS = "10000";
private static final String BARRIER_TIMEOUT_MS = "10000";
private static final String[] PROCESSOR_IDS = new String[] {"0000000000",
"0000000001", "0000000002"};
+ private static final int NUM_PARTITION = 5;
private String inputKafkaTopic;
private String outputKafkaTopic;
@@ -137,6 +138,7 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
private String testStreamAppName;
private String testStreamAppId;
private MetadataStore zkMetadataStore;
+ private Map<String, Integer> topicToPartitionCount;
@Rule
public Timeout testTimeOutInMillis = new Timeout(150000);
@@ -171,11 +173,11 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS,
ZK_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
zkUtils.connect();
- ImmutableMap<String, Integer> topicToPartitionCount = ImmutableMap.of(
+ topicToPartitionCount = ImmutableMap.of(
inputSinglePartitionKafkaTopic, 1,
outputSinglePartitionKafkaTopic, 1,
- inputKafkaTopic, 5,
- outputKafkaTopic, 5);
+ inputKafkaTopic, NUM_PARTITION,
+ outputKafkaTopic, NUM_PARTITION);
List<NewTopic> newTopics =
ImmutableList.of(inputKafkaTopic, outputKafkaTopic,
inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic)
@@ -197,10 +199,12 @@ public class TestZkLocalApplicationRunner extends
IntegrationTestHarness {
}
private void publishKafkaEvents(String topic, int startIndex, int endIndex,
String streamProcessorId) {
+ int partitionCount = topicToPartitionCount.getOrDefault(topic, 1);
for (int eventIndex = startIndex; eventIndex < endIndex; eventIndex++) {
try {
LOGGER.info("Publish kafka event with index : {} for stream processor:
{}.", eventIndex, streamProcessorId);
- producer.send(new ProducerRecord(topic, new
TestKafkaEvent(streamProcessorId,
String.valueOf(eventIndex)).toString().getBytes()));
+ producer.send(new ProducerRecord(topic, eventIndex % partitionCount,
null,
+ new TestKafkaEvent(streamProcessorId,
String.valueOf(eventIndex)).toString().getBytes()));
} catch (Exception e) {
LOGGER.error("Publishing to kafka topic: {} resulted in exception:
{}.", new Object[]{topic, e});
throw new SamzaException(e);