Repository: kafka Updated Branches: refs/heads/0.10.2 9eb0cdb54 -> 29214d336
KAFKA-4881: add internal.leave.group.config to consumer Backport from https://github.com/apache/kafka/pull/2650 Author: Damian Guy <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #3025 from dguy/kafka-4881-bp Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/29214d33 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/29214d33 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/29214d33 Branch: refs/heads/0.10.2 Commit: 29214d33634ef613618ef7ad32ad08eae9f83a40 Parents: 9eb0cdb Author: Damian Guy <[email protected]> Authored: Thu May 11 23:26:52 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu May 11 23:26:52 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/consumer/ConsumerConfig.java | 18 ++++- .../kafka/clients/consumer/KafkaConsumer.java | 31 ++++----- .../consumer/internals/AbstractCoordinator.java | 9 ++- .../consumer/internals/ConsumerCoordinator.java | 20 +++--- .../apache/kafka/common/config/ConfigDef.java | 35 ++++++++-- .../clients/consumer/KafkaConsumerTest.java | 3 +- .../internals/AbstractCoordinatorTest.java | 2 +- .../internals/ConsumerCoordinatorTest.java | 69 +++++++++++--------- .../kafka/common/config/ConfigDefTest.java | 22 +++++++ .../runtime/distributed/WorkerCoordinator.java | 17 ++--- 10 files changed, 153 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 1b33517..4d66f21 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -218,7 +218,18 @@ public class ConsumerConfig extends AbstractConfig { private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether records from internal topics (such as offsets) should be exposed to the consumer. " + "If set to <code>true</code> the only way to receive records from an internal topic is subscribing to it."; public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true; - + + /** + * <code>internal.leave.group.on.close</code> + * Whether or not the consumer should leave the group on close. If set to <code>false</code> then a rebalance + * won't occur until <code>session.timeout.ms</code> expires. + * + * <p> + * Note: this is an internal configuration and could be changed in the future in a backward incompatible way + * + */ + static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close"; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, @@ -386,7 +397,10 @@ public class ConsumerConfig extends AbstractConfig { DEFAULT_EXCLUDE_INTERNAL_TOPICS, Importance.MEDIUM, EXCLUDE_INTERNAL_TOPICS_DOC) - + .defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG, + Type.BOOLEAN, + true, + Importance.LOW) // security support .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 64d64ba..e54c5a7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -674,21 +674,22 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, PartitionAssignor.class); this.coordinator = new ConsumerCoordinator(this.client, - config.getString(ConsumerConfig.GROUP_ID_CONFIG), - config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), - config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), - config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), - assignors, - this.metadata, - this.subscriptions, - metrics, - metricGrpPrefix, - this.time, - retryBackoffMs, - config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), - config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), - this.interceptors, - config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG)); + config.getString(ConsumerConfig.GROUP_ID_CONFIG), + config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), + config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), + config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), + assignors, + this.metadata, + this.subscriptions, + metrics, + metricGrpPrefix, + this.time, + retryBackoffMs, + config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), + config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), + this.interceptors, + config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG), + config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG)); this.fetcher = new Fetcher<>(this.client, config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index b72769e..d24bde4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -98,6 +98,7 @@ public abstract class AbstractCoordinator implements Closeable { protected final int rebalanceTimeoutMs; private final int sessionTimeoutMs; + private final boolean leaveGroupOnClose; private final GroupCoordinatorMetrics sensors; private final Heartbeat heartbeat; protected final String groupId; @@ -126,12 +127,14 @@ public abstract class AbstractCoordinator implements Closeable { Metrics metrics, String metricGrpPrefix, Time time, - long retryBackoffMs) { + long retryBackoffMs, + boolean leaveGroupOnClose) { this.client = client; this.time = time; this.groupId = groupId; this.rebalanceTimeoutMs = rebalanceTimeoutMs; this.sessionTimeoutMs = sessionTimeoutMs; + this.leaveGroupOnClose = leaveGroupOnClose; this.heartbeat = new Heartbeat(sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs); this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix); this.retryBackoffMs = retryBackoffMs; @@ -673,7 +676,9 @@ public abstract class AbstractCoordinator implements Closeable { // Synchronize after closing the heartbeat thread since heartbeat thread // needs this lock to complete and terminate after close flag is set. synchronized (this) { - maybeLeaveGroup(); + if (leaveGroupOnClose) { + maybeLeaveGroup(); + } // At this point, there may be pending commits (async commits or sync commits that were // interrupted using wakeup) and the leave group request which have been queued, but not http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 2e37636..7f62489 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -104,16 +104,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator { boolean autoCommitEnabled, int autoCommitIntervalMs, ConsumerInterceptors<?, ?> interceptors, - boolean excludeInternalTopics) { + boolean excludeInternalTopics, + final boolean leaveGroupOnClose) { super(client, - groupId, - rebalanceTimeoutMs, - sessionTimeoutMs, - heartbeatIntervalMs, - metrics, - metricGrpPrefix, - time, - retryBackoffMs); + groupId, + rebalanceTimeoutMs, + sessionTimeoutMs, + heartbeatIntervalMs, + metrics, + metricGrpPrefix, + time, + retryBackoffMs, + leaveGroupOnClose); this.metadata = metadata; this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch()); this.subscriptions = subscriptions; http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 5257f6e..312205f 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -124,7 +124,7 @@ public class ConfigDef { */ public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, String group, int orderInGroup, Width width, String displayName, List<String> dependents, Recommender recommender) { - return define(new ConfigKey(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender)); + return define(new ConfigKey(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender, false)); } /** @@ -374,6 +374,19 @@ public class ConfigDef { } /** + * Define a new internal configuration. Internal configuration won't show up in the docs and aren't + * intended for general use. + * @param name The name of the config parameter + * @param type The type of the config + * @param defaultValue The default value to use if this config isn't present + * @param importance + * @return This ConfigDef so you can chain calls + */ + public ConfigDef defineInternal(final String name, final Type type, final Object defaultValue, final Importance importance) { + return define(new ConfigKey(name, type, defaultValue, null, importance, "", "", -1, Width.NONE, name, Collections.<String>emptyList(), null, true)); + } + + /** * Get the configuration keys * @return a map containing all configuration keys */ @@ -890,11 +903,13 @@ public class ConfigDef { public final String displayName; public final List<String> dependents; public final Recommender recommender; + public final boolean internalConfig; public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, String group, int orderInGroup, Width width, String displayName, - List<String> dependents, Recommender recommender) { + List<String> dependents, Recommender recommender, + boolean internalConfig) { this.name = name; this.type = type; this.defaultValue = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type); @@ -909,6 +924,7 @@ public class ConfigDef { this.width = width; this.displayName = displayName; this.recommender = recommender; + this.internalConfig = internalConfig; } public boolean hasDefault() { @@ -961,6 +977,10 @@ public class ConfigDef { } b.append("</tr>\n"); for (ConfigKey def : configs) { + if (def.internalConfig) { + continue; + } + b.append("<tr>\n"); // print column values for (String headerName : headers()) { @@ -981,6 +1001,9 @@ public class ConfigDef { public String toRst() { StringBuilder b = new StringBuilder(); for (ConfigKey def : sortedConfigs()) { + if (def.internalConfig) { + continue; + } getConfigKeyRst(def, b); b.append("\n"); } @@ -996,10 +1019,12 @@ public class ConfigDef { String lastKeyGroupName = ""; for (ConfigKey def : sortedConfigs()) { + if (def.internalConfig) { + continue; + } if (def.group != null) { if (!lastKeyGroupName.equalsIgnoreCase(def.group)) { b.append(def.group).append("\n"); - char[] underLine = new char[def.group.length()]; Arrays.fill(underLine, '^'); b.append(new String(underLine)).append("\n\n"); @@ -1104,8 +1129,8 @@ public class ConfigDef { key.width, key.displayName, embeddedDependents(keyPrefix, key.dependents), - embeddedRecommender(keyPrefix, key.recommender) - )); + embeddedRecommender(keyPrefix, key.recommender), + key.internalConfig)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index f2905a9..369cb30 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1508,7 +1508,8 @@ public class KafkaConsumerTest { autoCommitEnabled, autoCommitIntervalMs, interceptors, - excludeInternalTopics); + excludeInternalTopics, + true); Fetcher<String, String> fetcher = new Fetcher<>( consumerClient, http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 8846b5e..e6165cf 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -492,7 +492,7 @@ public class AbstractCoordinatorTest { Metrics metrics, Time time) { super(client, GROUP_ID, REBALANCE_TIMEOUT_MS, SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, - METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS); + METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS, false); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 66fe76d..501db2d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -129,7 +129,7 @@ public class ConsumerCoordinatorTest { this.partitionAssignor.clear(); client.setNode(node); - this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled); + this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled, true); } @After @@ -861,7 +861,7 @@ public class ConsumerCoordinatorTest { @Test public void testIncludeInternalTopicsConfigOption() { - coordinator = buildCoordinator(new Metrics(), assignors, false, false); + coordinator = buildCoordinator(new Metrics(), assignors, false, false, true); subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener); metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.<String>emptySet(), time.milliseconds()); @@ -955,7 +955,7 @@ public class ConsumerCoordinatorTest { final String consumerId = "consumer"; ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true); subscriptions.subscribe(singleton(topic1), rebalanceListener); @@ -980,7 +980,7 @@ public class ConsumerCoordinatorTest { final String consumerId = "consumer"; ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true); subscriptions.subscribe(singleton(topic1), rebalanceListener); @@ -1007,7 +1007,7 @@ public class ConsumerCoordinatorTest { @Test public void testAutoCommitManualAssignment() { ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true); subscriptions.assignFromUser(singleton(t1p)); subscriptions.seek(t1p, 100); @@ -1025,7 +1025,7 @@ public class ConsumerCoordinatorTest { @Test public void testAutoCommitManualAssignmentCoordinatorUnknown() { ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true); subscriptions.assignFromUser(singleton(t1p)); subscriptions.seek(t1p, 100); @@ -1376,7 +1376,7 @@ public class ConsumerCoordinatorTest { try (Metrics metrics = new Metrics(time)) { ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range), - ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true); List<ProtocolMetadata> metadata = coordinator.metadata(); assertEquals(2, metadata.size()); assertEquals(roundRobin.name(), metadata.get(0).name()); @@ -1385,7 +1385,7 @@ public class ConsumerCoordinatorTest { try (Metrics metrics = new Metrics(time)) { ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin), - ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true); List<ProtocolMetadata> metadata = coordinator.metadata(); assertEquals(2, metadata.size()); assertEquals(range.name(), metadata.get(0).name()); @@ -1395,19 +1395,25 @@ public class ConsumerCoordinatorTest { @Test public void testCloseDynamicAssignment() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); gracefulCloseTest(coordinator, true); } @Test public void testCloseManualAssignment() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, true); + gracefulCloseTest(coordinator, false); + } + + @Test + public void shouldNotLeaveGroupWhenLeaveGroupFlagIsFalse() throws Exception { + final ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, false); gracefulCloseTest(coordinator, false); } @Test public void testCloseCoordinatorNotKnownManualAssignment() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, true); makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP); time.sleep(autoCommitIntervalMs); closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000); @@ -1415,14 +1421,14 @@ public class ConsumerCoordinatorTest { @Test public void testCloseCoordinatorNotKnownNoCommits() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true); makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP); closeVerifyTimeout(coordinator, 1000, 60000, 0, 0); } @Test public void testCloseCoordinatorNotKnownWithCommits() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP); time.sleep(autoCommitIntervalMs); closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000); @@ -1430,14 +1436,14 @@ public class ConsumerCoordinatorTest { @Test public void testCloseCoordinatorUnavailableNoCommits() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true); makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE); closeVerifyTimeout(coordinator, 1000, 60000, 0, 0); } @Test public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE); time.sleep(autoCommitIntervalMs); closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000); @@ -1445,7 +1451,7 @@ public class ConsumerCoordinatorTest { @Test public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE); time.sleep(autoCommitIntervalMs); closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000); @@ -1453,20 +1459,20 @@ public class ConsumerCoordinatorTest { @Test public void testCloseNoResponseForCommit() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); time.sleep(autoCommitIntervalMs); closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000); } @Test public void testCloseNoResponseForLeaveGroup() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true); closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000); } @Test public void testCloseNoWait() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); time.sleep(autoCommitIntervalMs); closeVerifyTimeout(coordinator, 0, 60000, 0, 0); } @@ -1474,7 +1480,7 @@ public class ConsumerCoordinatorTest { @Test public void testHeartbeatThreadClose() throws Exception { groupId = "testCloseTimeoutWithHeartbeatThread"; - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); coordinator.ensureActiveGroup(); time.sleep(heartbeatIntervalMs + 100); Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat @@ -1485,10 +1491,12 @@ public class ConsumerCoordinatorTest { assertFalse("Heartbeat thread active after close", threads[i].getName().contains(groupId)); } - private ConsumerCoordinator prepareCoordinatorForCloseTest(boolean useGroupManagement, boolean autoCommit) { + private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement, + final boolean autoCommit, + final boolean leaveGroup) { final String consumerId = "consumer"; ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommit); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommit, leaveGroup); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); if (useGroupManagement) { @@ -1547,7 +1555,7 @@ public class ConsumerCoordinatorTest { } } - private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean dynamicAssignment) throws Exception { + private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean shouldLeaveGroup) throws Exception { final AtomicBoolean commitRequested = new AtomicBoolean(); final AtomicBoolean leaveGroupRequested = new AtomicBoolean(); client.prepareResponse(new MockClient.RequestMatcher() { @@ -1569,14 +1577,14 @@ public class ConsumerCoordinatorTest { coordinator.close(); assertTrue("Commit not requested", commitRequested.get()); - if (dynamicAssignment) - assertTrue("Leave group not requested", leaveGroupRequested.get()); + assertEquals("leaveGroupRequested should be " + shouldLeaveGroup, shouldLeaveGroup, leaveGroupRequested.get()); } - private ConsumerCoordinator buildCoordinator(Metrics metrics, - List<PartitionAssignor> assignors, - boolean excludeInternalTopics, - boolean autoCommitEnabled) { + private ConsumerCoordinator buildCoordinator(final Metrics metrics, + final List<PartitionAssignor> assignors, + final boolean excludeInternalTopics, + final boolean autoCommitEnabled, + final boolean leaveGroup) { return new ConsumerCoordinator( consumerClient, groupId, @@ -1593,7 +1601,8 @@ public class ConsumerCoordinatorTest { autoCommitEnabled, autoCommitIntervalMs, null, - excludeInternalTopics); + excludeInternalTopics, + leaveGroup); } private GroupCoordinatorResponse groupCoordinatorResponse(Node node, short error) { http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java index 5a6339e..4305779 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java @@ -31,6 +31,7 @@ import java.util.Properties; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; public class ConfigDefTest { @@ -319,6 +320,27 @@ public class ConfigDefTest { } } + @Test + public void testCanAddInternalConfig() throws Exception { + final String configName = "internal.config"; + final ConfigDef configDef = new ConfigDef().defineInternal(configName, Type.STRING, "", Importance.LOW); + final HashMap<String, String> properties = new HashMap<>(); + properties.put(configName, "value"); + final List<ConfigValue> results = configDef.validate(properties); + final ConfigValue configValue = results.get(0); + assertEquals("value", configValue.value()); + assertEquals(configName, configValue.name()); + } + + @Test + public void testInternalConfigDoesntShowUpInDocs() throws Exception { + final String name = "my.config"; + final ConfigDef configDef = new ConfigDef().defineInternal(name, Type.STRING, "", Importance.LOW); + assertFalse(configDef.toHtmlTable().contains("my.config")); + assertFalse(configDef.toEnrichedRst().contains("my.config")); + assertFalse(configDef.toRst().contains("my.config")); + } + private static class IntegerRecommender implements ConfigDef.Recommender { private boolean hasParent; http://git-wip-us.apache.org/repos/asf/kafka/blob/29214d33/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index 58525c5..9146925 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -74,14 +74,15 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos ConfigBackingStore configStorage, WorkerRebalanceListener listener) { super(client, - groupId, - rebalanceTimeoutMs, - sessionTimeoutMs, - heartbeatIntervalMs, - metrics, - metricGrpPrefix, - time, - retryBackoffMs); + groupId, + rebalanceTimeoutMs, + sessionTimeoutMs, + heartbeatIntervalMs, + metrics, + metricGrpPrefix, + time, + retryBackoffMs, + true); this.restUrl = restUrl; this.configStorage = configStorage; this.assignmentSnapshot = null;
