This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0b37b0c Fix race condition on close consumer while reconnect to
broker. (#7589)
0b37b0c is described below
commit 0b37b0c76dd2faaa9b8cc8d5b316ff35a307cfc1
Author: lipenghui <[email protected]>
AuthorDate: Tue Jul 28 13:57:46 2020 +0800
Fix race condition on close consumer while reconnect to broker. (#7589)
### Modifications
Add a state check when the consumer's connection is opened. If the consumer
state is closing or closed, we don’t need to send the subscribe command.
---
.../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 8 ++++++++
1 file changed, 8 insertions(+)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ba54a1a..76e0726 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -729,6 +729,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
@Override
public void connectionOpened(final ClientCnx cnx) {
+ if (getState() == State.Closing || getState() == State.Closed) {
+ setState(State.Closed);
+ closeConsumerTasks();
+ client.cleanupConsumer(this);
+ failPendingReceive();
+ clearReceiverQueue();
+ return;
+ }
setClientCnx(cnx);
cnx.registerConsumer(consumerId, this);