This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bce00b  Fixed issue in backoff builder configuration (#3984)
6bce00b is described below

commit 6bce00bd177cc6c1d78dcbcf6cc3355d09456c6b
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Apr 8 18:22:00 2019 -0700

    Fixed issue in backoff builder configuration (#3984)
    
    ### Motivation
    
    In #3848 there was a mismatch of the values when converting from 
constructor to builder. The values for `setMandatoryStop()` and `setMax()` were 
inverted.
    
    That makes a client to keep reconnecting every 100 millis, instead of doing 
the expected backoff.
---
 .../org/apache/pulsar/client/impl/Backoff.java     | 28 ++++++++++------------
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 20 ++++++++--------
 .../apache/pulsar/client/impl/ProducerImpl.java    |  8 +++----
 3 files changed, 26 insertions(+), 30 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
index 1cb2e1e..868c375 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java
@@ -55,7 +55,7 @@ public class Backoff {
     @VisibleForTesting
     Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, 
long mandatoryStop,
             TimeUnit unitMandatoryStop, Clock clock) {
-       this(initial, unitInitial, max, unitMax, mandatoryStop, 
unitMandatoryStop, clock, 
+        this(initial, unitInitial, max, unitMax, mandatoryStop, 
unitMandatoryStop, clock,
                 Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, 
Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
     }
     public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit 
unitMax, long mandatoryStop,
@@ -65,16 +65,16 @@ public class Backoff {
 
     public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit 
unitMax, long mandatoryStop,
                    TimeUnit unitMandatoryStop, long backoffIntervalMs, long 
maxBackoffIntervalMs) {
-       this(initial, unitInitial, max, unitMax, mandatoryStop, 
unitMandatoryStop, Clock.systemDefaultZone(), 
+        this(initial, unitInitial, max, unitMax, mandatoryStop, 
unitMandatoryStop, Clock.systemDefaultZone(),
             backoffIntervalMs, maxBackoffIntervalMs);
     }
-    
+
     public long next() {
         long current = this.next;
         if (current < max) {
             this.next = Math.min(this.next * 2, this.max);
         }
-        
+
         // Check for mandatory stop
         if (!mandatoryStopMade) {
             long now = clock.millis();
@@ -84,14 +84,14 @@ public class Backoff {
             } else {
                 timeElapsedSinceFirstBackoff = now - firstBackoffTimeInMillis;
             }
-    
+
             if (timeElapsedSinceFirstBackoff + current > mandatoryStop) {
                 current = Math.max(initial, mandatoryStop - 
timeElapsedSinceFirstBackoff);
                 mandatoryStopMade = true;
             }
         }
-        
-        // Randomly decrease the timeout up to 10% to avoid simultaneous 
retries        
+
+        // Randomly decrease the timeout up to 10% to avoid simultaneous 
retries
         // If current < 10 then current/10 < 1 and we get an exception from 
Random saying "Bound must be positive"
         if (current > 10) {
             current -= random.nextInt((int) current / 10);
@@ -119,13 +119,13 @@ public class Backoff {
     long backoffIntervalNanos() {
        return backoffIntervalNanos;
     }
-    
+
     @VisibleForTesting
     long maxBackoffIntervalNanos() {
        return maxBackoffIntervalNanos;
     }
-    
-    public static boolean shouldBackoff(long initialTimestamp, TimeUnit 
unitInitial, int failedAttempts, 
+
+    public static boolean shouldBackoff(long initialTimestamp, TimeUnit 
unitInitial, int failedAttempts,
                                                                        long 
defaultInterval, long maxBackoffInterval) {
        long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);
         long currentTime = System.nanoTime();
@@ -141,13 +141,9 @@ public class Backoff {
         // if the current time is less than the time at which next retry 
should occur, we should backoff
         return currentTime < (initialTimestampInNano + interval);
     }
-    
+
     public static boolean shouldBackoff(long initialTimestamp, TimeUnit 
unitInitial, int failedAttempts) {
-        return Backoff.shouldBackoff(initialTimestamp, unitInitial, 
failedAttempts, 
+        return Backoff.shouldBackoff(initialTimestamp, unitInitial, 
failedAttempts,
                                                                 
DEFAULT_INTERVAL_IN_NANOSECONDS, MAX_BACKOFF_INTERVAL_NANOSECONDS);
     }
-    
-    public boolean instanceShouldBackoff(long initialTimestamp, TimeUnit 
unitInitial, int failedAttempts) {
-       return Backoff.shouldBackoff(initialTimestamp, unitInitial, 
failedAttempts, backoffIntervalNanos, maxBackoffIntervalNanos);
-    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6dc96d7..9b24d35 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -155,7 +155,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String 
topic, ConsumerConfigurationData<T> conf,
                  ExecutorService listenerExecutor, int partitionIndex, 
CompletableFuture<Consumer<T>> subscribeFuture,
                  SubscriptionMode subscriptionMode, MessageId startMessageId, 
Schema<T> schema, ConsumerInterceptors<T> interceptors) {
-        return ConsumerImpl.newConsumerImpl(client, topic, conf, 
listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode, 
+        return ConsumerImpl.newConsumerImpl(client, topic, conf, 
listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode,
                                             startMessageId, schema, 
interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, 
Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
     }
 
@@ -171,7 +171,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                     subscriptionMode, startMessageId, schema, interceptors, 
backoffIntervalNanos, maxBackoffIntervalNanos);
         }
     }
-    
+
     protected ConsumerImpl(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
                  ExecutorService listenerExecutor, int partitionIndex, 
CompletableFuture<Consumer<T>> subscribeFuture,
                  SubscriptionMode subscriptionMode, MessageId startMessageId, 
Schema<T> schema, ConsumerInterceptors<T> interceptors,
@@ -221,10 +221,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
         this.connectionHandler = new ConnectionHandler(this,
                                new BackoffBuilder()
-                              .setInitialTime(100, TimeUnit.MILLISECONDS)
-                              .setMandatoryStop(60, TimeUnit.SECONDS)
-                              .setMax(0, TimeUnit.MILLISECONDS)
-                              
.useUserConfiguredIntervals(backoffIntervalNanos, 
+                           .setInitialTime(100, TimeUnit.MILLISECONDS)
+                           .setMax(60, TimeUnit.SECONDS)
+                           .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+                           .useUserConfiguredIntervals(backoffIntervalNanos,
                                                           
maxBackoffIntervalNanos)
                               .create(),
                         this);
@@ -1471,12 +1471,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         AtomicLong opTimeoutMs = new 
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
         Backoff backoff = new BackoffBuilder()
                 .setInitialTime(100, TimeUnit.MILLISECONDS)
-                .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
-                .setMax(0, TimeUnit.MILLISECONDS)
-                .useUserConfiguredIntervals(backoffIntervalNanos, 
+                .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
+                .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+                .useUserConfiguredIntervals(backoffIntervalNanos,
                                             maxBackoffIntervalNanos)
                 .create();
-        
+
         CompletableFuture<MessageId> getLastMessageIdFuture = new 
CompletableFuture<>();
 
         internalGetLastMessageIdAsync(backoff, opTimeoutMs, 
getLastMessageIdFuture);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index c80e529..b4a0993 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -185,13 +185,13 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         this.connectionHandler = new ConnectionHandler(this,
                new BackoffBuilder()
                    .setInitialTime(100, TimeUnit.MILLISECONDS)
-                           .setMandatoryStop(60, TimeUnit.SECONDS)
-                           .setMax(Math.max(100, conf.getSendTimeoutMs() - 
100), TimeUnit.MILLISECONDS)
-                           
.useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(),
 
+                           .setMax(60, TimeUnit.SECONDS)
+                           .setMandatoryStop(Math.max(100, 
conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)
+                           
.useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(),
                                                        
client.getConfiguration().getMaxBackoffIntervalNanos())
                            .create(),
             this);
-     
+
         grabCnx();
     }
 

Reply via email to