This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bdc302426db [fix][client] Fix internal receive used wrong timeout type 
(#15014)
bdc302426db is described below

commit bdc302426dba955652c743e20eef853c34849a24
Author: Kai Wang <[email protected]>
AuthorDate: Mon Apr 4 12:01:35 2022 +0800

    [fix][client] Fix internal receive used wrong timeout type (#15014)
    
    ### Motivation
    
    Currently, when `redeliverUnacknowledgedMessages` has been called and the 
consumer calls receive with a timeout expressed in seconds, the timeout might not work.
    
    When the receive timeout is given in seconds, e.g. `consumer.receive(3, 
TimeUnit.SECONDS);`, line 473 passes a negative timeout to the next 
`internalReceive` call, because it subtracts an elapsed time in nanoseconds from a timeout in seconds.
    
    
https://github.com/apache/pulsar/blob/f3b87b65c6946eb197c1eece22cff8ff04e16fcb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L459-L476
    
    
    ### Modifications
    
    * Convert the timeout to nanoseconds before subtracting the elapsed time, so both operands use the same unit.
    * Use `long` as the type of the timeout argument.
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java  | 2 +-
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java  | 7 ++++---
 .../org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java     | 7 ++++---
 3 files changed, 9 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 5084e8df395..98dc43a1ebc 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -205,7 +205,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         return internalReceive(timeout, unit);
     }
 
-    protected abstract Message<T> internalReceive(int timeout, TimeUnit unit) 
throws PulsarClientException;
+    protected abstract Message<T> internalReceive(long timeout, TimeUnit unit) 
throws PulsarClientException;
 
     @Override
     public Messages<T> batchReceive() throws PulsarClientException {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 50f0eda5267..5c5f48f7d1a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -456,7 +456,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     @Override
-    protected Message<T> internalReceive(int timeout, TimeUnit unit) throws 
PulsarClientException {
+    protected Message<T> internalReceive(long timeout, TimeUnit unit) throws 
PulsarClientException {
         Message<T> message;
         long callTime = System.nanoTime();
         try {
@@ -467,10 +467,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             messageProcessed(message);
             if (!isValidConsumerEpoch(message)) {
                 long executionTime = System.nanoTime() - callTime;
-                if (executionTime >= unit.toNanos(timeout)) {
+                long timeoutInNanos = unit.toNanos(timeout);
+                if (executionTime >= timeoutInNanos) {
                     return null;
                 } else {
-                    return internalReceive((int) (timeout - executionTime), 
TimeUnit.NANOSECONDS);
+                    return internalReceive(timeoutInNanos - executionTime, 
TimeUnit.NANOSECONDS);
                 }
             }
             return beforeConsume(message);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 42f5ac0034d..57fc2b07327 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -360,7 +360,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     }
 
     @Override
-    protected Message<T> internalReceive(int timeout, TimeUnit unit) throws 
PulsarClientException {
+    protected Message<T> internalReceive(long timeout, TimeUnit unit) throws 
PulsarClientException {
         Message<T> message;
 
         long callTime = System.nanoTime();
@@ -371,11 +371,12 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 checkArgument(message instanceof TopicMessageImpl);
                 if (!isValidConsumerEpoch(message)) {
                     long executionTime = System.nanoTime() - callTime;
-                    if (executionTime >= unit.toNanos(timeout)) {
+                    long timeoutInNanos = unit.toNanos(timeout);
+                    if (executionTime >= timeoutInNanos) {
                         return null;
                     } else {
                         resumeReceivingFromPausedConsumersIfNeeded();
-                        return internalReceive((int) (timeout - 
executionTime), TimeUnit.NANOSECONDS);
+                        return internalReceive(timeoutInNanos - executionTime, 
TimeUnit.NANOSECONDS);
                     }
                 }
                 unAckedMessageTracker.add(message.getMessageId(), 
message.getRedeliveryCount());

Reply via email to