This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ffd94e0 Add MultiTopicsConsumerImpl with leaked commit from partitionedConsumer (#1495) ffd94e0 is described below commit ffd94e079ae567bd67f348f6faa29e2f4d815ed5 Author: Jia Zhai <zhaiji...@gmail.com> AuthorDate: Wed Apr 4 08:48:17 2018 -0700 Add MultiTopicsConsumerImpl with leaked commit from partitionedConsumer (#1495) * add leaked merge from partitionedConsumer * fix potential race in handleSubscribeOneTopicError --- .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 6da72ec..e7d3b26 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -480,6 +480,15 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize()); internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType()); internalConsumerConfig.setConsumerName(consumerName); + internalConsumerConfig.setAcknowledgementsGroupTimeMicros(conf.getAcknowledgementsGroupTimeMicros()); + internalConsumerConfig.setPriorityLevel(conf.getPriorityLevel()); + internalConsumerConfig.setProperties(conf.getProperties()); + internalConsumerConfig.setReadCompacted(conf.isReadCompacted()); + + if (null != conf.getConsumerEventListener()) { + internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener()); + } + if (conf.getCryptoKeyReader() != null) { internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader()); internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction()); @@ -758,12 +767,12 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { consumers.values().stream().filter(consumer1 -> { String consumerTopicName = consumer1.getTopic(); if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicName)) { + toCloseNum.incrementAndGet(); return true; } else { return false; } - }).forEach(consumer2 -> { - toCloseNum.incrementAndGet(); + }).collect(Collectors.toList()).forEach(consumer2 -> { consumer2.closeAsync().whenComplete((r, ex) -> { consumer2.subscribeFuture().completeExceptionally(error); allTopicPartitionsNumber.decrementAndGet(); -- To stop receiving notification emails like this one, please contact mme...@apache.org.