This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bdc302426db [fix][client] Fix internal receive used wrong timeout type
(#15014)
bdc302426db is described below
commit bdc302426dba955652c743e20eef853c34849a24
Author: Kai Wang <[email protected]>
AuthorDate: Mon Apr 4 12:01:35 2022 +0800
[fix][client] Fix internal receive used wrong timeout type (#15014)
### Motivation
Currently, when we called `redeliverUnacknowledgedMessages` and consumer
use receive with timeout unit is second, the timeout might not work.
When we use the second as receive timeout unit like `consumer.receive(3,
TimeUnit.SECONDS);`, line 473 will pass a negative timeout to the next
`internalReceive` calls, because we are subtracting nanoseconds from seconds.
https://github.com/apache/pulsar/blob/f3b87b65c6946eb197c1eece22cff8ff04e16fcb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L459-L476
### Modifications
* Use the same type to subtract.
* Use long type as timeout argument.
---
.../src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java | 2 +-
.../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 7 ++++---
.../org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 7 ++++---
3 files changed, 9 insertions(+), 7 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 5084e8df395..98dc43a1ebc 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -205,7 +205,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
return internalReceive(timeout, unit);
}
- protected abstract Message<T> internalReceive(int timeout, TimeUnit unit)
throws PulsarClientException;
+ protected abstract Message<T> internalReceive(long timeout, TimeUnit unit)
throws PulsarClientException;
@Override
public Messages<T> batchReceive() throws PulsarClientException {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 50f0eda5267..5c5f48f7d1a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -456,7 +456,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
@Override
- protected Message<T> internalReceive(int timeout, TimeUnit unit) throws
PulsarClientException {
+ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws
PulsarClientException {
Message<T> message;
long callTime = System.nanoTime();
try {
@@ -467,10 +467,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
messageProcessed(message);
if (!isValidConsumerEpoch(message)) {
long executionTime = System.nanoTime() - callTime;
- if (executionTime >= unit.toNanos(timeout)) {
+ long timeoutInNanos = unit.toNanos(timeout);
+ if (executionTime >= timeoutInNanos) {
return null;
} else {
- return internalReceive((int) (timeout - executionTime),
TimeUnit.NANOSECONDS);
+ return internalReceive(timeoutInNanos - executionTime,
TimeUnit.NANOSECONDS);
}
}
return beforeConsume(message);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 42f5ac0034d..57fc2b07327 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -360,7 +360,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
@Override
- protected Message<T> internalReceive(int timeout, TimeUnit unit) throws
PulsarClientException {
+ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws
PulsarClientException {
Message<T> message;
long callTime = System.nanoTime();
@@ -371,11 +371,12 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
checkArgument(message instanceof TopicMessageImpl);
if (!isValidConsumerEpoch(message)) {
long executionTime = System.nanoTime() - callTime;
- if (executionTime >= unit.toNanos(timeout)) {
+ long timeoutInNanos = unit.toNanos(timeout);
+ if (executionTime >= timeoutInNanos) {
return null;
} else {
resumeReceivingFromPausedConsumersIfNeeded();
- return internalReceive((int) (timeout -
executionTime), TimeUnit.NANOSECONDS);
+ return internalReceive(timeoutInNanos - executionTime,
TimeUnit.NANOSECONDS);
}
}
unAckedMessageTracker.add(message.getMessageId(),
message.getRedeliveryCount());