This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/kip1071 by this push:
new 4130f2f9270 Internal topic auto creation (#17433)
4130f2f9270 is described below
commit 4130f2f927060716d05f7629e7e20143436e481c
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Oct 29 14:37:57 2024 +0100
Internal topic auto creation (#17433)
* impl
* fixes
---
.../group/GroupCoordinatorAdapter.scala | 5 +-
.../kafka/server/AutoTopicCreationManager.scala | 39 +++-
core/src/main/scala/kafka/server/KafkaApis.scala | 56 +++++-
.../server/AutoTopicCreationManagerTest.scala | 199 ++++++++++++++++++++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 7 +-
.../kafka/coordinator/group/GroupCoordinator.java | 6 +-
.../coordinator/group/GroupCoordinatorService.java | 11 +-
.../coordinator/group/GroupCoordinatorShard.java | 4 +-
.../coordinator/group/GroupMetadataManager.java | 55 +++---
.../streams/StreamsGroupInitializeResult.java | 74 ++++++++
.../group/GroupCoordinatorServiceTest.java | 14 +-
.../group/GroupCoordinatorShardTest.java | 5 +-
.../group/GroupMetadataManagerTest.java | 51 ++++--
.../group/GroupMetadataManagerTestContext.java | 6 +-
.../SmokeTestDriverIntegrationTest.java | 13 --
streams/src/test/resources/log4j.properties | 2 +
16 files changed, 446 insertions(+), 101 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index b6961e6a198..34cf71464cb 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -19,13 +19,14 @@ package kafka.coordinator.group
import kafka.common.OffsetAndMetadata
import kafka.server.{KafkaConfig, ReplicaManager}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData,
ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData,
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData,
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData,
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData,
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData,
OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...]
+import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData,
ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData,
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData,
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData,
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData,
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData,
OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...]
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext,
TransactionResult}
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.coordinator.group
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.util.FutureUtils
@@ -80,7 +81,7 @@ private[group] class GroupCoordinatorAdapter(
override def streamsGroupInitialize(
context: RequestContext,
request: StreamsGroupInitializeRequestData
- ):
CompletableFuture[StreamsGroupInitializeResponseData] = {
+ ):
CompletableFuture[StreamsGroupInitializeResult] = {
FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
s"The old group coordinator does not support
${ApiKeys.STREAMS_GROUP_INITIALIZE.name} API."
))
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index b7e7439e55b..4c90f30e546 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -47,6 +47,13 @@ trait AutoTopicCreationManager {
controllerMutationQuota: ControllerMutationQuota,
metadataRequestContext: Option[RequestContext]
): Seq[MetadataResponseTopic]
+
+
+ def createStreamsInternalTopics(
+ topics: Map[String, CreatableTopic],
+ requestContext: RequestContext
+ ): Unit
+
}
object AutoTopicCreationManager {
@@ -108,6 +115,31 @@ class DefaultAutoTopicCreationManager(
uncreatableTopicResponses ++ creatableTopicResponses
}
+ override def createStreamsInternalTopics(
+ topics: Map[String, CreatableTopic],
+ requestContext: RequestContext
+ ): Unit = {
+
+ for ((_, creatableTopic) <- topics) {
+ if (creatableTopic.numPartitions() == -1) {
+ creatableTopic
+ .setNumPartitions(config.numPartitions)
+ }
+ if (creatableTopic.replicationFactor() == -1) {
+ creatableTopic
+ .setReplicationFactor(config.defaultReplicationFactor.shortValue)
+ }
+ }
+
+ if (topics.isEmpty) {
+ Seq.empty
+ } else if (controller.isEmpty || !controller.get.isActive &&
channelManager.isDefined) {
+ sendCreateTopicRequest(topics, Some(requestContext))
+ } else {
+ throw new IllegalStateException("Controller must be defined in order to
create streams internal topics.")
+ }
+ }
+
private def createTopicsInZk(
creatableTopics: Map[String, CreatableTopic],
controllerMutationQuota: ControllerMutationQuota
@@ -160,7 +192,7 @@ class DefaultAutoTopicCreationManager(
private def sendCreateTopicRequest(
creatableTopics: Map[String, CreatableTopic],
- metadataRequestContext: Option[RequestContext]
+ requestContext: Option[RequestContext]
): Seq[MetadataResponseTopic] = {
val topicsToCreate = new
CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size)
topicsToCreate.addAll(creatableTopics.values.asJavaCollection)
@@ -184,7 +216,8 @@ class DefaultAutoTopicCreationManager(
} else if (response.versionMismatch() != null) {
warn(s"Auto topic creation failed for ${creatableTopics.keys} with
invalid version exception")
} else {
- debug(s"Auto topic creation completed for ${creatableTopics.keys}
with response ${response.responseBody}.")
+ // TODO: Response may still contain errors for individual topics.
This should be exposed.
+ info(s"Auto topic creation completed for ${creatableTopics.keys}
with response ${response.responseBody}.")
}
}
}
@@ -193,7 +226,7 @@ class DefaultAutoTopicCreationManager(
throw new IllegalStateException("Channel manager must be defined in
order to send CreateTopic requests.")
}
- val request = metadataRequestContext.map { context =>
+ val request = requestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions.toScala match {
case None =>
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 81510b4aec9..0d9381670b5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3880,10 +3880,9 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleStreamsGroupInitialize(request: RequestChannel.Request):
CompletableFuture[Unit] = {
+ // TODO: The unit tests for this method are insufficient. Once we merge
initialize with group heartbeat, we have to extend the tests to cover ACLs and
internal topic creation
val streamsGroupInitializeRequest =
request.body[StreamsGroupInitializeRequest]
- // TODO: Check ACLs on CREATE TOPIC & DESCRIBE_CONFIGS
-
if (!isStreamsGroupProtocolEnabled()) {
// The API is not supported by the "old" group coordinator (the
default). If the
// new one is not enabled, we fail directly here.
@@ -3893,6 +3892,53 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request,
streamsGroupInitializeRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
+ val requestContext = request.context
+
+ val internalTopics: Map[String,
StreamsGroupInitializeRequestData.TopicInfo] = {
+
streamsGroupInitializeRequest.data().topology().asScala.flatMap(subtopology =>
+ subtopology.repartitionSourceTopics().iterator().asScala ++
subtopology.stateChangelogTopics().iterator().asScala
+ ).map(x => x.name() -> x).toMap
+ }
+
+ val prohibitedInternalTopics =
internalTopics.keys.filter(Topic.isInternal)
+ if (prohibitedInternalTopics.nonEmpty) {
+ val errorResponse = new StreamsGroupInitializeResponseData()
+ errorResponse.setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code)
+ errorResponse.setErrorMessage(f"Use of Kafka internal topics
${prohibitedInternalTopics.mkString(",")} as Kafka Streams internal topics is
prohibited.")
+ requestHelper.sendMaybeThrottle(request, new
StreamsGroupInitializeResponse(errorResponse))
+ return CompletableFuture.completedFuture[Unit](())
+ }
+
+ val invalidTopics = internalTopics.keys.filterNot(Topic.isValid)
+ if (invalidTopics.nonEmpty) {
+ val errorResponse = new StreamsGroupInitializeResponseData()
+ errorResponse.setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code)
+ errorResponse.setErrorMessage(f"Internal topic names
${invalidTopics.mkString(",")} are not valid topic names.")
+ requestHelper.sendMaybeThrottle(request, new
StreamsGroupInitializeResponse(errorResponse))
+ return CompletableFuture.completedFuture[Unit](())
+ }
+
+ // TODO: Once we move initialization to the heartbeat, we should only
require these permissions if there are missing internal topics.
+ if(!authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
logIfDenied = false)) {
+ val (_, createTopicUnauthorized) =
authHelper.partitionSeqByAuthorized(request.context, CREATE, TOPIC,
internalTopics.keys.toSeq)(identity[String])
+ if (createTopicUnauthorized.nonEmpty) {
+ val errorResponse = new StreamsGroupInitializeResponseData()
+ errorResponse.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ errorResponse.setErrorMessage(f"Unauthorized to CREATE TOPIC
${createTopicUnauthorized.mkString(",")}.")
+ requestHelper.sendMaybeThrottle(request, new
StreamsGroupInitializeResponse(errorResponse))
+ return CompletableFuture.completedFuture[Unit](())
+ }
+ }
+
+ val (_, describeConfigsAuthorized) =
authHelper.partitionSeqByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
internalTopics.keys.toSeq)(identity[String])
+ if (describeConfigsAuthorized.nonEmpty) {
+ val errorResponse = new StreamsGroupInitializeResponseData()
+ errorResponse.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ errorResponse.setErrorMessage(f"Unauthorized to DESCRIBE_CONFIGS on
topics ${describeConfigsAuthorized.mkString(",")} are unauthorized.")
+ requestHelper.sendMaybeThrottle(request, new
StreamsGroupInitializeResponse(errorResponse))
+ return CompletableFuture.completedFuture[Unit](())
+ }
+
groupCoordinator.streamsGroupInitialize(
request.context,
streamsGroupInitializeRequest.data,
@@ -3900,7 +3946,11 @@ class KafkaApis(val requestChannel: RequestChannel,
if (exception != null) {
requestHelper.sendMaybeThrottle(request,
streamsGroupInitializeRequest.getErrorResponse(exception))
} else {
- requestHelper.sendMaybeThrottle(request, new
StreamsGroupInitializeResponse(response))
+ if (!response.creatableTopics().isEmpty) {
+ // TODO: Once we move this code to the heartbeat, we should
indicate which topics are being created. We should also find a way to propagate
failures to the client
+
autoTopicCreationManager.createStreamsInternalTopics(response.creatableTopics().asScala,
requestContext);
+ }
+ requestHelper.sendMaybeThrottle(request, new
StreamsGroupInitializeResponse(response.responseData()))
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index e37b27a18cd..7af2463fa17 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.Node
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME,
SHARE_GROUP_STATE_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
import org.apache.kafka.common.message.{ApiVersionsResponseData,
CreateTopicsRequestData}
-import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic,
CreatableTopicConfig, CreatableTopicConfigCollection}
import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -44,6 +44,7 @@ import
org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToContro
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.never
import org.mockito.invocation.InvocationOnMock
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
@@ -255,7 +256,7 @@ class AutoTopicCreationManagerTest {
override def deserialize(bytes: Array[Byte]): KafkaPrincipal =
SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
}
- val requestContext = initializeRequestContext(topicName, userPrincipal,
Optional.of(principalSerde))
+ val requestContext = initializeRequestContext(userPrincipal,
Optional.of(principalSerde))
autoTopicCreationManager.createTopics(
Set(topicName), UnboundedControllerMutationQuota, Some(requestContext))
@@ -274,7 +275,7 @@ class AutoTopicCreationManagerTest {
def testTopicCreationWithMetadataContextWhenPrincipalSerdeNotDefined(): Unit
= {
val topicName = "topic"
- val requestContext = initializeRequestContext(topicName,
KafkaPrincipal.ANONYMOUS, Optional.empty())
+ val requestContext = initializeRequestContext(KafkaPrincipal.ANONYMOUS,
Optional.empty())
// Throw upon undefined principal serde when building the forward request
assertThrows(classOf[IllegalArgumentException], () =>
autoTopicCreationManager.createTopics(
@@ -292,7 +293,7 @@ class AutoTopicCreationManagerTest {
override def deserialize(bytes: Array[Byte]): KafkaPrincipal =
SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
}
- val requestContext = initializeRequestContext(topicName,
KafkaPrincipal.ANONYMOUS, Optional.of(principalSerde))
+ val requestContext = initializeRequestContext(KafkaPrincipal.ANONYMOUS,
Optional.of(principalSerde))
autoTopicCreationManager.createTopics(
Set(topicName), UnboundedControllerMutationQuota, Some(requestContext))
autoTopicCreationManager.createTopics(
@@ -322,9 +323,16 @@ class AutoTopicCreationManagerTest {
argumentCaptor.capture())
}
- private def initializeRequestContext(topicName: String,
- kafkaPrincipal: KafkaPrincipal,
- principalSerde:
Optional[KafkaPrincipalSerde]): RequestContext = {
+ @Test
+ def testCreateStreamsInternalTopics(): Unit = {
+ val topicConfig = new CreatableTopicConfigCollection()
+ topicConfig.add(new
CreatableTopicConfig().setName("cleanup.policy").setValue("compact"));
+
+ val topics = Map(
+ "stream-topic-1" -> new
CreatableTopic().setName("stream-topic-1").setNumPartitions(3).setReplicationFactor(2).setConfigs(topicConfig),
+ "stream-topic-2" -> new
CreatableTopic().setName("stream-topic-2").setNumPartitions(1).setReplicationFactor(1)
+ )
+ val requestContext = initializeRequestContextWithUserPrinciple()
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
@@ -335,8 +343,183 @@ class AutoTopicCreationManagerTest {
transactionCoordinator,
Some(shareCoordinator))
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext)
+
+ val argumentCaptor =
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
+ Mockito.verify(brokerToController).sendRequest(
+ argumentCaptor.capture(),
+ any(classOf[ControllerRequestCompletionHandler]))
+
+ val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, 0,
"clientId", 0)
+ val capturedRequest =
argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
- topicsCollection.add(getNewTopic(topicName))
+ topicsCollection.add(getNewTopic("stream-topic-1", 3,
2.toShort).setConfigs(topicConfig))
+ topicsCollection.add(getNewTopic("stream-topic-2", 1, 1.toShort))
+ val requestBody = new CreateTopicsRequest.Builder(
+ new CreateTopicsRequestData()
+ .setTopics(topicsCollection)
+ .setTimeoutMs(requestTimeout))
+ .build(0)
+
+ val forwardedRequestBuffer = capturedRequest.requestData().duplicate()
+ assertEquals(requestHeader, RequestHeader.parse(forwardedRequestBuffer));
+ assertEquals(requestBody.data(),
CreateTopicsRequest.parse(forwardedRequestBuffer, 0).data())
+ }
+
+ @Test
+ def testCreateStreamsInternalTopicsWhenControllerNotActive(): Unit = {
+ val topics = Map(
+ "stream-topic-1" -> new
CreatableTopic().setName("stream-topic-1").setNumPartitions(3).setReplicationFactor(2)
+ )
+ val requestContext = initializeRequestContextWithUserPrinciple()
+
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ Some(brokerToController),
+ Some(adminManager),
+ Some(controller),
+ groupCoordinator,
+ transactionCoordinator,
+ Some(shareCoordinator))
+
+ Mockito.when(controller.isActive).thenReturn(false)
+
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext)
+
+ Mockito.verify(brokerToController).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ any(classOf[ControllerRequestCompletionHandler]))
+ }
+
+ @Test
+ def testFailStreamsInternalTopicsWhenNoChannelManager(): Unit = {
+ val topics = Map(
+ "stream-topic-1" -> new
CreatableTopic().setName("stream-topic-1").setNumPartitions(3).setReplicationFactor(2)
+ )
+ val requestContext = initializeRequestContextWithUserPrinciple()
+
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ None,
+ Some(adminManager),
+ Some(controller),
+ groupCoordinator,
+ transactionCoordinator,
+ Some(shareCoordinator))
+
+ Mockito.when(controller.isActive).thenReturn(true)
+
+ assertThrows(classOf[IllegalStateException], () =>
autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext))
+ }
+
+ @Test
+ def testCreateStreamsInternalTopicsWithEmptyTopics(): Unit = {
+ val topics = Map.empty[String, CreatableTopic]
+ val requestContext = initializeRequestContextWithUserPrinciple()
+
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ Some(brokerToController),
+ Some(adminManager),
+ Some(controller),
+ groupCoordinator,
+ transactionCoordinator,
+ Some(shareCoordinator))
+
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext)
+
+ Mockito.verify(brokerToController, never()).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ any(classOf[ControllerRequestCompletionHandler]))
+ }
+
+ @Test
+ def testCreateStreamsInternalTopicsWithDefaultConfig(): Unit = {
+ val topics = Map(
+ "stream-topic-1" -> new
CreatableTopic().setName("stream-topic-1").setNumPartitions(-1).setReplicationFactor(-1)
+ )
+ val requestContext = initializeRequestContextWithUserPrinciple()
+
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ Some(brokerToController),
+ Some(adminManager),
+ Some(controller),
+ groupCoordinator,
+ transactionCoordinator,
+ Some(shareCoordinator))
+
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext);
+
+ val argumentCaptor =
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
+ Mockito.verify(brokerToController).sendRequest(
+ argumentCaptor.capture(),
+ any(classOf[ControllerRequestCompletionHandler]))
+
+ val capturedRequest =
argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
+
+ val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, 0,
"clientId", 0)
+ val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
+ topicsCollection.add(getNewTopic("stream-topic-1", config.numPartitions,
config.defaultReplicationFactor.toShort))
+ val requestBody = new CreateTopicsRequest.Builder(
+ new CreateTopicsRequestData()
+ .setTopics(topicsCollection)
+ .setTimeoutMs(requestTimeout))
+ .build(0)
+ val forwardedRequestBuffer = capturedRequest.requestData().duplicate()
+ assertEquals(requestHeader, RequestHeader.parse(forwardedRequestBuffer));
+ assertEquals(requestBody.data(),
CreateTopicsRequest.parse(forwardedRequestBuffer, 0).data())
+ }
+
+ @Test
+ def testCreateStreamsInternalTopicsPassesPrinciple(): Unit = {
+ val topics = Map(
+ "stream-topic-1" -> new
CreatableTopic().setName("stream-topic-1").setNumPartitions(-1).setReplicationFactor(-1)
+ )
+ val requestContext = initializeRequestContextWithUserPrinciple()
+
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ Some(brokerToController),
+ Some(adminManager),
+ Some(controller),
+ groupCoordinator,
+ transactionCoordinator,
+ Some(shareCoordinator))
+
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext);
+
+ val argumentCaptor =
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
+ Mockito.verify(brokerToController).sendRequest(
+ argumentCaptor.capture(),
+ any(classOf[ControllerRequestCompletionHandler]))
+ val capturedRequest =
argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
+ assertEquals(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user"),
SecurityUtils.parseKafkaPrincipal(Utils.utf8(capturedRequest.requestPrincipal)))
+ }
+
+ private def initializeRequestContextWithUserPrinciple(): RequestContext = {
+ val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
+ val principalSerde = new KafkaPrincipalSerde {
+ override def serialize(principal: KafkaPrincipal): Array[Byte] = {
+ Utils.utf8(principal.toString)
+ }
+ override def deserialize(bytes: Array[Byte]): KafkaPrincipal =
SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
+ }
+ initializeRequestContext(userPrincipal, Optional.of(principalSerde))
+ }
+
+ private def initializeRequestContext(kafkaPrincipal: KafkaPrincipal,
+ principalSerde:
Optional[KafkaPrincipalSerde]): RequestContext = {
+
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ Some(brokerToController),
+ Some(adminManager),
+ Some(controller),
+ groupCoordinator,
+ transactionCoordinator,
+ Some(shareCoordinator))
+
val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
.setApiKey(ApiKeys.CREATE_TOPICS.id)
.setMinVersion(0)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 712e67da2b3..10c1eb2709b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -77,6 +77,7 @@ import
org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAn
import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
SHARE_SESSION_TIMEOUT_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG}
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupCoordinator,
GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult
import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorConfigTest}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.LeaderAndIsr
@@ -11195,7 +11196,7 @@ class KafkaApisTest extends Logging {
val requestChannelRequest = buildRequest(new
StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequest,
true).build())
- val future = new CompletableFuture[StreamsGroupInitializeResponseData]()
+ val future = new CompletableFuture[StreamsGroupInitializeResult]()
when(groupCoordinator.streamsGroupInitialize(
requestChannelRequest.context,
streamsGroupInitializeRequest
@@ -11206,7 +11207,7 @@ class KafkaApisTest extends Logging {
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
- val streamsGroupInitializeResponse = new
StreamsGroupInitializeResponseData()
+ val streamsGroupInitializeResponse = new StreamsGroupInitializeResult(new
StreamsGroupInitializeResponseData())
future.complete(streamsGroupInitializeResponse)
val response =
verifyNoThrottling[StreamsGroupInitializeResponse](requestChannelRequest)
@@ -11221,7 +11222,7 @@ class KafkaApisTest extends Logging {
val requestChannelRequest = buildRequest(new
StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequest,
true).build())
- val future = new CompletableFuture[StreamsGroupInitializeResponseData]()
+ val future = new CompletableFuture[StreamsGroupInitializeResult]()
when(groupCoordinator.streamsGroupInitialize(
requestChannelRequest.context,
streamsGroupInitializeRequest
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index da80d9fd457..3855807cf42 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -43,7 +43,6 @@ import
org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
-import org.apache.kafka.common.message.StreamsGroupInitializeResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -51,6 +50,7 @@ import
org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@@ -93,10 +93,10 @@ public interface GroupCoordinator {
* @param context The request context.
* @param request The StreamsGroupInitializeRequest data.
*
- * @return A future yielding the response.
+ * @return A future yielding the result, which contains the response and
all topics to be created.
* The error code(s) of the response are set to indicate the
error(s) occurred during the execution.
*/
- CompletableFuture<StreamsGroupInitializeResponseData>
streamsGroupInitialize(
+ CompletableFuture<StreamsGroupInitializeResult> streamsGroupInitialize(
RequestContext context,
StreamsGroupInitializeRequestData request
);
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index a3441005bff..7b754803e9d 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -78,6 +78,7 @@ import
org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilderSuppli
import org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor;
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.record.BrokerCompressionType;
@@ -352,14 +353,14 @@ public class GroupCoordinatorService implements
GroupCoordinator {
* See {@link GroupCoordinator#streamsGroupInitialize(RequestContext,
org.apache.kafka.common.message.StreamsGroupInitializeRequestData)}.
*/
@Override
- public CompletableFuture<StreamsGroupInitializeResponseData>
streamsGroupInitialize(
+ public CompletableFuture<StreamsGroupInitializeResult>
streamsGroupInitialize(
RequestContext context,
StreamsGroupInitializeRequestData request
) {
if (!isActive.get()) {
- return CompletableFuture.completedFuture(new
StreamsGroupInitializeResponseData()
+ return CompletableFuture.completedFuture(new
StreamsGroupInitializeResult(new StreamsGroupInitializeResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
- );
+ ));
}
return runtime.scheduleWriteOperation(
@@ -371,9 +372,9 @@ public class GroupCoordinatorService implements
GroupCoordinator {
"streams-group-initialize",
request,
exception,
- (error, message) -> new StreamsGroupInitializeResponseData()
+ (error, message) -> new StreamsGroupInitializeResult(new
StreamsGroupInitializeResponseData()
.setErrorCode(error.code())
- .setErrorMessage(message),
+ .setErrorMessage(message)),
log
));
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 948de7d59e4..fe58621c171 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -43,7 +43,6 @@ import
org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
-import org.apache.kafka.common.message.StreamsGroupInitializeResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -104,6 +103,7 @@ import
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult;
import org.apache.kafka.coordinator.group.taskassignor.StickyTaskAssignor;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@@ -382,7 +382,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
* @return A Result containing the StreamsGroupInitialize response and
* a list of records to update the state machine.
*/
- public CoordinatorResult<StreamsGroupInitializeResponseData,
CoordinatorRecord> streamsGroupInitialize(
+ public CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord>
streamsGroupInitialize(
RequestContext context,
StreamsGroupInitializeRequestData request
) {
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index a258aa25739..3ca6e00d7e4 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -37,6 +37,7 @@ import
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.ConsumerProtocolSubscription;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -135,8 +136,11 @@ import
org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import
org.apache.kafka.coordinator.group.modern.share.ShareGroupAssignmentBuilder;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.coordinator.group.taskassignor.TaskAssignor;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@@ -167,7 +171,6 @@ import java.util.stream.Stream;
import static
org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
-import static org.apache.kafka.common.protocol.Errors.STREAMS_INVALID_TOPOLOGY;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
@@ -2590,7 +2593,7 @@ public class GroupMetadataManager {
* @param subtopologies The list of subtopologies.
* @return A Result containing the StreamsGroupInitialize response and a
list of records to update the state machine.
*/
- private CoordinatorResult<StreamsGroupInitializeResponseData,
CoordinatorRecord> streamsGroupInitialize(String groupId,
+ private CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord>
streamsGroupInitialize(String groupId,
String topologyId,
List<StreamsGroupInitializeRequestData.Subtopology> subtopologies)
throws ApiException {
@@ -2605,49 +2608,31 @@ public class GroupMetadataManager {
if (!isTopologyInitializationScheduled(groupId, topologyId)) {
log.warn("No topology to initialize for group ID {} and topology
ID {} found.", groupId, topologyId);
StreamsGroupInitializeResponseData response = new
StreamsGroupInitializeResponseData();
- return new CoordinatorResult<>(records, response);
- }
-
- // TODO: For the POC, only check if internal topics exist
- Set<String> missingTopics = new HashSet<>();
- for (StreamsGroupInitializeRequestData.Subtopology subtopology :
subtopologies) {
- for (StreamsGroupInitializeRequestData.TopicInfo topic :
subtopology.stateChangelogTopics()) {
- if (metadataImage.topics().getTopic(topic.name()) == null) {
- missingTopics.add(topic.name());
- }
- }
- for (StreamsGroupInitializeRequestData.TopicInfo topic :
subtopology.repartitionSourceTopics()) {
- if (metadataImage.topics().getTopic(topic.name()) == null) {
- missingTopics.add(topic.name());
- }
- }
+ return new CoordinatorResult<>(records, new
StreamsGroupInitializeResult(response));
}
StreamsGroupTopologyValue recordValue =
convertToStreamsGroupTopologyRecord(subtopologies);
- cancelStreamsGroupTopologyInitializationTimeout(groupId, topologyId);
-
- if (!missingTopics.isEmpty()) {
- StreamsGroupInitializeResponseData response =
- new StreamsGroupInitializeResponseData()
- .setErrorCode(STREAMS_INVALID_TOPOLOGY.code())
- .setErrorMessage("Internal topics " + String.join(", ",
missingTopics) + " do not exist.");
+ final Map<String, ConfiguredSubtopology> configuredTopics =
+ InternalTopicManager.configureTopics(logContext,
recordValue.topology(), metadataImage);
- return new CoordinatorResult<>(records, response);
- } else {
- records.add(newStreamsGroupTopologyRecord(groupId, recordValue));
+ cancelStreamsGroupTopologyInitializationTimeout(groupId, topologyId);
- final Map<String, StreamsGroupTopologyValue.Subtopology>
subtopologyMap = recordValue.topology().stream()
-
.collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId,
x -> x));
+ records.add(newStreamsGroupTopologyRecord(groupId, recordValue));
- final StreamsTopology topology = new StreamsTopology(topologyId,
subtopologyMap);
+ final Map<String, StreamsGroupTopologyValue.Subtopology>
subtopologyMap = recordValue.topology().stream()
+
.collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId,
x -> x));
+ final StreamsTopology topology = new StreamsTopology(topologyId,
subtopologyMap);
+ final Map<String, CreatableTopic> missingTopics =
InternalTopicManager.missingTopics(configuredTopics, metadataImage);
+ // TODO: This needs to be sorted out, once we merge heartbeat and
initialization
+ if (missingTopics.isEmpty()) {
computeFirstTargetAssignmentAfterTopologyInitialization(group,
records, topology);
+ }
- StreamsGroupInitializeResponseData response = new
StreamsGroupInitializeResponseData();
+ StreamsGroupInitializeResponseData response = new
StreamsGroupInitializeResponseData();
- return new CoordinatorResult<>(records, response);
- }
+ return new CoordinatorResult<>(records, new
StreamsGroupInitializeResult(response, missingTopics));
}
@@ -4579,7 +4564,7 @@ public class GroupMetadataManager {
* @return A Result containing the StreamsGroupInitialize response and
* a list of records to update the state machine.
*/
- public CoordinatorResult<StreamsGroupInitializeResponseData,
CoordinatorRecord> streamsGroupInitialize(
+ public CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord>
streamsGroupInitialize(
RequestContext context,
StreamsGroupInitializeRequestData request
) throws ApiException {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupInitializeResult.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupInitializeResult.java
new file mode 100644
index 00000000000..7a4845993eb
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupInitializeResult.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.StreamsGroupInitializeResponseData;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+public class StreamsGroupInitializeResult {
+
+ private final StreamsGroupInitializeResponseData data;
+ private final Map<String, CreatableTopic> creatableTopics;
+
+ public StreamsGroupInitializeResult(StreamsGroupInitializeResponseData
data, Map<String, CreatableTopic> creatableTopics) {
+ this.data = data;
+ this.creatableTopics = creatableTopics;
+ }
+
+ public StreamsGroupInitializeResult(StreamsGroupInitializeResponseData
data) {
+ this.data = data;
+ this.creatableTopics = Collections.emptyMap();
+ }
+
+ public StreamsGroupInitializeResponseData responseData() {
+ return data;
+ }
+
+ public Map<String, CreatableTopic> creatableTopics() {
+ return creatableTopics;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final StreamsGroupInitializeResult that =
(StreamsGroupInitializeResult) o;
+ return Objects.equals(data, that.data) &&
Objects.equals(creatableTopics,
+ that.creatableTopics);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(data, creatableTopics);
+ }
+
+ @Override
+ public String toString() {
+ return "StreamsGroupInitializeResult{" +
+ "data=" + data +
+ ", creatableTopics=" + creatableTopics +
+ '}';
+ }
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 7505aa8c791..6ce91186adf 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -78,6 +78,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.util.FutureUtils;
@@ -280,14 +281,15 @@ public class GroupCoordinatorServiceTest {
StreamsGroupInitializeRequestData request = new
StreamsGroupInitializeRequestData()
.setGroupId("foo");
- CompletableFuture<StreamsGroupInitializeResponseData> future =
service.streamsGroupInitialize(
+ CompletableFuture<StreamsGroupInitializeResult> future =
service.streamsGroupInitialize(
requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE),
request
);
assertEquals(
- new StreamsGroupInitializeResponseData()
- .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+ new StreamsGroupInitializeResult(
+ new StreamsGroupInitializeResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())),
future.get()
);
}
@@ -317,7 +319,7 @@ public class GroupCoordinatorServiceTest {
new StreamsGroupInitializeResponseData()
));
- CompletableFuture<StreamsGroupInitializeResponseData> future =
service.streamsGroupInitialize(
+ CompletableFuture<StreamsGroupInitializeResult> future =
service.streamsGroupInitialize(
requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE),
request
);
@@ -368,7 +370,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(exception));
- CompletableFuture<StreamsGroupInitializeResponseData> future =
service.streamsGroupInitialize(
+ CompletableFuture<StreamsGroupInitializeResult> future =
service.streamsGroupInitialize(
requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE),
request
);
@@ -377,7 +379,7 @@ public class GroupCoordinatorServiceTest {
new StreamsGroupInitializeResponseData()
.setErrorCode(expectedErrorCode)
.setErrorMessage(expectedErrorMessage),
- future.get(5, TimeUnit.SECONDS)
+ future.get(5, TimeUnit.SECONDS).responseData()
);
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 74d3179a24c..8951068d287 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -63,6 +63,7 @@ import
org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
import
org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -152,9 +153,9 @@ public class GroupCoordinatorShardTest {
RequestContext context =
requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE);
StreamsGroupInitializeRequestData request = new
StreamsGroupInitializeRequestData();
- CoordinatorResult<StreamsGroupInitializeResponseData,
CoordinatorRecord> result = new CoordinatorResult<>(
+ CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord>
result = new CoordinatorResult<>(
Collections.emptyList(),
- new StreamsGroupInitializeResponseData()
+ new StreamsGroupInitializeResult(new
StreamsGroupInitializeResponseData())
);
when(groupMetadataManager.streamsGroupInitialize(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index f2ee3768203..35b045b2625 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -40,6 +40,9 @@ import
org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.ConsumerProtocolAssignment;
import org.apache.kafka.common.message.ConsumerProtocolSubscription;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig;
+import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -93,6 +96,7 @@ import
org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil;
import org.apache.kafka.image.MetadataDelta;
@@ -525,10 +529,10 @@ public class GroupMetadataManagerTest {
.setTopologyId(topologyId)
.setTopology(subtopologies);
- CoordinatorResult<StreamsGroupInitializeResponseData,
CoordinatorRecord> result = context.streamsGroupInitialize(initialize);
+ CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord>
result = context.streamsGroupInitialize(initialize);
assertNotNull(result.response());
- StreamsGroupInitializeResponseData response = result.response();
+ StreamsGroupInitializeResponseData response =
result.response().responseData();
assertEquals(Errors.NONE.code(), response.errorCode());
List<CoordinatorRecord> coordinatorRecords = result.records();
assertEquals(5, coordinatorRecords.size());
@@ -643,10 +647,10 @@ public class GroupMetadataManagerTest {
.setTopologyId(wrongTopologyId)
.setTopology(subtopologies);
- CoordinatorResult<StreamsGroupInitializeResponseData,
CoordinatorRecord> result = context.streamsGroupInitialize(initialize);
+ CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord>
result = context.streamsGroupInitialize(initialize);
assertNotNull(result.response());
- StreamsGroupInitializeResponseData response = result.response();
+ StreamsGroupInitializeResponseData response =
result.response().responseData();
assertEquals(Errors.NONE.code(), response.errorCode());
assertTrue(result.records().isEmpty());
}
@@ -687,10 +691,10 @@ public class GroupMetadataManagerTest {
.setTopology(subtopologies);
context.streamsGroupInitialize(initialize);
- CoordinatorResult<StreamsGroupInitializeResponseData,
CoordinatorRecord> result = context.streamsGroupInitialize(initialize);
+ CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord>
result = context.streamsGroupInitialize(initialize);
assertNotNull(result.response());
- StreamsGroupInitializeResponseData response = result.response();
+ StreamsGroupInitializeResponseData response =
result.response().responseData();
assertEquals(Errors.NONE.code(), response.errorCode());
assertTrue(result.records().isEmpty());
}
@@ -703,9 +707,12 @@ public class GroupMetadataManagerTest {
String processId = "process-id";
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "repartition";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withMetadataImage(new MetadataImageBuilder()
- .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(fooTopicId, fooTopicName, 4)
+ .addTopic(barTopicId, barTopicName, 4)
.addRacks()
.build())
.build();
@@ -734,6 +741,7 @@ public class GroupMetadataManagerTest {
Collections.singletonList(
new StreamsGroupInitializeRequestData.TopicInfo()
.setName("changelog")
+ .setReplicationFactor((short) 3)
.setTopicConfigs(Collections.singletonList(
new
StreamsGroupInitializeRequestData.TopicConfig()
.setKey("config-name2")
@@ -742,7 +750,7 @@ public class GroupMetadataManagerTest {
)
)
);
- CoordinatorResult<StreamsGroupInitializeResponseData,
CoordinatorRecord> result =
+ CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord>
result =
context.streamsGroupInitialize(
new StreamsGroupInitializeRequestData()
.setGroupId(groupId)
@@ -750,14 +758,31 @@ public class GroupMetadataManagerTest {
.setTopology(topology)
);
+ CreatableTopicConfigCollection expectedConfig = new
CreatableTopicConfigCollection();
+ expectedConfig.add(
+ new CreatableTopicConfig()
+ .setName("config-name2")
+ .setValue("config-value2")
+ );
+ CreatableTopic expected =
+ new CreatableTopic()
+ .setName("changelog")
+ .setNumPartitions(4)
+ .setReplicationFactor((short) 3)
+ .setConfigs(expectedConfig);
+
assertEquals(
- new StreamsGroupInitializeResponseData()
- .setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code())
- .setErrorMessage("Internal topics changelog do not exist."),
+ new StreamsGroupInitializeResult(
+ new StreamsGroupInitializeResponseData(),
+ Collections.singletonMap(
+ "changelog", expected
+ )
+ ),
result.response()
);
-
- assertTrue(result.records().isEmpty());
+
+ // TODO: Need to check the generated records. Adapt this unit test
after merging of
+ // initialization & heartbeat
}
private StreamsGroupHeartbeatRequestData
buildFirstStreamsGroupHeartbeatRequest(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index e80d5db64fd..ff6307fd2cc 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -40,7 +40,6 @@ import
org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
-import org.apache.kafka.common.message.StreamsGroupInitializeResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.network.ClientInformation;
@@ -108,6 +107,7 @@ import
org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult;
import org.apache.kafka.coordinator.group.taskassignor.TaskAssignor;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -720,7 +720,7 @@ public class GroupMetadataManagerTestContext {
}
- public CoordinatorResult<StreamsGroupInitializeResponseData,
CoordinatorRecord> streamsGroupInitialize(
+ public CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord>
streamsGroupInitialize(
StreamsGroupInitializeRequestData request
) {
RequestContext context = new RequestContext(
@@ -739,7 +739,7 @@ public class GroupMetadataManagerTestContext {
false
);
- CoordinatorResult<StreamsGroupInitializeResponseData,
CoordinatorRecord> result = groupMetadataManager.streamsGroupInitialize(
+ CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord>
result = groupMetadataManager.streamsGroupInitialize(
context,
request
);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index ab8fb57d3fb..498613ce783 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -127,19 +127,6 @@ public class SmokeTestDriverIntegrationTest extends
IntegrationTestHarness {
}
for (final String topic: new String[]{
- "SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog",
- "SmokeTest-minStoreName-changelog",
- "SmokeTest-cntByCnt-repartition",
- "SmokeTest-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog",
- "SmokeTest-sum-STATE-STORE-0000000050-changelog",
- "SmokeTest-uwin-cnt-changelog",
- "SmokeTest-maxStoreName-changelog",
- "SmokeTest-cntStoreName-changelog",
- "SmokeTest-KTABLE-SUPPRESS-STATE-STORE-0000000027-changelog",
- "SmokeTest-win-sum-changelog",
- "SmokeTest-uwin-max-changelog",
- "SmokeTest-uwin-min-changelog",
- "SmokeTest-cntByCnt-changelog",
"data",
"echo",
"max",
diff --git a/streams/src/test/resources/log4j.properties
b/streams/src/test/resources/log4j.properties
index cdf56287648..992dd87f3cc 100644
--- a/streams/src/test/resources/log4j.properties
+++ b/streams/src/test/resources/log4j.properties
@@ -29,6 +29,8 @@ log4j.logger.org.apache.kafka.clients.consumer=INFO
log4j.logger.org.apache.kafka.clients.producer=INFO
log4j.logger.org.apache.kafka.streams=INFO
log4j.logger.org.apache.kafka.coordinator.group=INFO
+log4j.logger.org.apache.kafka.clients.consumer.internals.StreamsGroupInitializeRequestManager=INFO
+log4j.logger.org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager=INFO
# printing out the configs takes up a huge amount of the allotted characters,
# and provides little value as we can always figure out the test configs
without the logs