This is an automated email from the ASF dual-hosted git repository.

ibuenros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c185cad  [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …
c185cad is described below

commit c185cadcc3577bf7ad3cb44d47c540274fd6f0e8
Author: ibuenros <[email protected]>
AuthorDate: Wed May 1 11:03:39 2019 -0700

    [GOBBLIN-760] Improve retrying behavior of throttling client, add more informative …
    
    Closes #2624 from ibuenros/limiter-logging
---
 .../gobblin/restli/SharedRestClientFactory.java    |  4 ++
 .../apache/gobblin/restli/UriRestClientKey.java    | 26 +-------
 .../util/limiter/BatchedPermitsRequester.java      | 78 +++++++++++++++++-----
 .../RedirectAwareRestClientRequestSender.java      | 59 +++++++++++++---
 .../limiter/RestliServiceBasedLimiterTest.java     |  2 +-
 5 files changed, 118 insertions(+), 51 deletions(-)

diff --git 
a/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/SharedRestClientFactory.java
 
b/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/SharedRestClientFactory.java
index a0219d2..7e04d59 100644
--- 
a/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/SharedRestClientFactory.java
+++ 
b/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/SharedRestClientFactory.java
@@ -51,6 +51,7 @@ import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.util.ExecutorsUtils;
 
 import io.netty.channel.nio.NioEventLoopGroup;
+import lombok.extern.slf4j.Slf4j;
 
 
 /**
@@ -58,6 +59,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
  *
  * To configure, specify rest server uri at key "serverUri". Note uri must 
start with "http" or "https".
  */
+@Slf4j
 public class SharedRestClientFactory<S extends ScopeType<S>> implements 
SharedResourceFactory<RestClient, SharedRestClientKey, S> {
 
   public static final String FACTORY_NAME = "restli";
@@ -82,6 +84,8 @@ public class SharedRestClientFactory<S extends ScopeType<S>> 
implements SharedRe
 
       String uriPrefix = ((UriRestClientKey) key).getUri();
 
+      log.info(String.format("Creating a brand new rest client for service 
name %s and uri prefix %s", key.serviceName, uriPrefix));
+
       HttpClientFactory http = new HttpClientFactory(FilterChains.empty(),
           new NioEventLoopGroup(0 /* use default settings */,
               ExecutorsUtils.newDaemonThreadFactory(Optional.<Logger>absent(), 
Optional.of("R2 Nio Event Loop-%d"))),
diff --git 
a/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/UriRestClientKey.java
 
b/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/UriRestClientKey.java
index a3e2ea9..043134b 100644
--- 
a/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/UriRestClientKey.java
+++ 
b/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/UriRestClientKey.java
@@ -22,12 +22,14 @@ import java.net.URISyntaxException;
 
 import com.google.common.base.Preconditions;
 
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 
 
 /**
  * A {@link SharedRestClientKey} that explicitly specifies the {@link URI} of 
the remote server.
  */
+@EqualsAndHashCode(callSuper = true)
 public class UriRestClientKey extends SharedRestClientKey {
   @Getter
   private final String uri;
@@ -51,28 +53,4 @@ public class UriRestClientKey extends SharedRestClientKey {
     super(serviceName);
     this.uri = uriPrefix;
   }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    if (!super.equals(o)) {
-      return false;
-    }
-
-    UriRestClientKey that = (UriRestClientKey) o;
-
-    return uri.equals(that.uri);
-  }
-
-  @Override
-  public int hashCode() {
-    int result = super.hashCode();
-    result = 31 * result + uri.hashCode();
-    return result;
-  }
 }
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
index ba099a0..91f607e 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -87,6 +88,7 @@ class BatchedPermitsRequester {
   private static final long RETRY_DELAY_ON_NON_RETRIABLE_EXCEPTION = 60000; // 
10 minutes
   private static final double MAX_DEPLETION_RATE = 1e20;
   public static final int MAX_GROWTH_REQUEST = 2;
+  private static final long GET_PERMITS_MAX_SLEEP_MILLIS = 1000;
 
   private static final ScheduledExecutorService SCHEDULE_EXECUTOR_SERVICE =
       Executors.newScheduledThreadPool(1, 
ExecutorsUtils.newDaemonThreadFactory(Optional.of(log),
@@ -102,15 +104,18 @@ class BatchedPermitsRequester {
   private final Timer restRequestTimer;
   private final Histogram restRequestHistogram;
 
-  private volatile int retries = 0;
+  private volatile AtomicInteger retries = new AtomicInteger(0);
   private final RetryStatus retryStatus;
   private final SynchronizedAverager permitsOutstanding;
   private final long targetMillisBetweenRequests;
   private final AtomicLong callbackCounter;
+  /** Permit requests will timeout after this many millis. */
   private final long maxTimeout;
   /** Any request larger than this is known to be impossible to satisfy. */
   private long knownUnsatisfiablePermits;
 
+  private volatile AllocationCallback currentCallback;
+
   @Builder
   private BatchedPermitsRequester(String resourceId, String 
requestorIdentifier,
       long targetMillisBetweenRequests, RequestSender requestSender, 
MetricContext metricContext, long maxTimeoutMillis) {
@@ -137,7 +142,7 @@ class BatchedPermitsRequester {
     this.restRequestTimer = metricContext == null ? null : 
metricContext.timer(REST_REQUEST_TIMER);
     this.restRequestHistogram = metricContext == null ? null : 
metricContext.histogram(REST_REQUEST_PERMITS_HISTOGRAM);
     this.callbackCounter = new AtomicLong();
-    this.maxTimeout = maxTimeoutMillis > 0 ? maxTimeoutMillis : 10000;
+    this.maxTimeout = maxTimeoutMillis > 0 ? maxTimeoutMillis : 120000;
     this.knownUnsatisfiablePermits = Long.MAX_VALUE;
   }
 
@@ -156,10 +161,13 @@ class BatchedPermitsRequester {
       while (true) {
         if (permits >= this.knownUnsatisfiablePermits) {
           // We are requesting more permits than the remote policy will ever 
be able to satisfy, return immediately with no permits
+          log.warn(String.format("Server has indicated number of permits is 
unsatisfiable. "
+              + "Permits requested: %d, known unsatisfiable permits: %d ", 
permits, this.knownUnsatisfiablePermits));
           break;
         }
         if (elapsedMillis(startTimeNanos) > this.maxTimeout) {
           // Max timeout reached, break
+          log.warn("Reached timeout waiting for permits. Timeout: " + 
this.maxTimeout);
           break;
         }
         if (this.permitBatchContainer.tryTake(permits)) {
@@ -172,7 +180,8 @@ class BatchedPermitsRequester {
           if (this.callbackCounter.get() == callbackCounterSnap) {
             // If a callback has happened since we tried to send the new 
permit request, don't await
             // Since some request senders may be synchronous, we would have 
missed the notification
-            boolean ignore = 
this.newPermitsAvailable.await(remainingTime(startTimeNanos, this.maxTimeout), 
TimeUnit.MILLISECONDS);
+            boolean ignore = this.newPermitsAvailable.await(
+                Math.min(GET_PERMITS_MAX_SLEEP_MILLIS, 
remainingTime(startTimeNanos, this.maxTimeout)), TimeUnit.MILLISECONDS);
           }
         } else {
           break;
@@ -196,18 +205,29 @@ class BatchedPermitsRequester {
   /**
    * Send a new permit request to the server.
    */
-  private void maybeSendNewPermitRequest() {
-    if (!this.requestSemaphore.tryAcquire()) {
-      return;
+  private synchronized void maybeSendNewPermitRequest() {
+    while (!this.requestSemaphore.tryAcquire()) {
+      if (this.currentCallback == null) {
+        throw new IllegalStateException("Semaphore is unavailable while 
callback is null!");
+      }
+      if (this.currentCallback.elapsedTime() > 30000) {
+        // If the previous callback has not returned after 30s, we consider 
the call lost and try again
+        // Note we expect Rest.li to call onError for most failure situations, 
this logic just handles the edge
+        // case were Rest.li fails somehow and we don't want to just hang.
+        log.warn("Last request did not return after 30s, considering it lost 
and retrying.");
+        this.currentCallback.clearCallback();
+      } else {
+        return;
+      }
     }
     if (!this.retryStatus.canRetryNow()) {
-      this.requestSemaphore.release();
+      clearSemaphore();
       return;
     }
     try {
       long permits = computeNextPermitRequest();
       if (permits <= 0) {
-        this.requestSemaphore.release();
+        clearSemaphore();
         return;
       }
 
@@ -221,16 +241,25 @@ class BatchedPermitsRequester {
 
       log.debug("Sending permit request " + permitRequest);
 
-      this.requestSender.sendRequest(permitRequest, new AllocationCallback(
+      this.currentCallback = new AllocationCallback(
           BatchedPermitsRequester.this.restRequestTimer == null ? 
NoopCloseable.INSTANCE :
-              BatchedPermitsRequester.this.restRequestTimer.time(), new 
Sleeper()));
+              BatchedPermitsRequester.this.restRequestTimer.time(), new 
Sleeper());
+      this.requestSender.sendRequest(permitRequest, currentCallback);
     } catch (CloneNotSupportedException cnse) {
       // This should never happen.
-      this.requestSemaphore.release();
+      clearSemaphore();
       throw new RuntimeException(cnse);
     }
   }
 
+  private synchronized void clearSemaphore() {
+    if (this.requestSemaphore.availablePermits() > 0) {
+      throw new IllegalStateException("Semaphore should have 0 permits!");
+    }
+    BatchedPermitsRequester.this.requestSemaphore.release();
+    BatchedPermitsRequester.this.currentCallback = null;
+  }
+
   /**
    * @return the number of permits we should request in the next request.
    */
@@ -277,6 +306,9 @@ class BatchedPermitsRequester {
   class AllocationCallback implements Callback<Response<PermitAllocation>> {
     private final Closeable timerContext;
     private final Sleeper sleeper;
+    private final long startTime = System.currentTimeMillis();
+
+    private volatile boolean callbackCleared = false;
 
     public AllocationCallback(Closeable timerContext, Sleeper sleeper) {
       this.timerContext = timerContext;
@@ -298,12 +330,12 @@ class BatchedPermitsRequester {
           }
         }
 
-        BatchedPermitsRequester.this.retries++;
+        BatchedPermitsRequester.this.retries.incrementAndGet();
 
-        if (BatchedPermitsRequester.this.retries >= MAX_RETRIES) {
+        if (BatchedPermitsRequester.this.retries.get() >= MAX_RETRIES) {
           nonRetriableFail(exc, "Too many failures trying to communicate with 
throttling service.");
         } else {
-          BatchedPermitsRequester.this.requestSemaphore.release();
+          clearCallback();
           // retry
           maybeSendNewPermitRequest();
         }
@@ -321,7 +353,7 @@ class BatchedPermitsRequester {
 
     @Override
     public void onSuccess(Response<PermitAllocation> result) {
-      BatchedPermitsRequester.this.retries = 0;
+      BatchedPermitsRequester.this.retries.set(0);
       BatchedPermitsRequester.this.callbackCounter.incrementAndGet();
       BatchedPermitsRequester.this.lock.lock();
       try {
@@ -347,7 +379,7 @@ class BatchedPermitsRequester {
           
BatchedPermitsRequester.this.permitBatchContainer.addPermitAllocation(allocation);
         }
 
-        BatchedPermitsRequester.this.requestSemaphore.release();
+        clearCallback();
         if (allocation.getPermits() > 0) {
           BatchedPermitsRequester.this.newPermitsAvailable.signalAll();
         }
@@ -364,10 +396,22 @@ class BatchedPermitsRequester {
       }
     }
 
+    public long elapsedTime() {
+      return System.currentTimeMillis() - this.startTime;
+    }
+
+    public synchronized void clearCallback() {
+      if (this.callbackCleared) {
+        return;
+      }
+      clearSemaphore();
+      this.callbackCleared = true;
+    }
+
     private void nonRetriableFail(Throwable exc, String msg) {
       
BatchedPermitsRequester.this.retryStatus.blockRetries(RETRY_DELAY_ON_NON_RETRIABLE_EXCEPTION,
 exc);
       BatchedPermitsRequester.this.callbackCounter.incrementAndGet();
-      BatchedPermitsRequester.this.requestSemaphore.release();
+      clearCallback();
       log.error(msg, exc);
 
       // Wake up all threads so they can return false
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
index 2f557a5..0863b05 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
@@ -22,7 +22,9 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.linkedin.common.callback.Callback;
@@ -59,6 +61,8 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class RedirectAwareRestClientRequestSender extends 
RestClientRequestSender {
 
+  private static final int MIN_RETRIES = 3;
+
   /**
    * A {@link SharedResourceFactory} that creates {@link 
RedirectAwareRestClientRequestSender}s.
    * @param <S>
@@ -97,6 +101,10 @@ public class RedirectAwareRestClientRequestSender extends 
RestClientRequestSende
   @Getter
   private volatile String currentServerPrefix;
 
+  private String lastLogPrefix = "";
+  private AtomicInteger requestsSinceLastLog = new AtomicInteger(0);
+  private long lastLogTimeNanos = 0;
+
   /**
    * @param broker {@link SharedResourcesBroker} used to create {@link 
RestClient}s.
    * @param connectionPrefixes List of uri prefixes of available servers.
@@ -106,7 +114,7 @@ public class RedirectAwareRestClientRequestSender extends 
RestClientRequestSende
       throws NotConfiguredException {
     this.broker = broker;
     this.connectionPrefixes = connectionPrefixes;
-    updateRestClient(getNextConnectionPrefix(), "service start");
+    updateRestClient(getNextConnectionPrefix(), "service start", null);
   }
 
   private String getNextConnectionPrefix() {
@@ -120,10 +128,39 @@ public class RedirectAwareRestClientRequestSender extends 
RestClientRequestSende
 
   @Override
   public void sendRequest(PermitRequest request, 
Callback<Response<PermitAllocation>> callback) {
-    log.info("Sending request to " + getCurrentServerPrefix());
+    logRequest();
     super.sendRequest(request, callback);
   }
 
+  private void logRequest() {
+    String prefix = getCurrentServerPrefix();
+
+    if (!prefix.equals(this.lastLogPrefix)) {
+      logAggregatedRequests(this.lastLogPrefix);
+      log.info("Sending request to " + prefix);
+      this.lastLogPrefix = prefix;
+      return;
+    }
+
+    this.requestsSinceLastLog.incrementAndGet();
+    log.debug("Sending request to {}", prefix);
+
+    if (TimeUnit.SECONDS.convert(System.nanoTime() - this.lastLogTimeNanos, 
TimeUnit.NANOSECONDS) > 60) { // 1 minute
+      logAggregatedRequests(prefix);
+    }
+  }
+
+  private void logAggregatedRequests(String prefix) {
+    int requests = this.requestsSinceLastLog.getAndSet(0);
+    long time = System.nanoTime();
+    long elapsedMillis = TimeUnit.MILLISECONDS.convert(time - 
this.lastLogTimeNanos, TimeUnit.NANOSECONDS);
+    this.lastLogTimeNanos = time;
+
+    if (requests > 0) {
+      log.info(String.format("Made %d requests to %s over the last %d 
millis.", requests, prefix, elapsedMillis));
+    }
+  }
+
   @Override
   protected RestClient getRestClient() {
     return this.restClient;
@@ -139,8 +176,12 @@ public class RedirectAwareRestClientRequestSender extends 
RestClientRequestSende
   }
 
   @VisibleForTesting
-  void updateRestClient(String uri, String reason) throws 
NotConfiguredException {
-    log.info(String.format("Switching to server prefix %s due to: %s", uri, 
reason));
+  void updateRestClient(String uri, String reason, Throwable errorCause) 
throws NotConfiguredException {
+    if (errorCause == null) {
+      log.info(String.format("Switching to server prefix %s due to: %s", uri, 
reason));
+    } else {
+      log.error(String.format("Switching to server prefix %s due to: %s", uri, 
reason), errorCause);
+    }
     this.currentServerPrefix = uri;
     this.restClient = (RestClient) this.broker.getSharedResource(new 
SharedRestClientFactory(),
           new UriRestClientKey(RestliLimiterFactory.RESTLI_SERVICE_NAME, uri));
@@ -154,7 +195,7 @@ public class RedirectAwareRestClientRequestSender extends 
RestClientRequestSende
   private class CallbackDecorator implements 
Callback<Response<PermitAllocation>> {
     private final PermitRequest originalRequest;
     private final Callback<Response<PermitAllocation>> underlying;
-    private final ExponentialBackoff exponentialBackoff = 
ExponentialBackoff.builder().maxDelay(10000L).build();
+    private final ExponentialBackoff exponentialBackoff = 
ExponentialBackoff.builder().maxDelay(10000L).initialDelay(500L).build();
     private int redirects = 0;
     private int retries = 0;
 
@@ -170,16 +211,16 @@ public class RedirectAwareRestClientRequestSender extends 
RestClientRequestSende
           RestLiResponseException responseExc = (RestLiResponseException) 
error;
           String newUri = (String) 
responseExc.getErrorDetails().get("Location");
           RedirectAwareRestClientRequestSender.this.updateRestClient(
-              SharedRestClientFactory.resolveUriPrefix(new URI(newUri)), "301 
redirect");
+              SharedRestClientFactory.resolveUriPrefix(new URI(newUri)), "301 
redirect", null);
           this.exponentialBackoff.awaitNextRetry();
           sendRequest(this.originalRequest, this);
         } else if (error instanceof RemoteInvocationException
             && shouldCatchExceptionAndSwitchUrl((RemoteInvocationException) 
error)) {
           this.retries++;
-          if (this.retries > 
RedirectAwareRestClientRequestSender.this.connectionPrefixes.size()) {
-            this.underlying.onError(new NonRetriableException("Failed to 
connect to all available connection prefixes."));
+          if (this.retries > 
RedirectAwareRestClientRequestSender.this.connectionPrefixes.size() + 
MIN_RETRIES) {
+            this.underlying.onError(new NonRetriableException("Failed to 
connect to all available connection prefixes.", error));
           }
-          updateRestClient(getNextConnectionPrefix(), "Failed to communicate 
with " + getCurrentServerPrefix());
+          updateRestClient(getNextConnectionPrefix(), "Failed to communicate 
with " + getCurrentServerPrefix(), error);
           this.exponentialBackoff.awaitNextRetry();
           sendRequest(this.originalRequest, this);
         } else {
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/RestliServiceBasedLimiterTest.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/RestliServiceBasedLimiterTest.java
index feb7386..0910565 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/RestliServiceBasedLimiterTest.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/RestliServiceBasedLimiterTest.java
@@ -133,7 +133,7 @@ public class RestliServiceBasedLimiterTest {
       // leader is currently 2501
       Assert.assertEquals(parsePortOfCurrentServerPrefix(requestSender), 2501);
       // set request to 2500 (not leader)
-      requestSender.updateRestClient(server2500.getServer().getURIPrefix(), 
"test");
+      requestSender.updateRestClient(server2500.getServer().getURIPrefix(), 
"test", null);
       Assert.assertEquals(parsePortOfCurrentServerPrefix(requestSender), 2500);
       Assert.assertNotNull(limiter.acquirePermits(20));
       // verify request sender switched back to leader

Reply via email to