This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6d37b0f07ff KAFKA-14462; [2/N] Add ConsumerGroupHeartbeart to 
GroupCoordinator interface (#13329)
6d37b0f07ff is described below

commit 6d37b0f07ff56ede7b4e066cf873f8b67ca449d1
Author: David Jacot <[email protected]>
AuthorDate: Tue Mar 7 09:20:03 2023 +0100

    KAFKA-14462; [2/N] Add ConsumerGroupHeartbeart to GroupCoordinator 
interface (#13329)
    
    This patch adds ConsumerGroupHeartbeat to the GroupCoordinator interface 
and implements the API in KafkaApis.
    
    Reviewers: Jeff Kim <[email protected]>, Justine Olshan 
<[email protected]>
---
 .../group/GroupCoordinatorAdapter.scala            | 14 ++++-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 24 +++++++-
 .../group/GroupCoordinatorAdapterTest.scala        | 20 +++++-
 .../server/ConsumerGroupHeartbeatRequestTest.scala | 14 +++++
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 72 ++++++++++++++++++++--
 .../kafka/coordinator/group/GroupCoordinator.java  | 15 +++++
 .../org/apache/kafka/server/util/FutureUtils.java  | 12 ++++
 7 files changed, 159 insertions(+), 12 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 660070d024c..e49ca2dc5cc 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -20,12 +20,13 @@ import kafka.common.OffsetAndMetadata
 import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
 import kafka.utils.Implicits.MapExtensionMethods
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.message.{DeleteGroupsResponseData, 
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, 
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, 
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, 
OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, 
OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, 
SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitReq [...]
+import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, 
ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, 
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, 
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, 
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, 
OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, 
OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchRespon [...]
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext, 
TransactionResult}
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import org.apache.kafka.server.util.FutureUtils
 
 import java.util
 import java.util.{Optional, OptionalInt, Properties}
@@ -62,6 +63,15 @@ private[group] class GroupCoordinatorAdapter(
   private val time: Time
 ) extends org.apache.kafka.coordinator.group.GroupCoordinator {
 
+  override def consumerGroupHeartbeat(
+    context: RequestContext,
+    request: ConsumerGroupHeartbeatRequestData
+  ): CompletableFuture[ConsumerGroupHeartbeatResponseData] = {
+    FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+      s"The old group coordinator does not support 
${ApiKeys.CONSUMER_GROUP_HEARTBEAT.name} API."
+    ))
+  }
+
   override def joinGroup(
     context: RequestContext,
     request: JoinGroupRequestData,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 59e7fb022f7..ad0f1d1b784 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3579,9 +3579,27 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleConsumerGroupHeartbeat(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
     val consumerGroupHeartbeatRequest = 
request.body[ConsumerGroupHeartbeatRequest]
-    // KIP-848 is not implemented yet so return UNSUPPORTED_VERSION.
-    requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!config.isNewGroupCoordinatorEnabled) {
+      // The API is not supported by the "old" group coordinator (the 
default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else if (!authHelper.authorize(request.context, READ, GROUP, 
consumerGroupHeartbeatRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      groupCoordinator.consumerGroupHeartbeat(
+        request.context,
+        consumerGroupHeartbeatRequest.data,
+      ).handle[Unit] { (response, exception) =>
+        if (exception != null) {
+          requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(exception))
+        } else {
+          requestHelper.sendMaybeThrottle(request, new 
ConsumerGroupHeartbeatResponse(response))
+        }
+      }
+    }
   }
 
   private def updateRecordConversionStats(request: RequestChannel.Request,
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 03a271689eb..6704e2317e6 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -21,8 +21,8 @@ import 
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallbac
 import kafka.server.RequestLocal
 import kafka.utils.MockTime
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.InvalidGroupIdException
-import org.apache.kafka.common.message.{DeleteGroupsResponseData, 
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, 
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, 
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, 
OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, 
OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, 
SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitReq [...]
+import org.apache.kafka.common.errors.{InvalidGroupIdException, 
UnsupportedVersionException}
+import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, 
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, 
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, 
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, 
OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, 
OffsetFetchResponseData, SyncGroupRequestData, SyncGr [...]
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
 import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
 import 
org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition,
 OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
@@ -62,6 +62,22 @@ class GroupCoordinatorAdapterTest {
     )
   }
 
+  @Test
+  def testJoinConsumerGroup(): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
+
+    val ctx = makeContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT, 
ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion)
+    val request = new ConsumerGroupHeartbeatRequestData()
+      .setGroupId("group")
+
+    val future = adapter.consumerGroupHeartbeat(ctx, request)
+
+    assertTrue(future.isDone)
+    assertTrue(future.isCompletedExceptionally)
+    assertFutureThrows(future, classOf[UnsupportedVersionException])
+  }
+
   @ParameterizedTest
   @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
   def testJoinGroup(version: Short): Unit = {
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 33e449f7531..60f8d75157c 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -54,6 +54,20 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
     assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
   }
 
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true")
+  ))
+  def 
testConsumerGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabled(): Unit 
= {
+    val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+    ).build()
+
+    val consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
+    val expectedResponse = new 
ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+    assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
+  }
+
   private def connectAndReceive(request: ConsumerGroupHeartbeatRequest): 
ConsumerGroupHeartbeatResponse = {
     IntegrationTestUtils.connectAndReceive[ConsumerGroupHeartbeatResponse](
       request,
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 014e3ef0220..0638bf36323 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -5817,11 +5817,9 @@ class KafkaApisTest {
 
   @Test
   def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
-    val requestChannelRequest = buildRequest(
-      new ConsumerGroupHeartbeatRequest.Builder(new 
ConsumerGroupHeartbeatRequestData()
-        .setGroupId("group")
-      ).build()
-    )
+    val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
 
     createKafkaApis().handle(requestChannelRequest, RequestLocal.NoCaching)
 
@@ -5830,4 +5828,68 @@ class KafkaApisTest {
     val response = 
verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
     assertEquals(expectedHeartbeatResponse, response.data)
   }
+
+  @Test
+  def testConsumerGroupHeartbeatRequest(): Unit = {
+    val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+
+    val future = new CompletableFuture[ConsumerGroupHeartbeatResponseData]()
+    when(groupCoordinator.consumerGroupHeartbeat(
+      requestChannelRequest.context,
+      consumerGroupHeartbeatRequest
+    )).thenReturn(future)
+
+    createKafkaApis(overrideProperties = Map(
+      KafkaConfig.NewGroupCoordinatorEnableProp -> "true"
+    )).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val consumerGroupHeartbeatResponse = new 
ConsumerGroupHeartbeatResponseData()
+      .setMemberId("member")
+
+    future.complete(consumerGroupHeartbeatResponse)
+    val response = 
verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
+    assertEquals(consumerGroupHeartbeatResponse, response.data)
+  }
+
+  @Test
+  def testConsumerGroupHeartbeatRequestFutureFailed(): Unit = {
+    val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+
+    val future = new CompletableFuture[ConsumerGroupHeartbeatResponseData]()
+    when(groupCoordinator.consumerGroupHeartbeat(
+      requestChannelRequest.context,
+      consumerGroupHeartbeatRequest
+    )).thenReturn(future)
+
+    createKafkaApis(overrideProperties = Map(
+      KafkaConfig.NewGroupCoordinatorEnableProp -> "true"
+    )).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
+    val response = 
verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
+    assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.errorCode)
+  }
+
+  @Test
+  def testConsumerGroupHeartbeatRequestAuthorizationFailed(): Unit = {
+    val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+
+    createKafkaApis(
+      authorizer = Some(authorizer),
+      overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> 
"true")
+    ).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val response = 
verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
+  }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index c9cfadaba2f..f448346a116 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
@@ -52,6 +54,19 @@ import java.util.function.IntSupplier;
  */
 public interface GroupCoordinator {
 
+    /**
+     * Heartbeat to a Consumer Group.
+     *
+     * @param context           The request context.
+     * @param request           The ConsumerGroupHeartbeatResponse data.
+     *
+     * @return A future yielding the response or an exception.
+     */
+    CompletableFuture<ConsumerGroupHeartbeatResponseData> 
consumerGroupHeartbeat(
+        RequestContext context,
+        ConsumerGroupHeartbeatRequestData request
+    );
+
     /**
      * Join a Generic Group.
      *
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java 
b/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
index 3383904d5ec..0d3ee81dc96 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
@@ -90,4 +90,16 @@ public class FutureUtils {
             }
         });
     }
+
+    /**
+     * Returns a new CompletableFuture that is already completed exceptionally 
with the given exception.
+     *
+     * @param ex    The exception.
+     * @return      The exceptionally completed CompletableFuture.
+     */
+    public static <T> CompletableFuture<T> failedFuture(Throwable ex) {
+        CompletableFuture<T> future = new CompletableFuture<>();
+        future.completeExceptionally(ex);
+        return future;
+    }
 }

Reply via email to