This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 42aaccec8b86fc26dc6da5ea3f1c3c7fd7a496a5 Author: Jason Gustafson <[email protected]> AuthorDate: Tue May 21 22:51:56 2019 -0700 HOTFIX: Fix recent protocol breakage from KIP-345 and KIP-392 (#6780) KIP-345 and KIP-392 introduced a couple breaking changes for old versions of bumped protocols. This patch fixes them. Reviewers: Colin Patrick McCabe <[email protected]>, Ismael Juma <[email protected]>, Boyang Chen <[email protected]>, Guozhang Wang <[email protected]> --- .../requests/OffsetsForLeaderEpochRequest.java | 2 +- .../resources/common/message/HeartbeatRequest.json | 3 +- .../resources/common/message/JoinGroupRequest.json | 3 +- .../common/message/OffsetCommitRequest.json | 3 +- .../resources/common/message/SyncGroupRequest.json | 3 +- .../apache/kafka/common/message/MessageTest.java | 196 +++++++++++++++------ .../kafka/common/requests/RequestResponseTest.java | 9 +- .../apache/kafka/message/MessageDataGenerator.java | 6 +- 8 files changed, 165 insertions(+), 60 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java index 6599a70..d3df6cf 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java @@ -166,7 +166,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest { @Override protected Struct toStruct() { Struct requestStruct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.requestSchema(version())); - requestStruct.set(REPLICA_ID, replicaId); + requestStruct.setIfExists(REPLICA_ID, replicaId); Map<String, Map<Integer, PartitionData>> topicsToPartitionEpochs = CollectionUtils.groupPartitionDataByTopic(epochsByPartition); diff --git a/clients/src/main/resources/common/message/HeartbeatRequest.json b/clients/src/main/resources/common/message/HeartbeatRequest.json index 148e661..aa7f337 100644 --- a/clients/src/main/resources/common/message/HeartbeatRequest.json +++ b/clients/src/main/resources/common/message/HeartbeatRequest.json @@ -27,7 +27,8 @@ "about": "The generation of the group." }, { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member ID." }, - { "name": "GroupInstanceId", "type": "string", "versions": "3+", "nullableVersions": "3+", + { "name": "GroupInstanceId", "type": "string", "versions": "3+", + "nullableVersions": "3+", "default": "null", "about": "The unique identifier of the consumer instance provided by end user." } ] } diff --git a/clients/src/main/resources/common/message/JoinGroupRequest.json b/clients/src/main/resources/common/message/JoinGroupRequest.json index 6db24da..8e58b87 100644 --- a/clients/src/main/resources/common/message/JoinGroupRequest.json +++ b/clients/src/main/resources/common/message/JoinGroupRequest.json @@ -34,7 +34,8 @@ "about": "The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group." }, { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member id assigned by the group coordinator." }, - { "name": "GroupInstanceId", "type": "string", "versions": "5+", "nullableVersions": "5+", + { "name": "GroupInstanceId", "type": "string", "versions": "5+", + "nullableVersions": "5+", "default": "null", "about": "The unique identifier of the consumer instance provided by end user." }, { "name": "ProtocolType", "type": "string", "versions": "0+", "about": "The unique name the for class of protocols implemented by the group we want to join." }, diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json b/clients/src/main/resources/common/message/OffsetCommitRequest.json index 0ad7565..adda079 100644 --- a/clients/src/main/resources/common/message/OffsetCommitRequest.json +++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json @@ -35,7 +35,8 @@ "about": "The generation of the group." }, { "name": "MemberId", "type": "string", "versions": "1+", "ignorable": true, "about": "The member ID assigned by the group coordinator." }, - { "name": "GroupInstanceId", "type": "string", "versions": "7+", "ignorable": true, "nullableVersions": "7+", + { "name": "GroupInstanceId", "type": "string", "versions": "7+", + "nullableVersions": "7+", "default": "null", "about": "The unique identifier of the consumer instance provided by end user." }, { "name": "RetentionTimeMs", "type": "int64", "versions": "2-4", "default": "-1", "ignorable": true, "about": "The time period in ms to retain the offset." }, diff --git a/clients/src/main/resources/common/message/SyncGroupRequest.json b/clients/src/main/resources/common/message/SyncGroupRequest.json index 282cb2a..3ace70d 100644 --- a/clients/src/main/resources/common/message/SyncGroupRequest.json +++ b/clients/src/main/resources/common/message/SyncGroupRequest.json @@ -27,7 +27,8 @@ "about": "The generation of the group." }, { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member ID assigned by the group." }, - { "name": "GroupInstanceId", "type": "string", "versions": "3+", "nullableVersions": "3+", + { "name": "GroupInstanceId", "type": "string", "versions": "3+", + "nullableVersions": "3+", "default": "null", "about": "The unique identifier of the consumer instance provided by end user." }, { "name": "Assignments", "type": "[]SyncGroupRequestAssignment", "versions": "0+", "about": "Each assignment.", "fields": [ diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index 1780a71..547dbc4 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -18,6 +18,8 @@ package org.apache.kafka.common.message; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic; +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Message; @@ -25,22 +27,20 @@ import org.apache.kafka.common.protocol.types.ArrayOf; import org.apache.kafka.common.protocol.types.BoundField; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - import org.apache.kafka.common.protocol.types.Type; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic; -import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -49,50 +49,127 @@ public final class MessageTest { @Rule final public Timeout globalTimeout = Timeout.millis(120000); - /** - * Test serializing and deserializing some messages. - */ @Test - public void testRoundTrips() throws Exception { - testMessageRoundTrips(new MetadataRequestData().setTopics( - Arrays.asList(new MetadataRequestData.MetadataRequestTopic().setName("foo"), - new MetadataRequestData.MetadataRequestTopic().setName("bar") - )), (short) 6); - testMessageRoundTrips(new AddOffsetsToTxnRequestData(). - setTransactionalId("foobar"). - setProducerId(0xbadcafebadcafeL). - setProducerEpoch((short) 123). - setGroupId("baaz"), (short) 1); - testMessageRoundTrips(new AddOffsetsToTxnResponseData(). - setThrottleTimeMs(42). - setErrorCode((short) 0), (short) 0); - testMessageRoundTrips(new AddPartitionsToTxnRequestData(). - setTransactionalId("blah"). - setProducerId(0xbadcafebadcafeL). - setProducerEpoch((short) 30000). - setTopics(new AddPartitionsToTxnTopicCollection(Collections.singletonList( - new AddPartitionsToTxnTopic(). - setName("Topic"). - setPartitions(Collections.singletonList(1))).iterator()))); - testMessageRoundTrips(new CreateTopicsRequestData(). - setTimeoutMs(1000).setTopics(new CreateTopicsRequestData.CreatableTopicCollection())); - testMessageRoundTrips(new DescribeAclsRequestData(). - setResourceType((byte) 42). - setResourceNameFilter(null). - setResourcePatternType((byte) 3). - setPrincipalFilter("abc"). - setHostFilter(null). - setOperation((byte) 0). - setPermissionType((byte) 0), (short) 0); - testMessageRoundTrips(new MetadataRequestData(). - setTopics(null). - setAllowAutoTopicCreation(false). - setIncludeClusterAuthorizedOperations(false). - setIncludeTopicAuthorizedOperations(false)); + public void testAddOffsetsToTxnVersions() throws Exception { + testAllMessageRoundTrips(new AddOffsetsToTxnRequestData(). + setTransactionalId("foobar"). + setProducerId(0xbadcafebadcafeL). + setProducerEpoch((short) 123). + setGroupId("baaz")); + testAllMessageRoundTrips(new AddOffsetsToTxnResponseData(). + setThrottleTimeMs(42). + setErrorCode((short) 0)); + } + + @Test + public void testAddPartitionsToTxnVersions() throws Exception { + testAllMessageRoundTrips(new AddPartitionsToTxnRequestData(). + setTransactionalId("blah"). + setProducerId(0xbadcafebadcafeL). + setProducerEpoch((short) 30000). + setTopics(new AddPartitionsToTxnTopicCollection(Collections.singletonList( + new AddPartitionsToTxnTopic(). + setName("Topic"). + setPartitions(Collections.singletonList(1))).iterator()))); + } + + @Test + public void testCreateTopicsVersions() throws Exception { + testAllMessageRoundTrips(new CreateTopicsRequestData(). + setTimeoutMs(1000).setTopics(new CreateTopicsRequestData.CreatableTopicCollection())); } - private void testMessageRoundTrips(Message message) throws Exception { - testMessageRoundTrips(message, message.highestSupportedVersion()); + @Test + public void testDescribeAclsRequest() throws Exception { + testAllMessageRoundTrips(new DescribeAclsRequestData(). + setResourceType((byte) 42). + setResourceNameFilter(null). + setResourcePatternType((byte) 3). + setPrincipalFilter("abc"). + setHostFilter(null). + setOperation((byte) 0). + setPermissionType((byte) 0)); + } + + @Test + public void testMetadataVersions() throws Exception { + testAllMessageRoundTrips(new MetadataRequestData().setTopics( + Arrays.asList(new MetadataRequestData.MetadataRequestTopic().setName("foo"), + new MetadataRequestData.MetadataRequestTopic().setName("bar") + ))); + testAllMessageRoundTripsFromVersion(new MetadataRequestData(). + setTopics(null). + setAllowAutoTopicCreation(true). + setIncludeClusterAuthorizedOperations(false). + setIncludeTopicAuthorizedOperations(false), (short) 1); + testAllMessageRoundTripsFromVersion(new MetadataRequestData(). + setTopics(null). + setAllowAutoTopicCreation(false). + setIncludeClusterAuthorizedOperations(false). + setIncludeTopicAuthorizedOperations(false), (short) 4); + } + + @Test + public void testHeartbeatVersions() throws Exception { + Supplier<HeartbeatRequestData> newRequest = () -> new HeartbeatRequestData() + .setGroupId("groupId") + .setMemberId("memberId") + .setGenerationId(15); + testAllMessageRoundTrips(newRequest.get()); + testAllMessageRoundTrips(newRequest.get().setGroupInstanceId(null)); + testAllMessageRoundTripsFromVersion(newRequest.get().setGroupInstanceId("instanceId"), (short) 3); + } + + @Test + public void testJoinGroupVersions() throws Exception { + Supplier<JoinGroupRequestData> newRequest = () -> new JoinGroupRequestData() + .setGroupId("groupId") + .setMemberId("memberId") + .setProtocolType("consumer") + .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection()) + .setSessionTimeoutMs(10000); + testAllMessageRoundTrips(newRequest.get()); + testAllMessageRoundTripsFromVersion(newRequest.get().setRebalanceTimeoutMs(20000), (short) 1); + testAllMessageRoundTrips(newRequest.get().setGroupInstanceId(null)); + testAllMessageRoundTripsFromVersion(newRequest.get().setGroupInstanceId("instanceId"), (short) 5); + } + + @Test + public void testSyncGroupDefaultGroupInstanceId() throws Exception { + Supplier<SyncGroupRequestData> request = () -> new SyncGroupRequestData() + .setGroupId("groupId") + .setMemberId("memberId") + .setGenerationId(15) + .setAssignments(new ArrayList<>()); + testAllMessageRoundTrips(request.get()); + testAllMessageRoundTrips(request.get().setGroupInstanceId(null)); + testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId("instanceId"), (short) 3); + } + + @Test + public void testOffsetCommitDefaultGroupInstanceId() throws Exception { + testAllMessageRoundTrips(new OffsetCommitRequestData() + .setTopics(new ArrayList<>()) + .setGroupId("groupId")); + + Supplier<OffsetCommitRequestData> request = () -> new OffsetCommitRequestData() + .setGroupId("groupId") + .setMemberId("memberId") + .setTopics(new ArrayList<>()) + .setGenerationId(15); + testAllMessageRoundTripsFromVersion(request.get(), (short) 1); + testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId(null), (short) 1); + testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId("instanceId"), (short) 7); + } + + private void testAllMessageRoundTrips(Message message) throws Exception { + testAllMessageRoundTripsFromVersion(message, message.lowestSupportedVersion()); + } + + private void testAllMessageRoundTripsFromVersion(Message message, short fromVersion) throws Exception { + for (short version = fromVersion; version < message.highestSupportedVersion(); version++) { + testMessageRoundTrips(message, version); + } } private void testMessageRoundTrips(Message message, short version) throws Exception { @@ -294,6 +371,25 @@ public final class MessageTest { new FetchRequestData.ForgottenTopic().setName("foo")))); } + @Test + public void testNonIgnorableFieldWithDefaultNull() throws Exception { + // Test non-ignorable string field `groupInstanceId` with default null + verifySizeRaisesUve((short) 0, "groupInstanceId", new HeartbeatRequestData() + .setGroupId("groupId") + .setGenerationId(15) + .setMemberId("memberId") + .setGroupInstanceId("instanceId")); + verifySizeSucceeds((short) 0, new HeartbeatRequestData() + .setGroupId("groupId") + .setGenerationId(15) + .setMemberId("memberId") + .setGroupInstanceId(null)); + verifySizeSucceeds((short) 0, new HeartbeatRequestData() + .setGroupId("groupId") + .setGenerationId(15) + .setMemberId("memberId")); + } + private void verifySizeRaisesUve(short version, String problemFieldName, Message message) throws Exception { try { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index d7e9223..e8f349f 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -290,9 +290,10 @@ public class RequestResponseTest { checkRequest(createListOffsetRequest(0), true); checkErrorResponse(createListOffsetRequest(0), new UnknownServerException(), true); checkResponse(createListOffsetResponse(0), 0, true); - checkRequest(createLeaderEpochRequest(), true); + checkRequest(createLeaderEpochRequest(0), true); + checkRequest(createLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()), true); checkResponse(createLeaderEpochResponse(), 0, true); - checkErrorResponse(createLeaderEpochRequest(), new UnknownServerException(), true); + checkErrorResponse(createLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()), new UnknownServerException(), true); checkRequest(createAddPartitionsToTxnRequest(), true); checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException(), true); checkResponse(createAddPartitionsToTxnResponse(), 0, true); @@ -1250,7 +1251,7 @@ public class RequestResponseTest { return new InitProducerIdResponse(responseData); } - private OffsetsForLeaderEpochRequest createLeaderEpochRequest() { + private OffsetsForLeaderEpochRequest createLeaderEpochRequest(int version) { Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochs = new HashMap<>(); epochs.put(new TopicPartition("topic1", 0), @@ -1260,7 +1261,7 @@ public class RequestResponseTest { epochs.put(new TopicPartition("topic2", 2), new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 3)); - return OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), epochs).build(); + return OffsetsForLeaderEpochRequest.Builder.forConsumer((short) version, epochs).build(); } private OffsetsForLeaderEpochResponse createLeaderEpochResponse() { diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java index d6cd5f3..ad00137 100644 --- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java +++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java @@ -928,7 +928,11 @@ public final class MessageDataGenerator { } else if (field.type().isBytes()) { buffer.printf("if (%s.length != 0) {%n", field.camelCaseName()); } else if (field.type().isString()) { - buffer.printf("if (%s.equals(%s)) {%n", field.camelCaseName(), fieldDefault(field)); + if (fieldDefault(field).equals("null")) { + buffer.printf("if (%s != null) {%n", field.camelCaseName()); + } else { + buffer.printf("if (!%s.equals(%s)) {%n", field.camelCaseName(), fieldDefault(field)); + } } else if (field.type() instanceof FieldType.BoolFieldType) { buffer.printf("if (%s%s) {%n", fieldDefault(field).equals("true") ? "!" : "",
