Repository: kafka Updated Branches: refs/heads/trunk 81f76bde8 -> f34164eed
MINOR: Fix bugs in KafkaStreams.close() Initially proposed by ijuma in https://github.com/apache/kafka/pull/1362#issuecomment-218293662 mjsax commented: > StreamThread.close() should be extended to call metrics.close() (the class > need a private member to reference the Metrics object, too) The `Metrics` instance is created in the `KafkaStreams` constructor and shared between all threads, so closing it within the threads doesn't seem like the right approach. This PR calls `Metrics.close()` in `KafkaStreams.close()` instead. cc guozhangwang Author: Jeff Klukas <[email protected]> Reviewers: Ismael Juma, Guozhang Wang Closes #1379 from jklukas/close-streams-metrics Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f34164ee Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f34164ee Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f34164ee Branch: refs/heads/trunk Commit: f34164eed53d791768f05df21f4dfeca89859b2e Parents: 81f76bd Author: Jeff Klukas <[email protected]> Authored: Thu May 12 21:14:51 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu May 12 21:14:51 2016 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/streams/KafkaStreams.java | 14 ++- .../apache/kafka/streams/KafkaStreamsTest.java | 106 +++++++++++++++++++ 2 files changed, 115 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f34164ee/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index b3e3f5d..af6d973 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -91,6 +91,7 @@ public class KafkaStreams { private int state = CREATED; private final StreamThread[] threads; + private final Metrics metrics; // processId is expected to be unique across JVMs and to be used // in userData of the subscription request to allow assignor be aware @@ -147,7 +148,7 @@ public class KafkaStreams { .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); - Metrics metrics = new Metrics(metricConfig, reporters, time); + this.metrics = new Metrics(metricConfig, reporters, time); this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)]; for (int i = 0; i < this.threads.length; i++) { @@ -169,8 +170,10 @@ public class KafkaStreams { state = RUNNING; log.info("Started Kafka Stream process"); - } else { + } else if (state == RUNNING) { throw new IllegalStateException("This process was already started."); + } else { + throw new IllegalStateException("Cannot restart after closing."); } } @@ -194,13 +197,14 @@ public class KafkaStreams { Thread.interrupted(); } } + } + if (state != STOPPED) { + metrics.close(); state = STOPPED; - log.info("Stopped Kafka Stream process"); - } else { - throw new IllegalStateException("This process has not started yet."); } + } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/f34164ee/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java new file mode 100644 index 0000000..22d8bf2 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams; + +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.test.MockMetricsReporter; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Properties; + +public class KafkaStreamsTest { + + @Test + public void testStartAndClose() throws Exception { + Properties props = new Properties(); + props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testStartAndClose"); + props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + + final int oldInitCount = MockMetricsReporter.INIT_COUNT.get(); + final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); + + KStreamBuilder builder = new KStreamBuilder(); + KafkaStreams streams = new KafkaStreams(builder, props); + + streams.start(); + final int newInitCount = MockMetricsReporter.INIT_COUNT.get(); + final int initCountDifference = newInitCount - oldInitCount; + Assert.assertTrue("some reporters should be initialized by calling start()", initCountDifference > 0); + + streams.close(); + Assert.assertEquals("each reporter initialized should also be closed", + oldCloseCount + initCountDifference, MockMetricsReporter.CLOSE_COUNT.get()); + } + + @Test + public void testCloseIsIdempotent() throws Exception { + Properties props = new Properties(); + props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCloseIsIdempotent"); + props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + + KStreamBuilder builder = new KStreamBuilder(); + KafkaStreams streams = new KafkaStreams(builder, props); + streams.close(); + final int closeCount = MockMetricsReporter.CLOSE_COUNT.get(); + + streams.close(); + Assert.assertEquals("subsequent close() calls should do nothing", + closeCount, MockMetricsReporter.CLOSE_COUNT.get()); + } + + @Test + public void testCannotStartOnceClosed() throws Exception { + Properties props = new Properties(); + props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartOnceClosed"); + props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + + KStreamBuilder builder = new KStreamBuilder(); + KafkaStreams streams = new KafkaStreams(builder, props); + streams.close(); + + try { + streams.start(); + } catch (IllegalStateException e) { + Assert.assertEquals("Cannot restart after closing.", e.getMessage()); + return; + } + Assert.fail("should have caught an exception and returned"); + } + + @Test + public void testCannotStartTwice() throws Exception { + Properties props = new Properties(); + props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartTwice"); + props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + + KStreamBuilder builder = new KStreamBuilder(); + KafkaStreams streams = new KafkaStreams(builder, props); + streams.start(); + + try { + streams.start(); + } catch (IllegalStateException e) { + Assert.assertEquals("This process was already started.", e.getMessage()); + return; + } + Assert.fail("should have caught an exception and returned"); + } +}
