Repository: incubator-gobblin Updated Branches: refs/heads/master b6e88fd06 -> 0e5561519
[GOBBLIN-287] Adding a service name to the resource name for multiple throttling quotas Closes #2142 from jdintruff/gobblin-287 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0e556151 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0e556151 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0e556151 Branch: refs/heads/master Commit: 0e556151964217332f52e88bb28940095341a6d4 Parents: b6e88fd Author: Jack Dintruff <[email protected]> Authored: Tue Nov 7 08:55:19 2017 -0800 Committer: Issac Buenrostro <[email protected]> Committed: Tue Nov 7 08:55:19 2017 -0800 ---------------------------------------------------------------------- .../src/main/webapp/WEB-INF/web.xml | 12 +++++++++++- .../gobblin/util/RateControlledFileSystem.java | 2 +- .../util/filesystem/FileSystemLimiterKey.java | 10 ++++++++-- .../util/filesystem/ThrottledFileSystem.java | 16 ++++++++++++---- .../util/filesystem/ThrottledFileSystemTest.java | 5 +++-- 5 files changed, 35 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e556151/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/webapp/WEB-INF/web.xml ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/webapp/WEB-INF/web.xml b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/webapp/WEB-INF/web.xml index 72cdde4..ce67a17 100644 --- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/webapp/WEB-INF/web.xml +++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/main/webapp/WEB-INF/web.xml @@ -30,7 +30,7 @@ </filter-mapping> <listener> - <listener-class>gobblin.restli.throttling.ThrottlingGuiceServletConfig</listener-class> + <listener-class>org.apache.gobblin.restli.throttling.ThrottlingGuiceServletConfig</listener-class> </listener> <context-param> @@ -43,4 +43,14 @@ <param-value>100000000</param-value> </context-param> + <context-param> + <param-name>gobblin.broker.limiter.filesystem.localhost.localhost.other.qps</param-name> + <param-value>10</param-value> + </context-param> + + <context-param> + <param-name>gobblin.broker.limiter.filesystem.localhost.localhost.etl.qps</param-name> + <param-value>5</param-value> + </context-param> + </web-app> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e556151/gobblin-utility/src/main/java/org/apache/gobblin/util/RateControlledFileSystem.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/RateControlledFileSystem.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/RateControlledFileSystem.java index f9c1b7d..69e620a 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/RateControlledFileSystem.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/RateControlledFileSystem.java @@ -78,7 +78,7 @@ public class RateControlledFileSystem extends ThrottledFileSystem { } public RateControlledFileSystem(FileSystem fs, final long limitPerSecond) { - super(fs, null); + super(fs, null, null); this.limitPerSecond = limitPerSecond; this.callableLimiter = new Callable<RateBasedLimiter>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e556151/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemLimiterKey.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemLimiterKey.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemLimiterKey.java index 601cbb3..551ec20 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemLimiterKey.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/FileSystemLimiterKey.java @@ -19,10 +19,10 @@ package org.apache.gobblin.util.filesystem; import java.net.URI; +import com.google.common.base.Strings; import org.apache.gobblin.util.ClustersNames; import org.apache.gobblin.util.limiter.broker.SharedLimiterKey; - /** * {@link SharedLimiterKey} used for NameNode throttling. */ @@ -30,10 +30,16 @@ public class FileSystemLimiterKey extends SharedLimiterKey { public static final String RESOURCE_LIMITED_PREFIX = "filesystem"; private final URI uri; + public final String serviceName; public FileSystemLimiterKey(URI uri) { - super(RESOURCE_LIMITED_PREFIX + "/" + getFSIdentifier(uri)); + this(uri, null); + } + + public FileSystemLimiterKey(URI uri, String serviceName) { + super(RESOURCE_LIMITED_PREFIX + "/" + getFSIdentifier(uri) + (Strings.isNullOrEmpty(serviceName) ? "" : "/" + serviceName)); this.uri = uri; + this.serviceName = serviceName; } private static String getFSIdentifier(URI uri) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e556151/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ThrottledFileSystem.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ThrottledFileSystem.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ThrottledFileSystem.java index 491de82..ab225cb 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ThrottledFileSystem.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ThrottledFileSystem.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.gobblin.util.ConfigUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -53,13 +54,14 @@ public class ThrottledFileSystem extends FileSystemInstrumentation { * Factory for {@link ThrottledFileSystem}. */ public static class Factory<S extends ScopeType<S>> extends FileSystemInstrumentationFactory<S> { + private static final String SERVICE_NAME_CONF_KEY = "gobblin.broker.filesystem.limiterServiceName"; @Override public FileSystem instrumentFileSystem(FileSystem fs, SharedResourcesBroker<S> broker, ConfigView<S, FileSystemKey> config) { try { - Limiter limiter = - broker.getSharedResource(new SharedLimiterFactory<S>(), new FileSystemLimiterKey(config.getKey().getUri())); - return new ThrottledFileSystem(fs, limiter); + String serviceName = ConfigUtils.getString(config.getConfig(), SERVICE_NAME_CONF_KEY, ""); + Limiter limiter = broker.getSharedResource(new SharedLimiterFactory<S>(), new FileSystemLimiterKey(config.getKey().getUri())); + return new ThrottledFileSystem(fs, limiter, serviceName); } catch (NotConfiguredException nce) { throw new RuntimeException(nce); } @@ -72,10 +74,12 @@ public class ThrottledFileSystem extends FileSystemInstrumentation { public static final int LISTING_FILES_PER_PERMIT = 100; private final Limiter limiter; + private final String serviceName; - public ThrottledFileSystem(FileSystem fs, Limiter limiter) { + public ThrottledFileSystem(FileSystem fs, Limiter limiter, String serviceName) { super(fs); this.limiter = limiter; + this.serviceName = serviceName; } @Override @@ -185,6 +189,10 @@ public class ThrottledFileSystem extends FileSystemInstrumentation { return this.limiter; } + public String getServiceName() { + return this.serviceName; + } + @Override public void close() throws IOException { getRateLimiter().stop(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e556151/gobblin-utility/src/test/java/org/apache/gobblin/util/filesystem/ThrottledFileSystemTest.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/filesystem/ThrottledFileSystemTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/filesystem/ThrottledFileSystemTest.java index f8a863f..f0d33d5 100644 --- a/gobblin-utility/src/test/java/org/apache/gobblin/util/filesystem/ThrottledFileSystemTest.java +++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/filesystem/ThrottledFileSystemTest.java @@ -40,7 +40,7 @@ public class ThrottledFileSystemTest { Limiter limiter = new CountBasedLimiter(2); - ThrottledFileSystem throttledFileSystem = new ThrottledFileSystem(fs, limiter); + ThrottledFileSystem throttledFileSystem = new ThrottledFileSystem(fs, limiter, "testService"); Assert.assertNotNull(throttledFileSystem.getFileStatus(new Path("/myFile"))); Assert.assertNotNull(throttledFileSystem.getFileStatus(new Path("/myFile"))); @@ -74,7 +74,8 @@ public class ThrottledFileSystemTest { Limiter limiter = new CountBasedLimiter(5); - ThrottledFileSystem throttledFileSystem = new ThrottledFileSystem(fs, limiter); + ThrottledFileSystem throttledFileSystem = new ThrottledFileSystem(fs, limiter, "testService"); + Assert.assertEquals(throttledFileSystem.getServiceName(), "testService"); Assert.assertNotNull(throttledFileSystem.listStatus(new Path("/files/99"))); // use 1 permit Assert.assertNotNull(throttledFileSystem.listStatus(new Path("/files/250"))); // use 3 permits
