Repository: hadoop Updated Branches: refs/heads/trunk 438c1d333 -> f86c81d92
Revert "HADOOP-15039. Move SemaphoredDelegatingExecutor to hadoop-common. Contributed by Genmao Yu" This reverts commit 479d6a5792262c977025c26fd4960574b0db6847 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/28792b6b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/28792b6b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/28792b6b Branch: refs/heads/trunk Commit: 28792b6b7f137df1db58496f27de23bbe99cdfd6 Parents: 438c1d3 Author: Kai Zheng <[email protected]> Authored: Thu Dec 14 11:03:56 2017 +0800 Committer: Kai Zheng <[email protected]> Committed: Thu Dec 14 11:05:11 2017 +0800 ---------------------------------------------------------------------- .../fs/s3a/BlockingThreadPoolExecutorService.java | 7 ++++--- .../java/org/apache/hadoop/fs/s3a/S3AFileSystem.java | 2 -- .../hadoop/fs/s3a/SemaphoredDelegatingExecutor.java | 15 +++++++++------ .../s3a/ITestBlockingThreadPoolExecutorService.java | 2 -- 4 files changed, 13 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/28792b6b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java index 404eea9..f13942d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java @@ -15,7 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.util; + +package org.apache.hadoop.fs.s3a; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -41,7 +42,7 @@ import org.apache.hadoop.classification.InterfaceAudience; * this s4 threadpool</a> */ @InterfaceAudience.Private -public final class BlockingThreadPoolExecutorService +final class BlockingThreadPoolExecutorService extends SemaphoredDelegatingExecutor { private static final Logger LOG = LoggerFactory @@ -85,7 +86,7 @@ public final class BlockingThreadPoolExecutorService * @return a thread factory that creates named, daemon threads with * the supplied exception handler and normal priority */ - public static ThreadFactory newDaemonThreadFactory(final String prefix) { + static ThreadFactory newDaemonThreadFactory(final String prefix) { final ThreadFactory namedFactory = getNamedThreadFactory(prefix); return new ThreadFactory() { @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/28792b6b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index e927758..63a4349 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -112,10 +112,8 @@ import org.apache.hadoop.fs.s3a.s3guard.S3Guard; import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.BlockingThreadPoolExecutorService; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Invoker.*; http://git-wip-us.apache.org/repos/asf/hadoop/blob/28792b6b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java index bcc19e3..6b21912 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.util; +package org.apache.hadoop.fs.s3a; import com.google.common.util.concurrent.ForwardingListeningExecutorService; import com.google.common.util.concurrent.Futures; @@ -42,13 +42,17 @@ import java.util.concurrent.TimeoutException; * This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code * contains the thread pool logic, whereas this isolates the semaphore * and submit logic for use with other thread pools and delegation models. + * In particular, it <i>permits multiple per stream executors to share a + * single per-FS-instance executor; the latter to throttle overall + * load from the the FS, the others to limit the amount of load which + * a single output stream can generate.</i> * <p> * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java"> * this s4 threadpool</a> */ @SuppressWarnings("NullableProblems") @InterfaceAudience.Private -public class SemaphoredDelegatingExecutor extends +class SemaphoredDelegatingExecutor extends ForwardingListeningExecutorService { private final Semaphore queueingPermits; @@ -61,8 +65,7 @@ public class SemaphoredDelegatingExecutor extends * @param permitCount number of permits into the queue permitted * @param fair should the semaphore be "fair" */ - public SemaphoredDelegatingExecutor( - ListeningExecutorService executorDelegatee, + SemaphoredDelegatingExecutor(ListeningExecutorService executorDelegatee, int permitCount, boolean fair) { this.permitCount = permitCount; @@ -187,7 +190,7 @@ public class SemaphoredDelegatingExecutor extends private Runnable delegatee; - RunnableWithPermitRelease(Runnable delegatee) { + public RunnableWithPermitRelease(Runnable delegatee) { this.delegatee = delegatee; } @@ -209,7 +212,7 @@ public class SemaphoredDelegatingExecutor extends private Callable<T> delegatee; - CallableWithPermitRelease(Callable<T> delegatee) { + public CallableWithPermitRelease(Callable<T> delegatee) { this.delegatee = delegatee; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/28792b6b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java index 3dfe286..b1b8240 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java @@ -19,8 +19,6 @@ package org.apache.hadoop.fs.s3a; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.hadoop.util.BlockingThreadPoolExecutorService; -import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import org.apache.hadoop.util.StopWatch; import org.junit.AfterClass; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
