Repository: tez Updated Branches: refs/heads/master 406721ab1 -> 8b412ee66
TEZ-2836. Avoid setting framework/system counters for tasks running in threads. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8b412ee6 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8b412ee6 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8b412ee6 Branch: refs/heads/master Commit: 8b412ee66fe042db60a567ff71639839af5fa854 Parents: 406721a Author: Siddharth Seth <[email protected]> Authored: Mon Sep 28 16:10:11 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Mon Sep 28 16:10:11 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../app/launcher/LocalContainerLauncher.java | 2 +- .../tez/service/impl/ContainerRunnerImpl.java | 4 +- .../tez/mapreduce/processor/MapUtils.java | 2 +- .../processor/reduce/TestReduceProcessor.java | 2 +- .../runtime/LogicalIOProcessorRuntimeTask.java | 5 +- .../org/apache/tez/runtime/RuntimeTask.java | 12 ++- .../org/apache/tez/runtime/task/TezChild.java | 14 ++-- .../apache/tez/runtime/task/TezTaskRunner2.java | 8 +- .../TestLogicalIOProcessorRuntimeTask.java | 7 +- .../tez/runtime/task/TestTaskExecution2.java | 77 +++++++++++++++++--- 11 files changed, 99 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 01fa23e..d219127 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.8.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2836. Avoid setting framework/system counters for tasks running in threads. TEZ-2398. Flaky test: TestFaultTolerance TEZ-2833. Dont create extra directory during ATS file download TEZ-2834. Make Tez preemption resilient to incorrect free resource reported http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index 6cd6fce..9267f00 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -356,7 +356,7 @@ public class LocalContainerLauncher extends ContainerLauncher { TezChild tezChild = TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier, attemptNumber, localDirs, workingDirectory, containerEnv, "", executionContext, credentials, - memAvailable, context.getUser(), tezTaskUmbilicalProtocol); + memAvailable, context.getUser(), tezTaskUmbilicalProtocol, false); return tezChild; } http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java index fb4c08f..ad05af9 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java @@ -305,7 +305,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun request.getContainerIdString(), request.getTokenIdentifier(), request.getAppAttemptNumber(), workingDir, localDirs, envMap, objectRegistry, pid, - executionContext, credentials, memoryAvailable, request.getUser(), null); + executionContext, credentials, memoryAvailable, request.getUser(), null, false); ContainerExecutionResult result = tezChild.run(); LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + sw.stop().elapsedMillis()); @@ -449,7 +449,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun request.getAppAttemptNumber(), serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, - executionContext, memoryAvailable); + executionContext, memoryAvailable, false); boolean shouldDie; try { http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java index 8841882..71aa87c 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java @@ -232,7 +232,7 @@ public class MapUtils { serviceConsumerMetadata, envMap, HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"), - Runtime.getRuntime().maxMemory()); + Runtime.getRuntime().maxMemory(), true); return task; } } http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java index fcb42b3..db78b6e 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java @@ -224,7 +224,7 @@ public class TestReduceProcessor { serviceConsumerMetadata, serviceProviderEnvMap, HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"), - Runtime.getRuntime().maxMemory()); + Runtime.getRuntime().maxMemory(), true); List<Event> destEvents = new LinkedList<Event>(); destEvents.add(dme); http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 5b0e62f..5db96c5 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -156,10 +156,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap, Multimap<String, String> startedInputsMap, ObjectRegistry objectRegistry, - String pid, ExecutionContext ExecutionContext, long memAvailable) throws IOException { + String pid, ExecutionContext ExecutionContext, long memAvailable, + boolean updateSysCounters) throws IOException { // Note: If adding any fields here, make sure they're cleaned up in the cleanupContext method. // TODO Remove jobToken from here post TEZ-421 - super(taskSpec, tezConf, tezUmbilical, pid); + super(taskSpec, tezConf, tezUmbilical, pid, updateSysCounters); LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: " + taskSpec); int numInputs = taskSpec.getInputs().size(); http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java index 33c0113..c9c6ba1 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java @@ -56,7 +56,7 @@ public abstract class RuntimeTask { private final TaskStatistics statistics; protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf, - TezUmbilical tezUmbilical, String pid) { + TezUmbilical tezUmbilical, String pid, boolean setupSysCounterUpdater) { this.taskSpec = taskSpec; this.tezConf = tezConf; this.tezUmbilical = tezUmbilical; @@ -67,7 +67,11 @@ public abstract class RuntimeTask { this.progress = 0.0f; this.taskDone = new AtomicBoolean(false); this.statistics = new TaskStatistics(); - this.counterUpdater = new TaskCounterUpdater(tezCounters, tezConf, pid); + if (setupSysCounterUpdater) { + this.counterUpdater = new TaskCounterUpdater(tezCounters, tezConf, pid); + } else { + this.counterUpdater = null; + } } protected enum State { @@ -160,7 +164,9 @@ public abstract class RuntimeTask { } public void setFrameworkCounters() { - this.counterUpdater.updateCounters(); + if (counterUpdater != null) { + this.counterUpdater.updateCounters(); + } } protected void setTaskDone() { http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index edc8208..e9b48f4 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -109,6 +109,7 @@ public class TezChild { private final long memAvailable; private final AtomicBoolean isShutdown = new AtomicBoolean(false); private final String user; + private final boolean updateSysCounters; private Multimap<String, String> startedInputsMap = HashMultimap.create(); private final boolean ownUmbilical; @@ -123,8 +124,8 @@ public class TezChild { Map<String, String> serviceProviderEnvMap, ObjectRegistryImpl objectRegistry, String pid, ExecutionContext executionContext, - Credentials credentials, long memAvailable, String user, TezTaskUmbilicalProtocol umbilical) - throws IOException, InterruptedException { + Credentials credentials, long memAvailable, String user, TezTaskUmbilicalProtocol umbilical, + boolean updateSysCounters) throws IOException, InterruptedException { this.defaultConf = conf; this.containerIdString = containerIdentifier; this.appAttemptNumber = appAttemptNumber; @@ -136,6 +137,7 @@ public class TezChild { this.credentials = credentials; this.memAvailable = memAvailable; this.user = user; + this.updateSysCounters = updateSysCounters; getTaskMaxSleepTime = defaultConf.getInt( TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX, @@ -248,7 +250,7 @@ public class TezChild { TezTaskRunner2 taskRunner = new TezTaskRunner2(defaultConf, childUGI, localDirs, containerTask.getTaskSpec(), appAttemptNumber, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter, - executor, objectRegistry, pid, executionContext, memAvailable); + executor, objectRegistry, pid, executionContext, memAvailable, updateSysCounters); boolean shouldDie; try { TaskRunner2Result result = taskRunner.run(); @@ -433,7 +435,7 @@ public class TezChild { String tokenIdentifier, int attemptNumber, String[] localDirs, String workingDirectory, Map<String, String> serviceProviderEnvMap, @Nullable String pid, ExecutionContext executionContext, Credentials credentials, long memAvailable, String user, - TezTaskUmbilicalProtocol tezUmbilical) + TezTaskUmbilicalProtocol tezUmbilical, boolean updateSysCounters) throws IOException, InterruptedException, TezException { // Pull in configuration specified for the session. @@ -446,7 +448,7 @@ public class TezChild { return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier, attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap, objectRegistry, pid, - executionContext, credentials, memAvailable, user, tezUmbilical); + executionContext, credentials, memAvailable, user, tezUmbilical, updateSysCounters); } public static void main(String[] args) throws IOException, InterruptedException, TezException { @@ -482,7 +484,7 @@ public class TezChild { tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()), System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())), credentials, Runtime.getRuntime().maxMemory(), System - .getenv(ApplicationConstants.Environment.USER.toString()), null); + .getenv(ApplicationConstants.Environment.USER.toString()), null, true); tezChild.run(); } http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java index 7fd4c75..4fdc17d 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java @@ -55,7 +55,8 @@ public class TezTaskRunner2 { private static final Logger LOG = LoggerFactory.getLogger(TezTaskRunner2.class); - private final LogicalIOProcessorRuntimeTask task; + @VisibleForTesting + final LogicalIOProcessorRuntimeTask task; private final UserGroupInformation ugi; private final TaskReporterInterface taskReporter; @@ -100,7 +101,8 @@ public class TezTaskRunner2 { Multimap<String, String> startedInputsMap, TaskReporterInterface taskReporter, ListeningExecutorService executor, ObjectRegistry objectRegistry, String pid, - ExecutionContext executionContext, long memAvailable) throws + ExecutionContext executionContext, long memAvailable, + boolean updateSysCounters) throws IOException { this.ugi = ugi; this.taskReporter = taskReporter; @@ -108,7 +110,7 @@ public class TezTaskRunner2 { this.umbilicalAndErrorHandler = new UmbilicalAndErrorHandler(); this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs, umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, - objectRegistry, pid, executionContext, memAvailable); + objectRegistry, pid, executionContext, memAvailable, updateSysCounters); } /** http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java index 0acb7b8..0fc3919 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java @@ -25,7 +25,6 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import java.nio.ByteBuffer; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -35,7 +34,6 @@ import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; @@ -44,7 +42,6 @@ import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.AbstractLogicalOutput; import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.Reader; @@ -85,7 +82,7 @@ public class TestLogicalIOProcessorRuntimeTask { LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null, umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null, - "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory()); + "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true); try { lio1.initialize(); @@ -113,7 +110,7 @@ public class TestLogicalIOProcessorRuntimeTask { tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(task2, 0, tezConf, null, umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null, - "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory()); + "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true); try { lio2.initialize(); lio2.run(); http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java index 2123757..989753b 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java @@ -39,6 +39,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.ListeningExecutorService; @@ -49,6 +50,11 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.counters.CounterGroup; +import org.apache.tez.common.counters.FileSystemCounter; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; @@ -58,6 +64,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.ObjectRegistry; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; @@ -150,7 +157,8 @@ public class TestTaskExecution2 { TaskReporter taskReporter = createTaskReporter(appId, umbilical); TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, - TestProcessor.CONF_EMPTY); + TestProcessor.CONF_EMPTY, true); + LogicalIOProcessorRuntimeTask runtimeTask = taskRunner.task; // Setup the executor Future<TaskRunner2Result> taskRunnerFuture = taskExecutor.submit( new TaskRunnerCallable2ForTest(taskRunner)); @@ -162,9 +170,12 @@ public class TestTaskExecution2 { umbilical.verifyTaskSuccessEvent(); assertFalse(TestProcessor.wasAborted()); umbilical.resetTrackedEvents(); + TezCounters tezCounters = runtimeTask.getCounters(); + verifySysCounters(tezCounters, 5, 5); taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, - TestProcessor.CONF_EMPTY); + TestProcessor.CONF_EMPTY, false); + runtimeTask = taskRunner.task; // Setup the executor taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner)); // Signal the processor to go through @@ -174,11 +185,14 @@ public class TestTaskExecution2 { assertNull(taskReporter.currentCallable); umbilical.verifyTaskSuccessEvent(); assertFalse(TestProcessor.wasAborted()); + tezCounters = runtimeTask.getCounters(); + verifySysCounters(tezCounters, -1, -1); } finally { executor.shutdownNow(); } } + // test task failed due to exception in Processor @Test(timeout = 5000) public void testFailedTaskTezException() throws IOException, InterruptedException, TezException, @@ -231,7 +245,7 @@ public class TestTaskExecution2 { TaskReporter taskReporter = createTaskReporter(appId, umbilical); TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, - "NotExitedProcessor", TestProcessor.CONF_EMPTY, false); + "NotExitedProcessor", TestProcessor.CONF_EMPTY, false, true); // Setup the executor Future<TaskRunner2Result> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner)); @@ -484,6 +498,35 @@ public class TestTaskExecution2 { } } + private void verifySysCounters(TezCounters tezCounters, int minTaskCounterCount, int minFsCounterCount) { + + Preconditions.checkArgument((minTaskCounterCount > 0 && minFsCounterCount > 0) || + (minTaskCounterCount <= 0 && minFsCounterCount <= 0), + "Both targetCounter counts should be postitive or negative. A mix is not expected"); + + int numTaskCounters = 0; + int numFsCounters = 0; + for (CounterGroup counterGroup : tezCounters) { + if (counterGroup.getName().equals(TaskCounter.class.getName())) { + for (TezCounter ignored : counterGroup) { + numTaskCounters++; + } + } else if (counterGroup.getName().equals(FileSystemCounter.class.getName())) { + for (TezCounter ignored : counterGroup) { + numFsCounters++; + } + } + } + + // If Target <=0, assert counter count is exactly 0 + if (minTaskCounterCount <= 0) { + assertEquals(0, numTaskCounters); + assertEquals(0, numFsCounters); + } else { + assertTrue(numTaskCounters >= minTaskCounterCount); + assertTrue(numFsCounters >= minFsCounterCount); + } + } private void verifyTaskRunnerResult(TaskRunner2Result taskRunner2Result, EndReason expectedEndReason, Throwable expectedThrowable, @@ -530,10 +573,20 @@ public class TestTaskExecution2 { private TezTaskRunner2 createTaskRunner(ApplicationId appId, TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical, TaskReporter taskReporter, - ListeningExecutorService executor, byte[] processorConf) + ListeningExecutorService executor, byte[] processorConf) throws + IOException { + return createTaskRunner(appId, umbilical, taskReporter, executor, processorConf, true); + + } + + private TezTaskRunner2 createTaskRunner(ApplicationId appId, + TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical, + TaskReporter taskReporter, + ListeningExecutorService executor, byte[] processorConf, + boolean updateSysCounters) throws IOException { return createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.class.getName(), - processorConf, false); + processorConf, false, updateSysCounters); } private TezTaskRunner2ForTest createTaskRunnerForTest(ApplicationId appId, @@ -544,14 +597,15 @@ public class TestTaskExecution2 { throws IOException { return (TezTaskRunner2ForTest) createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.class.getName(), - processorConf, true); + processorConf, true, true); } private TezTaskRunner2 createTaskRunner(ApplicationId appId, TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical, TaskReporter taskReporter, ListeningExecutorService executor, String processorClass, - byte[] processorConf, boolean testRunner) throws + byte[] processorConf, boolean testRunner, + boolean updateSysCounters) throws IOException { TezConfiguration tezConf = new TezConfiguration(defaultConf); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); @@ -574,13 +628,13 @@ public class TestTaskExecution2 { new HashMap<String, ByteBuffer>(), new HashMap<String, String>(), HashMultimap.<String, String>create(), taskReporter, executor, null, "", new ExecutionContextImpl("localhost"), - Runtime.getRuntime().maxMemory()); + Runtime.getRuntime().maxMemory(), updateSysCounters); } else { taskRunner = new TezTaskRunner2(tezConf, ugi, localDirs, taskSpec, 1, new HashMap<String, ByteBuffer>(), new HashMap<String, String>(), HashMultimap.<String, String>create(), taskReporter, executor, null, "", new ExecutionContextImpl("localhost"), - Runtime.getRuntime().maxMemory()); + Runtime.getRuntime().maxMemory(), updateSysCounters); } return taskRunner; @@ -604,10 +658,11 @@ public class TestTaskExecution2 { ObjectRegistry objectRegistry, String pid, ExecutionContext executionContext, - long memAvailable) throws IOException { + long memAvailable, + boolean updateSysCounters) throws IOException { super(tezConf, ugi, localDirs, taskSpec, appAttemptNumber, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, - executionContext, memAvailable); + executionContext, memAvailable, updateSysCounters); }
