This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 271c6ee Fixed initialization order of acknowledgmentsGroupingTracker
in ConsumerImpl (#2399)
271c6ee is described below
commit 271c6ee56a007506020ecf317c88df95e4511714
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Aug 20 10:39:54 2018 -0700
Fixed initialization order of acknowledgmentsGroupingTracker in
ConsumerImpl (#2399)
### Motivation
With delayed acks enabled (the default), there is a potential race
condition that can lead to an NPE:
```
java.lang.NullPointerException
    at org.apache.pulsar.client.impl.ConsumerImpl.getClientCnx(ConsumerImpl.java:1446)
    at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.flush(PersistentAcknowledgmentsGroupingTracker.java:154)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    ...
```
The reason is that the delayed ack commit task gets scheduled (e.g., in
100ms) and might be executed before the main thread has finished
initializing the `ConsumerImpl` instance.
### Modifications
Reordered the initialization in the `ConsumerImpl` constructor to make sure
`connectionHandler` is already set when we create the
`PersistentAcknowledgmentsGroupingTracker` instance.
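The ordering problem can be illustrated with a minimal sketch. This is not the actual Pulsar code: `Owner`, `Handler`, and `Tracker` are hypothetical stand-ins for `ConsumerImpl`, `connectionHandler`, and the grouping tracker, and a run-immediately executor is assumed in order to force the worst case, where the scheduled task fires before the constructor has finished.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

public class InitOrderSketch {

    /** Hypothetical stand-in for the connection handler read by the flush task. */
    static final class Handler {
        String cnx() { return "cnx"; }
    }

    /** Hypothetical stand-in for the tracker: it submits a flush task when constructed. */
    static final class Tracker {
        Tracker(Owner owner, Executor executor, AtomicReference<Throwable> failure) {
            executor.execute(() -> {
                try {
                    // Dereferences a field that the owner may not have assigned yet.
                    owner.handler.cnx();
                } catch (NullPointerException e) {
                    failure.set(e);
                }
            });
        }
    }

    /** Hypothetical stand-in for the consumer whose constructor order matters. */
    static final class Owner {
        Handler handler;
        Tracker tracker;

        Owner(boolean trackerFirst, Executor executor, AtomicReference<Throwable> failure) {
            if (trackerFirst) {
                // Old order: the task can observe handler == null.
                tracker = new Tracker(this, executor, failure);
                handler = new Handler();
            } else {
                // New order: handler is assigned before the task can be submitted.
                handler = new Handler();
                tracker = new Tracker(this, executor, failure);
            }
        }
    }

    /** Returns true if the flush task hit a NullPointerException. */
    static boolean flushFails(boolean trackerFirst) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        // Assumption: run the task inline to simulate it firing mid-constructor.
        Executor runImmediately = Runnable::run;
        new Owner(trackerFirst, runImmediately, failure);
        return failure.get() != null;
    }

    public static void main(String[] args) {
        System.out.println("tracker first: NPE = " + flushFails(true));
        System.out.println("handler first: NPE = " + flushFails(false));
    }
}
```

In the sketch, only the tracker-first order lets the task see a null `handler`. Creating the tracker last removes that window without adding any synchronization.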
---
.../org/apache/pulsar/client/impl/ConsumerImpl.java | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3e94724..da04534 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -160,15 +160,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         this.readCompacted = conf.isReadCompacted();
         this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();

-        TopicName topicName = TopicName.get(topic);
-        if (topicName.isPersistent()) {
-            this.acknowledgmentsGroupingTracker =
-                new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
-        } else {
-            this.acknowledgmentsGroupingTracker =
-                NonPersistentAcknowledgmentGroupingTracker.of();
-        }
-
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
             stats = new ConsumerStatsRecorderImpl(client, conf, this);
         } else {
@@ -203,6 +194,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS),
             this);

+        TopicName topicName = TopicName.get(topic);
+        if (topicName.isPersistent()) {
+            this.acknowledgmentsGroupingTracker =
+                new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
+        } else {
+            this.acknowledgmentsGroupingTracker =
+                NonPersistentAcknowledgmentGroupingTracker.of();
+        }
+
         grabCnx();
     }