hangc0276 commented on a change in pull request #12792:
URL: https://github.com/apache/pulsar/pull/12792#discussion_r752821409



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
##########
@@ -74,6 +80,7 @@ public void addConsumer(Consumer consumer) throws 
ConsumerAssignException {
                     }
                 });
             }
+            notifyActiveConsumerChanged();

Review comment:
       Why is notifyActiveConsumerChanged called while holding the write lock?

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
##########
@@ -465,7 +466,20 @@ protected void 
handleActiveConsumerChange(CommandActiveConsumerChange change) {
         }
         ConsumerImpl<?> consumer = consumers.get(change.getConsumerId());
         if (consumer != null) {
-            consumer.activeConsumerChanged(change.isIsActive());
+            switch (consumer.getSubType()){
+                case Failover:{
+                    consumer.activeConsumerChanged(change.isIsActive(), null);
+                    break;
+                }
+                case Key_Shared:{
+                    if (StringUtils.isNoneBlank(change.getKeySharedProps())) {
+                        consumer.activeConsumerChanged(true,
+                                
StickyKeyConsumerPredicate.decode(change.getKeySharedProps()));
+                    }
+                    break;
+                }
+                default : {}

Review comment:
       Do we need `default: {}`?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
##########
@@ -148,4 +156,29 @@ public Consumer select(int hash) {
     Map<Integer, List<Consumer>> getRangeConsumer() {
         return Collections.unmodifiableMap(hashRing);
     }
+
+    @Override
+    public StickyKeyConsumerPredicate generateSpecialPredicate(final Consumer 
consumer){
+        NavigableMap<Integer, List<String>> cpHashRing = new 
ConcurrentSkipListMap<>();
+        for (Map.Entry<Integer, List<Consumer>> entry: hashRing.entrySet()) {
+            if (CollectionUtils.isEmpty(entry.getValue())) {
+                continue;
+            }
+            cpHashRing.put(entry.getKey(), entry.getValue()
+                    .stream()
+                    .map(v -> v == consumer ? 
StickyKeyConsumerPredicate.SPECIAL_CONSUMER_MARK
+                            : StickyKeyConsumerPredicate.OTHER_CONSUMER_MARK)
+                    .collect(Collectors.toList())
+            );
+        }
+        return new 
StickyKeyConsumerPredicate.Predicate4ConsistentHashingStickyKeyConsumerSelector(cpHashRing);
+    }
+
+    private void notifyActiveConsumerChanged() {
+        // TODO add configuration for this events in future
+        Set<Consumer> consumerSet = new HashSet<>(hashRing.values().stream()

Review comment:
       This can be replaced with `Set<Consumer> consumerSet = 
hashRing.values().stream()
               .flatMap(Collection::stream).collect(Collectors.toSet());`

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
##########
@@ -99,6 +100,11 @@ public void becameInactive(Consumer<?> consumer, int 
partitionId) {
             } catch (InterruptedException e) {
             }
         }
+
+        @Override
+        public void keySharedRuleChanged(Consumer<?> consumer, 
Predicate<String> keyPredicate) {

Review comment:
       You have added the `default` modifier to this method in the interface; 
do we still need to override it in the subclass?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
##########
@@ -96,9 +104,20 @@ public synchronized void removeConsumer(Consumer consumer) {
                 rangeMap.put(removeRange, lowerEntry.getValue());
                 rangeMap.remove(lowerEntry.getKey());
                 consumerRange.put(lowerEntry.getValue(), removeRange);
+                // notify change for effected consumer
+                lowerEntry.getValue().notifyActiveConsumerChange(
+                        
generateSpecialPredicate(lowerEntry.getValue()).encode());
             } else {
                 rangeMap.remove(removeRange);
+                // notify change for effected consumer.
+                Map.Entry<Integer, Consumer> lowerEntry = 
rangeMap.higherEntry(removeRange);
+                if (lowerEntry != null) {
+                    lowerEntry.getValue().notifyActiveConsumerChange(
+                            
generateSpecialPredicate(lowerEntry.getValue()).encode());
+                }
             }
+//            // notify removed consumer change

Review comment:
       Please remove this commented-out code.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
##########
@@ -148,4 +156,29 @@ public Consumer select(int hash) {
     Map<Integer, List<Consumer>> getRangeConsumer() {
         return Collections.unmodifiableMap(hashRing);
     }
+
+    @Override
+    public StickyKeyConsumerPredicate generateSpecialPredicate(final Consumer 
consumer){
+        NavigableMap<Integer, List<String>> cpHashRing = new 
ConcurrentSkipListMap<>();

Review comment:
       Do we need ConcurrentSkipListMap? 

##########
File path: 
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
##########
@@ -643,6 +643,15 @@ public static ByteBuf newActiveConsumerChange(long 
consumerId, boolean isActive)
         return serializeWithSize(cmd);
     }
 
+    public static ByteBuf newActiveConsumerChange(long consumerId, String 
keySharedProps) {

Review comment:
       Can we merge this method into the above method?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
##########
@@ -96,9 +104,20 @@ public synchronized void removeConsumer(Consumer consumer) {
                 rangeMap.put(removeRange, lowerEntry.getValue());
                 rangeMap.remove(lowerEntry.getKey());
                 consumerRange.put(lowerEntry.getValue(), removeRange);
+                // notify change for effected consumer
+                lowerEntry.getValue().notifyActiveConsumerChange(
+                        
generateSpecialPredicate(lowerEntry.getValue()).encode());
             } else {
                 rangeMap.remove(removeRange);
+                // notify change for effected consumer.
+                Map.Entry<Integer, Consumer> lowerEntry = 
rangeMap.higherEntry(removeRange);
+                if (lowerEntry != null) {
+                    lowerEntry.getValue().notifyActiveConsumerChange(

Review comment:
       please simplify the duplicated code

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
##########
@@ -421,7 +421,8 @@ public Clock getClientClock() {
                     "Read compacted can only be used with exclusive or 
failover persistent subscriptions"));
         }
 
-        if (conf.getConsumerEventListener() != null && 
conf.getSubscriptionType() != SubscriptionType.Failover) {
+        if (conf.getConsumerEventListener() != null && 
conf.getSubscriptionType() != SubscriptionType.Failover
+                && conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
             return FutureUtil.failedFuture(new 
PulsarClientException.InvalidConfigurationException(

Review comment:
       Please update the exception message.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to