philipnee commented on code in PR #13330:
URL: https://github.com/apache/kafka/pull/13330#discussion_r1123760146

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ProtocolRequestManager.java:
##########
@@ -0,0 +1,96 @@
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+public class ProtocolRequestManager implements RequestManager {
+    GroupState groupState;
+    HeartbeatRequestManager heartbeatRequestManager;
+    RebalanceProtocol protocol;
+    BlockingQueue<ApplicationEvent> eventQueue;
+    Optional<Long> callbackInvokedMs;
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+        switch (groupState.state) {
+            case UNJOINED:
+                return null;
+            case PREPARE:
+                return protocol.onPrepare(currentTimeMs);
+            case ASSIGNING:
+                invokeRebalance(currentTimeMs);
+                return protocol.onAssign(currentTimeMs);
+            case COMPLETE:
+                return protocol.onComplete(currentTimeMs);
+            case STABLE:
+                return protocol.onStable(currentTimeMs);
+        }
+        return null;
+    }
+
+    public void ackCallbackInvocation(long currentTimeMs) {
+        callbackInvokedMs = Optional.empty();
+        groupState.state = GroupState.State.COMPLETE;
+    }
+
+    private void invokeRebalance(long currentTimeMs) {
+        if (callbackInvokedMs.isPresent()) {
+            // do nothing as we've invoked callback
+            return;
+        }
+        callbackInvokedMs = Optional.of(currentTimeMs);
+        eventQueue.add(new RebalanceCallbackEvent());
+    }
+
+    class RebalanceCallbackEvent extends ApplicationEvent {
+        protected RebalanceCallbackEvent() {
+            super(null); // it should actually accept a type.REBALANCE_CALLBACK_TRIGGER but i'm too lazy to create one
+        }
+    }
+
+    class HeartbeatRequestManager implements RequestManager {
+        RequestState heartbeatRequestState;
+
+        @Override
+        public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+            // just a demo, don't do anything
+            return null;
+        }
+
+        public void sendHeartbeat(long currentTimeMs) {
+            heartbeatRequestState.canSendRequest(currentTimeMs);
+        }
+    }
+
+    class ServierSideProtocol implements RebalanceProtocol {

Review Comment:
same, just a prototype: as you can see, nothing was implemented

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org