This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 78aff4fede2 KAFKA-18659: librdkafka compressed produce fails unless 
api versions returns produce v0 (#18727)
78aff4fede2 is described below

commit 78aff4fede2b9ad72d1a72ea385ee840c1505b38
Author: Ismael Juma <[email protected]>
AuthorDate: Sat Feb 1 16:08:54 2025 -0800

    KAFKA-18659: librdkafka compressed produce fails unless api versions 
returns produce v0 (#18727)
    
    Return produce v0-v2 as supported versions in `ApiVersionsResponse`, but 
disable support
    for it everywhere else.
    
    Since clients pick the highest supported version by both client and broker 
during version
    negotiation, this solves the problem with minimal tech debt (even though 
it's not ideal that
    `ApiVersionsResponse` becomes inconsistent with the actual protocol 
support).
    
    Add one test for the socket server handling (in `ProcessorTest`) and one 
test for the
    client behavior (in `ProduceRequestTest`). Adjust a couple of api versions 
tests to verify
    the new behavior.
    
    Finally, include a few clean-ups in `ApiKeys`, `Protocol`, `ProduceRequest`,
    `ProduceRequestTest` and `BrokerApiVersionsCommandTest`.
    
    Reference to related librdkafka issue:
    https://github.com/confluentinc/librdkafka/issues/4956
    
    Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>, 
Stanislav Kozlovski <[email protected]>
---
 .../org/apache/kafka/common/protocol/ApiKeys.java  | 36 ++++++++++++++++----
 .../org/apache/kafka/common/protocol/Protocol.java | 38 +++++++++++-----------
 .../kafka/common/requests/ApiVersionsResponse.java |  7 ++--
 .../kafka/common/requests/ProduceRequest.java      | 21 ++++--------
 .../resources/common/message/ProduceRequest.json   |  4 ++-
 .../resources/common/message/ProduceResponse.json  |  4 ++-
 .../common/requests/ApiVersionsResponseTest.java   | 14 +++++---
 .../kafka/common/requests/ProduceRequestTest.java  | 19 +++++++++--
 .../scala/kafka/server/ApiVersionManager.scala     |  2 +-
 .../scala/unit/kafka/network/ProcessorTest.scala   | 37 ++++++++++++++++-----
 .../server/AbstractApiVersionsRequestTest.scala    |  4 +++
 .../unit/kafka/server/ProduceRequestTest.scala     |  2 +-
 .../kafka/tools/BrokerApiVersionsCommandTest.java  |  4 +--
 13 files changed, 129 insertions(+), 63 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 1f8a98554c2..1c752945563 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -149,6 +149,11 @@ public enum ApiKeys {
     private static final Map<Integer, ApiKeys> ID_TO_TYPE = 
Arrays.stream(ApiKeys.values())
         .collect(Collectors.toMap(key -> (int) key.id, Function.identity()));
 
+    // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new 
baseline. Due to a bug in librdkafka,
+    // version `0` has to be included in the api versions response (see 
KAFKA-18659). In order to achieve that,
+    // we adjust `toApiVersion` to return `0` for the min version of `produce` 
in the broker listener.
+    public static final short PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION = 0;
+
     /** the permanent and immutable id of an API - this can't change ever */
     public final short id;
 
@@ -264,8 +269,30 @@ public enum ApiKeys {
         return oldestVersion() <= latestVersion();
     }
 
+    /**
+     * To workaround a critical bug in librdkafka, the api versions response 
is inconsistent with the actual versions
+     * supported by `produce` - this method handles that. It should be called 
in the context of the api response protocol
+     * handling.
+     *
+     * It should not be used by code generating protocol documentation - we 
keep that consistent with the actual versions
+     * supported by `produce`.
+     *
+     * See `PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for details.
+     */
+    public Optional<ApiVersionsResponseData.ApiVersion> 
toApiVersionForApiResponse(boolean enableUnstableLastVersion,
+                                                                               
    ApiMessageType.ListenerType listenerType) {
+        return toApiVersion(enableUnstableLastVersion, 
Optional.of(listenerType));
+    }
+
     public Optional<ApiVersionsResponseData.ApiVersion> toApiVersion(boolean 
enableUnstableLastVersion) {
-        short oldestVersion = oldestVersion();
+        return toApiVersion(enableUnstableLastVersion, Optional.empty());
+    }
+
+    private Optional<ApiVersionsResponseData.ApiVersion> toApiVersion(boolean 
enableUnstableLastVersion,
+                                                                     
Optional<ApiMessageType.ListenerType> listenerType) {
+        // see `PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for details on why 
we do this
+        short oldestVersion = (this == PRODUCE && listenerType.map(l -> l == 
ApiMessageType.ListenerType.BROKER).orElse(false)) ?
+            PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION : oldestVersion();
         short latestVersion = latestVersion(enableUnstableLastVersion);
 
         // API is entirely disabled if latestStableVersion is smaller than 
oldestVersion.
@@ -299,7 +326,7 @@ public enum ApiKeys {
         b.append("<th>Key</th>\n");
         b.append("</tr>");
         clientApis().stream()
-            .filter(apiKey -> apiKey.toApiVersion(false).isPresent())
+            .filter(apiKey -> apiKey.toApiVersion(false, 
Optional.empty()).isPresent())
             .forEach(apiKey -> {
                 b.append("<tr>\n");
                 b.append("<td>");
@@ -341,10 +368,7 @@ public enum ApiKeys {
     }
 
     public static EnumSet<ApiKeys> clientApis() {
-        List<ApiKeys> apis = Arrays.stream(ApiKeys.values())
-            .filter(apiKey -> 
apiKey.inScope(ApiMessageType.ListenerType.BROKER))
-            .collect(Collectors.toList());
-        return EnumSet.copyOf(apis);
+        return brokerApis();
     }
 
     public static EnumSet<ApiKeys> apisForListener(ApiMessageType.ListenerType 
listener) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 1459a901030..237948f61c9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -208,26 +208,26 @@ public class Protocol {
             // Responses
             b.append("<b>Responses:</b><br>\n");
             Schema[] responses = key.messageType.responseSchemas();
-            for (int i = 0; i < responses.length; i++) {
-                Schema schema = responses[i];
+            for (int version = key.oldestVersion(); version < 
key.latestVersion(); version++) {
+                Schema schema = responses[version];
+                if (schema == null)
+                    throw new IllegalStateException("Unexpected null schema 
for " + key + " with version " + version);
                 // Schema
-                if (schema != null) {
-                    b.append("<div>");
-                    // Version header
-                    b.append("<pre>");
-                    b.append(key.name);
-                    b.append(" Response (Version: ");
-                    b.append(i);
-                    b.append(") => ");
-                    schemaToBnfHtml(responses[i], b, 2);
-                    b.append("</pre>");
-
-                    b.append("<p><b>Response header version:</b> ");
-                    b.append(key.responseHeaderVersion((short) i));
-                    b.append("</p>\n");
-
-                    schemaToFieldTableHtml(responses[i], b);
-                }
+                b.append("<div>");
+                // Version header
+                b.append("<pre>");
+                b.append(key.name);
+                b.append(" Response (Version: ");
+                b.append(version);
+                b.append(") => ");
+                schemaToBnfHtml(responses[version], b, 2);
+                b.append("</pre>");
+
+                b.append("<p><b>Response header version:</b> ");
+                b.append(key.responseHeaderVersion((short) version));
+                b.append("</p>\n");
+
+                schemaToFieldTableHtml(responses[version], b);
                 b.append("</div>\n");
             }
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 643b4e44c0e..324e527984d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -204,18 +204,19 @@ public class ApiVersionsResponse extends AbstractResponse 
{
             // Skip telemetry APIs if client telemetry is disabled.
             if ((apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == 
ApiKeys.PUSH_TELEMETRY) && !clientTelemetryEnabled)
                 continue;
-            
apiKey.toApiVersion(enableUnstableLastVersion).ifPresent(apiKeys::add);
+            apiKey.toApiVersionForApiResponse(enableUnstableLastVersion, 
listenerType).ifPresent(apiKeys::add);
         }
         return apiKeys;
     }
 
     public static ApiVersionCollection collectApis(
+        ApiMessageType.ListenerType listenerType,
         Set<ApiKeys> apiKeys,
         boolean enableUnstableLastVersion
     ) {
         ApiVersionCollection res = new ApiVersionCollection();
         for (ApiKeys apiKey : apiKeys) {
-            apiKey.toApiVersion(enableUnstableLastVersion).ifPresent(res::add);
+            apiKey.toApiVersionForApiResponse(enableUnstableLastVersion, 
listenerType).ifPresent(res::add);
         }
         return res;
     }
@@ -238,7 +239,7 @@ public class ApiVersionsResponse extends AbstractResponse {
     ) {
         ApiVersionCollection apiKeys = new ApiVersionCollection();
         for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) {
-            final Optional<ApiVersion> brokerApiVersion = 
apiKey.toApiVersion(enableUnstableLastVersion);
+            final Optional<ApiVersion> brokerApiVersion = 
apiKey.toApiVersionForApiResponse(enableUnstableLastVersion, listenerType);
             if (brokerApiVersion.isEmpty()) {
                 // Broker does not support this API key.
                 continue;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 8fbd86cb9bb..a9f5205a308 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -40,6 +40,7 @@ import java.util.stream.Collectors;
 import static org.apache.kafka.common.requests.ProduceResponse.INVALID_OFFSET;
 
 public class ProduceRequest extends AbstractRequest {
+
     public static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 11;
 
     public static Builder builder(ProduceRequestData data, boolean 
useTransactionV1Version) {
@@ -66,21 +67,10 @@ public class ProduceRequest extends AbstractRequest {
 
         @Override
         public ProduceRequest build(short version) {
-            return build(version, true);
-        }
-
-        // Visible for testing only
-        public ProduceRequest buildUnsafe(short version) {
-            return build(version, false);
-        }
-
-        private ProduceRequest build(short version, boolean validate) {
-            if (validate) {
-                // Validate the given records first
-                data.topicData().forEach(tpd ->
-                        tpd.partitionData().forEach(partitionProduceData ->
-                                ProduceRequest.validateRecords(version, 
partitionProduceData.records())));
-            }
+            // Validate the given records first
+            data.topicData().forEach(tpd ->
+                tpd.partitionData().forEach(partitionProduceData ->
+                    ProduceRequest.validateRecords(version, 
partitionProduceData.records())));
             return new ProduceRequest(data, version);
         }
 
@@ -244,4 +234,5 @@ public class ProduceRequest extends AbstractRequest {
     public static boolean isTransactionV2Requested(short version) {
         return version > LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2;
     }
+
 }
diff --git a/clients/src/main/resources/common/message/ProduceRequest.json 
b/clients/src/main/resources/common/message/ProduceRequest.json
index db7d961f137..0bb29f92378 100644
--- a/clients/src/main/resources/common/message/ProduceRequest.json
+++ b/clients/src/main/resources/common/message/ProduceRequest.json
@@ -18,7 +18,9 @@
   "type": "request",
   "listeners": ["broker"],
   "name": "ProduceRequest",
-  // Versions 0-2 were removed in Apache Kafka 4.0, Version 3 is the new 
baseline.
+  // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new 
baseline. Due to a bug in librdkafka,
+  // these versions have to be included in the api versions response (see 
KAFKA-18659), but are rejected otherwise.
+  // See `ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for more details.
   //
   // Version 1 and 2 are the same as version 0.
   //
diff --git a/clients/src/main/resources/common/message/ProduceResponse.json 
b/clients/src/main/resources/common/message/ProduceResponse.json
index 5c12539dfb1..fafcd86401d 100644
--- a/clients/src/main/resources/common/message/ProduceResponse.json
+++ b/clients/src/main/resources/common/message/ProduceResponse.json
@@ -17,7 +17,9 @@
   "apiKey": 0,
   "type": "response",
   "name": "ProduceResponse",
-  // Versions 0-2 were removed in Apache Kafka 4.0, Version 3 is the new 
baseline.
+  // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new 
baseline. Due to a bug in librdkafka,
+  // these versions have to be included in the api versions response (see 
KAFKA-18659), but are rejected otherwise.
+  // See `ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for more details.
   //
   // Version 1 added the throttle time.
   // Version 2 added the log append time.
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
index dd8b8144a29..c94d021ef21 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -58,10 +58,13 @@ public class ApiVersionsResponseTest {
         for (ApiKeys key : ApiKeys.apisForListener(scope)) {
             ApiVersion version = defaultResponse.apiVersion(key.id);
             assertNotNull(version, "Could not find ApiVersion for API " + 
key.name);
-            assertEquals(version.minVersion(), key.oldestVersion(), "Incorrect 
min version for Api " + key.name);
-            assertEquals(version.maxVersion(), key.latestVersion(), "Incorrect 
max version for Api " + key.name);
+            if (key == ApiKeys.PRODUCE)
+                
assertEquals(ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION, 
version.minVersion(), "Incorrect min version for Api " + key.name);
+            else
+                assertEquals(key.oldestVersion(), version.minVersion(), 
"Incorrect min version for Api " + key.name);
+            assertEquals(key.latestVersion(), version.maxVersion(), "Incorrect 
max version for Api " + key.name);
 
-            // Check if versions less than min version are indeed set as null, 
i.e., deprecated.
+            // Check if versions less than min version are indeed set as null, 
i.e., removed.
             for (int i = 0; i < version.minVersion(); ++i) {
                 assertNull(key.messageType.requestSchemas()[i],
                     "Request version " + i + " for API " + version.apiKey() + 
" must be null");
@@ -69,8 +72,11 @@ public class ApiVersionsResponseTest {
                     "Response version " + i + " for API " + version.apiKey() + 
" must be null");
             }
 
+            // The min version returned in ApiResponse for Produce is not the 
actual min version, so adjust it
+            var minVersion = (key == ApiKeys.PRODUCE && scope == 
ListenerType.BROKER) ?
+                ApiKeys.PRODUCE.oldestVersion() : version.minVersion();
             // Check if versions between min and max versions are non null, 
i.e., valid.
-            for (int i = version.minVersion(); i <= version.maxVersion(); ++i) 
{
+            for (int i = minVersion; i <= version.maxVersion(); ++i) {
                 assertNotNull(key.messageType.requestSchemas()[i],
                     "Request version " + i + " for API " + version.apiKey() + 
" must not be null");
                 assertNotNull(key.messageType.responseSchemas()[i],
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
index ecb8869c38b..42a1e1f3968 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
@@ -212,8 +212,7 @@ public class ProduceRequestTest {
             .setAcks((short) 1)
             .setTimeoutMs(1000);
         // Can't create ProduceRequest instance with version within [3, 7)
-        for (short version = 3; version < 7; version++) {
-
+        for (short version = ApiKeys.PRODUCE.oldestVersion(); version < 7; 
version++) {
             ProduceRequest.Builder requestBuilder = new 
ProduceRequest.Builder(version, version, produceData);
             assertThrowsForAllVersions(requestBuilder, 
UnsupportedCompressionTypeException.class);
         }
@@ -277,6 +276,22 @@ public class ProduceRequestTest {
         assertTrue(RequestTestUtils.hasIdempotentRecords(request));
     }
 
+    @Test
+    public void testBuilderOldestAndLatestAllowed() {
+        ProduceRequest.Builder builder = ProduceRequest.builder(new 
ProduceRequestData()
+            .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
+                new ProduceRequestData.TopicProduceData()
+                    .setName("topic")
+                    .setPartitionData(Collections.singletonList(new 
ProduceRequestData.PartitionProduceData()
+                        .setIndex(1)
+                        
.setRecords(MemoryRecords.withRecords(Compression.NONE, simpleRecord))))
+            ).iterator()))
+            .setAcks((short) -1)
+            .setTimeoutMs(10));
+        assertEquals(ApiKeys.PRODUCE.oldestVersion(), 
builder.oldestAllowedVersion());
+        assertEquals(ApiKeys.PRODUCE.latestVersion(), 
builder.latestAllowedVersion());
+    }
+
     private static <T extends Throwable> void 
assertThrowsForAllVersions(ProduceRequest.Builder builder,
                                                                          
Class<T> expectedType) {
         IntStream.range(builder.oldestAllowedVersion(), 
builder.latestAllowedVersion() + 1)
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala 
b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index fd1c70e509f..e286bc9352a 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -94,7 +94,7 @@ class SimpleApiVersionManager(
     )
   }
 
-  private val apiVersions = 
ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)
+  private val apiVersions = ApiVersionsResponse.collectApis(listenerType, 
enabledApis.asJava, enableUnstableLastVersion)
 
   override def apiVersionResponse(
     throttleTimeMs: Int,
diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala 
b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
index 3a862678ca7..66f3c5d5c77 100644
--- a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
@@ -17,16 +17,19 @@
 
 package kafka.network
 
-import kafka.server.SimpleApiVersionManager
+import kafka.server.metadata.KRaftMetadataCache
+import kafka.server.{DefaultApiVersionManager, ForwardingManager, 
SimpleApiVersionManager}
 import org.apache.kafka.common.errors.{InvalidRequestException, 
UnsupportedVersionException}
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.RequestHeaderData
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{RequestHeader, RequestTestUtils}
-import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
+import org.apache.kafka.server.BrokerFeatures
+import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, 
MetadataVersion}
 import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.function.Executable
+import org.mockito.Mockito.mock
 
 import java.util.Collections
 
@@ -54,8 +57,8 @@ class ProcessorTest {
       .setClientId("clientid")
       .setCorrelationId(0);
     val requestHeader = RequestTestUtils.serializeRequestHeader(new 
RequestHeader(requestHeaderData, headerVersion))
-    val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, 
true,
-      () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0))
+    val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER, 
mock(classOf[ForwardingManager]),
+      BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () => 
KRaftVersion.LATEST_PRODUCTION), true)
     val e = assertThrows(classOf[InvalidRequestException],
       (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): 
Executable,
       "LEADER_AND_ISR should throw InvalidRequestException exception")
@@ -65,13 +68,31 @@ class ProcessorTest {
   @Test
   def testParseRequestHeaderWithUnsupportedApiVersion(): Unit = {
     val requestHeader = RequestTestUtils.serializeRequestHeader(
-      new RequestHeader(ApiKeys.PRODUCE, 0, "clientid", 0))
-    val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, 
true,
-      () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0))
+      new RequestHeader(ApiKeys.FETCH, 0, "clientid", 0))
+    val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER, 
mock(classOf[ForwardingManager]),
+      BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () => 
KRaftVersion.LATEST_PRODUCTION), true)
     val e = assertThrows(classOf[UnsupportedVersionException],
       (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): 
Executable,
-      "PRODUCE v0 should throw UnsupportedVersionException exception")
+      "FETCH v0 should throw UnsupportedVersionException exception")
     assertTrue(e.toString.contains("unsupported version"));
   }
 
+  /**
+   * We do something unusual with these versions of produce, and we want to 
make sure we don't regress.
+   * See `ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for details.
+   */
+  @Test
+  def testParseRequestHeaderForProduceV0ToV2(): Unit = {
+    for (version <- 0 to 2) {
+      val requestHeader = RequestTestUtils.serializeRequestHeader(
+        new RequestHeader(ApiKeys.PRODUCE, version.toShort, "clientid", 0))
+      val apiVersionManager = new 
DefaultApiVersionManager(ListenerType.BROKER, mock(classOf[ForwardingManager]),
+        BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () => 
KRaftVersion.LATEST_PRODUCTION), true)
+      val e = assertThrows(classOf[UnsupportedVersionException],
+        (() => Processor.parseRequestHeader(apiVersionManager, 
requestHeader)): Executable,
+        s"PRODUCE $version should throw UnsupportedVersionException exception")
+      assertTrue(e.toString.contains("unsupported version"));
+    }
+  }
+
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index f71fc37bb2f..900fb0f66fb 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -88,6 +88,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: 
ClusterInstance) {
     }
     val expectedApis = if 
(cluster.controllerListenerName().toScala.contains(listenerName)) {
       ApiVersionsResponse.collectApis(
+        ApiMessageType.ListenerType.CONTROLLER,
         ApiKeys.apisForListener(ApiMessageType.ListenerType.CONTROLLER),
         enableUnstableLastVersion
       )
@@ -116,5 +117,8 @@ abstract class AbstractApiVersionsRequestTest(cluster: 
ClusterInstance) {
       assertEquals(expectedApiVersion.minVersion, actualApiVersion.minVersion, 
s"Received unexpected min version for API key ${actualApiVersion.apiKey}.")
       assertEquals(expectedApiVersion.maxVersion, actualApiVersion.maxVersion, 
s"Received unexpected max version for API key ${actualApiVersion.apiKey}.")
     }
+
+    if (listenerName.equals(cluster.clientListener))
+      assertEquals(ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION, 
apiVersionsResponse.apiVersion(ApiKeys.PRODUCE.id).minVersion)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 64111f14875..5ab33d868a1 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -254,7 +254,7 @@ class ProduceRequestTest extends BaseRequestTest {
     // Create a single-partition topic compressed with ZSTD
     val topicConfig = new Properties
     topicConfig.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, 
BrokerCompressionType.ZSTD.name)
-    val partitionToLeader = createTopic(topic, topicConfig =  topicConfig)
+    val partitionToLeader = createTopic(topic, topicConfig = topicConfig)
     val leader = partitionToLeader(partition)
     val memoryRecords = MemoryRecords.withRecords(Compression.zstd().build(),
       new SimpleRecord(System.currentTimeMillis(), "key".getBytes, 
"value".getBytes))
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
index 49534a0ca82..b1cc54c828a 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
@@ -55,7 +55,7 @@ public class BrokerApiVersionsCommandTest {
         ApiMessageType.ListenerType listenerType = 
ApiMessageType.ListenerType.BROKER;
 
         NodeApiVersions nodeApiVersions = new NodeApiVersions(
-                ApiVersionsResponse.collectApis(ApiKeys.clientApis(), true),
+                ApiVersionsResponse.filterApis(listenerType, true, true),
                 Collections.emptyList());
         Iterator<ApiKeys> apiKeysIter = ApiKeys.clientApis().iterator();
         while (apiKeysIter.hasNext()) {
@@ -64,7 +64,7 @@ public class BrokerApiVersionsCommandTest {
             StringBuilder lineBuilder = new StringBuilder().append("\t");
             if (apiKey.inScope(listenerType)) {
                 ApiVersion apiVersion = nodeApiVersions.apiVersion(apiKey);
-                assertNotNull(apiVersion);
+                assertNotNull(apiVersion, "No apiVersion found for " + apiKey);
 
                 String versionRangeStr = (apiVersion.minVersion() == 
apiVersion.maxVersion()) ?
                         String.valueOf(apiVersion.minVersion()) :

Reply via email to