http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java index e1c3634..865d6c6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java @@ -14,6 +14,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -31,21 +32,21 @@ public class UpdateMetadataResponse extends AbstractResponse { * * STALE_CONTROLLER_EPOCH (11) */ - private final short errorCode; + private final Errors error; - public UpdateMetadataResponse(short errorCode) { + public UpdateMetadataResponse(Errors error) { super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, errorCode); - this.errorCode = errorCode; + struct.set(ERROR_CODE_KEY_NAME, error.code()); + this.error = error; } public UpdateMetadataResponse(Struct struct) { super(struct); - errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); } - public short errorCode() { - return errorCode; + public Errors error() { + return error; } public static UpdateMetadataResponse parse(ByteBuffer buffer) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java index 59eee83..2b445e5 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java @@ -324,7 +324,7 @@ public class SaslClientAuthenticator implements Authenticator { } private void handleSaslHandshakeResponse(SaslHandshakeResponse response) { - Errors error = Errors.forCode(response.errorCode()); + Errors error = response.error(); switch (error) { case NONE: break; @@ -336,7 +336,7 @@ public class SaslClientAuthenticator implements Authenticator { mechanism, response.enabledMechanisms())); default: throw new AuthenticationException(String.format("Unknown error code %d, client mechanism is %s, enabled mechanisms are %s", - response.errorCode(), mechanism, response.enabledMechanisms())); + response.error().code(), mechanism, response.enabledMechanisms())); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index 07792d2..7f6b7aa 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -359,11 +359,11 @@ public class SaslServerAuthenticator implements Authenticator { String clientMechanism = handshakeRequest.mechanism(); if (enabledMechanisms.contains(clientMechanism)) { LOG.debug("Using SASL mechanism '{}' provided by client", clientMechanism); - sendKafkaResponse(requestHeader, new SaslHandshakeResponse((short) 0, enabledMechanisms)); + sendKafkaResponse(requestHeader, new SaslHandshakeResponse(Errors.NONE, enabledMechanisms)); return clientMechanism; } else { LOG.debug("SASL mechanism '{}' requested by client is not supported", clientMechanism); - sendKafkaResponse(requestHeader, new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM.code(), enabledMechanisms)); + sendKafkaResponse(requestHeader, new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, enabledMechanisms)); throw new UnsupportedSaslMechanismException("Unsupported SASL mechanism " + clientMechanism); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 4aaa172..eb0b5c8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -459,7 +459,7 @@ public class KafkaConsumerTest { // there shouldn't be any need to lookup the coordinator or fetch committed offsets. // we just lookup the starting position and send the record fetch. 
- client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L), Errors.NONE.code())); + client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L), Errors.NONE)); client.prepareResponse(fetchResponse(tp0, 50L, 5)); ConsumerRecords<String, String> records = consumer.poll(0); @@ -493,7 +493,7 @@ public class KafkaConsumerTest { consumer.assign(singletonList(tp0)); // lookup coordinator - client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node); + client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); // fetch offset for one topic @@ -991,7 +991,7 @@ public class KafkaConsumerTest { rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); // lookup coordinator - client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node); + client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); // manual assignment @@ -1009,7 +1009,7 @@ public class KafkaConsumerTest { // there shouldn't be any need to lookup the coordinator or fetch committed offsets. // we just lookup the starting position and send the record fetch. 
- client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE.code())); + client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE)); client.prepareResponse(fetchResponse(tp0, 10L, 1)); ConsumerRecords<String, String> records = consumer.poll(0); @@ -1056,7 +1056,7 @@ public class KafkaConsumerTest { rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs); // lookup coordinator - client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node); + client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); // manual assignment @@ -1074,7 +1074,7 @@ public class KafkaConsumerTest { // there shouldn't be any need to lookup the coordinator or fetch committed offsets. // we just lookup the starting position and send the record fetch. - client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE.code())); + client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE)); client.prepareResponse(fetchResponse(tp0, 10L, 1)); ConsumerRecords<String, String> records = consumer.poll(0); @@ -1117,7 +1117,7 @@ public class KafkaConsumerTest { rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); // lookup coordinator - client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node); + client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); // manual assignment @@ -1139,8 +1139,8 @@ public class KafkaConsumerTest { assertEquals(0, consumer.committed(tp1).offset()); // fetch and verify consumer's position in the two partitions - client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 3L), Errors.NONE.code())); - 
client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp1, 3L), Errors.NONE.code())); + client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 3L), Errors.NONE)); + client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp1, 3L), Errors.NONE)); assertEquals(3L, consumer.position(tp0)); assertEquals(3L, consumer.position(tp1)); @@ -1183,10 +1183,10 @@ public class KafkaConsumerTest { @Test public void testGracefulClose() throws Exception { - Map<TopicPartition, Short> response = new HashMap<>(); - response.put(tp0, Errors.NONE.code()); + Map<TopicPartition, Errors> response = new HashMap<>(); + response.put(tp0, Errors.NONE); OffsetCommitResponse commitResponse = offsetCommitResponse(response); - LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(Errors.NONE.code()); + LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(Errors.NONE); consumerCloseTest(5000, Arrays.asList(commitResponse, leaveGroupResponse), 0, false); } @@ -1197,8 +1197,8 @@ public class KafkaConsumerTest { @Test public void testLeaveGroupTimeout() throws Exception { - Map<TopicPartition, Short> response = new HashMap<>(); - response.put(tp0, Errors.NONE.code()); + Map<TopicPartition, Errors> response = new HashMap<>(); + response.put(tp0, Errors.NONE); OffsetCommitResponse commitResponse = offsetCommitResponse(response); consumerCloseTest(5000, Arrays.asList(commitResponse), 5000, false); } @@ -1332,7 +1332,7 @@ public class KafkaConsumerTest { private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) { if (coordinator == null) { // lookup coordinator - client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node); + client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node); coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); } @@ 
-1344,10 +1344,10 @@ public class KafkaConsumerTest { PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(joinGroupRequest.groupProtocols().get(0).metadata()); return subscribedTopics.equals(new HashSet<>(subscription.topics())); } - }, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator); + }, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator); // sync group - client.prepareResponseFrom(syncGroupResponse(partitions, Errors.NONE.code()), coordinator); + client.prepareResponseFrom(syncGroupResponse(partitions, Errors.NONE), coordinator); return coordinator; } @@ -1355,15 +1355,15 @@ public class KafkaConsumerTest { private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) { if (coordinator == null) { // lookup coordinator - client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node); + client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node); coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); } // join group - client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator); + client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator); // sync group - client.prepareResponseFrom(syncGroupResponse(partitions, Errors.NONE.code()), coordinator); + client.prepareResponseFrom(syncGroupResponse(partitions, Errors.NONE), coordinator); return coordinator; } @@ -1376,15 +1376,15 @@ public class KafkaConsumerTest { heartbeatReceived.set(true); return true; } - }, new HeartbeatResponse(Errors.NONE.code()), coordinator); + }, new HeartbeatResponse(Errors.NONE), coordinator); return heartbeatReceived; } private AtomicBoolean prepareOffsetCommitResponse(MockClient client, Node 
coordinator, final Map<TopicPartition, Long> partitionOffsets) { final AtomicBoolean commitReceived = new AtomicBoolean(true); - Map<TopicPartition, Short> response = new HashMap<>(); + Map<TopicPartition, Errors> response = new HashMap<>(); for (TopicPartition partition : partitionOffsets.keySet()) - response.put(partition, Errors.NONE.code()); + response.put(partition, Errors.NONE); client.prepareResponseFrom(new MockClient.RequestMatcher() { @Override @@ -1408,16 +1408,16 @@ public class KafkaConsumerTest { return prepareOffsetCommitResponse(client, coordinator, Collections.singletonMap(partition, offset)); } - private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Short> responseData) { + private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Errors> responseData) { return new OffsetCommitResponse(responseData); } - private JoinGroupResponse joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, short error) { + private JoinGroupResponse joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, Errors error) { return new JoinGroupResponse(error, generationId, assignor.name(), memberId, leaderId, Collections.<String, ByteBuffer>emptyMap()); } - private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, short error) { + private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) { ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions)); return new SyncGroupResponse(error, buf); } @@ -1430,7 +1430,7 @@ public class KafkaConsumerTest { return new OffsetFetchResponse(Errors.NONE, partitionData); } - private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> offsets, short error) { + private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> offsets, Errors error) { Map<TopicPartition, ListOffsetResponse.PartitionData> 
partitionData = new HashMap<>(); for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) { partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(error, @@ -1448,7 +1448,7 @@ public class KafkaConsumerTest { MemoryRecordsBuilder records = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, fetchOffset); for (int i = 0; i < fetchCount; i++) records.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes()); - tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE.code(), 0, records.build())); + tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE, 0, records.build())); } return new FetchResponse(tpResponses, 0); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index bb617ae..afdecfc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -467,20 +467,20 @@ public class AbstractCoordinatorTest { } private GroupCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) { - return new GroupCoordinatorResponse(error.code(), node); + return new GroupCoordinatorResponse(error, node); } private HeartbeatResponse heartbeatResponse(Errors error) { - return new HeartbeatResponse(error.code()); + return new HeartbeatResponse(error); } private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) { - return new JoinGroupResponse(error.code(), generationId, 
"dummy-subprotocol", memberId, leaderId, + return new JoinGroupResponse(error, generationId, "dummy-subprotocol", memberId, leaderId, Collections.<String, ByteBuffer>emptyMap()); } private SyncGroupResponse syncGroupResponse(Errors error) { - return new SyncGroupResponse(error.code(), ByteBuffer.allocate(0)); + return new SyncGroupResponse(error, ByteBuffer.allocate(0)); } public class DummyCoordinator extends AbstractCoordinator { http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index e11bf30..33f496b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -139,7 +139,7 @@ public class ConsumerCoordinatorTest { @Test public void testNormalHeartbeat() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // normal heartbeat @@ -148,7 +148,7 @@ public class ConsumerCoordinatorTest { assertEquals(1, consumerClient.pendingRequestCount()); assertFalse(future.isDone()); - client.prepareResponse(heartbeatResponse(Errors.NONE.code())); + client.prepareResponse(heartbeatResponse(Errors.NONE)); consumerClient.poll(0); assertTrue(future.isDone()); @@ -157,7 +157,7 @@ public class ConsumerCoordinatorTest { @Test(expected = GroupAuthorizationException.class) public void testGroupDescribeUnauthorized() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.GROUP_AUTHORIZATION_FAILED.code())); + 
client.prepareResponse(groupCoordinatorResponse(node, Errors.GROUP_AUTHORIZATION_FAILED)); coordinator.ensureCoordinatorReady(); } @@ -165,17 +165,17 @@ public class ConsumerCoordinatorTest { public void testGroupReadUnauthorized() { subscriptions.subscribe(singleton(topic1), rebalanceListener); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.<String, List<String>>emptyMap(), - Errors.GROUP_AUTHORIZATION_FAILED.code())); + Errors.GROUP_AUTHORIZATION_FAILED)); coordinator.poll(time.milliseconds()); } @Test public void testCoordinatorNotAvailable() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // GROUP_COORDINATOR_NOT_AVAILABLE will mark coordinator as unknown @@ -184,7 +184,7 @@ public class ConsumerCoordinatorTest { assertEquals(1, consumerClient.pendingRequestCount()); assertFalse(future.isDone()); - client.prepareResponse(heartbeatResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())); + client.prepareResponse(heartbeatResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE)); time.sleep(sessionTimeoutMs); consumerClient.poll(0); @@ -196,7 +196,7 @@ public class ConsumerCoordinatorTest { @Test public void testNotCoordinator() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // not_coordinator will mark coordinator as unknown @@ -205,7 +205,7 @@ public class ConsumerCoordinatorTest { assertEquals(1, consumerClient.pendingRequestCount()); assertFalse(future.isDone()); - client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_GROUP.code())); + 
client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_GROUP)); time.sleep(sessionTimeoutMs); consumerClient.poll(0); @@ -217,7 +217,7 @@ public class ConsumerCoordinatorTest { @Test public void testIllegalGeneration() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // illegal_generation will cause re-partition @@ -229,7 +229,7 @@ public class ConsumerCoordinatorTest { assertEquals(1, consumerClient.pendingRequestCount()); assertFalse(future.isDone()); - client.prepareResponse(heartbeatResponse(Errors.ILLEGAL_GENERATION.code())); + client.prepareResponse(heartbeatResponse(Errors.ILLEGAL_GENERATION)); time.sleep(sessionTimeoutMs); consumerClient.poll(0); @@ -241,7 +241,7 @@ public class ConsumerCoordinatorTest { @Test public void testUnknownConsumerId() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // illegal_generation will cause re-partition @@ -253,7 +253,7 @@ public class ConsumerCoordinatorTest { assertEquals(1, consumerClient.pendingRequestCount()); assertFalse(future.isDone()); - client.prepareResponse(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID.code())); + client.prepareResponse(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID)); time.sleep(sessionTimeoutMs); consumerClient.poll(0); @@ -265,7 +265,7 @@ public class ConsumerCoordinatorTest { @Test public void testCoordinatorDisconnect() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // coordinator disconnect will mark coordinator as unknown @@ -274,7 +274,7 @@ public class ConsumerCoordinatorTest { assertEquals(1, consumerClient.pendingRequestCount()); assertFalse(future.isDone()); - 
client.prepareResponse(heartbeatResponse(Errors.NONE.code()), true); // return disconnected + client.prepareResponse(heartbeatResponse(Errors.NONE), true); // return disconnected time.sleep(sessionTimeoutMs); consumerClient.poll(0); @@ -294,11 +294,11 @@ public class ConsumerCoordinatorTest { metadata.setTopics(singletonList(topic1)); metadata.update(cluster, time.milliseconds()); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.<String, List<String>>emptyMap(), - Errors.INVALID_GROUP_ID.code())); + Errors.INVALID_GROUP_ID)); coordinator.poll(time.milliseconds()); } @@ -312,14 +312,14 @@ public class ConsumerCoordinatorTest { metadata.setTopics(singletonList(topic1)); metadata.update(cluster, time.milliseconds()); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // normal join group Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1)); partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(t1p))); - client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code())); + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE)); client.prepareResponse(new MockClient.RequestMatcher() { @Override public boolean matches(AbstractRequest body) { @@ -328,7 +328,7 @@ public class ConsumerCoordinatorTest { sync.generationId() == 1 && sync.groupAssignment().containsKey(consumerId); } - }, syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + }, syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.poll(time.milliseconds()); assertFalse(coordinator.needRejoin()); @@ 
-351,14 +351,14 @@ public class ConsumerCoordinatorTest { metadata.setTopics(singletonList(topic1)); metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds()); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // normal join group Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1)); partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(t1p, t2p))); - client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code())); + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE)); client.prepareResponse(new MockClient.RequestMatcher() { @Override public boolean matches(AbstractRequest body) { @@ -367,7 +367,7 @@ public class ConsumerCoordinatorTest { sync.generationId() == 1 && sync.groupAssignment().containsKey(consumerId); } - }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE.code())); + }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE)); // expect client to force updating the metadata, if yes gives it both topics client.prepareMetadataUpdate(cluster); @@ -393,7 +393,7 @@ public class ConsumerCoordinatorTest { assertEquals(singleton(topic1), subscriptions.subscription()); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); Map<String, List<String>> initialSubscription = singletonMap(consumerId, singletonList(topic1)); @@ -403,7 +403,7 @@ public class ConsumerCoordinatorTest { final List<String> updatedSubscription = Arrays.asList(topic1, topic2); final Set<String> updatedSubscriptionSet = new HashSet<>(updatedSubscription); - client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, 
Errors.NONE.code())); + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE)); client.prepareResponse(new MockClient.RequestMatcher() { @Override public boolean matches(AbstractRequest body) { @@ -413,7 +413,7 @@ public class ConsumerCoordinatorTest { metadata.update(TestUtils.clusterWith(1, updatedPartitions), time.milliseconds()); return true; } - }, syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + }, syncGroupResponse(singletonList(t1p), Errors.NONE)); List<TopicPartition> newAssignment = Arrays.asList(t1p, t2p); Set<TopicPartition> newAssignmentSet = new HashSet<>(newAssignment); @@ -431,8 +431,8 @@ public class ConsumerCoordinatorTest { protocolMetadata.metadata().rewind(); return subscription.topics().containsAll(updatedSubscriptionSet); } - }, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(newAssignment, Errors.NONE.code())); + }, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE)); + client.prepareResponse(syncGroupResponse(newAssignment, Errors.NONE)); coordinator.poll(time.milliseconds()); @@ -455,14 +455,14 @@ public class ConsumerCoordinatorTest { metadata.setTopics(singletonList(topic1)); metadata.update(cluster, time.milliseconds()); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1)); partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(t1p))); // prepare only the first half of the join and then trigger the wakeup - client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code())); + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE)); 
consumerClient.wakeup(); try { @@ -472,7 +472,7 @@ public class ConsumerCoordinatorTest { } // now complete the second half - client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.poll(time.milliseconds()); assertFalse(coordinator.needRejoin()); @@ -489,11 +489,11 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // normal join group - client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(new MockClient.RequestMatcher() { @Override public boolean matches(AbstractRequest body) { @@ -502,7 +502,7 @@ public class ConsumerCoordinatorTest { sync.generationId() == 1 && sync.groupAssignment().isEmpty(); } - }, syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + }, syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.joinGroupIfNeeded(); @@ -526,11 +526,11 @@ public class ConsumerCoordinatorTest { metadata.setTopics(singletonList(topic1)); metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds()); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // normal join group - client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(new MockClient.RequestMatcher() { @Override public boolean matches(AbstractRequest body) { @@ -539,7 +539,7 @@ public 
class ConsumerCoordinatorTest { sync.generationId() == 1 && sync.groupAssignment().isEmpty(); } - }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE.code())); + }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE)); // expect client to force updating the metadata, if yes gives it both topics client.prepareMetadataUpdate(cluster); @@ -559,11 +559,11 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); - client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.joinGroupIfNeeded(); final AtomicBoolean received = new AtomicBoolean(false); @@ -575,7 +575,7 @@ public class ConsumerCoordinatorTest { return leaveRequest.memberId().equals(consumerId) && leaveRequest.groupId().equals(groupId); } - }, new LeaveGroupResponse(Errors.NONE.code())); + }, new LeaveGroupResponse(Errors.NONE)); coordinator.close(0); assertTrue(received.get()); } @@ -586,11 +586,11 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); - client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); + 
client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.joinGroupIfNeeded(); final AtomicBoolean received = new AtomicBoolean(false); @@ -602,7 +602,7 @@ public class ConsumerCoordinatorTest { return leaveRequest.memberId().equals(consumerId) && leaveRequest.groupId().equals(groupId); } - }, new LeaveGroupResponse(Errors.NONE.code())); + }, new LeaveGroupResponse(Errors.NONE)); coordinator.maybeLeaveGroup(); assertTrue(received.get()); @@ -616,12 +616,12 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // join initially, but let coordinator rebalance on sync - client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN.code())); + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN)); coordinator.joinGroupIfNeeded(); } @@ -631,12 +631,12 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // join initially, but let coordinator returns unknown member id - client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_MEMBER_ID.code())); + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); + 
client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_MEMBER_ID)); // now we should see a new join with the empty UNKNOWN_MEMBER_ID client.prepareResponse(new MockClient.RequestMatcher() { @@ -645,8 +645,8 @@ public class ConsumerCoordinatorTest { JoinGroupRequest joinRequest = (JoinGroupRequest) body; return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID); } - }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.joinGroupIfNeeded(); @@ -660,16 +660,16 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // join initially, but let coordinator rebalance on sync - client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.REBALANCE_IN_PROGRESS.code())); + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.REBALANCE_IN_PROGRESS)); // then let the full join/sync finish successfully - client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.joinGroupIfNeeded(); 
@@ -683,12 +683,12 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // join initially, but let coordinator rebalance on sync - client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.ILLEGAL_GENERATION.code())); + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.ILLEGAL_GENERATION)); // then let the full join/sync finish successfully client.prepareResponse(new MockClient.RequestMatcher() { @@ -697,8 +697,8 @@ public class ConsumerCoordinatorTest { JoinGroupRequest joinRequest = (JoinGroupRequest) body; return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID); } - }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.joinGroupIfNeeded(); @@ -716,15 +716,15 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1)); partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(t1p))); // the leader is responsible for picking up metadata changes 
and forcing a group rebalance - client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE)); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.poll(time.milliseconds()); @@ -753,14 +753,14 @@ public class ConsumerCoordinatorTest { // we only have metadata for one topic initially metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds()); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // prepare initial rebalance Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, topics); partitionAssignor.prepare(Collections.singletonMap(consumerId, Collections.singletonList(tp1))); - client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code())); + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE)); client.prepareResponse(new MockClient.RequestMatcher() { @Override public boolean matches(AbstractRequest body) { @@ -777,11 +777,11 @@ public class ConsumerCoordinatorTest { } return false; } - }, syncGroupResponse(Collections.singletonList(tp1), Errors.NONE.code())); + }, syncGroupResponse(Collections.singletonList(tp1), Errors.NONE)); // the metadata update should trigger a second rebalance - client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE.code())); + client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE)); + client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, 
tp2), Errors.NONE)); coordinator.poll(time.milliseconds()); @@ -815,12 +815,12 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // join the group once - client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.joinGroupIfNeeded(); assertEquals(1, rebalanceListener.revokedCount); @@ -830,8 +830,8 @@ public class ConsumerCoordinatorTest { // and join the group again subscriptions.subscribe(new HashSet<>(Arrays.asList(topic1, otherTopic)), rebalanceListener); - client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.joinGroupIfNeeded(); assertEquals(2, rebalanceListener.revokedCount); @@ -844,14 +844,14 @@ public class ConsumerCoordinatorTest { public void testDisconnectInJoin() { subscriptions.subscribe(singleton(topic1), rebalanceListener); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // disconnected from original coordinator will cause re-discover and join again - client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()), true); - 
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); - client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE), true); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.joinGroupIfNeeded(); assertFalse(coordinator.needRejoin()); @@ -865,11 +865,11 @@ public class ConsumerCoordinatorTest { public void testInvalidSessionTimeout() { subscriptions.subscribe(singleton(topic1), rebalanceListener); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // coordinator doesn't like the session timeout - client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT.code())); + client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT)); coordinator.joinGroupIfNeeded(); } @@ -877,10 +877,10 @@ public class ConsumerCoordinatorTest { public void testCommitOffsetOnly() { subscriptions.assignFromUser(singleton(t1p)); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); AtomicBoolean success = new AtomicBoolean(false); coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), 
callback(success)); @@ -899,16 +899,16 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); - client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.joinGroupIfNeeded(); subscriptions.seek(t1p, 100); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); time.sleep(autoCommitIntervalMs); coordinator.poll(time.milliseconds()); @@ -924,20 +924,20 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // haven't joined, so should not cause a commit time.sleep(autoCommitIntervalMs); consumerClient.poll(0); - client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.joinGroupIfNeeded(); subscriptions.seek(t1p, 100); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code()))); + 
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); time.sleep(autoCommitIntervalMs); coordinator.poll(time.milliseconds()); @@ -952,10 +952,10 @@ public class ConsumerCoordinatorTest { subscriptions.assignFromUser(singleton(t1p)); subscriptions.seek(t1p, 100); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); time.sleep(autoCommitIntervalMs); coordinator.poll(time.milliseconds()); @@ -978,12 +978,12 @@ public class ConsumerCoordinatorTest { assertNull(subscriptions.committed(t1p)); // now find the coordinator - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // sleep only for the retry backoff time.sleep(retryBackoffMs); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); coordinator.poll(time.milliseconds()); assertEquals(100L, subscriptions.committed(t1p).offset()); @@ -993,10 +993,10 @@ public class ConsumerCoordinatorTest { public void testCommitOffsetMetadata() { subscriptions.assignFromUser(singleton(t1p)); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); AtomicBoolean success = new AtomicBoolean(false); 
coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "hello")), callback(success)); @@ -1010,9 +1010,9 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetAsyncWithDefaultCallback() { int invokedBeforeTest = mockOffsetCommitCallback.invoked; - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback); coordinator.invokeCompletedOffsetCommitCallbacks(); assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked); @@ -1024,18 +1024,18 @@ public class ConsumerCoordinatorTest { // enable auto-assignment subscriptions.subscribe(singleton(topic1), rebalanceListener); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); - client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); client.prepareMetadataUpdate(cluster); coordinator.joinGroupIfNeeded(); // now switch to manual assignment - client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code())); + client.prepareResponse(new LeaveGroupResponse(Errors.NONE)); subscriptions.unsubscribe(); coordinator.maybeLeaveGroup(); subscriptions.assignFromUser(singleton(t1p)); @@ -1048,7 +1048,7 @@ public class 
ConsumerCoordinatorTest { return commitRequest.memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) && commitRequest.generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID; } - }, offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code()))); + }, offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); AtomicBoolean success = new AtomicBoolean(false); coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), callback(success)); @@ -1059,9 +1059,9 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetAsyncFailedWithDefaultCallback() { int invokedBeforeTest = mockOffsetCommitCallback.invoked; - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE))); coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback); coordinator.invokeCompletedOffsetCommitCallbacks(); assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked); @@ -1070,12 +1070,12 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetAsyncCoordinatorNotAvailable() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // async commit with coordinator not available MockCommitCallback cb = new MockCommitCallback(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, 
Errors.GROUP_COORDINATOR_NOT_AVAILABLE))); coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), cb); coordinator.invokeCompletedOffsetCommitCallbacks(); @@ -1086,12 +1086,12 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetAsyncNotCoordinator() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // async commit with not coordinator MockCommitCallback cb = new MockCommitCallback(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR_FOR_GROUP.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR_FOR_GROUP))); coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), cb); coordinator.invokeCompletedOffsetCommitCallbacks(); @@ -1102,12 +1102,12 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetAsyncDisconnected() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // async commit with coordinator disconnected MockCommitCallback cb = new MockCommitCallback(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code())), true); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)), true); coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), cb); coordinator.invokeCompletedOffsetCommitCallbacks(); @@ -1118,109 +1118,109 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetSyncNotCoordinator() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); 
coordinator.ensureCoordinatorReady(); // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request) - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR_FOR_GROUP.code()))); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR_FOR_GROUP))); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE); } @Test public void testCommitOffsetSyncCoordinatorNotAvailable() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request) - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()))); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE))); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE); } @Test public void testCommitOffsetSyncCoordinatorDisconnected() { - 
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request) - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code())), true); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)), true); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE); } @Test(expected = KafkaException.class) public void testCommitUnknownTopicOrPartition() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION))); coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE); } @Test(expected = OffsetMetadataTooLarge.class) public void testCommitOffsetMetadataTooLarge() { // since offset metadata is provided by the user, we have to propagate the exception so they can handle it - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); - 
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.OFFSET_METADATA_TOO_LARGE.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.OFFSET_METADATA_TOO_LARGE))); coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE); } @Test(expected = CommitFailedException.class) public void testCommitOffsetIllegalGeneration() { // we cannot retry if a rebalance occurs before the commit completed - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.ILLEGAL_GENERATION.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.ILLEGAL_GENERATION))); coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE); } @Test(expected = CommitFailedException.class) public void testCommitOffsetUnknownMemberId() { // we cannot retry if a rebalance occurs before the commit completed - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID))); coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE); } @Test(expected = CommitFailedException.class) public void testCommitOffsetRebalanceInProgress() { // we cannot retry if a rebalance occurs before the commit completed - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + 
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.REBALANCE_IN_PROGRESS.code()))); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.REBALANCE_IN_PROGRESS))); coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE); } @Test(expected = KafkaException.class) public void testCommitOffsetSyncCallbackWithNonRetriableException() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // sync commit with invalid partitions should throw if we have no callback - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN.code())), false); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN)), false); coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE); } @Test(expected = IllegalArgumentException.class) public void testCommitSyncNegativeOffset() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(-1L)), Long.MAX_VALUE); } @Test public void testCommitAsyncNegativeOffset() { int invokedBeforeTest = mockOffsetCommitCallback.invoked; - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(-1L)), mockOffsetCommitCallback); coordinator.invokeCompletedOffsetCommitCallbacks(); assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked); @@ 
-1229,7 +1229,7 @@ public class ConsumerCoordinatorTest { @Test public void testRefreshOffset() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); subscriptions.assignFromUser(singleton(t1p)); @@ -1242,7 +1242,7 @@ public class ConsumerCoordinatorTest { @Test public void testRefreshOffsetLoadInProgress() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); subscriptions.assignFromUser(singleton(t1p)); @@ -1256,7 +1256,7 @@ public class ConsumerCoordinatorTest { @Test public void testRefreshOffsetsGroupNotAuthorized() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); subscriptions.assignFromUser(singleton(t1p)); @@ -1272,7 +1272,7 @@ public class ConsumerCoordinatorTest { @Test(expected = KafkaException.class) public void testRefreshOffsetUnknownTopicOrPartition() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); subscriptions.assignFromUser(singleton(t1p)); @@ -1283,13 +1283,13 @@ public class ConsumerCoordinatorTest { @Test public void testRefreshOffsetNotCoordinatorForConsumer() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); subscriptions.assignFromUser(singleton(t1p)); subscriptions.needRefreshCommits(); client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR_FOR_GROUP)); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + 
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L)); coordinator.refreshCommittedOffsetsIfNeeded(); assertFalse(subscriptions.refreshCommitsNeeded()); @@ -1298,7 +1298,7 @@ public class ConsumerCoordinatorTest { @Test public void testRefreshOffsetWithNoFetchableOffsets() { - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); subscriptions.assignFromUser(singleton(t1p)); @@ -1429,12 +1429,12 @@ public class ConsumerCoordinatorTest { final String consumerId = "consumer"; ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommit); - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); if (useGroupManagement) { subscriptions.subscribe(singleton(topic1), rebalanceListener); - client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.joinGroupIfNeeded(); } else subscriptions.assignFromUser(singleton(t1p)); @@ -1445,10 +1445,10 @@ public class ConsumerCoordinatorTest { return coordinator; } - private void makeCoordinatorUnknown(ConsumerCoordinator coordinator, Errors errorCode) { + private void makeCoordinatorUnknown(ConsumerCoordinator coordinator, Errors error) { time.sleep(sessionTimeoutMs); coordinator.sendHeartbeatRequest(); - client.prepareResponse(heartbeatResponse(errorCode.code())); + client.prepareResponse(heartbeatResponse(error)); 
time.sleep(sessionTimeoutMs); consumerClient.poll(0); assertTrue(coordinator.coordinatorUnknown()); @@ -1497,7 +1497,7 @@ public class ConsumerCoordinatorTest { OffsetCommitRequest commitRequest = (OffsetCommitRequest) body; return commitRequest.groupId().equals(groupId); } - }, new OffsetCommitResponse(new HashMap<TopicPartition, Short>())); + }, new OffsetCommitResponse(new HashMap<TopicPartition, Errors>())); client.prepareResponse(new MockClient.RequestMatcher() { @Override public boolean matches(AbstractRequest body) { @@ -1505,7 +1505,7 @@ public class ConsumerCoordinatorTest { LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body; return leaveRequest.groupId().equals(groupId); } - }, new LeaveGroupResponse(Errors.NONE.code())); + }, new LeaveGroupResponse(Errors.NONE)); coordinator.close(); assertTrue("Commit not requested", commitRequested.get()); @@ -1536,18 +1536,18 @@ public class ConsumerCoordinatorTest { excludeInternalTopics); } - private GroupCoordinatorResponse groupCoordinatorResponse(Node node, short error) { + private GroupCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) { return new GroupCoordinatorResponse(error, node); } - private HeartbeatResponse heartbeatResponse(short error) { + private HeartbeatResponse heartbeatResponse(Errors error) { return new HeartbeatResponse(error); } private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId, Map<String, List<String>> subscriptions, - short error) { + Errors error) { Map<String, ByteBuffer> metadata = new HashMap<>(); for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) { PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(subscriptionEntry.getValue()); @@ -1557,17 +1557,17 @@ public class ConsumerCoordinatorTest { return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, memberId, metadata); } - private JoinGroupResponse joinGroupFollowerResponse(int 
generationId, String memberId, String leaderId, short error) { + private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) { return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, leaderId, Collections.<String, ByteBuffer>emptyMap()); } - private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, short error) { + private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) { ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions)); return new SyncGroupResponse(error, buf); } - private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Short> responseData) { + private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Errors> responseData) { return new OffsetCommitResponse(responseData); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index 3694714..83bb145 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -47,7 +47,7 @@ public class ConsumerNetworkClientTest { @Test public void send() { - client.prepareResponse(heartbeatResponse(Errors.NONE.code())); + client.prepareResponse(heartbeatResponse(Errors.NONE)); RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat()); assertEquals(1, consumerClient.pendingRequestCount()); assertEquals(1, consumerClient.pendingRequestCount(node)); @@ -59,13 +59,13 @@ 
public class ConsumerNetworkClientTest { ClientResponse clientResponse = future.value(); HeartbeatResponse response = (HeartbeatResponse) clientResponse.responseBody(); - assertEquals(Errors.NONE.code(), response.errorCode()); + assertEquals(Errors.NONE, response.error()); } @Test public void multiSend() { - client.prepareResponse(heartbeatResponse(Errors.NONE.code())); - client.prepareResponse(heartbeatResponse(Errors.NONE.code())); + client.prepareResponse(heartbeatResponse(Errors.NONE)); + client.prepareResponse(heartbeatResponse(Errors.NONE)); RequestFuture<ClientResponse> future1 = consumerClient.send(node, heartbeat()); RequestFuture<ClientResponse> future2 = consumerClient.send(node, heartbeat()); assertEquals(2, consumerClient.pendingRequestCount()); @@ -150,7 +150,7 @@ public class ConsumerNetworkClientTest { } catch (WakeupException e) { } - client.respond(heartbeatResponse(Errors.NONE.code())); + client.respond(heartbeatResponse(Errors.NONE)); consumerClient.poll(future); assertTrue(future.isDone()); } @@ -201,11 +201,11 @@ public class ConsumerNetworkClientTest { // Enable send, the un-expired send should succeed on poll isReady.set(true); - client.prepareResponse(heartbeatResponse(Errors.NONE.code())); + client.prepareResponse(heartbeatResponse(Errors.NONE)); consumerClient.poll(future2); ClientResponse clientResponse = future2.value(); HeartbeatResponse response = (HeartbeatResponse) clientResponse.responseBody(); - assertEquals(Errors.NONE.code(), response.errorCode()); + assertEquals(Errors.NONE, response.error()); // Disable ready flag to delay send and queue another send. Disconnection should remove pending send isReady.set(false); @@ -224,7 +224,7 @@ public class ConsumerNetworkClientTest { return new HeartbeatRequest.Builder("group", 1, "memberId"); } - private HeartbeatResponse heartbeatResponse(short error) { + private HeartbeatResponse heartbeatResponse(Errors error) { return new HeartbeatResponse(error); }