This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 1be15da2751d9c9680a466261d475515be534ed6 Author: Lucas Brutschy <[email protected]> AuthorDate: Thu May 16 22:27:46 2024 +0200 Specify AsyncKafkaConsumer interface with Streams for the Rebalance PoC Extend internal AsyncKafkaConsumer interface to exchange rebalance information with Streams. See https://github.com/lucasbru/kafka/pull/9 --- .../consumer/internals/AsyncKafkaConsumer.java | 23 ++- .../internals/ConsumerDelegateCreator.java | 6 +- .../consumer/internals/RequestManagers.java | 31 ++- .../internals/StreamsAssignmentInterface.java | 216 +++++++++++++++++++++ .../internals/StreamsHeartbeatRequestManager.java | 68 +++++++ .../internals/StreamsInitializeRequestManager.java | 38 ++++ .../StreamsInstallAssignmentRequestManager.java | 34 ++++ .../StreamsPrepareAssignmentRequestManager.java | 37 ++++ .../consumer/internals/AsyncKafkaConsumerTest.java | 6 +- .../consumer/internals/RequestManagersTest.java | 3 +- .../events/ApplicationEventProcessorTest.java | 8 +- .../apache/kafka/streams/KafkaClientSupplier.java | 11 ++ .../internals/DefaultKafkaClientSupplier.java | 12 ++ .../streams/processor/internals/StreamThread.java | 85 +++++++- .../org/apache/kafka/test/MockClientSupplier.java | 5 + .../apache/kafka/streams/TopologyTestDriver.java | 6 +- 16 files changed, 570 insertions(+), 19 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index b88a25f907b..6f06493f706 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -252,9 +252,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); private final AtomicInteger refCount = new AtomicInteger(0); - 
AsyncKafkaConsumer(final ConsumerConfig config, - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer) { + public AsyncKafkaConsumer(final ConsumerConfig config, + final Deserializer<K> keyDeserializer, + final Deserializer<V> valueDeserializer, + final Optional<StreamsAssignmentInterface> streamsAssignmentInterface) { this( config, keyDeserializer, @@ -264,7 +265,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { CompletableEventReaper::new, FetchCollector::new, ConsumerMetadata::new, - new LinkedBlockingQueue<>() + new LinkedBlockingQueue<>(), + streamsAssignmentInterface ); } @@ -277,7 +279,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { final CompletableEventReaperFactory backgroundEventReaperFactory, final FetchCollectorFactory<K, V> fetchCollectorFactory, final ConsumerMetadataFactory metadataFactory, - final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue) { + final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue, + final Optional<StreamsAssignmentInterface> streamsAssignmentInterface + ) { try { GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig( config, @@ -346,7 +350,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { clientTelemetryReporter, metrics, offsetCommitCallbackInvoker, - this::updateGroupMetadata + this::updateGroupMetadata, + streamsAssignmentInterface ); final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, @@ -453,7 +458,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { Deserializer<V> valueDeserializer, KafkaClient client, SubscriptionState subscriptions, - ConsumerMetadata metadata) { + ConsumerMetadata metadata, + Optional<StreamsAssignmentInterface> streamsInstanceMetadata) { this.log = logContext.logger(getClass()); this.subscriptions = subscriptions; this.clientId = 
config.getString(ConsumerConfig.CLIENT_ID_CONFIG); @@ -523,7 +529,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { clientTelemetryReporter, metrics, offsetCommitCallbackInvoker, - this::updateGroupMetadata + this::updateGroupMetadata, + streamsInstanceMetadata ); Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( logContext, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java index 74592972b9d..3f5926080fc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Time; import java.util.List; import java.util.Locale; +import java.util.Optional; /** * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the @@ -60,7 +61,7 @@ public class ConsumerDelegateCreator { GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT)); if (groupProtocol == GroupProtocol.CONSUMER) - return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer); + return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer, Optional.empty()); else return new ClassicKafkaConsumer<>(config, keyDeserializer, valueDeserializer); } catch (KafkaException e) { @@ -91,7 +92,8 @@ public class ConsumerDelegateCreator { valueDeserializer, client, subscriptions, - metadata + metadata, + Optional.empty() ); else return new ClassicKafkaConsumer<>( diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index c4aecf75c10..576a9cf6bce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -56,6 +56,9 @@ public class RequestManagers implements Closeable { public final TopicMetadataRequestManager topicMetadataRequestManager; public final FetchRequestManager fetchRequestManager; public final Optional<ShareConsumeRequestManager> shareConsumeRequestManager; + public final Optional<StreamsInitializeRequestManager> streamsInitializeRequestManager; + public final Optional<StreamsPrepareAssignmentRequestManager> streamsPrepareAssignmentRequestManager; + public final Optional<StreamsInstallAssignmentRequestManager> streamsInstallAssignmentRequestManager; private final List<Optional<? extends RequestManager>> entries; private final IdempotentCloser closer = new IdempotentCloser(); @@ -67,6 +70,10 @@ public class RequestManagers implements Closeable { Optional<CommitRequestManager> commitRequestManager, Optional<ConsumerHeartbeatRequestManager> heartbeatRequestManager, Optional<ConsumerMembershipManager> membershipManager) { + Optional<StreamsHeartbeatRequestManager> streamsHeartbeatRequestManager, + Optional<StreamsInitializeRequestManager> streamsInitializeRequestManager, + Optional<StreamsPrepareAssignmentRequestManager> streamsPrepareAssignmentRequestManager, + Optional<StreamsInstallAssignmentRequestManager> streamsInstallAssignmentRequestManager) { this.log = logContext.logger(RequestManagers.class); this.offsetsRequestManager = requireNonNull(offsetsRequestManager, "OffsetsRequestManager cannot be null"); this.coordinatorRequestManager = coordinatorRequestManager; @@ -78,6 +85,10 @@ public class RequestManagers implements Closeable { this.shareHeartbeatRequestManager = Optional.empty(); this.consumerMembershipManager = membershipManager; 
this.shareMembershipManager = Optional.empty(); + this.streamsHeartbeatRequestManager = streamsHeartbeatRequestManager; + this.streamsInitializeRequestManager = streamsInitializeRequestManager; + this.streamsPrepareAssignmentRequestManager = streamsPrepareAssignmentRequestManager; + this.streamsInstallAssignmentRequestManager = streamsInstallAssignmentRequestManager; List<Optional<? extends RequestManager>> list = new ArrayList<>(); list.add(coordinatorRequestManager); @@ -87,6 +98,10 @@ public class RequestManagers implements Closeable { list.add(Optional.of(offsetsRequestManager)); list.add(Optional.of(topicMetadataRequestManager)); list.add(Optional.of(fetchRequestManager)); + list.add(streamsHeartbeatRequestManager); + list.add(streamsInitializeRequestManager); + list.add(streamsPrepareAssignmentRequestManager); + list.add(streamsInstallAssignmentRequestManager); entries = Collections.unmodifiableList(list); } @@ -156,7 +171,8 @@ public class RequestManagers implements Closeable { final Optional<ClientTelemetryReporter> clientTelemetryReporter, final Metrics metrics, final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, - final MemberStateListener applicationThreadMemberStateListener + final MemberStateListener applicationThreadMemberStateListener, + final Optional<StreamsAssignmentInterface> streamsInstanceMetadata ) { return new CachedSupplier<RequestManagers>() { @Override @@ -185,6 +201,10 @@ public class RequestManagers implements Closeable { ConsumerMembershipManager membershipManager = null; CoordinatorRequestManager coordinator = null; CommitRequestManager commitRequestManager = null; + StreamsHeartbeatRequestManager streamsHeartbeatRequestManager = null; + StreamsInitializeRequestManager streamsInitializeRequestManager = null; + StreamsPrepareAssignmentRequestManager streamsPrepareAssignmentRequestManager = null; + StreamsInstallAssignmentRequestManager streamsInstallAssignmentRequestManager = null; if (groupRebalanceConfig != null && 
groupRebalanceConfig.groupId != null) { Optional<String> serverAssignor = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); @@ -219,7 +239,7 @@ public class RequestManagers implements Closeable { metrics); membershipManager.registerStateListener(commitRequestManager); membershipManager.registerStateListener(applicationThreadMemberStateListener); - heartbeatRequestManager = new ConsumerHeartbeatRequestManager( + logContext, time, config, @@ -228,6 +248,7 @@ public class RequestManagers implements Closeable { membershipManager, backgroundEventHandler, metrics); + } } final OffsetsRequestManager listOffsets = new OffsetsRequestManager(subscriptions, @@ -250,7 +271,11 @@ public class RequestManagers implements Closeable { Optional.ofNullable(coordinator), Optional.ofNullable(commitRequestManager), Optional.ofNullable(heartbeatRequestManager), - Optional.ofNullable(membershipManager) + Optional.ofNullable(membershipManager), + Optional.ofNullable(streamsHeartbeatRequestManager), + Optional.ofNullable(streamsInitializeRequestManager), + Optional.ofNullable(streamsPrepareAssignmentRequestManager), + Optional.ofNullable(streamsInstallAssignmentRequestManager) ); } }; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java new file mode 100644 index 00000000000..acc7e314362 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Defines a self-contained object to exchange assignment-related metadata with the Kafka Streams instance. + * + * It's used to exchange information between the streams module and the clients module, and should be mostly self-contained + */ +public class StreamsAssignmentInterface { + + private UUID processID; + + private String userEndPointHost; + + private int userEndPointPort; + + private Map<String, SubTopology> subtopologyMap; + + private Map<String, Object> assignmentConfiguration; + + private Map<TaskId, Long> taskLags; + + private AtomicBoolean shutdownRequested; + + private Map<String, String> clientTags; + + public UUID processID() { + return processID; + } + + public String userEndPointHost() { + return userEndPointHost; + } + + public int userEndPointPort() { + return userEndPointPort; + } + + public Map<String, SubTopology> subtopologyMap() { + return subtopologyMap; + } + + public Map<String, Object> assignmentConfiguration() { + return assignmentConfiguration; + } + + public Map<TaskId, Long> taskLags() { + return taskLags; + } + + public byte[] computeTopologyHash() { + // TODO + return new byte[0]; + } + + public Map<String, String> clientTags() { + return clientTags; + } + + public void requestShutdown() { + shutdownRequested.set(true); + } + + public boolean 
shutdownRequested() { + return shutdownRequested.get(); + } + + public void setTaskLags(Map<TaskId, Long> taskLags) { + this.taskLags = taskLags; + } + + // TODO: Reconciled assignment updated by the stream thread + public final Assignment reconciledAssignment = new Assignment(); + + // TODO: Target assignment read by the stream thread + public final Assignment targetAssignment = new Assignment(); + + public static class Assignment { + + public final Set<TaskId> activeTasks = new HashSet<>(); + + public final Set<TaskId> standbyTasks = new HashSet<>(); + + public final Set<TaskId> warmupTasks = new HashSet<>(); + + } + + public static class TopicInfo { + + public final Optional<Integer> numPartitions; + public final Map<String, String> topicConfigs; + + public TopicInfo(final Optional<Integer> numPartitions, + final Map<String, String> topicConfigs) { + this.numPartitions = numPartitions; + this.topicConfigs = topicConfigs; + } + + @Override + public String toString() { + return "TopicInfo{" + + "numPartitions=" + numPartitions + + ", topicConfigs=" + topicConfigs + + '}'; + } + + } + + public static class TaskId { + public final String subtopologyId; + public final int taskId; + + public int taskId() { + return taskId; + } + + public String subtopologyId() { + return subtopologyId; + } + + public TaskId(final String subtopologyId, final int taskId) { + this.subtopologyId = subtopologyId; + this.taskId = taskId; + } + + @Override + public String toString() { + return "TaskId{" + + "subtopologyId=" + subtopologyId + + ", taskId=" + taskId + + '}'; + } + } + + public static class SubTopology { + + public final Set<String> sinkTopics; + public final Set<String> sourceTopics; + public final Map<String, TopicInfo> stateChangelogTopics; + public final Map<String, TopicInfo> repartitionSourceTopics; + + public SubTopology(final Set<String> sinkTopics, + final Set<String> sourceTopics, + final Map<String, TopicInfo> repartitionSourceTopics, + final Map<String, TopicInfo> 
stateChangelogTopics) { + this.sinkTopics = sinkTopics; + this.sourceTopics = sourceTopics; + this.stateChangelogTopics = stateChangelogTopics; + this.repartitionSourceTopics = repartitionSourceTopics; + } + + @Override + public String toString() { + return "SubTopology{" + + "sinkTopics=" + sinkTopics + + ", sourceTopics=" + sourceTopics + + ", stateChangelogTopics=" + stateChangelogTopics + + ", repartitionSourceTopics=" + repartitionSourceTopics + + '}'; + } + } + + public StreamsAssignmentInterface(UUID processID, + String userEndPointHost, + int userEndPointPort, + Map<String, SubTopology> subtopologyMap, + Map<String, Object> assignmentConfiguration, + Map<String, String> clientTags + ) { + this.processID = processID; + this.userEndPointHost = userEndPointHost; + this.userEndPointPort = userEndPointPort; + this.subtopologyMap = subtopologyMap; + this.assignmentConfiguration = assignmentConfiguration; + this.taskLags = new HashMap<>(); + this.shutdownRequested = new AtomicBoolean(false); + this.clientTags = clientTags; + } + + @Override + public String toString() { + return "StreamsAssignmentMetadata{" + + "processID=" + processID + + ", userEndPointHost='" + userEndPointHost + '\'' + + ", userEndPointPort=" + userEndPointPort + + ", subtopologyMap=" + subtopologyMap + + ", assignmentConfiguration=" + assignmentConfiguration + + ", taskLags=" + taskLags + + ", clientTags=" + clientTags + + '}'; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java new file mode 100644 index 00000000000..685ab28da69 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.Assignment; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; +import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsHeartbeatRequestData; +import org.apache.kafka.common.message.StreamsHeartbeatRequestData.TaskId; +import org.apache.kafka.common.message.StreamsHeartbeatRequestData.HostInfo; +import org.apache.kafka.common.message.StreamsHeartbeatResponseData; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.StreamsHeartbeatRequest; +import 
org.apache.kafka.common.requests.StreamsHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +public class StreamsHeartbeatRequestManager implements RequestManager { + + private StreamsAssignmentInterface streamsInterface; + + public StreamsHeartbeatRequestManager( + final StreamsInitializeRequestManager streamsInitializeRequestManager, + final StreamsPrepareAssignmentRequestManager streamsPrepareAssignmentRequestManager, + final StreamsAssignmentInterface streamsAssignmentInterface + ) { + this.streamsInterface = streamsAssignmentInterface; + } + + @Override + public NetworkClientDelegate.PollResult poll(long currentTimeMs) { + return null; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java new file mode 100644 index 00000000000..73d2955ef2f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; + +public class StreamsInitializeRequestManager implements RequestManager { + + private final StreamsAssignmentInterface streamsAssignmentInterface; + + StreamsInitializeRequestManager(final StreamsAssignmentInterface streamsAssignmentInterface) { + this.streamsAssignmentInterface = streamsAssignmentInterface; + } + + @Override + public PollResult poll(final long currentTimeMs) { + // TODO: Return an initialize request + return PollResult.EMPTY; + } + + public void initialize() { + // TODO: Create a `StreamsInitializeRequest` + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java new file mode 100644 index 00000000000..7936e885b56 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; + +public class StreamsInstallAssignmentRequestManager implements RequestManager { + + private final StreamsAssignmentInterface streamsAssignmentInterface; + + StreamsInstallAssignmentRequestManager(final StreamsAssignmentInterface streamsAssignmentInterface) { + this.streamsAssignmentInterface = streamsAssignmentInterface; + } + + @Override + public PollResult poll(final long currentTimeMs) { + return null; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java new file mode 100644 index 00000000000..069661f4893 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; + +public class StreamsPrepareAssignmentRequestManager implements RequestManager { + + private final StreamsAssignmentInterface streamsAssignmentInterface; + + StreamsPrepareAssignmentRequestManager(final StreamsAssignmentInterface streamsAssignmentInterface) { + this.streamsAssignmentInterface = streamsAssignmentInterface; + } + + @Override + public PollResult poll(final long currentTimeMs) { + return null; + } + + public void prepareAssignment() { + // TODO: Create a `StreamsPrepareAssignmentRequest` + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index ce7cd17c996..77f88bad774 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -197,7 +197,8 @@ public class AsyncKafkaConsumerTest { a -> backgroundEventReaper, (a, b, c, d, e, f, g) -> fetchCollector, (a, b, c, d) -> metadata, - backgroundEventQueue + backgroundEventQueue, + Optional.empty() ); } @@ -1418,7 +1419,8 @@ public class AsyncKafkaConsumerTest { any(), any(), any(), - applicationThreadMemberStateListener.capture() + applicationThreadMemberStateListener.capture(), + any() )); return applicationThreadMemberStateListener.getValue(); } 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java index 405ecabcf16..499c7f8f39d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java @@ -62,7 +62,8 @@ public class RequestManagersTest { Optional.empty(), new Metrics(), mock(OffsetCommitCallbackInvoker.class), - listener + listener, + Optional.empty() ).get(); requestManagers.consumerMembershipManager.ifPresent( membershipManager -> assertTrue(membershipManager.stateListeners().contains(listener)) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index 80315099fb8..fb79b90d674 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -82,7 +82,13 @@ public class ApplicationEventProcessorTest { withGroupId ? Optional.of(mock(CoordinatorRequestManager.class)) : Optional.empty(), withGroupId ? Optional.of(commitRequestManager) : Optional.empty(), withGroupId ? Optional.of(heartbeatRequestManager) : Optional.empty(), - withGroupId ? Optional.of(membershipManager) : Optional.empty()); + withGroupId ? 
Optional.of(membershipManager) : Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty() + ); + processor = new ApplicationEventProcessor( new LogContext(), requestManagers, diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java index fc96ca701c6..43065467f67 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.processor.StateStore; @@ -58,6 +59,16 @@ public interface KafkaClientSupplier { */ Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config); + /** + * Create a {@link Consumer} which is used to read records of source topics. + * + * @param config {@link StreamsConfig#getMainConsumerConfigs(String, String, int) consumer config} which is + * supplied by the {@link java.util.Properties} given to the {@link KafkaStreams} instance + * @param assignmentInterface the assignment interface to take part in the streams rebalance protocol. + * @return an instance of Kafka consumer + */ + Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(final Map<String, Object> config, StreamsAssignmentInterface assignmentInterface); + /** * Create a {@link Consumer} which is used to read records to restore {@link StateStore}s. 
* diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java index a6419f1dba7..71ac9982d67 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java @@ -16,9 +16,13 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.Optional; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.ByteArrayDeserializer; @@ -44,6 +48,14 @@ public class DefaultKafkaClientSupplier implements KafkaClientSupplier { return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()); } + @Override + public Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(final Map<String, Object> config, StreamsAssignmentInterface assignmentInterface) { + ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer(); + ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer(); + return new AsyncKafkaConsumer<>(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(config, keyDeserializer, valueDeserializer)), + keyDeserializer, valueDeserializer, Optional.of(assignmentInterface)); + } + @Override public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) { return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new 
ByteArrayDeserializer()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 864f7e3ed26..ce58a169cfa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -22,8 +22,11 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.SubTopology; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Metric; @@ -52,12 +55,15 @@ import org.apache.kafka.streams.processor.StandbyUpdateListener; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; +import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.processor.internals.assignment.AssignorError; import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import 
org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager; import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.DefaultTaskExecutorCreator; +import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.internals.ThreadCache; import org.slf4j.Logger; @@ -469,7 +475,25 @@ public class StreamThread extends Thread implements ProcessingThread { consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); } - final Consumer<byte[], byte[]> mainConsumer = clientSupplier.getConsumer(consumerConfigs); + final Consumer<byte[], byte[]> mainConsumer; + if (consumerConfigs.containsKey(ConsumerConfig.GROUP_PROTOCOL_CONFIG) && + consumerConfigs.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toString().equalsIgnoreCase(GroupProtocol.CONSUMER.name)) { + if (topologyMetadata.hasNamedTopologies()) { + throw new IllegalStateException("Named topologies and the CONSUMER protocol cannot be used at the same time."); + } + log.info("Streams rebalance protocol enabled"); + + StreamsAssignmentInterface streamsAssignmentInterface = + initAssignmentInterface(processId, + config, + parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)), + topologyMetadata); + + mainConsumer = clientSupplier.getStreamsRebalanceProtocolConsumer(consumerConfigs, streamsAssignmentInterface); + } else { + mainConsumer = clientSupplier.getConsumer(consumerConfigs); + } + taskManager.setMainConsumer(mainConsumer); referenceContainer.mainConsumer = mainConsumer; @@ -498,6 +522,65 @@ public class StreamThread extends Thread implements ProcessingThread { return streamThread.updateThreadMetadata(adminClientId(clientId)); } + private static HostInfo parseHostInfo(final String endPoint) { + final HostInfo hostInfo = HostInfo.buildFromEndpoint(endPoint); + if (hostInfo == null) { + return StreamsMetadataState.UNKNOWN_HOST; + } else { + return hostInfo; + } + } + + private static StreamsAssignmentInterface initAssignmentInterface(UUID 
processId, + StreamsConfig config, + HostInfo hostInfo, + final TopologyMetadata topologyMetadata) { + final InternalTopologyBuilder internalTopologyBuilder = topologyMetadata.lookupBuilderForNamedTopology(null); + + Map<String, SubTopology> subtopologyMap = new HashMap<>(); + for (Map.Entry<Subtopology, TopicsInfo> topicsInfoEntry: internalTopologyBuilder.subtopologyToTopicsInfo().entrySet()) { + subtopologyMap.put( + String.valueOf(topicsInfoEntry.getKey().nodeGroupId), + new SubTopology( + topicsInfoEntry.getValue().sourceTopics, + topicsInfoEntry.getValue().sinkTopics, + topicsInfoEntry.getValue().repartitionSourceTopics.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey , e-> + new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(), e.getValue().topicConfigs))), + topicsInfoEntry.getValue().stateChangelogTopics.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey , e-> + new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(), e.getValue().topicConfigs))) + + ) + ); + } + + // TODO: Which of these are actually needed? 
+ // TODO: Maybe we want to split this into assignment properties and internal topic configuration properties + HashMap<String, Object> assignmentProperties = new HashMap<>(); + assignmentProperties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, config.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG)); + assignmentProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)); + assignmentProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, config.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG)); + assignmentProperties.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, config.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG)); + assignmentProperties.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, config.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG)); + assignmentProperties.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, config.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); + assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG)); + assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, config.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)); + assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, config.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG)); + assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)); + + return new StreamsAssignmentInterface( + processId, + hostInfo.host(), + hostInfo.port(), + subtopologyMap, + assignmentProperties, + config.getClientTags() + ); + } + private static DefaultTaskManager maybeCreateSchedulingTaskManager(final boolean processingThreadsEnabled, final boolean stateUpdaterEnabled, final TopologyMetadata topologyMetadata, diff --git 
a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java index 88f24e8d9bd..57d5b4844b0 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -89,6 +90,10 @@ public class MockClientSupplier implements KafkaClientSupplier { public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) { return consumer; } + @Override + public Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(final Map<String, Object> config, StreamsAssignmentInterface assignmentInterface) { + throw new IllegalArgumentException("Should not be called"); + } @Override public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) { diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 9e76fad4264..073fcf5a062 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import 
org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -358,7 +359,10 @@ public class TopologyTestDriver implements Closeable { public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) { throw new IllegalStateException(); } - + @Override + public Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(final Map<String, Object> config, StreamsAssignmentInterface assignmentInterface) { + throw new IllegalStateException(); + } @Override public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) { throw new IllegalStateException();
