This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b94c7f4 MINOR: Add ignorable field check to `toStruct` and fix usage
(#7710)
b94c7f4 is described below
commit b94c7f479b917d4ec602c31a24f11390627c479b
Author: Jason Gustafson <[email protected]>
AuthorDate: Fri Nov 22 22:05:03 2019 -0800
MINOR: Add ignorable field check to `toStruct` and fix usage (#7710)
If a field is not marked as ignorable and the message is being serialized at
a version which does not include that field, we should raise an
`UnsupportedVersionException` if the field has been set to a non-default
value. This check already exists in `Message.write`, so this patch adds it to
`Message.toStruct`. Additionally, we fix several fields which should have been
marked ignorable, and we fix some related test assertions.
Reviewers: Ismael Juma <[email protected]>, Manikumar Reddy
<[email protected]>, Colin Patrick McCabe <[email protected]>
---
.../common/requests/DescribeGroupsResponse.java | 4 ++-
.../kafka/common/requests/MetadataResponse.java | 2 +-
.../common/message/ApiVersionsRequest.json | 4 +--
.../common/message/OffsetFetchResponse.json | 4 +--
.../common/message/SaslAuthenticateResponse.json | 2 +-
.../common/message/SimpleExampleMessageTest.java | 12 +++++++
.../common/requests/JoinGroupRequestTest.java | 1 -
.../kafka/common/requests/RequestResponseTest.java | 36 ++++++++++-----------
core/src/main/scala/kafka/server/KafkaApis.scala | 26 ++++++++-------
.../kafka/api/BaseAdminIntegrationTest.scala | 6 ++--
.../api/DescribeAuthorizedOperationsTest.scala | 18 +++++------
.../unit/kafka/server/MetadataRequestTest.scala | 33 ++++++++++++-------
.../apache/kafka/message/MessageDataGenerator.java | 37 +++++++++++++---------
13 files changed, 108 insertions(+), 77 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index cb369a0..4ba4ede 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -33,6 +33,8 @@ import java.util.Set;
public class DescribeGroupsResponse extends AbstractResponse {
+ public static final int AUTHORIZED_OPERATIONS_OMITTED = Integer.MIN_VALUE;
+
/**
* Possible per-group error codes:
*
@@ -135,7 +137,7 @@ public class DescribeGroupsResponse extends
AbstractResponse {
public static DescribedGroup forError(String groupId, Errors error) {
return groupMetadata(groupId, error,
DescribeGroupsResponse.UNKNOWN_STATE,
DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
- DescribeGroupsResponse.UNKNOWN_PROTOCOL,
Collections.emptyList(), Collections.emptySet());
+ DescribeGroupsResponse.UNKNOWN_PROTOCOL,
Collections.emptyList(), AUTHORIZED_OPERATIONS_OMITTED);
}
public static DescribeGroupsResponse fromError(int throttleTimeMs, Errors
error, List<String> groupIds) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index ef5381b..9b263b1 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -232,7 +232,7 @@ public class MetadataResponse extends AbstractResponse {
String topic,
boolean isInternal,
List<PartitionMetadata> partitionMetadata) {
- this(error, topic, isInternal, partitionMetadata, 0);
+ this(error, topic, isInternal, partitionMetadata,
AUTHORIZED_OPERATIONS_OMITTED);
}
public Errors error() {
diff --git a/clients/src/main/resources/common/message/ApiVersionsRequest.json
b/clients/src/main/resources/common/message/ApiVersionsRequest.json
index 79fe7a7..66e4511 100644
--- a/clients/src/main/resources/common/message/ApiVersionsRequest.json
+++ b/clients/src/main/resources/common/message/ApiVersionsRequest.json
@@ -24,8 +24,8 @@
"flexibleVersions": "3+",
"fields": [
{ "name": "ClientSoftwareName", "type": "string", "versions": "3+",
- "about": "The name of the client." },
+ "ignorable": true, "about": "The name of the client." },
{ "name": "ClientSoftwareVersion", "type": "string", "versions": "3+",
- "about": "The version of the client." }
+ "ignorable": true, "about": "The version of the client." }
]
}
diff --git a/clients/src/main/resources/common/message/OffsetFetchResponse.json
b/clients/src/main/resources/common/message/OffsetFetchResponse.json
index e0d5e35..97bc6dd 100644
--- a/clients/src/main/resources/common/message/OffsetFetchResponse.json
+++ b/clients/src/main/resources/common/message/OffsetFetchResponse.json
@@ -44,14 +44,14 @@
{ "name": "CommittedOffset", "type": "int64", "versions": "0+",
"about": "The committed message offset." },
{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+",
"default": "-1",
- "about": "The leader epoch." },
+ "ignorable": true, "about": "The leader epoch." },
{ "name": "Metadata", "type": "string", "versions": "0+",
"nullableVersions": "0+",
"about": "The partition metadata." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." }
]}
]},
- { "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0",
"ignorable": false,
+ { "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0",
"ignorable": true,
"about": "The top-level error code, or 0 if there was no error." }
]
}
diff --git
a/clients/src/main/resources/common/message/SaslAuthenticateResponse.json
b/clients/src/main/resources/common/message/SaslAuthenticateResponse.json
index 66dfe74..1ad665f 100644
--- a/clients/src/main/resources/common/message/SaslAuthenticateResponse.json
+++ b/clients/src/main/resources/common/message/SaslAuthenticateResponse.json
@@ -27,7 +27,7 @@
"about": "The error message, or null if there was no error." },
{ "name": "AuthBytes", "type": "bytes", "versions": "0+",
"about": "The SASL authentication bytes from the server, as defined by
the SASL mechanism." },
- { "name": "SessionLifetimeMs", "type": "int64", "versions": "1+",
"default": "0",
+ { "name": "SessionLifetimeMs", "type": "int64", "versions": "1+",
"default": "0", "ignorable": true,
"about": "The SASL authentication bytes from the server, as defined by
the SASL mechanism." }
]
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
index 4c697f2..8d09fa7 100644
---
a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.message;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.protocol.types.Struct;
@@ -31,6 +32,7 @@ import java.util.function.Consumer;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
public class SimpleExampleMessageTest {
@@ -54,6 +56,16 @@ public class SimpleExampleMessageTest {
}
@Test
+ public void shouldThrowIfCannotWriteNonIgnorableField() {
+ // processId is not supported in v0 and is not marked as ignorable
+
+ final SimpleExampleMessageData out = new
SimpleExampleMessageData().setProcessId(UUID.randomUUID());
+ assertThrows(UnsupportedVersionException.class, () ->
+ out.write(new ByteBufferAccessor(ByteBuffer.allocate(64)), new
ObjectSerializationCache(), (short) 0));
+ assertThrows(UnsupportedVersionException.class, () ->
out.toStruct((short) 0));
+ }
+
+ @Test
public void shouldDefaultField() {
final SimpleExampleMessageData out = new SimpleExampleMessageData();
assertEquals(UUID.fromString("00000000-0000-0000-0000-000000000000"),
out.processId());
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
index 9d8031c..125a328 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
@@ -86,7 +86,6 @@ public class JoinGroupRequestTest {
Struct struct = new JoinGroupRequestData()
.setGroupId("groupId")
.setMemberId("consumerId")
- .setGroupInstanceId("groupInstanceId")
.setProtocolType("consumer")
.setSessionTimeoutMs(sessionTimeoutMs)
.toStruct((short) 0);
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index d5db9b2..f8f9dd4 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -16,10 +16,10 @@
*/
package org.apache.kafka.common.requests;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
@@ -34,30 +34,32 @@ import
org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import
org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
import
org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection;
import org.apache.kafka.common.message.ControlledShutdownRequestData;
+import org.apache.kafka.common.message.ControlledShutdownResponseData;
import
org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition;
import
org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionCollection;
-import org.apache.kafka.common.message.ControlledShutdownResponseData;
import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
import
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
-import org.apache.kafka.common.message.CreateTopicsRequestData;
-import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.CreateTopicsResponseData;
+import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.DeleteGroupsRequestData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
import
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
import org.apache.kafka.common.message.DeleteTopicsRequestData;
-import
org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
import org.apache.kafka.common.message.DeleteTopicsResponseData;
+import
org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import
org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
@@ -68,15 +70,11 @@ import
org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
-import
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
-import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
-import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
-import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
-import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
+import
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -87,6 +85,8 @@ import
org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetDeleteRequestData;
@@ -1039,12 +1039,12 @@ public class RequestResponseTest {
DescribeGroupsResponseData.DescribedGroupMember member =
DescribeGroupsResponse.groupMember("memberId", null,
clientId, clientHost, new byte[0], new byte[0]);
DescribedGroup metadata =
DescribeGroupsResponse.groupMetadata("test-group",
-
Errors.NONE,
-
"STABLE",
-
"consumer",
-
"roundrobin",
-
Collections.singletonList(member),
-
Collections.emptySet());
+ Errors.NONE,
+ "STABLE",
+ "consumer",
+ "roundrobin",
+ Collections.singletonList(member),
+ DescribeGroupsResponse.AUTHORIZED_OPERATIONS_OMITTED);
describeGroupsResponseData.groups().add(metadata);
return new DescribeGroupsResponse(describeGroupsResponseData);
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 4c83423..12621ad 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1129,18 +1129,22 @@ class KafkaApis(val requestChannel: RequestChannel,
getTopicMetadata(metadataRequest.allowAutoTopicCreation,
authorizedTopics, request.context.listenerName,
errorUnavailableEndpoints, errorUnavailableListeners)
- var clusterAuthorizedOperations = 0
-
+ var clusterAuthorizedOperations = Int.MinValue
if (request.header.apiVersion >= 8) {
// get cluster authorized operations
- if (metadataRequest.data().includeClusterAuthorizedOperations() &&
- authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME))
- clusterAuthorizedOperations = authorizedOperations(request,
Resource.CLUSTER)
+ if (metadataRequest.data.includeClusterAuthorizedOperations) {
+ if (authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME))
+ clusterAuthorizedOperations = authorizedOperations(request,
Resource.CLUSTER)
+ else
+ clusterAuthorizedOperations = 0
+ }
+
// get topic authorized operations
- if (metadataRequest.data().includeTopicAuthorizedOperations())
- topicMetadata.foreach(topicData => {
- topicData.authorizedOperations(authorizedOperations(request, new
Resource(ResourceType.TOPIC, topicData.topic())))
- })
+ if (metadataRequest.data.includeTopicAuthorizedOperations) {
+ topicMetadata.foreach { topicData =>
+ topicData.authorizedOperations(authorizedOperations(request, new
Resource(ResourceType.TOPIC, topicData.topic)))
+ }
+ }
}
val completeTopicMetadata = topicMetadata ++
unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
@@ -1337,10 +1341,8 @@ class KafkaApis(val requestChannel: RequestChannel,
.setMembers(members.asJava)
if (request.header.apiVersion >= 3) {
- if (error == Errors.NONE &&
describeRequest.data().includeAuthorizedOperations()) {
+ if (error == Errors.NONE &&
describeRequest.data.includeAuthorizedOperations) {
describedGroup.setAuthorizedOperations(authorizedOperations(request, new
Resource(ResourceType.GROUP, groupId)))
- } else {
- describedGroup.setAuthorizedOperations(0)
}
}
diff --git
a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index dd115b2..d680dfb 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -158,7 +158,7 @@ abstract class BaseAdminIntegrationTest extends
IntegrationTestHarness with Logg
// without includeAuthorizedOperations flag
var result = client.describeCluster
- assertEquals(Set().asJava, result.authorizedOperations().get())
+ assertNull(result.authorizedOperations().get())
//with includeAuthorizedOperations flag
result = client.describeCluster(new
DescribeClusterOptions().includeAuthorizedOperations(true))
@@ -172,7 +172,7 @@ abstract class BaseAdminIntegrationTest extends
IntegrationTestHarness with Logg
// without includeAuthorizedOperations flag
var topicResult = getTopicMetadata(client, topic)
- assertEquals(Set().asJava, topicResult.authorizedOperations)
+ assertNull(topicResult.authorizedOperations)
//with includeAuthorizedOperations flag
topicResult = getTopicMetadata(client, topic, new
DescribeTopicsOptions().includeAuthorizedOperations(true))
@@ -181,7 +181,7 @@ abstract class BaseAdminIntegrationTest extends
IntegrationTestHarness with Logg
assertEquals(expectedOperations, topicResult.authorizedOperations)
}
- def configuredClusterPermissions() : Set[AclOperation] = {
+ def configuredClusterPermissions(): Set[AclOperation] = {
Cluster.supportedOperations.map(operation => operation.toJava)
}
diff --git
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index bda8271..3022af1 100644
---
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -21,14 +21,14 @@ import kafka.security.authorizer.AclAuthorizer
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
import org.apache.kafka.clients.admin._
-import org.apache.kafka.common.acl._
-import org.apache.kafka.common.acl.AclOperation.{ALTER, DESCRIBE,
CLUSTER_ACTION}
+import org.apache.kafka.common.acl.AclOperation.{ALTER, CLUSTER_ACTION,
DESCRIBE}
import org.apache.kafka.common.acl.AclPermissionType.ALLOW
+import org.apache.kafka.common.acl._
import org.apache.kafka.common.resource.{PatternType, ResourcePattern,
ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.authorizer.Authorizer
-import org.junit.Assert.{assertEquals, assertFalse}
+import org.junit.Assert.{assertEquals, assertFalse, assertNull}
import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
@@ -96,7 +96,7 @@ class DescribeAuthorizedOperationsTest extends
IntegrationTestHarness with SaslS
val group3Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP,
group3, PatternType.LITERAL),
new AccessControlEntry("User:" +
JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.DELETE,
AclPermissionType.ALLOW))
- val clusteAllAcl = new AclBinding(Resource.ClusterResource.toPattern,
+ val clusterAllAcl = new AclBinding(Resource.ClusterResource.toPattern,
new AccessControlEntry("User:" +
JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.ALL,
AclPermissionType.ALLOW))
val topic1Acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
topic1, PatternType.LITERAL),
@@ -145,7 +145,7 @@ class DescribeAuthorizedOperationsTest extends
IntegrationTestHarness with SaslS
// test without includeAuthorizedOperations flag
var clusterDescribeResult = client.describeCluster()
- assertEquals(Set(),
clusterDescribeResult.authorizedOperations().get().asScala.toSet)
+ assertNull(clusterDescribeResult.authorizedOperations.get())
//test with includeAuthorizedOperations flag, we have give Alter permission
// in configureSecurityBeforeServersStart()
@@ -155,8 +155,8 @@ class DescribeAuthorizedOperationsTest extends
IntegrationTestHarness with SaslS
clusterDescribeResult.authorizedOperations().get().asScala.toSet)
// enable all operations for cluster resource
- val results = client.createAcls(List(clusteAllAcl).asJava)
- assertEquals(Set(clusteAllAcl), results.values.keySet.asScala)
+ val results = client.createAcls(List(clusterAllAcl).asJava)
+ assertEquals(Set(clusterAllAcl), results.values.keySet.asScala)
results.all.get
val expectedOperations = Cluster.supportedOperations
@@ -175,8 +175,8 @@ class DescribeAuthorizedOperationsTest extends
IntegrationTestHarness with SaslS
// test without includeAuthorizedOperations flag
var describeTopicsResult = client.describeTopics(Set(topic1,
topic2).asJava).all.get()
- assertEquals(Set(),
describeTopicsResult.get(topic1).authorizedOperations().asScala.toSet)
- assertEquals(Set(),
describeTopicsResult.get(topic2).authorizedOperations().asScala.toSet)
+ assertNull(describeTopicsResult.get(topic1).authorizedOperations)
+ assertNull(describeTopicsResult.get(topic2).authorizedOperations)
//test with includeAuthorizedOperations flag
describeTopicsResult = client.describeTopics(Set(topic1, topic2).asJava,
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 2b3f321..df812e8 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -22,13 +22,15 @@ import java.util.Properties
import kafka.network.SocketServer
import kafka.utils.TestUtils
import org.apache.kafka.common.Node
+import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.MetadataRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
+import org.apache.kafka.test.TestUtils.isValidClusterId
import org.junit.Assert._
import org.junit.{Before, Test}
-import org.apache.kafka.test.TestUtils.isValidClusterId
+import org.scalatest.Assertions.intercept
import scala.collection.JavaConverters._
import scala.collection.Seq
@@ -126,8 +128,7 @@ class MetadataRequestTest extends BaseRequestTest {
@Test
def testAutoTopicCreation(): Unit = {
- def checkAutoCreatedTopic(existingTopic: String, autoCreatedTopic: String,
response: MetadataResponse): Unit = {
- assertNull(response.errors.get(existingTopic))
+ def checkAutoCreatedTopic(autoCreatedTopic: String, response:
MetadataResponse): Unit = {
assertEquals(Errors.LEADER_NOT_AVAILABLE,
response.errors.get(autoCreatedTopic))
assertEquals(Some(servers.head.config.numPartitions),
zkClient.getTopicPartitionCount(autoCreatedTopic))
for (i <- 0 until servers.head.config.numPartitions)
@@ -138,20 +139,28 @@ class MetadataRequestTest extends BaseRequestTest {
val topic2 = "t2"
val topic3 = "t3"
val topic4 = "t4"
- createTopic(topic1, 1, 1)
+ val topic5 = "t5"
+ createTopic(topic1, numPartitions = 1, replicationFactor = 1)
- val response1 = sendMetadataRequest(new
MetadataRequest.Builder(Seq(topic1, topic2).asJava, true,
ApiKeys.METADATA.latestVersion).build())
- checkAutoCreatedTopic(topic1, topic2, response1)
+ val response1 = sendMetadataRequest(new
MetadataRequest.Builder(Seq(topic1, topic2).asJava, true).build())
+ assertNull(response1.errors.get(topic1))
+ checkAutoCreatedTopic(topic2, response1)
- // V3 doesn't support a configurable allowAutoTopicCreation, so the fact
that we set it to `false` has no effect
- val response2 = sendMetadataRequest(new
MetadataRequest(requestData(List(topic2, topic3), false), 3.toShort))
- checkAutoCreatedTopic(topic2, topic3, response2)
+ // The default behavior in old versions of the metadata API is to allow
topic creation, so
+ // protocol downgrades should happen gracefully when auto-creation is
explicitly requested.
+ val response2 = sendMetadataRequest(new
MetadataRequest.Builder(Seq(topic3).asJava, true).build(1))
+ checkAutoCreatedTopic(topic3, response2)
+
+ // V3 doesn't support a configurable allowAutoTopicCreation, so disabling
auto-creation is not supported
+ intercept[UnsupportedVersionException] {
+ sendMetadataRequest(new MetadataRequest(requestData(List(topic4),
false), 3.toShort))
+ }
// V4 and higher support a configurable allowAutoTopicCreation
- val response3 = sendMetadataRequest(new
MetadataRequest.Builder(Seq(topic3, topic4).asJava, false, 4.toShort).build)
- assertNull(response3.errors.get(topic3))
+ val response3 = sendMetadataRequest(new
MetadataRequest.Builder(Seq(topic4, topic5).asJava, false, 4.toShort).build)
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
response3.errors.get(topic4))
- assertEquals(None, zkClient.getTopicPartitionCount(topic4))
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
response3.errors.get(topic5))
+ assertEquals(None, zkClient.getTopicPartitionCount(topic5))
}
@Test
diff --git
a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index e1a41d6..263da12 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -843,6 +843,21 @@ public final class MessageDataGenerator {
}
}
+ private void maybeGenerateNonIgnorableFieldCheck(FieldSpec field,
VersionConditional cond) {
+ if (!field.ignorable()) {
+ cond.ifNotMember(__ -> {
+ generateNonDefaultValueCheck(field, field.nullableVersions());
+ buffer.incrementIndent();
+
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
+ buffer.printf("throw new UnsupportedVersionException(" +
+ "\"Attempted to write a non-default %s at
version \" + _version);%n",
+ field.camelCaseName());
+ buffer.decrementIndent();
+ buffer.printf("}%n");
+ });
+ }
+ }
+
private void generateClassWriter(String className, StructSpec struct,
Versions parentVersions) {
headerGenerator.addImport(MessageGenerator.WRITABLE_CLASS);
@@ -908,18 +923,8 @@ public final class MessageDataGenerator {
}).
generate(buffer);
});
- if (!field.ignorable()) {
- cond.ifNotMember(__ -> {
- generateNonDefaultValueCheck(field,
field.nullableVersions());
- buffer.incrementIndent();
-
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
- buffer.printf("throw new UnsupportedVersionException(" +
- "\"Attempted to write a non-default %s at version
\" + _version);%n",
- field.camelCaseName());
- buffer.decrementIndent();
- buffer.printf("}%n");
- });
- }
+
+ maybeGenerateNonIgnorableFieldCheck(field, cond);
cond.generate(buffer);
}
headerGenerator.addImport(MessageGenerator.RAW_TAGGED_FIELD_WRITER_CLASS);
@@ -1216,7 +1221,7 @@ public final class MessageDataGenerator {
generate(buffer);
buffer.printf("Struct struct = new Struct(SCHEMAS[_version]);%n");
for (FieldSpec field : struct.fields()) {
- VersionConditional.forVersions(field.versions(), curVersions).
+ VersionConditional cond =
VersionConditional.forVersions(field.versions(), curVersions).
alwaysEmitBlockScope(field.type().isArray()).
ifMember(presentVersions -> {
VersionConditional.forVersions(field.taggedVersions(),
presentVersions).
@@ -1231,8 +1236,10 @@ public final class MessageDataGenerator {
buffer.printf("}%n");
}).
generate(buffer);
- }).
- generate(buffer);
+ });
+
+ maybeGenerateNonIgnorableFieldCheck(field, cond);
+ cond.generate(buffer);
}
VersionConditional.forVersions(messageFlexibleVersions, curVersions).
ifMember(__ -> {