This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ffd94e0 Add MultiTopicsConsumerImpl with leaked commit from
partitionedConsumer (#1495)
ffd94e0 is described below
commit ffd94e079ae567bd67f348f6faa29e2f4d815ed5
Author: Jia Zhai <[email protected]>
AuthorDate: Wed Apr 4 08:48:17 2018 -0700
Add MultiTopicsConsumerImpl with leaked commit from partitionedConsumer
(#1495)
* add leaked merge from partitionedConsumer
* fix potential race in handleSubscribeOneTopicError
---
.../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 6da72ec..e7d3b26 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -480,6 +480,15 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
internalConsumerConfig.setConsumerName(consumerName);
+
internalConsumerConfig.setAcknowledgementsGroupTimeMicros(conf.getAcknowledgementsGroupTimeMicros());
+ internalConsumerConfig.setPriorityLevel(conf.getPriorityLevel());
+ internalConsumerConfig.setProperties(conf.getProperties());
+ internalConsumerConfig.setReadCompacted(conf.isReadCompacted());
+
+ if (null != conf.getConsumerEventListener()) {
+
internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener());
+ }
+
if (conf.getCryptoKeyReader() != null) {
internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader());
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
@@ -758,12 +767,12 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
consumers.values().stream().filter(consumer1 -> {
String consumerTopicName = consumer1.getTopic();
if
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicName)) {
+ toCloseNum.incrementAndGet();
return true;
} else {
return false;
}
- }).forEach(consumer2 -> {
- toCloseNum.incrementAndGet();
+ }).collect(Collectors.toList()).forEach(consumer2 -> {
consumer2.closeAsync().whenComplete((r, ex) -> {
consumer2.subscribeFuture().completeExceptionally(error);
allTopicPartitionsNumber.decrementAndGet();
--
To stop receiving notification emails like this one, please contact
[email protected].