This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a8b921c always remove message data size (#8566)
a8b921c is described below
commit a8b921cf15c0a0f652f1d9f62a6481efea243881
Author: Mak <[email protected]>
AuthorDate: Sun Feb 28 10:07:16 2021 +0800
always remove message data size (#8566)
* always remove message data size
Signed-off-by: mak <[email protected]>
* Fix the CI issue
* Fix the CI
Co-authored-by: Yong Zhang <[email protected]>
---
.../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 2a2e77a..c7f67f4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1343,13 +1343,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (msgCnx != currentCnx) {
// The processed message did belong to the old queue that was
cleared after reconnection.
- return;
- }
-
- increaseAvailablePermits(currentCnx);
- stats.updateNumMsgsReceived(msg);
+ } else {
+ increaseAvailablePermits(currentCnx);
+ stats.updateNumMsgsReceived(msg);
- trackMessage(msg);
+ trackMessage(msg);
+ }
decreaseIncomingMessageSize(msg);
}