This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6d37b0f07ff KAFKA-14462; [2/N] Add ConsumerGroupHeartbeart to
GroupCoordinator interface (#13329)
6d37b0f07ff is described below
commit 6d37b0f07ff56ede7b4e066cf873f8b67ca449d1
Author: David Jacot <[email protected]>
AuthorDate: Tue Mar 7 09:20:03 2023 +0100
KAFKA-14462; [2/N] Add ConsumerGroupHeartbeart to GroupCoordinator
interface (#13329)
This patch adds ConsumerGroupHeartbeat to the GroupCoordinator interface
and implements the API in KafkaApis.
Reviewers: Jeff Kim <[email protected]>, Justine Olshan
<[email protected]>
---
.../group/GroupCoordinatorAdapter.scala | 14 ++++-
core/src/main/scala/kafka/server/KafkaApis.scala | 24 +++++++-
.../group/GroupCoordinatorAdapterTest.scala | 20 +++++-
.../server/ConsumerGroupHeartbeatRequestTest.scala | 14 +++++
.../scala/unit/kafka/server/KafkaApisTest.scala | 72 ++++++++++++++++++++--
.../kafka/coordinator/group/GroupCoordinator.java | 15 +++++
.../org/apache/kafka/server/util/FutureUtils.java | 12 ++++
7 files changed, 159 insertions(+), 12 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 660070d024c..e49ca2dc5cc 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -20,12 +20,13 @@ import kafka.common.OffsetAndMetadata
import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.Implicits.MapExtensionMethods
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.message.{DeleteGroupsResponseData,
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData,
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData,
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData,
OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData,
OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData,
SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitReq [...]
+import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData,
ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData,
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData,
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData,
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData,
OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData,
OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchRespon [...]
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext,
TransactionResult}
import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import org.apache.kafka.server.util.FutureUtils
import java.util
import java.util.{Optional, OptionalInt, Properties}
@@ -62,6 +63,15 @@ private[group] class GroupCoordinatorAdapter(
private val time: Time
) extends org.apache.kafka.coordinator.group.GroupCoordinator {
+ override def consumerGroupHeartbeat(
+ context: RequestContext,
+ request: ConsumerGroupHeartbeatRequestData
+ ): CompletableFuture[ConsumerGroupHeartbeatResponseData] = {
+ FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+ s"The old group coordinator does not support
${ApiKeys.CONSUMER_GROUP_HEARTBEAT.name} API."
+ ))
+ }
+
override def joinGroup(
context: RequestContext,
request: JoinGroupRequestData,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 59e7fb022f7..ad0f1d1b784 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3579,9 +3579,27 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleConsumerGroupHeartbeat(request: RequestChannel.Request):
CompletableFuture[Unit] = {
val consumerGroupHeartbeatRequest =
request.body[ConsumerGroupHeartbeatRequest]
- // KIP-848 is not implemented yet so return UNSUPPORTED_VERSION.
- requestHelper.sendMaybeThrottle(request,
consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
- CompletableFuture.completedFuture[Unit](())
+
+ if (!config.isNewGroupCoordinatorEnabled) {
+ // The API is not supported by the "old" group coordinator (the
default). If the
+ // new one is not enabled, we fail directly here.
+ requestHelper.sendMaybeThrottle(request,
consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+ CompletableFuture.completedFuture[Unit](())
+ } else if (!authHelper.authorize(request.context, READ, GROUP,
consumerGroupHeartbeatRequest.data.groupId)) {
+ requestHelper.sendMaybeThrottle(request,
consumerGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+ CompletableFuture.completedFuture[Unit](())
+ } else {
+ groupCoordinator.consumerGroupHeartbeat(
+ request.context,
+ consumerGroupHeartbeatRequest.data,
+ ).handle[Unit] { (response, exception) =>
+ if (exception != null) {
+ requestHelper.sendMaybeThrottle(request,
consumerGroupHeartbeatRequest.getErrorResponse(exception))
+ } else {
+ requestHelper.sendMaybeThrottle(request, new
ConsumerGroupHeartbeatResponse(response))
+ }
+ }
+ }
}
private def updateRecordConversionStats(request: RequestChannel.Request,
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 03a271689eb..6704e2317e6 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -21,8 +21,8 @@ import
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallbac
import kafka.server.RequestLocal
import kafka.utils.MockTime
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.InvalidGroupIdException
-import org.apache.kafka.common.message.{DeleteGroupsResponseData,
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData,
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData,
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData,
OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData,
OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData,
SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitReq [...]
+import org.apache.kafka.common.errors.{InvalidGroupIdException,
UnsupportedVersionException}
+import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData,
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData,
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData,
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData,
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData,
OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData,
OffsetFetchResponseData, SyncGroupRequestData, SyncGr [...]
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
import
org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition,
OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
@@ -62,6 +62,22 @@ class GroupCoordinatorAdapterTest {
)
}
+ @Test
+ def testJoinConsumerGroup(): Unit = {
+ val groupCoordinator = mock(classOf[GroupCoordinator])
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
+
+ val ctx = makeContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT,
ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion)
+ val request = new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("group")
+
+ val future = adapter.consumerGroupHeartbeat(ctx, request)
+
+ assertTrue(future.isDone)
+ assertTrue(future.isCompletedExceptionally)
+ assertFutureThrows(future, classOf[UnsupportedVersionException])
+ }
+
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
def testJoinGroup(version: Short): Unit = {
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 33e449f7531..60f8d75157c 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -54,6 +54,20 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
}
+ @ClusterTest(serverProperties = Array(
+ new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true")
+ ))
+ def
testConsumerGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabled(): Unit
= {
+ val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ ).build()
+
+ val consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ val expectedResponse = new
ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+ assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
+ }
+
private def connectAndReceive(request: ConsumerGroupHeartbeatRequest):
ConsumerGroupHeartbeatResponse = {
IntegrationTestUtils.connectAndReceive[ConsumerGroupHeartbeatResponse](
request,
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 014e3ef0220..0638bf36323 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -5817,11 +5817,9 @@ class KafkaApisTest {
@Test
def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
- val requestChannelRequest = buildRequest(
- new ConsumerGroupHeartbeatRequest.Builder(new
ConsumerGroupHeartbeatRequestData()
- .setGroupId("group")
- ).build()
- )
+ val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequestData().setGroupId("group")
+
+ val requestChannelRequest = buildRequest(new
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
createKafkaApis().handle(requestChannelRequest, RequestLocal.NoCaching)
@@ -5830,4 +5828,68 @@ class KafkaApisTest {
val response =
verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
assertEquals(expectedHeartbeatResponse, response.data)
}
+
+ @Test
+ def testConsumerGroupHeartbeatRequest(): Unit = {
+ val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequestData().setGroupId("group")
+
+ val requestChannelRequest = buildRequest(new
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+
+ val future = new CompletableFuture[ConsumerGroupHeartbeatResponseData]()
+ when(groupCoordinator.consumerGroupHeartbeat(
+ requestChannelRequest.context,
+ consumerGroupHeartbeatRequest
+ )).thenReturn(future)
+
+ createKafkaApis(overrideProperties = Map(
+ KafkaConfig.NewGroupCoordinatorEnableProp -> "true"
+ )).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+ val consumerGroupHeartbeatResponse = new
ConsumerGroupHeartbeatResponseData()
+ .setMemberId("member")
+
+ future.complete(consumerGroupHeartbeatResponse)
+ val response =
verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
+ assertEquals(consumerGroupHeartbeatResponse, response.data)
+ }
+
+ @Test
+ def testConsumerGroupHeartbeatRequestFutureFailed(): Unit = {
+ val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequestData().setGroupId("group")
+
+ val requestChannelRequest = buildRequest(new
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+
+ val future = new CompletableFuture[ConsumerGroupHeartbeatResponseData]()
+ when(groupCoordinator.consumerGroupHeartbeat(
+ requestChannelRequest.context,
+ consumerGroupHeartbeatRequest
+ )).thenReturn(future)
+
+ createKafkaApis(overrideProperties = Map(
+ KafkaConfig.NewGroupCoordinatorEnableProp -> "true"
+ )).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+ future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
+ val response =
verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
+ assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.errorCode)
+ }
+
+ @Test
+ def testConsumerGroupHeartbeatRequestAuthorizationFailed(): Unit = {
+ val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequestData().setGroupId("group")
+
+ val requestChannelRequest = buildRequest(new
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+ .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+
+ createKafkaApis(
+ authorizer = Some(authorizer),
+ overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp ->
"true")
+ ).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+ val response =
verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
+ assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code,
response.data.errorCode)
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index c9cfadaba2f..f448346a116 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -17,6 +17,8 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
@@ -52,6 +54,19 @@ import java.util.function.IntSupplier;
*/
public interface GroupCoordinator {
+ /**
+ * Heartbeat to a Consumer Group.
+ *
+ * @param context The request context.
+ * @param request The ConsumerGroupHeartbeatResponse data.
+ *
+ * @return A future yielding the response or an exception.
+ */
+ CompletableFuture<ConsumerGroupHeartbeatResponseData>
consumerGroupHeartbeat(
+ RequestContext context,
+ ConsumerGroupHeartbeatRequestData request
+ );
+
/**
* Join a Generic Group.
*
diff --git
a/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
b/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
index 3383904d5ec..0d3ee81dc96 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
@@ -90,4 +90,16 @@ public class FutureUtils {
}
});
}
+
+ /**
+ * Returns a new CompletableFuture that is already completed exceptionally
with the given exception.
+ *
+ * @param ex The exception.
+ * @return The exceptionally completed CompletableFuture.
+ */
+ public static <T> CompletableFuture<T> failedFuture(Throwable ex) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ future.completeExceptionally(ex);
+ return future;
+ }
}