This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new af867fb KAFKA-13128: wait for all keys to be fully processed in
#shouldQueryStoresAfterAddingAndRemovingStreamThread (#11113)
af867fb is described below
commit af867fb30fcf729d8c05e98bbb9d3c3f86d4e9f0
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Fri Jul 23 14:56:46 2021 -0700
KAFKA-13128: wait for all keys to be fully processed in
#shouldQueryStoresAfterAddingAndRemovingStreamThread (#11113)
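This test is flaky because it waits for the records of only a single key to be
fully processed before issuing interactive query (IQ) lookups and asserting
whether data was found. The fix releases the semaphore for every processed
record and waits for the full batch of all three keys before querying.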
Reviewers: Phil Hardwick, Walker Carlson <[email protected]>
---
.../streams/integration/StoreQueryIntegrationTest.java | 18 +++++++-----------
1 file changed, 7 insertions(+), 11 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index 97dc27c..82dd356 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -63,6 +64,7 @@ import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.ge
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
+
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -72,7 +74,7 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
-
+import static java.util.Collections.singletonList;
@Category({IntegrationTest.class})
public class StoreQueryIntegrationTest {
@@ -419,26 +421,20 @@ public class StoreQueryIntegrationTest {
final int key2 = 2;
final int key3 = 3;
final Semaphore semaphore = new Semaphore(0);
- final int numStreamThreads = 1;
final StreamsBuilder builder = new StreamsBuilder();
builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(),
Serdes.Integer()),
Materialized.<Integer, Integer, KeyValueStore<Bytes,
byte[]>>as(TABLE_NAME)
.withCachingDisabled())
.toStream()
- .peek((k, v) -> {
- if (k.equals(key3)) {
- semaphore.release();
- }
- });
+ .peek((k, v) -> semaphore.release());
final Properties streamsConfiguration1 = streamsConfiguration();
- streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
numStreamThreads);
+ streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
final KafkaStreams kafkaStreams1 = createKafkaStreams(builder,
streamsConfiguration1);
- final List<KafkaStreams> kafkaStreamsList =
Arrays.asList(kafkaStreams1);
- startApplicationAndWaitUntilRunning(kafkaStreamsList,
Duration.ofSeconds(60));
+ startApplicationAndWaitUntilRunning(singletonList(kafkaStreams1),
Duration.ofSeconds(60));
//Add thread
final Optional<String> streamThread = kafkaStreams1.addStreamThread();
assertThat(streamThread.isPresent(), is(true));
@@ -448,7 +444,7 @@ public class StoreQueryIntegrationTest {
produceValueRange(key3, 0, batch1NumMessages);
// Assert that all messages in the batches were processed in a timely
manner
- assertThat(semaphore.tryAcquire(batch1NumMessages, 60,
TimeUnit.SECONDS), is(equalTo(true)));
+ assertThat(semaphore.tryAcquire(3 * batch1NumMessages, 60,
TimeUnit.SECONDS), is(equalTo(true)));
until(() -> {
final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>>
queryableStoreType = keyValueStore();