This is an automated email from the ASF dual-hosted git repository.
ibuenros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a68e1eb [GOBBLIN-724] Upgrade throttling server so waiting until
tokens can be used is done…
a68e1eb is described below
commit a68e1ebd7e25cee868b5900ed699849c1ec51268
Author: ibuenros <[email protected]>
AuthorDate: Mon Apr 15 15:19:33 2019 -0700
[GOBBLIN-724] Upgrade throttling server so waiting until tokens can be used
is done…
Upgrade throttling server so waiting until tokens
can be used is done by the client instead of the
server. See GOBBLIN-724.
Add unit tests for delegated sleep for throttling
server.
Address comments.
Closes #2591 from ibuenros/throttling-waitonclient
---
.../throttling/ThrottlingProtocolVersion.java | 29 ++++++++
.../restli/throttling/PermitAllocation.pdsc | 4 +-
.../gobblin/restli/throttling/PermitRequest.pdsc | 5 +-
.../util/limiter/BatchedPermitsRequester.java | 63 +++++++++++++++--
.../util/limiter/BatchedPermitsRequesterTest.java | 32 ++++++++-
.../restli/throttling/DynamicTokenBucket.java | 48 ++++++++++---
.../restli/throttling/LimiterServerResource.java | 20 ++++++
.../gobblin/restli/throttling/QPSPolicy.java | 9 ++-
.../throttling/ThrottlingGuiceServletConfig.java | 15 ++++
.../gobblin/restli/throttling/TokenBucket.java | 7 +-
.../restli/throttling/DynamicTokenBucketTest.java | 17 +++++
.../throttling/LimiterServerResourceTest.java | 80 ++++++++++++++++++++++
.../main/java/org/apache/gobblin/util/Sleeper.java | 54 +++++++++++++++
13 files changed, 354 insertions(+), 29 deletions(-)
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/java/org/apache/gobblin/restli/throttling/ThrottlingProtocolVersion.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/java/org/apache/gobblin/restli/throttling/ThrottlingProtocolVersion.java
new file mode 100644
index 0000000..4c2bd12
--- /dev/null
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/java/org/apache/gobblin/restli/throttling/ThrottlingProtocolVersion.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.restli.throttling;
+
+/**
+ * Versions of the Throttling service protocol. Allows the server to know what
the client understands, and to adjust
+ * the response based on the client version. Only add new versions at the end.
+ */
+public enum ThrottlingProtocolVersion {
+ /** Base version of throttling server. */
+ BASE,
+ /** Clients at this level know to wait before distributing permits
allocated to them. */
+ WAIT_ON_CLIENT
+}
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/pegasus/org/apache/gobblin/restli/throttling/PermitAllocation.pdsc
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/pegasus/org/apache/gobblin/restli/throttling/PermitAllocation.pdsc
index 5093c92..8601d29 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/pegasus/org/apache/gobblin/restli/throttling/PermitAllocation.pdsc
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/pegasus/org/apache/gobblin/restli/throttling/PermitAllocation.pdsc
@@ -6,6 +6,8 @@
"fields": [
{"name": "permits", "type": "long", "doc": "Number of permits allocated.
This may be 0 if no permits are allocated, or the number of requested
permits."},
{"name": "expiration", "type": "long", "doc": "Expiration time in Unix
timestamp of the allocated permits."},
- {"name": "minRetryDelayMillis", "type": "long", "doc": "Client should not
try to acquire permits before this delay has passed.", "optional" : true}
+ {"name": "minRetryDelayMillis", "type": "long", "doc": "Client should not
try to acquire permits before this delay has passed.", "optional" : true},
+ {"name": "waitForPermitUseMillis", "type": "long", "doc": "Client must
wait this many millis before allocating provided permits.", "optional": true,
"default": 0},
+ {"name": "unsatisfiablePermits", "type": "long", "doc": "If larger than 0,
specifies request larger than this number are impossible to satisfy by the
policy.", "optional": true, "default": 0}
]
}
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/pegasus/org/apache/gobblin/restli/throttling/PermitRequest.pdsc
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/pegasus/org/apache/gobblin/restli/throttling/PermitRequest.pdsc
index 053d695..1cee3df 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/pegasus/org/apache/gobblin/restli/throttling/PermitRequest.pdsc
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/pegasus/org/apache/gobblin/restli/throttling/PermitRequest.pdsc
@@ -6,7 +6,8 @@
"fields": [
{"name": "resource", "type": "string", "doc": "Resource for which permits
are needed."},
{"name": "permits", "type": "long", "doc": "Number of permits needed."},
- {"name": "minPermits", "type": "long", "doc": "Minimum number of useful
permits.", "optional" : true},
- {"name": "requestorIdentifier", "type": "string", "doc" : "Identifier of
the service requesting the permits."}
+ {"name": "minPermits", "type": "long", "doc": "Minimum number of useful
permits.", "optional" : true, "default": 0},
+ {"name": "requestorIdentifier", "type": "string", "doc" : "Identifier of
the service requesting the permits."},
+ {"name": "version", "type": "int", "doc": "Protocol version, see
ThrottlingProtocolVersion.java. Allows the server to avoid asking the client
for unsupported operations.", "optional": true, "default": 0}
]
}
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
index 48eb60d..c5c5460 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
@@ -50,8 +50,10 @@ import com.linkedin.restli.common.HttpStatus;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.restli.throttling.PermitAllocation;
import org.apache.gobblin.restli.throttling.PermitRequest;
+import org.apache.gobblin.restli.throttling.ThrottlingProtocolVersion;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.NoopCloseable;
+import org.apache.gobblin.util.Sleeper;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import javax.annotation.Nullable;
@@ -59,7 +61,6 @@ import javax.annotation.concurrent.NotThreadSafe;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -106,10 +107,13 @@ class BatchedPermitsRequester {
private final SynchronizedAverager permitsOutstanding;
private final long targetMillisBetweenRequests;
private final AtomicLong callbackCounter;
+ private final long maxTimeout;
+ /** Any request larger than this is known to be impossible to satisfy. */
+ private long knownUnsatisfiablePermits;
@Builder
private BatchedPermitsRequester(String resourceId, String
requestorIdentifier,
- long targetMillisBetweenRequests, RequestSender requestSender,
MetricContext metricContext) {
+ long targetMillisBetweenRequests, RequestSender requestSender,
MetricContext metricContext, long maxTimeoutMillis) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(resourceId), "Must
provide a resource id.");
Preconditions.checkArgument(!Strings.isNullOrEmpty(requestorIdentifier),
"Must provide a requestor identifier.");
@@ -133,6 +137,8 @@ class BatchedPermitsRequester {
this.restRequestTimer = metricContext == null ? null :
metricContext.timer(REST_REQUEST_TIMER);
this.restRequestHistogram = metricContext == null ? null :
metricContext.histogram(REST_REQUEST_PERMITS_HISTOGRAM);
this.callbackCounter = new AtomicLong();
+ this.maxTimeout = maxTimeoutMillis > 0 ? maxTimeoutMillis : 10000;
+ this.knownUnsatisfiablePermits = Long.MAX_VALUE;
}
/**
@@ -143,21 +149,30 @@ class BatchedPermitsRequester {
if (permits <= 0) {
return true;
}
+ long startTimeNanos = System.nanoTime();
this.permitsOutstanding.addEntryWithWeight(permits);
this.lock.lock();
try {
while (true) {
+ if (permits > this.knownUnsatisfiablePermits) {
+ // We are requesting more permits than the remote policy will ever
be able to satisfy, return immediately with no permits
+ break;
+ }
+ if (elapsedMillis(startTimeNanos) > this.maxTimeout) {
+ // Max timeout reached, break
+ break;
+ }
if (this.permitBatchContainer.tryTake(permits)) {
this.permitsOutstanding.removeEntryWithWeight(permits);
return true;
}
- if (this.retryStatus.canRetryWithinMillis(10000)) {
+ if
(this.retryStatus.canRetryWithinMillis(remainingTime(startTimeNanos,
this.maxTimeout))) {
long callbackCounterSnap = this.callbackCounter.get();
maybeSendNewPermitRequest();
if (this.callbackCounter.get() == callbackCounterSnap) {
// If a callback has happened since we tried to send the new
permit request, don't await
// Since some request senders may be synchronous, we would have
missed the notification
- this.newPermitsAvailable.await();
+ boolean ignore =
this.newPermitsAvailable.await(remainingTime(startTimeNanos, this.maxTimeout),
TimeUnit.MILLISECONDS);
}
} else {
break;
@@ -166,9 +181,18 @@ class BatchedPermitsRequester {
} finally {
this.lock.unlock();
}
+ this.permitsOutstanding.removeEntryWithWeight(permits);
return false;
}
+ private long remainingTime(long startTimeNanos, long timeout) {
+ return Math.max(timeout - elapsedMillis(startTimeNanos), 0);
+ }
+
+ private long elapsedMillis(long startTimeNanos) {
+ return (System.nanoTime() - startTimeNanos)/1000000;
+ }
+
/**
* Send a new permit request to the server.
*/
@@ -190,6 +214,7 @@ class BatchedPermitsRequester {
PermitRequest permitRequest = this.basePermitRequest.copy();
permitRequest.setPermits(permits);
permitRequest.setMinPermits((long)
this.permitsOutstanding.getAverageWeightOrZero());
+
permitRequest.setVersion(ThrottlingProtocolVersion.WAIT_ON_CLIENT.ordinal());
if (BatchedPermitsRequester.this.restRequestHistogram != null) {
BatchedPermitsRequester.this.restRequestHistogram.update(permits);
}
@@ -198,7 +223,7 @@ class BatchedPermitsRequester {
this.requestSender.sendRequest(permitRequest, new AllocationCallback(
BatchedPermitsRequester.this.restRequestTimer == null ?
NoopCloseable.INSTANCE :
- BatchedPermitsRequester.this.restRequestTimer.time()));
+ BatchedPermitsRequester.this.restRequestTimer.time(), new
Sleeper()));
} catch (CloneNotSupportedException cnse) {
// This should never happen.
this.requestSemaphore.release();
@@ -240,12 +265,23 @@ class BatchedPermitsRequester {
}
}
+ @VisibleForTesting
+ AllocationCallback createAllocationCallback(Sleeper sleeper) {
+ return new AllocationCallback(new NoopCloseable(), sleeper);
+ }
+
/**
* Callback for Rest request.
*/
- @RequiredArgsConstructor
- private class AllocationCallback implements
Callback<Response<PermitAllocation>> {
+ @VisibleForTesting
+ class AllocationCallback implements Callback<Response<PermitAllocation>> {
private final Closeable timerContext;
+ private final Sleeper sleeper;
+
+ public AllocationCallback(Closeable timerContext, Sleeper sleeper) {
+ this.timerContext = timerContext;
+ this.sleeper = sleeper;
+ }
@Override
public void onError(Throwable exc) {
@@ -298,13 +334,26 @@ class BatchedPermitsRequester {
BatchedPermitsRequester.this.retryStatus.blockRetries(retryDelay,
null);
}
+ long waitForUse =
allocation.getWaitForPermitUseMillis(GetMode.DEFAULT);
+ if (waitForUse > 0) {
+ this.sleeper.sleep(waitForUse);
+ }
+
+ if (allocation.getUnsatisfiablePermits(GetMode.DEFAULT) > 0) {
+ BatchedPermitsRequester.this.knownUnsatisfiablePermits =
allocation.getUnsatisfiablePermits(GetMode.DEFAULT);
+ }
+
if (allocation.getPermits() > 0) {
BatchedPermitsRequester.this.permitBatchContainer.addPermitAllocation(allocation);
}
+
BatchedPermitsRequester.this.requestSemaphore.release();
if (allocation.getPermits() > 0) {
BatchedPermitsRequester.this.newPermitsAvailable.signalAll();
}
+ } catch (InterruptedException ie) {
+ // Thread was interrupted while waiting for permits to be usable.
Permits are not yet usable, so will not
+ // add permits to container
} finally {
try {
this.timerContext.close();
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/BatchedPermitsRequesterTest.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/BatchedPermitsRequesterTest.java
index b02f545..0fb487f 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/BatchedPermitsRequesterTest.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/BatchedPermitsRequesterTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.util.Sleeper;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.testng.Assert;
@@ -35,7 +36,6 @@ import com.google.common.base.Optional;
import com.google.common.collect.Queues;
import com.linkedin.common.callback.Callback;
import com.linkedin.restli.client.Response;
-import com.linkedin.restli.client.RestClient;
import com.linkedin.restli.client.RestLiResponseException;
import com.linkedin.restli.common.HttpStatus;
@@ -158,6 +158,36 @@ public class BatchedPermitsRequesterTest {
}
}
+ @Test
+ public void testWaitToUsePermits() throws Exception {
+ Queue<RequestAndCallback> queue = Queues.newArrayDeque();
+
+ BatchedPermitsRequester container =
BatchedPermitsRequester.builder().resourceId("resource")
+ .requestorIdentifier("requestor").requestSender(new
TestRequestSender(queue, false)).build();
+
+ Sleeper.MockSleeper mockWaiter = new Sleeper.MockSleeper();
+ BatchedPermitsRequester.AllocationCallback callback =
container.createAllocationCallback(mockWaiter);
+
+ PermitAllocation allocation = new PermitAllocation();
+ allocation.setPermits(10);
+ allocation.setWaitForPermitUseMillis(20);
+ allocation.setExpiration(Long.MAX_VALUE);
+
+ Response<PermitAllocation> response = Mockito.mock(Response.class);
+ Mockito.when(response.getEntity()).thenReturn(allocation);
+
+ callback.onSuccess(response);
+ Assert.assertEquals((long) mockWaiter.getRequestedSleeps().peek(), 20);
+
Assert.assertEquals(container.getPermitBatchContainer().getTotalAvailablePermits(),
10);
+
+ // A zero wait will not trigger a wait in the requester
+ allocation.setWaitForPermitUseMillis(0);
+ mockWaiter.reset();
+ callback.onSuccess(response);
+ Assert.assertTrue(mockWaiter.getRequestedSleeps().isEmpty());
+
Assert.assertEquals(container.getPermitBatchContainer().getTotalAvailablePermits(),
20);
+ }
+
public static class TestRequestSender implements RequestSender {
private final Queue<RequestAndCallback> requestAndCallbacks;
private final boolean autoSatisfyRequests;
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/DynamicTokenBucket.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/DynamicTokenBucket.java
index c0f1d33..c2a3f12 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/DynamicTokenBucket.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/DynamicTokenBucket.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
+import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -40,6 +41,16 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DynamicTokenBucket {
+ /**
+ * Contains number of allocated permits and delay before they can be used.
+ */
+ @Data
+ public static class PermitsAndDelay {
+ private final long permits;
+ private final long delay;
+ private final boolean possibleToSatisfy;
+ }
+
@VisibleForTesting
@Getter
private final TokenBucket tokenBucket;
@@ -63,39 +74,56 @@ public class DynamicTokenBucket {
* @param minPermits the minimum number of tokens useful for the calling
process. If this many tokens cannot be acquired,
* the method will return 0 instead,
* @param timeoutMillis the maximum wait the calling process is willing to
wait for tokens.
- * @return the number of allocated tokens.
+ * @return a {@link PermitsAndDelay} for the allocated permits.
*/
- public long getPermits(long requestedPermits, long minPermits, long
timeoutMillis) {
-
+ public PermitsAndDelay getPermitsAndDelay(long requestedPermits, long
minPermits, long timeoutMillis) {
try {
long storedTokens = this.tokenBucket.getStoredTokens();
long eagerTokens = storedTokens / 2;
if (eagerTokens > requestedPermits &&
this.tokenBucket.getTokens(eagerTokens, 0, TimeUnit.MILLISECONDS)) {
- return eagerTokens;
+ return new PermitsAndDelay(eagerTokens, 0, true);
}
long millisToSatisfyMinPermits = (long) (minPermits /
this.tokenBucket.getTokensPerMilli());
if (millisToSatisfyMinPermits > timeoutMillis) {
- return 0;
+ return new PermitsAndDelay(0, 0, false);
}
long allowedTimeout = Math.min(millisToSatisfyMinPermits +
this.baseTimeout, timeoutMillis);
while (requestedPermits > minPermits) {
- if (this.tokenBucket.getTokens(requestedPermits, allowedTimeout,
TimeUnit.MILLISECONDS)) {
- return requestedPermits;
+ long wait = this.tokenBucket.tryReserveTokens(requestedPermits,
allowedTimeout);
+ if (wait >= 0) {
+ return new PermitsAndDelay(requestedPermits, wait, true);
}
requestedPermits /= 2;
}
- if (this.tokenBucket.getTokens(minPermits, allowedTimeout,
TimeUnit.MILLISECONDS)) {
- return minPermits;
+ long wait = this.tokenBucket.tryReserveTokens(minPermits,
allowedTimeout);
+ if (wait >= 0) {
+ return new PermitsAndDelay(requestedPermits, wait, true);
}
+
} catch (InterruptedException ie) {
// Fallback to returning 0
}
- return 0;
+ return new PermitsAndDelay(0, 0, true);
+ }
+
+ /**
+ * Request tokens. Like {@link #getPermitsAndDelay(long, long, long)} but
block until the wait time passes.
+ */
+ public long getPermits(long requestedPermits, long minPermits, long
timeoutMillis) {
+ PermitsAndDelay permitsAndDelay = getPermitsAndDelay(requestedPermits,
minPermits, timeoutMillis);
+ if (permitsAndDelay.delay > 0) {
+ try {
+ Thread.sleep(permitsAndDelay.delay);
+ } catch (InterruptedException ie) {
+ return 0;
+ }
+ }
+ return permitsAndDelay.permits;
}
}
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/LimiterServerResource.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/LimiterServerResource.java
index 0f06967..099505f 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/LimiterServerResource.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/LimiterServerResource.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableMap;
import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.FutureCallback;
import com.linkedin.data.DataMap;
+import com.linkedin.data.template.GetMode;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
@@ -45,6 +46,7 @@ import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.broker.MetricContextFactory;
import org.apache.gobblin.metrics.broker.SubTaggedMetricContextKey;
import org.apache.gobblin.util.NoopCloseable;
+import org.apache.gobblin.util.Sleeper;
import org.apache.gobblin.util.limiter.Limiter;
import org.apache.gobblin.util.limiter.broker.SharedLimiterKey;
@@ -87,6 +89,9 @@ public class LimiterServerResource extends
ComplexKeyResourceAsyncTemplate<Permi
@Inject @Named(LEADER_FINDER_INJECT_NAME)
Optional<LeaderFinder<URIMetadata>> leaderFinderOpt;
+ @Inject
+ Sleeper sleeper;
+
/**
* Request permits from the limiter server. The returned {@link
PermitAllocation} specifies the number of permits
* that the client can use.
@@ -124,6 +129,21 @@ public class LimiterServerResource extends
ComplexKeyResourceAsyncTemplate<Permi
try (Closeable thisContext = limiterTimer.time()) {
allocation = policy.computePermitAllocation(request);
}
+
+ if (request.getVersion(GetMode.DEFAULT) <
ThrottlingProtocolVersion.WAIT_ON_CLIENT.ordinal()) {
+ // If the client does not understand "waitForPermitsUse", delay the
response at the server side.
+ // This has a detrimental effect to server performance
+ long wait = allocation.getWaitForPermitUseMillis(GetMode.DEFAULT);
+ allocation.setWaitForPermitUseMillis(0);
+ if (wait > 0) {
+ try {
+ this.sleeper.sleep(wait);
+ } catch (InterruptedException ie) {
+ allocation.setPermits(0);
+ }
+ }
+ }
+
permitsGrantedMeter.mark(allocation.getPermits());
callback.onSuccess(allocation);
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/QPSPolicy.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/QPSPolicy.java
index dbb05b9..be1ecb8 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/QPSPolicy.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/QPSPolicy.java
@@ -91,12 +91,15 @@ public class QPSPolicy implements ThrottlingPolicy {
minPermits = permitsRequested;
}
- long permitsGranted = this.tokenBucket.getPermits(permitsRequested,
minPermits, LimiterServerResource.TIMEOUT_MILLIS);
+ DynamicTokenBucket.PermitsAndDelay permitsGranted =
+ this.tokenBucket.getPermitsAndDelay(permitsRequested, minPermits,
LimiterServerResource.TIMEOUT_MILLIS);
PermitAllocation allocation = new PermitAllocation();
- allocation.setPermits(permitsGranted);
+ allocation.setPermits(permitsGranted.getPermits());
allocation.setExpiration(Long.MAX_VALUE);
- if (permitsGranted <= 0) {
+ allocation.setWaitForPermitUseMillis(permitsGranted.getDelay());
+ allocation.setUnsatisfiablePermits(request.getMinPermits(GetMode.DEFAULT));
+ if (permitsGranted.getPermits() <= 0) {
allocation.setMinRetryDelayMillis(LimiterServerResource.TIMEOUT_MILLIS);
}
return allocation;
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/ThrottlingGuiceServletConfig.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/ThrottlingGuiceServletConfig.java
index 6c43f84..28c9267 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/ThrottlingGuiceServletConfig.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/ThrottlingGuiceServletConfig.java
@@ -47,6 +47,7 @@ import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.broker.MetricContextFactory;
import org.apache.gobblin.metrics.broker.MetricContextKey;
+import org.apache.gobblin.util.Sleeper;
import java.io.Closeable;
import java.io.IOException;
@@ -82,6 +83,7 @@ public class ThrottlingGuiceServletConfig extends
GuiceServletContextListener im
private Optional<LeaderFinder<URIMetadata>> _leaderFinder;
private Config _config;
+ private Sleeper _sleeper = null;
private Injector _injector;
@Override
@@ -99,6 +101,14 @@ public class ThrottlingGuiceServletConfig extends
GuiceServletContextListener im
super.contextInitialized(servletContextEvent);
}
+ /**
+ * Use a mock sleeper for testing. Note this should be called before
initialization.
+ */
+ public Sleeper.MockSleeper mockSleeper() {
+ this._sleeper = new Sleeper.MockSleeper();
+ return (Sleeper.MockSleeper) this._sleeper;
+ }
+
public void initialize(Config config) {
try {
this._config = config;
@@ -128,9 +138,14 @@ public class ThrottlingGuiceServletConfig extends
GuiceServletContextListener im
protected void configure() {
try {
+ if (_sleeper == null) {
+ _sleeper = new Sleeper();
+ }
+
RestLiConfig restLiConfig = new RestLiConfig();
restLiConfig.setResourcePackageNames("org.apache.gobblin.restli.throttling");
bind(RestLiConfig.class).toInstance(restLiConfig);
+ bind(Sleeper.class).toInstance(_sleeper);
bind(SharedResourcesBroker.class).annotatedWith(Names.named(LimiterServerResource.BROKER_INJECT_NAME)).toInstance(topLevelBroker);
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/TokenBucket.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/TokenBucket.java
index ac21e87..35cb630 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/TokenBucket.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/TokenBucket.java
@@ -68,10 +68,7 @@ public class TokenBucket {
*/
public boolean getTokens(long tokens, long timeout, TimeUnit timeoutUnit)
throws InterruptedException {
long timeoutMillis = timeoutUnit.toMillis(timeout);
- long wait;
- synchronized (this) {
- wait = tryReserveTokens(tokens, timeoutMillis);
- }
+ long wait = tryReserveTokens(tokens, timeoutMillis);
if (wait < 0) {
return false;
@@ -101,7 +98,7 @@ public class TokenBucket {
*
* @return the wait until the tokens are available or negative if they can't
be acquired in the give timeout.
*/
- private long tryReserveTokens(long tokens, long maxWaitMillis) {
+ synchronized long tryReserveTokens(long tokens, long maxWaitMillis) {
long now = System.currentTimeMillis();
long waitUntilNextTokenAvailable = Math.max(0,
this.nextTokenAvailableMillis - now);
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/DynamicTokenBucketTest.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/DynamicTokenBucketTest.java
index 95960e8..9501db3 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/DynamicTokenBucketTest.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/DynamicTokenBucketTest.java
@@ -40,6 +40,23 @@ public class DynamicTokenBucketTest {
}
@Test
+ public void testDelegateSleep() throws Exception {
+ int qps = 10;
+ DynamicTokenBucket limiter = new DynamicTokenBucket(qps, 10, 0);
+
+ long startTime = System.currentTimeMillis();
+ // Requesting 10 seconds worth of permits with 20 second timeout
+ DynamicTokenBucket.PermitsAndDelay permitsAndDelay =
limiter.getPermitsAndDelay(10 * qps, 10 * qps, 20000);
+ long elapsed = System.currentTimeMillis() - startTime;
+ // verify call returned immediately
+ Assert.assertTrue(elapsed < 1000);
+
+ // Verify we got expected tokens and delay is about 10 seconds
+ Assert.assertEquals(permitsAndDelay.getPermits(), 10 * qps);
+ Assert.assertTrue(permitsAndDelay.getDelay() > 9000 &&
permitsAndDelay.getDelay() < 11000);
+ }
+
+ @Test
public void testEagerGrantingIfUnderused() throws Exception {
int qps = 100;
DynamicTokenBucket limiter = new DynamicTokenBucket(qps, 10, 100);
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
index efac2a2..bc538ec 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
@@ -19,15 +19,19 @@ package org.apache.gobblin.restli.throttling;
import java.util.Map;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.util.Sleeper;
import org.testng.Assert;
import org.testng.annotations.Test;
import com.codahale.metrics.Timer;
import com.google.inject.Injector;
+import com.linkedin.data.template.GetMode;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.server.RestLiServiceException;
+import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.broker.BrokerConfigurationKeyGenerator;
@@ -57,6 +61,53 @@ public class LimiterServerResourceTest {
}
@Test
+ public void testSleepOnClientDelegation() {
+
+ ThrottlingPolicyFactory factory = new ThrottlingPolicyFactory();
+ SharedLimiterKey res1key = new SharedLimiterKey("res1");
+
+ Map<String, String> configMap = ImmutableMap.<String, String>builder()
+ .put(BrokerConfigurationKeyGenerator.generateKey(factory, res1key,
null, ThrottlingPolicyFactory.POLICY_KEY),
+ TestWaitPolicy.class.getName())
+ .build();
+
+ ThrottlingGuiceServletConfig guiceServletConfig = new
ThrottlingGuiceServletConfig();
+ Sleeper.MockSleeper sleeper = guiceServletConfig.mockSleeper();
+ guiceServletConfig.initialize(ConfigFactory.parseMap(configMap));
+ Injector injector = guiceServletConfig.getInjector();
+
+ LimiterServerResource limiterServer =
injector.getInstance(LimiterServerResource.class);
+
+ PermitRequest request = new PermitRequest();
+ request.setPermits(5);
+ request.setResource(res1key.getResourceLimitedPath());
+ request.setVersion(ThrottlingProtocolVersion.BASE.ordinal());
+
+ // policy does not require sleep, verify no sleep happened or is requested
from client
+ PermitAllocation allocation = limiterServer.getSync(new
ComplexResourceKey<>(request, new EmptyRecord()));
+ Assert.assertEquals((long) allocation.getPermits(), 5);
+ Assert.assertEquals((long)
allocation.getWaitForPermitUseMillis(GetMode.DEFAULT), 0);
+ Assert.assertTrue(sleeper.getRequestedSleeps().isEmpty());
+
+ // policy requests a sleep of 10 millis, using BASE protocol version,
verify server executes the sleep
+ request.setPermits(20);
+ request.setVersion(ThrottlingProtocolVersion.BASE.ordinal());
+ allocation = limiterServer.getSync(new ComplexResourceKey<>(request, new
EmptyRecord()));
+ Assert.assertEquals((long) allocation.getPermits(), 20);
+ Assert.assertEquals((long)
allocation.getWaitForPermitUseMillis(GetMode.DEFAULT), 0);
+ Assert.assertEquals((long) sleeper.getRequestedSleeps().peek(), 10);
+ sleeper.reset();
+
+ // policy requests a sleep of 10 millis, using WAIT_ON_CLIENT protocol
version, verify server delegates sleep to client
+ request.setVersion(ThrottlingProtocolVersion.WAIT_ON_CLIENT.ordinal());
+ request.setPermits(20);
+ allocation = limiterServer.getSync(new ComplexResourceKey<>(request, new
EmptyRecord()));
+ Assert.assertEquals((long) allocation.getPermits(), 20);
+ Assert.assertEquals((long)
allocation.getWaitForPermitUseMillis(GetMode.DEFAULT), 10);
+ Assert.assertTrue(sleeper.getRequestedSleeps().isEmpty());
+ }
+
+ @Test
public void testLimitedRequests() {
ThrottlingPolicyFactory factory = new ThrottlingPolicyFactory();
@@ -141,4 +192,33 @@ public class LimiterServerResourceTest {
Assert.assertEquals(timer.getCount(), 3);
}
+ public static class TestWaitPolicy implements ThrottlingPolicy,
ThrottlingPolicyFactory.SpecificPolicyFactory {
+ @Override
+ public PermitAllocation computePermitAllocation(PermitRequest request) {
+
+ PermitAllocation allocation = new PermitAllocation();
+ allocation.setPermits(request.getPermits());
+ if (request.getPermits() > 10) {
+ allocation.setWaitForPermitUseMillis(10);
+ }
+ return allocation;
+ }
+
+ @Override
+ public Map<String, String> getParameters() {
+ return null;
+ }
+
+ @Override
+ public String getDescription() {
+ return null;
+ }
+
+ @Override
+ public ThrottlingPolicy createPolicy(SharedLimiterKey sharedLimiterKey,
+ SharedResourcesBroker<ThrottlingServerScopes> broker, Config config) {
+ return this;
+ }
+ }
+
}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/Sleeper.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/Sleeper.java
new file mode 100644
index 0000000..60c308b
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/Sleeper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import lombok.Getter;
+
+
+/**
+ * A class surrounding {@link Thread#sleep(long)} that allows mocking sleeps
during testing.
+ */
+public class Sleeper {
+
+ /**
+ * A mock version of {@link Sleeper} that just register calls to sleep but
returns immediately.
+ */
+ @Getter
+ public static class MockSleeper extends Sleeper {
+ private Queue<Long> requestedSleeps = new LinkedList<>();
+
+ @Override
+ public void sleep(long millis) {
+ this.requestedSleeps.add(millis);
+ }
+
+ public void reset() {
+ this.requestedSleeps.clear();
+ }
+ }
+
+ /**
+ * Equivalent to {@link Thread#sleep(long)}.
+ */
+ public void sleep(long millis) throws InterruptedException {
+ Thread.sleep(millis);
+ }
+}