This is an automated email from the ASF dual-hosted git repository.
ibuenros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c185cad [GOBBLIN-760] Improve retrying behavior of throttling client,
add more informative …
c185cad is described below
commit c185cadcc3577bf7ad3cb44d47c540274fd6f0e8
Author: ibuenros <[email protected]>
AuthorDate: Wed May 1 11:03:39 2019 -0700
[GOBBLIN-760] Improve retrying behavior of throttling client, add more
informative …
Closes #2624 from ibuenros/limiter-logging
---
.../gobblin/restli/SharedRestClientFactory.java | 4 ++
.../apache/gobblin/restli/UriRestClientKey.java | 26 +-------
.../util/limiter/BatchedPermitsRequester.java | 78 +++++++++++++++++-----
.../RedirectAwareRestClientRequestSender.java | 59 +++++++++++++---
.../limiter/RestliServiceBasedLimiterTest.java | 2 +-
5 files changed, 118 insertions(+), 51 deletions(-)
diff --git
a/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/SharedRestClientFactory.java
b/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/SharedRestClientFactory.java
index a0219d2..7e04d59 100644
---
a/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/SharedRestClientFactory.java
+++
b/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/SharedRestClientFactory.java
@@ -51,6 +51,7 @@ import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.util.ExecutorsUtils;
import io.netty.channel.nio.NioEventLoopGroup;
+import lombok.extern.slf4j.Slf4j;
/**
@@ -58,6 +59,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
*
* To configure, specify rest server uri at key "serverUri". Note uri must
start with "http" or "https".
*/
+@Slf4j
public class SharedRestClientFactory<S extends ScopeType<S>> implements
SharedResourceFactory<RestClient, SharedRestClientKey, S> {
public static final String FACTORY_NAME = "restli";
@@ -82,6 +84,8 @@ public class SharedRestClientFactory<S extends ScopeType<S>>
implements SharedRe
String uriPrefix = ((UriRestClientKey) key).getUri();
+ log.info(String.format("Creating a brand new rest client for service
name %s and uri prefix %s", key.serviceName, uriPrefix));
+
HttpClientFactory http = new HttpClientFactory(FilterChains.empty(),
new NioEventLoopGroup(0 /* use default settings */,
ExecutorsUtils.newDaemonThreadFactory(Optional.<Logger>absent(),
Optional.of("R2 Nio Event Loop-%d"))),
diff --git
a/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/UriRestClientKey.java
b/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/UriRestClientKey.java
index a3e2ea9..043134b 100644
---
a/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/UriRestClientKey.java
+++
b/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/UriRestClientKey.java
@@ -22,12 +22,14 @@ import java.net.URISyntaxException;
import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
/**
* A {@link SharedRestClientKey} that explicitly specifies the {@link URI} of
the remote server.
*/
+@EqualsAndHashCode(callSuper = true)
public class UriRestClientKey extends SharedRestClientKey {
@Getter
private final String uri;
@@ -51,28 +53,4 @@ public class UriRestClientKey extends SharedRestClientKey {
super(serviceName);
this.uri = uriPrefix;
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
-
- UriRestClientKey that = (UriRestClientKey) o;
-
- return uri.equals(that.uri);
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + uri.hashCode();
- return result;
- }
}
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
index ba099a0..91f607e 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -87,6 +88,7 @@ class BatchedPermitsRequester {
private static final long RETRY_DELAY_ON_NON_RETRIABLE_EXCEPTION = 60000; //
1 minute (60000 ms)
private static final double MAX_DEPLETION_RATE = 1e20;
public static final int MAX_GROWTH_REQUEST = 2;
+ private static final long GET_PERMITS_MAX_SLEEP_MILLIS = 1000;
private static final ScheduledExecutorService SCHEDULE_EXECUTOR_SERVICE =
Executors.newScheduledThreadPool(1,
ExecutorsUtils.newDaemonThreadFactory(Optional.of(log),
@@ -102,15 +104,18 @@ class BatchedPermitsRequester {
private final Timer restRequestTimer;
private final Histogram restRequestHistogram;
- private volatile int retries = 0;
+ private volatile AtomicInteger retries = new AtomicInteger(0);
private final RetryStatus retryStatus;
private final SynchronizedAverager permitsOutstanding;
private final long targetMillisBetweenRequests;
private final AtomicLong callbackCounter;
+ /** Permit requests will timeout after this many millis. */
private final long maxTimeout;
/** Any request larger than this is known to be impossible to satisfy. */
private long knownUnsatisfiablePermits;
+ private volatile AllocationCallback currentCallback;
+
@Builder
private BatchedPermitsRequester(String resourceId, String
requestorIdentifier,
long targetMillisBetweenRequests, RequestSender requestSender,
MetricContext metricContext, long maxTimeoutMillis) {
@@ -137,7 +142,7 @@ class BatchedPermitsRequester {
this.restRequestTimer = metricContext == null ? null :
metricContext.timer(REST_REQUEST_TIMER);
this.restRequestHistogram = metricContext == null ? null :
metricContext.histogram(REST_REQUEST_PERMITS_HISTOGRAM);
this.callbackCounter = new AtomicLong();
- this.maxTimeout = maxTimeoutMillis > 0 ? maxTimeoutMillis : 10000;
+ this.maxTimeout = maxTimeoutMillis > 0 ? maxTimeoutMillis : 120000;
this.knownUnsatisfiablePermits = Long.MAX_VALUE;
}
@@ -156,10 +161,13 @@ class BatchedPermitsRequester {
while (true) {
if (permits >= this.knownUnsatisfiablePermits) {
// We are requesting more permits than the remote policy will ever
be able to satisfy, return immediately with no permits
+ log.warn(String.format("Server has indicated number of permits is
unsatisfiable. "
+ + "Permits requested: %d, known unsatisfiable permits: %d ",
permits, this.knownUnsatisfiablePermits));
break;
}
if (elapsedMillis(startTimeNanos) > this.maxTimeout) {
// Max timeout reached, break
+ log.warn("Reached timeout waiting for permits. Timeout: " +
this.maxTimeout);
break;
}
if (this.permitBatchContainer.tryTake(permits)) {
@@ -172,7 +180,8 @@ class BatchedPermitsRequester {
if (this.callbackCounter.get() == callbackCounterSnap) {
// If a callback has happened since we tried to send the new
permit request, don't await
// Since some request senders may be synchronous, we would have
missed the notification
- boolean ignore =
this.newPermitsAvailable.await(remainingTime(startTimeNanos, this.maxTimeout),
TimeUnit.MILLISECONDS);
+ boolean ignore = this.newPermitsAvailable.await(
+ Math.min(GET_PERMITS_MAX_SLEEP_MILLIS,
remainingTime(startTimeNanos, this.maxTimeout)), TimeUnit.MILLISECONDS);
}
} else {
break;
@@ -196,18 +205,29 @@ class BatchedPermitsRequester {
/**
* Send a new permit request to the server.
*/
- private void maybeSendNewPermitRequest() {
- if (!this.requestSemaphore.tryAcquire()) {
- return;
+ private synchronized void maybeSendNewPermitRequest() {
+ while (!this.requestSemaphore.tryAcquire()) {
+ if (this.currentCallback == null) {
+ throw new IllegalStateException("Semaphore is unavailable while
callback is null!");
+ }
+ if (this.currentCallback.elapsedTime() > 30000) {
+ // If the previous callback has not returned after 30s, we consider
the call lost and try again
+ // Note we expect Rest.li to call onError for most failure situations,
this logic just handles the edge
+ // case where Rest.li fails somehow and we don't want to just hang.
+ log.warn("Last request did not return after 30s, considering it lost
and retrying.");
+ this.currentCallback.clearCallback();
+ } else {
+ return;
+ }
}
if (!this.retryStatus.canRetryNow()) {
- this.requestSemaphore.release();
+ clearSemaphore();
return;
}
try {
long permits = computeNextPermitRequest();
if (permits <= 0) {
- this.requestSemaphore.release();
+ clearSemaphore();
return;
}
@@ -221,16 +241,25 @@ class BatchedPermitsRequester {
log.debug("Sending permit request " + permitRequest);
- this.requestSender.sendRequest(permitRequest, new AllocationCallback(
+ this.currentCallback = new AllocationCallback(
BatchedPermitsRequester.this.restRequestTimer == null ?
NoopCloseable.INSTANCE :
- BatchedPermitsRequester.this.restRequestTimer.time(), new
Sleeper()));
+ BatchedPermitsRequester.this.restRequestTimer.time(), new
Sleeper());
+ this.requestSender.sendRequest(permitRequest, currentCallback);
} catch (CloneNotSupportedException cnse) {
// This should never happen.
- this.requestSemaphore.release();
+ clearSemaphore();
throw new RuntimeException(cnse);
}
}
+ private synchronized void clearSemaphore() {
+ if (this.requestSemaphore.availablePermits() > 0) {
+ throw new IllegalStateException("Semaphore should have 0 permits!");
+ }
+ BatchedPermitsRequester.this.requestSemaphore.release();
+ BatchedPermitsRequester.this.currentCallback = null;
+ }
+
/**
* @return the number of permits we should request in the next request.
*/
@@ -277,6 +306,9 @@ class BatchedPermitsRequester {
class AllocationCallback implements Callback<Response<PermitAllocation>> {
private final Closeable timerContext;
private final Sleeper sleeper;
+ private final long startTime = System.currentTimeMillis();
+
+ private volatile boolean callbackCleared = false;
public AllocationCallback(Closeable timerContext, Sleeper sleeper) {
this.timerContext = timerContext;
@@ -298,12 +330,12 @@ class BatchedPermitsRequester {
}
}
- BatchedPermitsRequester.this.retries++;
+ BatchedPermitsRequester.this.retries.incrementAndGet();
- if (BatchedPermitsRequester.this.retries >= MAX_RETRIES) {
+ if (BatchedPermitsRequester.this.retries.get() >= MAX_RETRIES) {
nonRetriableFail(exc, "Too many failures trying to communicate with
throttling service.");
} else {
- BatchedPermitsRequester.this.requestSemaphore.release();
+ clearCallback();
// retry
maybeSendNewPermitRequest();
}
@@ -321,7 +353,7 @@ class BatchedPermitsRequester {
@Override
public void onSuccess(Response<PermitAllocation> result) {
- BatchedPermitsRequester.this.retries = 0;
+ BatchedPermitsRequester.this.retries.set(0);
BatchedPermitsRequester.this.callbackCounter.incrementAndGet();
BatchedPermitsRequester.this.lock.lock();
try {
@@ -347,7 +379,7 @@ class BatchedPermitsRequester {
BatchedPermitsRequester.this.permitBatchContainer.addPermitAllocation(allocation);
}
- BatchedPermitsRequester.this.requestSemaphore.release();
+ clearCallback();
if (allocation.getPermits() > 0) {
BatchedPermitsRequester.this.newPermitsAvailable.signalAll();
}
@@ -364,10 +396,22 @@ class BatchedPermitsRequester {
}
}
+ public long elapsedTime() {
+ return System.currentTimeMillis() - this.startTime;
+ }
+
+ public synchronized void clearCallback() {
+ if (this.callbackCleared) {
+ return;
+ }
+ clearSemaphore();
+ this.callbackCleared = true;
+ }
+
private void nonRetriableFail(Throwable exc, String msg) {
BatchedPermitsRequester.this.retryStatus.blockRetries(RETRY_DELAY_ON_NON_RETRIABLE_EXCEPTION,
exc);
BatchedPermitsRequester.this.callbackCounter.incrementAndGet();
- BatchedPermitsRequester.this.requestSemaphore.release();
+ clearCallback();
log.error(msg, exc);
// Wake up all threads so they can return false
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
index 2f557a5..0863b05 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
@@ -22,7 +22,9 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.linkedin.common.callback.Callback;
@@ -59,6 +61,8 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RedirectAwareRestClientRequestSender extends
RestClientRequestSender {
+ private static final int MIN_RETRIES = 3;
+
/**
* A {@link SharedResourceFactory} that creates {@link
RedirectAwareRestClientRequestSender}s.
* @param <S>
@@ -97,6 +101,10 @@ public class RedirectAwareRestClientRequestSender extends
RestClientRequestSende
@Getter
private volatile String currentServerPrefix;
+ private String lastLogPrefix = "";
+ private AtomicInteger requestsSinceLastLog = new AtomicInteger(0);
+ private long lastLogTimeNanos = 0;
+
/**
* @param broker {@link SharedResourcesBroker} used to create {@link
RestClient}s.
* @param connectionPrefixes List of uri prefixes of available servers.
@@ -106,7 +114,7 @@ public class RedirectAwareRestClientRequestSender extends
RestClientRequestSende
throws NotConfiguredException {
this.broker = broker;
this.connectionPrefixes = connectionPrefixes;
- updateRestClient(getNextConnectionPrefix(), "service start");
+ updateRestClient(getNextConnectionPrefix(), "service start", null);
}
private String getNextConnectionPrefix() {
@@ -120,10 +128,39 @@ public class RedirectAwareRestClientRequestSender extends
RestClientRequestSende
@Override
public void sendRequest(PermitRequest request,
Callback<Response<PermitAllocation>> callback) {
- log.info("Sending request to " + getCurrentServerPrefix());
+ logRequest();
super.sendRequest(request, callback);
}
+ private void logRequest() {
+ String prefix = getCurrentServerPrefix();
+
+ if (!prefix.equals(this.lastLogPrefix)) {
+ logAggregatedRequests(this.lastLogPrefix);
+ log.info("Sending request to " + prefix);
+ this.lastLogPrefix = prefix;
+ return;
+ }
+
+ this.requestsSinceLastLog.incrementAndGet();
+ log.debug("Sending request to {}", prefix);
+
+ if (TimeUnit.SECONDS.convert(System.nanoTime() - this.lastLogTimeNanos,
TimeUnit.NANOSECONDS) > 60) { // 1 minute
+ logAggregatedRequests(prefix);
+ }
+ }
+
+ private void logAggregatedRequests(String prefix) {
+ int requests = this.requestsSinceLastLog.getAndSet(0);
+ long time = System.nanoTime();
+ long elapsedMillis = TimeUnit.MILLISECONDS.convert(time -
this.lastLogTimeNanos, TimeUnit.NANOSECONDS);
+ this.lastLogTimeNanos = time;
+
+ if (requests > 0) {
+ log.info(String.format("Made %d requests to %s over the last %d
millis.", requests, prefix, elapsedMillis));
+ }
+ }
+
@Override
protected RestClient getRestClient() {
return this.restClient;
@@ -139,8 +176,12 @@ public class RedirectAwareRestClientRequestSender extends
RestClientRequestSende
}
@VisibleForTesting
- void updateRestClient(String uri, String reason) throws
NotConfiguredException {
- log.info(String.format("Switching to server prefix %s due to: %s", uri,
reason));
+ void updateRestClient(String uri, String reason, Throwable errorCause)
throws NotConfiguredException {
+ if (errorCause == null) {
+ log.info(String.format("Switching to server prefix %s due to: %s", uri,
reason));
+ } else {
+ log.error(String.format("Switching to server prefix %s due to: %s", uri,
reason), errorCause);
+ }
this.currentServerPrefix = uri;
this.restClient = (RestClient) this.broker.getSharedResource(new
SharedRestClientFactory(),
new UriRestClientKey(RestliLimiterFactory.RESTLI_SERVICE_NAME, uri));
@@ -154,7 +195,7 @@ public class RedirectAwareRestClientRequestSender extends
RestClientRequestSende
private class CallbackDecorator implements
Callback<Response<PermitAllocation>> {
private final PermitRequest originalRequest;
private final Callback<Response<PermitAllocation>> underlying;
- private final ExponentialBackoff exponentialBackoff =
ExponentialBackoff.builder().maxDelay(10000L).build();
+ private final ExponentialBackoff exponentialBackoff =
ExponentialBackoff.builder().maxDelay(10000L).initialDelay(500L).build();
private int redirects = 0;
private int retries = 0;
@@ -170,16 +211,16 @@ public class RedirectAwareRestClientRequestSender extends
RestClientRequestSende
RestLiResponseException responseExc = (RestLiResponseException)
error;
String newUri = (String)
responseExc.getErrorDetails().get("Location");
RedirectAwareRestClientRequestSender.this.updateRestClient(
- SharedRestClientFactory.resolveUriPrefix(new URI(newUri)), "301
redirect");
+ SharedRestClientFactory.resolveUriPrefix(new URI(newUri)), "301
redirect", null);
this.exponentialBackoff.awaitNextRetry();
sendRequest(this.originalRequest, this);
} else if (error instanceof RemoteInvocationException
&& shouldCatchExceptionAndSwitchUrl((RemoteInvocationException)
error)) {
this.retries++;
- if (this.retries >
RedirectAwareRestClientRequestSender.this.connectionPrefixes.size()) {
- this.underlying.onError(new NonRetriableException("Failed to
connect to all available connection prefixes."));
+ if (this.retries >
RedirectAwareRestClientRequestSender.this.connectionPrefixes.size() +
MIN_RETRIES) {
+ this.underlying.onError(new NonRetriableException("Failed to
connect to all available connection prefixes.", error));
}
- updateRestClient(getNextConnectionPrefix(), "Failed to communicate
with " + getCurrentServerPrefix());
+ updateRestClient(getNextConnectionPrefix(), "Failed to communicate
with " + getCurrentServerPrefix(), error);
this.exponentialBackoff.awaitNextRetry();
sendRequest(this.originalRequest, this);
} else {
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/RestliServiceBasedLimiterTest.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/RestliServiceBasedLimiterTest.java
index feb7386..0910565 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/RestliServiceBasedLimiterTest.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/RestliServiceBasedLimiterTest.java
@@ -133,7 +133,7 @@ public class RestliServiceBasedLimiterTest {
// leader is currently 2501
Assert.assertEquals(parsePortOfCurrentServerPrefix(requestSender), 2501);
// set request to 2500 (not leader)
- requestSender.updateRestClient(server2500.getServer().getURIPrefix(),
"test");
+ requestSender.updateRestClient(server2500.getServer().getURIPrefix(),
"test", null);
Assert.assertEquals(parsePortOfCurrentServerPrefix(requestSender), 2500);
Assert.assertNotNull(limiter.acquirePermits(20));
// verify request sender switched back to leader