This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new c501b8599c0 KAFKA-18399 Remove ZooKeeper from KafkaApis (4/N): OFFSET_COMMIT and OFFSET_FETCH (#18461)
c501b8599c0 is described below

commit c501b8599c041f8990212aaf3f157a7ebed57db2
Author: PoAn Yang <pay...@apache.org>
AuthorDate: Sun Jan 12 20:54:10 2025 +0800

    KAFKA-18399 Remove ZooKeeper from KafkaApis (4/N): OFFSET_COMMIT and OFFSET_FETCH (#18461)

    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 core/src/main/scala/kafka/server/KafkaApis.scala | 109 -----------------------
 1 file changed, 109 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 88e032e7cc8..35c872abe13 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -326,14 +326,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (authorizedTopicsRequest.isEmpty) {
         requestHelper.sendMaybeThrottle(request, responseBuilder.build())
         CompletableFuture.completedFuture(())
-      } else if (request.header.apiVersion == 0) {
-        // For version 0, always store offsets in ZK.
-        commitOffsetsToZookeeper(
-          request,
-          offsetCommitRequest,
-          authorizedTopicsRequest,
-          responseBuilder
-        )
       } else {
         // For version > 0, store offsets in Coordinator.
         commitOffsetsToCoordinator(
@@ -347,41 +339,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }

-  private def commitOffsetsToZookeeper(
-    request: RequestChannel.Request,
-    offsetCommitRequest: OffsetCommitRequest,
-    authorizedTopicsRequest: mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic],
-    responseBuilder: OffsetCommitResponse.Builder
-  ): CompletableFuture[Unit] = {
-    val zkSupport = metadataSupport.requireZkOrThrow(
-      KafkaApis.unsupported("Version 0 offset commit requests"))
-
-    authorizedTopicsRequest.foreach { topic =>
-      topic.partitions.forEach { partition =>
-        val error = try {
-          if (partition.committedMetadata != null && partition.committedMetadata.length > config.groupCoordinatorConfig.offsetMetadataMaxSize) {
-            Errors.OFFSET_METADATA_TOO_LARGE
-          } else {
-            zkSupport.zkClient.setOrCreateConsumerOffset(
-              offsetCommitRequest.data.groupId,
-              new TopicPartition(topic.name, partition.partitionIndex),
-              partition.committedOffset
-            )
-            Errors.NONE
-          }
-        } catch {
-          case e: Throwable =>
-            Errors.forException(e)
-        }
-
-        responseBuilder.addPartition(topic.name, partition.partitionIndex, error)
-      }
-    }
-
-    requestHelper.sendMaybeThrottle(request, responseBuilder.build())
-    CompletableFuture.completedFuture[Unit](())
-  }
-
   private def commitOffsetsToCoordinator(
     request: RequestChannel.Request,
     offsetCommitRequest: OffsetCommitRequest,
@@ -1048,61 +1005,6 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Handle an offset fetch request
    */
   def handleOffsetFetchRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
-    val version = request.header.apiVersion
-    if (version == 0) {
-      handleOffsetFetchRequestFromZookeeper(request)
-    } else {
-      handleOffsetFetchRequestFromCoordinator(request)
-    }
-  }
-
-  private def handleOffsetFetchRequestFromZookeeper(request: RequestChannel.Request): CompletableFuture[Unit] = {
-    val header = request.header
-    val offsetFetchRequest = request.body[OffsetFetchRequest]
-
-    def createResponse(requestThrottleMs: Int): AbstractResponse = {
-      val offsetFetchResponse =
-        // reject the request if not authorized to the group
-        if (!authHelper.authorize(request.context, DESCRIBE, GROUP, offsetFetchRequest.groupId))
-          offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)
-        else {
-          val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset fetch requests"))
-          val (authorizedPartitions, unauthorizedPartitions) = partitionByAuthorized(
-            offsetFetchRequest.partitions.asScala, request.context)
-
-          // version 0 reads offsets from ZK
-          val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
-            try {
-              if (!metadataCache.contains(topicPartition))
-                (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
-              else {
-                val payloadOpt = zkSupport.zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
-                payloadOpt match {
-                  case Some(payload) =>
-                    (topicPartition, new OffsetFetchResponse.PartitionData(payload,
-                      Optional.empty(), OffsetFetchResponse.NO_METADATA, Errors.NONE))
-                  case None =>
-                    (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
-                }
-              }
-            } catch {
-              case e: Throwable =>
-                (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
-                  Optional.empty(), OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
-            }
-          }.toMap
-
-          val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
-          new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
-        }
-      trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
-      offsetFetchResponse
-    }
-    requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    CompletableFuture.completedFuture[Unit](())
-  }
-
-  private def handleOffsetFetchRequestFromCoordinator(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val offsetFetchRequest = request.body[OffsetFetchRequest]
     val groups = offsetFetchRequest.groups()
     val requireStable = offsetFetchRequest.requireStable()
@@ -1213,13 +1115,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }

-  private def partitionByAuthorized(
-    seq: Seq[TopicPartition],
-    context: RequestContext
-  ): (Seq[TopicPartition], Seq[TopicPartition]) = {
-    authHelper.partitionSeqByAuthorized(context, DESCRIBE, TOPIC, seq)(_.topic)
-  }
-
   def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
     if (version < 4) {
@@ -4357,8 +4252,4 @@ object KafkaApis {
   private[server] def shouldAlwaysForward(request: RequestChannel.Request): Exception = {
     new UnsupportedVersionException(s"Should always be forwarded to the Active Controller when using a Raft-based metadata quorum: ${request.header.apiKey}")
   }
-
-  private def unsupported(text: String): Exception = {
-    new UnsupportedVersionException(s"Unsupported when using a Raft-based metadata quorum: $text")
-  }
 }