This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b94c7f4  MINOR: Add ignorable field check to `toStruct` and fix usage (#7710)
b94c7f4 is described below

commit b94c7f479b917d4ec602c31a24f11390627c479b
Author: Jason Gustafson <[email protected]>
AuthorDate: Fri Nov 22 22:05:03 2019 -0800

    MINOR: Add ignorable field check to `toStruct` and fix usage (#7710)
    
    If a field is not marked as ignorable, we should raise an exception if it
    has been set to a non-default value. This check already exists in
    `Message.write`, so this patch adds it to `Message.toStruct`. Additionally, we
    fix several fields which should have been marked ignorable and we fix some
    related test assertions.
    
    Reviewers: Ismael Juma <[email protected]>, Manikumar Reddy <[email protected]>, Colin Patrick McCabe <[email protected]>
---
 .../common/requests/DescribeGroupsResponse.java    |  4 ++-
 .../kafka/common/requests/MetadataResponse.java    |  2 +-
 .../common/message/ApiVersionsRequest.json         |  4 +--
 .../common/message/OffsetFetchResponse.json        |  4 +--
 .../common/message/SaslAuthenticateResponse.json   |  2 +-
 .../common/message/SimpleExampleMessageTest.java   | 12 +++++++
 .../common/requests/JoinGroupRequestTest.java      |  1 -
 .../kafka/common/requests/RequestResponseTest.java | 36 ++++++++++-----------
 core/src/main/scala/kafka/server/KafkaApis.scala   | 26 ++++++++-------
 .../kafka/api/BaseAdminIntegrationTest.scala       |  6 ++--
 .../api/DescribeAuthorizedOperationsTest.scala     | 18 +++++------
 .../unit/kafka/server/MetadataRequestTest.scala    | 33 ++++++++++++-------
 .../apache/kafka/message/MessageDataGenerator.java | 37 +++++++++++++---------
 13 files changed, 108 insertions(+), 77 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index cb369a0..4ba4ede 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -33,6 +33,8 @@ import java.util.Set;
 
 public class DescribeGroupsResponse extends AbstractResponse {
 
+    public static final int AUTHORIZED_OPERATIONS_OMITTED = Integer.MIN_VALUE;
+
     /**
      * Possible per-group error codes:
      *
@@ -135,7 +137,7 @@ public class DescribeGroupsResponse extends 
AbstractResponse {
 
     public static DescribedGroup forError(String groupId, Errors error) {
         return groupMetadata(groupId, error, 
DescribeGroupsResponse.UNKNOWN_STATE, 
DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
-                DescribeGroupsResponse.UNKNOWN_PROTOCOL, 
Collections.emptyList(), Collections.emptySet());
+                DescribeGroupsResponse.UNKNOWN_PROTOCOL, 
Collections.emptyList(), AUTHORIZED_OPERATIONS_OMITTED);
     }
 
     public static DescribeGroupsResponse fromError(int throttleTimeMs, Errors 
error, List<String> groupIds) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index ef5381b..9b263b1 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -232,7 +232,7 @@ public class MetadataResponse extends AbstractResponse {
                              String topic,
                              boolean isInternal,
                              List<PartitionMetadata> partitionMetadata) {
-            this(error, topic, isInternal, partitionMetadata, 0);
+            this(error, topic, isInternal, partitionMetadata, 
AUTHORIZED_OPERATIONS_OMITTED);
         }
 
         public Errors error() {
diff --git a/clients/src/main/resources/common/message/ApiVersionsRequest.json 
b/clients/src/main/resources/common/message/ApiVersionsRequest.json
index 79fe7a7..66e4511 100644
--- a/clients/src/main/resources/common/message/ApiVersionsRequest.json
+++ b/clients/src/main/resources/common/message/ApiVersionsRequest.json
@@ -24,8 +24,8 @@
   "flexibleVersions": "3+",
   "fields": [
     { "name": "ClientSoftwareName", "type": "string", "versions": "3+",
-      "about": "The name of the client." },
+      "ignorable": true, "about": "The name of the client." },
     { "name": "ClientSoftwareVersion", "type": "string", "versions": "3+",
-      "about": "The version of the client." }
+      "ignorable": true, "about": "The version of the client." }
   ]
 }
diff --git a/clients/src/main/resources/common/message/OffsetFetchResponse.json 
b/clients/src/main/resources/common/message/OffsetFetchResponse.json
index e0d5e35..97bc6dd 100644
--- a/clients/src/main/resources/common/message/OffsetFetchResponse.json
+++ b/clients/src/main/resources/common/message/OffsetFetchResponse.json
@@ -44,14 +44,14 @@
         { "name": "CommittedOffset", "type": "int64", "versions": "0+",
           "about": "The committed message offset." },
         { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", 
"default": "-1",
-          "about": "The leader epoch." },
+          "ignorable": true, "about": "The leader epoch." },
         { "name": "Metadata", "type": "string", "versions": "0+", 
"nullableVersions": "0+",
           "about": "The partition metadata." },
         { "name": "ErrorCode", "type": "int16", "versions": "0+",
           "about": "The error code, or 0 if there was no error." }
       ]}
     ]},
-    { "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", 
"ignorable": false,
+    { "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", 
"ignorable": true,
       "about": "The top-level error code, or 0 if there was no error." }
   ]
 }
diff --git 
a/clients/src/main/resources/common/message/SaslAuthenticateResponse.json 
b/clients/src/main/resources/common/message/SaslAuthenticateResponse.json
index 66dfe74..1ad665f 100644
--- a/clients/src/main/resources/common/message/SaslAuthenticateResponse.json
+++ b/clients/src/main/resources/common/message/SaslAuthenticateResponse.json
@@ -27,7 +27,7 @@
       "about": "The error message, or null if there was no error." },
     { "name": "AuthBytes", "type": "bytes", "versions": "0+",
       "about": "The SASL authentication bytes from the server, as defined by 
the SASL mechanism." },
-    { "name": "SessionLifetimeMs", "type": "int64", "versions": "1+", 
"default": "0",
+    { "name": "SessionLifetimeMs", "type": "int64", "versions": "1+", 
"default": "0", "ignorable": true,
       "about": "The SASL authentication bytes from the server, as defined by 
the SASL mechanism." }
   ]
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
 
b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
index 4c697f2..8d09fa7 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/message/SimpleExampleMessageTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.message;
 
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -31,6 +32,7 @@ import java.util.function.Consumer;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 public class SimpleExampleMessageTest {
@@ -54,6 +56,16 @@ public class SimpleExampleMessageTest {
     }
 
     @Test
+    public void shouldThrowIfCannotWriteNonIgnorableField() {
+        // processId is not supported in v0 and is not marked as ignorable
+
+        final SimpleExampleMessageData out = new 
SimpleExampleMessageData().setProcessId(UUID.randomUUID());
+        assertThrows(UnsupportedVersionException.class, () ->
+                out.write(new ByteBufferAccessor(ByteBuffer.allocate(64)), new 
ObjectSerializationCache(), (short) 0));
+        assertThrows(UnsupportedVersionException.class, () -> 
out.toStruct((short) 0));
+    }
+
+    @Test
     public void shouldDefaultField() {
         final SimpleExampleMessageData out = new SimpleExampleMessageData();
         assertEquals(UUID.fromString("00000000-0000-0000-0000-000000000000"), 
out.processId());
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
index 9d8031c..125a328 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
@@ -86,7 +86,6 @@ public class JoinGroupRequestTest {
         Struct struct = new JoinGroupRequestData()
                 .setGroupId("groupId")
                 .setMemberId("consumerId")
-                .setGroupInstanceId("groupInstanceId")
                 .setProtocolType("consumer")
                 .setSessionTimeoutMs(sessionTimeoutMs)
                 .toStruct((short) 0);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index d5db9b2..f8f9dd4 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -16,10 +16,10 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBinding;
@@ -34,30 +34,32 @@ import 
org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.ApiVersionsRequestData;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import 
org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
 import 
org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection;
 import org.apache.kafka.common.message.ControlledShutdownRequestData;
+import org.apache.kafka.common.message.ControlledShutdownResponseData;
 import 
org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition;
 import 
org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionCollection;
-import org.apache.kafka.common.message.ControlledShutdownResponseData;
 import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
 import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
 import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
-import org.apache.kafka.common.message.CreateTopicsRequestData;
-import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.DeleteGroupsRequestData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import 
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
 import 
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
 import org.apache.kafka.common.message.DeleteTopicsRequestData;
-import 
org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
 import org.apache.kafka.common.message.DeleteTopicsResponseData;
+import 
org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
 import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import 
org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
@@ -68,15 +70,11 @@ import 
org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
-import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
-import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
-import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
-import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
-import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
+import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
 import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -87,6 +85,8 @@ import 
org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.ListGroupsRequestData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.message.OffsetDeleteRequestData;
@@ -1039,12 +1039,12 @@ public class RequestResponseTest {
         DescribeGroupsResponseData.DescribedGroupMember member = 
DescribeGroupsResponse.groupMember("memberId", null,
                 clientId, clientHost, new byte[0], new byte[0]);
         DescribedGroup metadata = 
DescribeGroupsResponse.groupMetadata("test-group",
-                                                                       
Errors.NONE,
-                                                                       
"STABLE",
-                                                                       
"consumer",
-                                                                       
"roundrobin",
-                                                                       
Collections.singletonList(member),
-                                                                       
Collections.emptySet());
+                Errors.NONE,
+                "STABLE",
+                "consumer",
+                "roundrobin",
+                Collections.singletonList(member),
+                DescribeGroupsResponse.AUTHORIZED_OPERATIONS_OMITTED);
         describeGroupsResponseData.groups().add(metadata);
         return new DescribeGroupsResponse(describeGroupsResponseData);
     }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 4c83423..12621ad 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1129,18 +1129,22 @@ class KafkaApis(val requestChannel: RequestChannel,
         getTopicMetadata(metadataRequest.allowAutoTopicCreation, 
authorizedTopics, request.context.listenerName,
           errorUnavailableEndpoints, errorUnavailableListeners)
 
-    var clusterAuthorizedOperations = 0
-
+    var clusterAuthorizedOperations = Int.MinValue
     if (request.header.apiVersion >= 8) {
       // get cluster authorized operations
-      if (metadataRequest.data().includeClusterAuthorizedOperations() &&
-        authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME))
-        clusterAuthorizedOperations = authorizedOperations(request, 
Resource.CLUSTER)
+      if (metadataRequest.data.includeClusterAuthorizedOperations) {
+        if (authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME))
+          clusterAuthorizedOperations = authorizedOperations(request, 
Resource.CLUSTER)
+        else
+          clusterAuthorizedOperations = 0
+      }
+
       // get topic authorized operations
-      if (metadataRequest.data().includeTopicAuthorizedOperations())
-        topicMetadata.foreach(topicData => {
-          topicData.authorizedOperations(authorizedOperations(request, new 
Resource(ResourceType.TOPIC, topicData.topic())))
-        })
+      if (metadataRequest.data.includeTopicAuthorizedOperations) {
+        topicMetadata.foreach { topicData =>
+          topicData.authorizedOperations(authorizedOperations(request, new 
Resource(ResourceType.TOPIC, topicData.topic)))
+        }
+      }
     }
 
     val completeTopicMetadata = topicMetadata ++ 
unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
@@ -1337,10 +1341,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setMembers(members.asJava)
 
         if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && 
describeRequest.data().includeAuthorizedOperations()) {
+          if (error == Errors.NONE && 
describeRequest.data.includeAuthorizedOperations) {
             
describedGroup.setAuthorizedOperations(authorizedOperations(request, new 
Resource(ResourceType.GROUP, groupId)))
-          } else {
-            describedGroup.setAuthorizedOperations(0)
           }
         }
 
diff --git 
a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index dd115b2..d680dfb 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -158,7 +158,7 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
 
     // without includeAuthorizedOperations flag
     var result = client.describeCluster
-    assertEquals(Set().asJava, result.authorizedOperations().get())
+    assertNull(result.authorizedOperations().get())
 
     //with includeAuthorizedOperations flag
     result = client.describeCluster(new 
DescribeClusterOptions().includeAuthorizedOperations(true))
@@ -172,7 +172,7 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
 
     // without includeAuthorizedOperations flag
     var topicResult = getTopicMetadata(client, topic)
-    assertEquals(Set().asJava, topicResult.authorizedOperations)
+    assertNull(topicResult.authorizedOperations)
 
     //with includeAuthorizedOperations flag
     topicResult = getTopicMetadata(client, topic, new 
DescribeTopicsOptions().includeAuthorizedOperations(true))
@@ -181,7 +181,7 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
     assertEquals(expectedOperations, topicResult.authorizedOperations)
   }
 
-  def configuredClusterPermissions() : Set[AclOperation] = {
+  def configuredClusterPermissions(): Set[AclOperation] = {
     Cluster.supportedOperations.map(operation => operation.toJava)
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
 
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index bda8271..3022af1 100644
--- 
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -21,14 +21,14 @@ import kafka.security.authorizer.AclAuthorizer
 import kafka.server.KafkaConfig
 import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
 import org.apache.kafka.clients.admin._
-import org.apache.kafka.common.acl._
-import org.apache.kafka.common.acl.AclOperation.{ALTER, DESCRIBE, 
CLUSTER_ACTION}
+import org.apache.kafka.common.acl.AclOperation.{ALTER, CLUSTER_ACTION, 
DESCRIBE}
 import org.apache.kafka.common.acl.AclPermissionType.ALLOW
+import org.apache.kafka.common.acl._
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.authorizer.Authorizer
-import org.junit.Assert.{assertEquals, assertFalse}
+import org.junit.Assert.{assertEquals, assertFalse, assertNull}
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
@@ -96,7 +96,7 @@ class DescribeAuthorizedOperationsTest extends 
IntegrationTestHarness with SaslS
   val group3Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, 
group3, PatternType.LITERAL),
     new AccessControlEntry("User:" + 
JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.DELETE, 
AclPermissionType.ALLOW))
 
-  val clusteAllAcl = new AclBinding(Resource.ClusterResource.toPattern,
+  val clusterAllAcl = new AclBinding(Resource.ClusterResource.toPattern,
     new AccessControlEntry("User:" + 
JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.ALL, 
AclPermissionType.ALLOW))
 
   val topic1Acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, 
topic1, PatternType.LITERAL),
@@ -145,7 +145,7 @@ class DescribeAuthorizedOperationsTest extends 
IntegrationTestHarness with SaslS
 
     // test without includeAuthorizedOperations flag
     var clusterDescribeResult = client.describeCluster()
-    assertEquals(Set(), 
clusterDescribeResult.authorizedOperations().get().asScala.toSet)
+    assertNull(clusterDescribeResult.authorizedOperations.get())
 
     //test with includeAuthorizedOperations flag, we have give Alter permission
     // in configureSecurityBeforeServersStart()
@@ -155,8 +155,8 @@ class DescribeAuthorizedOperationsTest extends 
IntegrationTestHarness with SaslS
       clusterDescribeResult.authorizedOperations().get().asScala.toSet)
 
     // enable all operations for cluster resource
-    val results = client.createAcls(List(clusteAllAcl).asJava)
-    assertEquals(Set(clusteAllAcl), results.values.keySet.asScala)
+    val results = client.createAcls(List(clusterAllAcl).asJava)
+    assertEquals(Set(clusterAllAcl), results.values.keySet.asScala)
     results.all.get
 
     val expectedOperations = Cluster.supportedOperations
@@ -175,8 +175,8 @@ class DescribeAuthorizedOperationsTest extends 
IntegrationTestHarness with SaslS
 
     // test without includeAuthorizedOperations flag
     var describeTopicsResult = client.describeTopics(Set(topic1, 
topic2).asJava).all.get()
-    assertEquals(Set(), 
describeTopicsResult.get(topic1).authorizedOperations().asScala.toSet)
-    assertEquals(Set(), 
describeTopicsResult.get(topic2).authorizedOperations().asScala.toSet)
+    assertNull(describeTopicsResult.get(topic1).authorizedOperations)
+    assertNull(describeTopicsResult.get(topic2).authorizedOperations)
 
     //test with includeAuthorizedOperations flag
     describeTopicsResult = client.describeTopics(Set(topic1, topic2).asJava,
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 2b3f321..df812e8 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -22,13 +22,15 @@ import java.util.Properties
 import kafka.network.SocketServer
 import kafka.utils.TestUtils
 import org.apache.kafka.common.Node
+import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.MetadataRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
+import org.apache.kafka.test.TestUtils.isValidClusterId
 import org.junit.Assert._
 import org.junit.{Before, Test}
-import org.apache.kafka.test.TestUtils.isValidClusterId
+import org.scalatest.Assertions.intercept
 
 import scala.collection.JavaConverters._
 import scala.collection.Seq
@@ -126,8 +128,7 @@ class MetadataRequestTest extends BaseRequestTest {
 
   @Test
   def testAutoTopicCreation(): Unit = {
-    def checkAutoCreatedTopic(existingTopic: String, autoCreatedTopic: String, 
response: MetadataResponse): Unit = {
-      assertNull(response.errors.get(existingTopic))
+    def checkAutoCreatedTopic(autoCreatedTopic: String, response: 
MetadataResponse): Unit = {
       assertEquals(Errors.LEADER_NOT_AVAILABLE, 
response.errors.get(autoCreatedTopic))
       assertEquals(Some(servers.head.config.numPartitions), 
zkClient.getTopicPartitionCount(autoCreatedTopic))
       for (i <- 0 until servers.head.config.numPartitions)
@@ -138,20 +139,28 @@ class MetadataRequestTest extends BaseRequestTest {
     val topic2 = "t2"
     val topic3 = "t3"
     val topic4 = "t4"
-    createTopic(topic1, 1, 1)
+    val topic5 = "t5"
+    createTopic(topic1, numPartitions = 1, replicationFactor = 1)
 
-    val response1 = sendMetadataRequest(new 
MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, 
ApiKeys.METADATA.latestVersion).build())
-    checkAutoCreatedTopic(topic1, topic2, response1)
+    val response1 = sendMetadataRequest(new 
MetadataRequest.Builder(Seq(topic1, topic2).asJava, true).build())
+    assertNull(response1.errors.get(topic1))
+    checkAutoCreatedTopic(topic2, response1)
 
-    // V3 doesn't support a configurable allowAutoTopicCreation, so the fact 
that we set it to `false` has no effect
-    val response2 = sendMetadataRequest(new 
MetadataRequest(requestData(List(topic2, topic3), false), 3.toShort))
-    checkAutoCreatedTopic(topic2, topic3, response2)
+    // The default behavior in old versions of the metadata API is to allow 
topic creation, so
+    // protocol downgrades should happen gracefully when auto-creation is 
explicitly requested.
+    val response2 = sendMetadataRequest(new 
MetadataRequest.Builder(Seq(topic3).asJava, true).build(1))
+    checkAutoCreatedTopic(topic3, response2)
+
+    // V3 doesn't support a configurable allowAutoTopicCreation, so disabling 
auto-creation is not supported
+    intercept[UnsupportedVersionException] {
+      sendMetadataRequest(new MetadataRequest(requestData(List(topic4), 
false), 3.toShort))
+    }
 
     // V4 and higher support a configurable allowAutoTopicCreation
-    val response3 = sendMetadataRequest(new 
MetadataRequest.Builder(Seq(topic3, topic4).asJava, false, 4.toShort).build)
-    assertNull(response3.errors.get(topic3))
+    val response3 = sendMetadataRequest(new 
MetadataRequest.Builder(Seq(topic4, topic5).asJava, false, 4.toShort).build)
     assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
response3.errors.get(topic4))
-    assertEquals(None, zkClient.getTopicPartitionCount(topic4))
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
response3.errors.get(topic5))
+    assertEquals(None, zkClient.getTopicPartitionCount(topic5))
   }
 
   @Test
diff --git 
a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java 
b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index e1a41d6..263da12 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -843,6 +843,21 @@ public final class MessageDataGenerator {
         }
     }
 
+    private void maybeGenerateNonIgnorableFieldCheck(FieldSpec field, 
VersionConditional cond) {
+        if (!field.ignorable()) {
+            cond.ifNotMember(__ -> {
+                generateNonDefaultValueCheck(field, field.nullableVersions());
+                buffer.incrementIndent();
+                
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
+                buffer.printf("throw new UnsupportedVersionException(" +
+                                "\"Attempted to write a non-default %s at 
version \" + _version);%n",
+                        field.camelCaseName());
+                buffer.decrementIndent();
+                buffer.printf("}%n");
+            });
+        }
+    }
+
     private void generateClassWriter(String className, StructSpec struct,
             Versions parentVersions) {
         headerGenerator.addImport(MessageGenerator.WRITABLE_CLASS);
@@ -908,18 +923,8 @@ public final class MessageDataGenerator {
                         }).
                         generate(buffer);
                 });
-            if (!field.ignorable()) {
-                cond.ifNotMember(__ -> {
-                    generateNonDefaultValueCheck(field, 
field.nullableVersions());
-                    buffer.incrementIndent();
-                    
headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
-                    buffer.printf("throw new UnsupportedVersionException(" +
-                            "\"Attempted to write a non-default %s at version 
\" + _version);%n",
-                        field.camelCaseName());
-                    buffer.decrementIndent();
-                    buffer.printf("}%n");
-                });
-            }
+
+            maybeGenerateNonIgnorableFieldCheck(field, cond);
             cond.generate(buffer);
         }
         
headerGenerator.addImport(MessageGenerator.RAW_TAGGED_FIELD_WRITER_CLASS);
@@ -1216,7 +1221,7 @@ public final class MessageDataGenerator {
             generate(buffer);
         buffer.printf("Struct struct = new Struct(SCHEMAS[_version]);%n");
         for (FieldSpec field : struct.fields()) {
-            VersionConditional.forVersions(field.versions(), curVersions).
+            VersionConditional cond = 
VersionConditional.forVersions(field.versions(), curVersions).
                 alwaysEmitBlockScope(field.type().isArray()).
                 ifMember(presentVersions -> {
                     VersionConditional.forVersions(field.taggedVersions(), 
presentVersions).
@@ -1231,8 +1236,10 @@ public final class MessageDataGenerator {
                             buffer.printf("}%n");
                         }).
                         generate(buffer);
-                }).
-                generate(buffer);
+                });
+
+            maybeGenerateNonIgnorableFieldCheck(field, cond);
+            cond.generate(buffer);
         }
         VersionConditional.forVersions(messageFlexibleVersions, curVersions).
             ifMember(__ -> {

Reply via email to