Author: dhruba
Date: Mon Dec 14 22:17:19 2009
New Revision: 890502
URL: http://svn.apache.org/viewvc?rev=890502&view=rev
Log:
HADOOP-6433. Introduce asychronous deletion of files via a pool of
threads. This can be used to delete files in the Distributed Cache.
(Zheng Shao via dhruba)
Added:
hadoop/common/trunk/src/java/org/apache/hadoop/util/AsyncDiskService.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/util/TestAsyncDiskService.java
Modified:
hadoop/common/trunk/CHANGES.txt
Modified: hadoop/common/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=890502&r1=890501&r2=890502&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Mon Dec 14 22:17:19 2009
@@ -22,6 +22,10 @@
HADOOP-6323. Add comparators to the serialization API.
(Aaron Kimball via cutting)
+ HADOOP-6433. Introduce asychronous deletion of files via a pool of
+ threads. This can be used to delete files in the Distributed
+ Cache. (Zheng Shao via dhruba)
+
IMPROVEMENTS
HADOOP-6283. Improve the exception messages thrown by
Added: hadoop/common/trunk/src/java/org/apache/hadoop/util/AsyncDiskService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/util/AsyncDiskService.java?rev=890502&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/util/AsyncDiskService.java
(added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/util/AsyncDiskService.java
Mon Dec 14 22:17:19 2009
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/*
+ * This class is a container of multiple thread pools, each for a volume,
+ * so that we can schedule async disk operations easily.
+ *
+ * Examples of async disk operations are deletion of files.
+ * We can move the files to a "TO_BE_DELETED" folder before asychronously
+ * deleting it, to make sure the caller can run it faster.
+ */
+public class AsyncDiskService {
+
+ public static final Log LOG = LogFactory.getLog(AsyncDiskService.class);
+
+ // ThreadPool core pool size
+ private static final int CORE_THREADS_PER_VOLUME = 1;
+ // ThreadPool maximum pool size
+ private static final int MAXIMUM_THREADS_PER_VOLUME = 4;
+ // ThreadPool keep-alive time for threads over core pool size
+ private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
+
+ private final ThreadGroup threadGroup = new ThreadGroup("async disk
service");
+
+ private ThreadFactory threadFactory;
+
+ private HashMap<String, ThreadPoolExecutor> executors
+ = new HashMap<String, ThreadPoolExecutor>();
+
+ /**
+ * Create a AsyncDiskServices with a set of volumes (specified by their
+ * root directories).
+ *
+ * The AsyncDiskServices uses one ThreadPool per volume to do the async
+ * disk operations.
+ *
+ * @param volumes The roots of the file system volumes.
+ */
+ public AsyncDiskService(String[] volumes) throws IOException {
+
+ threadFactory = new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return new Thread(threadGroup, r);
+ }
+ };
+
+ // Create one ThreadPool per volume
+ for (int v = 0 ; v < volumes.length; v++) {
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
+ THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), threadFactory);
+
+ // This can reduce the number of running threads
+ executor.allowCoreThreadTimeOut(true);
+ executors.put(volumes[v], executor);
+ }
+
+ }
+
+ /**
+ * Execute the task sometime in the future, using ThreadPools.
+ */
+ public synchronized void execute(String root, Runnable task) {
+ ThreadPoolExecutor executor = executors.get(root);
+ if (executor == null) {
+ throw new RuntimeException("Cannot find root " + root
+ + " for execution of task " + task);
+ } else {
+ executor.execute(task);
+ }
+ }
+
+ /**
+ * Gracefully start the shut down of all ThreadPools.
+ */
+ public synchronized void shutdown() {
+
+ LOG.info("Shutting down all AsyncDiskService threads...");
+
+ for (Map.Entry<String, ThreadPoolExecutor> e
+ : executors.entrySet()) {
+ e.getValue().shutdown();
+ }
+ }
+
+ /**
+ * Wait for the termination of the thread pools.
+ *
+ * @param milliseconds The number of milliseconds to wait
+ * @return true if all thread pools are terminated without time limit
+ * @throws InterruptedException
+ */
+ public synchronized boolean awaitTermination(long milliseconds)
+ throws InterruptedException {
+
+ long end = System.currentTimeMillis() + milliseconds;
+ for (Map.Entry<String, ThreadPoolExecutor> e:
+ executors.entrySet()) {
+ ThreadPoolExecutor executor = e.getValue();
+ if (!executor.awaitTermination(
+ Math.max(end - System.currentTimeMillis(), 0),
+ TimeUnit.MILLISECONDS)) {
+ LOG.warn("AsyncDiskService awaitTermination timeout.");
+ return false;
+ }
+ }
+ LOG.info("All AsyncDiskService threads are terminated.");
+ return true;
+ }
+
+ /**
+ * Shut down all ThreadPools immediately.
+ */
+ public synchronized List<Runnable> shutdownNow() {
+
+ LOG.info("Shutting down all AsyncDiskService threads immediately...");
+
+ List<Runnable> list = new ArrayList<Runnable>();
+ for (Map.Entry<String, ThreadPoolExecutor> e
+ : executors.entrySet()) {
+ list.addAll(e.getValue().shutdownNow());
+ }
+ return list;
+ }
+
+}
Added:
hadoop/common/trunk/src/test/core/org/apache/hadoop/util/TestAsyncDiskService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/util/TestAsyncDiskService.java?rev=890502&view=auto
==============================================================================
---
hadoop/common/trunk/src/test/core/org/apache/hadoop/util/TestAsyncDiskService.java
(added)
+++
hadoop/common/trunk/src/test/core/org/apache/hadoop/util/TestAsyncDiskService.java
Mon Dec 14 22:17:19 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.AsyncDiskService;
+import org.junit.Test;
+
+/**
+ * A test for AsyncDiskService.
+ */
+public class TestAsyncDiskService extends TestCase {
+
+ public static final Log LOG = LogFactory.getLog(TestAsyncDiskService.class);
+
+ // Access by multiple threads from the ThreadPools in AsyncDiskService.
+ volatile int count;
+
+ /** An example task for incrementing a counter.
+ */
+ class ExampleTask implements Runnable {
+
+ ExampleTask() {
+ }
+
+ @Override
+ public void run() {
+ synchronized (TestAsyncDiskService.this) {
+ count ++;
+ }
+ }
+ };
+
+
+ /**
+ * This test creates some ExampleTasks and runs them.
+ */
+ @Test
+ public void testAsyncDiskService() throws Throwable {
+
+ String[] vols = new String[]{"/0", "/1"};
+ AsyncDiskService service = new AsyncDiskService(vols);
+
+ int total = 100;
+
+ for (int i = 0; i < total; i++) {
+ service.execute(vols[i%2], new ExampleTask());
+ }
+
+ Exception e = null;
+ try {
+ service.execute("no_such_volume", new ExampleTask());
+ } catch (RuntimeException ex) {
+ e = ex;
+ }
+ assertNotNull("Executing a task on a non-existing volume should throw an "
+ + "Exception.", e);
+
+ service.shutdown();
+ if (!service.awaitTermination(5000)) {
+ fail("AsyncDiskService didn't shutdown in 5 seconds.");
+ }
+
+ assertEquals(total, count);
+ }
+}