This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 7139bf08388 KAFKA-18648: Add back support for metadata version 0-3
(#18716)
7139bf08388 is described below
commit 7139bf0838849c2c3de0be45662e2c12bcb269cb
Author: Ismael Juma <[email protected]>
AuthorDate: Tue Jan 28 18:35:33 2025 -0800
KAFKA-18648: Add back support for metadata version 0-3 (#18716)
During testing, we identified that kafka-python (and aiokafka) relies on
metadata request v0 and
hence we need to add these versions back to comply with the premise of KIP-896 -
i.e. it should not
break the clients listed within it.
I reverted the changes from #18218 related to the removal of metadata
versions 0-3.
I will submit a separate PR to undeprecate these API versions on the
relevant 3.x branches.
kafka-python (and aiokafka) work correctly (produce & consume) with this
change on
top of the 4.0 branch.
Reviewers: David Arthur <[email protected]>
---
.../resources/common/message/MetadataRequest.json | 5 +-
.../resources/common/message/MetadataResponse.json | 7 +--
.../apache/kafka/common/message/MessageTest.java | 2 +
.../kafka/common/requests/MetadataRequestTest.java | 14 ++++-
.../kafka/api/AuthorizerIntegrationTest.scala | 6 +-
.../unit/kafka/server/MetadataRequestTest.scala | 70 ++++++++++++++++------
6 files changed, 75 insertions(+), 29 deletions(-)
diff --git a/clients/src/main/resources/common/message/MetadataRequest.json
b/clients/src/main/resources/common/message/MetadataRequest.json
index c29093239ed..349f88b7c64 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -18,14 +18,13 @@
"type": "request",
"listeners": ["broker"],
"name": "MetadataRequest",
- "validVersions": "4-13",
+ "validVersions": "0-13",
"flexibleVersions": "9+",
"fields": [
- // Versions 0-3 were removed in Apache Kafka 4.0, Version 4 is the new
baseline.
- //
// In version 0, an empty array indicates "request metadata for all
topics." In version 1 and
// higher, an empty array indicates "request metadata for no topics," and
a null array is used to
// indicate "request metadata for all topics."
+ //
// Version 2 and 3 are the same as version 1.
//
// Version 4 adds AllowAutoTopicCreation.
diff --git a/clients/src/main/resources/common/message/MetadataResponse.json
b/clients/src/main/resources/common/message/MetadataResponse.json
index 6b31fdcccfc..07ee7010e5e 100644
--- a/clients/src/main/resources/common/message/MetadataResponse.json
+++ b/clients/src/main/resources/common/message/MetadataResponse.json
@@ -17,11 +17,10 @@
"apiKey": 3,
"type": "response",
"name": "MetadataResponse",
- // Versions 0-3 were removed in Apache Kafka 4.0, Version 4 is the new
baseline.
+ // Version 1 adds fields for the rack of each broker, the controller id, and
whether or not the topic is internal.
//
- // Version 1 adds fields for the rack of each broker, the controller id, and
- // whether or not the topic is internal.
// Version 2 adds the cluster ID field.
+ //
// Version 3 adds the throttle time.
//
// Version 4 is the same as version 3.
@@ -43,7 +42,7 @@
// by the DescribeCluster API (KIP-700).
// Version 12 supports topicId.
// Version 13 supports top-level error code in the response.
- "validVersions": "4-13",
+ "validVersions": "0-13",
"flexibleVersions": "9+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+",
"ignorable": true,
diff --git
a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 2786f5b62d8..638d60fee44 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -1127,6 +1127,8 @@ public final class MessageTest {
for (short version : ApiKeys.CREATE_TOPICS.allVersions()) {
verifyWriteRaisesNpe(version, createTopics);
}
+ MetadataRequestData metadata = new
MetadataRequestData().setTopics(null);
+ verifyWriteRaisesNpe((short) 0, metadata);
}
@Test
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
index c28b54fd398..117d0ced9ba 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
@@ -30,13 +30,23 @@ import java.util.List;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class MetadataRequestTest {
@Test
- public void testEmptyMeansEmptyForAllVersions() {
- for (int i = ApiKeys.METADATA.oldestVersion(); i <
MetadataRequestData.SCHEMAS.length; i++) {
+ public void testEmptyMeansAllTopicsV0() {
+ MetadataRequestData data = new MetadataRequestData();
+ MetadataRequest parsedRequest = new MetadataRequest(data, (short) 0);
+ assertTrue(parsedRequest.isAllTopics());
+ assertNull(parsedRequest.topics());
+ }
+
+ @Test
+ public void testEmptyMeansEmptyForVersionsAboveV0() {
+ for (int i = 1; i < MetadataRequestData.SCHEMAS.length; i++) {
MetadataRequestData data = new MetadataRequestData();
data.setAllowAutoTopicCreation(true);
MetadataRequest parsedRequest = new MetadataRequest(data, (short)
i);
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 5f23c54ee88..30e6f0384fb 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -2297,7 +2297,8 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
def testMetadataClusterAuthorizedOperationsWithoutDescribeCluster(quorum:
String): Unit = {
removeAllClientAcls()
- for (version <- ApiKeys.METADATA.oldestVersion to
ApiKeys.METADATA.latestVersion) {
+ // MetadataRequest versions older than 1 are not supported.
+ for (version <- 1 to ApiKeys.METADATA.latestVersion) {
testMetadataClusterClusterAuthorizedOperations(version.toShort, 0)
}
}
@@ -2317,7 +2318,8 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
val expectedClusterAuthorizedOperations = Utils.to32BitField(
acls.map(_.operation.code.asInstanceOf[JByte]).asJava)
- for (version <- ApiKeys.METADATA.oldestVersion to
ApiKeys.METADATA.latestVersion) {
+ // MetadataRequest versions older than 1 are not supported.
+ for (version <- 1 to ApiKeys.METADATA.latestVersion) {
testMetadataClusterClusterAuthorizedOperations(version.toShort,
expectedClusterAuthorizedOperations)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index ddfa9b42d4c..2b2250ff95d 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -20,6 +20,7 @@ package kafka.server
import java.util.Optional
import kafka.utils.TestUtils
import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
@@ -40,6 +41,14 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
doSetup(testInfo, createOffsetsTopic = false)
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testClusterIdWithRequestVersion1(quorum: String): Unit = {
+ val v1MetadataResponse =
sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
+ val v1ClusterId = v1MetadataResponse.clusterId
+ assertNull(v1ClusterId, s"v1 clusterId should be null")
+ }
+
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testClusterIdIsValid(quorum: String): Unit = {
@@ -96,17 +105,27 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
def testAutoTopicCreation(quorum: String): Unit = {
val topic1 = "t1"
val topic2 = "t2"
- val topic3 = "t4"
- val topic4 = "t5"
+ val topic3 = "t3"
+ val topic4 = "t4"
+ val topic5 = "t5"
createTopic(topic1)
val response1 = sendMetadataRequest(new
MetadataRequest.Builder(Seq(topic1, topic2).asJava, true).build())
assertNull(response1.errors.get(topic1))
checkAutoCreatedTopic(topic2, response1)
- val response2 = sendMetadataRequest(new
MetadataRequest.Builder(Seq(topic3, topic4).asJava, false, 4.toShort).build)
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
response2.errors.get(topic3))
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
response2.errors.get(topic4))
+ // The default behavior in old versions of the metadata API is to allow
topic creation, so
+ // protocol downgrades should happen gracefully when auto-creation is
explicitly requested.
+ val response2 = sendMetadataRequest(new
MetadataRequest.Builder(Seq(topic3).asJava, true).build(1))
+ checkAutoCreatedTopic(topic3, response2)
+
+ // V3 doesn't support a configurable allowAutoTopicCreation, so disabling
auto-creation is not supported
+ assertThrows(classOf[UnsupportedVersionException], () =>
sendMetadataRequest(new MetadataRequest(requestData(List(topic4),
allowAutoTopicCreation = false), 3.toShort)))
+
+ // V4 and higher support a configurable allowAutoTopicCreation
+ val response3 = sendMetadataRequest(new
MetadataRequest.Builder(Seq(topic4, topic5).asJava, false, 4.toShort).build)
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
response3.errors.get(topic4))
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
response3.errors.get(topic5))
}
@ParameterizedTest
@@ -132,10 +151,15 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
createTopic("t1", 3, 2)
createTopic("t2", 3, 2)
- // v4, Null represents all topics
- val metadataResponseV1 =
sendMetadataRequest(MetadataRequest.Builder.allTopics.build(4.toShort))
- assertTrue(metadataResponseV1.errors.isEmpty, "V4 Response should have no
errors")
- assertEquals(2, metadataResponseV1.topicMetadata.size(), "V4 Response
should have 2 (all) topics")
+ // v0, Empty list represents all topics
+ val metadataResponseV0 = sendMetadataRequest(new
MetadataRequest(requestData(List(), allowAutoTopicCreation = true), 0.toShort))
+ assertTrue(metadataResponseV0.errors.isEmpty, "V0 Response should have no
errors")
+ assertEquals(2, metadataResponseV0.topicMetadata.size(), "V0 Response
should have 2 (all) topics")
+
+ // v1, Null represents all topics
+ val metadataResponseV1 =
sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
+ assertTrue(metadataResponseV1.errors.isEmpty, "V1 Response should have no
errors")
+ assertEquals(2, metadataResponseV1.topicMetadata.size(), "V1 Response
should have 2 (all) topics")
}
@ParameterizedTest
@@ -217,15 +241,25 @@ class MetadataRequestTest extends
AbstractMetadataRequestTest {
!response.brokers.asScala.exists(_.id ==
downNode.dataPlaneRequestProcessor.brokerId)
}, "Replica was not found down", 50000)
- // Validate version 4 returns unavailable replicas with no error
- val v4MetadataResponse = sendMetadataRequest(new
MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build(4))
- val v4BrokerIds = v4MetadataResponse.brokers().asScala.map(_.id).toSeq
- assertTrue(v4MetadataResponse.errors.isEmpty, "Response should have no
errors")
- assertFalse(v4BrokerIds.contains(downNode.config.brokerId), s"The downed
broker should not be in the brokers list")
- assertEquals(1, v4MetadataResponse.topicMetadata.size, "Response should
have one topic")
- val v4PartitionMetadata =
v4MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
- assertEquals(Errors.NONE, v4PartitionMetadata.error, "PartitionMetadata
should have no errors")
- assertEquals(replicaCount, v4PartitionMetadata.replicaIds.size, s"Response
should have $replicaCount replicas")
+ // Validate version 0 still filters unavailable replicas and contains error
+ val v0MetadataResponse = sendMetadataRequest(new
MetadataRequest(requestData(List(replicaDownTopic), allowAutoTopicCreation =
true), 0.toShort))
+ val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq
+ assertTrue(v0MetadataResponse.errors.isEmpty, "Response should have no
errors")
+ assertFalse(v0BrokerIds.contains(downNode.config.brokerId), s"The downed
broker should not be in the brokers list")
+ assertTrue(v0MetadataResponse.topicMetadata.size == 1, "Response should
have one topic")
+ val v0PartitionMetadata =
v0MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
+ assertTrue(v0PartitionMetadata.error == Errors.REPLICA_NOT_AVAILABLE,
"PartitionMetadata should have an error")
+ assertTrue(v0PartitionMetadata.replicaIds.size == replicaCount - 1,
s"Response should have ${replicaCount - 1} replicas")
+
+ // Validate version 1 returns unavailable replicas with no error
+ val v1MetadataResponse = sendMetadataRequest(new
MetadataRequest.Builder(List(replicaDownTopic).asJava, true).build(1))
+ val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
+ assertTrue(v1MetadataResponse.errors.isEmpty, "Response should have no
errors")
+ assertFalse(v1BrokerIds.contains(downNode.config.brokerId), s"The downed
broker should not be in the brokers list")
+ assertEquals(1, v1MetadataResponse.topicMetadata.size, "Response should
have one topic")
+ val v1PartitionMetadata =
v1MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
+ assertEquals(Errors.NONE, v1PartitionMetadata.error, "PartitionMetadata
should have no errors")
+ assertEquals(replicaCount, v1PartitionMetadata.replicaIds.size, s"Response
should have $replicaCount replicas")
}
@ParameterizedTest