This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e6d72c9e60f KAFKA-18648: Add back support for metadata version 0-3 
(#18716)
e6d72c9e60f is described below

commit e6d72c9e60f4740b72ea21a607e234081252c428
Author: Ismael Juma <[email protected]>
AuthorDate: Tue Jan 28 18:35:33 2025 -0800

    KAFKA-18648: Add back support for metadata version 0-3 (#18716)
    
    During testing, we identified that kafka-python (and aiokafka) rely on
    metadata request v0, and hence we need to add metadata versions 0-3 back to
    comply with the premise of KIP-896, i.e. that it should not break the
    clients listed within it.
    
    I reverted the changes from #18218 related to the removal of metadata 
versions 0-3.
    
    I will submit a separate PR to undeprecate these API versions on the 
relevant 3.x branches.
    
    kafka-python (and aiokafka) work correctly (produce & consume) with this 
change on
    top of the 4.0 branch.
    
    Reviewers: David Arthur <[email protected]>
---
 .../resources/common/message/MetadataRequest.json  |  5 +-
 .../resources/common/message/MetadataResponse.json |  7 +--
 .../apache/kafka/common/message/MessageTest.java   |  2 +
 .../kafka/common/requests/MetadataRequestTest.java | 14 ++++-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  6 +-
 .../unit/kafka/server/MetadataRequestTest.scala    | 70 ++++++++++++++++------
 6 files changed, 75 insertions(+), 29 deletions(-)

diff --git a/clients/src/main/resources/common/message/MetadataRequest.json 
b/clients/src/main/resources/common/message/MetadataRequest.json
index c29093239ed..349f88b7c64 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -18,14 +18,13 @@
   "type": "request",
   "listeners": ["broker"],
   "name": "MetadataRequest",
-  "validVersions": "4-13",
+  "validVersions": "0-13",
   "flexibleVersions": "9+",
   "fields": [
-    // Versions 0-3 were removed in Apache Kafka 4.0, Version 4 is the new 
baseline.
-    //
     // In version 0, an empty array indicates "request metadata for all 
topics."  In version 1 and
     // higher, an empty array indicates "request metadata for no topics," and 
a null array is used to
     // indicate "request metadata for all topics."
+    //
     // Version 2 and 3 are the same as version 1.
     //
     // Version 4 adds AllowAutoTopicCreation.
diff --git a/clients/src/main/resources/common/message/MetadataResponse.json 
b/clients/src/main/resources/common/message/MetadataResponse.json
index 6b31fdcccfc..07ee7010e5e 100644
--- a/clients/src/main/resources/common/message/MetadataResponse.json
+++ b/clients/src/main/resources/common/message/MetadataResponse.json
@@ -17,11 +17,10 @@
   "apiKey": 3,
   "type": "response",
   "name": "MetadataResponse",
-  // Versions 0-3 were removed in Apache Kafka 4.0, Version 4 is the new 
baseline.
+  // Version 1 adds fields for the rack of each broker, the controller id, and 
whether or not the topic is internal.
   //
-  // Version 1 adds fields for the rack of each broker, the controller id, and
-  // whether or not the topic is internal.
   // Version 2 adds the cluster ID field.
+  //
   // Version 3 adds the throttle time.
   //
   // Version 4 is the same as version 3.
@@ -43,7 +42,7 @@
   // by the DescribeCluster API (KIP-700).
   // Version 12 supports topicId.
   // Version 13 supports top-level error code in the response.
-  "validVersions": "4-13",
+  "validVersions": "0-13",
   "flexibleVersions": "9+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", 
"ignorable": true,
diff --git 
a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java 
b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 2786f5b62d8..638d60fee44 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -1127,6 +1127,8 @@ public final class MessageTest {
         for (short version : ApiKeys.CREATE_TOPICS.allVersions()) {
             verifyWriteRaisesNpe(version, createTopics);
         }
+        MetadataRequestData metadata = new 
MetadataRequestData().setTopics(null);
+        verifyWriteRaisesNpe((short) 0, metadata);
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
index c28b54fd398..117d0ced9ba 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
@@ -30,13 +30,23 @@ import java.util.List;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class MetadataRequestTest {
 
     @Test
-    public void testEmptyMeansEmptyForAllVersions() {
-        for (int i = ApiKeys.METADATA.oldestVersion(); i < 
MetadataRequestData.SCHEMAS.length; i++) {
+    public void testEmptyMeansAllTopicsV0() {
+        MetadataRequestData data = new MetadataRequestData();
+        MetadataRequest parsedRequest = new MetadataRequest(data, (short) 0);
+        assertTrue(parsedRequest.isAllTopics());
+        assertNull(parsedRequest.topics());
+    }
+
+    @Test
+    public void testEmptyMeansEmptyForVersionsAboveV0() {
+        for (int i = 1; i < MetadataRequestData.SCHEMAS.length; i++) {
             MetadataRequestData data = new MetadataRequestData();
             data.setAllowAutoTopicCreation(true);
             MetadataRequest parsedRequest = new MetadataRequest(data, (short) 
i);
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 5f23c54ee88..30e6f0384fb 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -2297,7 +2297,8 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   def testMetadataClusterAuthorizedOperationsWithoutDescribeCluster(quorum: 
String): Unit = {
     removeAllClientAcls()
 
-    for (version <- ApiKeys.METADATA.oldestVersion to 
ApiKeys.METADATA.latestVersion) {
+    // MetadataRequest versions older than 1 are not supported.
+    for (version <- 1 to ApiKeys.METADATA.latestVersion) {
       testMetadataClusterClusterAuthorizedOperations(version.toShort, 0)
     }
   }
@@ -2317,7 +2318,8 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     val expectedClusterAuthorizedOperations = Utils.to32BitField(
       acls.map(_.operation.code.asInstanceOf[JByte]).asJava)
 
-    for (version <- ApiKeys.METADATA.oldestVersion to 
ApiKeys.METADATA.latestVersion) {
+    // MetadataRequest versions older than 1 are not supported.
+    for (version <- 1 to ApiKeys.METADATA.latestVersion) {
       testMetadataClusterClusterAuthorizedOperations(version.toShort, 
expectedClusterAuthorizedOperations)
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index ddfa9b42d4c..2b2250ff95d 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -20,6 +20,7 @@ package kafka.server
 import java.util.Optional
 import kafka.utils.TestUtils
 import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
@@ -40,6 +41,14 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
     doSetup(testInfo, createOffsetsTopic = false)
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testClusterIdWithRequestVersion1(quorum: String): Unit = {
+    val v1MetadataResponse = 
sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
+    val v1ClusterId = v1MetadataResponse.clusterId
+    assertNull(v1ClusterId, s"v1 clusterId should be null")
+  }
+
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testClusterIdIsValid(quorum: String): Unit = {
@@ -96,17 +105,27 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
   def testAutoTopicCreation(quorum: String): Unit = {
     val topic1 = "t1"
     val topic2 = "t2"
-    val topic3 = "t4"
-    val topic4 = "t5"
+    val topic3 = "t3"
+    val topic4 = "t4"
+    val topic5 = "t5"
     createTopic(topic1)
 
     val response1 = sendMetadataRequest(new 
MetadataRequest.Builder(Seq(topic1, topic2).asJava, true).build())
     assertNull(response1.errors.get(topic1))
     checkAutoCreatedTopic(topic2, response1)
 
-    val response2 = sendMetadataRequest(new 
MetadataRequest.Builder(Seq(topic3, topic4).asJava, false, 4.toShort).build)
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
response2.errors.get(topic3))
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
response2.errors.get(topic4))
+    // The default behavior in old versions of the metadata API is to allow 
topic creation, so
+    // protocol downgrades should happen gracefully when auto-creation is 
explicitly requested.
+    val response2 = sendMetadataRequest(new 
MetadataRequest.Builder(Seq(topic3).asJava, true).build(1))
+    checkAutoCreatedTopic(topic3, response2)
+
+    // V3 doesn't support a configurable allowAutoTopicCreation, so disabling 
auto-creation is not supported
+    assertThrows(classOf[UnsupportedVersionException], () => 
sendMetadataRequest(new MetadataRequest(requestData(List(topic4), 
allowAutoTopicCreation = false), 3.toShort)))
+
+    // V4 and higher support a configurable allowAutoTopicCreation
+    val response3 = sendMetadataRequest(new 
MetadataRequest.Builder(Seq(topic4, topic5).asJava, false, 4.toShort).build)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
response3.errors.get(topic4))
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
response3.errors.get(topic5))
   }
 
   @ParameterizedTest
@@ -132,10 +151,15 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
     createTopic("t1", 3, 2)
     createTopic("t2", 3, 2)
 
-    // v4, Null represents all topics
-    val metadataResponseV1 = 
sendMetadataRequest(MetadataRequest.Builder.allTopics.build(4.toShort))
-    assertTrue(metadataResponseV1.errors.isEmpty, "V4 Response should have no 
errors")
-    assertEquals(2, metadataResponseV1.topicMetadata.size(), "V4 Response 
should have 2 (all) topics")
+    // v0, Empty list represents all topics
+    val metadataResponseV0 = sendMetadataRequest(new 
MetadataRequest(requestData(List(), allowAutoTopicCreation = true), 0.toShort))
+    assertTrue(metadataResponseV0.errors.isEmpty, "V0 Response should have no 
errors")
+    assertEquals(2, metadataResponseV0.topicMetadata.size(), "V0 Response 
should have 2 (all) topics")
+
+    // v1, Null represents all topics
+    val metadataResponseV1 = 
sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
+    assertTrue(metadataResponseV1.errors.isEmpty, "V1 Response should have no 
errors")
+    assertEquals(2, metadataResponseV1.topicMetadata.size(), "V1 Response 
should have 2 (all) topics")
   }
 
   @ParameterizedTest
@@ -217,15 +241,25 @@ class MetadataRequestTest extends 
AbstractMetadataRequestTest {
       !response.brokers.asScala.exists(_.id == 
downNode.dataPlaneRequestProcessor.brokerId)
     }, "Replica was not found down", 50000)
 
-    // Validate version 4 returns unavailable replicas with no error
-    val v4MetadataResponse = sendMetadataRequest(new 
MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build(4))
-    val v4BrokerIds = v4MetadataResponse.brokers().asScala.map(_.id).toSeq
-    assertTrue(v4MetadataResponse.errors.isEmpty, "Response should have no 
errors")
-    assertFalse(v4BrokerIds.contains(downNode.config.brokerId), s"The downed 
broker should not be in the brokers list")
-    assertEquals(1, v4MetadataResponse.topicMetadata.size, "Response should 
have one topic")
-    val v4PartitionMetadata = 
v4MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
-    assertEquals(Errors.NONE, v4PartitionMetadata.error, "PartitionMetadata 
should have no errors")
-    assertEquals(replicaCount, v4PartitionMetadata.replicaIds.size, s"Response 
should have $replicaCount replicas")
+    // Validate version 0 still filters unavailable replicas and contains error
+    val v0MetadataResponse = sendMetadataRequest(new 
MetadataRequest(requestData(List(replicaDownTopic), allowAutoTopicCreation = 
true), 0.toShort))
+    val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq
+    assertTrue(v0MetadataResponse.errors.isEmpty, "Response should have no 
errors")
+    assertFalse(v0BrokerIds.contains(downNode.config.brokerId), s"The downed 
broker should not be in the brokers list")
+    assertTrue(v0MetadataResponse.topicMetadata.size == 1, "Response should 
have one topic")
+    val v0PartitionMetadata = 
v0MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
+    assertTrue(v0PartitionMetadata.error == Errors.REPLICA_NOT_AVAILABLE, 
"PartitionMetadata should have an error")
+    assertTrue(v0PartitionMetadata.replicaIds.size == replicaCount - 1, 
s"Response should have ${replicaCount - 1} replicas")
+
+    // Validate version 1 returns unavailable replicas with no error
+    val v1MetadataResponse = sendMetadataRequest(new 
MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build(1))
+    val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
+    assertTrue(v1MetadataResponse.errors.isEmpty, "Response should have no 
errors")
+    assertFalse(v1BrokerIds.contains(downNode.config.brokerId), s"The downed 
broker should not be in the brokers list")
+    assertEquals(1, v1MetadataResponse.topicMetadata.size, "Response should 
have one topic")
+    val v1PartitionMetadata = 
v1MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
+    assertEquals(Errors.NONE, v1PartitionMetadata.error, "PartitionMetadata 
should have no errors")
+    assertEquals(replicaCount, v1PartitionMetadata.replicaIds.size, s"Response 
should have $replicaCount replicas")
   }
 
   @ParameterizedTest

Reply via email to