Repository: kafka Updated Branches: refs/heads/0.10.2 05c13552f -> 767cc31a8
KAFKA-4965: set internal.leave.group.on.close to false in StreamsConfig Backport from trunk Author: Damian Guy <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #3032 from dguy/kafka-4965-bp Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/767cc31a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/767cc31a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/767cc31a Branch: refs/heads/0.10.2 Commit: 767cc31a8929efbcc827a3724700090330f948e6 Parents: 05c1355 Author: Damian Guy <[email protected]> Authored: Fri May 12 16:54:41 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri May 12 16:54:41 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/kafka/streams/StreamsConfig.java | 2 +- .../java/org/apache/kafka/streams/StreamsConfigTest.java | 9 +++++++++ .../streams/integration/GlobalKTableIntegrationTest.java | 1 + .../integration/InternalTopicIntegrationTest.java | 1 + .../kafka/streams/integration/JoinIntegrationTest.java | 1 + .../KStreamAggregationDedupIntegrationTest.java | 1 + .../integration/KStreamAggregationIntegrationTest.java | 1 + .../integration/KStreamKTableJoinIntegrationTest.java | 1 + .../streams/integration/KStreamRepartitionJoinTest.java | 1 + .../KStreamsFineGrainedAutoResetIntegrationTest.java | 1 + .../integration/KTableKTableJoinIntegrationTest.java | 1 + .../integration/QueryableStateIntegrationTest.java | 2 +- .../streams/integration/RegexSourceIntegrationTest.java | 11 +++++++---- .../kafka/streams/integration/ResetIntegrationTest.java | 1 + .../streams/integration/utils/IntegrationTestUtils.java | 1 + 15 files changed, 29 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index bc73f24..d9c5f26 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -405,6 +405,7 @@ public class StreamsConfig extends AbstractConfig { tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false); // MAX_POLL_INTERVAL_MS_CONFIG needs to be large for streams to handle cases when // streams is recovering data from state stores. We may set it to Integer.MAX_VALUE since // the streams code itself catches most exceptions and acts accordingly without needing @@ -412,7 +413,6 @@ public class StreamsConfig extends AbstractConfig { // are losing the ability to detect them by setting this value to large. Hopefully // deadlocks happen very rarely or never. tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); - CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides); } http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index f03bed9..15cc1af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; +import org.hamcrest.CoreMatchers; import org.junit.Before; import org.junit.Test; @@ -37,6 +38,7 @@ import java.util.Properties; import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -257,6 +259,13 @@ public class StreamsConfigTest { streamsConfig.getRestoreConsumerConfigs("client"); } + @Test + public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() throws Exception { + final StreamsConfig streamsConfig = new StreamsConfig(props); + final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "group", "client"); + assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false)); + } + static class MisconfiguredSerde implements Serde { @Override public void configure(final Map configs, final boolean isKey) { http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index a844a3c..d6a3c90 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -94,6 +94,7 @@ public class GlobalKTableIntegrationTest { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), globalOne, globalStore); stream = builder.stream(Serdes.String(), Serdes.Long(), inputStream); table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table"); http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index 20556c4..be6a5da 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -89,6 +89,7 @@ public class InternalTopicIntegrationTest { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); } private Properties getTopicConfigProperties(final String changelog) { http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java index 0c052f5..fc68516 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java @@ -127,6 +127,7 @@ public class JoinIntegrationTest { STREAMS_CONFIG.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(), 30000, http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index f2a767c..5e7d02b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -86,6 +86,7 @@ public class KStreamAggregationDedupIntegrationTest { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.<Integer, String>SelectValueMapper(); stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput); http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index f6708a7..ce30212 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -102,6 +102,7 @@ public class KStreamAggregationIntegrationTest { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); final KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.SelectValueMapper(); stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput); http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java index 0a16494..7ad5710 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java @@ -92,6 +92,7 @@ public class KStreamKTableJoinIntegrationTest { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); } http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java index 43e5d87..ebe5114 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java @@ -105,6 +105,7 @@ public class KStreamRepartitionJoinTest { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput); streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput); http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java index fe73b73..fc4d218 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java @@ -131,6 +131,7 @@ public class KStreamsFineGrainedAutoResetIntegrationTest { Properties props = new Properties(); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsConfiguration = StreamsTestUtils.getStreamsConfig( "testAutoOffsetId", http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java index 322dd59..3574b21 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java @@ -139,6 +139,7 @@ public class KTableKTableJoinIntegrationTest { streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); final Properties producerConfig = new Properties(); producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 0e4cdc7..64e8459 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -143,7 +143,7 @@ public class QueryableStateIntegrationTest { streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - // override this to make the rebalances happen quickly + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); stringComparator = new Comparator<KeyValue<String, String>>() { http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 0ea36ea..a5ec8db 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -111,10 +111,13 @@ public class RegexSourceIntegrationTest { @Before public void setUp() { - - streamsConfiguration = StreamsTestUtils.getStreamsConfig(CLUSTER.bootstrapServers(), - STRING_SERDE_CLASSNAME, - STRING_SERDE_CLASSNAME); + final Properties properties = new Properties(); + properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); + streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test", + CLUSTER.bootstrapServers(), + STRING_SERDE_CLASSNAME, + STRING_SERDE_CLASSNAME, + properties); } @After http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index d653d00..f032f5a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -277,6 +277,7 @@ public class ResetIntegrationTest { streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); http://git-wip-us.apache.org/repos/asf/kafka/blob/767cc31a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 08e22cc..38ef64d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -55,6 +55,7 @@ public class IntegrationTestUtils { public static final int UNLIMITED_MESSAGES = -1; public static final long DEFAULT_TIMEOUT = 30 * 1000L; + public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close"; /** * Returns up to `maxMessages` message-values from the topic.
