This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fd05073cc17 KAFKA-14367; Add `SyncGroup` to the new `GroupCoordinator`
interface (#12847)
fd05073cc17 is described below
commit fd05073cc17a788d07d85a87c5c4053317a8ffa2
Author: David Jacot <[email protected]>
AuthorDate: Fri Dec 2 17:15:29 2022 +0100
KAFKA-14367; Add `SyncGroup` to the new `GroupCoordinator` interface
(#12847)
This patch adds `syncGroup` to the new `GroupCoordinator` interface and
updates `KafkaApis` to use it.
Reviewers: Justine Olshan <[email protected]>, Jeff Kim
<[email protected]>, Jason Gustafson <[email protected]>
---
.../group/GroupCoordinatorAdapter.scala | 39 ++++-
core/src/main/scala/kafka/server/KafkaApis.scala | 72 +++-----
.../scala/kafka/server/RequestHandlerHelper.scala | 12 ++
.../group/GroupCoordinatorAdapterTest.scala | 72 +++++++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 185 ++++++++++++---------
.../kafka/coordinator/group/GroupCoordinator.java | 17 ++
6 files changed, 266 insertions(+), 131 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index f2dd34a5379..c86fed815e2 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -17,11 +17,12 @@
package kafka.coordinator.group
import kafka.server.RequestLocal
-import org.apache.kafka.common.message.{HeartbeatRequestData,
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData}
+import org.apache.kafka.common.message.{HeartbeatRequestData,
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData,
SyncGroupRequestData, SyncGroupResponseData}
import org.apache.kafka.common.requests.RequestContext
import org.apache.kafka.common.utils.BufferSupplier
import java.util.concurrent.CompletableFuture
+import scala.collection.immutable
import scala.jdk.CollectionConverters._
/**
@@ -84,6 +85,42 @@ class GroupCoordinatorAdapter(
future
}
+ override def syncGroup(
+ context: RequestContext,
+ request: SyncGroupRequestData,
+ bufferSupplier: BufferSupplier
+ ): CompletableFuture[SyncGroupResponseData] = {
+ val future = new CompletableFuture[SyncGroupResponseData]()
+
+ def callback(syncGroupResult: SyncGroupResult): Unit = {
+ future.complete(new SyncGroupResponseData()
+ .setErrorCode(syncGroupResult.error.code)
+ .setProtocolType(syncGroupResult.protocolType.orNull)
+ .setProtocolName(syncGroupResult.protocolName.orNull)
+ .setAssignment(syncGroupResult.memberAssignment)
+ )
+ }
+
+ val assignmentMap = immutable.Map.newBuilder[String, Array[Byte]]
+ request.assignments.forEach { assignment =>
+ assignmentMap += assignment.memberId -> assignment.assignment
+ }
+
+ coordinator.handleSyncGroup(
+ request.groupId,
+ request.generationId,
+ request.memberId,
+ Option(request.protocolType),
+ Option(request.protocolName),
+ Option(request.groupInstanceId),
+ assignmentMap.result(),
+ callback,
+ RequestLocal(bufferSupplier)
+ )
+
+ future
+ }
+
override def heartbeat(
context: RequestContext,
request: HeartbeatRequestData
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index d8cc117d189..df6ae7682c4 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -196,7 +196,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request,
requestLocal).exceptionally(handleError)
case ApiKeys.HEARTBEAT =>
handleHeartbeatRequest(request).exceptionally(handleError)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
- case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request,
requestLocal)
+ case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request,
requestLocal).exceptionally(handleError)
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
@@ -1660,23 +1660,14 @@ class KafkaApis(val requestChannel: RequestChannel,
): CompletableFuture[Unit] = {
val joinGroupRequest = request.body[JoinGroupRequest]
- def sendResponse(response: AbstractResponse): Unit = {
- trace("Sending join group response %s for correlation id %d to client
%s."
- .format(response, request.header.correlationId,
request.header.clientId))
- requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
- response.maybeSetThrottleTimeMs(requestThrottleMs)
- response
- })
- }
-
if (joinGroupRequest.data.groupInstanceId != null &&
config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
// Only enable static membership when IBP >= 2.3, because it is not safe
for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being
loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally
become "dynamic", which leads to wrong states.
-
sendResponse(joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+ requestHelper.sendMaybeThrottle(request,
joinGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
} else if (!authHelper.authorize(request.context, READ, GROUP,
joinGroupRequest.data.groupId)) {
-
sendResponse(joinGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+ requestHelper.sendMaybeThrottle(request,
joinGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
newGroupCoordinator.joinGroup(
@@ -1685,56 +1676,45 @@ class KafkaApis(val requestChannel: RequestChannel,
requestLocal.bufferSupplier
).handle[Unit] { (response, exception) =>
if (exception != null) {
- sendResponse(joinGroupRequest.getErrorResponse(exception))
+ requestHelper.sendMaybeThrottle(request,
joinGroupRequest.getErrorResponse(exception))
} else {
- sendResponse(new JoinGroupResponse(response,
request.context.apiVersion))
+ requestHelper.sendMaybeThrottle(request, new
JoinGroupResponse(response, request.context.apiVersion))
}
}
}
}
- def handleSyncGroupRequest(request: RequestChannel.Request, requestLocal:
RequestLocal): Unit = {
+ def handleSyncGroupRequest(
+ request: RequestChannel.Request,
+ requestLocal: RequestLocal
+ ): CompletableFuture[Unit] = {
val syncGroupRequest = request.body[SyncGroupRequest]
- def sendResponseCallback(syncGroupResult: SyncGroupResult): Unit = {
- requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
- new SyncGroupResponse(
- new SyncGroupResponseData()
- .setErrorCode(syncGroupResult.error.code)
- .setProtocolType(syncGroupResult.protocolType.orNull)
- .setProtocolName(syncGroupResult.protocolName.orNull)
- .setAssignment(syncGroupResult.memberAssignment)
- .setThrottleTimeMs(requestThrottleMs)
- ))
- }
-
if (syncGroupRequest.data.groupInstanceId != null &&
config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
// Only enable static membership when IBP >= 2.3, because it is not safe
for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being
loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally
become "dynamic", which leads to wrong states.
- sendResponseCallback(SyncGroupResult(Errors.UNSUPPORTED_VERSION))
+ requestHelper.sendMaybeThrottle(request,
syncGroupRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+ CompletableFuture.completedFuture[Unit](())
} else if (!syncGroupRequest.areMandatoryProtocolTypeAndNamePresent()) {
// Starting from version 5, ProtocolType and ProtocolName fields are
mandatory.
- sendResponseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL))
+ requestHelper.sendMaybeThrottle(request,
syncGroupRequest.getErrorResponse(Errors.INCONSISTENT_GROUP_PROTOCOL.exception))
+ CompletableFuture.completedFuture[Unit](())
} else if (!authHelper.authorize(request.context, READ, GROUP,
syncGroupRequest.data.groupId)) {
- sendResponseCallback(SyncGroupResult(Errors.GROUP_AUTHORIZATION_FAILED))
+ requestHelper.sendMaybeThrottle(request,
syncGroupRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+ CompletableFuture.completedFuture[Unit](())
} else {
- val assignmentMap = immutable.Map.newBuilder[String, Array[Byte]]
- syncGroupRequest.data.assignments.forEach { assignment =>
- assignmentMap += (assignment.memberId -> assignment.assignment)
- }
-
- groupCoordinator.handleSyncGroup(
- syncGroupRequest.data.groupId,
- syncGroupRequest.data.generationId,
- syncGroupRequest.data.memberId,
- Option(syncGroupRequest.data.protocolType),
- Option(syncGroupRequest.data.protocolName),
- Option(syncGroupRequest.data.groupInstanceId),
- assignmentMap.result(),
- sendResponseCallback,
- requestLocal
- )
+ newGroupCoordinator.syncGroup(
+ request.context,
+ syncGroupRequest.data,
+ requestLocal.bufferSupplier
+ ).handle[Unit] { (response, exception) =>
+ if (exception != null) {
+ requestHelper.sendMaybeThrottle(request,
syncGroupRequest.getErrorResponse(exception))
+ } else {
+ requestHelper.sendMaybeThrottle(request, new
SyncGroupResponse(response))
+ }
+ }
}
}
diff --git a/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
b/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
index 5db595986ef..75bd7a32969 100644
--- a/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
+++ b/core/src/main/scala/kafka/server/RequestHandlerHelper.scala
@@ -107,6 +107,18 @@ class RequestHandlerHelper(
// Throttle the channel if the request quota is enabled but has been
violated. Regardless of throttling, send the
// response immediately.
+ def sendMaybeThrottle(
+ request: RequestChannel.Request,
+ response: AbstractResponse
+ ): Unit = {
+ val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
+ // Only throttle non-forwarded requests
+ if (!request.isForwarded)
+ throttle(quotas.request, request, throttleTimeMs)
+ response.maybeSetThrottleTimeMs(throttleTimeMs)
+ requestChannel.sendResponse(request, response, None)
+ }
+
def sendResponseMaybeThrottle(request: RequestChannel.Request,
createResponse: Int => AbstractResponse): Unit
= {
val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index d88e7f44325..bf8077707ba 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -16,9 +16,9 @@
*/
package kafka.coordinator.group
-import
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
+import
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback,
SyncGroupCallback}
import kafka.server.RequestLocal
-import org.apache.kafka.common.message.{HeartbeatRequestData,
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData}
+import org.apache.kafka.common.message.{HeartbeatRequestData,
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData,
SyncGroupRequestData, SyncGroupResponseData}
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
@@ -142,6 +142,74 @@ class GroupCoordinatorAdapterTest {
assertEquals(expectedData, future.get())
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
+ def testSyncGroup(version: Short): Unit = {
+ val groupCoordinator = mock(classOf[GroupCoordinator])
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+ val ctx = makeContext(ApiKeys.SYNC_GROUP, version)
+ val data = new SyncGroupRequestData()
+ .setGroupId("group")
+ .setMemberId("member1")
+ .setGroupInstanceId("instance")
+ .setProtocolType("consumer")
+ .setProtocolName("range")
+ .setGenerationId(10)
+ .setAssignments(List(
+ new SyncGroupRequestData.SyncGroupRequestAssignment()
+ .setMemberId("member1")
+ .setAssignment("member1".getBytes()),
+ new SyncGroupRequestData.SyncGroupRequestAssignment()
+ .setMemberId("member2")
+ .setAssignment("member2".getBytes())
+ ).asJava)
+ val bufferSupplier = BufferSupplier.create()
+
+ val future = adapter.syncGroup(ctx, data, bufferSupplier)
+ assertFalse(future.isDone)
+
+ val capturedAssignment: ArgumentCaptor[Map[String, Array[Byte]]] =
+ ArgumentCaptor.forClass(classOf[Map[String, Array[Byte]]])
+ val capturedCallback: ArgumentCaptor[SyncGroupCallback] =
+ ArgumentCaptor.forClass(classOf[SyncGroupCallback])
+
+ verify(groupCoordinator).handleSyncGroup(
+ ArgumentMatchers.eq(data.groupId),
+ ArgumentMatchers.eq(data.generationId),
+ ArgumentMatchers.eq(data.memberId),
+ ArgumentMatchers.eq(Some(data.protocolType)),
+ ArgumentMatchers.eq(Some(data.protocolName)),
+ ArgumentMatchers.eq(Some(data.groupInstanceId)),
+ capturedAssignment.capture(),
+ capturedCallback.capture(),
+ ArgumentMatchers.eq(RequestLocal(bufferSupplier))
+ )
+
+ assertEquals(Map(
+ "member1" -> "member1",
+ "member2" -> "member2",
+ ), capturedAssignment.getValue.map { case (member, metadata) =>
+ (member, new String(metadata))
+ })
+
+ capturedCallback.getValue.apply(SyncGroupResult(
+ error = Errors.NONE,
+ protocolType = Some("consumer"),
+ protocolName = Some("range"),
+ memberAssignment = "member1".getBytes()
+ ))
+
+ val expectedResponseData = new SyncGroupResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setProtocolType("consumer")
+ .setProtocolName("range")
+ .setAssignment("member1".getBytes())
+
+ assertTrue(future.isDone)
+ assertEquals(expectedResponseData, future.get())
+ }
+
@Test
def testHeartbeat(): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 32a0e0aec1a..459a640ae78 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -26,7 +26,6 @@ import java.util.{Collections, Optional, Properties, Random}
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
import kafka.controller.{ControllerContext, KafkaController}
-import
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.SyncGroupCallback
import kafka.coordinator.group._
import kafka.coordinator.transaction.{InitProducerIdResult,
TransactionCoordinator}
import kafka.log.AppendOrigin
@@ -2709,110 +2708,132 @@ class KafkaApisTest {
assertEquals(Errors.UNKNOWN_SERVER_ERROR, response.error)
}
- @Test
- def testSyncGroupProtocolTypeAndName(): Unit = {
- for (version <- ApiKeys.SYNC_GROUP.oldestVersion to
ApiKeys.SYNC_GROUP.latestVersion) {
- testSyncGroupProtocolTypeAndName(version.asInstanceOf[Short])
- }
- }
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
+ def testHandleSyncGroupRequest(version: Short): Unit = {
+ val syncGroupRequest = new SyncGroupRequestData()
+ .setGroupId("group")
+ .setMemberId("member")
+ .setProtocolType("consumer")
+ .setProtocolName("range")
- def testSyncGroupProtocolTypeAndName(version: Short): Unit = {
- reset(groupCoordinator, clientRequestQuotaManager, requestChannel,
replicaManager)
+ val requestChannelRequest = buildRequest(new
SyncGroupRequest.Builder(syncGroupRequest).build(version))
- val groupId = "group"
- val memberId = "member1"
- val protocolType = "consumer"
- val protocolName = "range"
+ val expectedSyncGroupRequest = new SyncGroupRequestData()
+ .setGroupId("group")
+ .setMemberId("member")
+ .setProtocolType(if (version >= 5) "consumer" else null)
+ .setProtocolName(if (version >= 5) "range" else null)
- val capturedCallback: ArgumentCaptor[SyncGroupCallback] =
ArgumentCaptor.forClass(classOf[SyncGroupCallback])
+ val future = new CompletableFuture[SyncGroupResponseData]()
+ when(newGroupCoordinator.syncGroup(
+ requestChannelRequest.context,
+ expectedSyncGroupRequest,
+ RequestLocal.NoCaching.bufferSupplier
+ )).thenReturn(future)
- val requestLocal = RequestLocal.withThreadConfinedCaching
- val syncGroupRequest = new SyncGroupRequest.Builder(
- new SyncGroupRequestData()
- .setGroupId(groupId)
- .setGenerationId(0)
- .setMemberId(memberId)
- .setProtocolType(protocolType)
- .setProtocolName(protocolName)
- ).build(version)
+ createKafkaApis().handleSyncGroupRequest(
+ requestChannelRequest,
+ RequestLocal.NoCaching
+ )
- val requestChannelRequest = buildRequest(syncGroupRequest)
+ val expectedSyncGroupResponse = new SyncGroupResponseData()
+ .setProtocolType("consumer")
+ .setProtocolName("range")
- createKafkaApis().handleSyncGroupRequest(requestChannelRequest,
requestLocal)
+ future.complete(expectedSyncGroupResponse)
+ val capturedResponse = verifyNoThrottling(requestChannelRequest)
+ val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
+ assertEquals(expectedSyncGroupResponse, response.data)
+ }
- verify(groupCoordinator).handleSyncGroup(
- ArgumentMatchers.eq(groupId),
- ArgumentMatchers.eq(0),
- ArgumentMatchers.eq(memberId),
- ArgumentMatchers.eq(if (version >= 5) Some(protocolType) else None),
- ArgumentMatchers.eq(if (version >= 5) Some(protocolName) else None),
- ArgumentMatchers.eq(None),
- ArgumentMatchers.eq(Map.empty),
- capturedCallback.capture(),
- ArgumentMatchers.eq(requestLocal)
+ @Test
+ def testHandleSyncGroupRequestFutureFailed(): Unit = {
+ val syncGroupRequest = new SyncGroupRequestData()
+ .setGroupId("group")
+ .setMemberId("member")
+ .setProtocolType("consumer")
+ .setProtocolName("range")
+
+ val requestChannelRequest = buildRequest(new
SyncGroupRequest.Builder(syncGroupRequest).build())
+
+ val expectedSyncGroupRequest = new SyncGroupRequestData()
+ .setGroupId("group")
+ .setMemberId("member")
+ .setProtocolType("consumer")
+ .setProtocolName("range")
+
+ val future = new CompletableFuture[SyncGroupResponseData]()
+ when(newGroupCoordinator.syncGroup(
+ requestChannelRequest.context,
+ expectedSyncGroupRequest,
+ RequestLocal.NoCaching.bufferSupplier
+ )).thenReturn(future)
+
+ createKafkaApis().handleSyncGroupRequest(
+ requestChannelRequest,
+ RequestLocal.NoCaching
)
- capturedCallback.getValue.apply(SyncGroupResult(
- protocolType = Some(protocolType),
- protocolName = Some(protocolName),
- memberAssignment = Array.empty,
- error = Errors.NONE
- ))
+ future.completeExceptionally(Errors.UNKNOWN_SERVER_ERROR.exception)
val capturedResponse = verifyNoThrottling(requestChannelRequest)
val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
-
- assertEquals(Errors.NONE, response.error)
- assertArrayEquals(Array.empty[Byte], response.data.assignment)
- assertEquals(protocolType, response.data.protocolType)
+ assertEquals(Errors.UNKNOWN_SERVER_ERROR, response.error)
}
@Test
- def testSyncGroupProtocolTypeAndNameAreMandatorySinceV5(): Unit = {
- for (version <- ApiKeys.SYNC_GROUP.oldestVersion to
ApiKeys.SYNC_GROUP.latestVersion) {
-
testSyncGroupProtocolTypeAndNameAreMandatorySinceV5(version.asInstanceOf[Short])
- }
- }
+ def testHandleSyncGroupRequestAuthenticationFailed(): Unit = {
+ val syncGroupRequest = new SyncGroupRequestData()
+ .setGroupId("group")
+ .setMemberId("member")
+ .setProtocolType("consumer")
+ .setProtocolName("range")
- def testSyncGroupProtocolTypeAndNameAreMandatorySinceV5(version: Short):
Unit = {
- reset(groupCoordinator, clientRequestQuotaManager, requestChannel,
replicaManager)
+ val requestChannelRequest = buildRequest(new
SyncGroupRequest.Builder(syncGroupRequest).build())
- val groupId = "group"
- val memberId = "member1"
- val protocolType = "consumer"
- val protocolName = "range"
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+ .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
- val capturedCallback: ArgumentCaptor[SyncGroupCallback] =
ArgumentCaptor.forClass(classOf[SyncGroupCallback])
+ createKafkaApis(authorizer = Some(authorizer)).handleSyncGroupRequest(
+ requestChannelRequest,
+ RequestLocal.NoCaching
+ )
- val requestLocal = RequestLocal.withThreadConfinedCaching
+ val capturedResponse = verifyNoThrottling(requestChannelRequest)
+ val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
+ assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
+ }
- val syncGroupRequest = new SyncGroupRequest.Builder(
- new SyncGroupRequestData()
- .setGroupId(groupId)
- .setGenerationId(0)
- .setMemberId(memberId)
- ).build(version)
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
+ def testSyncGroupProtocolTypeAndNameAreMandatorySinceV5(version: Short):
Unit = {
+ val syncGroupRequest = new SyncGroupRequestData()
+ .setGroupId("group")
+ .setMemberId("member")
- val requestChannelRequest = buildRequest(syncGroupRequest)
+ val requestChannelRequest = buildRequest(new
SyncGroupRequest.Builder(syncGroupRequest).build(version))
+
+ val expectedSyncGroupRequest = new SyncGroupRequestData()
+ .setGroupId("group")
+ .setMemberId("member")
- createKafkaApis().handleSyncGroupRequest(requestChannelRequest,
requestLocal)
+ val future = new CompletableFuture[SyncGroupResponseData]()
+ when(newGroupCoordinator.syncGroup(
+ requestChannelRequest.context,
+ expectedSyncGroupRequest,
+ RequestLocal.NoCaching.bufferSupplier
+ )).thenReturn(future)
+
+ createKafkaApis().handleSyncGroupRequest(
+ requestChannelRequest,
+ RequestLocal.NoCaching
+ )
if (version < 5) {
- verify(groupCoordinator).handleSyncGroup(
- ArgumentMatchers.eq(groupId),
- ArgumentMatchers.eq(0),
- ArgumentMatchers.eq(memberId),
- ArgumentMatchers.eq(None),
- ArgumentMatchers.eq(None),
- ArgumentMatchers.eq(None),
- ArgumentMatchers.eq(Map.empty),
- capturedCallback.capture(),
- ArgumentMatchers.eq(requestLocal))
- capturedCallback.getValue.apply(SyncGroupResult(
- protocolType = Some(protocolType),
- protocolName = Some(protocolName),
- memberAssignment = Array.empty,
- error = Errors.NONE
- ))
+ future.complete(new SyncGroupResponseData()
+ .setProtocolType("consumer")
+ .setProtocolName("range"))
}
val capturedResponse = verifyNoThrottling(requestChannelRequest)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 36935981c6b..70aecf2ccae 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.BufferSupplier;
@@ -42,6 +44,21 @@ public interface GroupCoordinator {
BufferSupplier bufferSupplier
);
+ /**
+ * Sync a Generic Group.
+ *
+ * @param context The coordinator request context.
+ * @param request The SyncGroupRequest data.
+ * @param bufferSupplier The buffer supplier tight to the request
thread.
+ *
+ * @return A future yielding the response or an exception.
+ */
+ CompletableFuture<SyncGroupResponseData> syncGroup(
+ RequestContext context,
+ SyncGroupRequestData request,
+ BufferSupplier bufferSupplier
+ );
+
/**
* Heartbeat to a Generic Group.
*