Repository: kafka Updated Branches: refs/heads/trunk 80df43500 -> cd54fc881
KAFKA-2931: add system test for consumer rolling upgrades Author: Jason Gustafson <[email protected]> Reviewers: Ismael Juma, Guozhang Wang Closes #619 from hachikuji/KAFKA-2931 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cd54fc88 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cd54fc88 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cd54fc88 Branch: refs/heads/trunk Commit: cd54fc8816964f5a56469075c75c567e777b9656 Parents: 80df435 Author: Jason Gustafson <[email protected]> Authored: Thu Dec 3 21:17:51 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Thu Dec 3 21:17:51 2015 -0800 ---------------------------------------------------------------------- .../consumer/internals/AbstractCoordinator.java | 14 +-- .../consumer/internals/ConsumerCoordinator.java | 32 ++++--- .../kafka/common/requests/JoinGroupRequest.java | 14 +-- .../internals/ConsumerCoordinatorTest.java | 62 +++++++++---- .../common/requests/RequestResponseTest.java | 4 +- .../runtime/distributed/WorkerCoordinator.java | 10 +-- .../runtime/distributed/WorkerGroupMember.java | 2 - .../distributed/WorkerCoordinatorTest.java | 10 ++- .../kafka/api/AuthorizerIntegrationTest.scala | 2 +- tests/kafkatest/services/verifiable_consumer.py | 7 +- .../kafkatest/tests/consumer_rolling_upgrade.py | 94 ++++++++++++++++++++ .../apache/kafka/tools/VerifiableConsumer.java | 11 +++ 12 files changed, 198 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 322de5c..33886ed 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.GroupCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatRequest; import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.JoinGroupRequest; +import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata; import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.LeaveGroupRequest; import org.apache.kafka.common.requests.LeaveGroupResponse; @@ -48,9 +49,7 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -132,14 +131,14 @@ public abstract class AbstractCoordinator implements Closeable { /** * Get the current list of protocols and their associated metadata supported - * by the local member. The order of the protocols in the map indicates the preference + * by the local member. The order of the protocols in the list indicates the preference * of the protocol (the first entry is the most preferred). The coordinator takes this * preference into account when selecting the generation protocol (generally more preferred * protocols will be selected as long as all members support them and there is no disagreement * on the preference). * @return Non-empty map of supported protocols and metadata */ - protected abstract LinkedHashMap<String, ByteBuffer> metadata(); + protected abstract List<ProtocolMetadata> metadata(); /** * Invoked prior to each group join or rejoin. This is typically used to perform any @@ -308,17 +307,12 @@ public abstract class AbstractCoordinator implements Closeable { // send a join group request to the coordinator log.debug("(Re-)joining group {}", groupId); - - List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>(); - for (Map.Entry<String, ByteBuffer> metadataEntry : metadata().entrySet()) - protocols.add(new JoinGroupRequest.GroupProtocol(metadataEntry.getKey(), metadataEntry.getValue())); - JoinGroupRequest request = new JoinGroupRequest( groupId, this.sessionTimeoutMs, this.memberId, protocolType(), - protocols); + metadata()); // create the request for the coordinator log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.coordinator.id()); http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 8453c7b..4ac05a3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -37,6 +37,7 @@ import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.OffsetCommitResponse; import org.apache.kafka.common.requests.OffsetFetchRequest; @@ -49,7 +50,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -61,7 +61,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class); - private final Map<String, PartitionAssignor> protocolMap; + private final List<PartitionAssignor> assignors; private final org.apache.kafka.clients.Metadata metadata; private final MetadataSnapshot metadataSnapshot; private final ConsumerCoordinatorMetrics sensors; @@ -104,10 +104,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { this.subscriptions = subscriptions; this.defaultOffsetCommitCallback = defaultOffsetCommitCallback; this.autoCommitEnabled = autoCommitEnabled; - - this.protocolMap = new HashMap<>(); - for (PartitionAssignor assignor : assignors) - this.protocolMap.put(assignor.name(), assignor); + this.assignors = assignors; addMetadataListener(); @@ -121,13 +118,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } @Override - public LinkedHashMap<String, ByteBuffer> metadata() { - LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>(); - for (PartitionAssignor assignor : protocolMap.values()) { + public List<ProtocolMetadata> metadata() { + List<ProtocolMetadata> metadataList = new ArrayList<>(); + for (PartitionAssignor assignor : assignors) { Subscription subscription = assignor.subscription(subscriptions.subscription()); - metadata.put(assignor.name(), ConsumerProtocol.serializeSubscription(subscription)); + ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription); + metadataList.add(new ProtocolMetadata(assignor.name(), metadata)); } - return metadata; + return metadataList; } private void addMetadataListener() { @@ -156,12 +154,20 @@ public final class ConsumerCoordinator extends AbstractCoordinator { }); } + private PartitionAssignor lookupAssignor(String name) { + for (PartitionAssignor assignor : this.assignors) { + if (assignor.name().equals(name)) + return assignor; + } + return null; + } + @Override protected void onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer) { - PartitionAssignor assignor = protocolMap.get(assignmentStrategy); + PartitionAssignor assignor = lookupAssignor(assignmentStrategy); if (assignor == null) throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy); @@ -198,7 +204,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { protected Map<String, ByteBuffer> performAssignment(String leaderId, String assignmentStrategy, Map<String, ByteBuffer> allSubscriptions) { - PartitionAssignor assignor = protocolMap.get(protocol); + PartitionAssignor assignor = lookupAssignor(assignmentStrategy); if (assignor == null) throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy); http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index 91a698c..cae07bc 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -40,13 +40,13 @@ public class JoinGroupRequest extends AbstractRequest { private final int sessionTimeout; private final String memberId; private final String protocolType; - private final List<GroupProtocol> groupProtocols; + private final List<ProtocolMetadata> groupProtocols; - public static class GroupProtocol { + public static class ProtocolMetadata { private final String name; private final ByteBuffer metadata; - public GroupProtocol(String name, ByteBuffer metadata) { + public ProtocolMetadata(String name, ByteBuffer metadata) { this.name = name; this.metadata = metadata; } @@ -64,7 +64,7 @@ public class JoinGroupRequest extends AbstractRequest { int sessionTimeout, String memberId, String protocolType, - List<GroupProtocol> groupProtocols) { + List<ProtocolMetadata> groupProtocols) { super(new Struct(CURRENT_SCHEMA)); struct.set(GROUP_ID_KEY_NAME, groupId); struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout); @@ -72,7 +72,7 @@ public class JoinGroupRequest extends AbstractRequest { struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType); List<Struct> groupProtocolsList = new ArrayList<>(); - for (GroupProtocol protocol : groupProtocols) { + for (ProtocolMetadata protocol : groupProtocols) { Struct protocolStruct = struct.instance(GROUP_PROTOCOLS_KEY_NAME); protocolStruct.set(PROTOCOL_NAME_KEY_NAME, protocol.name); protocolStruct.set(PROTOCOL_METADATA_KEY_NAME, protocol.metadata); @@ -99,7 +99,7 @@ public class JoinGroupRequest extends AbstractRequest { Struct groupProtocolStruct = (Struct) groupProtocolObj; String name = groupProtocolStruct.getString(PROTOCOL_NAME_KEY_NAME); ByteBuffer metadata = groupProtocolStruct.getBytes(PROTOCOL_METADATA_KEY_NAME); - groupProtocols.add(new GroupProtocol(name, metadata)); + groupProtocols.add(new ProtocolMetadata(name, metadata)); } } @@ -132,7 +132,7 @@ public class JoinGroupRequest extends AbstractRequest { return memberId; } - public List<GroupProtocol> groupProtocols() { + public List<ProtocolMetadata> groupProtocols() { return groupProtocols; } http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 500aaed..9f9682a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -24,6 +24,8 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.clients.consumer.RoundRobinAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; @@ -38,6 +40,7 @@ import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.GroupCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.JoinGroupRequest; +import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata; import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.LeaveGroupRequest; import org.apache.kafka.common.requests.LeaveGroupResponse; @@ -75,7 +78,6 @@ public class ConsumerCoordinatorTest { private int sessionTimeoutMs = 10; private int heartbeatIntervalMs = 2; private long retryBackoffMs = 100; - private long requestTimeoutMs = 5000; private boolean autoCommitEnabled = false; private long autoCommitIntervalMs = 5000; private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor(); @@ -108,22 +110,7 @@ public class ConsumerCoordinatorTest { client.setNode(node); - this.coordinator = new ConsumerCoordinator( - consumerClient, - groupId, - sessionTimeoutMs, - heartbeatIntervalMs, - assignors, - metadata, - subscriptions, - metrics, - "consumer" + groupId, - metricTags, - time, - retryBackoffMs, - defaultOffsetCommitCallback, - autoCommitEnabled, - autoCommitIntervalMs); + this.coordinator = buildCoordinator(metrics, assignors); } @After @@ -892,6 +879,47 @@ public class ConsumerCoordinatorTest { assertEquals(null, subscriptions.committed(tp)); } + @Test + public void testProtocolMetadataOrder() { + RoundRobinAssignor roundRobin = new RoundRobinAssignor(); + RangeAssignor range = new RangeAssignor(); + + try (Metrics metrics = new Metrics(time)) { + ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range)); + List<ProtocolMetadata> metadata = coordinator.metadata(); + assertEquals(2, metadata.size()); + assertEquals(roundRobin.name(), metadata.get(0).name()); + assertEquals(range.name(), metadata.get(1).name()); + } + + try (Metrics metrics = new Metrics(time)) { + ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin)); + List<ProtocolMetadata> metadata = coordinator.metadata(); + assertEquals(2, metadata.size()); + assertEquals(range.name(), metadata.get(0).name()); + assertEquals(roundRobin.name(), metadata.get(1).name()); + } + } + + private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> assignors) { + return new ConsumerCoordinator( + consumerClient, + groupId, + sessionTimeoutMs, + heartbeatIntervalMs, + assignors, + metadata, + subscriptions, + metrics, + "consumer" + groupId, + metricTags, + time, + retryBackoffMs, + defaultOffsetCommitCallback, + autoCommitEnabled, + autoCommitIntervalMs); + } + private Struct consumerMetadataResponse(Node node, short error) { GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node); return response.toStruct(); http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 5ee11d2..ab18817 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -188,8 +188,8 @@ public class RequestResponseTest { private AbstractRequest createJoinGroupRequest() { ByteBuffer metadata = ByteBuffer.wrap(new byte[] {}); - List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>(); - protocols.add(new JoinGroupRequest.GroupProtocol("consumer-range", metadata)); + List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>(); + protocols.add(new JoinGroupRequest.ProtocolMetadata("consumer-range", metadata)); return new JoinGroupRequest("group1", 30000, "consumer1", "consumer", protocols); } http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index 082e235..6275636 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata; import org.apache.kafka.common.utils.CircularIterator; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.storage.KafkaConfigStorage; @@ -35,7 +36,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -69,7 +69,6 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos String metricGrpPrefix, Map<String, String> metricTags, Time time, - long requestTimeoutMs, long retryBackoffMs, String restUrl, KafkaConfigStorage configStorage, @@ -101,12 +100,11 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos } @Override - public LinkedHashMap<String, ByteBuffer> metadata() { - LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>(); + public List<ProtocolMetadata> metadata() { configSnapshot = configStorage.snapshot(); ConnectProtocol.WorkerState workerState = new ConnectProtocol.WorkerState(restUrl, configSnapshot.offset()); - metadata.put(DEFAULT_SUBPROTOCOL, ConnectProtocol.serializeMetadata(workerState)); - return metadata; + ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState); + return Collections.singletonList(new ProtocolMetadata(DEFAULT_SUBPROTOCOL, metadata)); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index c72e3ef..a36608a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -21,7 +21,6 @@ import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; @@ -105,7 +104,6 @@ public class WorkerGroupMember { metricGrpPrefix, metricsTags, this.time, - config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), retryBackoffMs, restUrl, configStorage, http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java index 38c1aeb..f47a9f9 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.GroupCoordinatorResponse; +import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata; import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; @@ -68,7 +69,6 @@ public class WorkerCoordinatorTest { private int sessionTimeoutMs = 10; private int heartbeatIntervalMs = 2; private long retryBackoffMs = 100; - private long requestTimeoutMs = 5000; private MockTime time; private MockClient client; private Cluster cluster = TestUtils.singletonCluster("topic", 1); @@ -105,7 +105,6 @@ public class WorkerCoordinatorTest { "consumer" + groupId, metricTags, time, - requestTimeoutMs, retryBackoffMs, LEADER_URL, configStorage, @@ -149,9 +148,12 @@ public class WorkerCoordinatorTest { PowerMock.replayAll(); - LinkedHashMap<String, ByteBuffer> serialized = coordinator.metadata(); + List<ProtocolMetadata> serialized = coordinator.metadata(); assertEquals(1, serialized.size()); - ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(serialized.get(WorkerCoordinator.DEFAULT_SUBPROTOCOL)); + + ProtocolMetadata defaultMetadata = serialized.get(0); + assertEquals(WorkerCoordinator.DEFAULT_SUBPROTOCOL, defaultMetadata.name()); + ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(defaultMetadata.metadata()); assertEquals(1, state.offset()); PowerMock.verifyAll(); http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 26ab885..4aa8438 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -191,7 +191,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { private def createJoinGroupRequest = { new JoinGroupRequest(group, 30000, "", "consumer", - List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava) + List( new JoinGroupRequest.ProtocolMetadata("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava) } private def createSyncGroupRequest = { http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/tests/kafkatest/services/verifiable_consumer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index 338646b..955dd5d 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -134,6 +134,7 @@ class VerifiableConsumer(BackgroundThreadService): def __init__(self, context, num_nodes, kafka, topic, group_id, max_messages=-1, session_timeout=30000, enable_autocommit=False, + assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor", version=TRUNK): super(VerifiableConsumer, self).__init__(context, num_nodes) self.log_level = "TRACE" @@ -144,6 +145,7 @@ class VerifiableConsumer(BackgroundThreadService): self.max_messages = max_messages self.session_timeout = session_timeout self.enable_autocommit = enable_autocommit + self.assignment_strategy = assignment_strategy self.prop_file = "" self.security_config = kafka.security_config.client_config(self.prop_file) self.prop_file += str(self.security_config) @@ -223,9 +225,10 @@ class VerifiableConsumer(BackgroundThreadService): cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer" \ - " --group-id %s --topic %s --broker-list %s --session-timeout %s %s" % \ + " --group-id %s --topic %s --broker-list %s --session-timeout %s --assignment-strategy %s %s" % \ (self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol), self.session_timeout, - "--enable-autocommit" if self.enable_autocommit else "") + self.assignment_strategy, "--enable-autocommit" if self.enable_autocommit else "") + if self.max_messages > 0: cmd += " --max-messages %s" % str(self.max_messages) http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/tests/kafkatest/tests/consumer_rolling_upgrade.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/consumer_rolling_upgrade.py b/tests/kafkatest/tests/consumer_rolling_upgrade.py new file mode 100644 index 0000000..f00937c --- /dev/null +++ b/tests/kafkatest/tests/consumer_rolling_upgrade.py @@ -0,0 +1,94 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.utils.util import wait_until + +from kafkatest.tests.kafka_test import KafkaTest +from kafkatest.services.verifiable_consumer import VerifiableConsumer +from kafkatest.services.kafka import TopicPartition + +class ConsumerRollingUpgradeTest(KafkaTest): + TOPIC = "test_topic" + NUM_PARTITIONS = 4 + GROUP_ID = "test_group_id" + RANGE = "org.apache.kafka.clients.consumer.RangeAssignor" + ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor" + + def __init__(self, test_context): + super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={ + self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 } + }) + self.num_consumers = 2 + self.session_timeout = 10000 + + def min_cluster_size(self): + return super(ConsumerRollingUpgradeTest, self).min_cluster_size() + self.num_consumers + + def _await_all_members(self, consumer): + # Wait until all members have joined the group + wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers, timeout_sec=self.session_timeout+5, + err_msg="Consumers failed to join in a reasonable amount of time") + + def _verify_range_assignment(self, consumer): + # range assignment should give us two partition sets: (0, 1) and (2, 3) + assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()]) + assert assignment == set([ + frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]), + frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])]) + + def _verify_roundrobin_assignment(self, consumer): + assignment = set([frozenset(x) for x in consumer.current_assignment().values()]) + assert assignment == set([ + frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]), + frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])]) + + def rolling_update_test(self): + """ + Verify rolling updates of partition assignment strategies works correctly. In this + test, we use a rolling restart to change the group's assignment strategy from "range" + to "roundrobin." We verify after every restart that all members are still in the group + and that the correct assignment strategy was used. + """ + + # initialize the consumer using range assignment + consumer = VerifiableConsumer(self.test_context, self.num_consumers, self.kafka, + self.TOPIC, self.GROUP_ID, session_timeout=self.session_timeout, + assignment_strategy=self.RANGE) + consumer.start() + self._await_all_members(consumer) + self._verify_range_assignment(consumer) + + # change consumer configuration to prefer round-robin assignment, but still support range assignment + consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE + + # restart one of the nodes and verify that we are still using range assignment + consumer.stop_node(consumer.nodes[0]) + consumer.start_node(consumer.nodes[0]) + self._await_all_members(consumer) + self._verify_range_assignment(consumer) + + # now restart the other node and verify that we have switched to round-robin + consumer.stop_node(consumer.nodes[1]) + consumer.start_node(consumer.nodes[1]) + self._await_all_members(consumer) + self._verify_roundrobin_assignment(consumer) + + # if we want, we can now drop support for range assignment + consumer.assignment_strategy = self.ROUND_ROBIN + for node in consumer.nodes: + consumer.stop_node(node) + consumer.start_node(node) + self._await_all_members(consumer) + self._verify_roundrobin_assignment(consumer) http://git-wip-us.apache.org/repos/asf/kafka/blob/cd54fc88/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index 93c0bc6..25b87bd 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -35,6 +35,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.clients.consumer.RoundRobinAssignor; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; @@ -538,6 +540,14 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons .dest("resetPolicy") .help("Set reset policy (must be either 'earliest', 'latest', or 'none'"); + parser.addArgument("--assignment-strategy") + .action(store()) + .required(false) + .setDefault(RangeAssignor.class.getName()) + .type(String.class) + .dest("assignmentStrategy") + .help("Set assignment strategy (e.g. " + RoundRobinAssignor.class.getName() + ")"); + parser.addArgument("--consumer.config") .action(store()) .required(false) @@ -571,6 +581,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy")); consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout"))); + consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy")); StringDeserializer deserializer = new StringDeserializer(); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);
