This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push: new 3b729ac KAFKA-8602: Backport bugfix for standby task creation (#7148) 3b729ac is described below commit 3b729ac960392514c68f3e84eff0c51e79a0d06e Author: cadonna <br...@confluent.io> AuthorDate: Tue Aug 6 19:01:23 2019 +0200 KAFKA-8602: Backport bugfix for standby task creation (#7148) Reviewers: Bill Bejeck <b...@confluent.io>, Matthias J. Sax <matth...@confluent.io> --- .../streams/processor/internals/StreamThread.java | 22 +-- .../StandbyTaskCreationIntegrationTest.java | 189 +++++++++++++++++++++ .../processor/internals/StreamThreadTest.java | 100 ++++++++++- 3 files changed, 300 insertions(+), 11 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index f633395..b269faf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -466,16 +466,18 @@ public class StreamThread extends Thread implements ThreadDataProvider { final ProcessorTopology topology = builder.build(taskId.topicGroupId); - if (!topology.stateStores().isEmpty()) { - return new StandbyTask(taskId, - applicationId, - partitions, - topology, - consumer, - storeChangelogReader, - config, - streamsMetrics, - stateDirectory); + if (!topology.stateStores().isEmpty() && !topology.storeToChangelogTopic().isEmpty()) { + return new StandbyTask( + taskId, + applicationId, + partitions, + topology, + consumer, + storeChangelogReader, + config, + streamsMetrics, + stateDirectory + ); } else { log.trace("Skipped standby task {} with assigned partitions {} " + "since it does not have any state stores to materialize", taskId, partitions); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java new file mode 100644 index 0000000..ed2781f --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.Consumed; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.KafkaStreams.StateListener; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.TransformerSupplier; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ThreadMetadata; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.Properties; + +@Category({IntegrationTest.class}) +public class StandbyTaskCreationIntegrationTest { + + private static final int NUM_BROKERS = 1; + + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + + private static final String INPUT_TOPIC = "input-topic"; + + private KafkaStreams client1; + private KafkaStreams client2; + private volatile boolean client1IsOk = false; + private volatile boolean client2IsOk = false; + + @BeforeClass + public static void createTopics() throws InterruptedException { + CLUSTER.createTopic(INPUT_TOPIC, 2, 1); + } + + @After + public void after() { + client1.close(); + client2.close(); + } + + private Properties streamsConfiguration() { + final String applicationId = "testApp"; + final Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); + return streamsConfiguration; + } + + @Test + public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled() throws Exception { + final StreamsBuilder builder = new StreamsBuilder(); + final String stateStoreName = "myTransformState"; + final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder = + Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName), + Serdes.Integer(), + Serdes.Integer()).withLoggingDisabled(); + builder.addStateStore(keyValueStoreBuilder); + builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer())) + .transform(new TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>>() { + @Override + public Transformer<Integer, Integer, KeyValue<Integer, Integer>> get() { + return new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() { + @SuppressWarnings("unchecked") + @Override + public void init(final ProcessorContext context) {} + + + @Override + public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) { + return null; + } + + @Override + public KeyValue<Integer, Integer> punctuate(final long timestamp) { + return null; + } + + @Override + public void close() {}; + }; + } + }, stateStoreName); + + final Topology topology = builder.build(); + createClients(topology, streamsConfiguration(), topology, streamsConfiguration()); + + setStateListenersForVerification(); + + startClients(); + + waitUntilBothClientAreOK( + "At least one client did not reach state RUNNING with active tasks but no stand-by tasks" + ); + } + + private void createClients(final Topology topology1, + final Properties streamsConfiguration1, + final Topology topology2, + final Properties streamsConfiguration2) { + + client1 = new KafkaStreams(topology1, streamsConfiguration1); + client2 = new KafkaStreams(topology2, streamsConfiguration2); + } + + private void setStateListenersForVerification() { + client1.setStateListener(new StateListener() { + @Override + public void onChange(final State newState, final State oldState) { + if (newState == State.RUNNING) { + client1IsOk = true; + for (final ThreadMetadata metadata : client1.localThreadsMetadata()) { + if (!(metadata.standbyTasks().isEmpty() && !metadata.activeTasks().isEmpty())) { + client1IsOk = false; + } + } + } + } + }); + client2.setStateListener(new StateListener() { + @Override + public void onChange(final State newState, final State oldState) { + if (newState == State.RUNNING) { + client2IsOk = true; + for (final ThreadMetadata metadata : client2.localThreadsMetadata()) { + if (!(metadata.standbyTasks().isEmpty() && !metadata.activeTasks().isEmpty())) { + client2IsOk = false; + } + } + } + } + }); + } + + private void startClients() { + client1.start(); + client2.start(); + } + + private void waitUntilBothClientAreOK(final String message) throws Exception { + TestUtils.waitForCondition( + new TestCondition() { + @Override + public boolean conditionMet() { + return client1IsOk && client2IsOk; + } + }, + 30 * 1000, + message + ": " + + "Client 1 is " + (!client1IsOk ? "NOT " : "") + "OK, " + + "client 2 is " + (!client2IsOk ? "NOT " : "") + "OK." + ); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 7e0be76..d64e69a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -21,12 +21,14 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.internals.PartitionAssignor; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; @@ -39,6 +41,11 @@ import org.apache.kafka.streams.processor.TaskMetadata; import org.apache.kafka.streams.processor.ThreadMetadata; import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.internals.StreamThread.StreamsMetricsThreadImpl; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockProcessorSupplier; @@ -50,6 +57,7 @@ import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; import java.lang.reflect.Field; import java.util.ArrayList; @@ -67,11 +75,13 @@ import java.util.regex.Pattern; import static java.util.Collections.EMPTY_SET; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -1078,6 +1088,94 @@ public class StreamThreadTest { assertEquals(StreamThread.State.RUNNING.name(), metadata.threadState()); } + private static class MockProcessor implements Processor { + @Override + public void init(final ProcessorContext context) { + + } + + @Override + public void process(final Object key, final Object value) { + + } + + @Override + public void punctuate(final long timestamp) { + + } + + @Override + public void close() { + + } + } + + @Test + public void shouldCreateStandbyTask() { + setupInternalTopologyWithoutState(); + internalTopologyBuilder.addStateStore( + Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myStore"), null, null), + "processor1" + ); + + final StandbyTask standbyTask = createStandbyTask(); + + assertThat(standbyTask, not(nullValue())); + } + + @Test + public void shouldNotCreateStandbyTaskWithoutStateStores() { + setupInternalTopologyWithoutState(); + + final StandbyTask standbyTask = createStandbyTask(); + + assertThat(standbyTask, nullValue()); + } + + + @Test + public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled() { + setupInternalTopologyWithoutState(); + final StoreBuilder storeBuilder = + Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myStore"), null, null); + storeBuilder.withLoggingDisabled(); + internalTopologyBuilder.addStateStore(storeBuilder, "processor1"); + + final StandbyTask standbyTask = createStandbyTask(); + + assertThat(standbyTask, nullValue()); + } + + private void setupInternalTopologyWithoutState() { + internalTopologyBuilder.addSource(null, "source1", null, null, null, "topic1"); + internalTopologyBuilder.addProcessor("processor1", new ProcessorSupplier() { + @Override + public Processor get() { + return new MockProcessor(); + } + }, "source1"); + } + + private StandbyTask createStandbyTask() { + final LogContext logContext = new LogContext("test"); + final Logger log = logContext.logger(StreamThreadTest.class); + final StreamsMetricsThreadImpl streamsMetrics = + new StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap()); + final StreamThread.StandbyTaskCreator standbyTaskCreator = new StreamThread.StandbyTaskCreator( + internalTopologyBuilder, + config, + streamsMetrics, + stateDirectory, + streamsMetrics.taskCreatedSensor, + new MockChangelogReader(), + mockTime, + log); + return standbyTaskCreator.createTask( + new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST), + new TaskId(1, 2), + Collections.<TopicPartition>emptySet()); + } + @Test public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() throws InterruptedException { internalStreamsBuilder.stream(Collections.singleton("t1"), consumed).groupByKey().count("count-one");