This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 9e3ba3e0f5a88b6d5c4af7fcb0e9aa563366c861
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu May 16 22:27:46 2024 +0200

    Specify AsyncKafkaConsumer interface with Streams for the Rebalance PoC
    
    Extend internal AsyncKafkaConsumer interface to exchange rebalance information with Streams.
    
    See https://github.com/lucasbru/kafka/pull/9
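
    For illustration only (not part of this patch): a minimal sketch of how the pieces
    introduced below are meant to fit together, using just the signatures added in this
    commit. The config map (configMap), process id, endpoint, and the empty maps are
    placeholder values.

        // Describe the Streams instance to the client-side assignment code.
        StreamsAssignmentInterface assignment = new StreamsAssignmentInterface(
            UUID.randomUUID(),   // processID
            "localhost",         // userEndPointHost
            8080,                // userEndPointPort
            new HashMap<>(),     // subtopologyMap
            new HashMap<>(),     // assignmentConfiguration
            new HashMap<>());    // clientTags

        // Hand it to the consumer so the background thread can create the
        // Streams-specific request managers.
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
        Consumer<byte[], byte[]> consumer = new AsyncKafkaConsumer<>(
            new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(configMap, deserializer, deserializer)),
            deserializer,
            deserializer,
            Optional.of(assignment));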
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  25 ++-
 .../internals/ConsumerDelegateCreator.java         |   6 +-
 .../consumer/internals/RequestManagers.java        |  31 ++-
 .../internals/StreamsAssignmentInterface.java      | 216 +++++++++++++++++++++
 .../internals/StreamsHeartbeatRequestManager.java  |  68 +++++++
 .../internals/StreamsInitializeRequestManager.java |  38 ++++
 .../StreamsInstallAssignmentRequestManager.java    |  34 ++++
 .../StreamsPrepareAssignmentRequestManager.java    |  37 ++++
 .../consumer/internals/AsyncKafkaConsumerTest.java |   6 +-
 .../consumer/internals/RequestManagersTest.java    |   3 +-
 .../events/ApplicationEventProcessorTest.java      |   8 +-
 .../apache/kafka/streams/KafkaClientSupplier.java  |  11 ++
 .../internals/DefaultKafkaClientSupplier.java      |  12 ++
 .../streams/processor/internals/StreamThread.java  |  86 +++++++-
 .../org/apache/kafka/test/MockClientSupplier.java  |   5 +
 .../apache/kafka/streams/TopologyTestDriver.java   |   2 +
 16 files changed, 570 insertions(+), 18 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 32e5fe32f48..807c900f6b9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -275,9 +275,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         }
     };
 
-    AsyncKafkaConsumer(final ConsumerConfig config,
-                       final Deserializer<K> keyDeserializer,
-                       final Deserializer<V> valueDeserializer) {
+   public AsyncKafkaConsumer(final ConsumerConfig config,
+                              final Deserializer<K> keyDeserializer,
+                              final Deserializer<V> valueDeserializer,
+                              final Optional<StreamsAssignmentInterface> streamsAssignmentInterface) {
         this(
             config,
             keyDeserializer,
@@ -287,7 +288,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             CompletableEventReaper::new,
             FetchCollector::new,
             ConsumerMetadata::new,
-            new LinkedBlockingQueue<>()
+            new LinkedBlockingQueue<>(),
+            streamsAssignmentInterface
         );
     }
 
@@ -300,7 +302,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                        final CompletableEventReaperFactory backgroundEventReaperFactory,
                        final FetchCollectorFactory<K, V> fetchCollectorFactory,
                        final ConsumerMetadataFactory metadataFactory,
-                       final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue) {
+                       final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue,
+                       final Optional<StreamsAssignmentInterface> streamsAssignmentInterface
+                       ) {
         try {
             GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
                 config,
@@ -367,7 +371,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                     clientTelemetryReporter,
                     metrics,
                     offsetCommitCallbackInvoker,
-                    memberStateListener
+                    memberStateListener,
+                    this::updateGroupMetadata,
+                    streamsAssignmentInterface
             );
             final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext,
                     metadata,
@@ -471,7 +477,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                        Deserializer<V> valueDeserializer,
                        KafkaClient client,
                        SubscriptionState subscriptions,
-                       ConsumerMetadata metadata) {
+                       ConsumerMetadata metadata,
+                       Optional<StreamsAssignmentInterface> streamsInstanceMetadata) {
         this.log = logContext.logger(getClass());
         this.subscriptions = subscriptions;
         this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
@@ -539,7 +546,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             clientTelemetryReporter,
             metrics,
             offsetCommitCallbackInvoker,
-            memberStateListener
+            memberStateListener,
+            this::updateGroupMetadata,
+            streamsInstanceMetadata
         );
         Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
                 logContext,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
index 74592972b9d..3f5926080fc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Time;
 
 import java.util.List;
 import java.util.Locale;
+import java.util.Optional;
 
 /**
  * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the
@@ -60,7 +61,7 @@ public class ConsumerDelegateCreator {
             GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));
 
             if (groupProtocol == GroupProtocol.CONSUMER)
-                return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
+                return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer, Optional.empty());
             else
                 return new ClassicKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
         } catch (KafkaException e) {
@@ -91,7 +92,8 @@ public class ConsumerDelegateCreator {
                     valueDeserializer,
                     client,
                     subscriptions,
-                    metadata
+                    metadata,
+                    Optional.empty()
                 );
             else
                 return new ClassicKafkaConsumer<>(
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 304f0fffd4a..5045743848d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -58,6 +58,9 @@ public class RequestManagers implements Closeable {
     public final TopicMetadataRequestManager topicMetadataRequestManager;
     public final FetchRequestManager fetchRequestManager;
     public final Optional<ShareConsumeRequestManager> shareConsumeRequestManager;
+    public final Optional<StreamsInitializeRequestManager> streamsInitializeRequestManager;
+    public final Optional<StreamsPrepareAssignmentRequestManager> streamsPrepareAssignmentRequestManager;
+    public final Optional<StreamsInstallAssignmentRequestManager> streamsInstallAssignmentRequestManager;
     private final List<Optional<? extends RequestManager>> entries;
     private final IdempotentCloser closer = new IdempotentCloser();
 
@@ -69,6 +72,10 @@ public class RequestManagers implements Closeable {
                            Optional<CommitRequestManager> commitRequestManager,
                            Optional<ConsumerHeartbeatRequestManager> heartbeatRequestManager,
-                           Optional<ConsumerMembershipManager> membershipManager) {
+                           Optional<ConsumerMembershipManager> membershipManager,
+                           Optional<StreamsHeartbeatRequestManager> streamsHeartbeatRequestManager,
+                           Optional<StreamsInitializeRequestManager> streamsInitializeRequestManager,
+                           Optional<StreamsPrepareAssignmentRequestManager> streamsPrepareAssignmentRequestManager,
+                           Optional<StreamsInstallAssignmentRequestManager> streamsInstallAssignmentRequestManager) {
         this.log = logContext.logger(RequestManagers.class);
         this.offsetsRequestManager = requireNonNull(offsetsRequestManager, "OffsetsRequestManager cannot be null");
         this.coordinatorRequestManager = coordinatorRequestManager;
@@ -80,6 +87,10 @@ public class RequestManagers implements Closeable {
         this.shareHeartbeatRequestManager = Optional.empty();
         this.consumerMembershipManager = membershipManager;
         this.shareMembershipManager = Optional.empty();
+        this.streamsHeartbeatRequestManager = streamsHeartbeatRequestManager;
+        this.streamsInitializeRequestManager = streamsInitializeRequestManager;
+        this.streamsPrepareAssignmentRequestManager = streamsPrepareAssignmentRequestManager;
+        this.streamsInstallAssignmentRequestManager = streamsInstallAssignmentRequestManager;
 
         List<Optional<? extends RequestManager>> list = new ArrayList<>();
         list.add(coordinatorRequestManager);
@@ -89,6 +100,10 @@ public class RequestManagers implements Closeable {
         list.add(Optional.of(offsetsRequestManager));
         list.add(Optional.of(topicMetadataRequestManager));
         list.add(Optional.of(fetchRequestManager));
+        list.add(streamsHeartbeatRequestManager);
+        list.add(streamsInitializeRequestManager);
+        list.add(streamsPrepareAssignmentRequestManager);
+        list.add(streamsInstallAssignmentRequestManager);
         entries = Collections.unmodifiableList(list);
     }
 
@@ -158,7 +173,8 @@ public class RequestManagers implements Closeable {
                                                      final Optional<ClientTelemetryReporter> clientTelemetryReporter,
                                                      final Metrics metrics,
                                                      final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
-                                                     final MemberStateListener applicationThreadMemberStateListener
+                                                     final MemberStateListener applicationThreadMemberStateListener,
+                                                     final Optional<StreamsAssignmentInterface> streamsInstanceMetadata
                                                      ) {
         return new CachedSupplier<>() {
             @Override
@@ -187,6 +203,10 @@ public class RequestManagers implements Closeable {
                 ConsumerMembershipManager membershipManager = null;
                 CoordinatorRequestManager coordinator = null;
                 CommitRequestManager commitRequestManager = null;
+                StreamsHeartbeatRequestManager streamsHeartbeatRequestManager = null;
+                StreamsInitializeRequestManager streamsInitializeRequestManager = null;
+                StreamsPrepareAssignmentRequestManager streamsPrepareAssignmentRequestManager = null;
+                StreamsInstallAssignmentRequestManager streamsInstallAssignmentRequestManager = null;
 
                 if (groupRebalanceConfig != null && groupRebalanceConfig.groupId != null) {
                     Optional<String> serverAssignor = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
@@ -230,7 +250,7 @@ public class RequestManagers implements Closeable {
 
                     membershipManager.registerStateListener(commitRequestManager);
                     membershipManager.registerStateListener(applicationThreadMemberStateListener);
-                    heartbeatRequestManager = new ConsumerHeartbeatRequestManager(
+
                             logContext,
                             time,
                             config,
@@ -239,6 +259,7 @@ public class RequestManagers implements Closeable {
                             membershipManager,
                             backgroundEventHandler,
                             metrics);
+                    }
                 }
 
                 final OffsetsRequestManager listOffsets = new OffsetsRequestManager(subscriptions,
@@ -261,7 +282,11 @@ public class RequestManagers implements Closeable {
                         Optional.ofNullable(coordinator),
                         Optional.ofNullable(commitRequestManager),
                         Optional.ofNullable(heartbeatRequestManager),
-                        Optional.ofNullable(membershipManager)
+                        Optional.ofNullable(membershipManager),
+                        Optional.ofNullable(streamsHeartbeatRequestManager),
+                        Optional.ofNullable(streamsInitializeRequestManager),
+                        Optional.ofNullable(streamsPrepareAssignmentRequestManager),
+                        Optional.ofNullable(streamsInstallAssignmentRequestManager)
                 );
             }
         };
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
new file mode 100644
index 00000000000..acc7e314362
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Defines a self-contained object to exchange assignment-related metadata with the Kafka Streams instance.
+ *
+ * It is used to exchange information between the streams module and the clients module, and should remain mostly self-contained.
+ */
+public class StreamsAssignmentInterface {
+
+    private UUID processID;
+
+    private String userEndPointHost;
+
+    private int userEndPointPort;
+
+    private Map<String, SubTopology> subtopologyMap;
+
+    private Map<String, Object> assignmentConfiguration;
+
+    private Map<TaskId, Long> taskLags;
+
+    private AtomicBoolean shutdownRequested;
+
+    private Map<String, String> clientTags;
+
+    public UUID processID() {
+        return processID;
+    }
+
+    public String userEndPointHost() {
+        return userEndPointHost;
+    }
+
+    public int userEndPointPort() {
+        return userEndPointPort;
+    }
+
+    public Map<String, SubTopology> subtopologyMap() {
+        return subtopologyMap;
+    }
+
+    public Map<String, Object> assignmentConfiguration() {
+        return assignmentConfiguration;
+    }
+
+    public Map<TaskId, Long> taskLags() {
+        return taskLags;
+    }
+
+    public byte[] computeTopologyHash() {
+        // TODO
+        return new byte[0];
+    }
+
+    public Map<String, String> clientTags() {
+        return clientTags;
+    }
+
+    public void requestShutdown() {
+        shutdownRequested.set(true);
+    }
+
+    public boolean shutdownRequested() {
+        return shutdownRequested.get();
+    }
+
+    public void setTaskLags(Map<TaskId, Long> taskLags) {
+        this.taskLags = taskLags;
+    }
+
+    // TODO: Reconciled assignment updated by the stream thread
+    public final Assignment reconciledAssignment = new Assignment();
+
+    // TODO: Target assignment read by the stream thread
+    public final Assignment targetAssignment = new Assignment();
+
+    public static class Assignment {
+
+        public final Set<TaskId> activeTasks = new HashSet<>();
+
+        public final Set<TaskId> standbyTasks = new HashSet<>();
+
+        public final Set<TaskId> warmupTasks = new HashSet<>();
+
+    }
+
+    public static class TopicInfo {
+
+        public final Optional<Integer> numPartitions;
+        public final Map<String, String> topicConfigs;
+
+        public TopicInfo(final Optional<Integer> numPartitions,
+                         final Map<String, String> topicConfigs) {
+            this.numPartitions = numPartitions;
+            this.topicConfigs = topicConfigs;
+        }
+
+        @Override
+        public String toString() {
+            return "TopicInfo{" +
+                "numPartitions=" + numPartitions +
+                ", topicConfigs=" + topicConfigs +
+                '}';
+        }
+
+    }
+
+    public static class TaskId {
+        public final String subtopologyId;
+        public final int taskId;
+
+        public int taskId() {
+            return taskId;
+        }
+
+        public String subtopologyId() {
+            return subtopologyId;
+        }
+
+        public TaskId(final String subtopologyId, final int taskId) {
+            this.subtopologyId = subtopologyId;
+            this.taskId = taskId;
+        }
+
+        @Override
+        public String toString() {
+            return "TaskId{" +
+                "subtopologyId=" + subtopologyId +
+                ", taskId=" + taskId +
+                '}';
+        }
+    }
+
+    public static class SubTopology {
+
+        public final Set<String> sinkTopics;
+        public final Set<String> sourceTopics;
+        public final Map<String, TopicInfo> stateChangelogTopics;
+        public final Map<String, TopicInfo> repartitionSourceTopics;
+
+        public SubTopology(final Set<String> sinkTopics,
+                           final Set<String> sourceTopics,
+                           final Map<String, TopicInfo> repartitionSourceTopics,
+                           final Map<String, TopicInfo> stateChangelogTopics) {
+            this.sinkTopics = sinkTopics;
+            this.sourceTopics = sourceTopics;
+            this.stateChangelogTopics = stateChangelogTopics;
+            this.repartitionSourceTopics = repartitionSourceTopics;
+        }
+
+        @Override
+        public String toString() {
+            return "SubTopology{" +
+                "sinkTopics=" + sinkTopics +
+                ", sourceTopics=" + sourceTopics +
+                ", stateChangelogTopics=" + stateChangelogTopics +
+                ", repartitionSourceTopics=" + repartitionSourceTopics +
+                '}';
+        }
+    }
+
+    public StreamsAssignmentInterface(UUID processID,
+                                      String userEndPointHost,
+                                      int userEndPointPort,
+                                      Map<String, SubTopology> subtopologyMap,
+                                      Map<String, Object> assignmentConfiguration,
+                                      Map<String, String> clientTags
+                                      ) {
+        this.processID = processID;
+        this.userEndPointHost = userEndPointHost;
+        this.userEndPointPort = userEndPointPort;
+        this.subtopologyMap = subtopologyMap;
+        this.assignmentConfiguration = assignmentConfiguration;
+        this.taskLags = new HashMap<>();
+        this.shutdownRequested = new AtomicBoolean(false);
+        this.clientTags = clientTags;
+    }
+
+    @Override
+    public String toString() {
+        return "StreamsAssignmentMetadata{" +
+            "processID=" + processID +
+            ", userEndPointHost='" + userEndPointHost + '\'' +
+            ", userEndPointPort=" + userEndPointPort +
+            ", subtopologyMap=" + subtopologyMap +
+            ", assignmentConfiguration=" + assignmentConfiguration +
+            ", taskLags=" + taskLags +
+            ", clientTags=" + clientTags +
+            '}';
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java
new file mode 100644
index 00000000000..685ab28da69
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.Assignment;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData.TaskId;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData.HostInfo;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.StreamsHeartbeatRequest;
+import org.apache.kafka.common.requests.StreamsHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StreamsHeartbeatRequestManager implements RequestManager {
+
+    private StreamsAssignmentInterface streamsInterface;
+
+    public StreamsHeartbeatRequestManager(
+        final StreamsInitializeRequestManager streamsInitializeRequestManager,
+        final StreamsPrepareAssignmentRequestManager streamsPrepareAssignmentRequestManager,
+        final StreamsAssignmentInterface streamsAssignmentInterface
+    ) {
+        this.streamsInterface = streamsAssignmentInterface;
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+        return null;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java
new file mode 100644
index 00000000000..73d2955ef2f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+
+public class StreamsInitializeRequestManager implements RequestManager {
+
+    private final StreamsAssignmentInterface streamsAssignmentInterface;
+
+    StreamsInitializeRequestManager(final StreamsAssignmentInterface streamsAssignmentInterface) {
+        this.streamsAssignmentInterface = streamsAssignmentInterface;
+    }
+
+    @Override
+    public PollResult poll(final long currentTimeMs) {
+        // TODO: Return an initialize request
+        return PollResult.EMPTY;
+    }
+
+    public void initialize() {
+        // TODO: Create a `StreamsInitializeRequest`
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java
new file mode 100644
index 00000000000..7936e885b56
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+
+public class StreamsInstallAssignmentRequestManager implements RequestManager {
+
+    private final StreamsAssignmentInterface streamsAssignmentInterface;
+
+    StreamsInstallAssignmentRequestManager(final StreamsAssignmentInterface streamsAssignmentInterface) {
+        this.streamsAssignmentInterface = streamsAssignmentInterface;
+    }
+
+    @Override
+    public PollResult poll(final long currentTimeMs) {
+        return null;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java
new file mode 100644
index 00000000000..069661f4893
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+
+public class StreamsPrepareAssignmentRequestManager implements RequestManager {
+
+    private final StreamsAssignmentInterface streamsAssignmentInterface;
+
+    StreamsPrepareAssignmentRequestManager(final StreamsAssignmentInterface streamsAssignmentInterface) {
+        this.streamsAssignmentInterface = streamsAssignmentInterface;
+    }
+
+    @Override
+    public PollResult poll(final long currentTimeMs) {
+        return null;
+    }
+
+    public void prepareAssignment() {
+        // TODO: Create a `StreamsPrepareAssignmentRequest`
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 0f87fb9037c..3ba8e2a1cab 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -218,7 +218,8 @@ public class AsyncKafkaConsumerTest {
             a -> backgroundEventReaper,
             (a, b, c, d, e, f, g) -> fetchCollector,
             (a, b, c, d) -> metadata,
-            backgroundEventQueue
+            backgroundEventQueue,
+            Optional.empty()
         );
     }
 
@@ -1320,7 +1321,8 @@ public class AsyncKafkaConsumerTest {
             any(),
             any(),
             any(),
-            applicationThreadMemberStateListener.capture()
+            applicationThreadMemberStateListener.capture(),
+            any()
         ));
         return applicationThreadMemberStateListener.getValue();
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
index 405ecabcf16..499c7f8f39d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
@@ -62,7 +62,8 @@ public class RequestManagersTest {
             Optional.empty(),
             new Metrics(),
             mock(OffsetCommitCallbackInvoker.class),
-            listener
+            listener,
+            Optional.empty()
         ).get();
         requestManagers.consumerMembershipManager.ifPresent(
             membershipManager -> assertTrue(membershipManager.stateListeners().contains(listener))
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index bdbda721b00..28f938c2657 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -95,7 +95,13 @@ public class ApplicationEventProcessorTest {
                 withGroupId ? Optional.of(mock(CoordinatorRequestManager.class)) : Optional.empty(),
                 withGroupId ? Optional.of(commitRequestManager) : Optional.empty(),
                 withGroupId ? Optional.of(heartbeatRequestManager) : Optional.empty(),
-                withGroupId ? Optional.of(membershipManager) : Optional.empty());
+                withGroupId ? Optional.of(membershipManager) : Optional.empty(),
+                Optional.empty(),
+                Optional.empty(),
+                Optional.empty(),
+                Optional.empty()
+        );
+
         processor = new ApplicationEventProcessor(
                 new LogContext(),
                 requestManagers,
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
index fc96ca701c6..43065467f67 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.processor.StateStore;
@@ -58,6 +59,16 @@ public interface KafkaClientSupplier {
      */
     Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config);
 
+    /**
+     * Create a {@link Consumer} which is used to read records of source topics and to take part in the Streams rebalance protocol.
+     *
+     * @param config {@link StreamsConfig#getMainConsumerConfigs(String, String, int) consumer config} which is
+     *               supplied by the {@link java.util.Properties} given to the {@link KafkaStreams} instance
+     * @param assignmentInterface the assignment interface to take part in the streams rebalance protocol.
+     * @return an instance of Kafka consumer
+     */
+    Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(final Map<String, Object> config, StreamsAssignmentInterface assignmentInterface);
+
     /**
      * Create a {@link Consumer} which is used to read records to restore {@link StateStore}s.
      *
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
index a6419f1dba7..71ac9982d67 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
@@ -16,9 +16,13 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.Optional;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -44,6 +48,14 @@ public class DefaultKafkaClientSupplier implements KafkaClientSupplier {
         return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
     }
 
+    @Override
+    public Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(final Map<String, Object> config, StreamsAssignmentInterface assignmentInterface) {
+        ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer();
+        ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer();
+        return new AsyncKafkaConsumer<>(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(config, keyDeserializer, valueDeserializer)),
+            keyDeserializer, valueDeserializer, Optional.of(assignmentInterface));
+    }
+
     @Override
     public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
         return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ceb2c403c3e..7433896fab8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -22,8 +22,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface;
+import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.SubTopology;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Metric;
@@ -52,12 +56,15 @@ import org.apache.kafka.streams.processor.StandbyUpdateListener;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.assignment.ProcessId;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
 import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager;
 import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.DefaultTaskExecutorCreator;
+import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import org.slf4j.Logger;
@@ -475,7 +482,25 @@ public class StreamThread extends Thread implements ProcessingThread {
             consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
         }
 
-        final Consumer<byte[], byte[]> mainConsumer = clientSupplier.getConsumer(consumerConfigs);
+        final Consumer<byte[], byte[]> mainConsumer;
+        if (consumerConfigs.containsKey(ConsumerConfig.GROUP_PROTOCOL_CONFIG) &&
+            consumerConfigs.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toString().equalsIgnoreCase(GroupProtocol.CONSUMER.name)) {
+            if (topologyMetadata.hasNamedTopologies()) {
+                throw new IllegalStateException("Named topologies and the CONSUMER protocol cannot be used at the same time.");
+            }
+            log.info("Streams rebalance protocol enabled");
+
+            StreamsAssignmentInterface streamsAssignmentInterface =
+                initAssignmentInterface(processId,
+                config,
+                parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)),
+                topologyMetadata);
+
+            mainConsumer = clientSupplier.getStreamsRebalanceProtocolConsumer(consumerConfigs, streamsAssignmentInterface);
+        } else {
+            mainConsumer = clientSupplier.getConsumer(consumerConfigs);
+        }
+
         taskManager.setMainConsumer(mainConsumer);
         referenceContainer.mainConsumer = mainConsumer;
 
@@ -507,6 +532,65 @@ public class StreamThread extends Thread implements ProcessingThread {
         return streamThread.updateThreadMetadata(adminClientId(clientId));
     }
 
+    private static HostInfo parseHostInfo(final String endPoint) {
+        final HostInfo hostInfo = HostInfo.buildFromEndpoint(endPoint);
+        if (hostInfo == null) {
+            return StreamsMetadataState.UNKNOWN_HOST;
+        } else {
+            return hostInfo;
+        }
+    }
+
+    private static StreamsAssignmentInterface initAssignmentInterface(UUID processId,
+                                                                      StreamsConfig config,
+                                                                      HostInfo hostInfo,
+                                                                      final TopologyMetadata topologyMetadata) {
+        final InternalTopologyBuilder internalTopologyBuilder = topologyMetadata.lookupBuilderForNamedTopology(null);
+
+        Map<String, SubTopology> subtopologyMap = new HashMap<>();
+        for (Map.Entry<Subtopology, TopicsInfo> topicsInfoEntry : internalTopologyBuilder.subtopologyToTopicsInfo().entrySet()) {
+            subtopologyMap.put(
+                String.valueOf(topicsInfoEntry.getKey().nodeGroupId),
+                new SubTopology(
+                    topicsInfoEntry.getValue().sinkTopics,
+                    topicsInfoEntry.getValue().sourceTopics,
+                    topicsInfoEntry.getValue().repartitionSourceTopics.entrySet()
+                        .stream()
+                        .collect(Collectors.toMap(Map.Entry::getKey, e ->
+                            new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(), e.getValue().topicConfigs))),
+                    topicsInfoEntry.getValue().stateChangelogTopics.entrySet()
+                        .stream()
+                        .collect(Collectors.toMap(Map.Entry::getKey, e ->
+                            new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(), e.getValue().topicConfigs)))
+
+                )
+            );
+        }
+
+        // TODO: Which of these are actually needed?
+        // TODO: Maybe we want to split this into assignment properties and internal topic configuration properties
+        HashMap<String, Object> assignmentProperties = new HashMap<>();
+        assignmentProperties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, config.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG));
+        assignmentProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG));
+        assignmentProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, config.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
+        assignmentProperties.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, config.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        assignmentProperties.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, config.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG));
+        assignmentProperties.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, config.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
+        assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG));
+        assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, config.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));
+        assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, config.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG));
+        assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
+
+        return new StreamsAssignmentInterface(
+            processId,
+            hostInfo.host(),
+            hostInfo.port(),
+            subtopologyMap,
+            assignmentProperties,
+            config.getClientTags()
+        );
+    }
+
     private static DefaultTaskManager maybeCreateSchedulingTaskManager(final boolean processingThreadsEnabled,
                                                                        final boolean stateUpdaterEnabled,
                                                                        final TopologyMetadata topologyMetadata,
diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index 37723f1e7cc..c25a6c3bb06 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface;
 import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -89,6 +90,10 @@ public class MockClientSupplier implements KafkaClientSupplier {
     public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
         return consumer;
     }
+    @Override
+    public Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(final Map<String, Object> config, StreamsAssignmentInterface assignmentInterface) {
+        throw new IllegalArgumentException("Should not be called");
+    }
 
     @Override
     public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 525e01f151c..64eec4c7b27 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -21,6 +21,8 @@ import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface;
 import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;

