Repository: kafka
Updated Branches:
  refs/heads/trunk 374336382 -> faa1803aa
KAFKA-5309: Stores not queryable after one thread died - introduces a new thread state DEAD - ignores DEAD threads when querying Author: Matthias J. Sax <[email protected]> Reviewers: Damian Guy, Eno Thereska, Guozhang Wang Closes #3140 from mjsax/kafka-5309-stores-not-queryable Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/faa1803a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/faa1803a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/faa1803a Branch: refs/heads/trunk Commit: faa1803aa35871e5e040a22b7fcec61a2be16e24 Parents: 3743363 Author: Matthias J. Sax <[email protected]> Authored: Fri May 26 09:42:02 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri May 26 09:42:02 2017 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/streams/KafkaStreams.java | 6 +- .../processor/internals/StreamThread.java | 59 ++++--- .../StreamThreadStateStoreProvider.java | 4 + .../QueryableStateIntegrationTest.java | 164 ++++++++++++++----- .../processor/internals/StreamThreadTest.java | 2 +- 5 files changed, 166 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/faa1803a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 3b801a9..6da22ed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -265,7 +265,11 @@ public class KafkaStreams { public synchronized void onChange(final StreamThread thread, final StreamThread.State newState, final StreamThread.State oldState) { - threadState.put(thread.getId(), 
newState); + if (newState != StreamThread.State.DEAD) { + threadState.put(thread.getId(), newState); + } else { + threadState.remove(thread.getId()); + } if (newState == StreamThread.State.PARTITIONS_REVOKED || newState == StreamThread.State.ASSIGNING_PARTITIONS) { setState(State.REBALANCING); http://git-wip-us.apache.org/repos/asf/kafka/blob/faa1803a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index f16e323..1e73c89 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -81,35 +81,40 @@ public class StreamThread extends Thread { * * <pre> * +-------------+ - * | Not Running | <-------+ - * +-----+-------+ | - * | | - * v | - * +-----+-------+ | - * +<--- | Running | <----+ | - * | +-----+-------+ | | - * | | | | - * | v | | - * | +-----+-------+ | | - * +<--- | Partitions | | | - * | | Revoked | | | - * | +-----+-------+ | | - * | | | | - * | v | | - * | +-----+-------+ | | - * | | Assigning | | | - * | | Partitions | ---->+ | - * | +-----+-------+ | - * | | | - * | v | - * | +-----+-------+ | - * +---> | Pending | ------->+ + * | Created | + * +-----+-------+ + * | + * v + * +-----+-------+ + * +<--- | Running | <----+ + * | +-----+-------+ | + * | | | + * | v | + * | +-----+-------+ | + * +<--- | Partitions | | + * | | Revoked | | + * | +-----+-------+ | + * | | | + * | v | + * | +-----+-------+ | + * | | Assigning | | + * | | Partitions | ---->+ + * | +-----+-------+ + * | | + * | v + * | +-----+-------+ + * +---> | Pending | * | Shutdown | + * +-----+-------+ + * | + * v + * +-----+-------+ + * | Dead | * +-------------+ * </pre> */ 
public enum State { - NOT_RUNNING(1), RUNNING(1, 2, 4), PARTITIONS_REVOKED(3, 4), ASSIGNING_PARTITIONS(1, 4), PENDING_SHUTDOWN(0); + CREATED(1), RUNNING(1, 2, 4), PARTITIONS_REVOKED(3, 4), ASSIGNING_PARTITIONS(1, 4), PENDING_SHUTDOWN(5), DEAD; private final Set<Integer> validTransitions = new HashSet<>(); @@ -118,7 +123,7 @@ public class StreamThread extends Thread { } public boolean isRunning() { - return !equals(PENDING_SHUTDOWN) && !equals(NOT_RUNNING); + return !equals(PENDING_SHUTDOWN) && !equals(CREATED) && !equals(DEAD); } public boolean isValidTransition(final State newState) { @@ -377,7 +382,7 @@ public class StreamThread extends Thread { } - private volatile State state = State.NOT_RUNNING; + private volatile State state = State.CREATED; private StreamThread.StateListener stateListener = null; final PartitionGrouper partitionGrouper; private final StreamsMetadataState streamsMetadataState; @@ -1062,7 +1067,7 @@ public class StreamThread extends Thread { // clean up global tasks log.info("{} Stream thread shutdown complete", logPrefix); - setState(State.NOT_RUNNING); + setState(State.DEAD); streamsMetrics.removeAllSensors(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/faa1803a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index 2d7ff82..45d9898 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.state.QueryableStoreType; import 
java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -40,6 +41,9 @@ public class StreamThreadStateStoreProvider implements StateStoreProvider { @SuppressWarnings("unchecked") @Override public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) { + if (streamThread.state() == StreamThread.State.DEAD) { + return Collections.emptyList(); + } if (!streamThread.isInitialized()) { throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); } http://git-wip-us.apache.org/repos/asf/kafka/blob/faa1803a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 509a7fd..d5cea24 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.state.KeyValueIterator; @@ -70,6 +71,7 @@ import java.util.Properties; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; @@ -81,7 +83,6 @@ import static org.junit.Assert.fail; 
@Category({IntegrationTest.class}) public class QueryableStateIntegrationTest { private static final int NUM_BROKERS = 1; - private static final long COMMIT_INTERVAL_MS = 300L; @ClassRule public static final EmbeddedKafkaCluster CLUSTER = @@ -129,10 +130,8 @@ public class QueryableStateIntegrationTest { final String applicationId = "queryable-state-" + testNo; streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - streamsConfiguration - .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("qs-test").getPath()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); @@ -191,11 +190,6 @@ public class QueryableStateIntegrationTest { /** * Creates a typical word count topology - * - * @param inputTopic - * @param outputTopic - * @param streamsConfiguration config - * @return */ private KafkaStreams createCountStream(final String inputTopic, final String outputTopic, final Properties streamsConfiguration) { final KStreamBuilder builder = new KStreamBuilder(); @@ -223,7 +217,7 @@ public class QueryableStateIntegrationTest { private class StreamRunnable implements Runnable { private final KafkaStreams myStream; private boolean closed = false; - private KafkaStreamsTest.StateListenerStub stateListener = new KafkaStreamsTest.StateListenerStub(); + private final KafkaStreamsTest.StateListenerStub stateListener = new KafkaStreamsTest.StateListenerStub(); StreamRunnable(final String inputTopic, final String outputTopic, final int queryPort) { final 
Properties props = (Properties) streamsConfiguration.clone(); @@ -253,7 +247,7 @@ public class QueryableStateIntegrationTest { return myStream; } - public final KafkaStreamsTest.StateListenerStub getStateListener() { + final KafkaStreamsTest.StateListenerStub getStateListener() { return stateListener; } } @@ -309,7 +303,7 @@ public class QueryableStateIntegrationTest { } catch (final IllegalStateException e) { // Kafka Streams instance may have closed but rebalance hasn't happened return false; - } catch (InvalidStateStoreException e) { + } catch (final InvalidStateStoreException e) { // there must have been at least one rebalance state assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1); return false; @@ -423,8 +417,8 @@ public class QueryableStateIntegrationTest { @Test public void shouldBeAbleToQueryFilterState() throws Exception { - streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); final KStreamBuilder builder = new KStreamBuilder(); final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"}; final Set<KeyValue<String, Long>> batch1 = new HashSet<>(); @@ -449,7 +443,7 @@ public class QueryableStateIntegrationTest { mockTime); final Predicate<String, Long> filterPredicate = new Predicate<String, Long>() { @Override - public boolean test(String key, Long value) { + public boolean test(final String key, final Long value) { return key.contains("kafka"); } }; @@ -489,8 +483,8 @@ public class QueryableStateIntegrationTest { @Test public void shouldBeAbleToQueryMapValuesState() throws Exception { - streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - 
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final KStreamBuilder builder = new KStreamBuilder(); final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"}; final Set<KeyValue<String, String>> batch1 = new HashSet<>(); @@ -514,7 +508,7 @@ public class QueryableStateIntegrationTest { final KTable<String, String> t1 = builder.table(streamOne); final KTable<String, Long> t2 = t1.mapValues(new ValueMapper<String, Long>() { @Override - public Long apply(String value) { + public Long apply(final String value) { return Long.valueOf(value); } }, Serdes.Long(), "queryMapValues"); @@ -535,8 +529,8 @@ public class QueryableStateIntegrationTest { @Test public void shouldBeAbleToQueryMapValuesAfterFilterState() throws Exception { - streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final KStreamBuilder builder = new KStreamBuilder(); final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"}; final Set<KeyValue<String, String>> batch1 = new HashSet<>(); @@ -562,7 +556,7 @@ public class QueryableStateIntegrationTest { final Predicate<String, String> filterPredicate = new Predicate<String, String>() { @Override - public boolean test(String key, String value) { + public boolean test(final String key, final String value) { return key.contains("kafka"); } }; @@ -570,7 +564,7 @@ public class QueryableStateIntegrationTest { final KTable<String, String> t2 = 
t1.filter(filterPredicate, "queryFilter"); final KTable<String, Long> t3 = t2.mapValues(new ValueMapper<String, Long>() { @Override - public Long apply(String value) { + public Long apply(final String value) { return Long.valueOf(value); } }, Serdes.Long(), "queryMapValues"); @@ -595,7 +589,7 @@ public class QueryableStateIntegrationTest { } } - private void verifyCanQueryState(int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException { + private void verifyCanQueryState(final int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException { streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes); final KStreamBuilder builder = new KStreamBuilder(); final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"}; @@ -671,17 +665,7 @@ public class QueryableStateIntegrationTest { mockTime); final int maxWaitMs = 30000; - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - try { - kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); - return true; - } catch (InvalidStateStoreException ise) { - return false; - } - } - }, maxWaitMs, "waiting for store " + storeName); + TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName); final ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); @@ -706,7 +690,7 @@ public class QueryableStateIntegrationTest { try { assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()).get("hello")); return true; - } catch (InvalidStateStoreException ise) { + } catch (final InvalidStateStoreException ise) { return false; } } @@ -714,6 +698,107 @@ public class QueryableStateIntegrationTest { } + private class WaitForStore implements TestCondition { + private final String storeName; + + WaitForStore(final String 
storeName) { + this.storeName = storeName; + } + @Override + public boolean conditionMet() { + try { + kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()); + return true; + } catch (final InvalidStateStoreException ise) { + return false; + } + } + } + + @Test + public void shouldAllowToQueryAfterThreadDied() throws Exception { + final AtomicBoolean beforeFailure = new AtomicBoolean(true); + final AtomicBoolean failed = new AtomicBoolean(false); + final String storeName = "store"; + + final KStreamBuilder builder = new KStreamBuilder(); + final KStream<String, String> input = builder.stream(streamOne); + input + .groupByKey() + .reduce(new Reducer<String>() { + @Override + public String apply(final String value1, final String value2) { + if (beforeFailure.get() && value1.length() > 1) { + beforeFailure.set(false); + throw new RuntimeException("Injected test exception"); + } + return value1 + value2; + } + }, storeName) + .to(outputTopic); + + streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); + kafkaStreams = new KafkaStreams(builder, streamsConfiguration); + kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(final Thread t, final Throwable e) { + failed.set(true); + } + }); + kafkaStreams.start(); + + IntegrationTestUtils.produceKeyValuesSynchronously( + streamOne, + Arrays.asList(KeyValue.pair("a", "1"), KeyValue.pair("a", "2"), KeyValue.pair("b", "3"), KeyValue.pair("b", "4")), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + StringSerializer.class, + StringSerializer.class, + new Properties()), + mockTime); + + final int maxWaitMs = 30000; + TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName); + + final ReadOnlyKeyValueStore<String, String> store = kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore()); + + TestUtils.waitForCondition(new TestCondition() { 
+ @Override + public boolean conditionMet() { + return "12".equals(store.get("a")) && "34".equals(store.get("b")); + } + }, maxWaitMs, "wait for agg to be '123'"); + + IntegrationTestUtils.produceKeyValuesSynchronously( + streamOne, + Arrays.asList(KeyValue.pair("a", "5")), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + StringSerializer.class, + StringSerializer.class, + new Properties()), + mockTime); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return failed.get(); + } + }, 30000, "wait for thread to fail"); + + TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName); + + final ReadOnlyKeyValueStore<String, String> store2 = kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore()); + + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return "125".equals(store2.get("a")) && "34".equals(store2.get("b")); + } + }, maxWaitMs, "wait for agg to be '123'"); + + } + private void verifyRangeAndAll(final Set<KeyValue<String, Long>> expectedCount, final ReadOnlyKeyValueStore<String, Long> myCount) { final Set<KeyValue<String, Long>> countRangeResults = new TreeSet<>(stringLongComparator); @@ -783,7 +868,6 @@ public class QueryableStateIntegrationTest { * @param failIfKeyNotFound if true, tests fails if an expected key is not found in store. If false, * the method merely inserts the new found key into the list of * expected keys. 
- * @throws InterruptedException */ private void verifyGreaterOrEqual(final String[] keys, final Map<String, Long> expectedWindowedCount, @@ -888,11 +972,11 @@ public class QueryableStateIntegrationTest { currIteration++; } - public synchronized int getCurrIteration() { + synchronized int getCurrIteration() { return currIteration; } - public synchronized void shutdown() { + synchronized void shutdown() { shutdown = true; } @@ -909,8 +993,8 @@ public class QueryableStateIntegrationTest { new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) { while (getCurrIteration() < numIterations && !shutdown) { - for (int i = 0; i < inputValues.size(); i++) { - producer.send(new ProducerRecord<String, String>(topic, inputValues.get(i))); + for (final String value : inputValues) { + producer.send(new ProducerRecord<String, String>(topic, value)); } incrementInteration(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/faa1803a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index e5b96ff..e255350 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -363,7 +363,7 @@ public class StreamThreadTest { thread.close(); assertTrue((thread.state() == StreamThread.State.PENDING_SHUTDOWN) || - (thread.state() == StreamThread.State.NOT_RUNNING)); + (thread.state() == StreamThread.State.CREATED)); } private final static String TOPIC = "topic";
