This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6bce00b Fixed issue in backoff builder configuration (#3984)
6bce00b is described below
commit 6bce00bd177cc6c1d78dcbcf6cc3355d09456c6b
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Apr 8 18:22:00 2019 -0700
Fixed issue in backoff builder configuration (#3984)
### Motivation
In #3848 there was a mismatch of the values when converting from
constructor to builder. The values for `setMandatoryStop()` and `setMax()` were
inverted.
That makes a client to keep reconnecting every 100 millis, instead of doing
the expected backoff.
---
.../org/apache/pulsar/client/impl/Backoff.java | 28 ++++++++++------------
.../apache/pulsar/client/impl/ConsumerImpl.java | 20 ++++++++--------
.../apache/pulsar/client/impl/ProducerImpl.java | 8 +++----
3 files changed, 26 insertions(+), 30 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
index 1cb2e1e..868c375 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
@@ -55,7 +55,7 @@ public class Backoff {
@VisibleForTesting
Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax,
long mandatoryStop,
TimeUnit unitMandatoryStop, Clock clock) {
- this(initial, unitInitial, max, unitMax, mandatoryStop,
unitMandatoryStop, clock,
+ this(initial, unitInitial, max, unitMax, mandatoryStop,
unitMandatoryStop, clock,
Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS,
Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
}
public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit
unitMax, long mandatoryStop,
@@ -65,16 +65,16 @@ public class Backoff {
public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit
unitMax, long mandatoryStop,
TimeUnit unitMandatoryStop, long backoffIntervalMs, long
maxBackoffIntervalMs) {
- this(initial, unitInitial, max, unitMax, mandatoryStop,
unitMandatoryStop, Clock.systemDefaultZone(),
+ this(initial, unitInitial, max, unitMax, mandatoryStop,
unitMandatoryStop, Clock.systemDefaultZone(),
backoffIntervalMs, maxBackoffIntervalMs);
}
-
+
public long next() {
long current = this.next;
if (current < max) {
this.next = Math.min(this.next * 2, this.max);
}
-
+
// Check for mandatory stop
if (!mandatoryStopMade) {
long now = clock.millis();
@@ -84,14 +84,14 @@ public class Backoff {
} else {
timeElapsedSinceFirstBackoff = now - firstBackoffTimeInMillis;
}
-
+
if (timeElapsedSinceFirstBackoff + current > mandatoryStop) {
current = Math.max(initial, mandatoryStop -
timeElapsedSinceFirstBackoff);
mandatoryStopMade = true;
}
}
-
- // Randomly decrease the timeout up to 10% to avoid simultaneous
retries
+
+ // Randomly decrease the timeout up to 10% to avoid simultaneous
retries
// If current < 10 then current/10 < 1 and we get an exception from
Random saying "Bound must be positive"
if (current > 10) {
current -= random.nextInt((int) current / 10);
@@ -119,13 +119,13 @@ public class Backoff {
long backoffIntervalNanos() {
return backoffIntervalNanos;
}
-
+
@VisibleForTesting
long maxBackoffIntervalNanos() {
return maxBackoffIntervalNanos;
}
-
- public static boolean shouldBackoff(long initialTimestamp, TimeUnit
unitInitial, int failedAttempts,
+
+ public static boolean shouldBackoff(long initialTimestamp, TimeUnit
unitInitial, int failedAttempts,
long
defaultInterval, long maxBackoffInterval) {
long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);
long currentTime = System.nanoTime();
@@ -141,13 +141,9 @@ public class Backoff {
// if the current time is less than the time at which next retry
should occur, we should backoff
return currentTime < (initialTimestampInNano + interval);
}
-
+
public static boolean shouldBackoff(long initialTimestamp, TimeUnit
unitInitial, int failedAttempts) {
- return Backoff.shouldBackoff(initialTimestamp, unitInitial,
failedAttempts,
+ return Backoff.shouldBackoff(initialTimestamp, unitInitial,
failedAttempts,
DEFAULT_INTERVAL_IN_NANOSECONDS, MAX_BACKOFF_INTERVAL_NANOSECONDS);
}
-
- public boolean instanceShouldBackoff(long initialTimestamp, TimeUnit
unitInitial, int failedAttempts) {
- return Backoff.shouldBackoff(initialTimestamp, unitInitial,
failedAttempts, backoffIntervalNanos, maxBackoffIntervalNanos);
- }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6dc96d7..9b24d35 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -155,7 +155,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String
topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex,
CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId,
Schema<T> schema, ConsumerInterceptors<T> interceptors) {
- return ConsumerImpl.newConsumerImpl(client, topic, conf,
listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode,
+ return ConsumerImpl.newConsumerImpl(client, topic, conf,
listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode,
startMessageId, schema,
interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS,
Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
}
@@ -171,7 +171,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
subscriptionMode, startMessageId, schema, interceptors,
backoffIntervalNanos, maxBackoffIntervalNanos);
}
}
-
+
protected ConsumerImpl(PulsarClientImpl client, String topic,
ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex,
CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId,
Schema<T> schema, ConsumerInterceptors<T> interceptors,
@@ -221,10 +221,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
this.connectionHandler = new ConnectionHandler(this,
new BackoffBuilder()
- .setInitialTime(100, TimeUnit.MILLISECONDS)
- .setMandatoryStop(60, TimeUnit.SECONDS)
- .setMax(0, TimeUnit.MILLISECONDS)
-
.useUserConfiguredIntervals(backoffIntervalNanos,
+ .setInitialTime(100, TimeUnit.MILLISECONDS)
+ .setMax(60, TimeUnit.SECONDS)
+ .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+ .useUserConfiguredIntervals(backoffIntervalNanos,
maxBackoffIntervalNanos)
.create(),
this);
@@ -1471,12 +1471,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
AtomicLong opTimeoutMs = new
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
Backoff backoff = new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
- .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
- .setMax(0, TimeUnit.MILLISECONDS)
- .useUserConfiguredIntervals(backoffIntervalNanos,
+ .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
+ .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+ .useUserConfiguredIntervals(backoffIntervalNanos,
maxBackoffIntervalNanos)
.create();
-
+
CompletableFuture<MessageId> getLastMessageIdFuture = new
CompletableFuture<>();
internalGetLastMessageIdAsync(backoff, opTimeoutMs,
getLastMessageIdFuture);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index c80e529..b4a0993 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -185,13 +185,13 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
this.connectionHandler = new ConnectionHandler(this,
new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
- .setMandatoryStop(60, TimeUnit.SECONDS)
- .setMax(Math.max(100, conf.getSendTimeoutMs() -
100), TimeUnit.MILLISECONDS)
-
.useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(),
+ .setMax(60, TimeUnit.SECONDS)
+ .setMandatoryStop(Math.max(100,
conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)
+
.useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(),
client.getConfiguration().getMaxBackoffIntervalNanos())
.create(),
this);
-
+
grabCnx();
}