This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 293b332  KAFKA-13128: wait for all keys to be fully processed in 
#shouldQueryStoresAfterAddingAndRemovingStreamThread (#11113)
293b332 is described below

commit 293b332c53019f9c3b6640ab33dbaa00831a6ed9
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Fri Jul 23 14:56:46 2021 -0700

    KAFKA-13128: wait for all keys to be fully processed in 
#shouldQueryStoresAfterAddingAndRemovingStreamThread (#11113)
    
    This test is flaky due to waiting on all records to be processed for only a 
single key before issuing IQ lookups and asserting whether data was found.
    
    Reviewers:  Phil Hardwick, Walker Carlson <[email protected]>
---
 .../streams/integration/StoreQueryIntegrationTest.java | 18 +++++++-----------
 1 file changed, 7 insertions(+), 11 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index d764ed7..0452206 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -64,6 +65,7 @@ import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.ge
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
+
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -73,7 +75,7 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-
+import static java.util.Collections.singletonList;
 
 @Category({IntegrationTest.class})
 public class StoreQueryIntegrationTest {
@@ -421,26 +423,20 @@ public class StoreQueryIntegrationTest {
         final int key2 = 2;
         final int key3 = 3;
         final Semaphore semaphore = new Semaphore(0);
-        final int numStreamThreads = 1;
 
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), 
Serdes.Integer()),
                       Materialized.<Integer, Integer, KeyValueStore<Bytes, 
byte[]>>as(TABLE_NAME)
                           .withCachingDisabled())
                .toStream()
-               .peek((k, v) -> {
-                   if (k.equals(key3)) {
-                       semaphore.release();
-                   }
-               });
+               .peek((k, v) -> semaphore.release());
 
         final Properties streamsConfiguration1 = streamsConfiguration();
-        streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
numStreamThreads);
+        streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
 
         final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration1);
-        final List<KafkaStreams> kafkaStreamsList = 
Arrays.asList(kafkaStreams1);
 
-        startApplicationAndWaitUntilRunning(kafkaStreamsList, 
Duration.ofSeconds(60));
+        startApplicationAndWaitUntilRunning(singletonList(kafkaStreams1), 
Duration.ofSeconds(60));
         //Add thread
         final Optional<String> streamThread = kafkaStreams1.addStreamThread();
         assertThat(streamThread.isPresent(), is(true));
@@ -450,7 +446,7 @@ public class StoreQueryIntegrationTest {
         produceValueRange(key3, 0, batch1NumMessages);
 
         // Assert that all messages in the batches were processed in a timely 
manner
-        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, 
TimeUnit.SECONDS), is(equalTo(true)));
+        assertThat(semaphore.tryAcquire(3 * batch1NumMessages, 60, 
TimeUnit.SECONDS), is(equalTo(true)));
 
         until(() -> {
             final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> 
queryableStoreType = keyValueStore();

Reply via email to