This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4f11090 HOTFIX: Fix recent protocol breakage from KIP-345 and KIP-392 (#6780)
4f11090 is described below
commit 4f110905975188d0855dc0c7b168724679aae97d
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue May 21 22:51:56 2019 -0700
HOTFIX: Fix recent protocol breakage from KIP-345 and KIP-392 (#6780)
KIP-345 and KIP-392 introduced a couple of breaking changes for old versions
of the bumped protocols. This patch fixes them.
Reviewers: Colin Patrick McCabe <[email protected]>, Ismael Juma
<[email protected]>, Boyang Chen <[email protected]>, Guozhang Wang
<[email protected]>
---
.../requests/OffsetsForLeaderEpochRequest.java | 2 +-
.../resources/common/message/HeartbeatRequest.json | 3 +-
.../resources/common/message/JoinGroupRequest.json | 3 +-
.../common/message/OffsetCommitRequest.json | 3 +-
.../resources/common/message/SyncGroupRequest.json | 3 +-
.../apache/kafka/common/message/MessageTest.java | 196 +++++++++++++++------
.../kafka/common/requests/RequestResponseTest.java | 9 +-
.../apache/kafka/message/MessageDataGenerator.java | 6 +-
8 files changed, 165 insertions(+), 60 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 6599a70..d3df6cf 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -166,7 +166,7 @@ public class OffsetsForLeaderEpochRequest extends
AbstractRequest {
@Override
protected Struct toStruct() {
Struct requestStruct = new
Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.requestSchema(version()));
- requestStruct.set(REPLICA_ID, replicaId);
+ requestStruct.setIfExists(REPLICA_ID, replicaId);
Map<String, Map<Integer, PartitionData>> topicsToPartitionEpochs =
CollectionUtils.groupPartitionDataByTopic(epochsByPartition);
diff --git a/clients/src/main/resources/common/message/HeartbeatRequest.json
b/clients/src/main/resources/common/message/HeartbeatRequest.json
index 148e661..aa7f337 100644
--- a/clients/src/main/resources/common/message/HeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/HeartbeatRequest.json
@@ -27,7 +27,8 @@
"about": "The generation of the group." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member ID." },
- { "name": "GroupInstanceId", "type": "string", "versions": "3+",
"nullableVersions": "3+",
+ { "name": "GroupInstanceId", "type": "string", "versions": "3+",
+ "nullableVersions": "3+", "default": "null",
"about": "The unique identifier of the consumer instance provided by end
user." }
]
}
diff --git a/clients/src/main/resources/common/message/JoinGroupRequest.json
b/clients/src/main/resources/common/message/JoinGroupRequest.json
index 6db24da..8e58b87 100644
--- a/clients/src/main/resources/common/message/JoinGroupRequest.json
+++ b/clients/src/main/resources/common/message/JoinGroupRequest.json
@@ -34,7 +34,8 @@
"about": "The maximum time in milliseconds that the coordinator will
wait for each member to rejoin when rebalancing the group." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member id assigned by the group coordinator." },
- { "name": "GroupInstanceId", "type": "string", "versions": "5+",
"nullableVersions": "5+",
+ { "name": "GroupInstanceId", "type": "string", "versions": "5+",
+ "nullableVersions": "5+", "default": "null",
"about": "The unique identifier of the consumer instance provided by end
user." },
{ "name": "ProtocolType", "type": "string", "versions": "0+",
"about": "The unique name the for class of protocols implemented by the
group we want to join." },
diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json
b/clients/src/main/resources/common/message/OffsetCommitRequest.json
index 0ad7565..adda079 100644
--- a/clients/src/main/resources/common/message/OffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json
@@ -35,7 +35,8 @@
"about": "The generation of the group." },
{ "name": "MemberId", "type": "string", "versions": "1+", "ignorable":
true,
"about": "The member ID assigned by the group coordinator." },
- { "name": "GroupInstanceId", "type": "string", "versions": "7+",
"ignorable": true, "nullableVersions": "7+",
+ { "name": "GroupInstanceId", "type": "string", "versions": "7+",
+ "nullableVersions": "7+", "default": "null",
"about": "The unique identifier of the consumer instance provided by end
user." },
{ "name": "RetentionTimeMs", "type": "int64", "versions": "2-4",
"default": "-1", "ignorable": true,
"about": "The time period in ms to retain the offset." },
diff --git a/clients/src/main/resources/common/message/SyncGroupRequest.json
b/clients/src/main/resources/common/message/SyncGroupRequest.json
index 282cb2a..3ace70d 100644
--- a/clients/src/main/resources/common/message/SyncGroupRequest.json
+++ b/clients/src/main/resources/common/message/SyncGroupRequest.json
@@ -27,7 +27,8 @@
"about": "The generation of the group." },
{ "name": "MemberId", "type": "string", "versions": "0+",
"about": "The member ID assigned by the group." },
- { "name": "GroupInstanceId", "type": "string", "versions": "3+",
"nullableVersions": "3+",
+ { "name": "GroupInstanceId", "type": "string", "versions": "3+",
+ "nullableVersions": "3+", "default": "null",
"about": "The unique identifier of the consumer instance provided by end
user." },
{ "name": "Assignments", "type": "[]SyncGroupRequestAssignment",
"versions": "0+",
"about": "Each assignment.", "fields": [
diff --git
a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 1780a71..547dbc4 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -18,6 +18,8 @@
package org.apache.kafka.common.message;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Message;
@@ -25,22 +27,20 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.BoundField;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.utils.Utils;
-import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
-import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -49,50 +49,127 @@ public final class MessageTest {
@Rule
final public Timeout globalTimeout = Timeout.millis(120000);
- /**
- * Test serializing and deserializing some messages.
- */
@Test
- public void testRoundTrips() throws Exception {
- testMessageRoundTrips(new MetadataRequestData().setTopics(
- Arrays.asList(new
MetadataRequestData.MetadataRequestTopic().setName("foo"),
- new MetadataRequestData.MetadataRequestTopic().setName("bar")
- )), (short) 6);
- testMessageRoundTrips(new AddOffsetsToTxnRequestData().
- setTransactionalId("foobar").
- setProducerId(0xbadcafebadcafeL).
- setProducerEpoch((short) 123).
- setGroupId("baaz"), (short) 1);
- testMessageRoundTrips(new AddOffsetsToTxnResponseData().
- setThrottleTimeMs(42).
- setErrorCode((short) 0), (short) 0);
- testMessageRoundTrips(new AddPartitionsToTxnRequestData().
- setTransactionalId("blah").
- setProducerId(0xbadcafebadcafeL).
- setProducerEpoch((short) 30000).
- setTopics(new
AddPartitionsToTxnTopicCollection(Collections.singletonList(
- new AddPartitionsToTxnTopic().
- setName("Topic").
- setPartitions(Collections.singletonList(1))).iterator())));
- testMessageRoundTrips(new CreateTopicsRequestData().
- setTimeoutMs(1000).setTopics(new
CreateTopicsRequestData.CreatableTopicCollection()));
- testMessageRoundTrips(new DescribeAclsRequestData().
- setResourceType((byte) 42).
- setResourceNameFilter(null).
- setResourcePatternType((byte) 3).
- setPrincipalFilter("abc").
- setHostFilter(null).
- setOperation((byte) 0).
- setPermissionType((byte) 0), (short) 0);
- testMessageRoundTrips(new MetadataRequestData().
- setTopics(null).
- setAllowAutoTopicCreation(false).
- setIncludeClusterAuthorizedOperations(false).
- setIncludeTopicAuthorizedOperations(false));
+ public void testAddOffsetsToTxnVersions() throws Exception {
+ testAllMessageRoundTrips(new AddOffsetsToTxnRequestData().
+ setTransactionalId("foobar").
+ setProducerId(0xbadcafebadcafeL).
+ setProducerEpoch((short) 123).
+ setGroupId("baaz"));
+ testAllMessageRoundTrips(new AddOffsetsToTxnResponseData().
+ setThrottleTimeMs(42).
+ setErrorCode((short) 0));
+ }
+
+ @Test
+ public void testAddPartitionsToTxnVersions() throws Exception {
+ testAllMessageRoundTrips(new AddPartitionsToTxnRequestData().
+ setTransactionalId("blah").
+ setProducerId(0xbadcafebadcafeL).
+ setProducerEpoch((short) 30000).
+ setTopics(new
AddPartitionsToTxnTopicCollection(Collections.singletonList(
+ new AddPartitionsToTxnTopic().
+ setName("Topic").
+
setPartitions(Collections.singletonList(1))).iterator())));
+ }
+
+ @Test
+ public void testCreateTopicsVersions() throws Exception {
+ testAllMessageRoundTrips(new CreateTopicsRequestData().
+ setTimeoutMs(1000).setTopics(new
CreateTopicsRequestData.CreatableTopicCollection()));
}
- private void testMessageRoundTrips(Message message) throws Exception {
- testMessageRoundTrips(message, message.highestSupportedVersion());
+ @Test
+ public void testDescribeAclsRequest() throws Exception {
+ testAllMessageRoundTrips(new DescribeAclsRequestData().
+ setResourceType((byte) 42).
+ setResourceNameFilter(null).
+ setResourcePatternType((byte) 3).
+ setPrincipalFilter("abc").
+ setHostFilter(null).
+ setOperation((byte) 0).
+ setPermissionType((byte) 0));
+ }
+
+ @Test
+ public void testMetadataVersions() throws Exception {
+ testAllMessageRoundTrips(new MetadataRequestData().setTopics(
+ Arrays.asList(new
MetadataRequestData.MetadataRequestTopic().setName("foo"),
+ new
MetadataRequestData.MetadataRequestTopic().setName("bar")
+ )));
+ testAllMessageRoundTripsFromVersion(new MetadataRequestData().
+ setTopics(null).
+ setAllowAutoTopicCreation(true).
+ setIncludeClusterAuthorizedOperations(false).
+ setIncludeTopicAuthorizedOperations(false), (short) 1);
+ testAllMessageRoundTripsFromVersion(new MetadataRequestData().
+ setTopics(null).
+ setAllowAutoTopicCreation(false).
+ setIncludeClusterAuthorizedOperations(false).
+ setIncludeTopicAuthorizedOperations(false), (short) 4);
+ }
+
+ @Test
+ public void testHeartbeatVersions() throws Exception {
+ Supplier<HeartbeatRequestData> newRequest = () -> new
HeartbeatRequestData()
+ .setGroupId("groupId")
+ .setMemberId("memberId")
+ .setGenerationId(15);
+ testAllMessageRoundTrips(newRequest.get());
+ testAllMessageRoundTrips(newRequest.get().setGroupInstanceId(null));
+
testAllMessageRoundTripsFromVersion(newRequest.get().setGroupInstanceId("instanceId"),
(short) 3);
+ }
+
+ @Test
+ public void testJoinGroupVersions() throws Exception {
+ Supplier<JoinGroupRequestData> newRequest = () -> new
JoinGroupRequestData()
+ .setGroupId("groupId")
+ .setMemberId("memberId")
+ .setProtocolType("consumer")
+ .setProtocols(new
JoinGroupRequestData.JoinGroupRequestProtocolCollection())
+ .setSessionTimeoutMs(10000);
+ testAllMessageRoundTrips(newRequest.get());
+
testAllMessageRoundTripsFromVersion(newRequest.get().setRebalanceTimeoutMs(20000),
(short) 1);
+ testAllMessageRoundTrips(newRequest.get().setGroupInstanceId(null));
+
testAllMessageRoundTripsFromVersion(newRequest.get().setGroupInstanceId("instanceId"),
(short) 5);
+ }
+
+ @Test
+ public void testSyncGroupDefaultGroupInstanceId() throws Exception {
+ Supplier<SyncGroupRequestData> request = () -> new
SyncGroupRequestData()
+ .setGroupId("groupId")
+ .setMemberId("memberId")
+ .setGenerationId(15)
+ .setAssignments(new ArrayList<>());
+ testAllMessageRoundTrips(request.get());
+ testAllMessageRoundTrips(request.get().setGroupInstanceId(null));
+
testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId("instanceId"),
(short) 3);
+ }
+
+ @Test
+ public void testOffsetCommitDefaultGroupInstanceId() throws Exception {
+ testAllMessageRoundTrips(new OffsetCommitRequestData()
+ .setTopics(new ArrayList<>())
+ .setGroupId("groupId"));
+
+ Supplier<OffsetCommitRequestData> request = () -> new
OffsetCommitRequestData()
+ .setGroupId("groupId")
+ .setMemberId("memberId")
+ .setTopics(new ArrayList<>())
+ .setGenerationId(15);
+ testAllMessageRoundTripsFromVersion(request.get(), (short) 1);
+
testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId(null),
(short) 1);
+
testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId("instanceId"),
(short) 7);
+ }
+
+ private void testAllMessageRoundTrips(Message message) throws Exception {
+ testAllMessageRoundTripsFromVersion(message,
message.lowestSupportedVersion());
+ }
+
+ private void testAllMessageRoundTripsFromVersion(Message message, short
fromVersion) throws Exception {
+ for (short version = fromVersion; version <
message.highestSupportedVersion(); version++) {
+ testMessageRoundTrips(message, version);
+ }
}
private void testMessageRoundTrips(Message message, short version) throws
Exception {
@@ -294,6 +371,25 @@ public final class MessageTest {
new FetchRequestData.ForgottenTopic().setName("foo"))));
}
+ @Test
+ public void testNonIgnorableFieldWithDefaultNull() throws Exception {
+ // Test non-ignorable string field `groupInstanceId` with default null
+ verifySizeRaisesUve((short) 0, "groupInstanceId", new
HeartbeatRequestData()
+ .setGroupId("groupId")
+ .setGenerationId(15)
+ .setMemberId("memberId")
+ .setGroupInstanceId("instanceId"));
+ verifySizeSucceeds((short) 0, new HeartbeatRequestData()
+ .setGroupId("groupId")
+ .setGenerationId(15)
+ .setMemberId("memberId")
+ .setGroupInstanceId(null));
+ verifySizeSucceeds((short) 0, new HeartbeatRequestData()
+ .setGroupId("groupId")
+ .setGenerationId(15)
+ .setMemberId("memberId"));
+ }
+
private void verifySizeRaisesUve(short version, String problemFieldName,
Message message) throws Exception {
try {
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index d7e9223..e8f349f 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -290,9 +290,10 @@ public class RequestResponseTest {
checkRequest(createListOffsetRequest(0), true);
checkErrorResponse(createListOffsetRequest(0), new
UnknownServerException(), true);
checkResponse(createListOffsetResponse(0), 0, true);
- checkRequest(createLeaderEpochRequest(), true);
+ checkRequest(createLeaderEpochRequest(0), true);
+
checkRequest(createLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()),
true);
checkResponse(createLeaderEpochResponse(), 0, true);
- checkErrorResponse(createLeaderEpochRequest(), new
UnknownServerException(), true);
+
checkErrorResponse(createLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()),
new UnknownServerException(), true);
checkRequest(createAddPartitionsToTxnRequest(), true);
checkErrorResponse(createAddPartitionsToTxnRequest(), new
UnknownServerException(), true);
checkResponse(createAddPartitionsToTxnResponse(), 0, true);
@@ -1250,7 +1251,7 @@ public class RequestResponseTest {
return new InitProducerIdResponse(responseData);
}
- private OffsetsForLeaderEpochRequest createLeaderEpochRequest() {
+ private OffsetsForLeaderEpochRequest createLeaderEpochRequest(int version)
{
Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochs
= new HashMap<>();
epochs.put(new TopicPartition("topic1", 0),
@@ -1260,7 +1261,7 @@ public class RequestResponseTest {
epochs.put(new TopicPartition("topic2", 2),
new
OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 3));
- return
OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(),
epochs).build();
+ return OffsetsForLeaderEpochRequest.Builder.forConsumer((short)
version, epochs).build();
}
private OffsetsForLeaderEpochResponse createLeaderEpochResponse() {
diff --git
a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index d6cd5f3..ad00137 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -928,7 +928,11 @@ public final class MessageDataGenerator {
} else if (field.type().isBytes()) {
buffer.printf("if (%s.length != 0) {%n", field.camelCaseName());
} else if (field.type().isString()) {
- buffer.printf("if (%s.equals(%s)) {%n", field.camelCaseName(),
fieldDefault(field));
+ if (fieldDefault(field).equals("null")) {
+ buffer.printf("if (%s != null) {%n", field.camelCaseName());
+ } else {
+ buffer.printf("if (!%s.equals(%s)) {%n",
field.camelCaseName(), fieldDefault(field));
+ }
} else if (field.type() instanceof FieldType.BoolFieldType) {
buffer.printf("if (%s%s) {%n",
fieldDefault(field).equals("true") ? "!" : "",