Repository: tez Updated Branches: refs/heads/master fb0e45bf7 -> a5179d649
TEZ-1187. Add a framework ExecutorService which shares threads (harishjp) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a5179d64 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a5179d64 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a5179d64 Branch: refs/heads/master Commit: a5179d64937244e21694560f8d52d320ebca20c7 Parents: fb0e45b Author: Harish JP <[email protected]> Authored: Thu Apr 20 14:45:44 2017 +0530 Committer: Harish JP <[email protected]> Committed: Thu Apr 20 14:45:44 2017 +0530 ---------------------------------------------------------------------- .../apache/tez/dag/api/TezConfiguration.java | 18 + .../org/apache/tez/runtime/api/TaskContext.java | 17 +- .../org/apache/tez/common/TezExecutors.java | 52 +++ .../apache/tez/common/TezSharedExecutor.java | 338 +++++++++++++++++++ .../tez/common/TestTezSharedExecutor.java | 256 ++++++++++++++ .../tez/service/impl/ContainerRunnerImpl.java | 15 +- .../apache/tez/service/impl/TezTestService.java | 8 +- .../tez/mapreduce/output/TestMROutput.java | 15 +- .../tez/mapreduce/processor/MapUtils.java | 5 +- .../processor/map/TestMapProcessor.java | 29 +- .../processor/reduce/TestReduceProcessor.java | 7 +- .../runtime/LogicalIOProcessorRuntimeTask.java | 14 +- .../runtime/api/impl/TezInputContextImpl.java | 7 +- .../runtime/api/impl/TezOutputContextImpl.java | 7 +- .../api/impl/TezProcessorContextImpl.java | 7 +- .../runtime/api/impl/TezTaskContextImpl.java | 13 +- .../org/apache/tez/runtime/task/TezChild.java | 6 +- .../apache/tez/runtime/task/TezTaskRunner2.java | 31 +- .../TestLogicalIOProcessorRuntimeTask.java | 12 +- .../runtime/api/impl/TestProcessorContext.java | 14 +- .../tez/runtime/task/TestTaskExecution2.java | 14 +- .../tez/runtime/task/TestTezTaskRunner2.java | 8 +- .../output/TestOnFileUnorderedKVOutput.java | 23 +- 23 files changed, 839 insertions(+), 77 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 40f84e6..c0179f8 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1790,4 +1790,22 @@ public class TezConfiguration extends Configuration { TEZ_PREFIX + "am.client.heartbeat.poll.interval.millis"; public static final int TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_DEFAULT = -1; + /** + * Int value. Minimum number of threads to be allocated by TezSharedExecutor. + */ + @Private + @ConfigurationScope(Scope.AM) + public static final String TEZ_SHARED_EXECUTOR_MIN_THREADS = "tez.shared-executor.min-threads"; + public static final int TEZ_SHARED_EXECUTOR_MIN_THREADS_DEFAULT = 0; + + /** + * Int value. Maximum number of threads to be allocated by TezSharedExecutor. If value is negative + * then Integer.MAX_VALUE is used as the limit. + * Default: Integer.MAX_VALUE. 
+ */ + @Private + @ConfigurationScope(Scope.AM) + public static final String TEZ_SHARED_EXECUTOR_MAX_THREADS = "tez.shared-executor.max-threads"; + public static final int TEZ_SHARED_EXECUTOR_MAX_THREADS_DEFAULT = -1; + } http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java index b5e42bc..dd2951a 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java @@ -20,6 +20,7 @@ package org.apache.tez.runtime.api; import java.nio.ByteBuffer; import java.util.List; +import java.util.concurrent.ExecutorService; import javax.annotation.Nullable; @@ -234,5 +235,19 @@ public interface TaskContext { * @return the execution context */ public ExecutionContext getExecutionContext(); - + + /** + * Create a new ExecutorService with the given parallelism and thread name format. The parallelism + * might not be guaranteed. The service returned works with tez framework, currently it provides + * thread reuse across tasks. + * Note: This is an unstable api, and is not recommended to be used by external users. Please wait + * until API and code is stablized by use in Tez processors, input and outputs. + * @param parallelism The expected parallelism for for this ExecutorService. + * @param threadNameFormat The thread name format, format will be given one parameter, threadId. + * @return An ExecutorService instance. 
+ */ + @Private + @Unstable + public ExecutorService createTezFrameworkExecutorService( + int parallelism, String threadNameFormat); } http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-common/src/main/java/org/apache/tez/common/TezExecutors.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezExecutors.java b/tez-common/src/main/java/org/apache/tez/common/TezExecutors.java new file mode 100644 index 0000000..a74c8ad --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/common/TezExecutors.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.common; + +import java.util.concurrent.ExecutorService; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Interface to capture factory of ExecutorService. + */ +@Private +@Unstable +public interface TezExecutors { + + /** + * Create a ExecutorService with the given parameters. + * + * @param parallelism Represents total number of tasks to be executed in parallel. 
+ * @param threadNameFormat The name the thread should take when executing tasks from this executor + * @return An ExecutorService. + */ + ExecutorService createExecutorService(int parallelism, String threadNameFormat); + + /** + * Shutdown all the ExecutorService created using this factory. + */ + void shutdown(); + + /** + * Shutdown all the ExecutorService created using this factory. It will discard any tasks which + * are not running and interrupt the running tasks. + */ + void shutdownNow(); +} http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-common/src/main/java/org/apache/tez/common/TezSharedExecutor.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezSharedExecutor.java b/tez-common/src/main/java/org/apache/tez/common/TezSharedExecutor.java new file mode 100644 index 0000000..93bf3cc --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/common/TezSharedExecutor.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.common; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.api.TezConfiguration; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * An ExecutorService factory which shares threads between executors created using this service. + */ +@Private +@Unstable +public class TezSharedExecutor implements TezExecutors { + + // The shared executor service which will be used to execute all the tasks. + private final ThreadPoolExecutor service; + + private final DelayedExecutionPoller poller; + + public TezSharedExecutor(Configuration conf) { + // The default value is 0. We could start with a few threads so that thread pool is never empty. + int minThreads = conf.getInt(TezConfiguration.TEZ_SHARED_EXECUTOR_MIN_THREADS, + TezConfiguration.TEZ_SHARED_EXECUTOR_MIN_THREADS_DEFAULT); + // The default value is Integer.MAX_VALUE, but ExecutorServiceInternal will do the rate limiting + // of total numbers of tasks and hence the num threads will be bounded. 
+ int maxThreads = conf.getInt(TezConfiguration.TEZ_SHARED_EXECUTOR_MAX_THREADS, + TezConfiguration.TEZ_SHARED_EXECUTOR_MAX_THREADS_DEFAULT); + if (maxThreads < 0) { + maxThreads = Integer.MAX_VALUE; + } + this.service = new ThreadPoolExecutor( + minThreads, maxThreads, + // The timeout is to give thread a chance to be re-used instead of being cleaned up. + 60, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TezSharedExecutor: %d").build()); + + // Setup polling thread to pick new tasks from the underlying executors. + poller = new DelayedExecutionPoller(service); + poller.start(); + } + + public ExecutorService createExecutorService(int poolSize, String threadName) { + return new ExecutorServiceInternal(poolSize, threadName); + } + + // Should we allow a shared service shutdown, once this shutdown is complete, all the executors + // are in shutdown mode and will throw exception if we try to submit new tasks. And already + // submitted tasks in the ExecutorServiceInternal which are not yet submitted to the shared + // service will not be executed. That break contracts, we can fix this by tracking that the + // service is shutdown and wait until all the dependent. + public void shutdown() { + service.shutdown(); + poller.interrupt(); + } + + public void shutdownNow() { + service.shutdownNow(); + poller.interrupt(); + } + + @Override + protected void finalize() { + this.shutdown(); + } + + private static class DelayedExecutionPoller extends Thread { + // Store service reference in this static class to prevent a reference of TezSharedExecutor from + // being held inside a non static class which prevents cleanup via GC. + private final ThreadPoolExecutor service; + + // A queue which contains instances which have tasks to be executed. 
+ private final LinkedBlockingQueue<ExecutorServiceInternal> executeQueue = + new LinkedBlockingQueue<>(); + + DelayedExecutionPoller(ThreadPoolExecutor service) { + super("DelayedExecutionPoller"); + this.setDaemon(true); + this.service = service; + } + + void add(ExecutorServiceInternal es) { + executeQueue.add(es); + } + + @Override + public void run() { + while (!service.isShutdown()) { + try { + executeQueue.take().tryExecute(); + } catch (InterruptedException e) { + } + } + } + } + + /* + * The internal shared executor service which delegates all the execution to the shared service. + * It allows managing a given instance of ExecutorService independently of other instances created + * in the same service. + * + * - It stores a queue of submitted tasks and submits only the configured poolSize number of tasks + * into the shared executor service. + * - Stores a list of futures used implement shutdownNow and awaitTermination. + */ + private class ExecutorServiceInternal extends AbstractExecutorService { + // This contains all the tasks which are submitted through this ExecutorService and has not + // finished, we use this to implement shutdownNow and awaitForTermination. + // Note: This should have been an Set, but we do not have a concurrent set. + private final ConcurrentHashMap<ManagedFutureTask<?>, Boolean> futures = + new ConcurrentHashMap<>(); + + // Number of tasks currently submitted by this executor to the common executor service. + private final AtomicInteger numTasksSubmitted = new AtomicInteger(); + + // The list of pending tasks to be submitted on behalf of this service. + private final LinkedBlockingQueue<ManagedFutureTask<?>> pendingTasks = + new LinkedBlockingQueue<>(); + + // Set to 0 when shutdown is complete, a CountDownLatch is used to enable wait for shutdown in + // awaitTermination. + private final CountDownLatch shutdownLatch = new CountDownLatch(1); + + // The thread name to be used for threads executing tasks of this executor. 
+ private final String threadName; + + // Total number of threads to be used. + private final int poolSize; + + ExecutorServiceInternal(int poolSize, String threadName) { + Preconditions.checkArgument(poolSize > 0, "Expected poolSize > 0"); + this.threadName = threadName; + this.poolSize = poolSize; + } + + // A FutureTask which we will use to wrap all the runnable and callable. It adds and removes + // from the futures set above. And also notifies TezSharedExecutor to pick new tasks from the + // current ExecutorServiceInternal instance. + private class ManagedFutureTask<V> extends FutureTask<V> { + // Set to true if this task was submitted to the shared ExecutorService. + private boolean submitted = false; + + ManagedFutureTask(Runnable runnable, V value) { + super(runnable, value); + addFuture(this); + } + + ManagedFutureTask(Callable<V> callable) { + super(callable); + addFuture(this); + } + + @Override + public void run() { + Thread thisThread = Thread.currentThread(); + String savedThreadName = null; + if (threadName != null) { + savedThreadName = thisThread.getName(); + thisThread.setName(String.format(threadName, thisThread.getId())); + } + try { + super.run(); + } finally { + if (threadName != null) { + thisThread.setName(savedThreadName); + } + } + } + + // There is a race b/w cancel and submit hence the synchronization. + synchronized void submit() { + submitted = true; + service.execute(this); + } + + @Override + public void done() { + removeFuture(this); + synchronized (this) { + if (submitted) { // Decrement only if this task was submitted. + numTasksSubmitted.decrementAndGet(); + } + } + // Add internal executor service to poller to schedule another task if available. + // We do this instead of invoking tryExecute here, to give a chance for this thread to be + // reused. But its still possible that a new thread is created. 
+ poller.add(ExecutorServiceInternal.this); + } + } + + private void addFuture(ManagedFutureTask<?> future) { + futures.put(future, Boolean.TRUE); + // If already shutdown, reject this task. + if (isShutdown()) { + service.getRejectedExecutionHandler().rejectedExecution(future, service); + } + } + + private void removeFuture(ManagedFutureTask<?> future) { + futures.remove(future); + } + + // Return our internal future task so that all the tasks submitted are tracked and cleaned up. + @SuppressWarnings("unchecked") + @Override + protected <T> ManagedFutureTask<T> newTaskFor(Runnable runnable, T value) { + if (runnable instanceof ManagedFutureTask) { + return (ManagedFutureTask<T>)runnable; + } + return new ManagedFutureTask<T>(runnable, value); + } + + @Override + protected <T> ManagedFutureTask<T> newTaskFor(Callable<T> callable) { + return new ManagedFutureTask<T>(callable); + } + + @Override + public void shutdown() { + shutdownLatch.countDown(); + } + + @Override + public List<Runnable> shutdownNow() { + shutdownLatch.countDown(); + List<Runnable> pending = new ArrayList<>(pendingTasks.size()); + pendingTasks.drainTo(pending); + // cancel all futures, interrupt if its running. + for (ManagedFutureTask<?> future : futures.keySet()) { + future.cancel(true); + } + return pending; + } + + @Override + public boolean isShutdown() { + return shutdownLatch.getCount() == 0 || service.isShutdown(); + } + + @Override + public boolean isTerminated() { + return isShutdown() && futures.isEmpty(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + long deadline = System.nanoTime() + unit.toNanos(timeout); + // Wait for shutdown to be invoked. + if (!shutdownLatch.await(timeout, unit)) { + return false; + } + // Wait for the remaining futures to finish. 
+ for (ManagedFutureTask<?> future : futures.keySet()) { + long nanosLeft = deadline - System.nanoTime(); + if (nanosLeft <= 0) { + return false; + } + try { + future.get(nanosLeft, TimeUnit.NANOSECONDS); + } catch (ExecutionException | CancellationException ignore) { + } catch (TimeoutException e) { + return false; + } + } + return true; + } + + // Submit a task if task is available and poolSize has not been reached. + private void tryExecute() { + while (!pendingTasks.isEmpty()) { + int numTasks = numTasksSubmitted.get(); + if (numTasks >= poolSize) { + return; + } + if (numTasksSubmitted.compareAndSet(numTasks, numTasks + 1)) { + ManagedFutureTask<?> task = pendingTasks.poll(); + // This breaks a contract unfortunately. If a task is submitted and it ends up in a + // queue and then the shared service is shutdown then this job cannot be executed, which + // is not the contract, ideally it should execute the task. + if (task == null || task.isCancelled() || service.isShutdown()) { + numTasksSubmitted.decrementAndGet(); + } else { + task.submit(); + } + } + } + } + + @Override + public void execute(Runnable command) { + this.pendingTasks.add(newTaskFor(command, null)); + this.tryExecute(); + } + + @Override + protected void finalize() { + this.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java ---------------------------------------------------------------------- diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java b/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java new file mode 100644 index 0000000..8d87846 --- /dev/null +++ b/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.common; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestTezSharedExecutor { + + private static class Sleep implements Runnable { + private final long sleepTime; + Sleep(long sleepTime) { + this.sleepTime = sleepTime; + } + @Override + public void run() { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + private static class Wait implements Runnable { + private final Object ref; + Wait(Object ref) { + this.ref = ref == null ? 
this : ref; + } + @Override + public void run() { + try { + synchronized (ref) { + ref.wait(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + private static class Counter implements Runnable { + private final AtomicInteger counter; + Counter(ConcurrentHashMap<String, AtomicInteger> map, String tag) { + if (!map.contains(tag)) { + map.putIfAbsent(tag, new AtomicInteger(0)); + } + this.counter = map.get(tag); + } + @Override + public void run() { + counter.getAndIncrement(); + } + } + + private static class Appender<T> implements Runnable { + private final Collection<T> collection; + private final T obj; + Appender(Collection<T> collection, T obj) { + this.collection = collection; + this.obj = obj; + } + @Override + public void run() { + collection.add(obj); + } + } + + private static class Runner implements Runnable { + private Runnable[] runnables; + Runner(Runnable ... runnables) { + this.runnables = runnables; + } + @Override + public void run() { + for (Runnable runnable : runnables) { + runnable.run(); + } + } + } + + private void _notify(Object obj) { + synchronized (obj) { + obj.notify(); + } + } + + private TezSharedExecutor sharedExecutor; + + @Before + public void setup() { + sharedExecutor = new TezSharedExecutor(new Configuration()); + } + + @After + public void cleanup() { + sharedExecutor.shutdownNow(); + sharedExecutor = null; + } + + @Test(timeout=2000) + public void testSimpleExecution() throws Exception { + ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>(); + + ExecutorService service = sharedExecutor.createExecutorService(1, "simple-test"); + + // Test runnable + service.submit(new Counter(map, "test")).get(); + Assert.assertEquals(1, map.get("test").get()); + + // Test runnable with a result + final Object expected = new Object(); + Object val = service.submit(new Counter(map, "test"), expected).get(); + Assert.assertEquals(expected, val); + Assert.assertEquals(2, 
map.get("test").get()); + + // Test callable. + val = service.submit(new Callable<Object>() { + @Override + public Object call() throws Exception { + return expected; + } + }).get(); + Assert.assertEquals(expected, val); + + // Tasks should be rejected after a shutdown. + service.shutdown(); + + try { + service.submit(new Counter(map, "test")); + Assert.fail("Expected rejected execution exception."); + } catch (RejectedExecutionException e) { + } + } + + @Test(timeout=5000) + public void testAwaitTermination() throws Exception { + ExecutorService service = sharedExecutor.createExecutorService(1, "await-termination"); + + final Runnable runnable = new Wait(null); + service.submit(runnable); + service.shutdown(); + + // No notify sent hence it should fail. + Assert.assertFalse(service.awaitTermination(100, TimeUnit.MILLISECONDS)); + Assert.assertFalse(service.isTerminated()); + Assert.assertTrue(service.isShutdown()); + + Timer timer = new Timer(true); + timer.schedule(new TimerTask() { + @Override + public void run() { + _notify(runnable); + } + }, 100); + + // Highly unlikely that there are intermittent failures, but a possiblity :-(. + Assert.assertTrue(service.awaitTermination(1, TimeUnit.SECONDS)); + Assert.assertTrue(service.isTerminated()); + Assert.assertTrue(service.isShutdown()); + + timer.cancel(); + } + + @Test(timeout=2000) + public void testSerialExecution() throws Exception { + ExecutorService service = sharedExecutor.createExecutorService(1, "serial-test"); + + // Since it is serial we should never get concurrent modification exception too. + List<Integer> list = new ArrayList<>(); + Future<?> f1 = service.submit(new Wait(list)); + List<Future<?>> futures = new ArrayList<>(); + for (int i = 0; i < 10; ++i) { + futures.add(service.submit(new Appender<Integer>(list, i))); + } + + // This shutdown does not prevent already submitted tasks from completing. + service.shutdown(); + + // Until we notify nothing moves forward. 
+ Assert.assertEquals(0, list.size()); + _notify(list); + f1.get(); + + // Wait for all futures to finish. + for (Future<?> f : futures) { + f.get(); + } + Assert.assertEquals(10, list.size()); + Assert.assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), list); + } + + @Test(timeout=5000) + public void testParallelExecution() throws Exception { + ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>(); + + List<Future<?>> futures = new ArrayList<>(); + ExecutorService[] services = { + sharedExecutor.createExecutorService(2, "parallel-1"), + sharedExecutor.createExecutorService(2, "parallel-2") + }; + int[] expectedCounts = {0, 0}; + Random random = new Random(); + for (int i = 0; i < 200; ++i) { + int serviceIndex = random.nextInt(2); + expectedCounts[serviceIndex] += 1; + futures.add(services[serviceIndex].submit( + new Runner(new Sleep(10), new Counter(map, "test" + serviceIndex)))); + } + for (Future<?> future : futures) { + future.get(); + } + Assert.assertEquals(expectedCounts[0], map.get("test0").get()); + Assert.assertEquals(expectedCounts[1], map.get("test1").get()); + + // Even if one service is shutdown the other should work. 
+ services[0].shutdown(); + services[1].submit(new Counter(map, "test1")).get(); + Assert.assertEquals(expectedCounts[1] + 1, map.get("test1").get()); + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java index f9de995..3317bbf 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java @@ -53,6 +53,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezExecutors; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; @@ -90,6 +91,8 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun private final Map<String, String> localEnv = new HashMap<String, String>(); private volatile FileSystem localFs; private final long memoryPerExecutor; + + private final TezExecutors sharedExecutor; // TODO Support for removing queued containers, interrupting / killing specific containers - when preemption is supported @@ -97,7 +100,8 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun public ContainerRunnerImpl(int numExecutors, String[] localDirsBase, AtomicReference<InetSocketAddress> localAddress, - long totalMemoryAvailableBytes) { + long totalMemoryAvailableBytes, + TezExecutors sharedExecutor) { super("ContainerRunnerImpl"); 
Preconditions.checkState(numExecutors > 0, "Invalid number of executors: " + numExecutors + ". Must be > 0"); @@ -117,6 +121,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun "memoryPerExecutorDerived=" + memoryPerExecutor + ", numExecutors=" + numExecutors ); + this.sharedExecutor = sharedExecutor; } @Override @@ -262,7 +267,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser()); TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()), new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs, - workingDir, credentials, memoryPerExecutor); + workingDir, credentials, memoryPerExecutor, sharedExecutor); ListenableFuture<ContainerExecutionResult> future = executorService.submit(callable); Futures.addCallback(future, new TaskRunnerCallback(request, callable)); } @@ -385,12 +390,13 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun private volatile TezTaskRunner2 taskRunner; private volatile TaskReporter taskReporter; private TezTaskUmbilicalProtocol umbilical; + private final TezExecutors sharedExecutor; TaskRunnerCallable(SubmitWorkRequestProto request, Configuration conf, ExecutionContext executionContext, Map<String, String> envMap, String[] localDirs, String workingDir, Credentials credentials, - long memoryAvailable) { + long memoryAvailable, TezExecutors sharedExecutor) { this.request = request; this.conf = conf; this.executionContext = executionContext; @@ -405,6 +411,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun .setDaemon(true) .setNameFormat("TezTaskRunner_" + request.getTaskSpec().getTaskAttemptIdString()).build()); executor = MoreExecutors.listeningDecorator(executorReal); + this.sharedExecutor = sharedExecutor; } @Override @@ -452,7 +459,7 @@ public class 
ContainerRunnerImpl extends AbstractService implements ContainerRun request.getAppAttemptNumber(), serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, - executionContext, memoryAvailable, false, new DefaultHadoopShim()); + executionContext, memoryAvailable, false, new DefaultHadoopShim(), sharedExecutor); boolean shouldDie; try { http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java index 322be00..17eb88c 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java @@ -23,6 +23,8 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; +import org.apache.tez.common.TezExecutors; +import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.dag.api.TezException; import org.apache.tez.service.ContainerRunner; import org.apache.tez.shufflehandler.ShuffleHandler; @@ -46,6 +48,8 @@ public class TezTestService extends AbstractService implements ContainerRunner { private final AtomicReference<InetSocketAddress> address = new AtomicReference<InetSocketAddress>(); + private final TezExecutors sharedExecutor; + public TezTestService(Configuration conf, int numExecutors, long memoryAvailable, String[] localDirs) { super(TezTestService.class.getSimpleName()); this.numExecutors = numExecutors; @@ -73,8 +77,9 @@ public class TezTestService extends AbstractService implements ContainerRunner { 
this.shuffleHandlerConf.set(ShuffleHandler.SHUFFLE_HANDLER_LOCAL_DIRS, StringUtils.arrayToString(localDirs)); this.server = new TezTestServiceProtocolServerImpl(this, address); + this.sharedExecutor = new TezSharedExecutor(conf); this.containerRunner = new ContainerRunnerImpl(numExecutors, localDirs, address, - memoryAvailableBytes); + memoryAvailableBytes, sharedExecutor); } @Override @@ -95,6 +100,7 @@ public class TezTestService extends AbstractService implements ContainerRunner { containerRunner.stop(); server.stop(); ShuffleHandler.get().stop(); + sharedExecutor.shutdownNow(); } public InetSocketAddress getListenerAddress() { http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java index 8b52cc9..f3403e6 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java @@ -46,6 +46,8 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.TezExecutors; +import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DataSinkDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -235,7 +237,7 @@ public class TestMROutput { public static LogicalIOProcessorRuntimeTask createLogicalTask( Configuration conf, TezUmbilical umbilical, String dagName, - String vertexName) throws Exception { + String vertexName, TezExecutors sharedExecutor) throws Exception { 
ProcessorDescriptor procDesc = ProcessorDescriptor.create(TestProcessor.class.getName()); List<InputSpec> inputSpecs = Lists.newLinkedList(); List<OutputSpec> outputSpecs = Lists.newLinkedList(); @@ -263,9 +265,9 @@ public class TestMROutput { null, new HashMap<String, String>(), HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"), - Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim()); + Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim(), sharedExecutor); } - + public static class TestOutputCommitter extends OutputCommitter { @Override @@ -395,10 +397,13 @@ public class TestMROutput { @Ignore @Test public void testPerf() throws Exception { - LogicalIOProcessorRuntimeTask task = createLogicalTask(new Configuration(), - new TestUmbilical(), "dag", "vertex"); + Configuration conf = new Configuration(); + TezSharedExecutor sharedExecutor = new TezSharedExecutor(conf); + LogicalIOProcessorRuntimeTask task = createLogicalTask(conf, new TestUmbilical(), "dag", + "vertex", sharedExecutor); task.initialize(); task.run(); task.close(); + sharedExecutor.shutdownNow(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java index b69dc0c..29f9ca9 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import org.apache.tez.common.MRFrameworkConfigs; import org.apache.tez.common.TezUtils; import org.apache.tez.common.TezRuntimeFrameworkConfigs; +import org.apache.tez.common.TezSharedExecutor; import 
org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.mapreduce.TezTestUtils; @@ -200,7 +201,7 @@ public class MapUtils { JobConf jobConf, int mapId, Path mapInput, TezUmbilical umbilical, String dagName, String vertexName, List<InputSpec> inputSpecs, - List<OutputSpec> outputSpecs) throws Exception { + List<OutputSpec> outputSpecs, TezSharedExecutor sharedExecutor) throws Exception { jobConf.setInputFormat(SequenceFileInputFormat.class); ProcessorDescriptor mapProcessorDesc = ProcessorDescriptor.create( @@ -234,7 +235,7 @@ public class MapUtils { serviceConsumerMetadata, envMap, HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"), - Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim()); + Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim(), sharedExecutor); return task; } } http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java index 7c5e2a7..eb30841 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java @@ -21,22 +21,10 @@ package org.apache.tez.mapreduce.processor.map; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; -import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.permission.FsPermission; -import 
org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.lib.db.FloatSplitter; -import org.apache.hadoop.mapreduce.split.JobSplit; -import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +40,7 @@ import org.apache.hadoop.mapreduce.MRConfig; import org.apache.tez.common.MRFrameworkConfigs; import org.apache.tez.common.TezUtils; import org.apache.tez.common.TezRuntimeFrameworkConfigs; +import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.UserPayload; @@ -86,8 +75,6 @@ public class TestMapProcessor { private static FileSystem localFs = null; private static Path workDir = null; static float progressUpdate = 0.0f; - final private static FsPermission JOB_FILE_PERMISSION = FsPermission - .createImmutable((short) 0644); static { try { defaultConf.set("fs.defaultFS", "file:///"); @@ -163,15 +150,17 @@ public class TestMapProcessor { OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName()) .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1); + TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf); LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0, new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName, - Collections.singletonList(mapInputSpec), - Collections.singletonList(mapOutputSpec)); - + Collections.singletonList(mapInputSpec), Collections.singletonList(mapOutputSpec), + sharedExecutor); + task.initialize(); task.run(); task.close(); - + sharedExecutor.shutdownNow(); + OutputContext outputContext = 
task.getOutputContexts().iterator().next(); TezTaskOutput mapOutputs = new TezTaskOutputFiles(jobConf, outputContext.getUniqueIdentifier()); @@ -236,11 +225,12 @@ public class TestMapProcessor { OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName()) .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1); + TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf); final LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask (localFs, workDir, jobConf, 0, new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName, Collections.singletonList(mapInputSpec), - Collections.singletonList(mapOutputSpec)); + Collections.singletonList(mapOutputSpec), sharedExecutor); ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); Thread monitorProgress = new Thread(new Runnable() { @@ -259,5 +249,6 @@ public class TestMapProcessor { Assert.assertTrue("Progress Updates should be captured!", progressUpdate > 0.0f && progressUpdate < 1.0f); task.close(); + sharedExecutor.shutdownNow(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java index ca3792f..42ea4f7 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import org.apache.tez.common.MRFrameworkConfigs; import org.apache.tez.common.TezUtils; import org.apache.tez.common.TezRuntimeFrameworkConfigs; +import org.apache.tez.common.TezSharedExecutor; import 
org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.InputDescriptor; @@ -155,10 +156,11 @@ public class TestReduceProcessor { TestUmbilical testUmbilical = new TestUmbilical(); + TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf); LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0, mapInput, testUmbilical, dagName, mapVertexName, Collections.singletonList(mapInputSpec), - Collections.singletonList(mapOutputSpec)); + Collections.singletonList(mapOutputSpec), sharedExecutor); mapTask.initialize(); mapTask.run(); @@ -225,7 +227,7 @@ public class TestReduceProcessor { serviceConsumerMetadata, serviceProviderEnvMap, HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"), - Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim()); + Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim(), sharedExecutor); List<Event> destEvents = new LinkedList<Event>(); destEvents.add(dme); @@ -235,6 +237,7 @@ public class TestReduceProcessor { sortedOut.handleEvents(destEvents); task.run(); task.close(); + sharedExecutor.shutdownNow(); // MRTask mrTask = (MRTask)t.getProcessor(); // TODO NEWTEZ Verify the partitioner has not been created http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index e49791f..5c2ab77 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -56,6 
+56,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.RunnableWithNdc; +import org.apache.tez.common.TezExecutors; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -158,13 +159,15 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private final boolean initializeProcessorFirst; private final boolean initializeProcessorIOSerially; + private final TezExecutors sharedExecutor; public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber, Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap, Multimap<String, String> startedInputsMap, ObjectRegistry objectRegistry, String pid, ExecutionContext ExecutionContext, long memAvailable, - boolean updateSysCounters, HadoopShim hadoopShim) throws IOException { + boolean updateSysCounters, HadoopShim hadoopShim, + TezExecutors sharedExecutor) throws IOException { // Note: If adding any fields here, make sure they're cleaned up in the cleanupContext method. 
// TODO Remove jobToken from here post TEZ-421 super(taskSpec, tezConf, tezUmbilical, pid, updateSysCounters); @@ -217,6 +220,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { this.hadoopShim = hadoopShim; this.maxEventBacklog = tezConf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENT_BACKLOG, TezConfiguration.TEZ_TASK_MAX_EVENT_BACKLOG_DEFAULT); + this.sharedExecutor = sharedExecutor; } /** @@ -596,7 +600,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { inputSpec.getInputDescriptor().getUserPayload(), this, serviceConsumerMetadata, envMap, initialMemoryDistributor, inputSpec.getInputDescriptor(), inputMap, inputReadyTracker, objectRegistry, - ExecutionContext, memAvailable); + ExecutionContext, memAvailable, sharedExecutor); return inputContext; } @@ -611,7 +615,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { outputSpec.getOutputDescriptor().getUserPayload(), this, serviceConsumerMetadata, envMap, initialMemoryDistributor, outputSpec.getOutputDescriptor(), objectRegistry, ExecutionContext, - memAvailable); + memAvailable, sharedExecutor); return outputContext; } @@ -622,8 +626,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { taskSpec.getVertexParallelism(), taskSpec.getTaskAttemptID(), processorDescriptor.getUserPayload(), this, - serviceConsumerMetadata, envMap, initialMemoryDistributor, - processorDescriptor, inputReadyTracker, objectRegistry, ExecutionContext, memAvailable); + serviceConsumerMetadata, envMap, initialMemoryDistributor, processorDescriptor, + inputReadyTracker, objectRegistry, ExecutionContext, memAvailable, sharedExecutor); return processorContext; } http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java ---------------------------------------------------------------------- diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java index afb78d9..15a6485 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java @@ -32,6 +32,7 @@ import javax.annotation.Nullable; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezExecutors; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.TezConfiguration; @@ -94,12 +95,13 @@ public class TezInputContextImpl extends TezTaskContextImpl Map<String, String> auxServiceEnv, MemoryDistributor memDist, InputDescriptor inputDescriptor, Map<String, LogicalInput> inputs, InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry, - ExecutionContext ExecutionContext, long memAvailable) { + ExecutionContext ExecutionContext, long memAvailable, + TezExecutors sharedExecutor) { super(conf, workDirs, appAttemptNumber, dagName, taskVertexName, vertexParallelism, taskAttemptID, wrapCounters(runtimeTask, taskVertexName, sourceVertexName, conf), runtimeTask, tezUmbilical, serviceConsumerMetadata, auxServiceEnv, memDist, inputDescriptor, - objectRegistry, ExecutionContext, memAvailable); + objectRegistry, ExecutionContext, memAvailable, sharedExecutor); checkNotNull(inputIndex, "inputIndex is null"); checkNotNull(sourceVertexName, "sourceVertexName is null"); checkNotNull(inputs, "input map is null"); @@ -153,7 +155,6 @@ public class TezInputContextImpl extends TezTaskContextImpl return sourceVertexName; } - @SuppressWarnings("deprecation") @Override public void fatalError(Throwable exception, String message) { super.signalFatalError(exception, message, sourceInfo); 
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java index 1bd78d3..41e8d41 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java @@ -32,6 +32,7 @@ import javax.annotation.Nullable; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezExecutors; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.TezConfiguration; @@ -89,12 +90,13 @@ public class TezOutputContextImpl extends TezTaskContextImpl Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> auxServiceEnv, MemoryDistributor memDist, OutputDescriptor outputDescriptor, ObjectRegistry objectRegistry, - ExecutionContext ExecutionContext, long memAvailable) { + ExecutionContext executionContext, long memAvailable, TezExecutors sharedExecutor) { super(conf, workDirs, appAttemptNumber, dagName, taskVertexName, vertexParallelism, taskAttemptID, wrapCounters(runtimeTask, taskVertexName, destinationVertexName, conf), runtimeTask, tezUmbilical, serviceConsumerMetadata, - auxServiceEnv, memDist, outputDescriptor, objectRegistry, ExecutionContext, memAvailable); + auxServiceEnv, memDist, outputDescriptor, objectRegistry, executionContext, memAvailable, + sharedExecutor); checkNotNull(outputIndex, "outputIndex is null"); checkNotNull(destinationVertexName, "destinationVertexName is null"); this.userPayload = userPayload; @@ 
-138,7 +140,6 @@ public class TezOutputContextImpl extends TezTaskContextImpl return destinationVertexName; } - @SuppressWarnings("deprecation") @Override public void fatalError(Throwable exception, String message) { super.signalFatalError(exception, message, sourceInfo); http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java index d03f48e..beae693 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java @@ -32,6 +32,7 @@ import java.util.Map; import javax.annotation.Nullable; import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezExecutors; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -63,10 +64,11 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> auxServiceEnv, MemoryDistributor memDist, ProcessorDescriptor processorDescriptor, InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry, - ExecutionContext ExecutionContext, long memAvailable) { + ExecutionContext ExecutionContext, long memAvailable, TezExecutors sharedExecutor) { super(conf, workDirs, appAttemptNumber, dagName, vertexName, vertexParallelism, taskAttemptID, runtimeTask.addAndGetTezCounter(vertexName), runtimeTask, tezUmbilical, serviceConsumerMetadata, - auxServiceEnv, memDist, processorDescriptor, objectRegistry, ExecutionContext, 
memAvailable); + auxServiceEnv, memDist, processorDescriptor, objectRegistry, ExecutionContext, memAvailable, + sharedExecutor); checkNotNull(inputReadyTracker, "inputReadyTracker is null"); this.userPayload = userPayload; this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR, @@ -98,7 +100,6 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce } } - @SuppressWarnings("deprecation") @Override public void fatalError(Throwable exception, String message) { super.signalFatalError(exception, message, sourceInfo); http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java index 35abd1e..5a6a405 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; @@ -33,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; +import org.apache.tez.common.TezExecutors; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.EntityDescriptor; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -67,6 +69,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable { 
private final int vertexParallelism; private final ExecutionContext ExecutionContext; private final long memAvailable; + private final TezExecutors sharedExecutor; @Private public TezTaskContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber, @@ -75,7 +78,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable { TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> auxServiceEnv, MemoryDistributor memDist, EntityDescriptor<?> descriptor, ObjectRegistry objectRegistry, - ExecutionContext ExecutionContext, long memAvailable) { + ExecutionContext ExecutionContext, long memAvailable, TezExecutors sharedExecutor) { checkNotNull(conf, "conf is null"); checkNotNull(dagName, "dagName is null"); checkNotNull(taskVertexName, "taskVertexName is null"); @@ -85,6 +88,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable { checkNotNull(auxServiceEnv, "auxServiceEnv is null"); checkNotNull(memDist, "memDist is null"); checkNotNull(descriptor, "descriptor is null"); + checkNotNull(sharedExecutor, "sharedExecutor is null"); this.dagName = dagName; this.taskVertexName = taskVertexName; this.taskAttemptID = taskAttemptID; @@ -106,6 +110,7 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable { this.vertexParallelism = vertexParallelism; this.ExecutionContext = ExecutionContext; this.memAvailable = memAvailable; + this.sharedExecutor = sharedExecutor; } @Override @@ -237,6 +242,12 @@ public abstract class TezTaskContextImpl implements TaskContext, Closeable { return this.ExecutionContext; } + @Override + public ExecutorService createTezFrameworkExecutorService( + int parallelism, String threadNameFormat) { + return sharedExecutor.createExecutorService(parallelism, threadNameFormat); + } + private int generateId() { return ID_GEN.incrementAndGet(); } 
http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index e8e7391..bc911c3 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -52,7 +52,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezExecutors; import org.apache.tez.common.TezLocalResource; +import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.Limits; @@ -120,6 +122,7 @@ public class TezChild { private int taskCount = 0; private TezVertexID lastVertexID; private final HadoopShim hadoopShim; + private final TezExecutors sharedExecutor; public TezChild(Configuration conf, String host, int port, String containerIdentifier, String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs, @@ -141,6 +144,7 @@ public class TezChild { this.user = user; this.updateSysCounters = updateSysCounters; this.hadoopShim = hadoopShim; + this.sharedExecutor = new TezSharedExecutor(defaultConf); getTaskMaxSleepTime = defaultConf.getInt( TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX, @@ -258,7 +262,7 @@ public class TezChild { localDirs, containerTask.getTaskSpec(), appAttemptNumber, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, executionContext, memAvailable, 
updateSysCounters, - hadoopShim); + hadoopShim, sharedExecutor); boolean shouldDie; try { TaskRunner2Result result = taskRunner.run(); http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java index 96f8474..306f2a7 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java @@ -35,6 +35,8 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSError; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.tez.common.TezExecutors; +import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.hadoop.shim.HadoopShim; @@ -102,6 +104,25 @@ public class TezTaskRunner2 { // The callable which is being used to execute the task. private volatile TaskRunner2Callable taskRunnerCallable; + // This instance is set only if the runner was not configured explicitly and will be shut down + // when this task is finished.
+ private final TezSharedExecutor localExecutor; + + @Deprecated + public TezTaskRunner2(Configuration tezConf, UserGroupInformation ugi, String[] localDirs, + TaskSpec taskSpec, int appAttemptNumber, + Map<String, ByteBuffer> serviceConsumerMetadata, + Map<String, String> serviceProviderEnvMap, + Multimap<String, String> startedInputsMap, + TaskReporterInterface taskReporter, ExecutorService executor, + ObjectRegistry objectRegistry, String pid, + ExecutionContext executionContext, long memAvailable, + boolean updateSysCounters, HadoopShim hadoopShim) throws IOException { + this(tezConf, ugi, localDirs, taskSpec, appAttemptNumber, serviceConsumerMetadata, + serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, + pid, executionContext, memAvailable, updateSysCounters, hadoopShim, null); + } + public TezTaskRunner2(Configuration tezConf, UserGroupInformation ugi, String[] localDirs, TaskSpec taskSpec, int appAttemptNumber, Map<String, ByteBuffer> serviceConsumerMetadata, @@ -110,7 +131,8 @@ public class TezTaskRunner2 { TaskReporterInterface taskReporter, ExecutorService executor, ObjectRegistry objectRegistry, String pid, ExecutionContext executionContext, long memAvailable, - boolean updateSysCounters, HadoopShim hadoopShim) throws + boolean updateSysCounters, HadoopShim hadoopShim, + TezExecutors sharedExecutor) throws IOException { this.ugi = ugi; this.taskReporter = taskReporter; @@ -125,9 +147,11 @@ public class TezTaskRunner2 { taskConf.set(entry.getKey(), entry.getValue()); } } + localExecutor = sharedExecutor == null ? 
new TezSharedExecutor(tezConf) : null; this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, taskConf, localDirs, umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, - objectRegistry, pid, executionContext, memAvailable, updateSysCounters, hadoopShim); + objectRegistry, pid, executionContext, memAvailable, updateSysCounters, hadoopShim, + sharedExecutor == null ? localExecutor : sharedExecutor); } /** @@ -258,6 +282,9 @@ public class TezTaskRunner2 { if (taskKillStartTime != 0) { LOG.info("Time taken to interrupt task={}", (System.currentTimeMillis() - taskKillStartTime)); } + if (localExecutor != null) { + localExecutor.shutdown(); + } Thread.interrupted(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java index ecfc424..c1bb3a1 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java @@ -30,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -81,10 +82,11 @@ public class TestLogicalIOProcessorRuntimeTask { TezTaskAttemptID taId2 = createTaskAttemptID(vertexId, 2); TaskSpec task2 = createTaskSpec(taId2, "dag2", "vertex1", 10); + TezSharedExecutor sharedExecutor = new TezSharedExecutor(tezConf); LogicalIOProcessorRuntimeTask lio1 = new 
LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null, umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null, "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true, - new DefaultHadoopShim()); + new DefaultHadoopShim(), sharedExecutor); try { lio1.initialize(); @@ -105,6 +107,7 @@ public class TestLogicalIOProcessorRuntimeTask { assertEquals(30, lio1.getOutputContexts().iterator().next().getVertexParallelism()); } catch(Exception e) { fail(); + sharedExecutor.shutdownNow(); } finally { cleanupAndTest(lio1); } @@ -114,7 +117,7 @@ public class TestLogicalIOProcessorRuntimeTask { LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(task2, 0, tezConf, null, umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null, "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true, - new DefaultHadoopShim()); + new DefaultHadoopShim(), sharedExecutor); try { lio2.initialize(); lio2.run(); @@ -134,6 +137,7 @@ public class TestLogicalIOProcessorRuntimeTask { fail(); } finally { cleanupAndTest(lio2); + sharedExecutor.shutdownNow(); } } @@ -275,7 +279,7 @@ public class TestLogicalIOProcessorRuntimeTask { @Override public void start() throws Exception { startCount++; - this.vertexParallelism = getContext().getVertexParallelism(); + vertexParallelism = getContext().getVertexParallelism(); getContext().notifyProgress(); } @@ -315,7 +319,7 @@ public class TestLogicalIOProcessorRuntimeTask { public void start() throws Exception { System.err.println("Out started"); startCount++; - this.vertexParallelism = getContext().getVertexParallelism(); + vertexParallelism = getContext().getVertexParallelism(); getContext().notifyProgress(); } http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java ---------------------------------------------------------------------- 
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java index d16b880..bf4fdf6 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestProcessorContext.java @@ -27,6 +27,7 @@ import java.util.Map; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -61,10 +62,11 @@ public class TestProcessorContext { TaskSpec mockSpec = mock(TaskSpec.class); when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class))); when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class))); - LogicalIOProcessorRuntimeTask runtimeTask = new LogicalIOProcessorRuntimeTask( - mockSpec, 1, - new Configuration(), new String[]{"/"}, - tezUmbilical, null, null, null, null, "", null, 1024, false, new DefaultHadoopShim()); + Configuration conf = new Configuration(); + TezSharedExecutor sharedExecutor = new TezSharedExecutor(conf); + LogicalIOProcessorRuntimeTask runtimeTask = new LogicalIOProcessorRuntimeTask(mockSpec, 1, conf, + new String[]{"/"}, tezUmbilical, null, null, null, null, "", null, 1024, false, + new DefaultHadoopShim(), sharedExecutor); LogicalIOProcessorRuntimeTask mockTask = spy(runtimeTask); Map<String, ByteBuffer> serviceConsumerMetadata = Maps.newHashMap(); Map<String, String> auxServiceEnv = Maps.newHashMap(); @@ -94,7 +96,8 @@ public class TestProcessorContext { inputReadyTracker, objectRegistry, execContext, - memAvailable); + memAvailable, + sharedExecutor); 
assertEquals(dagNumber, procContext.getDagIdentifier()); assertEquals(appAttemptNumber, procContext.getDAGAttemptNumber()); @@ -107,5 +110,6 @@ public class TestProcessorContext { // test auto call of notifyProgress procContext.setProgress(0.1f); verify(mockTask, times(1)).notifyProgressInvocation(); + sharedExecutor.shutdown(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java index adcbe4a..07b9d33 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java @@ -49,6 +49,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.TezExecutors; +import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.FileSystemCounter; import org.apache.tez.common.counters.TaskCounter; @@ -778,19 +780,21 @@ public class TestTaskExecution2 { new TaskSpec(taskAttemptId, "dagName", "vertexName", -1, processorDescriptor, new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null, null); + TezExecutors sharedExecutor = new TezSharedExecutor(tezConf); TezTaskRunner2 taskRunner; if (testRunner) { taskRunner = new TezTaskRunner2ForTest(tezConf, ugi, localDirs, taskSpec, 1, new HashMap<String, ByteBuffer>(), new HashMap<String, String>(), HashMultimap.<String, String>create(), taskReporter, executor, null, "", new ExecutionContextImpl("localhost"), - 
Runtime.getRuntime().maxMemory(), updateSysCounters); + Runtime.getRuntime().maxMemory(), updateSysCounters, sharedExecutor); } else { taskRunner = new TezTaskRunner2(tezConf, ugi, localDirs, taskSpec, 1, new HashMap<String, ByteBuffer>(), new HashMap<String, String>(), HashMultimap.<String, String>create(), taskReporter, executor, null, "", new ExecutionContextImpl("localhost"), - Runtime.getRuntime().maxMemory(), updateSysCounters, new DefaultHadoopShim()); + Runtime.getRuntime().maxMemory(), updateSysCounters, new DefaultHadoopShim(), + sharedExecutor); } return taskRunner; @@ -815,10 +819,12 @@ public class TestTaskExecution2 { String pid, ExecutionContext executionContext, long memAvailable, - boolean updateSysCounters) throws IOException { + boolean updateSysCounters, + TezExecutors sharedExecutor) throws IOException { super(tezConf, ugi, localDirs, taskSpec, appAttemptNumber, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, - executionContext, memAvailable, updateSysCounters, new DefaultHadoopShim()); + executionContext, memAvailable, updateSysCounters, new DefaultHadoopShim(), + sharedExecutor); } http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java index f58421a..6876df9 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java @@ -20,13 +20,13 @@ package org.apache.tez.runtime.task; import static org.mockito.Mockito.*; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.tez.common.ProtoConverters; +import org.apache.tez.common.TezExecutors; +import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.impl.InputSpec; @@ -52,13 +52,15 @@ public class TestTezTaskRunner2 { List<OutputSpec> outputSpecList = new ArrayList<>(); TaskSpec taskSpec = new TaskSpec("dagName", "vertexName", 1, mock(ProcessorDescriptor.class), inputSpecList, outputSpecList, null, taskConf); + TezExecutors sharedExecutor = new TezSharedExecutor(conf); TezTaskRunner2 taskRunner2 = new TezTaskRunner2(conf, mock(UserGroupInformation.class), localDirs, taskSpec, 1, null, null, null, mock(TaskReporter.class), null, null, "pid", - null, 1000, false, new DefaultHadoopShim()); + null, 1000, false, new DefaultHadoopShim(), sharedExecutor); Assert.assertEquals("global1", taskRunner2.task.getTaskConf().get("global")); Assert.assertEquals("task1", taskRunner2.task.getTaskConf().get("global_override")); Assert.assertEquals("task1", taskRunner2.task.getTaskConf().get("task")); + sharedExecutor.shutdownNow(); } http://git-wip-us.apache.org/repos/asf/tez/blob/a5179d64/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java index 38a60a2..f549e8f 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java @@ -50,6 +50,7 @@ import 
org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; +import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.UserPayload; @@ -125,7 +126,8 @@ public class TestOnFileUnorderedKVOutput { conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName()); - OutputContext outputContext = createOutputContext(conf); + TezSharedExecutor sharedExecutor = new TezSharedExecutor(conf); + OutputContext outputContext = createOutputContext(conf, sharedExecutor); UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1); @@ -155,6 +157,7 @@ public class TestOnFileUnorderedKVOutput { assertEquals(outputContext.getUniqueIdentifier(), shufflePayload.getPathComponent()); assertEquals(shufflePort, shufflePayload.getPort()); assertEquals("localhost", shufflePayload.getHost()); + sharedExecutor.shutdownNow(); } @Test(timeout = 30000) @@ -167,7 +170,8 @@ public class TestOnFileUnorderedKVOutput { conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 1); - OutputContext outputContext = createOutputContext(conf); + TezSharedExecutor sharedExecutor = new TezSharedExecutor(conf); + OutputContext outputContext = createOutputContext(conf, sharedExecutor); UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1); @@ -202,9 +206,11 @@ public class TestOnFileUnorderedKVOutput { assertFalse(shufflePayload.hasEmptyPartitions()); assertEquals(shufflePort, shufflePayload.getPort()); assertEquals("localhost", shufflePayload.getHost()); + sharedExecutor.shutdownNow(); } - private OutputContext createOutputContext(Configuration conf) throws IOException { + private OutputContext createOutputContext(Configuration conf, TezSharedExecutor 
sharedExecutor) + throws IOException { int appAttemptNumber = 1; TezUmbilical tezUmbilical = mock(TezUmbilical.class); String dagName = "currentDAG"; @@ -219,11 +225,10 @@ public class TestOnFileUnorderedKVOutput { TaskSpec mockSpec = mock(TaskSpec.class); when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class))); when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class))); - task = new LogicalIOProcessorRuntimeTask( - mockSpec, appAttemptNumber, - new Configuration(), new String[]{"/"}, - tezUmbilical, null, null, null, null, "", null, 1024, false, new DefaultHadoopShim()); - + task = new LogicalIOProcessorRuntimeTask(mockSpec, appAttemptNumber, new Configuration(), + new String[]{"/"}, tezUmbilical, null, null, null, null, "", null, 1024, false, + new DefaultHadoopShim(), sharedExecutor); + LogicalIOProcessorRuntimeTask runtimeTask = spy(task); Map<String, String> auxEnv = new HashMap<String, String>(); @@ -240,7 +245,7 @@ public class TestOnFileUnorderedKVOutput { appAttemptNumber, tezUmbilical, dagName, taskVertexName, destinationVertexName, -1, taskAttemptID, 0, userPayload, runtimeTask, null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null, - new ExecutionContextImpl("localhost"), 2048); + new ExecutionContextImpl("localhost"), 2048, new TezSharedExecutor(defaultConf)); verify(runtimeTask, times(1)).addAndGetTezCounter(destinationVertexName); verify(runtimeTask, times(1)).getTaskStatistics(); // verify output stats object got created
