This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new e6cf82bb6cc KAFKA-18646: Null records in fetch response breaks 
librdkafka (#18726)
e6cf82bb6cc is described below

commit e6cf82bb6cc41732973b70122adf49afd8f237f2
Author: Ismael Juma <[email protected]>
AuthorDate: Wed Jan 29 07:04:12 2025 -0800

    KAFKA-18646: Null records in fetch response breaks librdkafka (#18726)
    
    Ensure we always return empty records (including cases where an error is 
returned).
    We also remove `nullable` from `records` since it is effectively expected 
to be
    non-null by a large percentage of clients in the wild.
    
    This behavior regressed in fe56fc9 (KAFKA-18269). Empty records were
    previously set via `FetchResponse.recordsOrFail(partitionData)` in the
    now-removed `maybeConvertFetchedData` method.
    
    Added an integration test that fails without this fix and also update many
    tests to set `records` to `empty` instead of leaving them as `null`.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, David Arthur 
<[email protected]>
---
 .../kafka/common/requests/FetchResponse.java       |  5 ++--
 .../resources/common/message/FetchResponse.json    |  2 +-
 .../kafka/common/requests/RequestResponseTest.java |  6 +++--
 .../kafka/api/AuthorizerIntegrationTest.scala      | 31 +++++++++++++++++++++-
 .../apache/kafka/raft/RaftClientTestContext.java   |  4 +++
 5 files changed, 42 insertions(+), 6 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index fb2a5a3c87b..91837bfaa55 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -196,7 +196,8 @@ public class FetchResponse extends AbstractResponse {
         return new FetchResponseData.PartitionData()
             .setPartitionIndex(partition)
             .setErrorCode(error.code())
-            .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK);
+            .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
+            .setRecords(MemoryRecords.EMPTY);
     }
 
     /**
@@ -285,4 +286,4 @@ public class FetchResponse extends AbstractResponse {
                 .setSessionId(sessionId)
                 .setResponses(topicResponseList);
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/resources/common/message/FetchResponse.json 
b/clients/src/main/resources/common/message/FetchResponse.json
index dc8d3517566..495f9a35e23 100644
--- a/clients/src/main/resources/common/message/FetchResponse.json
+++ b/clients/src/main/resources/common/message/FetchResponse.json
@@ -106,7 +106,7 @@
         ]},
         { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", 
"default": "-1", "ignorable": false, "entityType": "brokerId",
           "about": "The preferred read replica for the consumer to use on its 
next fetch request."},
-        { "name": "Records", "type": "records", "versions": "0+", 
"nullableVersions": "0+", "about": "The record data."}
+        { "name": "Records", "type": "records", "versions": "0+", "about": 
"The record data."}
       ]}
     ]},
     { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", 
"taggedVersions": "16+", "tag": 0,
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c6be52fb189..a97c0bd9e15 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2012,7 +2012,8 @@ public class RequestResponseTest {
                         .setPartitionIndex(1)
                         .setHighWatermark(1000000)
                         .setLogStartOffset(0)
-                        .setAbortedTransactions(abortedTransactions));
+                        .setAbortedTransactions(abortedTransactions)
+                        .setRecords(MemoryRecords.EMPTY));
         return FetchResponse.parse(FetchResponse.of(Errors.NONE, 25, sessionId,
             responseData).serialize(FETCH.latestVersion()), 
FETCH.latestVersion());
     }
@@ -2036,7 +2037,8 @@ public class RequestResponseTest {
                         .setPartitionIndex(1)
                         .setHighWatermark(1000000)
                         .setLogStartOffset(0)
-                        .setAbortedTransactions(abortedTransactions));
+                        .setAbortedTransactions(abortedTransactions)
+                        .setRecords(MemoryRecords.EMPTY));
         return FetchResponse.parse(FetchResponse.of(Errors.NONE, 25, 
INVALID_SESSION_ID,
             responseData).serialize(FETCH.latestVersion()), 
FETCH.latestVersion());
     }
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 30e6f0384fb..4256d40da4a 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -37,7 +37,7 @@ import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import 
org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, 
ListOffsetsTopic}
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition,
 OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
-import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, 
AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, 
ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, 
CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, 
DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, 
DeleteTopicsRequestData, DescribeClusterRequestData, 
DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestDa 
[...]
+import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, 
AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, 
ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, 
CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, 
DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, 
DeleteTopicsRequestData, DescribeClusterRequestData, 
DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestDa 
[...]
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
SimpleRecord}
 import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
@@ -59,6 +59,7 @@ import java.util.Collections.singletonList
 import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic
 import 
org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, 
WritableTxnMarkerTopic}
 import org.apache.kafka.coordinator.group.GroupConfig
+import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.function.Executable
 
 import scala.collection.mutable
@@ -808,6 +809,34 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     sendRequestAndVerifyResponseError(request, resources, isAuthorized = true)
   }
 
+  @Test
+  def testFetchConsumerRequest(): Unit = {
+    createTopicWithBrokerPrincipal(topic)
+
+    val request = createFetchRequest
+    val topicNames = getTopicNames().asJava
+
+    def partitionDatas(response: AbstractResponse): 
Iterable[FetchResponseData.PartitionData] = {
+      assertTrue(response.isInstanceOf[FetchResponse])
+      response.asInstanceOf[FetchResponse].responseData(topicNames, 
ApiKeys.FETCH.latestVersion).values().asScala
+    }
+
+    removeAllClientAcls()
+    val resources = Set(topicResource.resourceType, 
clusterResource.resourceType)
+    val failedResponse = sendRequestAndVerifyResponseError(request, resources, 
isAuthorized = false)
+    val failedPartitionDatas = partitionDatas(failedResponse)
+    assertEquals(1, failedPartitionDatas.size)
+    // Some clients (like librdkafka) always expect non-null records - even 
for the cases where an error is returned
+    failedPartitionDatas.foreach(partitionData => 
assertEquals(MemoryRecords.EMPTY, partitionData.records))
+
+    val readAcls = topicReadAcl(topicResource)
+    addAndVerifyAcls(readAcls, topicResource)
+    val succeededResponse = sendRequestAndVerifyResponseError(request, 
resources, isAuthorized = true)
+    val succeededPartitionDatas = partitionDatas(succeededResponse)
+    assertEquals(1, succeededPartitionDatas.size)
+    succeededPartitionDatas.foreach(partitionData => 
assertEquals(MemoryRecords.EMPTY, partitionData.records))
+  }
+
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def 
testIncrementalAlterConfigsRequestRequiresClusterPermissionForBrokerLogger(quorum:
 String): Unit = {
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 13ad1d26482..278cc6375b5 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -1804,6 +1804,8 @@ public final class RaftClientTestContext {
                 partitionData.divergingEpoch()
                     .setEpoch(divergingEpoch)
                     .setEndOffset(divergingEpochEndOffset);
+
+                partitionData.setRecords(MemoryRecords.EMPTY);
             }
         );
     }
@@ -1832,6 +1834,8 @@ public final class RaftClientTestContext {
                 partitionData.snapshotId()
                     .setEpoch(snapshotId.epoch())
                     .setEndOffset(snapshotId.offset());
+
+                partitionData.setRecords(MemoryRecords.EMPTY);
             }
         );
     }

Reply via email to