This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5a0e65e KAFKA-8011: Fix flaky RegexSourceIntegrationTest (#8799)
5a0e65e is described below
commit 5a0e65ed394da76ddebf387739f9dec8687a9485
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Jun 5 14:38:08 2020 -0700
KAFKA-8011: Fix flaky RegexSourceIntegrationTest (#8799)
Reviewers: A. Sophie Blee-Goldman <[email protected]>, Guozhang Wang <[email protected]>
---
.../processor/internals/RecordDeserializer.java | 4 +-
.../integration/RegexSourceIntegrationTest.java | 185 ++++++++++-----------
2 files changed, 95 insertions(+), 94 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index d0a2a80..86b1b44 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -63,7 +63,9 @@ class RecordDeserializer {
rawRecord.serializedKeySize(),
rawRecord.serializedValueSize(),
sourceNode.deserializeKey(rawRecord.topic(),
rawRecord.headers(), rawRecord.key()),
- sourceNode.deserializeValue(rawRecord.topic(),
rawRecord.headers(), rawRecord.value()), rawRecord.headers());
+ sourceNode.deserializeValue(rawRecord.topic(),
rawRecord.headers(), rawRecord.value()),
+ rawRecord.headers()
+ );
} catch (final Exception deserializationException) {
final
DeserializationExceptionHandler.DeserializationHandlerResponse response;
try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 3e205f7..44cc745 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -41,8 +41,8 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -52,6 +52,7 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
import java.io.IOException;
import java.util.ArrayList;
@@ -124,11 +125,13 @@ public class RegexSourceIntegrationTest {
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- streamsConfiguration =
StreamsTestUtils.getStreamsConfig("regex-source-integration-test",
-
CLUSTER.bootstrapServers(),
-
STRING_SERDE_CLASSNAME,
-
STRING_SERDE_CLASSNAME,
- properties);
+ streamsConfiguration = StreamsTestUtils.getStreamsConfig(
+
IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, new
TestName()),
+ CLUSTER.bootstrapServers(),
+ STRING_SERDE_CLASSNAME,
+ STRING_SERDE_CLASSNAME,
+ properties
+ );
}
@After
@@ -142,83 +145,89 @@ public class RegexSourceIntegrationTest {
@Test
public void testRegexMatchesTopicsAWhenCreated() throws Exception {
+ try {
+ final Serde<String> stringSerde = Serdes.String();
- final Serde<String> stringSerde = Serdes.String();
-
- final List<String> expectedFirstAssignment =
Collections.singletonList("TEST-TOPIC-1");
- // we compare lists of subscribed topics and hence requiring the order
as well; this is guaranteed
- // with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add
TEST-TOPIC-2 so the list is always
- // in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429
behavior ever changed it may become a flaky test
- final List<String> expectedSecondAssignment =
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
+ final List<String> expectedFirstAssignment =
Collections.singletonList("TEST-TOPIC-1");
+ // we compare lists of subscribed topics and hence requiring the
order as well; this is guaranteed
+ // with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only
add TEST-TOPIC-2 so the list is always
+ // in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429
behavior ever changed it may become a flaky test
+ final List<String> expectedSecondAssignment =
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
- CLUSTER.createTopic("TEST-TOPIC-1");
+ CLUSTER.createTopic("TEST-TOPIC-1");
- final StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
- final KStream<String, String> pattern1Stream =
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+ final KStream<String, String> pattern1Stream =
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
- pattern1Stream.to(outputTopic, Produced.with(stringSerde,
stringSerde));
- final List<String> assignedTopics = new CopyOnWriteArrayList<>();
- streams = new KafkaStreams(builder.build(), streamsConfiguration, new
DefaultKafkaClientSupplier() {
- @Override
- public Consumer<byte[], byte[]> getConsumer(final Map<String,
Object> config) {
- return new KafkaConsumer<byte[], byte[]>(config, new
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
- @Override
- public void subscribe(final Pattern topics, final
ConsumerRebalanceListener listener) {
- super.subscribe(topics, new
TheConsumerRebalanceListener(assignedTopics, listener));
- }
- };
+ pattern1Stream.to(outputTopic, Produced.with(stringSerde,
stringSerde));
+ final List<String> assignedTopics = new CopyOnWriteArrayList<>();
+ streams = new KafkaStreams(builder.build(), streamsConfiguration,
new DefaultKafkaClientSupplier() {
+ @Override
+ public Consumer<byte[], byte[]> getConsumer(final Map<String,
Object> config) {
+ return new KafkaConsumer<byte[], byte[]>(config, new
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
+ @Override
+ public void subscribe(final Pattern topics, final
ConsumerRebalanceListener listener) {
+ super.subscribe(topics, new
TheConsumerRebalanceListener(assignedTopics, listener));
+ }
+ };
- }
- });
+ }
+ });
- streams.start();
- TestUtils.waitForCondition(() ->
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
+ streams.start();
+ TestUtils.waitForCondition(() ->
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
- CLUSTER.createTopic("TEST-TOPIC-2");
+ CLUSTER.createTopic("TEST-TOPIC-2");
- TestUtils.waitForCondition(() ->
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
+ TestUtils.waitForCondition(() ->
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
- streams.close();
- CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+ streams.close();
+ } finally {
+ CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+ }
}
@Test
public void testRegexRecordsAreProcessedAfterReassignment() throws
Exception {
final String topic1 = "TEST-TOPIC-1";
- CLUSTER.createTopic(topic1);
-
- final StreamsBuilder builder = new StreamsBuilder();
- final KStream<String, String> pattern1Stream =
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
- pattern1Stream.to(outputTopic, Produced.with(Serdes.String(),
Serdes.String()));
- streams = new KafkaStreams(builder.build(), streamsConfiguration);
final String topic2 = "TEST-TOPIC-2";
- streams.start();
- CLUSTER.createTopic(topic2);
+ try {
+ CLUSTER.createTopic(topic1);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<String, String> pattern1Stream =
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+ pattern1Stream.to(outputTopic, Produced.with(Serdes.String(),
Serdes.String()));
+ streams = new KafkaStreams(builder.build(), streamsConfiguration);
+ streams.start();
- final KeyValue<String, String> record1 = new KeyValue<>("1", "1");
- final KeyValue<String, String> record2 = new KeyValue<>("2", "2");
- IntegrationTestUtils.produceKeyValuesSynchronously(
+ CLUSTER.createTopic(topic2);
+
+ final KeyValue<String, String> record1 = new KeyValue<>("1", "1");
+ final KeyValue<String, String> record2 = new KeyValue<>("2", "2");
+ IntegrationTestUtils.produceKeyValuesSynchronously(
topic1,
Collections.singletonList(record1),
TestUtils.producerConfig(CLUSTER.bootstrapServers(),
StringSerializer.class, StringSerializer.class),
CLUSTER.time
- );
- IntegrationTestUtils.produceKeyValuesSynchronously(
+ );
+ IntegrationTestUtils.produceKeyValuesSynchronously(
topic2,
Collections.singletonList(record2),
TestUtils.producerConfig(CLUSTER.bootstrapServers(),
StringSerializer.class, StringSerializer.class),
CLUSTER.time
- );
- IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+ );
+ IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
TestUtils.consumerConfig(CLUSTER.bootstrapServers(),
StringDeserializer.class, StringDeserializer.class),
outputTopic,
Arrays.asList(record1, record2)
- );
+ );
- streams.close();
- CLUSTER.deleteTopicsAndWait(topic1, topic2);
+ streams.close();
+ } finally {
+ CLUSTER.deleteTopicsAndWait(topic1, topic2);
+ }
}
private String createTopic(final int suffix) throws InterruptedException {
@@ -229,44 +238,44 @@ public class RegexSourceIntegrationTest {
@Test
public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
-
final Serde<String> stringSerde = Serdes.String();
final List<String> expectedFirstAssignment =
Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
final List<String> expectedSecondAssignment =
Collections.singletonList("TEST-TOPIC-B");
+ final List<String> assignedTopics = new CopyOnWriteArrayList<>();
- CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");
+ try {
+ CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");
- final StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
- final KStream<String, String> pattern1Stream =
builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
+ final KStream<String, String> pattern1Stream =
builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
- pattern1Stream.to(outputTopic, Produced.with(stringSerde,
stringSerde));
-
- final List<String> assignedTopics = new CopyOnWriteArrayList<>();
- streams = new KafkaStreams(builder.build(), streamsConfiguration, new
DefaultKafkaClientSupplier() {
- @Override
- public Consumer<byte[], byte[]> getConsumer(final Map<String,
Object> config) {
- return new KafkaConsumer<byte[], byte[]>(config, new
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
- @Override
- public void subscribe(final Pattern topics, final
ConsumerRebalanceListener listener) {
- super.subscribe(topics, new
TheConsumerRebalanceListener(assignedTopics, listener));
- }
- };
- }
- });
+ pattern1Stream.to(outputTopic, Produced.with(stringSerde,
stringSerde));
+ streams = new KafkaStreams(builder.build(), streamsConfiguration,
new DefaultKafkaClientSupplier() {
+ @Override
+ public Consumer<byte[], byte[]> getConsumer(final Map<String,
Object> config) {
+ return new KafkaConsumer<byte[], byte[]>(config, new
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
+ @Override
+ public void subscribe(final Pattern topics, final
ConsumerRebalanceListener listener) {
+ super.subscribe(topics, new
TheConsumerRebalanceListener(assignedTopics, listener));
+ }
+ };
+ }
+ });
- streams.start();
- TestUtils.waitForCondition(() ->
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
- CLUSTER.deleteTopic("TEST-TOPIC-A");
+ streams.start();
+ TestUtils.waitForCondition(() ->
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
+ } finally {
+ CLUSTER.deleteTopic("TEST-TOPIC-A");
+ }
TestUtils.waitForCondition(() ->
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
}
@Test
- public void shouldAddStateStoreToRegexDefinedSource() throws
InterruptedException {
-
+ public void shouldAddStateStoreToRegexDefinedSource() throws Exception {
final ProcessorSupplier<String, String> processorSupplier = new
MockProcessorSupplier<>();
final StoreBuilder<KeyValueStore<Object, Object>> storeBuilder = new
MockKeyValueStoreBuilder("testStateStore", false);
final long thirtySecondTimeout = 30 * 1000;
@@ -277,26 +286,19 @@ public class RegexSourceIntegrationTest {
topology.addStateStore(storeBuilder, "my-processor");
streams = new KafkaStreams(topology, streamsConfiguration);
+ streams.start();
- try {
- streams.start();
-
- final TestCondition stateStoreNameBoundToSourceTopic = () -> {
- final Map<String, List<String>> stateStoreToSourceTopic =
topology.getInternalBuilder().stateStoreNameToSourceTopics();
- final List<String> topicNamesList =
stateStoreToSourceTopic.get("testStateStore");
- return topicNamesList != null && !topicNamesList.isEmpty() &&
topicNamesList.get(0).equals("topic-1");
- };
-
- TestUtils.waitForCondition(stateStoreNameBoundToSourceTopic,
thirtySecondTimeout, "Did not find topic: [topic-1] connected to state store:
[testStateStore]");
+ final TestCondition stateStoreNameBoundToSourceTopic = () -> {
+ final Map<String, List<String>> stateStoreToSourceTopic =
topology.getInternalBuilder().stateStoreNameToSourceTopics();
+ final List<String> topicNamesList =
stateStoreToSourceTopic.get("testStateStore");
+ return topicNamesList != null && !topicNamesList.isEmpty() &&
topicNamesList.get(0).equals("topic-1");
+ };
- } finally {
- streams.close();
- }
+ TestUtils.waitForCondition(stateStoreNameBoundToSourceTopic,
thirtySecondTimeout, "Did not find topic: [topic-1] connected to state store:
[testStateStore]");
}
@Test
public void testShouldReadFromRegexAndNamedTopics() throws Exception {
-
final String topic1TestMessage = "topic-1 test";
final String topic2TestMessage = "topic-2 test";
final String topicATestMessage = "topic-A test";
@@ -346,7 +348,6 @@ public class RegexSourceIntegrationTest {
@Test
public void testMultipleConsumersCanReadFromPartitionedTopic() throws
Exception {
-
KafkaStreams partitionedStreamsLeader = null;
KafkaStreams partitionedStreamsFollower = null;
try {
@@ -401,12 +402,10 @@ public class RegexSourceIntegrationTest {
partitionedStreamsFollower.close();
}
}
-
}
@Test
public void testNoMessagesSentExceptionFromOverlappingPatterns() throws
Exception {
-
final String fMessage = "fMessage";
final String fooMessage = "fooMessage";
final Serde<String> stringSerde = Serdes.String();