Repository: tez Updated Branches: refs/heads/master 8c4407798 -> 51972efec
TEZ-3691. Setup fetchers to use shared executor. (harishjp) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/51972efe Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/51972efe Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/51972efe Branch: refs/heads/master Commit: 51972efece985ae1ad7e3ca5250f2d38a768c528 Parents: 8c44077 Author: Harish JP <[email protected]> Authored: Wed May 24 11:56:09 2017 +0530 Committer: Harish JP <[email protected]> Committed: Wed May 24 11:56:09 2017 +0530 ---------------------------------------------------------------------- .../library/api/TezRuntimeConfiguration.java | 8 +++ .../common/shuffle/impl/ShuffleManager.java | 17 ++++--- .../orderedgrouped/ShuffleScheduler.java | 14 ++++-- .../library/input/OrderedGroupedKVInput.java | 1 + .../runtime/library/input/UnorderedKVInput.java | 1 + .../impl/TestShuffleInputEventHandlerImpl.java | 36 +++++++++++-- .../common/shuffle/impl/TestShuffleManager.java | 53 +++++++++++++++++--- .../shuffle/orderedgrouped/TestShuffle.java | 32 ++++++++++-- ...tShuffleInputEventHandlerOrderedGrouped.java | 26 +++++++++- .../orderedgrouped/TestShuffleScheduler.java | 50 +++++++++++++++++- 10 files changed, 213 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index 2eec276..23f1f9b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -427,6 +427,13 @@ public class TezRuntimeConfiguration { public static final boolean TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM_DEFAULT = false; + @Private + @Unstable + @ConfigurationProperty(type = "boolean") + public static final String TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL = TEZ_RUNTIME_PREFIX + + "shuffle.fetcher.use-shared-pool"; + public static final boolean TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT = false; + @ConfigurationProperty(type = "float") public static final String TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT = TEZ_RUNTIME_PREFIX + "task.input.post-merge.buffer.percent"; @@ -601,6 +608,7 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_SORTER_CLASS); tezRuntimeKeys.add(TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL); defaultConf.addResource("core-default.xml"); defaultConf.addResource("core-site.xml"); http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 8716b92..d9cdae4 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -233,13 +233,18 @@ public class ShuffleManager implements FetcherCallback { TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT); this.numFetchers = Math.min(maxConfiguredFetchers, numInputs); - - ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool( - numFetchers, - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Fetcher_B {" + srcNameTrimmed + "} #%d").build()); + + final ExecutorService fetcherRawExecutor; + if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT)) { + fetcherRawExecutor = inputContext.createTezFrameworkExecutorService(numFetchers, + "Fetcher_B {" + srcNameTrimmed + "} #%d"); + } else { + fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("Fetcher_B {" + srcNameTrimmed + "} #%d").build()); + } this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor); - + ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true).setNameFormat("ShuffleRunner {" + srcNameTrimmed + "}").build()); this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor); http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 39f2138..0e05bd8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -54,6 +54,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.tez.http.HttpConnectionParams; import org.apache.tez.common.CallableWithNdc; @@ -374,9 +375,15 @@ class ShuffleScheduler { .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID)); this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret); - ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Fetcher_O {" + srcNameTrimmed + "} #%d").build()); + final ExecutorService fetcherRawExecutor; + if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT)) { + fetcherRawExecutor = inputContext.createTezFrameworkExecutorService(numFetchers, + "Fetcher_O {" + srcNameTrimmed + "} #%d"); + } else { + fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("Fetcher_O {" + srcNameTrimmed + "} #%d").build()); + } this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor); this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5); @@ -1340,7 +1347,6 @@ class ShuffleScheduler { @Override protected Void callInternal() throws InterruptedException { - outer: while (!isShutdown.get() && remainingMaps.get() > 0) { synchronized (ShuffleScheduler.this) { if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) { http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index 8e653ed..6c12a99 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -392,6 +392,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION); confKeys.add(TezRuntimeConfiguration .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS); http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index c085844..9a46cbd 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -268,6 +268,7 @@ public class UnorderedKVInput extends AbstractLogicalInput { confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS); http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java index af52f90..d8f2e25 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java @@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.common.shuffle.impl; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; @@ -26,6 +27,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.nio.ByteBuffer; @@ -33,13 +35,16 @@ import java.util.BitSet; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.token.Token; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezExecutors; import org.apache.tez.common.TezRuntimeFrameworkConfigs; +import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.common.security.JobTokenIdentifier; @@ -47,14 +52,17 @@ import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.ExecutionContext; -import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import com.google.protobuf.ByteString; @@ -65,6 +73,18 @@ public class TestShuffleInputEventHandlerImpl { private static final String PATH_COMPONENT = "attempttmp"; private final Configuration conf = new Configuration(); + private TezExecutors sharedExecutor; + + @Before + public void setup() { + sharedExecutor = new TezSharedExecutor(conf); + } + + @After + public void cleanup() { + sharedExecutor.shutdownNow(); + } + @Test(timeout = 5000) public void testSimple() throws IOException { InputContext inputContext = mock(InputContext.class); @@ -159,6 +179,7 @@ public class TestShuffleInputEventHandlerImpl { DataOutputBuffer port_dob = new DataOutputBuffer(); port_dob.writeInt(PORT); final ByteBuffer shuffleMetaData = ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength()); + port_dob.close(); ExecutionContext executionContext = mock(ExecutionContext.class); doReturn(HOST).when(executionContext).getHostName(); @@ -169,10 +190,18 @@ public class TestShuffleInputEventHandlerImpl { doReturn(shuffleMetaData).when(inputContext) .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); doReturn(executionContext).when(inputContext).getExecutionContext(); + when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer( + new Answer<ExecutorService>() { + @Override + public ExecutorService answer(InvocationOnMock invocation) throws Throwable { + return sharedExecutor.createExecutorService( + invocation.getArgumentAt(0, Integer.class), + invocation.getArgumentAt(1, String.class)); + } + }); return inputContext; } - @SuppressWarnings("unchecked") private ShuffleManager createShuffleManager(InputContext inputContext) throws IOException { Path outDirBase = new Path(".", "outDir"); String[] outDirs = new String[] { outDirBase.toString() }; @@ -180,7 +209,8 @@ public class TestShuffleInputEventHandlerImpl { conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, inputContext.getWorkDirs()); DataOutputBuffer out = new DataOutputBuffer(); - Token<JobTokenIdentifier> token = new Token(new JobTokenIdentifier(), new JobTokenSecretManager(null)); + Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(new JobTokenIdentifier(), + new JobTokenSecretManager(null)); token.write(out); doReturn(ByteBuffer.wrap(out.getData())).when(inputContext).getServiceConsumerMetaData( TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID); http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java index a5608ef..a6a5274 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java @@ -20,19 +20,23 @@ package org.apache.tez.runtime.library.common.shuffle.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Set; +import java.util.concurrent.ExecutorService; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; @@ -40,7 +44,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.security.token.Token; +import org.apache.tez.common.TezExecutors; import org.apache.tez.common.TezRuntimeFrameworkConfigs; +import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; @@ -49,6 +55,7 @@ import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.shuffle.FetchedInput; import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator; @@ -57,6 +64,8 @@ import org.apache.tez.runtime.library.common.shuffle.FetchResult; import org.apache.tez.runtime.library.common.shuffle.InputHost; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -68,6 +77,17 @@ public class TestShuffleManager { private static final int PORT = 8080; private static final String PATH_COMPONENT = "attempttmp"; private final Configuration conf = new Configuration(); + private TezExecutors sharedExecutor; + + @Before + public void setup() { + sharedExecutor = new TezSharedExecutor(conf); + } + + @After + public void cleanup() { + sharedExecutor.shutdownNow(); + } /** * One reducer fetches multiple partitions from each mapper. @@ -136,6 +156,7 @@ public class TestShuffleManager { port_dob.writeInt(PORT); final ByteBuffer shuffleMetaData = ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength()); + port_dob.close(); ExecutionContext executionContext = mock(ExecutionContext.class); doReturn(FETCHER_HOST).when(executionContext).getHostName(); @@ -146,10 +167,30 @@ public class TestShuffleManager { doReturn(shuffleMetaData).when(inputContext) .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); doReturn(executionContext).when(inputContext).getExecutionContext(); + when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer( + new Answer<ExecutorService>() { + @Override + public ExecutorService answer(InvocationOnMock invocation) throws Throwable { + return sharedExecutor.createExecutorService( + invocation.getArgumentAt(0, Integer.class), + invocation.getArgumentAt(1, String.class)); + } + }); return inputContext; } - @SuppressWarnings("unchecked") + @Test(timeout=5000) + public void testUseSharedExecutor() throws Exception { + InputContext inputContext = createInputContext(); + createShuffleManager(inputContext, 2); + verify(inputContext, times(0)).createTezFrameworkExecutorService(anyInt(), anyString()); + + inputContext = createInputContext(); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL, true); + createShuffleManager(inputContext, 2); + verify(inputContext).createTezFrameworkExecutorService(anyInt(), anyString()); + } + private ShuffleManagerForTest createShuffleManager( InputContext inputContext, int expectedNumOfPhysicalInputs) throws IOException { @@ -160,7 +201,7 @@ public class TestShuffleManager { inputContext.getWorkDirs()); DataOutputBuffer out = new DataOutputBuffer(); - Token<JobTokenIdentifier> token = new Token(new JobTokenIdentifier(), + Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(new JobTokenIdentifier(), new JobTokenSecretManager(null)); token.write(out); doReturn(ByteBuffer.wrap(out.getData())).when(inputContext). @@ -199,9 +240,9 @@ public class TestShuffleManager { conf)); final FetchResult mockFetcherResult = mock(FetchResult.class); try { - doAnswer(new Answer() { + doAnswer(new Answer<FetchResult>() { @Override - public Object answer(InvocationOnMock invocation) throws Throwable { + public FetchResult answer(InvocationOnMock invocation) throws Throwable { for(InputAttemptIdentifier input : fetcher.getSrcAttempts()) { ShuffleManagerForTest.this.fetchSucceeded( fetcher.getHost(), input, new TestFetchedInput(input), 0, 0, http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java index 855aedf..cad9523 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java @@ -16,6 +16,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; @@ -26,11 +27,15 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezExecutors; +import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; @@ -40,14 +45,26 @@ import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.common.Constants; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestShuffle { - private static final Logger LOG = LoggerFactory.getLogger(TestShuffle.class); + private TezExecutors sharedExecutor; + + @Before + public void setup() { + sharedExecutor = new TezSharedExecutor(new Configuration()); + } + + @After + public void cleanup() { + sharedExecutor.shutdownNow(); + } @Test(timeout = 10000) public void testSchedulerTerminatesOnException() throws IOException, InterruptedException { @@ -107,6 +124,15 @@ public class TestShuffle { new JobTokenSecretManager()); ByteBuffer tokenBuffer = TezCommonUtils.serializeServiceData(sessionToken); doReturn(tokenBuffer).when(inputContext).getServiceConsumerMetaData(anyString()); + when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer( + new Answer<ExecutorService>() { + @Override + public ExecutorService answer(InvocationOnMock invocation) throws Throwable { + return sharedExecutor.createExecutorService( + invocation.getArgumentAt(0, Integer.class), + invocation.getArgumentAt(1, String.class)); + } + }); return inputContext; } } http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java index 695a307..384a982 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java @@ -6,6 +6,8 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezExecutors; +import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.common.security.JobTokenIdentifier; @@ -18,8 +20,11 @@ import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; +import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.IOException; import java.nio.ByteBuffer; @@ -27,9 +32,11 @@ import java.util.BitSet; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.ExecutorService; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; @@ -65,6 +72,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped { private ShuffleScheduler realScheduler; private MergeManager mergeManager; + private TezExecutors sharedExecutor; + private InputContext createTezInputContext() throws IOException { ApplicationId applicationId = ApplicationId.newInstance(1, 1); InputContext inputContext = mock(InputContext.class); @@ -79,6 +88,15 @@ public class TestShuffleInputEventHandlerOrderedGrouped { new JobTokenSecretManager()); ByteBuffer tokenBuffer = TezCommonUtils.serializeServiceData(sessionToken); doReturn(tokenBuffer).when(inputContext).getServiceConsumerMetaData(anyString()); + when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer( + new Answer<ExecutorService>() { + @Override + public ExecutorService answer(InvocationOnMock invocation) throws Throwable { + return sharedExecutor.createExecutorService( + invocation.getArgumentAt(0, Integer.class), + invocation.getArgumentAt(1, String.class)); + } + }); return inputContext; } @@ -134,7 +152,13 @@ public class TestShuffleInputEventHandlerOrderedGrouped { @Before public void setup() throws Exception { - setupScheduler(2); + sharedExecutor = new TezSharedExecutor(new Configuration()); + setupScheduler(2); + } + + @After + public void cleanup() { + sharedExecutor.shutdownNow(); } private void setupScheduler(int numInputs) throws Exception { http://git-wip-us.apache.org/repos/asf/tez/blob/51972efe/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java index 31da4d0..7b30bf3 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java @@ -17,6 +17,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; @@ -41,6 +42,8 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezExecutors; +import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; @@ -50,12 +53,25 @@ import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; public class TestShuffleScheduler { + private TezExecutors sharedExecutor; + + @Before + public void setup() { + sharedExecutor = new TezSharedExecutor(new Configuration()); + } + + @After + public void cleanup() { + sharedExecutor.shutdownNow(); + } @Test (timeout = 10000) public void testNumParallelScheduledFetchers() throws IOException, InterruptedException { @@ -106,6 +122,27 @@ public class TestShuffleScheduler { } } + @Test(timeout=5000) + public void testUseSharedExecutor() throws Exception { + InputContext inputContext = createTezInputContext(); + Configuration conf = new TezConfiguration(); + int numInputs = 10; + Shuffle shuffle = mock(Shuffle.class); + MergeManager mergeManager = mock(MergeManager.class); + + ShuffleSchedulerForTest scheduler = new ShuffleSchedulerForTest(inputContext, conf, numInputs, + shuffle, mergeManager, mergeManager, System.currentTimeMillis(), null, false, 0, "srcName"); + verify(inputContext, times(0)).createTezFrameworkExecutorService(anyInt(), anyString()); + scheduler.close(); + + inputContext = createTezInputContext(); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL, true); + scheduler = new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager, + mergeManager, System.currentTimeMillis(), null, false, 0, "srcName"); + verify(inputContext).createTezFrameworkExecutorService(anyInt(), anyString()); + scheduler.close(); + } + @Test(timeout = 5000) public void testSimpleFlow() throws Exception { InputContext inputContext = createTezInputContext(); @@ -909,6 +946,15 @@ public class TestShuffleScheduler { new JobTokenSecretManager()); ByteBuffer tokenBuffer = TezCommonUtils.serializeServiceData(sessionToken); doReturn(tokenBuffer).when(inputContext).getServiceConsumerMetaData(anyString()); + when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer( + new Answer<ExecutorService>() { + @Override + public ExecutorService answer(InvocationOnMock invocation) throws Throwable { + return sharedExecutor.createExecutorService( + invocation.getArgumentAt(0, Integer.class), + invocation.getArgumentAt(1, String.class)); + } + }); return inputContext; } @@ -948,9 +994,9 @@ public class TestShuffleScheduler { FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) { numFetchersCreated.incrementAndGet(); FetcherOrderedGrouped mockFetcher = mock(FetcherOrderedGrouped.class); - doAnswer(new Answer() { + doAnswer(new Answer<Void>() { @Override - public Object answer(InvocationOnMock invocation) throws Throwable { + public Void answer(InvocationOnMock invocation) throws Throwable { if (fetcherShouldWait) { Thread.sleep(100000l); }
