guozhangwang commented on code in PR #13330:
URL: https://github.com/apache/kafka/pull/13330#discussion_r1124915057


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ProtocolRequestManager.java:
##########
@@ -0,0 +1,118 @@
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+public class ProtocolRequestManager implements RequestManager {
+    GroupState groupState;
+    HeartbeatRequestManager heartbeatRequestManager;
+    RebalanceProtocol protocol;
+    BlockingQueue<ApplicationEvent> eventQueue;
+    Optional<Long> callbackInvokedMs;
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+        switch (groupState.state) {
+            case UNJOINED:
+                return null;
+            case PREPARE:
+                return protocol.onPrepare(currentTimeMs);
+            case ASSIGNING:
+                invokeRebalance(currentTimeMs);
+                return protocol.onAssign(currentTimeMs);
+            case COMPLETE:
+                return protocol.onComplete(currentTimeMs);
+            case STABLE:
+                return protocol.onStable(currentTimeMs);
+        }
+        return null;
+    }
+
+    public void ackCallbackInvocation(long currentTimeMs) {

Review Comment:
   Not clear when this func would be called?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/GroupState.java:
##########
@@ -27,6 +27,15 @@ public class GroupState {
     public final String groupId;
     public final Optional<String> groupInstanceId;
     public Generation generation = Generation.NO_GENERATION;
+    public State state = State.UNJOINED;
+
+    enum State {
+        UNJOINED,
+        PREPARE,
+        ASSIGNING,
+        COMPLETE,

Review Comment:
   What's the difference between COMPLETE and STABLE?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to