This is an automated email from the ASF dual-hosted git repository.

ibuenros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a68e1eb  [GOBBLIN-724] Upgrade throttling server so waiting until 
tokens can be used is done…
a68e1eb is described below

commit a68e1ebd7e25cee868b5900ed699849c1ec51268
Author: ibuenros <[email protected]>
AuthorDate: Mon Apr 15 15:19:33 2019 -0700

    [GOBBLIN-724] Upgrade throttling server so waiting until tokens can be used 
is done…
    
    Upgrade throttling server so waiting until tokens
    can be used is done by the client instead of the
    server. See GOBBLIN-724.
    
    Add unit tests for delegated sleep for throttling
    server.
    
    Address comments.
    
    Closes #2591 from ibuenros/throttling-waitonclient
---
 .../throttling/ThrottlingProtocolVersion.java      | 29 ++++++++
 .../restli/throttling/PermitAllocation.pdsc        |  4 +-
 .../gobblin/restli/throttling/PermitRequest.pdsc   |  5 +-
 .../util/limiter/BatchedPermitsRequester.java      | 63 +++++++++++++++--
 .../util/limiter/BatchedPermitsRequesterTest.java  | 32 ++++++++-
 .../restli/throttling/DynamicTokenBucket.java      | 48 ++++++++++---
 .../restli/throttling/LimiterServerResource.java   | 20 ++++++
 .../gobblin/restli/throttling/QPSPolicy.java       |  9 ++-
 .../throttling/ThrottlingGuiceServletConfig.java   | 15 ++++
 .../gobblin/restli/throttling/TokenBucket.java     |  7 +-
 .../restli/throttling/DynamicTokenBucketTest.java  | 17 +++++
 .../throttling/LimiterServerResourceTest.java      | 80 ++++++++++++++++++++++
 .../main/java/org/apache/gobblin/util/Sleeper.java | 54 +++++++++++++++
 13 files changed, 354 insertions(+), 29 deletions(-)

diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/java/org/apache/gobblin/restli/throttling/ThrottlingProtocolVersion.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/java/org/apache/gobblin/restli/throttling/ThrottlingProtocolVersion.java
new file mode 100644
index 0000000..4c2bd12
--- /dev/null
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/java/org/apache/gobblin/restli/throttling/ThrottlingProtocolVersion.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.restli.throttling;
+
+/**
+ * Versions of the Throttling service protocol. Allows the server to know what 
the client understands, and to adjust
+ * the response based on the client version. Only add new versions at the end.
+ */
+public enum ThrottlingProtocolVersion {
+       /** Base version of throttling server. */
+       BASE,
+       /** Clients at this level know to wait before distributing permits 
allocated to them. */
+       WAIT_ON_CLIENT
+}
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/pegasus/org/apache/gobblin/restli/throttling/PermitAllocation.pdsc
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/pegasus/org/apache/gobblin/restli/throttling/PermitAllocation.pdsc
index 5093c92..8601d29 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/pegasus/org/apache/gobblin/restli/throttling/PermitAllocation.pdsc
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/pegasus/org/apache/gobblin/restli/throttling/PermitAllocation.pdsc
@@ -6,6 +6,8 @@
   "fields": [
     {"name": "permits", "type": "long", "doc": "Number of permits allocated. 
This may be 0 if no permits are allocated, or the number of requested 
permits."},
     {"name": "expiration", "type": "long", "doc": "Expiration time in Unix 
timestamp of the allocated permits."},
-    {"name": "minRetryDelayMillis", "type": "long", "doc": "Client should not 
try to acquire permits before this delay has passed.", "optional" : true}
+    {"name": "minRetryDelayMillis", "type": "long", "doc": "Client should not 
try to acquire permits before this delay has passed.", "optional" : true},
+    {"name": "waitForPermitUseMillis", "type": "long", "doc": "Client must 
wait this many millis before allocating provided permits.", "optional": true, 
"default": 0},
+    {"name": "unsatisfiablePermits", "type": "long", "doc": "If larger than 0, 
specifies request larger than this number are impossible to satisfy by the 
policy.", "optional": true, "default": 0}
   ]
 }
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/pegasus/org/apache/gobblin/restli/throttling/PermitRequest.pdsc
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/pegasus/org/apache/gobblin/restli/throttling/PermitRequest.pdsc
index 053d695..1cee3df 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/pegasus/org/apache/gobblin/restli/throttling/PermitRequest.pdsc
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-api/src/main/pegasus/org/apache/gobblin/restli/throttling/PermitRequest.pdsc
@@ -6,7 +6,8 @@
   "fields": [
     {"name": "resource", "type": "string", "doc": "Resource for which permits 
are needed."},
     {"name": "permits", "type": "long", "doc": "Number of permits needed."},
-    {"name": "minPermits", "type": "long", "doc": "Minimum number of useful 
permits.", "optional" : true},
-    {"name": "requestorIdentifier", "type": "string", "doc" : "Identifier of 
the service requesting the permits."}
+    {"name": "minPermits", "type": "long", "doc": "Minimum number of useful 
permits.", "optional" : true, "default": 0},
+    {"name": "requestorIdentifier", "type": "string", "doc" : "Identifier of 
the service requesting the permits."},
+    {"name": "version", "type": "int", "doc": "Protocol version, see 
ThrottlingProtocolVersion.java. Allows the server to avoid asking the client 
for unsupported operations.", "optional": true, "default": 0}
   ]
 }
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
index 48eb60d..c5c5460 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
@@ -50,8 +50,10 @@ import com.linkedin.restli.common.HttpStatus;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.restli.throttling.PermitAllocation;
 import org.apache.gobblin.restli.throttling.PermitRequest;
+import org.apache.gobblin.restli.throttling.ThrottlingProtocolVersion;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.NoopCloseable;
+import org.apache.gobblin.util.Sleeper;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import javax.annotation.Nullable;
@@ -59,7 +61,6 @@ import javax.annotation.concurrent.NotThreadSafe;
 import lombok.AccessLevel;
 import lombok.Builder;
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 
@@ -106,10 +107,13 @@ class BatchedPermitsRequester {
   private final SynchronizedAverager permitsOutstanding;
   private final long targetMillisBetweenRequests;
   private final AtomicLong callbackCounter;
+  private final long maxTimeout;
+  /** Any request larger than this is known to be impossible to satisfy. */
+  private long knownUnsatisfiablePermits;
 
   @Builder
   private BatchedPermitsRequester(String resourceId, String 
requestorIdentifier,
-      long targetMillisBetweenRequests, RequestSender requestSender, 
MetricContext metricContext) {
+      long targetMillisBetweenRequests, RequestSender requestSender, 
MetricContext metricContext, long maxTimeoutMillis) {
 
     Preconditions.checkArgument(!Strings.isNullOrEmpty(resourceId), "Must 
provide a resource id.");
     Preconditions.checkArgument(!Strings.isNullOrEmpty(requestorIdentifier), 
"Must provide a requestor identifier.");
@@ -133,6 +137,8 @@ class BatchedPermitsRequester {
     this.restRequestTimer = metricContext == null ? null : 
metricContext.timer(REST_REQUEST_TIMER);
     this.restRequestHistogram = metricContext == null ? null : 
metricContext.histogram(REST_REQUEST_PERMITS_HISTOGRAM);
     this.callbackCounter = new AtomicLong();
+    this.maxTimeout = maxTimeoutMillis > 0 ? maxTimeoutMillis : 10000;
+    this.knownUnsatisfiablePermits = Long.MAX_VALUE;
   }
 
   /**
@@ -143,21 +149,30 @@ class BatchedPermitsRequester {
     if (permits <= 0) {
       return true;
     }
+    long startTimeNanos = System.nanoTime();
     this.permitsOutstanding.addEntryWithWeight(permits);
     this.lock.lock();
     try {
       while (true) {
+        if (permits > this.knownUnsatisfiablePermits) {
+          // We are requesting more permits than the remote policy will ever 
be able to satisfy, return immediately with no permits
+          break;
+        }
+        if (elapsedMillis(startTimeNanos) > this.maxTimeout) {
+          // Max timeout reached, break
+          break;
+        }
         if (this.permitBatchContainer.tryTake(permits)) {
           this.permitsOutstanding.removeEntryWithWeight(permits);
           return true;
         }
-        if (this.retryStatus.canRetryWithinMillis(10000)) {
+        if 
(this.retryStatus.canRetryWithinMillis(remainingTime(startTimeNanos, 
this.maxTimeout))) {
           long callbackCounterSnap = this.callbackCounter.get();
           maybeSendNewPermitRequest();
           if (this.callbackCounter.get() == callbackCounterSnap) {
             // If a callback has happened since we tried to send the new 
permit request, don't await
             // Since some request senders may be synchronous, we would have 
missed the notification
-            this.newPermitsAvailable.await();
+            boolean ignore = 
this.newPermitsAvailable.await(remainingTime(startTimeNanos, this.maxTimeout), 
TimeUnit.MILLISECONDS);
           }
         } else {
           break;
@@ -166,9 +181,18 @@ class BatchedPermitsRequester {
     } finally {
       this.lock.unlock();
     }
+    this.permitsOutstanding.removeEntryWithWeight(permits);
     return false;
   }
 
+  private long remainingTime(long startTimeNanos, long timeout) {
+    return Math.max(timeout - elapsedMillis(startTimeNanos), 0);
+  }
+
+  private long elapsedMillis(long startTimeNanos) {
+    return (System.nanoTime() - startTimeNanos)/1000000;
+  }
+
   /**
    * Send a new permit request to the server.
    */
@@ -190,6 +214,7 @@ class BatchedPermitsRequester {
       PermitRequest permitRequest = this.basePermitRequest.copy();
       permitRequest.setPermits(permits);
       permitRequest.setMinPermits((long) 
this.permitsOutstanding.getAverageWeightOrZero());
+      
permitRequest.setVersion(ThrottlingProtocolVersion.WAIT_ON_CLIENT.ordinal());
       if (BatchedPermitsRequester.this.restRequestHistogram != null) {
         BatchedPermitsRequester.this.restRequestHistogram.update(permits);
       }
@@ -198,7 +223,7 @@ class BatchedPermitsRequester {
 
       this.requestSender.sendRequest(permitRequest, new AllocationCallback(
           BatchedPermitsRequester.this.restRequestTimer == null ? 
NoopCloseable.INSTANCE :
-              BatchedPermitsRequester.this.restRequestTimer.time()));
+              BatchedPermitsRequester.this.restRequestTimer.time(), new 
Sleeper()));
     } catch (CloneNotSupportedException cnse) {
       // This should never happen.
       this.requestSemaphore.release();
@@ -240,12 +265,23 @@ class BatchedPermitsRequester {
     }
   }
 
+  @VisibleForTesting
+  AllocationCallback createAllocationCallback(Sleeper sleeper) {
+    return new AllocationCallback(new NoopCloseable(), sleeper);
+  }
+
   /**
    * Callback for Rest request.
    */
-  @RequiredArgsConstructor
-  private class AllocationCallback implements 
Callback<Response<PermitAllocation>> {
+  @VisibleForTesting
+  class AllocationCallback implements Callback<Response<PermitAllocation>> {
     private final Closeable timerContext;
+    private final Sleeper sleeper;
+
+    public AllocationCallback(Closeable timerContext, Sleeper sleeper) {
+      this.timerContext = timerContext;
+      this.sleeper = sleeper;
+    }
 
     @Override
     public void onError(Throwable exc) {
@@ -298,13 +334,26 @@ class BatchedPermitsRequester {
           BatchedPermitsRequester.this.retryStatus.blockRetries(retryDelay, 
null);
         }
 
+        long waitForUse = 
allocation.getWaitForPermitUseMillis(GetMode.DEFAULT);
+        if (waitForUse > 0) {
+          this.sleeper.sleep(waitForUse);
+        }
+
+        if (allocation.getUnsatisfiablePermits(GetMode.DEFAULT) > 0) {
+          BatchedPermitsRequester.this.knownUnsatisfiablePermits = 
allocation.getUnsatisfiablePermits(GetMode.DEFAULT);
+        }
+
         if (allocation.getPermits() > 0) {
           
BatchedPermitsRequester.this.permitBatchContainer.addPermitAllocation(allocation);
         }
+
         BatchedPermitsRequester.this.requestSemaphore.release();
         if (allocation.getPermits() > 0) {
           BatchedPermitsRequester.this.newPermitsAvailable.signalAll();
         }
+      } catch (InterruptedException ie) {
+        // Thread was interrupted while waiting for permits to be usable. 
Permits are not yet usable, so will not
+        // add permits to container
       } finally {
         try {
           this.timerContext.close();
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/BatchedPermitsRequesterTest.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/BatchedPermitsRequesterTest.java
index b02f545..0fb487f 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/BatchedPermitsRequesterTest.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/BatchedPermitsRequesterTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.gobblin.util.Sleeper;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.testng.Assert;
@@ -35,7 +36,6 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Queues;
 import com.linkedin.common.callback.Callback;
 import com.linkedin.restli.client.Response;
-import com.linkedin.restli.client.RestClient;
 import com.linkedin.restli.client.RestLiResponseException;
 import com.linkedin.restli.common.HttpStatus;
 
@@ -158,6 +158,36 @@ public class BatchedPermitsRequesterTest {
     }
   }
 
+  @Test
+  public void testWaitToUsePermits() throws Exception {
+    Queue<RequestAndCallback> queue = Queues.newArrayDeque();
+
+    BatchedPermitsRequester container = 
BatchedPermitsRequester.builder().resourceId("resource")
+        .requestorIdentifier("requestor").requestSender(new 
TestRequestSender(queue, false)).build();
+
+    Sleeper.MockSleeper mockWaiter = new Sleeper.MockSleeper();
+    BatchedPermitsRequester.AllocationCallback callback = 
container.createAllocationCallback(mockWaiter);
+
+    PermitAllocation allocation = new PermitAllocation();
+    allocation.setPermits(10);
+    allocation.setWaitForPermitUseMillis(20);
+    allocation.setExpiration(Long.MAX_VALUE);
+
+    Response<PermitAllocation> response = Mockito.mock(Response.class);
+    Mockito.when(response.getEntity()).thenReturn(allocation);
+
+    callback.onSuccess(response);
+    Assert.assertEquals((long) mockWaiter.getRequestedSleeps().peek(), 20);
+    
Assert.assertEquals(container.getPermitBatchContainer().getTotalAvailablePermits(),
 10);
+
+    // A zero wait will not trigger a wait in the requester
+    allocation.setWaitForPermitUseMillis(0);
+    mockWaiter.reset();
+    callback.onSuccess(response);
+    Assert.assertTrue(mockWaiter.getRequestedSleeps().isEmpty());
+    
Assert.assertEquals(container.getPermitBatchContainer().getTotalAvailablePermits(),
 20);
+  }
+
   public static class TestRequestSender implements RequestSender {
     private final Queue<RequestAndCallback> requestAndCallbacks;
     private final boolean autoSatisfyRequests;
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/DynamicTokenBucket.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/DynamicTokenBucket.java
index c0f1d33..c2a3f12 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/DynamicTokenBucket.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/DynamicTokenBucket.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import lombok.Data;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -40,6 +41,16 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class DynamicTokenBucket {
 
+  /**
+   * Contains number of allocated permits and delay before they can be used.
+   */
+  @Data
+  public static class PermitsAndDelay {
+    private final long permits;
+    private final long delay;
+    private final boolean possibleToSatisfy;
+  }
+
   @VisibleForTesting
   @Getter
   private final TokenBucket tokenBucket;
@@ -63,39 +74,56 @@ public class DynamicTokenBucket {
    * @param minPermits the minimum number of tokens useful for the calling 
process. If this many tokens cannot be acquired,
    *                   the method will return 0 instead,
    * @param timeoutMillis the maximum wait the calling process is willing to 
wait for tokens.
-   * @return the number of allocated tokens.
+   * @return a {@link PermitsAndDelay} for the allocated permits.
    */
-  public long getPermits(long requestedPermits, long minPermits, long 
timeoutMillis) {
-
+  public PermitsAndDelay getPermitsAndDelay(long requestedPermits, long 
minPermits, long timeoutMillis) {
     try {
       long storedTokens = this.tokenBucket.getStoredTokens();
 
       long eagerTokens = storedTokens / 2;
       if (eagerTokens > requestedPermits && 
this.tokenBucket.getTokens(eagerTokens, 0, TimeUnit.MILLISECONDS)) {
-        return eagerTokens;
+        return new PermitsAndDelay(eagerTokens, 0, true);
       }
 
       long millisToSatisfyMinPermits = (long) (minPermits / 
this.tokenBucket.getTokensPerMilli());
       if (millisToSatisfyMinPermits > timeoutMillis) {
-        return 0;
+        return new PermitsAndDelay(0, 0, false);
       }
       long allowedTimeout = Math.min(millisToSatisfyMinPermits + 
this.baseTimeout, timeoutMillis);
 
       while (requestedPermits > minPermits) {
-        if (this.tokenBucket.getTokens(requestedPermits, allowedTimeout, 
TimeUnit.MILLISECONDS)) {
-          return requestedPermits;
+        long wait = this.tokenBucket.tryReserveTokens(requestedPermits, 
allowedTimeout);
+        if (wait >= 0) {
+          return new PermitsAndDelay(requestedPermits, wait, true);
         }
         requestedPermits /= 2;
       }
 
-      if (this.tokenBucket.getTokens(minPermits, allowedTimeout, 
TimeUnit.MILLISECONDS)) {
-        return minPermits;
+      long wait = this.tokenBucket.tryReserveTokens(minPermits, 
allowedTimeout);
+      if (wait >= 0) {
+        return new PermitsAndDelay(requestedPermits, wait, true);
       }
+
     } catch (InterruptedException ie) {
       // Fallback to returning 0
     }
 
-    return 0;
+    return new PermitsAndDelay(0, 0, true);
+  }
+
+  /**
+   * Request tokens. Like {@link #getPermitsAndDelay(long, long, long)} but 
block until the wait time passes.
+   */
+  public long getPermits(long requestedPermits, long minPermits, long 
timeoutMillis) {
+    PermitsAndDelay permitsAndDelay = getPermitsAndDelay(requestedPermits, 
minPermits, timeoutMillis);
+    if (permitsAndDelay.delay > 0) {
+      try {
+        Thread.sleep(permitsAndDelay.delay);
+      } catch (InterruptedException ie) {
+        return 0;
+      }
+    }
+    return permitsAndDelay.permits;
   }
 
 }
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/LimiterServerResource.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/LimiterServerResource.java
index 0f06967..099505f 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/LimiterServerResource.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/LimiterServerResource.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableMap;
 import com.linkedin.common.callback.Callback;
 import com.linkedin.common.callback.FutureCallback;
 import com.linkedin.data.DataMap;
+import com.linkedin.data.template.GetMode;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
@@ -45,6 +46,7 @@ import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.broker.MetricContextFactory;
 import org.apache.gobblin.metrics.broker.SubTaggedMetricContextKey;
 import org.apache.gobblin.util.NoopCloseable;
+import org.apache.gobblin.util.Sleeper;
 import org.apache.gobblin.util.limiter.Limiter;
 import org.apache.gobblin.util.limiter.broker.SharedLimiterKey;
 
@@ -87,6 +89,9 @@ public class LimiterServerResource extends 
ComplexKeyResourceAsyncTemplate<Permi
   @Inject @Named(LEADER_FINDER_INJECT_NAME)
   Optional<LeaderFinder<URIMetadata>> leaderFinderOpt;
 
+  @Inject
+  Sleeper sleeper;
+
   /**
    * Request permits from the limiter server. The returned {@link 
PermitAllocation} specifies the number of permits
    * that the client can use.
@@ -124,6 +129,21 @@ public class LimiterServerResource extends 
ComplexKeyResourceAsyncTemplate<Permi
         try (Closeable thisContext = limiterTimer.time()) {
           allocation = policy.computePermitAllocation(request);
         }
+
+        if (request.getVersion(GetMode.DEFAULT) < 
ThrottlingProtocolVersion.WAIT_ON_CLIENT.ordinal()) {
+          // If the client does not understand "waitForPermitsUse", delay the 
response at the server side.
+          // This has a detrimental effect to server performance
+          long wait = allocation.getWaitForPermitUseMillis(GetMode.DEFAULT);
+          allocation.setWaitForPermitUseMillis(0);
+          if (wait > 0) {
+            try {
+              this.sleeper.sleep(wait);
+            } catch (InterruptedException ie) {
+              allocation.setPermits(0);
+            }
+          }
+        }
+
         permitsGrantedMeter.mark(allocation.getPermits());
 
         callback.onSuccess(allocation);
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/QPSPolicy.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/QPSPolicy.java
index dbb05b9..be1ecb8 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/QPSPolicy.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/QPSPolicy.java
@@ -91,12 +91,15 @@ public class QPSPolicy implements ThrottlingPolicy {
       minPermits = permitsRequested;
     }
 
-    long permitsGranted = this.tokenBucket.getPermits(permitsRequested, 
minPermits, LimiterServerResource.TIMEOUT_MILLIS);
+    DynamicTokenBucket.PermitsAndDelay permitsGranted =
+        this.tokenBucket.getPermitsAndDelay(permitsRequested, minPermits, 
LimiterServerResource.TIMEOUT_MILLIS);
 
     PermitAllocation allocation = new PermitAllocation();
-    allocation.setPermits(permitsGranted);
+    allocation.setPermits(permitsGranted.getPermits());
     allocation.setExpiration(Long.MAX_VALUE);
-    if (permitsGranted <= 0) {
+    allocation.setWaitForPermitUseMillis(permitsGranted.getDelay());
+    allocation.setUnsatisfiablePermits(request.getMinPermits(GetMode.DEFAULT));
+    if (permitsGranted.getPermits() <= 0) {
       allocation.setMinRetryDelayMillis(LimiterServerResource.TIMEOUT_MILLIS);
     }
     return allocation;
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/ThrottlingGuiceServletConfig.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/ThrottlingGuiceServletConfig.java
index 6c43f84..28c9267 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/ThrottlingGuiceServletConfig.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/ThrottlingGuiceServletConfig.java
@@ -47,6 +47,7 @@ import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.broker.MetricContextFactory;
 import org.apache.gobblin.metrics.broker.MetricContextKey;
+import org.apache.gobblin.util.Sleeper;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -82,6 +83,7 @@ public class ThrottlingGuiceServletConfig extends 
GuiceServletContextListener im
 
   private Optional<LeaderFinder<URIMetadata>> _leaderFinder;
   private Config _config;
+  private Sleeper _sleeper = null;
   private Injector _injector;
 
   @Override
@@ -99,6 +101,14 @@ public class ThrottlingGuiceServletConfig extends 
GuiceServletContextListener im
     super.contextInitialized(servletContextEvent);
   }
 
+  /**
+   * Use a mock sleeper for testing. Note this should be called before 
initialization.
+   */
+  public Sleeper.MockSleeper mockSleeper() {
+    this._sleeper = new Sleeper.MockSleeper();
+    return (Sleeper.MockSleeper) this._sleeper;
+  }
+
   public void initialize(Config config) {
     try {
       this._config = config;
@@ -128,9 +138,14 @@ public class ThrottlingGuiceServletConfig extends 
GuiceServletContextListener im
       protected void configure() {
         try {
 
+          if (_sleeper == null) {
+            _sleeper = new Sleeper();
+          }
+
           RestLiConfig restLiConfig = new RestLiConfig();
           
restLiConfig.setResourcePackageNames("org.apache.gobblin.restli.throttling");
           bind(RestLiConfig.class).toInstance(restLiConfig);
+          bind(Sleeper.class).toInstance(_sleeper);
 
           
bind(SharedResourcesBroker.class).annotatedWith(Names.named(LimiterServerResource.BROKER_INJECT_NAME)).toInstance(topLevelBroker);
 
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/TokenBucket.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/TokenBucket.java
index ac21e87..35cb630 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/TokenBucket.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/java/org/apache/gobblin/restli/throttling/TokenBucket.java
@@ -68,10 +68,7 @@ public class TokenBucket {
    */
   public boolean getTokens(long tokens, long timeout, TimeUnit timeoutUnit) 
throws InterruptedException {
     long timeoutMillis = timeoutUnit.toMillis(timeout);
-    long wait;
-    synchronized (this) {
-      wait = tryReserveTokens(tokens, timeoutMillis);
-    }
+    long wait = tryReserveTokens(tokens, timeoutMillis);
 
     if (wait < 0) {
       return false;
@@ -101,7 +98,7 @@ public class TokenBucket {
    *
    * @return the wait until the tokens are available or negative if they can't 
be acquired in the give timeout.
    */
-  private long tryReserveTokens(long tokens, long maxWaitMillis) {
+  synchronized long tryReserveTokens(long tokens, long maxWaitMillis) {
     long now = System.currentTimeMillis();
     long waitUntilNextTokenAvailable = Math.max(0, 
this.nextTokenAvailableMillis - now);
 
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/DynamicTokenBucketTest.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/DynamicTokenBucketTest.java
index 95960e8..9501db3 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/DynamicTokenBucketTest.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/DynamicTokenBucketTest.java
@@ -40,6 +40,23 @@ public class DynamicTokenBucketTest {
   }
 
   @Test
+  public void testDelegateSleep() throws Exception {
+    int qps = 10;
+    DynamicTokenBucket limiter = new DynamicTokenBucket(qps, 10, 0);
+
+    long startTime = System.currentTimeMillis();
+    // Requesting 10 seconds worth of permits with 20 second timeout
+    DynamicTokenBucket.PermitsAndDelay permitsAndDelay = 
limiter.getPermitsAndDelay(10 * qps, 10 * qps, 20000);
+    long elapsed = System.currentTimeMillis() - startTime;
+    // verify call returned immediately
+    Assert.assertTrue(elapsed < 1000);
+
+    // Verify we got expected tokens and delay is about 10 seconds
+    Assert.assertEquals(permitsAndDelay.getPermits(), 10 * qps);
+    Assert.assertTrue(permitsAndDelay.getDelay() > 9000 && 
permitsAndDelay.getDelay() < 11000);
+  }
+
+  @Test
   public void testEagerGrantingIfUnderused() throws Exception {
     int qps = 100;
     DynamicTokenBucket limiter = new DynamicTokenBucket(qps, 10, 100);
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
index efac2a2..bc538ec 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
@@ -19,15 +19,19 @@ package org.apache.gobblin.restli.throttling;
 
 import java.util.Map;
 
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.util.Sleeper;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.codahale.metrics.Timer;
 import com.google.inject.Injector;
+import com.linkedin.data.template.GetMode;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
 import com.linkedin.restli.server.RestLiServiceException;
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.broker.BrokerConfigurationKeyGenerator;
@@ -57,6 +61,53 @@ public class LimiterServerResourceTest {
   }
 
   @Test
+  public void testSleepOnClientDelegation() {
+
+    ThrottlingPolicyFactory factory = new ThrottlingPolicyFactory();
+    SharedLimiterKey res1key = new SharedLimiterKey("res1");
+
+    Map<String, String> configMap = ImmutableMap.<String, String>builder()
+        .put(BrokerConfigurationKeyGenerator.generateKey(factory, res1key, 
null, ThrottlingPolicyFactory.POLICY_KEY),
+            TestWaitPolicy.class.getName())
+        .build();
+
+    ThrottlingGuiceServletConfig guiceServletConfig = new 
ThrottlingGuiceServletConfig();
+    Sleeper.MockSleeper sleeper = guiceServletConfig.mockSleeper();
+    guiceServletConfig.initialize(ConfigFactory.parseMap(configMap));
+    Injector injector = guiceServletConfig.getInjector();
+
+    LimiterServerResource limiterServer = 
injector.getInstance(LimiterServerResource.class);
+
+    PermitRequest request = new PermitRequest();
+    request.setPermits(5);
+    request.setResource(res1key.getResourceLimitedPath());
+    request.setVersion(ThrottlingProtocolVersion.BASE.ordinal());
+
+    // policy does not require sleep, verify no sleep happened or is requested 
from client
+    PermitAllocation allocation = limiterServer.getSync(new 
ComplexResourceKey<>(request, new EmptyRecord()));
+    Assert.assertEquals((long) allocation.getPermits(), 5);
+    Assert.assertEquals((long) 
allocation.getWaitForPermitUseMillis(GetMode.DEFAULT), 0);
+    Assert.assertTrue(sleeper.getRequestedSleeps().isEmpty());
+
+    // policy requests a sleep of 10 millis, using BASE protocol version, 
verify server executes the sleep
+    request.setPermits(20);
+    request.setVersion(ThrottlingProtocolVersion.BASE.ordinal());
+    allocation = limiterServer.getSync(new ComplexResourceKey<>(request, new 
EmptyRecord()));
+    Assert.assertEquals((long) allocation.getPermits(), 20);
+    Assert.assertEquals((long) 
allocation.getWaitForPermitUseMillis(GetMode.DEFAULT), 0);
+    Assert.assertEquals((long) sleeper.getRequestedSleeps().peek(), 10);
+    sleeper.reset();
+
+    // policy requests a sleep of 10 millis, using WAIT_ON_CLIENT protocol 
version, verify server delegates sleep to client
+    request.setVersion(ThrottlingProtocolVersion.WAIT_ON_CLIENT.ordinal());
+    request.setPermits(20);
+    allocation = limiterServer.getSync(new ComplexResourceKey<>(request, new 
EmptyRecord()));
+    Assert.assertEquals((long) allocation.getPermits(), 20);
+    Assert.assertEquals((long) 
allocation.getWaitForPermitUseMillis(GetMode.DEFAULT), 10);
+    Assert.assertTrue(sleeper.getRequestedSleeps().isEmpty());
+  }
+
+    @Test
   public void testLimitedRequests() {
 
     ThrottlingPolicyFactory factory = new ThrottlingPolicyFactory();
@@ -141,4 +192,33 @@ public class LimiterServerResourceTest {
     Assert.assertEquals(timer.getCount(), 3);
   }
 
+  public static class TestWaitPolicy implements ThrottlingPolicy, 
ThrottlingPolicyFactory.SpecificPolicyFactory {
+    @Override
+    public PermitAllocation computePermitAllocation(PermitRequest request) {
+
+      PermitAllocation allocation = new PermitAllocation();
+      allocation.setPermits(request.getPermits());
+      if (request.getPermits() > 10) {
+        allocation.setWaitForPermitUseMillis(10);
+      }
+      return allocation;
+    }
+
+    @Override
+    public Map<String, String> getParameters() {
+      return null;
+    }
+
+    @Override
+    public String getDescription() {
+      return null;
+    }
+
+    @Override
+    public ThrottlingPolicy createPolicy(SharedLimiterKey sharedLimiterKey,
+        SharedResourcesBroker<ThrottlingServerScopes> broker, Config config) {
+      return this;
+    }
+  }
+
 }
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/Sleeper.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/Sleeper.java
new file mode 100644
index 0000000..60c308b
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/Sleeper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import lombok.Getter;
+
+
+/**
+ * A class surrounding {@link Thread#sleep(long)} that allows mocking sleeps 
during testing.
+ */
+public class Sleeper {
+
+  /**
+   * A mock version of {@link Sleeper} that just register calls to sleep but 
returns immediately.
+   */
+  @Getter
+  public static class MockSleeper extends Sleeper {
+    private Queue<Long> requestedSleeps = new LinkedList<>();
+
+    @Override
+    public void sleep(long millis) {
+      this.requestedSleeps.add(millis);
+    }
+
+    public void reset() {
+      this.requestedSleeps.clear();
+    }
+  }
+
+  /**
+   * Equivalent to {@link Thread#sleep(long)}.
+   */
+  public void sleep(long millis) throws InterruptedException {
+    Thread.sleep(millis);
+  }
+}

Reply via email to