This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 9e3ba3e0f5a88b6d5c4af7fcb0e9aa563366c861 Author: Lucas Brutschy <[email protected]> AuthorDate: Thu May 16 22:27:46 2024 +0200 Resolve merge conflict from 11/25 rebase - Specify AsyncKafkaConsumer interface with Streams for the Rebalance PoC Extend internal AsyncKafkaConsumer interface to exchange rebalance information with Streams. See https://github.com/lucasbru/kafka/pull/9 --- .../consumer/internals/AsyncKafkaConsumer.java | 25 ++- .../internals/ConsumerDelegateCreator.java | 6 +- .../consumer/internals/RequestManagers.java | 31 ++- .../internals/StreamsAssignmentInterface.java | 216 +++++++++++++++++++++ .../internals/StreamsHeartbeatRequestManager.java | 68 +++++++ .../internals/StreamsInitializeRequestManager.java | 38 ++++ .../StreamsInstallAssignmentRequestManager.java | 34 ++++ .../StreamsPrepareAssignmentRequestManager.java | 37 ++++ .../consumer/internals/AsyncKafkaConsumerTest.java | 6 +- .../consumer/internals/RequestManagersTest.java | 3 +- .../events/ApplicationEventProcessorTest.java | 8 +- .../apache/kafka/streams/KafkaClientSupplier.java | 11 ++ .../internals/DefaultKafkaClientSupplier.java | 12 ++ .../streams/processor/internals/StreamThread.java | 86 +++++++- .../org/apache/kafka/test/MockClientSupplier.java | 5 + .../apache/kafka/streams/TopologyTestDriver.java | 2 + 16 files changed, 570 insertions(+), 18 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 32e5fe32f48..807c900f6b9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -275,9 +275,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { } }; - AsyncKafkaConsumer(final ConsumerConfig config, - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer) { + public AsyncKafkaConsumer(final ConsumerConfig config, + final Deserializer<K> keyDeserializer, + final Deserializer<V> valueDeserializer, + final Optional<StreamsAssignmentInterface> streamsAssignmentInterface) { this( config, keyDeserializer, @@ -287,7 +288,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { CompletableEventReaper::new, FetchCollector::new, ConsumerMetadata::new, - new LinkedBlockingQueue<>() + new LinkedBlockingQueue<>(), + streamsAssignmentInterface ); } @@ -300,7 +302,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { final CompletableEventReaperFactory backgroundEventReaperFactory, final FetchCollectorFactory<K, V> fetchCollectorFactory, final ConsumerMetadataFactory metadataFactory, - final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue) { + final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue, + final Optional<StreamsAssignmentInterface> streamsAssignmentInterface + ) { try { GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig( config, @@ -367,7 +371,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { clientTelemetryReporter, metrics, offsetCommitCallbackInvoker, - memberStateListener + memberStateListener, + this::updateGroupMetadata, + streamsAssignmentInterface ); final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, @@ -471,7 +477,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { Deserializer<V> valueDeserializer, KafkaClient client, SubscriptionState subscriptions, - ConsumerMetadata metadata) { + ConsumerMetadata metadata, + Optional<StreamsAssignmentInterface> streamsInstanceMetadata) { this.log = logContext.logger(getClass()); this.subscriptions = subscriptions; this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); @@ -539,7 +546,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { clientTelemetryReporter, metrics, offsetCommitCallbackInvoker, - memberStateListener + memberStateListener, + this::updateGroupMetadata, + streamsInstanceMetadata ); Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( logContext, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java index 74592972b9d..3f5926080fc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Time; import java.util.List; import java.util.Locale; +import java.util.Optional; /** * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the @@ -60,7 +61,7 @@ public class ConsumerDelegateCreator { GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT)); if (groupProtocol == GroupProtocol.CONSUMER) - return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer); + return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer, Optional.empty()); else return new ClassicKafkaConsumer<>(config, keyDeserializer, valueDeserializer); } catch (KafkaException e) { @@ -91,7 +92,8 @@ public class ConsumerDelegateCreator { valueDeserializer, client, subscriptions, - metadata + metadata, + Optional.empty() ); else return new ClassicKafkaConsumer<>( diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 304f0fffd4a..5045743848d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -58,6 +58,9 @@ public class RequestManagers implements Closeable { public final TopicMetadataRequestManager topicMetadataRequestManager; public final FetchRequestManager fetchRequestManager; public final Optional<ShareConsumeRequestManager> shareConsumeRequestManager; + public final Optional<StreamsInitializeRequestManager> streamsInitializeRequestManager; + public final Optional<StreamsPrepareAssignmentRequestManager> streamsPrepareAssignmentRequestManager; + public final Optional<StreamsInstallAssignmentRequestManager> streamsInstallAssignmentRequestManager; private final List<Optional<? extends RequestManager>> entries; private final IdempotentCloser closer = new IdempotentCloser(); @@ -69,6 +72,10 @@ public class RequestManagers implements Closeable { Optional<CommitRequestManager> commitRequestManager, Optional<ConsumerHeartbeatRequestManager> heartbeatRequestManager, Optional<ConsumerMembershipManager> membershipManager) { + Optional<StreamsHeartbeatRequestManager> streamsHeartbeatRequestManager, + Optional<StreamsInitializeRequestManager> streamsInitializeRequestManager, + Optional<StreamsPrepareAssignmentRequestManager> streamsPrepareAssignmentRequestManager, + Optional<StreamsInstallAssignmentRequestManager> streamsInstallAssignmentRequestManager) { this.log = logContext.logger(RequestManagers.class); this.offsetsRequestManager = requireNonNull(offsetsRequestManager, "OffsetsRequestManager cannot be null"); this.coordinatorRequestManager = coordinatorRequestManager; @@ -80,6 +87,10 @@ public class RequestManagers implements Closeable { this.shareHeartbeatRequestManager = Optional.empty(); this.consumerMembershipManager = membershipManager; this.shareMembershipManager = Optional.empty(); + this.streamsHeartbeatRequestManager = streamsHeartbeatRequestManager; + this.streamsInitializeRequestManager = streamsInitializeRequestManager; + this.streamsPrepareAssignmentRequestManager = streamsPrepareAssignmentRequestManager; + this.streamsInstallAssignmentRequestManager = streamsInstallAssignmentRequestManager; List<Optional<? extends RequestManager>> list = new ArrayList<>(); list.add(coordinatorRequestManager); @@ -89,6 +100,10 @@ public class RequestManagers implements Closeable { list.add(Optional.of(offsetsRequestManager)); list.add(Optional.of(topicMetadataRequestManager)); list.add(Optional.of(fetchRequestManager)); + list.add(streamsHeartbeatRequestManager); + list.add(streamsInitializeRequestManager); + list.add(streamsPrepareAssignmentRequestManager); + list.add(streamsInstallAssignmentRequestManager); entries = Collections.unmodifiableList(list); } @@ -158,7 +173,8 @@ public class RequestManagers implements Closeable { final Optional<ClientTelemetryReporter> clientTelemetryReporter, final Metrics metrics, final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, - final MemberStateListener applicationThreadMemberStateListener + final MemberStateListener applicationThreadMemberStateListener, + final Optional<StreamsAssignmentInterface> streamsInstanceMetadata ) { return new CachedSupplier<>() { @Override @@ -187,6 +203,10 @@ public class RequestManagers implements Closeable { ConsumerMembershipManager membershipManager = null; CoordinatorRequestManager coordinator = null; CommitRequestManager commitRequestManager = null; + StreamsHeartbeatRequestManager streamsHeartbeatRequestManager = null; + StreamsInitializeRequestManager streamsInitializeRequestManager = null; + StreamsPrepareAssignmentRequestManager streamsPrepareAssignmentRequestManager = null; + StreamsInstallAssignmentRequestManager streamsInstallAssignmentRequestManager = null; if (groupRebalanceConfig != null && groupRebalanceConfig.groupId != null) { Optional<String> serverAssignor = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); @@ -230,7 +250,7 @@ public class RequestManagers implements Closeable { membershipManager.registerStateListener(commitRequestManager); membershipManager.registerStateListener(applicationThreadMemberStateListener); - heartbeatRequestManager = new ConsumerHeartbeatRequestManager( + logContext, time, config, @@ -239,6 +259,7 @@ public class RequestManagers implements Closeable { membershipManager, backgroundEventHandler, metrics); + } } final OffsetsRequestManager listOffsets = new OffsetsRequestManager(subscriptions, @@ -261,7 +282,11 @@ public class RequestManagers implements Closeable { Optional.ofNullable(coordinator), Optional.ofNullable(commitRequestManager), Optional.ofNullable(heartbeatRequestManager), - Optional.ofNullable(membershipManager) + Optional.ofNullable(membershipManager), + Optional.ofNullable(streamsHeartbeatRequestManager), + Optional.ofNullable(streamsInitializeRequestManager), + Optional.ofNullable(streamsPrepareAssignmentRequestManager), + Optional.ofNullable(streamsInstallAssignmentRequestManager) ); } }; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java new file mode 100644 index 00000000000..acc7e314362 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Defines a self-contained object to exchange assignment-related metadata with the Kafka Streams instance. + * + * It's used to exchange information between the streams module and the clients module, and should be mostly self-contained + */ +public class StreamsAssignmentInterface { + + private UUID processID; + + private String userEndPointHost; + + private int userEndPointPort; + + private Map<String, SubTopology> subtopologyMap; + + private Map<String, Object> assignmentConfiguration; + + private Map<TaskId, Long> taskLags; + + private AtomicBoolean shutdownRequested; + + private Map<String, String> clientTags; + + public UUID processID() { + return processID; + } + + public String userEndPointHost() { + return userEndPointHost; + } + + public int userEndPointPort() { + return userEndPointPort; + } + + public Map<String, SubTopology> subtopologyMap() { + return subtopologyMap; + } + + public Map<String, Object> assignmentConfiguration() { + return assignmentConfiguration; + } + + public Map<TaskId, Long> taskLags() { + return taskLags; + } + + public byte[] computeTopologyHash() { + // TODO + return new byte[0]; + } + + public Map<String, String> clientTags() { + return clientTags; + } + + public void requestShutdown() { + shutdownRequested.set(true); + } + + public boolean shutdownRequested() { + return shutdownRequested.get(); + } + + public void setTaskLags(Map<TaskId, Long> taskLags) { + this.taskLags = taskLags; + } + + // TODO: Reconciled assignment updated by the stream thread + public final Assignment reconciledAssignment = new Assignment(); + + // TODO: Target assignment read by the stream thread + public final Assignment targetAssignment = new Assignment(); + + public static class Assignment { + + public final Set<TaskId> activeTasks = new HashSet<>(); + + public final Set<TaskId> standbyTasks = new HashSet<>(); + + public final Set<TaskId> warmupTasks = new HashSet<>(); + + } + + public static class TopicInfo { + + public final Optional<Integer> numPartitions; + public final Map<String, String> topicConfigs; + + public TopicInfo(final Optional<Integer> numPartitions, + final Map<String, String> topicConfigs) { + this.numPartitions = numPartitions; + this.topicConfigs = topicConfigs; + } + + @Override + public String toString() { + return "TopicInfo{" + + "numPartitions=" + numPartitions + + ", topicConfigs=" + topicConfigs + + '}'; + } + + } + + public static class TaskId { + public final String subtopologyId; + public final int taskId; + + public int taskId() { + return taskId; + } + + public String subtopologyId() { + return subtopologyId; + } + + public TaskId(final String subtopologyId, final int taskId) { + this.subtopologyId = subtopologyId; + this.taskId = taskId; + } + + @Override + public String toString() { + return "TaskId{" + + "subtopologyId=" + subtopologyId + + ", taskId=" + taskId + + '}'; + } + } + + public static class SubTopology { + + public final Set<String> sinkTopics; + public final Set<String> sourceTopics; + public final Map<String, TopicInfo> stateChangelogTopics; + public final Map<String, TopicInfo> repartitionSourceTopics; + + public SubTopology(final Set<String> sinkTopics, + final Set<String> sourceTopics, + final Map<String, TopicInfo> repartitionSourceTopics, + final Map<String, TopicInfo> stateChangelogTopics) { + this.sinkTopics = sinkTopics; + this.sourceTopics = sourceTopics; + this.stateChangelogTopics = stateChangelogTopics; + this.repartitionSourceTopics = repartitionSourceTopics; + } + + @Override + public String toString() { + return "SubTopology{" + + "sinkTopics=" + sinkTopics + + ", sourceTopics=" + sourceTopics + + ", stateChangelogTopics=" + stateChangelogTopics + + ", repartitionSourceTopics=" + repartitionSourceTopics + + '}'; + } + } + + public StreamsAssignmentInterface(UUID processID, + String userEndPointHost, + int userEndPointPort, + Map<String, SubTopology> subtopologyMap, + Map<String, Object> assignmentConfiguration, + Map<String, String> clientTags + ) { + this.processID = processID; + this.userEndPointHost = userEndPointHost; + this.userEndPointPort = userEndPointPort; + this.subtopologyMap = subtopologyMap; + this.assignmentConfiguration = assignmentConfiguration; + this.taskLags = new HashMap<>(); + this.shutdownRequested = new AtomicBoolean(false); + this.clientTags = clientTags; + } + + @Override + public String toString() { + return "StreamsAssignmentMetadata{" + + "processID=" + processID + + ", userEndPointHost='" + userEndPointHost + '\'' + + ", userEndPointPort=" + userEndPointPort + + ", subtopologyMap=" + subtopologyMap + + ", assignmentConfiguration=" + assignmentConfiguration + + ", taskLags=" + taskLags + + ", clientTags=" + clientTags + + '}'; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java new file mode 100644 index 00000000000..685ab28da69 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.Assignment; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; +import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsHeartbeatRequestData; +import org.apache.kafka.common.message.StreamsHeartbeatRequestData.TaskId; +import org.apache.kafka.common.message.StreamsHeartbeatRequestData.HostInfo; +import org.apache.kafka.common.message.StreamsHeartbeatResponseData; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.StreamsHeartbeatRequest; +import org.apache.kafka.common.requests.StreamsHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +public class StreamsHeartbeatRequestManager implements RequestManager { + + private StreamsAssignmentInterface streamsInterface; + + public StreamsHeartbeatRequestManager( + final StreamsInitializeRequestManager streamsInitializeRequestManager, + final StreamsPrepareAssignmentRequestManager streamsPrepareAssignmentRequestManager, + final StreamsAssignmentInterface streamsAssignmentInterface + ) { + this.streamsInterface = streamsAssignmentInterface; + } + + @Override + public NetworkClientDelegate.PollResult poll(long currentTimeMs) { + return null; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java new file mode 100644 index 00000000000..73d2955ef2f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; + +public class StreamsInitializeRequestManager implements RequestManager { + + private final StreamsAssignmentInterface streamsAssignmentInterface; + + StreamsInitializeRequestManager(final StreamsAssignmentInterface streamsAssignmentInterface) { + this.streamsAssignmentInterface = streamsAssignmentInterface; + } + + @Override + public PollResult poll(final long currentTimeMs) { + // TODO: Return an initalize request + return PollResult.EMPTY; + } + + public void initialize() { + // TODO: Create a `StreamsInitializeRequest` + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java new file mode 100644 index 00000000000..7936e885b56 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; + +public class StreamsInstallAssignmentRequestManager implements RequestManager { + + private final StreamsAssignmentInterface streamsAssignmentInterface; + + StreamsInstallAssignmentRequestManager(final StreamsAssignmentInterface streamsAssignmentInterface) { + this.streamsAssignmentInterface = streamsAssignmentInterface; + } + + @Override + public PollResult poll(final long currentTimeMs) { + return null; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java new file mode 100644 index 00000000000..069661f4893 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; + +public class StreamsPrepareAssignmentRequestManager implements RequestManager { + + private final StreamsAssignmentInterface streamsAssignmentInterface; + + StreamsPrepareAssignmentRequestManager(final StreamsAssignmentInterface streamsAssignmentInterface) { + this.streamsAssignmentInterface = streamsAssignmentInterface; + } + + @Override + public PollResult poll(final long currentTimeMs) { + return null; + } + + public void prepareAssignment() { + // TODO: Create a `StreamsPrepareAssignmentRequest` + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 0f87fb9037c..3ba8e2a1cab 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -218,7 +218,8 @@ public class AsyncKafkaConsumerTest { a -> backgroundEventReaper, (a, b, c, d, e, f, g) -> fetchCollector, (a, b, c, d) -> metadata, - backgroundEventQueue + backgroundEventQueue, + Optional.empty() ); } @@ -1320,7 +1321,8 @@ public class AsyncKafkaConsumerTest { any(), any(), any(), - applicationThreadMemberStateListener.capture() + applicationThreadMemberStateListener.capture(), + any() )); return applicationThreadMemberStateListener.getValue(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java index 405ecabcf16..499c7f8f39d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java @@ -62,7 +62,8 @@ public class RequestManagersTest { Optional.empty(), new Metrics(), mock(OffsetCommitCallbackInvoker.class), - listener + listener, + Optional.empty() ).get(); requestManagers.consumerMembershipManager.ifPresent( membershipManager -> assertTrue(membershipManager.stateListeners().contains(listener)) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index bdbda721b00..28f938c2657 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -95,7 +95,13 @@ public class ApplicationEventProcessorTest { withGroupId ? Optional.of(mock(CoordinatorRequestManager.class)) : Optional.empty(), withGroupId ? Optional.of(commitRequestManager) : Optional.empty(), withGroupId ? Optional.of(heartbeatRequestManager) : Optional.empty(), - withGroupId ? Optional.of(membershipManager) : Optional.empty()); + withGroupId ? Optional.of(membershipManager) : Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty() + ); + processor = new ApplicationEventProcessor( new LogContext(), requestManagers, diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java index fc96ca701c6..43065467f67 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.processor.StateStore; @@ -58,6 +59,16 @@ public interface KafkaClientSupplier { */ Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config); + /** + * Create a {@link Consumer} which is used to read records of source topics. + * + * @param config {@link StreamsConfig#getMainConsumerConfigs(String, String, int) consumer config} which is + * supplied by the {@link java.util.Properties} given to the {@link KafkaStreams} instance + * @param assignmentInterface the assignment interface to take part in the streams rebalance protocol. + * @return an instance of Kafka consumer + */ + Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(final Map<String, Object> config, StreamsAssignmentInterface assignmentInterface); + /** * Create a {@link Consumer} which is used to read records to restore {@link StateStore}s. * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java index a6419f1dba7..71ac9982d67 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java @@ -16,9 +16,13 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.Optional; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.ByteArrayDeserializer; @@ -44,6 +48,14 @@ public class DefaultKafkaClientSupplier implements KafkaClientSupplier { return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()); } + @Override + public Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(final Map<String, Object> config, StreamsAssignmentInterface assignmentInterface) { + ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer(); + ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer(); + return new AsyncKafkaConsumer<>(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(config, keyDeserializer, valueDeserializer)), + keyDeserializer, valueDeserializer, Optional.of(assignmentInterface)); + } + @Override public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) { return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index ceb2c403c3e..7433896fab8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -22,8 +22,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.SubTopology; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Metric; @@ -52,12 +56,15 @@ import org.apache.kafka.streams.processor.StandbyUpdateListener; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; +import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.processor.internals.assignment.AssignorError; import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager; import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.DefaultTaskExecutorCreator; +import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.internals.ThreadCache; import org.slf4j.Logger; @@ -475,7 +482,25 @@ public class StreamThread extends Thread implements ProcessingThread { consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); } - final Consumer<byte[], byte[]> mainConsumer = clientSupplier.getConsumer(consumerConfigs); + final Consumer<byte[], byte[]> mainConsumer; + if (consumerConfigs.containsKey(ConsumerConfig.GROUP_PROTOCOL_CONFIG) && + consumerConfigs.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toString().equalsIgnoreCase(GroupProtocol.CONSUMER.name)) { + if (topologyMetadata.hasNamedTopologies()) { + throw new IllegalStateException("Named topologies and the CONSUMER protocol cannot be used at the same time."); + } + log.info("Streams rebalance protocol enabled"); + + StreamsAssignmentInterface streamsAssignmentInterface = + initAssignmentInterface(processId, + config, + parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)), + topologyMetadata); + + mainConsumer = clientSupplier.getStreamsRebalanceProtocolConsumer(consumerConfigs, streamsAssignmentInterface); + } else { + mainConsumer = clientSupplier.getConsumer(consumerConfigs); + } + taskManager.setMainConsumer(mainConsumer); referenceContainer.mainConsumer = mainConsumer; @@ -507,6 +532,65 @@ public class StreamThread extends Thread implements ProcessingThread { return streamThread.updateThreadMetadata(adminClientId(clientId)); } + private static HostInfo parseHostInfo(final String endPoint) { + final HostInfo hostInfo = HostInfo.buildFromEndpoint(endPoint); + if (hostInfo == null) { + return StreamsMetadataState.UNKNOWN_HOST; + } else { + return hostInfo; + } + } + + private static StreamsAssignmentInterface initAssignmentInterface(UUID processId, + StreamsConfig config, + HostInfo hostInfo, + final TopologyMetadata topologyMetadata) { + final InternalTopologyBuilder internalTopologyBuilder = topologyMetadata.lookupBuilderForNamedTopology(null); + + Map<String, SubTopology> subtopologyMap = new HashMap<>(); + for (Map.Entry<Subtopology, TopicsInfo> topicsInfoEntry: internalTopologyBuilder.subtopologyToTopicsInfo().entrySet()) { + subtopologyMap.put( + String.valueOf(topicsInfoEntry.getKey().nodeGroupId), + new SubTopology( + topicsInfoEntry.getValue().sourceTopics, + topicsInfoEntry.getValue().sinkTopics, + topicsInfoEntry.getValue().repartitionSourceTopics.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey , e-> + new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(), e.getValue().topicConfigs))), + topicsInfoEntry.getValue().stateChangelogTopics.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey , e-> + new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(), e.getValue().topicConfigs))) + + ) + ); + } + + // TODO: Which of these are actually needed? + // TODO: Maybe we want to split this into assignment properties and internal topic configuration properties + HashMap<String, Object> assignmentProperties = new HashMap<>(); + assignmentProperties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, config.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG)); + assignmentProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)); + assignmentProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, config.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG)); + assignmentProperties.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, config.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG)); + assignmentProperties.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, config.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG)); + assignmentProperties.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, config.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); + assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG)); + assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, config.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)); + assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, config.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG)); + assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)); + + return new StreamsAssignmentInterface( + processId, + hostInfo.host(), + hostInfo.port(), + subtopologyMap, + assignmentProperties, + config.getClientTags() + ); + } + private static DefaultTaskManager maybeCreateSchedulingTaskManager(final boolean processingThreadsEnabled, final boolean stateUpdaterEnabled, final TopologyMetadata topologyMetadata, diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java index 37723f1e7cc..c25a6c3bb06 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface; import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; @@ -89,6 +90,10 @@ public class MockClientSupplier implements KafkaClientSupplier { public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) { return consumer; } + @Override + public Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(final Map<String, Object> config, StreamsAssignmentInterface assignmentInterface) { + throw new IllegalArgumentException("Should not be called"); + } @Override public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) { diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 525e01f151c..64eec4c7b27 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -21,6 +21,8 @@ import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface; import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer;
