This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit a19bc7d97b77edaacbfafe56b69256214b5c648b
Author: Bruno Cadonna <[email protected]>
AuthorDate: Mon May 27 10:51:54 2024 +0200

    Implement StreamsInitialize request manager

    See https://github.com/lucasbru/kafka/pull/11
---
 .../consumer/internals/RequestManagers.java        |  16 +++
 .../internals/StreamsInitializeRequestManager.java |  98 ++++++++++++++-
 .../StreamsInitializeRequestManagerTest.java       | 131 +++++++++++++++++++++
 3 files changed, 241 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 5045743848d..65dec12140b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -251,6 +251,22 @@ public class RequestManagers implements Closeable {
                 membershipManager.registerStateListener(commitRequestManager);
                 membershipManager.registerStateListener(applicationThreadMemberStateListener);
+                if (streamsInstanceMetadata.isPresent()) {
+                    streamsInitializeRequestManager = new StreamsInitializeRequestManager(
+                        logContext,
+                        groupRebalanceConfig.groupId,
+                        streamsInstanceMetadata.get(),
+                        coordinator);
+                    streamsPrepareAssignmentRequestManager = new StreamsPrepareAssignmentRequestManager(
+                        streamsInstanceMetadata.get());
+                    streamsInstallAssignmentRequestManager = new StreamsInstallAssignmentRequestManager(
+                        streamsInstanceMetadata.get());
+                    streamsHeartbeatRequestManager = new StreamsHeartbeatRequestManager(
+                        streamsInitializeRequestManager,
+                        streamsPrepareAssignmentRequestManager,
+                        streamsInstanceMetadata.get());
+                } else {
+
                 heartbeatRequestManager = new HeartbeatRequestManager(
                     logContext,
                     time,
                     config,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java
index 73d2955ef2f..34f9fee96f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java
@@ -16,23 +16,113 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.requests.StreamsInitializeRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 public class StreamsInitializeRequestManager implements RequestManager {
 
+    private final Logger logger;
+    private final String groupId;
     private final StreamsAssignmentInterface streamsAssignmentInterface;
+    private final CoordinatorRequestManager coordinatorRequestManager;
+
+    private Optional<NetworkClientDelegate.UnsentRequest> unsentRequest = Optional.empty();
 
-    StreamsInitializeRequestManager(final StreamsAssignmentInterface streamsAssignmentInterface) {
+
+    StreamsInitializeRequestManager(final LogContext logContext,
+                                    final String groupId,
+                                    final StreamsAssignmentInterface streamsAssignmentInterface,
+                                    final CoordinatorRequestManager coordinatorRequestManager) {
+        this.logger = logContext.logger(getClass());
+        this.groupId = groupId;
         this.streamsAssignmentInterface = streamsAssignmentInterface;
+        this.coordinatorRequestManager = coordinatorRequestManager;
     }
 
     @Override
     public PollResult poll(final long currentTimeMs) {
-        // TODO: Return an initalize request
-        return PollResult.EMPTY;
+        return unsentRequest.map(PollResult::new).orElse(PollResult.EMPTY);
     }
 
     public void initialize() {
-        // TODO: Create a `StreamsInitializeRequest`
+        final NetworkClientDelegate.UnsentRequest unsentRequest = makeRequest();
+
+        unsentRequest.whenComplete(this::onResponse);
+
+        this.unsentRequest = Optional.of(unsentRequest);
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeRequest() {
+        final StreamsInitializeRequestData streamsInitializeRequestData = new StreamsInitializeRequestData();
+        streamsInitializeRequestData.setGroupId(groupId);
+        final List<StreamsInitializeRequestData.Subtopology> topology = getTopologyFromStreams();
+        streamsInitializeRequestData.setTopology(topology);
+        final StreamsInitializeRequest.Builder streamsInitializeRequestBuilder = new StreamsInitializeRequest.Builder(
+            streamsInitializeRequestData
+        );
+        return new NetworkClientDelegate.UnsentRequest(
+            streamsInitializeRequestBuilder,
+            coordinatorRequestManager.coordinator()
+        );
+    }
+
+    private List<StreamsInitializeRequestData.Subtopology> getTopologyFromStreams() {
+        final Map<String, StreamsAssignmentInterface.SubTopology> subTopologyMap = streamsAssignmentInterface.subtopologyMap();
+        final List<StreamsInitializeRequestData.Subtopology> topology = new ArrayList<>(subTopologyMap.size());
+        for (final Map.Entry<String, StreamsAssignmentInterface.SubTopology> subtopology : subTopologyMap.entrySet()) {
+            topology.add(getSubtopologyFromStreams(subtopology.getKey(), subtopology.getValue()));
+        }
+        return topology;
+    }
+
+    private static StreamsInitializeRequestData.Subtopology getSubtopologyFromStreams(final String subtopologyName,
+                                                                                      final StreamsAssignmentInterface.SubTopology subtopology) {
+        final StreamsInitializeRequestData.Subtopology subtopologyData = new StreamsInitializeRequestData.Subtopology();
+        subtopologyData.setSubtopology(subtopologyName);
+        subtopologyData.setSourceTopics(new ArrayList<>(subtopology.sourceTopics));
+        subtopologyData.setSinkTopics(new ArrayList<>(subtopology.sinkTopics));
+        subtopologyData.setRepartitionSourceTopics(getRepartitionTopicsInfoFromStreams(subtopology));
+        subtopologyData.setStateChangelogTopics(getChangelogTopicsInfoFromStreams(subtopology));
+        return subtopologyData;
+    }
+
+    private static List<StreamsInitializeRequestData.TopicInfo> getRepartitionTopicsInfoFromStreams(final StreamsAssignmentInterface.SubTopology subtopologyDataFromStreams) {
+        final List<StreamsInitializeRequestData.TopicInfo> repartitionTopicsInfo = new ArrayList<>();
+        for (final Map.Entry<String, StreamsAssignmentInterface.TopicInfo> repartitionTopic : subtopologyDataFromStreams.repartitionSourceTopics.entrySet()) {
+            final StreamsInitializeRequestData.TopicInfo repartitionTopicInfo = new StreamsInitializeRequestData.TopicInfo();
+            repartitionTopicInfo.setName(repartitionTopic.getKey());
+            repartitionTopic.getValue().numPartitions.ifPresent(repartitionTopicInfo::setPartitions);
+            repartitionTopicsInfo.add(repartitionTopicInfo);
+        }
+        return repartitionTopicsInfo;
+    }
+
+    private static List<StreamsInitializeRequestData.TopicInfo> getChangelogTopicsInfoFromStreams(final StreamsAssignmentInterface.SubTopology subtopologyDataFromStreams) {
+        final List<StreamsInitializeRequestData.TopicInfo> changelogTopicsInfo = new ArrayList<>();
+        for (final Map.Entry<String, StreamsAssignmentInterface.TopicInfo> changelogTopic : subtopologyDataFromStreams.stateChangelogTopics.entrySet()) {
+            final StreamsInitializeRequestData.TopicInfo changelogTopicInfo = new StreamsInitializeRequestData.TopicInfo();
+            changelogTopicInfo.setName(changelogTopic.getKey());
+            changelogTopicsInfo.add(changelogTopicInfo);
+        }
+        return changelogTopicsInfo;
+    }
+
+    private void onResponse(final ClientResponse response, final Throwable exception) {
+        if (exception != null) {
+            // todo: handle error
+            logger.error("Error during Streams initialization: ", exception);
+        } else {
+            // todo: handle success
+            logger.info("Streams initialization successful", exception);
+        }
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManagerTest.java
new file mode 100644
index 00000000000..bb734a7b1f5
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManagerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.StreamsInitializeRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class StreamsInitializeRequestManagerTest {
+
+    final private String groupId = "groupId";
+    final private LogContext logContext = new LogContext("test");
+
+    @Test
+    public void shouldPollEmptyResult() {
+        final CoordinatorRequestManager coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+        final StreamsAssignmentInterface streamsAssignmentInterface = mock(StreamsAssignmentInterface.class);
+        final StreamsInitializeRequestManager streamsInitializeRequestManager = new StreamsInitializeRequestManager(
+            logContext,
+            groupId,
+            streamsAssignmentInterface,
+            coordinatorRequestManager
+        );
+
+        final NetworkClientDelegate.PollResult pollResult = streamsInitializeRequestManager.poll(0);
+
+        assertEquals(NetworkClientDelegate.PollResult.EMPTY, pollResult);
+    }
+
+    @Test
+    public void shouldPollStreamsInitializeRequest() {
+        final Node node = mock(Node.class);
+        final CoordinatorRequestManager coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(node));
+        final StreamsAssignmentInterface streamsAssignmentInterface = mock(StreamsAssignmentInterface.class);
+        final Set<String> sourceTopics = mkSet("sourceTopic1", "sourceTopic2");
+        final Set<String> sinkTopics = mkSet("sinkTopic1", "sinkTopic2", "sinkTopic3");
+        final Map<String, StreamsAssignmentInterface.TopicInfo> repartitionTopics = mkMap(
+            mkEntry("repartitionTopic1", new StreamsAssignmentInterface.TopicInfo(Optional.of(2), Collections.emptyMap())),
+            mkEntry("repartitionTopic2", new StreamsAssignmentInterface.TopicInfo(Optional.of(3), Collections.emptyMap()))
+        );
+        final Map<String, StreamsAssignmentInterface.TopicInfo> changelogTopics = mkMap(
+            mkEntry("changelogTopic1", new StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap())),
+            mkEntry("changelogTopic2", new StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap())),
+            mkEntry("changelogTopic3", new StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap()))
+        );
+        final StreamsAssignmentInterface.SubTopology subtopology1 = new StreamsAssignmentInterface.SubTopology(
+            sinkTopics,
+            sourceTopics,
+            repartitionTopics,
+            changelogTopics
+        );
+        final String subtopologyName1 = "subtopology1";
+        when(streamsAssignmentInterface.subtopologyMap()).thenReturn(
+            mkMap(mkEntry(subtopologyName1, subtopology1))
+        );
+        final StreamsInitializeRequestManager streamsInitializeRequestManager = new StreamsInitializeRequestManager(
+            logContext,
+            groupId,
+            streamsAssignmentInterface,
+            coordinatorRequestManager
+        );
+
+        streamsInitializeRequestManager.initialize();
+        final NetworkClientDelegate.PollResult pollResult = streamsInitializeRequestManager.poll(0);
+
+        assertEquals(1, pollResult.unsentRequests.size());
+        final NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0);
+        assertTrue(unsentRequest.node().isPresent());
+        assertEquals(node, unsentRequest.node().get());
+        assertEquals(ApiKeys.STREAMS_INITIALIZE, unsentRequest.requestBuilder().apiKey());
+        final StreamsInitializeRequest.Builder streamsInitializeRequestBuilder = (StreamsInitializeRequest.Builder) unsentRequest.requestBuilder();
+        final StreamsInitializeRequest streamsInitializeRequest = streamsInitializeRequestBuilder.build();
+        final StreamsInitializeRequestData streamsInitializeRequestData = streamsInitializeRequest.data();
+        assertEquals(ApiKeys.STREAMS_INITIALIZE.id, streamsInitializeRequestData.apiKey());
+        assertEquals(groupId, streamsInitializeRequestData.groupId());
+        assertNotNull(streamsInitializeRequestData.topology());
+        final List<StreamsInitializeRequestData.Subtopology> subtopologies = streamsInitializeRequestData.topology();
+        assertEquals(1, subtopologies.size());
+        final StreamsInitializeRequestData.Subtopology subtopology = subtopologies.get(0);
+        assertEquals(subtopologyName1, subtopology.subtopology());
+        assertEquals(new ArrayList<>(sourceTopics), subtopology.sourceTopics());
+        assertEquals(new ArrayList<>(sinkTopics), subtopology.sinkTopics());
+        assertEquals(repartitionTopics.size(), subtopology.repartitionSourceTopics().size());
+        subtopology.repartitionSourceTopics().forEach(topicInfo -> {
+            final StreamsAssignmentInterface.TopicInfo repartitionTopic = repartitionTopics.get(topicInfo.name());
+            assertEquals(repartitionTopic.numPartitions.get(), topicInfo.partitions());
+        });
+        assertEquals(changelogTopics.size(), subtopology.stateChangelogTopics().size());
+        subtopology.stateChangelogTopics().forEach(topicInfo -> {
+            assertTrue(changelogTopics.containsKey(topicInfo.name()));
+            assertEquals(0, topicInfo.partitions());
+        });
+    }
+}
\ No newline at end of file
