This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new a6954a0  KAFKA-12701: NPE in MetadataRequest when using topic IDs (#10885)
a6954a0 is described below

commit a6954a0464ea586795eab0a8a585a45272d9a417
Author: Justine Olshan <[email protected]>
AuthorDate: Tue Jun 15 14:09:28 2021 -0700

    KAFKA-12701: NPE in MetadataRequest when using topic IDs (#10885)
    
    We prevent handling MetadataRequests where the topic name is null (to prevent NPE) as
    well as prevent requests that set topic IDs since this functionality has not yet been
    implemented. When we do implement it in https://github.com/apache/kafka/pull/9769,
    we should bump the request/response version.
    
    Added tests to ensure the error is thrown.
    
    (cherry picked from commit c16711cb8e0d1c03f)
    
    Reviewers: dengziming <[email protected]>, Ismael Juma <[email protected]>
---
 .../kafka/common/requests/MetadataRequest.java     | 21 +++++++++++--
 .../resources/common/message/MetadataRequest.json  |  3 +-
 .../kafka/common/requests/MetadataRequestTest.java | 25 ++++++++++++++++
 core/src/main/scala/kafka/server/KafkaApis.scala   | 11 +++++++
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 35 ++++++++++++++++++++++
 5 files changed, 91 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 816f600..d38e9ac 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.MetadataRequestData;
 import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic;
@@ -92,6 +93,16 @@ public class MetadataRequest extends AbstractRequest {
             if (!data.allowAutoTopicCreation() && version < 4)
                 throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " +
                         "allowAutoTopicCreation field");
+            if (data.topics() != null) {
+                data.topics().forEach(topic -> {
+                    if (topic.name() == null)
+                        throw new UnsupportedVersionException("MetadataRequest version " + version +
+                                " does not support null topic names.");
+                    if (topic.topicId() != Uuid.ZERO_UUID)
+                        throw new UnsupportedVersionException("MetadataRequest version " + version +
+                            " does not support non-zero topic IDs.");
+                });
+            }
             return new MetadataRequest(data, version);
         }
 
@@ -117,13 +128,17 @@ public class MetadataRequest extends AbstractRequest {
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
         MetadataResponseData responseData = new MetadataResponseData();
-        if (topics() != null) {
-            for (String topic : topics())
+        if (data.topics() != null) {
+            for (MetadataRequestTopic topic : data.topics()) {
+                // the response does not allow null, so convert to empty string if necessary
+                String topicName = topic.name() == null ? "" : topic.name();
                 responseData.topics().add(new MetadataResponseData.MetadataResponseTopic()
-                    .setName(topic)
+                    .setName(topicName)
+                    .setTopicId(topic.topicId())
                     .setErrorCode(error.code())
                     .setIsInternal(false)
                     .setPartitions(Collections.emptyList()));
+            }
         }
 
         responseData.setThrottleTimeMs(throttleTimeMs);
diff --git a/clients/src/main/resources/common/message/MetadataRequest.json b/clients/src/main/resources/common/message/MetadataRequest.json
index e5083a8..a1634b1 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -33,7 +33,8 @@
     //
     // Version 9 is the first flexible version.
     //
-    // Version 10 adds topicId.
+    // Version 10 adds topicId and allows name field to be null. However, this functionality was not implemented on the server.
+    // Versions 10 and 11 should not use the topicId field or set topic name to null.
     //
     // Version 11 deprecates IncludeClusterAuthorizedOperations field. This is now exposed
     // by the DescribeCluster API (KIP-700).
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
index e515232..74c217d 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
@@ -16,16 +16,21 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.MetadataRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class MetadataRequestTest {
 
@@ -65,4 +70,24 @@ public class MetadataRequestTest {
         assertEquals(minVersion, builder3.oldestAllowedVersion());
         assertEquals(maxVersion, builder3.latestAllowedVersion());
     }
+
+    @Test
+    public void testTopicIdAndNullTopicNameRequests() {
+        // Construct invalid MetadataRequestTopics. We will build each one 
separately and ensure the error is thrown.
+        List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
+                new 
MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(Uuid.randomUuid()),
+                new MetadataRequestData.MetadataRequestTopic().setName(null),
+                new 
MetadataRequestData.MetadataRequestTopic().setTopicId(Uuid.randomUuid()),
+                new 
MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(Uuid.randomUuid()));
+
+        // if version is 10 or 11, the invalid topic metadata should return an 
error
+        List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
+        invalidVersions.forEach(version ->
+            topics.forEach(topic -> {
+                MetadataRequestData metadataRequestData = new 
MetadataRequestData().setTopics(Collections.singletonList(topic));
+                MetadataRequest.Builder builder = new 
MetadataRequest.Builder(metadataRequestData);
+                assertThrows(UnsupportedVersionException.class, () -> 
builder.build(version));
+            })
+        );
+    }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index a9812fb..e3b4947 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1137,6 +1137,17 @@ class KafkaApis(val requestChannel: RequestChannel,
     val metadataRequest = request.body[MetadataRequest]
     val requestVersion = request.header.apiVersion
 
+    // Topic IDs are not supported for versions 10 and 11. Topic names can not be null in these versions.
+    if (!metadataRequest.isAllTopics) {
+      metadataRequest.data.topics.forEach{ topic =>
+        if (topic.name == null) {
+          throw new InvalidRequestException(s"Topic name can not be null for version ${metadataRequest.version}")
+        } else if (topic.topicId != Uuid.ZERO_UUID) {
+          throw new InvalidRequestException(s"Topic IDs are not supported in requests for version ${metadataRequest.version}")
+        }
+      }
+    }
+
     val topics = if (metadataRequest.isAllTopics)
       metadataCache.getAllTopics()
     else
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index b89b29c..f059b42 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -1067,6 +1067,41 @@ class KafkaApisTest {
   }
 
   @Test
+  def testInvalidMetadataRequestReturnsError(): Unit = {
+    // Construct invalid MetadataRequestTopics. We will try each one 
separately and ensure the error is thrown.
+    val topics = List(new 
MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(Uuid.randomUuid()),
+      new MetadataRequestData.MetadataRequestTopic().setName(null),
+      new 
MetadataRequestData.MetadataRequestTopic().setTopicId(Uuid.randomUuid()),
+      new 
MetadataRequestData.MetadataRequestTopic().setName("topic1").setTopicId(Uuid.randomUuid()))
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager,
+      autoTopicCreationManager, forwardingManager, 
clientControllerQuotaManager, groupCoordinator, txnCoordinator)
+
+    // if version is 10 or 11, the invalid topic metadata should return an 
error
+    val invalidVersions = Set(10, 11)
+    invalidVersions.foreach( version =>
+      topics.foreach(topic => {
+        val metadataRequestData = new 
MetadataRequestData().setTopics(Collections.singletonList(topic))
+        val metadataRequest = new MetadataRequest(metadataRequestData, 
version.toShort)
+        val request = buildRequest(metadataRequest)
+        val kafkaApis = createKafkaApis()
+
+        val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
+        
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+
+        EasyMock.replay(requestChannel)
+        kafkaApis.handle(request)
+
+        val response = readResponse(metadataRequest, 
capturedResponse).asInstanceOf[MetadataResponse]
+        assertEquals(1, response.topicMetadata.size)
+        assertEquals(1, response.errorCounts.get(Errors.INVALID_REQUEST))
+        response.data.topics.forEach(topic => assertNotEquals(null, 
topic.name))
+        reset(requestChannel)
+      })
+    )
+  }
+
+  @Test
   def testOffsetCommitWithInvalidPartition(): Unit = {
     val topic = "topic"
     addTopicToMetadataCache(topic, numPartitions = 1)

Reply via email to