This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c0735d  [GOBBLIN-764] Allow injection of Rest.li configurations for 
throttling client and fixed unit test.
4c0735d is described below

commit 4c0735dfe2055c8fc4da87e9871ea4dec01196e7
Author: ibuenros <[email protected]>
AuthorDate: Mon May 6 10:39:20 2019 -0700

    [GOBBLIN-764] Allow injection of Rest.li configurations for throttling 
client and fixed unit test.
    
    Closes #2628 from ibuenros/limiter-timeout
---
 .../org/apache/gobblin/restli/SharedRestClientFactory.java  | 13 +++++++++++--
 .../gobblin/util/limiter/BatchedPermitsRequester.java       |  5 +++++
 .../apache/gobblin/util/limiter/RestliLimiterFactory.java   |  5 +++++
 .../gobblin/util/limiter/RestliServiceBasedLimiter.java     |  5 +++--
 .../gobblin/util/limiter/BatchedPermitsRequesterTest.java   | 10 ++++++++--
 .../gobblin/util/limiter/broker/SharedLimiterFactory.java   |  2 ++
 6 files changed, 34 insertions(+), 6 deletions(-)

diff --git 
a/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/SharedRestClientFactory.java
 
b/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/SharedRestClientFactory.java
index 7e04d59..4b29a57 100644
--- 
a/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/SharedRestClientFactory.java
+++ 
b/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/SharedRestClientFactory.java
@@ -19,18 +19,20 @@ package org.apache.gobblin.restli;
 
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Executors;
 
+import org.apache.gobblin.util.ConfigUtils;
 import org.slf4j.Logger;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.linkedin.r2.filter.FilterChains;
 import com.linkedin.r2.transport.common.Client;
@@ -93,7 +95,14 @@ public class SharedRestClientFactory<S extends ScopeType<S>> 
implements SharedRe
           Executors.newSingleThreadScheduledExecutor(
               ExecutorsUtils.newDaemonThreadFactory(Optional.<Logger>absent(), 
Optional.of("R2 Netty Scheduler"))),
           true);
-      Client r2Client = new 
TransportClientAdapter(http.getClient(Collections.<String, String>emptyMap()));
+
+      Properties props = ConfigUtils.configToProperties(config.getConfig());
+      if (!props.containsKey(HttpClientFactory.HTTP_REQUEST_TIMEOUT)) {
+        // Rest.li changed the default timeout from 10s to 1s. Since some 
clients (e.g. throttling) relied on the longer
+        // timeout, override this property unless set by the user explicitly
+        props.setProperty(HttpClientFactory.HTTP_REQUEST_TIMEOUT, "10000");
+      }
+      Client r2Client = new 
TransportClientAdapter(http.getClient(Maps.fromProperties(props)));
 
       return new ResourceInstance<>(new RestClient(r2Client,uriPrefix));
     } catch (URISyntaxException use) {
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
index 91f607e..338d4a1 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
@@ -252,6 +252,11 @@ class BatchedPermitsRequester {
     }
   }
 
+  @VisibleForTesting
+  synchronized boolean reserveSemaphore() {
+    return this.requestSemaphore.tryAcquire();
+  }
+
   private synchronized void clearSemaphore() {
     if (this.requestSemaphore.availablePermits() > 0) {
       throw new IllegalStateException("Semaphore should have 0 permits!");
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RestliLimiterFactory.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RestliLimiterFactory.java
index 1e5a4c9..62758ef 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RestliLimiterFactory.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RestliLimiterFactory.java
@@ -49,6 +49,7 @@ public class RestliLimiterFactory<S extends ScopeType<S>>
   public static final String FACTORY_NAME = "limiter.restli";
   public static final String RESTLI_SERVICE_NAME = "throttling";
   public static final String SERVICE_IDENTIFIER_KEY = "serviceId";
+  public static final String PERMIT_REQUEST_TIMEOUT = 
"permitRequestTimeoutMillis";
 
   @Override
   public String getName() {
@@ -72,12 +73,16 @@ public class RestliLimiterFactory<S extends ScopeType<S>>
         new 
SubTaggedMetricContextKey(RestliServiceBasedLimiter.class.getSimpleName() + "_" 
+ resourceLimited,
         ImmutableMap.of("resourceLimited", resourceLimited));
 
+    long permitRequestTimeout = 
config.getConfig().hasPath(PERMIT_REQUEST_TIMEOUT)
+        ? config.getConfig().getLong(PERMIT_REQUEST_TIMEOUT) : 0L;
+
     return new ResourceInstance<>(
         RestliServiceBasedLimiter.builder()
             .resourceLimited(resourceLimited)
             .serviceIdentifier(serviceIdentifier)
             .metricContext(broker.getSharedResource(new 
MetricContextFactory<S>(), metricContextKey))
             .requestSender(broker.getSharedResource(new 
RedirectAwareRestClientRequestSender.Factory<S>(), new 
SharedRestClientKey(RESTLI_SERVICE_NAME)))
+            .permitRequestTimeoutMillis(permitRequestTimeout)
             .build()
     );
   }
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RestliServiceBasedLimiter.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RestliServiceBasedLimiter.java
index fc0dc9b..d27577b 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RestliServiceBasedLimiter.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RestliServiceBasedLimiter.java
@@ -50,11 +50,12 @@ public class RestliServiceBasedLimiter implements Limiter {
 
   @Builder
   private RestliServiceBasedLimiter(String resourceLimited, String 
serviceIdentifier,
-      MetricContext metricContext, RequestSender requestSender) {
+      MetricContext metricContext, RequestSender requestSender, long 
permitRequestTimeoutMillis) {
     Preconditions.checkNotNull(requestSender, "Request sender cannot be 
null.");
 
     this.bachedPermitsContainer = BatchedPermitsRequester.builder()
-        
.resourceId(resourceLimited).requestorIdentifier(serviceIdentifier).requestSender(requestSender).build();
+        
.resourceId(resourceLimited).requestorIdentifier(serviceIdentifier).requestSender(requestSender)
+        .maxTimeoutMillis(permitRequestTimeoutMillis).build();
 
     this.metricContext = Optional.fromNullable(metricContext);
     if (this.metricContext.isPresent()) {
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/BatchedPermitsRequesterTest.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/BatchedPermitsRequesterTest.java
index 0fb487f..cee8f70 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/BatchedPermitsRequesterTest.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/BatchedPermitsRequesterTest.java
@@ -119,7 +119,8 @@ public class BatchedPermitsRequesterTest {
     Queue<RequestAndCallback> queue = Queues.newArrayDeque();
 
     BatchedPermitsRequester container = 
BatchedPermitsRequester.builder().resourceId("resource")
-        .requestorIdentifier("requestor").requestSender(new 
TestRequestSender(queue, false)).build();
+        .requestorIdentifier("requestor").requestSender(new 
TestRequestSender(queue, false))
+        .maxTimeoutMillis(1000).build();
     try (ParallelRequester requester = new ParallelRequester(container)) {
 
       Future<Boolean> future = requester.request(10);
@@ -143,7 +144,8 @@ public class BatchedPermitsRequesterTest {
     Queue<RequestAndCallback> queue = Queues.newArrayDeque();
 
     BatchedPermitsRequester container = 
BatchedPermitsRequester.builder().resourceId("resource")
-        .requestorIdentifier("requestor").requestSender(new 
TestRequestSender(queue, false)).build();
+        .requestorIdentifier("requestor").requestSender(new 
TestRequestSender(queue, false))
+        .maxTimeoutMillis(1000).build();
     try (ParallelRequester requester = new ParallelRequester(container)) {
 
       Future<Boolean> future = requester.request(10);
@@ -176,6 +178,10 @@ public class BatchedPermitsRequesterTest {
     Response<PermitAllocation> response = Mockito.mock(Response.class);
     Mockito.when(response.getEntity()).thenReturn(allocation);
 
+    // Normally the semaphore is reserved during a request. Since we're 
mocking a response without ever starting a request,
+    // manually reserve the semaphore
+    Assert.assertTrue(container.reserveSemaphore());
+
     callback.onSuccess(response);
     Assert.assertEquals((long) mockWaiter.getRequestedSleeps().peek(), 20);
     
Assert.assertEquals(container.getPermitBatchContainer().getTotalAvailablePermits(),
 10);
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/limiter/broker/SharedLimiterFactory.java
 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/limiter/broker/SharedLimiterFactory.java
index 6f398c6..3ce2724 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/limiter/broker/SharedLimiterFactory.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/limiter/broker/SharedLimiterFactory.java
@@ -81,6 +81,7 @@ public class SharedLimiterFactory<S extends ScopeType<S>> 
implements SharedResou
 
     if (ConfigUtils.getBoolean(config, SKIP_GLOBAL_LIMITER_KEY, false)) {
       if (globalLimiterPolicy != 
SharedLimiterKey.GlobalLimiterPolicy.LOCAL_ONLY) {
+        log.info("Skip global limiter is set. Switching to local only 
limiter.");
         SharedLimiterKey modifiedKey = new 
SharedLimiterKey(configView.getKey().getResourceLimitedPath(),
             SharedLimiterKey.GlobalLimiterPolicy.LOCAL_ONLY);
         return new ResourceCoordinate<>(this, modifiedKey, (S) 
configView.getScope());
@@ -98,6 +99,7 @@ public class SharedLimiterFactory<S extends ScopeType<S>> 
implements SharedResou
 
     if (!configView.getScope().isLocal() && 
!globalLimiterPolicy.equals(SharedLimiterKey.GlobalLimiterPolicy.LOCAL_ONLY)) {
       try {
+        log.info("Looking for Restli Limiter factory.");
         Class<?> klazz = 
Class.forName("org.apache.gobblin.util.limiter.RestliLimiterFactory");
         return new ResourceCoordinate<>((SharedResourceFactory<Limiter, 
SharedLimiterKey, S>) klazz.newInstance(),
             configView.getKey(), (S) configView.getScope());

Reply via email to