This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 708901a MINOR: Add ability to wait for all instances in an
application to be RUNNING (#7500)
708901a is described below
commit 708901acf1aa64655cb21387d326b8da3620f297
Author: Chris Pettitt <[email protected]>
AuthorDate: Thu Oct 17 11:33:08 2019 -0600
MINOR: Add ability to wait for all instances in an application to be
RUNNING (#7500)
Reviewers: Matthias J. Sax <[email protected]>, A. Sophie Blee-Goldman
<[email protected]>, Guozhang Wang <[email protected]>
---
.../OptimizedKTableIntegrationTest.java | 68 +++--------------
.../integration/utils/CompositeStateListener.java | 50 ++++++++++++
.../integration/utils/IntegrationTestUtils.java | 89 ++++++++++++++++++++++
3 files changed, 149 insertions(+), 58 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index 767a5a9..de0ea39 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -16,26 +16,22 @@
*/
package org.apache.kafka.streams.integration;
+import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.fail;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -46,7 +42,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@@ -76,9 +71,7 @@ public class OptimizedKTableIntegrationTest {
@Rule
public final EmbeddedKafkaCluster cluster = new
EmbeddedKafkaCluster(NUM_BROKERS);
- private final Map<KafkaStreams, State> kafkaStreamsStates = new
HashMap<>();
- private final Lock kafkaStreamsStatesLock = new ReentrantLock();
- private final Condition kafkaStreamsStateUpdate =
kafkaStreamsStatesLock.newCondition();
+ private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
private final MockTime mockTime = cluster.time;
@Before
@@ -88,7 +81,7 @@ public class OptimizedKTableIntegrationTest {
@After
public void after() {
- for (final KafkaStreams kafkaStreams : kafkaStreamsStates.keySet()) {
+ for (final KafkaStreams kafkaStreams : streamsToCleanup) {
kafkaStreams.close();
}
}
@@ -116,9 +109,8 @@ public class OptimizedKTableIntegrationTest {
final AtomicLong restoreStartOffset = new AtomicLong(-1);
kafkaStreamsList.forEach(kafkaStreams -> {
kafkaStreams.setGlobalStateRestoreListener(createTrackingRestoreListener(restoreStartOffset,
new AtomicLong()));
- kafkaStreams.start();
});
- waitForKafkaStreamssToEnterRunningState(kafkaStreamsList, 60,
TimeUnit.SECONDS);
+ startApplicationAndWaitUntilRunning(kafkaStreamsList,
Duration.ofSeconds(60));
// Assert that all messages in the first batch were processed in a
timely manner
assertThat(semaphore.tryAcquire(numMessages, 60, TimeUnit.SECONDS),
is(equalTo(true)));
@@ -150,9 +142,8 @@ public class OptimizedKTableIntegrationTest {
final AtomicLong restoreEndOffset = new AtomicLong(-1L);
kafkaStreamsList.forEach(kafkaStreams -> {
kafkaStreams.setGlobalStateRestoreListener(createTrackingRestoreListener(restoreStartOffset,
restoreEndOffset));
- kafkaStreams.start();
});
- waitForKafkaStreamssToEnterRunningState(kafkaStreamsList, 60,
TimeUnit.SECONDS);
+ startApplicationAndWaitUntilRunning(kafkaStreamsList,
Duration.ofSeconds(60));
produceValueRange(key, 0, batch1NumMessages);
@@ -226,49 +217,10 @@ public class OptimizedKTableIntegrationTest {
mockTime);
}
- private void waitForKafkaStreamssToEnterRunningState(final
Collection<KafkaStreams> kafkaStreamss,
- final long time,
- final TimeUnit
timeUnit) throws InterruptedException {
-
- final long expectedEnd = System.currentTimeMillis() +
timeUnit.toMillis(time);
-
- kafkaStreamsStatesLock.lock();
- try {
- while (!kafkaStreamss.stream().allMatch(kafkaStreams ->
kafkaStreamsStates.get(kafkaStreams) == State.RUNNING)) {
- if (expectedEnd <= System.currentTimeMillis()) {
- fail("one or more kafkaStreamss did not enter RUNNING in a
timely manner");
- }
- final long millisRemaining = Math.max(1, expectedEnd -
System.currentTimeMillis());
- kafkaStreamsStateUpdate.await(millisRemaining,
TimeUnit.MILLISECONDS);
- }
- } finally {
- kafkaStreamsStatesLock.unlock();
- }
- }
-
private KafkaStreams createKafkaStreams(final StreamsBuilder builder,
final Properties config) {
- final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(config), config);
- kafkaStreamsStatesLock.lock();
- try {
- kafkaStreamsStates.put(kafkaStreams, kafkaStreams.state());
- } finally {
- kafkaStreamsStatesLock.unlock();
- }
-
- kafkaStreams.setStateListener((newState, oldState) -> {
- kafkaStreamsStatesLock.lock();
- try {
- kafkaStreamsStates.put(kafkaStreams, newState);
- if (newState == State.RUNNING) {
- if (kafkaStreamsStates.values().stream().allMatch(state ->
state == State.RUNNING)) {
- kafkaStreamsStateUpdate.signalAll();
- }
- }
- } finally {
- kafkaStreamsStatesLock.unlock();
- }
- });
- return kafkaStreams;
+ final KafkaStreams streams = new KafkaStreams(builder.build(config),
config);
+ streamsToCleanup.add(streams);
+ return streams;
}
private StateRestoreListener createTrackingRestoreListener(final
AtomicLong restoreStartOffset,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java
new file mode 100644
index 0000000..8258942
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/CompositeStateListener.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration.utils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KafkaStreams.StateListener;
+
+/**
+ * A {@link StateListener} that holds zero or more listeners internally and
invokes all of them
+ * when a state transition occurs (i.e. {@link #onChange(State, State)} is
called). If any listener
+ * throws {@link RuntimeException} or {@link Error} this immediately stops
execution of listeners
+ * and causes the thrown exception to be raised.
+ */
+public class CompositeStateListener implements StateListener {
+ private final List<StateListener> listeners;
+
+ public CompositeStateListener(final StateListener... listeners) {
+ this(Arrays.asList(listeners));
+ }
+
+ public CompositeStateListener(final Collection<StateListener>
stateListeners) {
+ this.listeners = Collections.unmodifiableList(new
ArrayList<>(stateListeners));
+ }
+
+ @Override
+ public void onChange(final State newState, final State oldState) {
+ for (final StateListener listener : listeners) {
+ listener.onChange(newState, oldState);
+ }
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 6c67a9d..4921c4f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -16,6 +16,12 @@
*/
package org.apache.kafka.streams.integration.utils;
+import java.lang.reflect.Field;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import kafka.api.Request;
import kafka.server.KafkaServer;
import kafka.server.MetadataCache;
@@ -35,6 +41,8 @@ import
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KafkaStreams.StateListener;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
@@ -70,6 +78,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
/**
* Utility functions to make integration testing more convenient.
@@ -731,6 +740,86 @@ public class IntegrationTestUtils {
});
}
+ /**
+ * Starts the given {@link KafkaStreams} instances and waits for all of
them to reach the
+ * {@link State#RUNNING} state at the same time. Note that states may
change between the time
+ * that this method returns and the calling function executes its next
statement.
+ *
+ * @param streamsList the list of streams instances to run.
+ * @param timeout the time to wait for the streams to all be in @{link
State#RUNNING} state.
+ */
+ public static void startApplicationAndWaitUntilRunning(final
List<KafkaStreams> streamsList,
+ final Duration
timeout) throws InterruptedException {
+ final Lock stateLock = new ReentrantLock();
+ final Condition stateUpdate = stateLock.newCondition();
+ final Map<KafkaStreams, State> stateMap = new HashMap<>();
+ for (final KafkaStreams streams : streamsList) {
+ stateMap.put(streams, streams.state());
+ final StateListener prevStateListener = getStateListener(streams);
+ final StateListener newStateListener = (newState, oldState) -> {
+ stateLock.lock();
+ try {
+ stateMap.put(streams, newState);
+ if (newState == State.RUNNING) {
+ if (stateMap.values().stream().allMatch(state -> state
== State.RUNNING)) {
+ stateUpdate.signalAll();
+ }
+ }
+ } finally {
+ stateLock.unlock();
+ }
+ };
+
+ streams.setStateListener(prevStateListener != null
+ ? new CompositeStateListener(prevStateListener,
newStateListener)
+ : newStateListener);
+ }
+
+ for (final KafkaStreams streams : streamsList) {
+ streams.start();
+ }
+
+ final long expectedEnd = System.currentTimeMillis() +
timeout.toMillis();
+ stateLock.lock();
+ try {
+ // We use while true here because we want to run this test at
least once, even if the
+ // timeout has expired
+ while (true) {
+ final Map<KafkaStreams, State> nonRunningStreams = new
HashMap<>();
+ for (final Entry<KafkaStreams, State> entry :
stateMap.entrySet()) {
+ if (entry.getValue() != State.RUNNING) {
+ nonRunningStreams.put(entry.getKey(),
entry.getValue());
+ }
+ }
+
+ if (nonRunningStreams.isEmpty()) {
+ return;
+ }
+
+ final long millisRemaining = expectedEnd -
System.currentTimeMillis();
+ if (millisRemaining <= 0) {
+ fail("Application did not reach a RUNNING state for all
streams instances. Non-running instances: " +
+ nonRunningStreams);
+ }
+
+ stateUpdate.await(millisRemaining, TimeUnit.MILLISECONDS);
+ }
+ } finally {
+ stateLock.unlock();
+ }
+ }
+
+ private static StateListener getStateListener(final KafkaStreams streams) {
+ try {
+ final Field field =
streams.getClass().getDeclaredField("stateListener");
+ field.setAccessible(true);
+ return (StateListener) field.get(streams);
+ } catch (final IllegalAccessException | NoSuchFieldException e) {
+ throw new RuntimeException("Failed to get StateListener through
reflection", e);
+ }
+ }
+
+
public static <K, V> void verifyKeyValueTimestamps(final Properties
consumerConfig,
final String topic,
final
List<KeyValueTimestamp<K, V>> expected) {