This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 78aff4fede2 KAFKA-18659: librdkafka compressed produce fails unless
api versions returns produce v0 (#18727)
78aff4fede2 is described below
commit 78aff4fede2b9ad72d1a72ea385ee840c1505b38
Author: Ismael Juma <[email protected]>
AuthorDate: Sat Feb 1 16:08:54 2025 -0800
KAFKA-18659: librdkafka compressed produce fails unless api versions
returns produce v0 (#18727)
Return produce v0-v2 as supported versions in `ApiVersionsResponse`, but
disable support
for them everywhere else.
Since clients pick the highest supported version by both client and broker
during version
negotiation, this solves the problem with minimal tech debt (even though
it's not ideal that
`ApiVersionsResponse` becomes inconsistent with the actual protocol
support).
Add one test for the socket server handling (in `ProcessorTest`) and one
test for the
client behavior (in `ProduceRequestTest`). Adjust a couple of api versions
tests to verify
the new behavior.
Finally, include a few clean-ups in `ApiKeys`, `Protocol`, `ProduceRequest`,
`ProduceRequestTest` and `BrokerApiVersionsCommandTest`.
Reference to related librdkafka issue:
https://github.com/confluentinc/librdkafka/issues/4956
Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>,
Stanislav Kozlovski <[email protected]>
---
.../org/apache/kafka/common/protocol/ApiKeys.java | 36 ++++++++++++++++----
.../org/apache/kafka/common/protocol/Protocol.java | 38 +++++++++++-----------
.../kafka/common/requests/ApiVersionsResponse.java | 7 ++--
.../kafka/common/requests/ProduceRequest.java | 21 ++++--------
.../resources/common/message/ProduceRequest.json | 4 ++-
.../resources/common/message/ProduceResponse.json | 4 ++-
.../common/requests/ApiVersionsResponseTest.java | 14 +++++---
.../kafka/common/requests/ProduceRequestTest.java | 19 +++++++++--
.../scala/kafka/server/ApiVersionManager.scala | 2 +-
.../scala/unit/kafka/network/ProcessorTest.scala | 37 ++++++++++++++++-----
.../server/AbstractApiVersionsRequestTest.scala | 4 +++
.../unit/kafka/server/ProduceRequestTest.scala | 2 +-
.../kafka/tools/BrokerApiVersionsCommandTest.java | 4 +--
13 files changed, 129 insertions(+), 63 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 1f8a98554c2..1c752945563 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -149,6 +149,11 @@ public enum ApiKeys {
private static final Map<Integer, ApiKeys> ID_TO_TYPE =
Arrays.stream(ApiKeys.values())
.collect(Collectors.toMap(key -> (int) key.id, Function.identity()));
+ // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new
baseline. Due to a bug in librdkafka,
+ // version `0` has to be included in the api versions response (see
KAFKA-18659). In order to achieve that,
+ // we adjust `toApiVersion` to return `0` for the min version of `produce`
in the broker listener.
+ public static final short PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION = 0;
+
/** the permanent and immutable id of an API - this can't change ever */
public final short id;
@@ -264,8 +269,30 @@ public enum ApiKeys {
return oldestVersion() <= latestVersion();
}
+ /**
+ * To work around a critical bug in librdkafka, the api versions response
is inconsistent with the actual versions
+ * supported by `produce` - this method handles that. It should be called
in the context of the api response protocol
+ * handling.
+ *
+ * It should not be used by code generating protocol documentation - we
keep that consistent with the actual versions
+ * supported by `produce`.
+ *
+ * See `PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for details.
+ */
+ public Optional<ApiVersionsResponseData.ApiVersion>
toApiVersionForApiResponse(boolean enableUnstableLastVersion,
+
ApiMessageType.ListenerType listenerType) {
+ return toApiVersion(enableUnstableLastVersion,
Optional.of(listenerType));
+ }
+
public Optional<ApiVersionsResponseData.ApiVersion> toApiVersion(boolean
enableUnstableLastVersion) {
- short oldestVersion = oldestVersion();
+ return toApiVersion(enableUnstableLastVersion, Optional.empty());
+ }
+
+ private Optional<ApiVersionsResponseData.ApiVersion> toApiVersion(boolean
enableUnstableLastVersion,
+
Optional<ApiMessageType.ListenerType> listenerType) {
+ // see `PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for details on why
we do this
+ short oldestVersion = (this == PRODUCE && listenerType.map(l -> l ==
ApiMessageType.ListenerType.BROKER).orElse(false)) ?
+ PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION : oldestVersion();
short latestVersion = latestVersion(enableUnstableLastVersion);
// API is entirely disabled if latestStableVersion is smaller than
oldestVersion.
@@ -299,7 +326,7 @@ public enum ApiKeys {
b.append("<th>Key</th>\n");
b.append("</tr>");
clientApis().stream()
- .filter(apiKey -> apiKey.toApiVersion(false).isPresent())
+ .filter(apiKey -> apiKey.toApiVersion(false,
Optional.empty()).isPresent())
.forEach(apiKey -> {
b.append("<tr>\n");
b.append("<td>");
@@ -341,10 +368,7 @@ public enum ApiKeys {
}
public static EnumSet<ApiKeys> clientApis() {
- List<ApiKeys> apis = Arrays.stream(ApiKeys.values())
- .filter(apiKey ->
apiKey.inScope(ApiMessageType.ListenerType.BROKER))
- .collect(Collectors.toList());
- return EnumSet.copyOf(apis);
+ return brokerApis();
}
public static EnumSet<ApiKeys> apisForListener(ApiMessageType.ListenerType
listener) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 1459a901030..237948f61c9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -208,26 +208,26 @@ public class Protocol {
// Responses
b.append("<b>Responses:</b><br>\n");
Schema[] responses = key.messageType.responseSchemas();
- for (int i = 0; i < responses.length; i++) {
- Schema schema = responses[i];
+ for (int version = key.oldestVersion(); version <
key.latestVersion(); version++) {
+ Schema schema = responses[version];
+ if (schema == null)
+ throw new IllegalStateException("Unexpected null schema
for " + key + " with version " + version);
// Schema
- if (schema != null) {
- b.append("<div>");
- // Version header
- b.append("<pre>");
- b.append(key.name);
- b.append(" Response (Version: ");
- b.append(i);
- b.append(") => ");
- schemaToBnfHtml(responses[i], b, 2);
- b.append("</pre>");
-
- b.append("<p><b>Response header version:</b> ");
- b.append(key.responseHeaderVersion((short) i));
- b.append("</p>\n");
-
- schemaToFieldTableHtml(responses[i], b);
- }
+ b.append("<div>");
+ // Version header
+ b.append("<pre>");
+ b.append(key.name);
+ b.append(" Response (Version: ");
+ b.append(version);
+ b.append(") => ");
+ schemaToBnfHtml(responses[version], b, 2);
+ b.append("</pre>");
+
+ b.append("<p><b>Response header version:</b> ");
+ b.append(key.responseHeaderVersion((short) version));
+ b.append("</p>\n");
+
+ schemaToFieldTableHtml(responses[version], b);
b.append("</div>\n");
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 643b4e44c0e..324e527984d 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -204,18 +204,19 @@ public class ApiVersionsResponse extends AbstractResponse
{
// Skip telemetry APIs if client telemetry is disabled.
if ((apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey ==
ApiKeys.PUSH_TELEMETRY) && !clientTelemetryEnabled)
continue;
-
apiKey.toApiVersion(enableUnstableLastVersion).ifPresent(apiKeys::add);
+ apiKey.toApiVersionForApiResponse(enableUnstableLastVersion,
listenerType).ifPresent(apiKeys::add);
}
return apiKeys;
}
public static ApiVersionCollection collectApis(
+ ApiMessageType.ListenerType listenerType,
Set<ApiKeys> apiKeys,
boolean enableUnstableLastVersion
) {
ApiVersionCollection res = new ApiVersionCollection();
for (ApiKeys apiKey : apiKeys) {
- apiKey.toApiVersion(enableUnstableLastVersion).ifPresent(res::add);
+ apiKey.toApiVersionForApiResponse(enableUnstableLastVersion,
listenerType).ifPresent(res::add);
}
return res;
}
@@ -238,7 +239,7 @@ public class ApiVersionsResponse extends AbstractResponse {
) {
ApiVersionCollection apiKeys = new ApiVersionCollection();
for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) {
- final Optional<ApiVersion> brokerApiVersion =
apiKey.toApiVersion(enableUnstableLastVersion);
+ final Optional<ApiVersion> brokerApiVersion =
apiKey.toApiVersionForApiResponse(enableUnstableLastVersion, listenerType);
if (brokerApiVersion.isEmpty()) {
// Broker does not support this API key.
continue;
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 8fbd86cb9bb..a9f5205a308 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -40,6 +40,7 @@ import java.util.stream.Collectors;
import static org.apache.kafka.common.requests.ProduceResponse.INVALID_OFFSET;
public class ProduceRequest extends AbstractRequest {
+
public static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 11;
public static Builder builder(ProduceRequestData data, boolean
useTransactionV1Version) {
@@ -66,21 +67,10 @@ public class ProduceRequest extends AbstractRequest {
@Override
public ProduceRequest build(short version) {
- return build(version, true);
- }
-
- // Visible for testing only
- public ProduceRequest buildUnsafe(short version) {
- return build(version, false);
- }
-
- private ProduceRequest build(short version, boolean validate) {
- if (validate) {
- // Validate the given records first
- data.topicData().forEach(tpd ->
- tpd.partitionData().forEach(partitionProduceData ->
- ProduceRequest.validateRecords(version,
partitionProduceData.records())));
- }
+ // Validate the given records first
+ data.topicData().forEach(tpd ->
+ tpd.partitionData().forEach(partitionProduceData ->
+ ProduceRequest.validateRecords(version,
partitionProduceData.records())));
return new ProduceRequest(data, version);
}
@@ -244,4 +234,5 @@ public class ProduceRequest extends AbstractRequest {
public static boolean isTransactionV2Requested(short version) {
return version > LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2;
}
+
}
diff --git a/clients/src/main/resources/common/message/ProduceRequest.json
b/clients/src/main/resources/common/message/ProduceRequest.json
index db7d961f137..0bb29f92378 100644
--- a/clients/src/main/resources/common/message/ProduceRequest.json
+++ b/clients/src/main/resources/common/message/ProduceRequest.json
@@ -18,7 +18,9 @@
"type": "request",
"listeners": ["broker"],
"name": "ProduceRequest",
- // Versions 0-2 were removed in Apache Kafka 4.0, Version 3 is the new
baseline.
+ // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new
baseline. Due to a bug in librdkafka,
+ // these versions have to be included in the api versions response (see
KAFKA-18659), but are rejected otherwise.
+ // See `ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for more details.
//
// Version 1 and 2 are the same as version 0.
//
diff --git a/clients/src/main/resources/common/message/ProduceResponse.json
b/clients/src/main/resources/common/message/ProduceResponse.json
index 5c12539dfb1..fafcd86401d 100644
--- a/clients/src/main/resources/common/message/ProduceResponse.json
+++ b/clients/src/main/resources/common/message/ProduceResponse.json
@@ -17,7 +17,9 @@
"apiKey": 0,
"type": "response",
"name": "ProduceResponse",
- // Versions 0-2 were removed in Apache Kafka 4.0, Version 3 is the new
baseline.
+ // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new
baseline. Due to a bug in librdkafka,
+ // these versions have to be included in the api versions response (see
KAFKA-18659), but are rejected otherwise.
+ // See `ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for more details.
//
// Version 1 added the throttle time.
// Version 2 added the log append time.
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
index dd8b8144a29..c94d021ef21 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -58,10 +58,13 @@ public class ApiVersionsResponseTest {
for (ApiKeys key : ApiKeys.apisForListener(scope)) {
ApiVersion version = defaultResponse.apiVersion(key.id);
assertNotNull(version, "Could not find ApiVersion for API " +
key.name);
- assertEquals(version.minVersion(), key.oldestVersion(), "Incorrect
min version for Api " + key.name);
- assertEquals(version.maxVersion(), key.latestVersion(), "Incorrect
max version for Api " + key.name);
+ if (key == ApiKeys.PRODUCE)
+
assertEquals(ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION,
version.minVersion(), "Incorrect min version for Api " + key.name);
+ else
+ assertEquals(key.oldestVersion(), version.minVersion(),
"Incorrect min version for Api " + key.name);
+ assertEquals(key.latestVersion(), version.maxVersion(), "Incorrect
max version for Api " + key.name);
- // Check if versions less than min version are indeed set as null,
i.e., deprecated.
+ // Check if versions less than min version are indeed set as null,
i.e., removed.
for (int i = 0; i < version.minVersion(); ++i) {
assertNull(key.messageType.requestSchemas()[i],
"Request version " + i + " for API " + version.apiKey() +
" must be null");
@@ -69,8 +72,11 @@ public class ApiVersionsResponseTest {
"Response version " + i + " for API " + version.apiKey() +
" must be null");
}
+ // The min version returned in ApiVersionsResponse for Produce is not the
actual min version, so adjust it
+ var minVersion = (key == ApiKeys.PRODUCE && scope ==
ListenerType.BROKER) ?
+ ApiKeys.PRODUCE.oldestVersion() : version.minVersion();
// Check if versions between min and max versions are non null,
i.e., valid.
- for (int i = version.minVersion(); i <= version.maxVersion(); ++i)
{
+ for (int i = minVersion; i <= version.maxVersion(); ++i) {
assertNotNull(key.messageType.requestSchemas()[i],
"Request version " + i + " for API " + version.apiKey() +
" must not be null");
assertNotNull(key.messageType.responseSchemas()[i],
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
index ecb8869c38b..42a1e1f3968 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
@@ -212,8 +212,7 @@ public class ProduceRequestTest {
.setAcks((short) 1)
.setTimeoutMs(1000);
// Can't create ProduceRequest instance with version within [3, 7)
- for (short version = 3; version < 7; version++) {
-
+ for (short version = ApiKeys.PRODUCE.oldestVersion(); version < 7;
version++) {
ProduceRequest.Builder requestBuilder = new
ProduceRequest.Builder(version, version, produceData);
assertThrowsForAllVersions(requestBuilder,
UnsupportedCompressionTypeException.class);
}
@@ -277,6 +276,22 @@ public class ProduceRequestTest {
assertTrue(RequestTestUtils.hasIdempotentRecords(request));
}
+ @Test
+ public void testBuilderOldestAndLatestAllowed() {
+ ProduceRequest.Builder builder = ProduceRequest.builder(new
ProduceRequestData()
+ .setTopicData(new
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
+ new ProduceRequestData.TopicProduceData()
+ .setName("topic")
+ .setPartitionData(Collections.singletonList(new
ProduceRequestData.PartitionProduceData()
+ .setIndex(1)
+
.setRecords(MemoryRecords.withRecords(Compression.NONE, simpleRecord))))
+ ).iterator()))
+ .setAcks((short) -1)
+ .setTimeoutMs(10));
+ assertEquals(ApiKeys.PRODUCE.oldestVersion(),
builder.oldestAllowedVersion());
+ assertEquals(ApiKeys.PRODUCE.latestVersion(),
builder.latestAllowedVersion());
+ }
+
private static <T extends Throwable> void
assertThrowsForAllVersions(ProduceRequest.Builder builder,
Class<T> expectedType) {
IntStream.range(builder.oldestAllowedVersion(),
builder.latestAllowedVersion() + 1)
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala
b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index fd1c70e509f..e286bc9352a 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -94,7 +94,7 @@ class SimpleApiVersionManager(
)
}
- private val apiVersions =
ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)
+ private val apiVersions = ApiVersionsResponse.collectApis(listenerType,
enabledApis.asJava, enableUnstableLastVersion)
override def apiVersionResponse(
throttleTimeMs: Int,
diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
index 3a862678ca7..66f3c5d5c77 100644
--- a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
@@ -17,16 +17,19 @@
package kafka.network
-import kafka.server.SimpleApiVersionManager
+import kafka.server.metadata.KRaftMetadataCache
+import kafka.server.{DefaultApiVersionManager, ForwardingManager,
SimpleApiVersionManager}
import org.apache.kafka.common.errors.{InvalidRequestException,
UnsupportedVersionException}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.RequestHeaderData
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{RequestHeader, RequestTestUtils}
-import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
+import org.apache.kafka.server.BrokerFeatures
+import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion,
MetadataVersion}
import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable
+import org.mockito.Mockito.mock
import java.util.Collections
@@ -54,8 +57,8 @@ class ProcessorTest {
.setClientId("clientid")
.setCorrelationId(0);
val requestHeader = RequestTestUtils.serializeRequestHeader(new
RequestHeader(requestHeaderData, headerVersion))
- val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER,
true,
- () => new FinalizedFeatures(MetadataVersion.latestTesting(),
Collections.emptyMap[String, java.lang.Short], 0))
+ val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER,
mock(classOf[ForwardingManager]),
+ BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () =>
KRaftVersion.LATEST_PRODUCTION), true)
val e = assertThrows(classOf[InvalidRequestException],
(() => Processor.parseRequestHeader(apiVersionManager, requestHeader)):
Executable,
"LEADER_AND_ISR should throw InvalidRequestException exception")
@@ -65,13 +68,31 @@ class ProcessorTest {
@Test
def testParseRequestHeaderWithUnsupportedApiVersion(): Unit = {
val requestHeader = RequestTestUtils.serializeRequestHeader(
- new RequestHeader(ApiKeys.PRODUCE, 0, "clientid", 0))
- val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER,
true,
- () => new FinalizedFeatures(MetadataVersion.latestTesting(),
Collections.emptyMap[String, java.lang.Short], 0))
+ new RequestHeader(ApiKeys.FETCH, 0, "clientid", 0))
+ val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER,
mock(classOf[ForwardingManager]),
+ BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () =>
KRaftVersion.LATEST_PRODUCTION), true)
val e = assertThrows(classOf[UnsupportedVersionException],
(() => Processor.parseRequestHeader(apiVersionManager, requestHeader)):
Executable,
- "PRODUCE v0 should throw UnsupportedVersionException exception")
+ "FETCH v0 should throw UnsupportedVersionException exception")
assertTrue(e.toString.contains("unsupported version"));
}
+ /**
+ * We do something unusual with these versions of produce, and we want to
make sure we don't regress.
+ * See `ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for details.
+ */
+ @Test
+ def testParseRequestHeaderForProduceV0ToV2(): Unit = {
+ for (version <- 0 to 2) {
+ val requestHeader = RequestTestUtils.serializeRequestHeader(
+ new RequestHeader(ApiKeys.PRODUCE, version.toShort, "clientid", 0))
+ val apiVersionManager = new
DefaultApiVersionManager(ListenerType.BROKER, mock(classOf[ForwardingManager]),
+ BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () =>
KRaftVersion.LATEST_PRODUCTION), true)
+ val e = assertThrows(classOf[UnsupportedVersionException],
+ (() => Processor.parseRequestHeader(apiVersionManager,
requestHeader)): Executable,
+ s"PRODUCE $version should throw UnsupportedVersionException exception")
+ assertTrue(e.toString.contains("unsupported version"));
+ }
+ }
+
}
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index f71fc37bb2f..900fb0f66fb 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -88,6 +88,7 @@ abstract class AbstractApiVersionsRequestTest(cluster:
ClusterInstance) {
}
val expectedApis = if
(cluster.controllerListenerName().toScala.contains(listenerName)) {
ApiVersionsResponse.collectApis(
+ ApiMessageType.ListenerType.CONTROLLER,
ApiKeys.apisForListener(ApiMessageType.ListenerType.CONTROLLER),
enableUnstableLastVersion
)
@@ -116,5 +117,8 @@ abstract class AbstractApiVersionsRequestTest(cluster:
ClusterInstance) {
assertEquals(expectedApiVersion.minVersion, actualApiVersion.minVersion,
s"Received unexpected min version for API key ${actualApiVersion.apiKey}.")
assertEquals(expectedApiVersion.maxVersion, actualApiVersion.maxVersion,
s"Received unexpected max version for API key ${actualApiVersion.apiKey}.")
}
+
+ if (listenerName.equals(cluster.clientListener))
+ assertEquals(ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION,
apiVersionsResponse.apiVersion(ApiKeys.PRODUCE.id).minVersion)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 64111f14875..5ab33d868a1 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -254,7 +254,7 @@ class ProduceRequestTest extends BaseRequestTest {
// Create a single-partition topic compressed with ZSTD
val topicConfig = new Properties
topicConfig.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG,
BrokerCompressionType.ZSTD.name)
- val partitionToLeader = createTopic(topic, topicConfig = topicConfig)
+ val partitionToLeader = createTopic(topic, topicConfig = topicConfig)
val leader = partitionToLeader(partition)
val memoryRecords = MemoryRecords.withRecords(Compression.zstd().build(),
new SimpleRecord(System.currentTimeMillis(), "key".getBytes,
"value".getBytes))
diff --git
a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
index 49534a0ca82..b1cc54c828a 100644
---
a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
@@ -55,7 +55,7 @@ public class BrokerApiVersionsCommandTest {
ApiMessageType.ListenerType listenerType =
ApiMessageType.ListenerType.BROKER;
NodeApiVersions nodeApiVersions = new NodeApiVersions(
- ApiVersionsResponse.collectApis(ApiKeys.clientApis(), true),
+ ApiVersionsResponse.filterApis(listenerType, true, true),
Collections.emptyList());
Iterator<ApiKeys> apiKeysIter = ApiKeys.clientApis().iterator();
while (apiKeysIter.hasNext()) {
@@ -64,7 +64,7 @@ public class BrokerApiVersionsCommandTest {
StringBuilder lineBuilder = new StringBuilder().append("\t");
if (apiKey.inScope(listenerType)) {
ApiVersion apiVersion = nodeApiVersions.apiVersion(apiKey);
- assertNotNull(apiVersion);
+ assertNotNull(apiVersion, "No apiVersion found for " + apiKey);
String versionRangeStr = (apiVersion.minVersion() ==
apiVersion.maxVersion()) ?
String.valueOf(apiVersion.minVersion()) :