codelipenghui commented on a change in pull request #12792:
URL: https://github.com/apache/pulsar/pull/12792#discussion_r753284405
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
##########
@@ -191,7 +191,18 @@ public void sendActiveConsumerChange(long consumerId,
boolean isActive) {
return;
}
cnx.ctx().writeAndFlush(
- Commands.newActiveConsumerChange(consumerId, isActive),
+ Commands.newActiveConsumerChange(consumerId, isActive, null),
+ cnx.ctx().voidPromise());
+ }
+
+ @Override
+ public void sendActiveConsumerChange(long consumerId, String
keySharedProps) {
+ if
(!Commands.peerSupportsActiveConsumerListener(cnx.getRemoteEndpointProtocolVersion()))
{
+ // if the client is older than `v12`, we don't need to send
consumer group changes.
+ return;
+ }
+ cnx.ctx().writeAndFlush(
+ Commands.newActiveConsumerChange(consumerId, true,
keySharedProps),
Review comment:
Why we should combine the active consumer notification with the key hash
ranges change notification? It looks like in general, we should not send active
consumer notification to the consumers with Shared and Key_Shared subscription.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
##########
@@ -191,7 +191,18 @@ public void sendActiveConsumerChange(long consumerId,
boolean isActive) {
return;
}
cnx.ctx().writeAndFlush(
- Commands.newActiveConsumerChange(consumerId, isActive),
+ Commands.newActiveConsumerChange(consumerId, isActive, null),
+ cnx.ctx().voidPromise());
+ }
+
+ @Override
+ public void sendActiveConsumerChange(long consumerId, String
keySharedProps) {
+ if
(!Commands.peerSupportsActiveConsumerListener(cnx.getRemoteEndpointProtocolVersion()))
{
+ // if the client is older than `v12`, we don't need to send
consumer group changes.
+ return;
+ }
+ cnx.ctx().writeAndFlush(
+ Commands.newActiveConsumerChange(consumerId, true,
keySharedProps),
Review comment:
My suggestion is to use a new message for the key hash ranges change
events.
##########
File path:
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
##########
@@ -49,4 +50,17 @@
*/
void becameInactive(Consumer<?> consumer, int partitionId);
+
+ /**
+ * Notified when the consumer key_shared_rule is changed.
+ *
+ * @param consumer
+ * the consumer that originated the event
+ * @param keyPredicate
+ * Determine whether the key will be consumed by {@param
consumer}
+ */
+ default void keySharedRuleChanged(Consumer<?> consumer, Predicate<String>
keyPredicate) {
Review comment:
`keyHashRangesChanged` may be more reasonable here.
Sorry, I'm a little confused here, why do we need a `keyPredicate` here? We
should send the latest key hash ranges to the consumer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]