This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new c501b8599c0 KAFKA-18399 Remove ZooKeeper from KafkaApis (4/N): 
OFFSET_COMMIT and OFFSET_FETCH (#18461)
c501b8599c0 is described below

commit c501b8599c041f8990212aaf3f157a7ebed57db2
Author: PoAn Yang <pay...@apache.org>
AuthorDate: Sun Jan 12 20:54:10 2025 +0800

    KAFKA-18399 Remove ZooKeeper from KafkaApis (4/N): OFFSET_COMMIT and 
OFFSET_FETCH (#18461)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 core/src/main/scala/kafka/server/KafkaApis.scala | 109 -----------------------
 1 file changed, 109 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 88e032e7cc8..35c872abe13 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -326,14 +326,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (authorizedTopicsRequest.isEmpty) {
         requestHelper.sendMaybeThrottle(request, responseBuilder.build())
         CompletableFuture.completedFuture(())
-      } else if (request.header.apiVersion == 0) {
-        // For version 0, always store offsets in ZK.
-        commitOffsetsToZookeeper(
-          request,
-          offsetCommitRequest,
-          authorizedTopicsRequest,
-          responseBuilder
-        )
       } else {
         // For version > 0, store offsets in Coordinator.
         commitOffsetsToCoordinator(
@@ -347,41 +339,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  private def commitOffsetsToZookeeper(
-    request: RequestChannel.Request,
-    offsetCommitRequest: OffsetCommitRequest,
-    authorizedTopicsRequest: 
mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic],
-    responseBuilder: OffsetCommitResponse.Builder
-  ): CompletableFuture[Unit] = {
-    val zkSupport = metadataSupport.requireZkOrThrow(
-      KafkaApis.unsupported("Version 0 offset commit requests"))
-
-    authorizedTopicsRequest.foreach { topic =>
-      topic.partitions.forEach { partition =>
-        val error = try {
-          if (partition.committedMetadata != null && 
partition.committedMetadata.length > 
config.groupCoordinatorConfig.offsetMetadataMaxSize) {
-            Errors.OFFSET_METADATA_TOO_LARGE
-          } else {
-            zkSupport.zkClient.setOrCreateConsumerOffset(
-              offsetCommitRequest.data.groupId,
-              new TopicPartition(topic.name, partition.partitionIndex),
-              partition.committedOffset
-            )
-            Errors.NONE
-          }
-        } catch {
-          case e: Throwable =>
-            Errors.forException(e)
-        }
-
-        responseBuilder.addPartition(topic.name, partition.partitionIndex, 
error)
-      }
-    }
-
-    requestHelper.sendMaybeThrottle(request, responseBuilder.build())
-    CompletableFuture.completedFuture[Unit](())
-  }
-
   private def commitOffsetsToCoordinator(
     request: RequestChannel.Request,
     offsetCommitRequest: OffsetCommitRequest,
@@ -1048,61 +1005,6 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Handle an offset fetch request
    */
   def handleOffsetFetchRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
-    val version = request.header.apiVersion
-    if (version == 0) {
-      handleOffsetFetchRequestFromZookeeper(request)
-    } else {
-      handleOffsetFetchRequestFromCoordinator(request)
-    }
-  }
-
-  private def handleOffsetFetchRequestFromZookeeper(request: 
RequestChannel.Request): CompletableFuture[Unit] = {
-    val header = request.header
-    val offsetFetchRequest = request.body[OffsetFetchRequest]
-
-    def createResponse(requestThrottleMs: Int): AbstractResponse = {
-      val offsetFetchResponse =
-        // reject the request if not authorized to the group
-        if (!authHelper.authorize(request.context, DESCRIBE, GROUP, 
offsetFetchRequest.groupId))
-          offsetFetchRequest.getErrorResponse(requestThrottleMs, 
Errors.GROUP_AUTHORIZATION_FAILED)
-        else {
-          val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset fetch 
requests"))
-          val (authorizedPartitions, unauthorizedPartitions) = 
partitionByAuthorized(
-            offsetFetchRequest.partitions.asScala, request.context)
-
-          // version 0 reads offsets from ZK
-          val authorizedPartitionData = authorizedPartitions.map { 
topicPartition =>
-            try {
-              if (!metadataCache.contains(topicPartition))
-                (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
-              else {
-                val payloadOpt = 
zkSupport.zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
-                payloadOpt match {
-                  case Some(payload) =>
-                    (topicPartition, new 
OffsetFetchResponse.PartitionData(payload,
-                      Optional.empty(), OffsetFetchResponse.NO_METADATA, 
Errors.NONE))
-                  case None =>
-                    (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
-                }
-              }
-            } catch {
-              case e: Throwable =>
-                (topicPartition, new 
OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
-                  Optional.empty(), OffsetFetchResponse.NO_METADATA, 
Errors.forException(e)))
-            }
-          }.toMap
-
-          val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> 
OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
-          new OffsetFetchResponse(requestThrottleMs, Errors.NONE, 
(authorizedPartitionData ++ unauthorizedPartitionData).asJava)
-        }
-      trace(s"Sending offset fetch response $offsetFetchResponse for 
correlation id ${header.correlationId} to client ${header.clientId}.")
-      offsetFetchResponse
-    }
-    requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    CompletableFuture.completedFuture[Unit](())
-  }
-
-  private def handleOffsetFetchRequestFromCoordinator(request: 
RequestChannel.Request): CompletableFuture[Unit] = {
     val offsetFetchRequest = request.body[OffsetFetchRequest]
     val groups = offsetFetchRequest.groups()
     val requireStable = offsetFetchRequest.requireStable()
@@ -1213,13 +1115,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  private def partitionByAuthorized(
-    seq: Seq[TopicPartition],
-    context: RequestContext
-  ): (Seq[TopicPartition], Seq[TopicPartition]) = {
-    authHelper.partitionSeqByAuthorized(context, DESCRIBE, TOPIC, seq)(_.topic)
-  }
-
   def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
     if (version < 4) {
@@ -4357,8 +4252,4 @@ object KafkaApis {
   private[server] def shouldAlwaysForward(request: RequestChannel.Request): 
Exception = {
     new UnsupportedVersionException(s"Should always be forwarded to the Active 
Controller when using a Raft-based metadata quorum: ${request.header.apiKey}")
   }
-
-  private def unsupported(text: String): Exception = {
-    new UnsupportedVersionException(s"Unsupported when using a Raft-based 
metadata quorum: $text")
-  }
 }

Reply via email to