Repository: incubator-gobblin Updated Branches: refs/heads/master 725a0829d -> a080ad843
Allow disabling global throttling. Fix a race condition in BatchedPermitsRequester. Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/82b5b2d1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/82b5b2d1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/82b5b2d1 Branch: refs/heads/master Commit: 82b5b2d12d86ca8819a678bf3403495bfd2683e7 Parents: 5459609 Author: ibuenros <[email protected]> Authored: Wed May 17 10:18:41 2017 -0700 Committer: ibuenros <[email protected]> Committed: Wed May 17 10:18:41 2017 -0700 ---------------------------------------------------------------------- .../util/limiter/BatchedPermitsRequester.java | 11 +- .../RedirectAwareRestClientRequestSender.java | 5 +- .../util/limiter/RestliLimiterFactoryTest.java | 121 +++++++++++++++++++ .../limiter/broker/SharedLimiterFactory.java | 14 ++- 4 files changed, 146 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/82b5b2d1/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/BatchedPermitsRequester.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/BatchedPermitsRequester.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/BatchedPermitsRequester.java index 2e6e97e..9ea8c50 100644 --- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/BatchedPermitsRequester.java +++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/BatchedPermitsRequester.java @@ -105,6 +105,7 @@ class BatchedPermitsRequester { private final RetryStatus retryStatus; private final SynchronizedAverager permitsOutstanding; private final long targetMillisBetweenRequests; + private final AtomicLong callbackCounter; @Builder private BatchedPermitsRequester(String resourceId, String requestorIdentifier, @@ -131,6 +132,7 @@ class BatchedPermitsRequester { this.restRequestTimer = metricContext == null ? null : metricContext.timer(REST_REQUEST_TIMER); this.restRequestHistogram = metricContext == null ? null : metricContext.histogram(REST_REQUEST_PERMITS_HISTOGRAM); + this.callbackCounter = new AtomicLong(); } /** @@ -150,8 +152,13 @@ class BatchedPermitsRequester { return true; } if (this.retryStatus.canRetryWithinMillis(10000)) { + long callbackCounterSnap = this.callbackCounter.get(); maybeSendNewPermitRequest(); - this.newPermitsAvailable.await(); + if (this.callbackCounter.get() == callbackCounterSnap) { + // If a callback has happened since we tried to send the new permit request, don't await + // Since some request senders may be synchronous, we would have missed the notification + this.newPermitsAvailable.await(); + } } else { break; } @@ -279,6 +286,7 @@ class BatchedPermitsRequester { @Override public void onSuccess(Response<PermitAllocation> result) { BatchedPermitsRequester.this.retries = 0; + BatchedPermitsRequester.this.callbackCounter.incrementAndGet(); BatchedPermitsRequester.this.lock.lock(); try { PermitAllocation allocation = result.getEntity(); @@ -309,6 +317,7 @@ class BatchedPermitsRequester { private void nonRetriableFail(Throwable exc, String msg) { BatchedPermitsRequester.this.retryStatus.blockRetries(RETRY_DELAY_ON_NON_RETRIABLE_EXCEPTION, exc); + BatchedPermitsRequester.this.callbackCounter.incrementAndGet(); BatchedPermitsRequester.this.requestSemaphore.release(); log.error(msg, exc); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/82b5b2d1/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java index 752c933..f7bd631 100644 --- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java +++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java @@ -61,14 +61,14 @@ public class RedirectAwareRestClientRequestSender extends RestClientRequestSende * A {@link SharedResourceFactory} that creates {@link RedirectAwareRestClientRequestSender}s. * @param <S> */ - public static class Factory<S extends ScopeType<S>> implements SharedResourceFactory<RedirectAwareRestClientRequestSender, SharedRestClientKey, S> { + public static class Factory<S extends ScopeType<S>> implements SharedResourceFactory<RequestSender, SharedRestClientKey, S> { @Override public String getName() { return SharedRestClientFactory.FACTORY_NAME; } @Override - public SharedResourceFactoryResponse<RedirectAwareRestClientRequestSender> createResource( + public SharedResourceFactoryResponse<RequestSender> createResource( SharedResourcesBroker<S> broker, ScopedConfigView<S, SharedRestClientKey> config) throws NotConfiguredException { try { @@ -177,7 +177,6 @@ public class RedirectAwareRestClientRequestSender extends RestClientRequestSende if (this.retries > RedirectAwareRestClientRequestSender.this.connectionPrefixes.size()) { this.underlying.onError(new NonRetriableException("Failed to connect to all available connection prefixes.")); } - log.info("Retries " + this.retries + " this " + hashCode()); updateRestClient(getNextConnectionPrefix(), "Failed to communicate with " + getCurrentServerPrefix()); this.exponentialBackoff.awaitNextRetry(); sendRequest(this.originalRequest, this); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/82b5b2d1/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/gobblin/util/limiter/RestliLimiterFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/gobblin/util/limiter/RestliLimiterFactoryTest.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/gobblin/util/limiter/RestliLimiterFactoryTest.java new file mode 100644 index 0000000..951612f --- /dev/null +++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/gobblin/util/limiter/RestliLimiterFactoryTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gobblin.util.limiter; + +import java.util.List; +import java.util.Map; + +import org.junit.Assert; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.linkedin.common.callback.Callback; +import com.linkedin.restli.client.Response; +import com.typesafe.config.ConfigFactory; + +import gobblin.broker.BrokerConfigurationKeyGenerator; +import gobblin.broker.SharedResourcesBrokerFactory; +import gobblin.broker.SimpleScopeType; +import gobblin.broker.iface.SharedResourcesBroker; +import gobblin.restli.SharedRestClientKey; +import gobblin.restli.throttling.PermitAllocation; +import gobblin.restli.throttling.PermitRequest; +import gobblin.util.limiter.broker.SharedLimiterFactory; +import gobblin.util.limiter.broker.SharedLimiterKey; + + +public class RestliLimiterFactoryTest { + + @Test + public void testFactory() throws Exception { + SharedResourcesBroker<SimpleScopeType> broker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker( + ConfigFactory.empty(), SimpleScopeType.GLOBAL.defaultScopeInstance()); + + MyRequestSender requestSender = new MyRequestSender(); + + broker.bindSharedResourceAtScope(new RedirectAwareRestClientRequestSender.Factory<>(), + new SharedRestClientKey(RestliLimiterFactory.RESTLI_SERVICE_NAME), SimpleScopeType.GLOBAL, requestSender); + RestliServiceBasedLimiter limiter = + broker.getSharedResource(new RestliLimiterFactory<>(), new SharedLimiterKey("my/resource")); + + Assert.assertNotNull(limiter.acquirePermits(10)); + Assert.assertEquals(requestSender.requestList.size(), 1); + + broker.close(); + } + + @Test + public void testRestliLimiterCalledByLimiterFactory() throws Exception { + SharedResourcesBroker<SimpleScopeType> broker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker( + ConfigFactory.empty(), SimpleScopeType.GLOBAL.defaultScopeInstance()); + + MyRequestSender requestSender = new MyRequestSender(); + + broker.bindSharedResourceAtScope(new RedirectAwareRestClientRequestSender.Factory<>(), + new SharedRestClientKey(RestliLimiterFactory.RESTLI_SERVICE_NAME), SimpleScopeType.GLOBAL, requestSender); + Limiter limiter = + broker.getSharedResource(new SharedLimiterFactory<>(), new SharedLimiterKey("my/resource")); + + Assert.assertNotNull(limiter.acquirePermits(10)); + Assert.assertEquals(requestSender.requestList.size(), 1); + + broker.close(); + } + + @Test + public void testSkipGlobalLimiterOnLimiterFactory() throws Exception { + Map<String, String> configMap = ImmutableMap.of( + BrokerConfigurationKeyGenerator.generateKey(new SharedLimiterFactory(), null, null, SharedLimiterFactory.SKIP_GLOBAL_LIMITER_KEY), "true" + ); + + SharedResourcesBroker<SimpleScopeType> broker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker( + ConfigFactory.parseMap(configMap), SimpleScopeType.GLOBAL.defaultScopeInstance()); + + MyRequestSender requestSender = new MyRequestSender(); + + broker.bindSharedResourceAtScope(new RedirectAwareRestClientRequestSender.Factory<>(), + new SharedRestClientKey(RestliLimiterFactory.RESTLI_SERVICE_NAME), SimpleScopeType.GLOBAL, requestSender); + Limiter limiter = + broker.getSharedResource(new SharedLimiterFactory<>(), new SharedLimiterKey("my/resource")); + + Assert.assertNotNull(limiter.acquirePermits(10)); + Assert.assertEquals(requestSender.requestList.size(), 0); + + broker.close(); + } + + public static class MyRequestSender implements RequestSender { + List<PermitRequest> requestList = Lists.newArrayList(); + + @Override + public void sendRequest(PermitRequest request, Callback<Response<PermitAllocation>> callback) { + this.requestList.add(request); + + PermitAllocation permitAllocation = new PermitAllocation(); + permitAllocation.setPermits(request.getPermits()); + permitAllocation.setExpiration(Long.MAX_VALUE); + + Response<PermitAllocation> response = Mockito.mock(Response.class); + Mockito.when(response.getEntity()).thenReturn(permitAllocation); + callback.onSuccess(response); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/82b5b2d1/gobblin-utility/src/main/java/gobblin/util/limiter/broker/SharedLimiterFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/gobblin/util/limiter/broker/SharedLimiterFactory.java b/gobblin-utility/src/main/java/gobblin/util/limiter/broker/SharedLimiterFactory.java index 0d0c5f1..0ae45a6 100644 --- a/gobblin-utility/src/main/java/gobblin/util/limiter/broker/SharedLimiterFactory.java +++ b/gobblin-utility/src/main/java/gobblin/util/limiter/broker/SharedLimiterFactory.java @@ -32,6 +32,7 @@ import gobblin.broker.iface.SharedResourcesBroker; import gobblin.broker.iface.SharedResourceFactoryResponse; import gobblin.broker.ResourceCoordinate; import gobblin.util.ClassAliasResolver; +import gobblin.util.ConfigUtils; import gobblin.util.limiter.Limiter; import gobblin.util.limiter.LimiterFactory; import gobblin.util.limiter.MultiLimiter; @@ -57,6 +58,11 @@ public class SharedLimiterFactory<S extends ScopeType<S>> implements SharedResou public static final String NAME = "limiter"; public static final String LIMITER_CLASS_KEY = "class"; public static final String FAIL_IF_NO_GLOBAL_LIMITER_KEY = "failIfNoGlobalLimiter"; + /** + * Skip use of global limiter. In general, this should not be used, but it is provided to easily disable global limiters + * in case of issues with the coordination server. + */ + public static final String SKIP_GLOBAL_LIMITER_KEY = "skipGlobalLimiter"; public static final String FAIL_ON_UNKNOWN_RESOURCE_ID = "faiOnUnknownResourceId"; private static final ClassAliasResolver<LimiterFactory> RESOLVER = new ClassAliasResolver<>(LimiterFactory.class); @@ -74,7 +80,13 @@ public class SharedLimiterFactory<S extends ScopeType<S>> implements SharedResou Config config = configView.getConfig(); SharedLimiterKey.GlobalLimiterPolicy globalLimiterPolicy = configView.getKey().getGlobalLimiterPolicy(); - if (config.hasPath(FAIL_IF_NO_GLOBAL_LIMITER_KEY) && config.getBoolean(FAIL_IF_NO_GLOBAL_LIMITER_KEY) && + if (ConfigUtils.getBoolean(config, SKIP_GLOBAL_LIMITER_KEY, false)) { + if (globalLimiterPolicy != SharedLimiterKey.GlobalLimiterPolicy.LOCAL_ONLY) { + SharedLimiterKey modifiedKey = new SharedLimiterKey(configView.getKey().getResourceLimitedPath(), + SharedLimiterKey.GlobalLimiterPolicy.LOCAL_ONLY); + return new ResourceCoordinate<>(this, modifiedKey, (S) configView.getScope()); + } + } else if (config.hasPath(FAIL_IF_NO_GLOBAL_LIMITER_KEY) && config.getBoolean(FAIL_IF_NO_GLOBAL_LIMITER_KEY) && globalLimiterPolicy != SharedLimiterKey.GlobalLimiterPolicy.USE_GLOBAL) { // if user has specified FAIL_IF_NO_GLOBAL_LIMITER_KEY, promote the policy from USE_GLOBAL_IF_CONFIGURED to USE_GLOBAL // e.g. fail if no GLOBAL configuration is present
