This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 708901a  MINOR: Add ability to wait for all instances in an 
application to be RUNNING (#7500)
708901a is described below

commit 708901acf1aa64655cb21387d326b8da3620f297
Author: Chris Pettitt <[email protected]>
AuthorDate: Thu Oct 17 11:33:08 2019 -0600

    MINOR: Add ability to wait for all instances in an application to be 
RUNNING (#7500)
    
    Reviewers: Matthias J. Sax <[email protected]>, A. Sophie Blee-Goldman 
<[email protected]>, Guozhang Wang <[email protected]>
---
 .../OptimizedKTableIntegrationTest.java            | 68 +++--------------
 .../integration/utils/CompositeStateListener.java  | 50 ++++++++++++
 .../integration/utils/IntegrationTestUtils.java    | 89 ++++++++++++++++++++++
 3 files changed, 149 insertions(+), 58 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index 767a5a9..de0ea39 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -16,26 +16,22 @@
  */
 package org.apache.kafka.streams.integration;
 
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.fail;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -46,7 +42,6 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -76,9 +71,7 @@ public class OptimizedKTableIntegrationTest {
     @Rule
     public final EmbeddedKafkaCluster cluster = new 
EmbeddedKafkaCluster(NUM_BROKERS);
 
-    private final Map<KafkaStreams, State> kafkaStreamsStates = new 
HashMap<>();
-    private final Lock kafkaStreamsStatesLock = new ReentrantLock();
-    private final Condition kafkaStreamsStateUpdate = 
kafkaStreamsStatesLock.newCondition();
+    private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
     private final MockTime mockTime = cluster.time;
 
     @Before
@@ -88,7 +81,7 @@ public class OptimizedKTableIntegrationTest {
 
     @After
     public void after() {
-        for (final KafkaStreams kafkaStreams : kafkaStreamsStates.keySet()) {
+        for (final KafkaStreams kafkaStreams : streamsToCleanup) {
             kafkaStreams.close();
         }
     }
@@ -116,9 +109,8 @@ public class OptimizedKTableIntegrationTest {
         final AtomicLong restoreStartOffset = new AtomicLong(-1);
         kafkaStreamsList.forEach(kafkaStreams -> {
             
kafkaStreams.setGlobalStateRestoreListener(createTrackingRestoreListener(restoreStartOffset,
 new AtomicLong()));
-            kafkaStreams.start();
         });
-        waitForKafkaStreamssToEnterRunningState(kafkaStreamsList, 60, 
TimeUnit.SECONDS);
+        startApplicationAndWaitUntilRunning(kafkaStreamsList, 
Duration.ofSeconds(60));
 
         // Assert that all messages in the first batch were processed in a 
timely manner
         assertThat(semaphore.tryAcquire(numMessages, 60, TimeUnit.SECONDS), 
is(equalTo(true)));
@@ -150,9 +142,8 @@ public class OptimizedKTableIntegrationTest {
         final AtomicLong restoreEndOffset = new AtomicLong(-1L);
         kafkaStreamsList.forEach(kafkaStreams -> {
             
kafkaStreams.setGlobalStateRestoreListener(createTrackingRestoreListener(restoreStartOffset,
 restoreEndOffset));
-            kafkaStreams.start();
         });
-        waitForKafkaStreamssToEnterRunningState(kafkaStreamsList, 60, 
TimeUnit.SECONDS);
+        startApplicationAndWaitUntilRunning(kafkaStreamsList, 
Duration.ofSeconds(60));
 
         produceValueRange(key, 0, batch1NumMessages);
 
@@ -226,49 +217,10 @@ public class OptimizedKTableIntegrationTest {
             mockTime);
     }
 
-    private void waitForKafkaStreamssToEnterRunningState(final 
Collection<KafkaStreams> kafkaStreamss,
-                                                         final long time,
-                                                         final TimeUnit 
timeUnit) throws InterruptedException {
-
-        final long expectedEnd = System.currentTimeMillis() + 
timeUnit.toMillis(time);
-
-        kafkaStreamsStatesLock.lock();
-        try {
-            while (!kafkaStreamss.stream().allMatch(kafkaStreams -> 
kafkaStreamsStates.get(kafkaStreams) == State.RUNNING)) {
-                if (expectedEnd <= System.currentTimeMillis()) {
-                    fail("one or more kafkaStreamss did not enter RUNNING in a 
timely manner");
-                }
-                final long millisRemaining = Math.max(1, expectedEnd - 
System.currentTimeMillis());
-                kafkaStreamsStateUpdate.await(millisRemaining, 
TimeUnit.MILLISECONDS);
-            }
-        } finally {
-            kafkaStreamsStatesLock.unlock();
-        }
-    }
-
     private KafkaStreams createKafkaStreams(final StreamsBuilder builder, 
final Properties config) {
-        final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(config), config);
-        kafkaStreamsStatesLock.lock();
-        try {
-            kafkaStreamsStates.put(kafkaStreams, kafkaStreams.state());
-        } finally {
-            kafkaStreamsStatesLock.unlock();
-        }
-
-        kafkaStreams.setStateListener((newState, oldState) -> {
-            kafkaStreamsStatesLock.lock();
-            try {
-                kafkaStreamsStates.put(kafkaStreams, newState);
-                if (newState == State.RUNNING) {
-                    if (kafkaStreamsStates.values().stream().allMatch(state -> 
state == State.RUNNING)) {
-                        kafkaStreamsStateUpdate.signalAll();
-                    }
-                }
-            } finally {
-                kafkaStreamsStatesLock.unlock();
-            }
-        });
-        return kafkaStreams;
+        final KafkaStreams streams = new KafkaStreams(builder.build(config), 
config);
+        streamsToCleanup.add(streams);
+        return streams;
     }
 
     private StateRestoreListener createTrackingRestoreListener(final 
AtomicLong restoreStartOffset,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java
new file mode 100644
index 0000000..8258942
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration.utils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KafkaStreams.StateListener;
+
+/**
+ * A {@link StateListener} that holds zero or more listeners internally and 
invokes all of them
+ * when a state transition occurs (i.e. {@link #onChange(State, State)} is 
called). If any listener
+ * throws {@link RuntimeException} or {@link Error} this immediately stops 
execution of listeners
+ * and causes the thrown exception to be raised.
+ */
+public class CompositeStateListener implements StateListener {
+    private final List<StateListener> listeners;
+
+    public CompositeStateListener(final StateListener... listeners) {
+        this(Arrays.asList(listeners));
+    }
+
+    public CompositeStateListener(final Collection<StateListener> 
stateListeners) {
+        this.listeners = Collections.unmodifiableList(new 
ArrayList<>(stateListeners));
+    }
+
+    @Override
+    public void onChange(final State newState, final State oldState) {
+        for (final StateListener listener : listeners) {
+            listener.onChange(newState, oldState);
+        }
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 6c67a9d..4921c4f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -16,6 +16,12 @@
  */
 package org.apache.kafka.streams.integration.utils;
 
+import java.lang.reflect.Field;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import kafka.api.Request;
 import kafka.server.KafkaServer;
 import kafka.server.MetadataCache;
@@ -35,6 +41,8 @@ import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KafkaStreams.StateListener;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -70,6 +78,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
 
 /**
  * Utility functions to make integration testing more convenient.
@@ -731,6 +740,86 @@ public class IntegrationTestUtils {
         });
     }
 
+    /**
+     * Starts the given {@link KafkaStreams} instances and waits for all of 
them to reach the
+     * {@link State#RUNNING} state at the same time. Note that states may 
change between the time
+     * that this method returns and the calling function executes its next 
statement.
+     *
+     * @param streamsList the list of streams instances to run.
+     * @param timeout the time to wait for the streams to all be in @{link 
State#RUNNING} state.
+     */
+    public static void startApplicationAndWaitUntilRunning(final 
List<KafkaStreams> streamsList,
+                                                           final Duration 
timeout) throws InterruptedException {
+        final Lock stateLock = new ReentrantLock();
+        final Condition stateUpdate = stateLock.newCondition();
+        final Map<KafkaStreams, State> stateMap = new HashMap<>();
+        for (final KafkaStreams streams : streamsList) {
+            stateMap.put(streams, streams.state());
+            final StateListener prevStateListener = getStateListener(streams);
+            final StateListener newStateListener = (newState, oldState) -> {
+                stateLock.lock();
+                try {
+                    stateMap.put(streams, newState);
+                    if (newState == State.RUNNING) {
+                        if (stateMap.values().stream().allMatch(state -> state 
== State.RUNNING)) {
+                            stateUpdate.signalAll();
+                        }
+                    }
+                } finally {
+                    stateLock.unlock();
+                }
+            };
+
+            streams.setStateListener(prevStateListener != null
+                ? new CompositeStateListener(prevStateListener, 
newStateListener)
+                : newStateListener);
+        }
+
+        for (final KafkaStreams streams : streamsList) {
+            streams.start();
+        }
+
+        final long expectedEnd = System.currentTimeMillis() + 
timeout.toMillis();
+        stateLock.lock();
+        try {
+            // We use while true here because we want to run this test at 
least once, even if the
+            // timeout has expired
+            while (true) {
+                final Map<KafkaStreams, State> nonRunningStreams = new 
HashMap<>();
+                for (final Entry<KafkaStreams, State> entry : 
stateMap.entrySet()) {
+                    if (entry.getValue() != State.RUNNING) {
+                        nonRunningStreams.put(entry.getKey(), 
entry.getValue());
+                    }
+                }
+
+                if (nonRunningStreams.isEmpty()) {
+                    return;
+                }
+
+                final long millisRemaining = expectedEnd - 
System.currentTimeMillis();
+                if (millisRemaining <= 0) {
+                    fail("Application did not reach a RUNNING state for all 
streams instances. Non-running instances: " +
+                        nonRunningStreams);
+                }
+
+                stateUpdate.await(millisRemaining, TimeUnit.MILLISECONDS);
+            }
+        } finally {
+            stateLock.unlock();
+        }
+    }
+
+    private static StateListener getStateListener(final KafkaStreams streams) {
+        try {
+            final Field field = 
streams.getClass().getDeclaredField("stateListener");
+            field.setAccessible(true);
+            return (StateListener) field.get(streams);
+        } catch (final IllegalAccessException | NoSuchFieldException e) {
+            throw new RuntimeException("Failed to get StateListener through 
reflection", e);
+        }
+    }
+
+
     public static <K, V> void verifyKeyValueTimestamps(final Properties 
consumerConfig,
                                                        final String topic,
                                                        final 
List<KeyValueTimestamp<K, V>> expected) {

Reply via email to