This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4c0735d [GOBBLIN-764] Allow injection of Rest.li configurations for
throttling client and fixed unit test.
4c0735d is described below
commit 4c0735dfe2055c8fc4da87e9871ea4dec01196e7
Author: ibuenros <[email protected]>
AuthorDate: Mon May 6 10:39:20 2019 -0700
[GOBBLIN-764] Allow injection of Rest.li configurations for throttling
client and fixed unit test.
Closes #2628 from ibuenros/limiter-timeout
---
.../org/apache/gobblin/restli/SharedRestClientFactory.java | 13 +++++++++++--
.../gobblin/util/limiter/BatchedPermitsRequester.java | 5 +++++
.../apache/gobblin/util/limiter/RestliLimiterFactory.java | 5 +++++
.../gobblin/util/limiter/RestliServiceBasedLimiter.java | 5 +++--
.../gobblin/util/limiter/BatchedPermitsRequesterTest.java | 10 ++++++++--
.../gobblin/util/limiter/broker/SharedLimiterFactory.java | 2 ++
6 files changed, 34 insertions(+), 6 deletions(-)
diff --git
a/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/SharedRestClientFactory.java
b/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/SharedRestClientFactory.java
index 7e04d59..4b29a57 100644
---
a/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/SharedRestClientFactory.java
+++
b/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/SharedRestClientFactory.java
@@ -19,18 +19,20 @@ package org.apache.gobblin.restli;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Collections;
import java.util.List;
+import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
+import org.apache.gobblin.util.ConfigUtils;
import org.slf4j.Logger;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.linkedin.r2.filter.FilterChains;
import com.linkedin.r2.transport.common.Client;
@@ -93,7 +95,14 @@ public class SharedRestClientFactory<S extends ScopeType<S>>
implements SharedRe
Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newDaemonThreadFactory(Optional.<Logger>absent(),
Optional.of("R2 Netty Scheduler"))),
true);
- Client r2Client = new
TransportClientAdapter(http.getClient(Collections.<String, String>emptyMap()));
+
+ Properties props = ConfigUtils.configToProperties(config.getConfig());
+ if (!props.containsKey(HttpClientFactory.HTTP_REQUEST_TIMEOUT)) {
+ // Rest.li changed the default timeout from 10s to 1s. Since some
clients (e.g. throttling) relied on the longer
+ // timeout, override this property unless set by the user explicitly
+ props.setProperty(HttpClientFactory.HTTP_REQUEST_TIMEOUT, "10000");
+ }
+ Client r2Client = new
TransportClientAdapter(http.getClient(Maps.fromProperties(props)));
return new ResourceInstance<>(new RestClient(r2Client,uriPrefix));
} catch (URISyntaxException use) {
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
index 91f607e..338d4a1 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/BatchedPermitsRequester.java
@@ -252,6 +252,11 @@ class BatchedPermitsRequester {
}
}
+ @VisibleForTesting
+ synchronized boolean reserveSemaphore() {
+ return this.requestSemaphore.tryAcquire();
+ }
+
private synchronized void clearSemaphore() {
if (this.requestSemaphore.availablePermits() > 0) {
throw new IllegalStateException("Semaphore should have 0 permits!");
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RestliLimiterFactory.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RestliLimiterFactory.java
index 1e5a4c9..62758ef 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RestliLimiterFactory.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RestliLimiterFactory.java
@@ -49,6 +49,7 @@ public class RestliLimiterFactory<S extends ScopeType<S>>
public static final String FACTORY_NAME = "limiter.restli";
public static final String RESTLI_SERVICE_NAME = "throttling";
public static final String SERVICE_IDENTIFIER_KEY = "serviceId";
+ public static final String PERMIT_REQUEST_TIMEOUT =
"permitRequestTimeoutMillis";
@Override
public String getName() {
@@ -72,12 +73,16 @@ public class RestliLimiterFactory<S extends ScopeType<S>>
new
SubTaggedMetricContextKey(RestliServiceBasedLimiter.class.getSimpleName() + "_"
+ resourceLimited,
ImmutableMap.of("resourceLimited", resourceLimited));
+ long permitRequestTimeout =
config.getConfig().hasPath(PERMIT_REQUEST_TIMEOUT)
+ ? config.getConfig().getLong(PERMIT_REQUEST_TIMEOUT) : 0L;
+
return new ResourceInstance<>(
RestliServiceBasedLimiter.builder()
.resourceLimited(resourceLimited)
.serviceIdentifier(serviceIdentifier)
.metricContext(broker.getSharedResource(new
MetricContextFactory<S>(), metricContextKey))
.requestSender(broker.getSharedResource(new
RedirectAwareRestClientRequestSender.Factory<S>(), new
SharedRestClientKey(RESTLI_SERVICE_NAME)))
+ .permitRequestTimeoutMillis(permitRequestTimeout)
.build()
);
}
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RestliServiceBasedLimiter.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RestliServiceBasedLimiter.java
index fc0dc9b..d27577b 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RestliServiceBasedLimiter.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/org/apache/gobblin/util/limiter/RestliServiceBasedLimiter.java
@@ -50,11 +50,12 @@ public class RestliServiceBasedLimiter implements Limiter {
@Builder
private RestliServiceBasedLimiter(String resourceLimited, String
serviceIdentifier,
- MetricContext metricContext, RequestSender requestSender) {
+ MetricContext metricContext, RequestSender requestSender, long
permitRequestTimeoutMillis) {
Preconditions.checkNotNull(requestSender, "Request sender cannot be
null.");
this.bachedPermitsContainer = BatchedPermitsRequester.builder()
-
.resourceId(resourceLimited).requestorIdentifier(serviceIdentifier).requestSender(requestSender).build();
+
.resourceId(resourceLimited).requestorIdentifier(serviceIdentifier).requestSender(requestSender)
+ .maxTimeoutMillis(permitRequestTimeoutMillis).build();
this.metricContext = Optional.fromNullable(metricContext);
if (this.metricContext.isPresent()) {
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/BatchedPermitsRequesterTest.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/BatchedPermitsRequesterTest.java
index 0fb487f..cee8f70 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/BatchedPermitsRequesterTest.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/util/limiter/BatchedPermitsRequesterTest.java
@@ -119,7 +119,8 @@ public class BatchedPermitsRequesterTest {
Queue<RequestAndCallback> queue = Queues.newArrayDeque();
BatchedPermitsRequester container =
BatchedPermitsRequester.builder().resourceId("resource")
- .requestorIdentifier("requestor").requestSender(new
TestRequestSender(queue, false)).build();
+ .requestorIdentifier("requestor").requestSender(new
TestRequestSender(queue, false))
+ .maxTimeoutMillis(1000).build();
try (ParallelRequester requester = new ParallelRequester(container)) {
Future<Boolean> future = requester.request(10);
@@ -143,7 +144,8 @@ public class BatchedPermitsRequesterTest {
Queue<RequestAndCallback> queue = Queues.newArrayDeque();
BatchedPermitsRequester container =
BatchedPermitsRequester.builder().resourceId("resource")
- .requestorIdentifier("requestor").requestSender(new
TestRequestSender(queue, false)).build();
+ .requestorIdentifier("requestor").requestSender(new
TestRequestSender(queue, false))
+ .maxTimeoutMillis(1000).build();
try (ParallelRequester requester = new ParallelRequester(container)) {
Future<Boolean> future = requester.request(10);
@@ -176,6 +178,10 @@ public class BatchedPermitsRequesterTest {
Response<PermitAllocation> response = Mockito.mock(Response.class);
Mockito.when(response.getEntity()).thenReturn(allocation);
+ // Normally the semaphore is reserved during a request. Since we're
mocking a response without ever starting a request,
+ // manually reserve the semaphore
+ Assert.assertTrue(container.reserveSemaphore());
+
callback.onSuccess(response);
Assert.assertEquals((long) mockWaiter.getRequestedSleeps().peek(), 20);
Assert.assertEquals(container.getPermitBatchContainer().getTotalAvailablePermits(),
10);
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/limiter/broker/SharedLimiterFactory.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/limiter/broker/SharedLimiterFactory.java
index 6f398c6..3ce2724 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/limiter/broker/SharedLimiterFactory.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/limiter/broker/SharedLimiterFactory.java
@@ -81,6 +81,7 @@ public class SharedLimiterFactory<S extends ScopeType<S>>
implements SharedResou
if (ConfigUtils.getBoolean(config, SKIP_GLOBAL_LIMITER_KEY, false)) {
if (globalLimiterPolicy !=
SharedLimiterKey.GlobalLimiterPolicy.LOCAL_ONLY) {
+ log.info("Skip global limiter is set. Switching to local only
limiter.");
SharedLimiterKey modifiedKey = new
SharedLimiterKey(configView.getKey().getResourceLimitedPath(),
SharedLimiterKey.GlobalLimiterPolicy.LOCAL_ONLY);
return new ResourceCoordinate<>(this, modifiedKey, (S)
configView.getScope());
@@ -98,6 +99,7 @@ public class SharedLimiterFactory<S extends ScopeType<S>>
implements SharedResou
if (!configView.getScope().isLocal() &&
!globalLimiterPolicy.equals(SharedLimiterKey.GlobalLimiterPolicy.LOCAL_ONLY)) {
try {
+ log.info("Looking for Restli Limiter factory.");
Class<?> klazz =
Class.forName("org.apache.gobblin.util.limiter.RestliLimiterFactory");
return new ResourceCoordinate<>((SharedResourceFactory<Limiter,
SharedLimiterKey, S>) klazz.newInstance(),
configView.getKey(), (S) configView.getScope());