This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ffd94e0  Add MultiTopicsConsumerImpl with leaked commit from partitionedConsumer (#1495)
ffd94e0 is described below

commit ffd94e079ae567bd67f348f6faa29e2f4d815ed5
Author: Jia Zhai <zhaiji...@gmail.com>
AuthorDate: Wed Apr 4 08:48:17 2018 -0700

    Add MultiTopicsConsumerImpl with leaked commit from partitionedConsumer (#1495)
    
    * add leaked merge from partitionedConsumer
    
    * fix potential race in handleSubscribeOneTopicError
---
 .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java  | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 6da72ec..e7d3b26 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -480,6 +480,15 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         
internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
         internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
         internalConsumerConfig.setConsumerName(consumerName);
+        
internalConsumerConfig.setAcknowledgementsGroupTimeMicros(conf.getAcknowledgementsGroupTimeMicros());
+        internalConsumerConfig.setPriorityLevel(conf.getPriorityLevel());
+        internalConsumerConfig.setProperties(conf.getProperties());
+        internalConsumerConfig.setReadCompacted(conf.isReadCompacted());
+
+        if (null != conf.getConsumerEventListener()) {
+            
internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener());
+        }
+
         if (conf.getCryptoKeyReader() != null) {
             
internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader());
             
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
@@ -758,12 +767,12 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             consumers.values().stream().filter(consumer1 -> {
                 String consumerTopicName = consumer1.getTopic();
                 if 
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicName)) {
+                    toCloseNum.incrementAndGet();
                     return true;
                 } else {
                     return false;
                 }
-            }).forEach(consumer2 -> {
-                toCloseNum.incrementAndGet();
+            }).collect(Collectors.toList()).forEach(consumer2 -> {
                 consumer2.closeAsync().whenComplete((r, ex) -> {
                     consumer2.subscribeFuture().completeExceptionally(error);
                     allTopicPartitionsNumber.decrementAndGet();

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to