This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ca5d2cf76db KAFKA-18646: Null records in fetch response breaks librdkafka (#18726)
ca5d2cf76db is described below
commit ca5d2cf76dbc86d4300dadd372c2a5a38388783e
Author: Ismael Juma <[email protected]>
AuthorDate: Wed Jan 29 07:04:12 2025 -0800
KAFKA-18646: Null records in fetch response breaks librdkafka (#18726)
Ensure we always return empty records (including cases where an error is
returned).
We also remove `nullable` from `records` since it is effectively expected to
be non-null by a large percentage of clients in the wild.
This behavior regressed in fe56fc9 (KAFKA-18269). Empty records were
previously set via `FetchResponse.recordsOrFail(partitionData)` in the
now-removed `maybeConvertFetchedData` method.
Added an integration test that fails without this fix and also updated many
tests to set `records` to `empty` instead of leaving them as `null`.
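In practice, the fix means the error path in `FetchResponse.partitionResponse`
now always attaches `MemoryRecords.EMPTY` rather than leaving `records` null.
A minimal standalone Java sketch of that expectation (illustrative only, not
part of this commit; it assumes the `partitionResponse(int, Errors)` overload
shown in the diff below):

    import org.apache.kafka.common.message.FetchResponseData;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.requests.FetchResponse;

    public class FetchErrorRecordsSketch {
        public static void main(String[] args) {
            // Build the partition data the broker returns for a failed fetch.
            FetchResponseData.PartitionData partition =
                FetchResponse.partitionResponse(0, Errors.TOPIC_AUTHORIZATION_FAILED);
            // Before this fix, records() could be null on the error path, which
            // librdkafka-style clients reject as a malformed fetch response.
            System.out.println(partition.records() == MemoryRecords.EMPTY); // true
        }
    }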
Reviewers: Chia-Ping Tsai <[email protected]>, David Arthur
<[email protected]>
---
.../kafka/common/requests/FetchResponse.java | 5 ++--
.../resources/common/message/FetchResponse.json | 2 +-
.../kafka/common/requests/RequestResponseTest.java | 6 +++--
.../kafka/api/AuthorizerIntegrationTest.scala | 31 +++++++++++++++++++++-
.../apache/kafka/raft/RaftClientTestContext.java | 4 +++
5 files changed, 42 insertions(+), 6 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index fb2a5a3c87b..91837bfaa55 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -196,7 +196,8 @@ public class FetchResponse extends AbstractResponse {
return new FetchResponseData.PartitionData()
.setPartitionIndex(partition)
.setErrorCode(error.code())
- .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK);
+ .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
+ .setRecords(MemoryRecords.EMPTY);
}
/**
@@ -285,4 +286,4 @@ public class FetchResponse extends AbstractResponse {
.setSessionId(sessionId)
.setResponses(topicResponseList);
}
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/resources/common/message/FetchResponse.json b/clients/src/main/resources/common/message/FetchResponse.json
index dc8d3517566..495f9a35e23 100644
--- a/clients/src/main/resources/common/message/FetchResponse.json
+++ b/clients/src/main/resources/common/message/FetchResponse.json
@@ -106,7 +106,7 @@
]},
{ "name": "PreferredReadReplica", "type": "int32", "versions": "11+",
"default": "-1", "ignorable": false, "entityType": "brokerId",
"about": "The preferred read replica for the consumer to use on its
next fetch request."},
- { "name": "Records", "type": "records", "versions": "0+",
"nullableVersions": "0+", "about": "The record data."}
+ { "name": "Records", "type": "records", "versions": "0+", "about":
"The record data."}
]}
]},
{ "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+",
"taggedVersions": "16+", "tag": 0,
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 0acaf9fc7d0..6beb20fb9df 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2024,7 +2024,8 @@ public class RequestResponseTest {
.setPartitionIndex(1)
.setHighWatermark(1000000)
.setLogStartOffset(0)
- .setAbortedTransactions(abortedTransactions));
+ .setAbortedTransactions(abortedTransactions)
+ .setRecords(MemoryRecords.EMPTY));
return FetchResponse.parse(FetchResponse.of(Errors.NONE, 25, sessionId,
responseData).serialize(FETCH.latestVersion()),
FETCH.latestVersion());
}
@@ -2048,7 +2049,8 @@ public class RequestResponseTest {
.setPartitionIndex(1)
.setHighWatermark(1000000)
.setLogStartOffset(0)
- .setAbortedTransactions(abortedTransactions));
+ .setAbortedTransactions(abortedTransactions)
+ .setRecords(MemoryRecords.EMPTY));
return FetchResponse.parse(FetchResponse.of(Errors.NONE, 25,
INVALID_SESSION_ID,
responseData).serialize(FETCH.latestVersion()),
FETCH.latestVersion());
}
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 30e6f0384fb..4256d40da4a 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
-import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestDa [...]
+import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestDa [...]
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
@@ -59,6 +59,7 @@ import java.util.Collections.singletonList
import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic
import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic}
import org.apache.kafka.coordinator.group.GroupConfig
+import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable
import scala.collection.mutable
@@ -808,6 +809,34 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
sendRequestAndVerifyResponseError(request, resources, isAuthorized = true)
}
+ @Test
+ def testFetchConsumerRequest(): Unit = {
+ createTopicWithBrokerPrincipal(topic)
+
+ val request = createFetchRequest
+ val topicNames = getTopicNames().asJava
+
+ def partitionDatas(response: AbstractResponse): Iterable[FetchResponseData.PartitionData] = {
+ assertTrue(response.isInstanceOf[FetchResponse])
+ response.asInstanceOf[FetchResponse].responseData(topicNames, ApiKeys.FETCH.latestVersion).values().asScala
+ }
+
+ removeAllClientAcls()
+ val resources = Set(topicResource.resourceType, clusterResource.resourceType)
+ val failedResponse = sendRequestAndVerifyResponseError(request, resources, isAuthorized = false)
+ val failedPartitionDatas = partitionDatas(failedResponse)
+ assertEquals(1, failedPartitionDatas.size)
+ // Some clients (like librdkafka) always expect non-null records - even for the cases where an error is returned
+ failedPartitionDatas.foreach(partitionData => assertEquals(MemoryRecords.EMPTY, partitionData.records))
+
+ val readAcls = topicReadAcl(topicResource)
+ addAndVerifyAcls(readAcls, topicResource)
+ val succeededResponse = sendRequestAndVerifyResponseError(request, resources, isAuthorized = true)
+ val succeededPartitionDatas = partitionDatas(succeededResponse)
+ assertEquals(1, succeededPartitionDatas.size)
+ succeededPartitionDatas.foreach(partitionData => assertEquals(MemoryRecords.EMPTY, partitionData.records))
+ }
+
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testIncrementalAlterConfigsRequestRequiresClusterPermissionForBrokerLogger(quorum: String): Unit = {
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index ee840ff59ed..a159d8c5cb2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -1802,6 +1802,8 @@ public final class RaftClientTestContext {
partitionData.divergingEpoch()
.setEpoch(divergingEpoch)
.setEndOffset(divergingEpochEndOffset);
+
+ partitionData.setRecords(MemoryRecords.EMPTY);
}
);
}
@@ -1830,6 +1832,8 @@ public final class RaftClientTestContext {
partitionData.snapshotId()
.setEpoch(snapshotId.epoch())
.setEndOffset(snapshotId.offset());
+
+ partitionData.setRecords(MemoryRecords.EMPTY);
}
);
}