TEZ-2774. Improvements and cleanup of logging for the AM and parts of the runtime. Contributed by Siddharth Seth and Bikas Saha.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f785ce8d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f785ce8d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f785ce8d Branch: refs/heads/master Commit: f785ce8d8653a469c8c6e6a9bbcfcff40c6e1289 Parents: d93bdc7 Author: Siddharth Seth <[email protected]> Authored: Tue Sep 15 13:20:37 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Tue Sep 15 13:20:37 2015 -0700 ---------------------------------------------------------------------- .../org/apache/tez/common/AsyncDispatcher.java | 14 +- .../tez/common/AsyncDispatcherConcurrent.java | 9 +- .../org/apache/tez/common/TezUtilsInternal.java | 14 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 27 +++- .../tez/dag/app/TaskCommunicatorManager.java | 7 +- .../app/dag/RootInputInitializerManager.java | 2 +- .../java/org/apache/tez/dag/app/dag/Vertex.java | 2 + .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 14 +- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 34 +++-- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 128 +++++++++--------- .../app/launcher/TezContainerLauncherImpl.java | 8 +- .../tez/dag/app/rm/TaskSchedulerManager.java | 19 +-- .../dag/app/rm/YarnTaskSchedulerService.java | 132 +++++++++++-------- .../app/rm/container/AMContainerHelpers.java | 10 +- .../dag/app/rm/container/AMContainerImpl.java | 18 +-- .../dag/app/rm/node/PerSourceNodeTracker.java | 2 + .../tez/dag/history/HistoryEventHandler.java | 8 +- .../events/TaskAttemptFinishedEvent.java | 12 +- .../history/events/TaskAttemptStartedEvent.java | 4 +- .../impl/SimpleHistoryLoggingService.java | 4 +- .../dag/history/recovery/RecoveryService.java | 8 +- .../resources/tez-container-log4j.properties | 2 +- .../mapreduce/committer/MROutputCommitter.java | 3 +- .../common/MRInputAMSplitGenerator.java | 31 ++--- .../common/MRInputSplitDistributor.java | 7 +- .../tez/mapreduce/hadoop/MRInputHelpers.java | 14 +- 
.../tez/mapreduce/partition/MRPartitioner.java | 18 ++- .../logging/ats/ATSHistoryLoggingService.java | 9 +- .../runtime/LogicalIOProcessorRuntimeTask.java | 50 ++++--- .../runtime/api/impl/TezInputContextImpl.java | 4 +- .../runtime/api/impl/TezOutputContextImpl.java | 4 +- .../api/impl/TezProcessorContextImpl.java | 4 +- .../common/resources/MemoryDistributor.java | 76 +++++++++-- .../tez/runtime/metrics/TaskCounterUpdater.java | 4 +- .../tez/runtime/task/ContainerReporter.java | 2 +- .../org/apache/tez/runtime/task/TezChild.java | 38 +++--- .../runtime/library/common/TezRuntimeUtils.java | 1 - .../WeightedScalingMemoryDistributor.java | 8 +- .../tez/mapreduce/examples/RPCLoadGen.java | 2 - 39 files changed, 449 insertions(+), 304 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java index 4319f4f..159ccd9 100644 --- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java +++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java @@ -130,7 +130,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { @Override protected void serviceStart() throws Exception { eventHandlingThread = new Thread(createThread()); - eventHandlingThread.setName("Dispatcher thread: " + name); + eventHandlingThread.setName("Dispatcher thread {" + name + "}"); eventHandlingThread.start(); //start all the components @@ -211,7 +211,7 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { private void checkForExistingConcurrentDispatcher(Class<? 
extends Enum> eventType) { AsyncDispatcherConcurrent concurrentDispatcher = concurrentEventDispatchers.get(eventType); - Preconditions.checkState(concurrentDispatcher == null, + Preconditions.checkState(concurrentDispatcher == null, "Multiple concurrent dispatchers cannot be registered for: " + eventType.getName()); } @@ -259,7 +259,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { /* check to see if we have a listener registered */ checkForExistingDispatchers(true, eventType); - LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass()); + LOG.info( + "Registering " + eventType + " for independent dispatch using: " + handler.getClass()); AsyncDispatcher dispatcher = new AsyncDispatcher(dispatcherName); dispatcher.register(eventType, handler); eventDispatchers.put(eventType, dispatcher); @@ -272,7 +273,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { /* check to see if we have a listener registered */ checkForExistingDispatchers(true, eventType); - LOG.info("Registering " + eventType + " for concurrent dispatch using: " + handler.getClass()); + LOG.info( + "Registering " + eventType + " for concurrent dispatch using: " + handler.getClass()); AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads); dispatcher.register(eventType, handler); concurrentEventDispatchers.put(eventType, dispatcher); @@ -286,8 +288,8 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { /* check to see if we have a listener registered */ checkForExistingDispatchers(true, eventType); - LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: " - + handler.getClass()); + LOG.info("Registering " + eventType + " with existing concurrent dispatch using: " + + handler.getClass()); dispatcher.register(eventType, handler); concurrentEventDispatchers.put(eventType, dispatcher); } 
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java index d19bf9e..321ea8b 100644 --- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java +++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcherConcurrent.java @@ -136,7 +136,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa @Override protected void serviceStart() throws Exception { execService = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Dispatcher [" + this.name + "] #%d").build()); + .setNameFormat("Dispatcher {" + this.name + "} #%d").build()); for (int i=0; i<numThreads; ++i) { eventQueues.add(new LinkedBlockingQueue<Event>()); } @@ -215,7 +215,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa private void checkForExistingDispatcher(Class<? 
extends Enum> eventType) { AsyncDispatcherConcurrent registeredDispatcher = eventDispatchers.get(eventType); - Preconditions.checkState(registeredDispatcher == null, + Preconditions.checkState(registeredDispatcher == null, "Multiple dispatchers cannot be registered for: " + eventType.getName()); } @@ -263,7 +263,8 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa /* check to see if we have a listener registered */ checkForExistingDispatchers(true, eventType); - LOG.info("Registering " + eventType + " for independent dispatch using: " + handler.getClass()); + LOG.info( + "Registering " + eventType + " for independent dispatch using: " + handler.getClass()); AsyncDispatcherConcurrent dispatcher = new AsyncDispatcherConcurrent(dispatcherName, numThreads); dispatcher.register(eventType, handler); eventDispatchers.put(eventType, dispatcher); @@ -278,7 +279,7 @@ public class AsyncDispatcherConcurrent extends CompositeService implements Dispa /* check to see if we have a listener registered */ checkForExistingDispatchers(true, eventType); LOG.info("Registering " + eventType + " wit existing concurrent dispatch using: " - + handler.getClass()); + + handler.getClass()); dispatcher.register(eventType, handler); eventDispatchers.put(eventType, dispatcher); } http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java index d6ef901..c2a50f5 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java @@ -109,13 +109,10 @@ public class TezUtilsInternal { public static byte[] compressBytes(byte[] inBytes) throws IOException { - Stopwatch sw = null; - if 
(LOG.isDebugEnabled()) { - sw = new Stopwatch().start(); - } + Stopwatch sw = new Stopwatch().start(); byte[] compressed = compressBytesInflateDeflate(inBytes); + sw.stop(); if (LOG.isDebugEnabled()) { - sw.stop(); LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: " + compressed.length + ", CompressTime: " + sw.elapsedMillis()); } @@ -123,13 +120,10 @@ public class TezUtilsInternal { } public static byte[] uncompressBytes(byte[] inBytes) throws IOException { - Stopwatch sw = null; - if (LOG.isDebugEnabled()) { - sw = new Stopwatch().start(); - } + Stopwatch sw = new Stopwatch().start(); byte[] uncompressed = uncompressBytesInflateDeflate(inBytes); + sw.stop(); if (LOG.isDebugEnabled()) { - sw.stop(); LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: " + uncompressed.length + ", UncompressTimeTaken: " + sw.elapsedMillis()); } http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index fee13c1..c713435 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -467,7 +467,6 @@ public class DAGAppMaster extends AbstractService { // Prepare the TaskAttemptListener server for authentication of Containers // TaskAttemptListener gets the information via jobTokenSecretManager. 
- LOG.info("Adding session token to jobTokenSecretManager for application"); jobTokenSecretManager.addTokenForJob( appAttemptID.getApplicationId().toString(), sessionToken); @@ -495,8 +494,11 @@ public class DAGAppMaster extends AbstractService { dispatcher.register(DAGAppMasterEventType.class, new DAGAppMasterEventHandler()); dispatcher.register(DAGEventType.class, dagEventDispatcher); dispatcher.register(VertexEventType.class, vertexEventDispatcher); - if (!conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER, - TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT)) { + boolean useConcurrentDispatcher = + conf.getBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER, + TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT); + LOG.info("Using concurrent dispatcher: " + useConcurrentDispatcher); + if (!useConcurrentDispatcher) { dispatcher.register(TaskEventType.class, new TaskEventDispatcher()); dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher()); } else { @@ -560,7 +562,7 @@ public class DAGAppMaster extends AbstractService { currentRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, appAttemptID.getAttemptId()); if (LOG.isDebugEnabled()) { - LOG.info("Stage directory information for AppAttemptId :" + this.appAttemptID + LOG.debug("Stage directory information for AppAttemptId :" + this.appAttemptID + " tezSystemStagingDir :" + tezSystemStagingDir + " recoveryDataDir :" + recoveryDataDir + " recoveryAttemptDir :" + currentRecoveryDataDir); } @@ -926,7 +928,7 @@ public class DAGAppMaster extends AbstractService { try { if (LOG.isDebugEnabled()) { - LOG.info("JSON dump for submitted DAG, dagId=" + dagId.toString() + LOG.debug("JSON dump for submitted DAG, dagId=" + dagId.toString() + ", json=" + DAGUtils.generateSimpleJSONPlan(dagPB).toString()); } @@ -2102,6 +2104,7 @@ public class DAGAppMaster extends AbstractService { public static void main(String[] args) { try { 
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); + final String pid = System.getenv().get("JVM_PID"); String containerIdStr = System.getenv(Environment.CONTAINER_ID.name()); String nodeHostString = System.getenv(Environment.NM_HOST.name()); @@ -2141,6 +2144,18 @@ public class DAGAppMaster extends AbstractService { false, "Run Tez Application Master in Session mode"); CommandLine cliParser = new GnuParser().parse(opts, args); + boolean sessionModeCliOption = cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION); + + LOG.info("Creating DAGAppMaster for " + + "applicationId=" + applicationAttemptId.getApplicationId() + + ", attemptNum=" + applicationAttemptId.getAttemptId() + + ", AMContainerId=" + containerId + + ", jvmPid=" + pid + + ", userFromEnv=" + jobUserName + + ", cliSessionOption=" + sessionModeCliOption + + ", pwd=" + System.getenv(Environment.PWD.name()) + + ", localDirs=" + System.getenv(Environment.LOCAL_DIRS.name()) + + ", logDirs=" + System.getenv(Environment.LOG_DIRS.name())); // TODO Does this really need to be a YarnConfiguration ? 
Configuration conf = new Configuration(new YarnConfiguration()); @@ -2161,7 +2176,7 @@ public class DAGAppMaster extends AbstractService { new DAGAppMaster(applicationAttemptId, containerId, nodeHostString, Integer.parseInt(nodePortString), Integer.parseInt(nodeHttpPortString), new SystemClock(), appSubmitTime, - cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION), + sessionModeCliOption, System.getenv(Environment.PWD.name()), TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())), TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOG_DIRS.name())), http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java index 2cc6ae2..0bc02dc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java @@ -155,13 +155,13 @@ public class TaskCommunicatorManager extends AbstractService implements @VisibleForTesting TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) { - LOG.info("Using Default Task Communicator"); + LOG.info("Creating Default Task Communicator"); return new TezTaskCommunicatorImpl(taskCommunicatorContext); } @VisibleForTesting TaskCommunicator createUberTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) { - LOG.info("Using Default Local Task Communicator"); + LOG.info("Creating Default Local Task Communicator"); return new TezLocalTaskCommunicatorImpl(taskCommunicatorContext); } @@ -169,7 +169,7 @@ public class TaskCommunicatorManager extends AbstractService implements TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext 
taskCommunicatorContext, NamedEntityDescriptor taskCommDescriptor) throws TezException { - LOG.info("Using TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(), + LOG.info("Creating TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(), taskCommDescriptor.getClassName()); Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils @@ -322,7 +322,6 @@ public class TaskCommunicatorManager extends AbstractService implements */ // @Override public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException { - LOG.info("Commit go/no-go request from " + taskAttemptId.toString()); // An attempt is asking if it can commit its output. This can be decided // only by the task which is managing the multiple attempts. So redirect the // request there. http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java index 4a8a286..13128f8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java @@ -100,7 +100,7 @@ public class RootInputInitializerManager { this.vertex = vertex; this.eventHandler = appContext.getEventHandler(); this.rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder() - .setDaemon(true).setNameFormat("InputInitializer [" + this.vertex.getName() + "] #%d").build()); + .setDaemon(true).setNameFormat("InputInitializer {" + this.vertex.getName() + "} #%d").build()); this.executor = MoreExecutors.listeningDecorator(rawExecutor); this.dagUgi = dagUgi; this.entityStateTracker = stateTracker; 
http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index c5a3c35..d2801e6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -182,6 +182,8 @@ public interface Vertex extends Comparable<Vertex> { public Configuration getConf(); + public boolean isSpeculationEnabled(); + public int getTaskSchedulerIdentifier(); public int getContainerLauncherIdentifier(); public int getTaskCommunicatorIdentifier(); http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index abcd98d..2f228bd 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -782,10 +782,12 @@ public class TaskAttemptImpl implements TaskAttempt, ); } if (oldState != getInternalState()) { - LOG.info(attemptId + " TaskAttempt Transitioned from " - + oldState + " to " - + getInternalState() + " due to event " - + event.getType()); + if (LOG.isDebugEnabled()) { + LOG.debug(attemptId + " TaskAttempt Transitioned from " + + oldState + " to " + + getInternalState() + " due to event " + + event.getType()); + } } } finally { writeLock.unlock(); @@ -1116,7 +1118,9 @@ public class TaskAttemptImpl implements TaskAttempt, TaskSpec remoteTaskSpec; try { remoteTaskSpec = ta.createRemoteTaskSpec(); - LOG.info("remoteTaskSpec:" + remoteTaskSpec); 
+ if (LOG.isDebugEnabled()) { + LOG.debug("remoteTaskSpec:" + remoteTaskSpec); + } } catch (AMUserCodeException e) { String msg = "Exception in " + e.getSource() + ", taskAttempt=" + ta; LOG.error(msg, e); http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 4d449d4..2f304c8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -497,7 +497,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { int toEventId = actualMax + fromEventId; events = new ArrayList<TezEvent>(tezEventsForTaskAttempts.subList(fromEventId, toEventId)); LOG.info("TaskAttempt:" + attemptID + " sent events: (" + fromEventId - + "-" + toEventId + ")"); + + "-" + toEventId + ")."); // currently not modifying the events so that we dont have to create // copies of events. e.g. if we have to set taskAttemptId into the TezEvent // destination metadata then we will need to create a copy of the TezEvent @@ -756,12 +756,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { public boolean canCommit(TezTaskAttemptID taskAttemptID) { writeLock.lock(); try { + if (LOG.isDebugEnabled()) { + LOG.debug("Commit go/no-go request from " + taskAttemptID); + } TaskState state = getState(); if (state == TaskState.SCHEDULED) { // the actual running task ran and is done and asking for commit. we are still stuck // in the scheduled state which indicates a backlog in event processing. lets wait for the // backlog to clear. returning false will make the attempt come back to us. - LOG.debug("Event processing delay. " + LOG.info( + "Event processing delay. 
" + "Attempt committing before state machine transitioned to running : Task {}", taskId); return false; } @@ -792,7 +796,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } } else { if (commitAttempt.equals(taskAttemptID)) { - LOG.info(taskAttemptID + " given a go for committing the task output."); + if (LOG.isDebugEnabled()) { + LOG.debug(taskAttemptID + " already given a go for committing the task output."); + } return true; } // Don't think this can be a pluggable decision, so simply raise an @@ -800,9 +806,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { // Wait for commit attempt to succeed. Dont kill this. If commit // attempt fails then choose a different committer. When commit attempt // succeeds then this and others will be killed - LOG.info(commitAttempt - + " is current committer. Commit waiting for: " - + taskAttemptID); + if (LOG.isDebugEnabled()) { + LOG.debug(commitAttempt + " is current committer. Commit waiting for: " + taskAttemptID); + } return false; } @@ -810,7 +816,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { writeLock.unlock(); } } - + TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) { return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler, taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext, @@ -895,9 +901,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { internalError(event.getType()); } if (oldState != getInternalState()) { - LOG.info(taskId + " Task Transitioned from " + oldState + " to " - + getInternalState() + " due to event " - + event.getType()); + if (LOG.isDebugEnabled()) { + LOG.debug(taskId + " Task Transitioned from " + oldState + " to " + + getInternalState() + " due to event " + + event.getType()); + } } } finally { writeLock.unlock(); @@ -1108,7 +1116,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { // other reasons. 
!attempt.isFinished()) { LOG.info("Issuing kill to other attempt " + attempt.getID() + " as attempt: " + - task.successfulAttempt + " has succeeded"); + task.successfulAttempt + " has succeeded"); String diagnostics = null; TaskAttemptTerminationCause errCause = null; if (attempt.getLaunchTime() < successfulAttempt.getLaunchTime()) { @@ -1466,7 +1474,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } else { // nothing to do LOG.info("Ignoring kill of attempt: " + attemptId + " because attempt: " + - task.successfulAttempt + " is already successful"); + task.successfulAttempt + " is already successful"); return TaskStateInternal.SUCCEEDED; } } @@ -1509,7 +1517,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg, TaskAttemptTerminationCause errorCause) { if (commitAttempt != null && commitAttempt.equals(attempt.getID())) { - LOG.info("Removing commit attempt: " + commitAttempt); + LOG.info("Unsetting commit attempt: " + commitAttempt + " since attempt is being killed"); commitAttempt = null; } if (attempt != null && !attempt.isFinished()) { http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 946ec19..c9b4205 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -913,6 +913,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl this.clock = clock; this.appContext = appContext; this.commitVertexOutputs = commitVertexOutputs; + this.logIdentifier = this.getVertexId() + " [" + this.getName() + "]"; 
this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface; this.taskHeartbeatHandler = thh; @@ -971,6 +972,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl this.containerContext = new ContainerContext(this.localResources, appContext.getCurrentDAG().getCredentials(), this.environment, this.javaOpts, this); + LOG.info("Default container context for " + logIdentifier + "=" + containerContext + ", Default Resources=" + this.taskResource); if (vertexPlan.getInputsCount() > 0) { setAdditionalInputs(vertexPlan.getInputsList()); @@ -993,7 +995,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl speculator = new LegacySpeculator(vertexConf, getAppContext(), this); } - logIdentifier = this.getVertexId() + " [" + this.getName() + "]"; + // This "this leak" is okay because the retained pointer is in an // instance variable. @@ -1033,16 +1035,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } } - LOG.info("Vertex: " + logIdentifier + " configured with TaskScheduler=" + taskSchedulerName + - ", ContainerLauncher=" + containerLauncherName + ", TaskComm=" + taskCommName); - - taskSchedulerIdentifier = appContext.getTaskScheduerIdentifier(taskSchedulerName); - taskCommunicatorIdentifier = appContext.getTaskCommunicatorIdentifier(taskCommName); - containerLauncherIdentifier = appContext.getContainerLauncherIdentifier(containerLauncherName); - - Preconditions.checkNotNull(taskSchedulerIdentifier, "Unknown taskScheduler: " + taskSchedulerName); - Preconditions.checkNotNull(taskCommunicatorIdentifier, "Unknown taskCommunicator: " + containerLauncherName); - Preconditions.checkNotNull(containerLauncherIdentifier, "Unknown containerLauncher: " + taskCommName); + try { + taskSchedulerIdentifier = appContext.getTaskScheduerIdentifier(taskSchedulerName); + } catch (Exception e) { + LOG.error("Failed to get index for taskScheduler: " + taskSchedulerName); + throw e; + } 
+ try { + taskCommunicatorIdentifier = appContext.getTaskCommunicatorIdentifier(taskCommName); + } catch (Exception e) { + LOG.error("Failed to get index for taskCommunicator: " + taskCommName); + throw e; + } + try { + containerLauncherIdentifier = + appContext.getContainerLauncherIdentifier(containerLauncherName); + } catch (Exception e) { + LOG.error("Failed to get index for containerLauncher: " + containerLauncherName); + throw e; + } StringBuilder sb = new StringBuilder(); sb.append("Running vertex: ").append(logIdentifier).append(" : ") @@ -1076,7 +1087,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl return this.taskCommunicatorIdentifier; } - private boolean isSpeculationEnabled() { + @Override + public boolean isSpeculationEnabled() { return isSpeculationEnabled; } @@ -2110,29 +2122,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } } + private static String constructCheckTasksForCompletionLog(VertexImpl vertex) { + String logLine = vertex.logIdentifier + + ", tasks=" + vertex.numTasks + + ", failed=" + vertex.failedTaskCount + + ", killed=" + vertex.killedTaskCount + + ", success=" + vertex.succeededTaskCount + + ", completed=" + vertex.completedTaskCount + + ", commits=" + vertex.commitFutures.size() + + ", err=" + vertex.terminationCause; + return logLine; + } + // triggered by task_complete static VertexState checkTasksForCompletion(final VertexImpl vertex) { - - LOG.info("Checking tasks for vertex completion for " - + vertex.logIdentifier - + ", numTasks=" + vertex.numTasks - + ", failedTaskCount=" + vertex.failedTaskCount - + ", killedTaskCount=" + vertex.killedTaskCount - + ", successfulTaskCount=" + vertex.succeededTaskCount - + ", completedTaskCount=" + vertex.completedTaskCount - + ", commitInProgress=" + vertex.commitFutures.size() - + ", terminationCause=" + vertex.terminationCause); - + // this log helps quickly count the completion count for a vertex. 
+ // grepping and counting for attempts and handling re-tries is time consuming + LOG.info("Task Completion: " + constructCheckTasksForCompletionLog(vertex)); //check for vertex failure first if (vertex.completedTaskCount > vertex.tasks.size()) { LOG.error("task completion accounting issue: completedTaskCount > nTasks:" - + " for vertex " + vertex.logIdentifier - + ", numTasks=" + vertex.numTasks - + ", failedTaskCount=" + vertex.failedTaskCount - + ", killedTaskCount=" + vertex.killedTaskCount - + ", successfulTaskCount=" + vertex.succeededTaskCount - + ", completedTaskCount=" + vertex.completedTaskCount - + ", terminationCause=" + vertex.terminationCause); + + constructCheckTasksForCompletionLog(vertex)); } if (vertex.completedTaskCount == vertex.tasks.size()) { @@ -2141,7 +2151,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl //Only succeed if tasks complete successfully and no terminationCause is registered. if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) { - LOG.info("All tasks are succeeded, vertex:" + vertex.logIdentifier); + LOG.info("All tasks have succeeded, vertex:" + vertex.logIdentifier); if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) { // start commit if there're commits or just finish if no commits return commitOrFinish(vertex); @@ -2159,16 +2169,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl //triggered by commit_complete static VertexState checkCommitsForCompletion(final VertexImpl vertex) { - LOG.info("Checking commits for vertex completion for " - + vertex.logIdentifier - + ", numTasks=" + vertex.numTasks - + ", failedTaskCount=" + vertex.failedTaskCount - + ", killedTaskCount=" + vertex.killedTaskCount - + ", successfulTaskCount=" + vertex.succeededTaskCount - + ", completedTaskCount=" + vertex.completedTaskCount - + ", commitInProgress=" + vertex.commitFutures.size() - + ", terminationCause=" + 
vertex.terminationCause); - + LOG.info("Commits completion: " + + constructCheckTasksForCompletionLog(vertex)); // terminationCause is null mean commit is succeeded, otherwise terminationCause will be set. if (vertex.terminationCause == null) { Preconditions.checkState(vertex.getState() == VertexState.COMMITTING, @@ -2289,20 +2291,23 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl private void initializeCommitters() throws Exception { if (!this.additionalOutputSpecs.isEmpty()) { - LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier); + LOG.info("Setting up committers for vertex " + logIdentifier + ", numAdditionalOutputs=" + + additionalOutputs.size()); for (Entry<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> entry: additionalOutputs.entrySet()) { final String outputName = entry.getKey(); final RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> od = entry.getValue(); if (od.getControllerDescriptor() == null || od.getControllerDescriptor().getClassName() == null) { - LOG.info("Ignoring committer as none specified for output=" - + outputName - + ", vertexId=" + logIdentifier); + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring committer as none specified for output=" + + outputName + + ", vertexId=" + logIdentifier); + } continue; } LOG.info("Instantiating committer for output=" + outputName - + ", vertexId=" + logIdentifier + + ", vertex=" + logIdentifier + ", committerClass=" + od.getControllerDescriptor().getClassName()); dagUgi.doAs(new PrivilegedExceptionAction<Void>() { @@ -2319,12 +2324,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl .createClazzInstance(od.getControllerDescriptor().getClassName(), new Class[]{OutputCommitterContext.class}, new Object[]{outputCommitterContext}); - LOG.info("Invoking committer init for output=" + outputName - + ", vertexId=" + logIdentifier); + if (LOG.isDebugEnabled()) { + 
LOG.debug("Invoking committer init for output=" + outputName + + ", vertex=" + logIdentifier); + } outputCommitter.initialize(); outputCommitters.put(outputName, outputCommitter); - LOG.info("Invoking committer setup for output=" + outputName - + ", vertexId=" + logIdentifier); + if (LOG.isDebugEnabled()) { + LOG.debug("Invoking committer setup for output=" + outputName + + ", vertex=" + logIdentifier); + } outputCommitter.setupOutput(); return null; } @@ -4034,8 +4043,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } boolean forceTransitionToKillWait = false; vertex.completedTaskCount++; - LOG.info("Num completed Tasks for " + vertex.logIdentifier + " : " - + vertex.completedTaskCount); VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event; Task task = vertex.tasks.get(taskEvent.getTaskID()); if (taskEvent.getState() == TaskState.SUCCEEDED) { @@ -4350,10 +4357,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl int numEventsSent = events.size() - numPreRoutedEvents; if (numEventsSent > 0) { StringBuilder builder = new StringBuilder(); - builder.append("Sending ").append(attemptID).append(" numEvents: ").append(numEventsSent) - .append(" from: ").append(fromEventId).append(" to: ").append(nextFromEventId) - .append(" out of ").append(currEventCount).append(" on-demand events in vertex: ") - .append(getLogIdentifier()); + builder.append("Sending ").append(attemptID).append(" ") + .append(numEventsSent) + .append(" events [").append(fromEventId).append(",").append(nextFromEventId) + .append(") total ").append(currEventCount).append(" ") + .append(getLogIdentifier()); LOG.info(builder.toString()); } } @@ -4644,9 +4652,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl for (String inputName : inputsWithInitializers) { inputList.add(rootInputDescriptors.get(inputName)); } - LOG.info("Vertex will initialize via inputInitializers " - + logIdentifier 
+ ". Starting root input initializers: " - + inputsWithInitializers.size()); + LOG.info("Starting " + inputsWithInitializers.size() + " inputInitializers for vertex " + + logIdentifier); initWaitsForRootInitializers = true; rootInputInitializerManager.runInputInitializers(inputList); // Send pending rootInputInitializerEvents @@ -4730,6 +4737,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl @Override public void setAdditionalInputs(List<RootInputLeafOutputProto> inputs) { + LOG.info("Setting " + inputs.size() + " additional inputs for vertex" + this.logIdentifier); this.rootInputDescriptors = Maps.newHashMapWithExpectedSize(inputs.size()); for (RootInputLeafOutputProto input : inputs) { addIO(input.getName()); @@ -4774,7 +4782,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl @Override public void setAdditionalOutputs(List<RootInputLeafOutputProto> outputs) { - LOG.info("setting additional outputs for vertex " + this.vertexName); + LOG.info("Setting " + outputs.size() + " additional outputs for vertex " + this.logIdentifier); this.additionalOutputs = Maps.newHashMapWithExpectedSize(outputs.size()); this.outputCommitters = Maps.newHashMapWithExpectedSize(outputs.size()); for (RootInputLeafOutputProto output : outputs) { http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java index ab74382..d384aef 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java @@ -127,7 +127,7 @@ public class TezContainerLauncherImpl extends 
ContainerLauncher { @SuppressWarnings("unchecked") public synchronized void launch(ContainerLaunchRequest event) { - LOG.info("Launching Container with Id: " + event.getContainerId()); + LOG.info("Launching " + event.getContainerId()); if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) { state = ContainerState.DONE; sendContainerLaunchFailedMsg(event.getContainerId(), @@ -185,8 +185,7 @@ public class TezContainerLauncherImpl extends ContainerLauncher { if(this.state == ContainerState.PREP) { this.state = ContainerState.KILLED_BEFORE_LAUNCH; } else { - LOG.info("Sending a stop request to the NM for ContainerId: " - + containerID); + LOG.info("Stopping " + containerID); ContainerManagementProtocolProxyData proxy = null; try { @@ -353,6 +352,9 @@ public class TezContainerLauncherImpl extends ContainerLauncher { // Load ContainerManager tokens before creating a connection. // TODO: Do it only once per NodeManager. ContainerId containerID = event.getBaseOperation().getContainerId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Processing ContainerOperation {}", event); + } Container c = getContainer(event); switch(event.getOpType()) { http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index 04d7089..dbf8e38 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -203,7 +203,9 @@ public class TaskSchedulerManager extends AbstractService implements } public synchronized void handleEvent(AMSchedulerEvent sEvent) { - LOG.info("Processing the event " + sEvent.toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("Processing the event " + 
sEvent.toString()); + } switch (sEvent.getType()) { case S_TA_LAUNCH_REQUEST: handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent); @@ -219,7 +221,7 @@ public class TaskSchedulerManager extends AbstractService implements handleTASucceeded(event); break; default: - throw new TezUncheckedException("Unexecpted TA_ENDED state: " + event.getState()); + throw new TezUncheckedException("Unexpected TA_ENDED state: " + event.getState()); } break; case S_CONTAINER_DEALLOCATE: @@ -366,8 +368,8 @@ public class TaskSchedulerManager extends AbstractService implements event); return; } - LOG.info("Attempt: " + taskAttempt.getID() + " has task based affinity to " + taskAffinity - + " but no locality information exists for it. Ignoring hint."); + LOG.info("No attempt for task affinity to " + taskAffinity + " for attempt " + + taskAttempt.getID() + " Ignoring."); // fall through with null hosts/racks } else { hosts = (locationHint.getHosts() != null) ? locationHint @@ -422,7 +424,8 @@ public class TaskSchedulerManager extends AbstractService implements @VisibleForTesting TaskScheduler createUberTaskScheduler(TaskSchedulerContext taskSchedulerContext, int schedulerId) { - LOG.info("Creating TaskScheduler: Local TaskScheduler"); + LOG.info("Creating TaskScheduler: Local TaskScheduler with clusterIdentifier={}", + taskSchedulerContext.getCustomClusterIdentifier()); return new LocalTaskSchedulerService(taskSchedulerContext); } @@ -430,8 +433,8 @@ public class TaskSchedulerManager extends AbstractService implements TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext, NamedEntityDescriptor taskSchedulerDescriptor, int schedulerId) throws TezException { - LOG.info("Creating custom TaskScheduler {}:{}", taskSchedulerDescriptor.getEntityName(), - taskSchedulerDescriptor.getClassName()); + LOG.info("Creating custom TaskScheduler {}:{} with clusterIdentifier={}", taskSchedulerDescriptor.getEntityName(), + taskSchedulerDescriptor.getClassName(), 
taskSchedulerContext.getCustomClusterIdentifier()); return ReflectionUtils.createClazzInstance(taskSchedulerDescriptor.getClassName(), new Class[]{TaskSchedulerContext.class}, new Object[]{taskSchedulerContext}); @@ -450,8 +453,6 @@ public class TaskSchedulerManager extends AbstractService implements } else { customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT); } - LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerDescriptors[i].getEntityName() + "]=" + - customAppIdIdentifier); taskSchedulers[i] = createTaskScheduler(host, port, trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i); taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[i]); http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index a1c4753..aaa6165 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -35,6 +35,7 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.tez.common.TezUtils; import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; @@ -329,7 +330,7 @@ public class YarnTaskSchedulerService extends TaskScheduler "Heartbeats between preemptions should be >=1"); delayedContainerManager = new DelayedContainerManager(); - LOG.info("TaskScheduler initialized with configuration: " + + 
LOG.info("YarnTaskScheduler initialized with configuration: " + "maxRMHeartbeatInterval: " + heartbeatIntervalMax + ", containerReuseEnabled: " + shouldReuseContainers + ", reuseRackLocal: " + reuseRackLocal + @@ -407,8 +408,11 @@ public class YarnTaskSchedulerService extends TaskScheduler @Override public void onContainersCompleted(List<ContainerStatus> statuses) { if (isStopStarted.get()) { - for (ContainerStatus status : statuses) { - LOG.info("Container " + status.getContainerId() + " is completed"); + if (LOG.isDebugEnabled()) { + for (ContainerStatus status : statuses) { + LOG.debug("Container " + status.getContainerId() + " is completed with ContainerStatus=" + + status); + } } return; } @@ -429,8 +433,10 @@ public class YarnTaskSchedulerService extends TaskScheduler // being released // completion of a container we had released earlier // an allocated container completed. notify app - LOG.info("Released container completed:" + completedId + - " last allocated to task: " + task); + if (LOG.isDebugEnabled()) { + LOG.debug("Released container completed:" + completedId + + " last allocated to task: " + task); + } appContainerStatus.put(task, containerStatus); continue; } @@ -446,9 +452,9 @@ public class YarnTaskSchedulerService extends TaskScheduler } if(task != null) { // completion of a container we have allocated currently - // an allocated container completed. notify app - LOG.info("Allocated container completed:" + completedId + - " last allocated to task: " + task); + // an allocated container completed. notify app. 
This will cause attempt to get killed + LOG.info( + "Allocated container completed:" + completedId + " last allocated to task: " + task); appContainerStatus.put(task, containerStatus); continue; } @@ -467,9 +473,13 @@ public class YarnTaskSchedulerService extends TaskScheduler @Override public void onContainersAllocated(List<Container> containers) { if (isStopStarted.get()) { - for (Container container : containers) { - LOG.info("Release container:" + container.getId() + ", because it is shutting down."); - releaseContainer(container.getId()); + LOG.info("Ignoring container allocations because application is shutting down. Num " + + containers.size()); + if (LOG.isDebugEnabled()) { + for (Container container : containers) { + LOG.debug("Release container:" + container.getId() + ", because App is shutting down."); + releaseContainer(container.getId()); + } } return; } @@ -528,6 +538,9 @@ public class YarnTaskSchedulerService extends TaskScheduler } // Release any unassigned containers given by the RM + if (containers.iterator().hasNext()) { + LOG.info("Releasing newly assigned containers which could not be allocated"); + } releaseUnassignedContainers(containers); return assignedContainers; @@ -581,15 +594,15 @@ public class YarnTaskSchedulerService extends TaskScheduler boolean isNew = heldContainer.isNew(); if (LOG.isDebugEnabled()) { LOG.debug("Trying to assign a delayed container" - + ", containerId=" + heldContainer.getContainer().getId() - + ", nextScheduleTime=" + heldContainer.getNextScheduleTime() - + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime() - + ", AMState=" + state - + ", matchLevel=" + heldContainer.getLocalityMatchLevel() - + ", taskRequestsCount=" + taskRequests.size() - + ", heldContainers=" + heldContainers.size() - + ", delayedContainers=" + delayedContainerManager.delayedContainers.size() - + ", isNew=" + isNew); + + ", containerId=" + heldContainer.getContainer().getId() + + ", nextScheduleTime=" + 
heldContainer.getNextScheduleTime() + + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime() + + ", AMState=" + state + + ", matchLevel=" + heldContainer.getLocalityMatchLevel() + + ", taskRequestsCount=" + taskRequests.size() + + ", heldContainers=" + heldContainers.size() + + ", delayedContainers=" + delayedContainerManager.delayedContainers.size() + + ", isNew=" + isNew); } if (state.equals(AMState.IDLE) || taskRequests.isEmpty()) { @@ -637,7 +650,7 @@ public class YarnTaskSchedulerService extends TaskScheduler + ", delayedContainers=" + delayedContainerManager.delayedContainers.size() + ", isNew=" + isNew); releaseUnassignedContainers( - Lists.newArrayList(heldContainer.getContainer())); + Collections.singletonList((heldContainer.getContainer()))); } else { // no outstanding work and container idle timeout not expired if (LOG.isDebugEnabled()) { @@ -690,7 +703,7 @@ public class YarnTaskSchedulerService extends TaskScheduler assignReUsedContainerWithLocation(containerToAssign, NODE_LOCAL_ASSIGNER, assignedContainers, true); if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) { - LOG.info("Failed to assign tasks to delayed container using node" + LOG.debug("Failed to assign tasks to delayed container using node" + ", containerId=" + heldContainer.getContainer().getId()); } } @@ -706,7 +719,7 @@ public class YarnTaskSchedulerService extends TaskScheduler assignReUsedContainerWithLocation(containerToAssign, RACK_LOCAL_ASSIGNER, assignedContainers, false); if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) { - LOG.info("Failed to assign tasks to delayed container using rack" + LOG.debug("Failed to assign tasks to delayed container using rack" + ", containerId=" + heldContainer.getContainer().getId()); } } @@ -722,7 +735,7 @@ public class YarnTaskSchedulerService extends TaskScheduler assignReUsedContainerWithLocation(containerToAssign, NON_LOCAL_ASSIGNER, assignedContainers, false); if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) 
{ - LOG.info("Failed to assign tasks to delayed container using non-local" + LOG.debug("Failed to assign tasks to delayed container using non-local" + ", containerId=" + heldContainer.getContainer().getId()); } } @@ -744,10 +757,10 @@ public class YarnTaskSchedulerService extends TaskScheduler if (!isNew && heldContainer.getContainerExpiryTime() <= currentTime && idleContainerTimeoutMin != -1) { LOG.info("Container's idle timeout expired. Releasing container" - + ", containerId=" + heldContainer.container.getId() - + ", containerExpiryTime=" - + heldContainer.getContainerExpiryTime() - + ", idleTimeoutMin=" + idleContainerTimeoutMin); + + ", containerId=" + heldContainer.container.getId() + + ", containerExpiryTime=" + + heldContainer.getContainerExpiryTime() + + ", idleTimeoutMin=" + idleContainerTimeoutMin); releaseUnassignedContainers( Lists.newArrayList(heldContainer.container)); } else { @@ -794,11 +807,11 @@ public class YarnTaskSchedulerService extends TaskScheduler if (safeToRelease && (!taskRequests.isEmpty() || !getContext().isSession())) { LOG.info("Releasing held container as either there are pending but " - + " unmatched requests or this is not a session" - + ", containerId=" + heldContainer.container.getId() - + ", pendingTasks=" + taskRequests.size() - + ", isSession=" + getContext().isSession() - + ". isNew=" + isNew); + + " unmatched requests or this is not a session" + + ", containerId=" + heldContainer.container.getId() + + ", pendingTasks=" + taskRequests.size() + + ", isSession=" + getContext().isSession() + + ". 
isNew=" + isNew); releaseUnassignedContainers( Lists.newArrayList(heldContainer.container)); } else { @@ -873,8 +886,8 @@ public class YarnTaskSchedulerService extends TaskScheduler // TODO this will not handle dynamic changes in resources totalResources = Resources.clone(getAvailableResources()); LOG.info("App total resource memory: " + totalResources.getMemory() + - " cpu: " + totalResources.getVirtualCores() + - " taskAllocations: " + taskAllocations.size()); + " cpu: " + totalResources.getVirtualCores() + + " taskAllocations: " + taskAllocations.size()); } numHeartbeats++; @@ -973,9 +986,9 @@ public class YarnTaskSchedulerService extends TaskScheduler // See if any of the delayedContainers can be used for this task. delayedContainerManager.triggerScheduling(true); LOG.info("Allocation request for task: " + task + - " with request: " + request + - " host: " + ((hosts!=null&&hosts.length>0)?hosts[0]:"null") + - " rack: " + ((racks!=null&&racks.length>0)?racks[0]:"null")); + " with request: " + request + + " host: " + ((hosts != null && hosts.length > 0) ? hosts[0] : "null") + + " rack: " + ((racks != null && racks.length > 0) ? 
racks[0] : "null")); } /** @@ -1008,8 +1021,10 @@ public class YarnTaskSchedulerService extends TaskScheduler LOG.info("Ignoring removal of unknown task: " + task); return false; } else { - LOG.info("Deallocated task: " + task + " from container: " - + container.getId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Deallocated task: " + task + " from container: " + + container.getId()); + } if (!taskSucceeded || !shouldReuseContainers) { if (LOG.isDebugEnabled()) { @@ -1029,6 +1044,7 @@ public class YarnTaskSchedulerService extends TaskScheduler } assignedContainers = assignDelayedContainer(heldContainer); } else { + // this is a non standard situation LOG.info("Skipping container after task deallocate as container is" + " no longer running, containerId=" + container.getId()); } @@ -1047,8 +1063,9 @@ public class YarnTaskSchedulerService extends TaskScheduler public synchronized Object deallocateContainer(ContainerId containerId) { Object task = unAssignContainer(containerId, true); if(task != null) { + // non-standard case for the app layer to deallocate container LOG.info("Deallocated container: " + containerId + - " from task: " + task); + " from task: " + task); return task; } @@ -1058,9 +1075,9 @@ public class YarnTaskSchedulerService extends TaskScheduler @Override public synchronized void initiateStop() { - LOG.info("Initiate stop to YarnTaskScheduler"); + LOG.info("Initiating stop of YarnTaskScheduler"); // release held containers - LOG.info("Release held containers"); + LOG.info("Releasing held containers"); isStopStarted.set(true); // Create a new list for containerIds to iterate, otherwise it would cause ConcurrentModificationException // because method releaseContainer will change heldContainers. 
@@ -1073,7 +1090,7 @@ public class YarnTaskSchedulerService extends TaskScheduler } // remove taskRequest from AMRMClient to avoid allocating new containers in the next heartbeat - LOG.info("Remove all the taskRequests"); + LOG.info("Removing all pending taskRequests"); // Create a new list for tasks to avoid ConcurrentModificationException List<Object> tasks = new ArrayList<Object>(taskRequests.size()); for (Object task : taskRequests.keySet()) { @@ -1634,8 +1651,9 @@ public class YarnTaskSchedulerService extends TaskScheduler private void releaseUnassignedContainers(Iterable<Container> containers) { for (Container container : containers) { - LOG.info("Releasing unused container: " - + container.getId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Releasing unused container: " + container.getId()); + } releaseContainer(container.getId()); } } @@ -1704,19 +1722,17 @@ public class YarnTaskSchedulerService extends TaskScheduler Object task = getTask(assigned); assert task != null; - LOG.info("Assigning container to task" - + ", container=" + container + LOG.info("Assigning container to task: " + + "containerId=" + container.getId() + ", task=" + task - + ", containerHost=" + container.getNodeId().getHost() + + ", containerHost=" + container.getNodeId() + + ", containerPriority= " + container.getPriority() + + ", containerResources=" + container.getResource() + ", localityMatchType=" + locality + ", matchedLocation=" + matchedLocation + ", honorLocalityFlags=" + honorLocalityFlags - + ", reusedContainer=" - + containerAssignments.containsKey(container.getId()) - + ", delayedContainers=" + delayedContainerManager.delayedContainers.size() - + ", containerResourceMemory=" + container.getResource().getMemory() - + ", containerResourceVCores=" - + container.getResource().getVirtualCores()); + + ", reusedContainer=" + containerAssignments.containsKey(container.getId()) + + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()); assignContainer(task, 
container, assigned); } @@ -1904,6 +1920,7 @@ public class YarnTaskSchedulerService extends TaskScheduler heldContainers.get(delayedContainer.getContainer().getId())) { assignedContainers = assignDelayedContainer(delayedContainer); } else { + // non standard scenario LOG.info("Skipping delayed container as container is no longer" + " running, containerId=" + delayedContainer.getContainer().getId()); @@ -1958,9 +1975,10 @@ public class YarnTaskSchedulerService extends TaskScheduler HeldContainer delayedContainer = iter.next(); if (!heldContainers.containsKey(delayedContainer.getContainer().getId())) { // this container is no longer held by us + // non standard scenario LOG.info("AssignAll - Skipping delayed container as container is no longer" - + " running, containerId=" - + delayedContainer.getContainer().getId()); + + " running, containerId=" + + delayedContainer.getContainer().getId()); iter.remove(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java index 470fa56..11b5006 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java @@ -113,8 +113,10 @@ public class AMContainerHelpers { // correctly, even though they may not be used by all tasks which will run // on this container. 
- LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #" - + credentials.numberOfSecretKeys() + " secret keys for NM use for launching container"); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding #" + credentials.numberOfTokens() + " tokens and #" + + credentials.numberOfSecretKeys() + " secret keys for NM use for launching container in common CLC"); + } containerCredentials.addAll(credentials); DataOutputBuffer containerTokens_dob = new DataOutputBuffer(); @@ -123,7 +125,9 @@ public class AMContainerHelpers { containerTokens_dob.getLength()); // Add shuffle token - LOG.info("Putting shuffle token in serviceData"); + if (LOG.isDebugEnabled()) { + LOG.debug("Putting shuffle token in serviceData in common CLC"); + } serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(containerCredentials))); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index 69c21d4..d37d106 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -421,9 +421,11 @@ public class AMContainerImpl implements AMContainer { // TODO Can't set state to COMPLETED. Add a default error state. 
} if (oldState != getState()) { - LOG.info("AMContainer " + this.containerId + " transitioned from " - + oldState + " to " + getState() - + " via event " + event.getType()); + if (LOG.isDebugEnabled()) { + LOG.debug("AMContainer " + this.containerId + " transitioned from " + + oldState + " to " + getState() + + " via event " + event.getType()); + } } } finally { writeLock.unlock(); @@ -474,8 +476,10 @@ public class AMContainerImpl implements AMContainer { // task is not told to die since the TAL does not know about the container. container.registerWithTAListener(); container.sendStartRequestToNM(clc); - LOG.info("Sending Launch Request for Container with id: " + - container.container.getId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Sending Launch Request for Container with id: " + + container.container.getId()); + } } } @@ -533,7 +537,7 @@ public class AMContainerImpl implements AMContainer { public void transition(AMContainerImpl container, AMContainerEvent cEvent) { super.transition(container, cEvent); container.deAllocate(); - LOG.info( + LOG.warn( "Unexpected event type: " + cEvent.getType() + " while in state: " + container.getState() + ". Event: " + cEvent); @@ -597,8 +601,6 @@ public class AMContainerImpl implements AMContainer { } } - LOG.info("Assigned taskAttempt + [" + container.currentAttempt + - "] to container: [" + container.getContainerId() + "]"); AMContainerTask amContainerTask = new AMContainerTask( event.getRemoteTaskSpec(), container.additionalLocalResources, container.credentialsChanged ? 
container.credentials : null, container.credentialsChanged, http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java index 3264708..b1c81af 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java @@ -95,6 +95,8 @@ public class PerSourceNodeTracker { AMNode amNode = nodeMap.get(nodeId); if (amNode == null) { LOG.info("Ignoring RM Health Update for unknown node: " + nodeId); + // This implies that the node exists on the cluster, but is not running a container for + // this application. } else { amNode.handle(rEvent); } http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java index 9e275a2..e17a4d4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java @@ -47,8 +47,6 @@ public class HistoryEventHandler extends CompositeService { @Override public void serviceInit(Configuration conf) throws Exception { - LOG.info("Initializing HistoryEventHandler"); - this.recoveryEnabled = context.getAMConf().getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT); @@ -56,6 +54,10 @@ public class HistoryEventHandler extends CompositeService { TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, 
TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT); + LOG.info("Initializing HistoryEventHandler with" + + "recoveryEnabled=" + recoveryEnabled + + ", historyServiceClassName=" + historyServiceClassName); + historyLoggingService = ReflectionUtils.createClazzInstance(historyServiceClassName); historyLoggingService.setAppContext(context); @@ -66,11 +68,11 @@ public class HistoryEventHandler extends CompositeService { addService(recoveryService); } super.serviceInit(conf); + } @Override public void serviceStart() throws Exception { - LOG.info("Starting HistoryEventHandler"); super.serviceStart(); } http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java index 9f24151..7d83db2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java @@ -177,6 +177,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { @Override public String toString() { + String counterStr = ""; + if (state != TaskAttemptState.SUCCEEDED) { + counterStr = ", counters=" + ( tezCounters == null ? "null" : + tezCounters.toString() + .replaceAll("\\n", ", ").replaceAll("\\s+", " ")); + } return "vertexName=" + vertexName + ", taskAttemptId=" + taskAttemptId + ", creationTime=" + creationTime @@ -187,11 +193,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { + ", status=" + state.name() + ", errorEnum=" + (error != null ? error.name() : "") + ", diagnostics=" + diagnostics - + ", lastDataEventSourceTA=" + - ((dataEvents==null) ? 0:dataEvents.size()) - + ", counters=" + (tezCounters == null ? 
"null" : - tezCounters.toString() - .replaceAll("\\n", ", ").replaceAll("\\s+", " ")); + + counterStr; } public TezTaskAttemptID getTaskAttemptID() { http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java index a58b49e..71d4419 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java @@ -110,9 +110,7 @@ public class TaskAttemptStartedEvent implements HistoryEvent { + ", taskAttemptId=" + taskAttemptId + ", startTime=" + launchTime + ", containerId=" + containerId - + ", nodeId=" + nodeId - + ", inProgressLogs=" + inProgressLogsUrl - + ", completedLogs=" + completedLogsUrl; + + ", nodeId=" + nodeId; } public TezTaskAttemptID getTaskAttemptID() { http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java index 8852e02..4372d8e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java @@ -148,7 +148,9 @@ public class SimpleHistoryLoggingService extends HistoryLoggingService { if (loggingDisabled) { return; } - LOG.info("Writing event " + event.getHistoryEvent().getEventType() + " to history 
file"); + if (LOG.isTraceEnabled()) { + LOG.trace("Writing event " + event.getHistoryEvent().getEventType() + " to history file"); + } try { try { JSONObject eventJson = HistoryEventJsonConversion.convertToJson(event.getHistoryEvent()); http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java index d870645..2fe0e6d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java @@ -106,7 +106,6 @@ public class RecoveryService extends AbstractService { @Override public void serviceInit(Configuration conf) throws Exception { - LOG.info("Initializing RecoveryService"); recoveryPath = appContext.getCurrentRecoveryDir(); recoveryDirFS = FileSystem.get(recoveryPath.toUri(), conf); bufferSize = conf.getInt(TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE, @@ -120,11 +119,16 @@ public class RecoveryService extends AbstractService { drainEventsFlag = conf.getBoolean( TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED_DEFAULT); + + LOG.info("RecoveryService initialized with " + + "recoveryPath=" + recoveryPath + + ", bufferSize(bytes)=" + bufferSize + + ", flushInterval(s)=" + flushInterval + + ", maxUnflushedEvents=" + maxUnflushedEvents); } @Override public void serviceStart() { - LOG.info("Starting RecoveryService"); lastFlushTime = appContext.getClock().getTime(); eventHandlingThread = new Thread(new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-dag/src/main/resources/tez-container-log4j.properties ---------------------------------------------------------------------- 
diff --git a/tez-dag/src/main/resources/tez-container-log4j.properties b/tez-dag/src/main/resources/tez-container-log4j.properties index 7a2aeab..c53994e 100644 --- a/tez-dag/src/main/resources/tez-container-log4j.properties +++ b/tez-dag/src/main/resources/tez-container-log4j.properties @@ -28,7 +28,7 @@ log4j.appender.CLA=org.apache.tez.common.TezContainerLogAppender log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir} log4j.appender.CLA.layout=org.apache.log4j.PatternLayout -log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}: %m%n +log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} [%p] [%t] |%c{2}|: %m%n # # Event Counter Appender http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java index 69237d4..1b66c8e 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java @@ -117,8 +117,9 @@ public class MROutputCommitter extends OutputCommitter { if (jobConf.getBoolean("mapred.reducer.new-api", false) || jobConf.getBoolean("mapred.mapper.new-api", false)) { newApiCommitter = true; - LOG.info("Using mapred newApiCommitter."); } + LOG.info("Committer for " + getContext().getVertexName() + ":" + getContext().getOutputName() + + " using " + (newApiCommitter ? 
"new" : "old") + "mapred API"); if (newApiCommitter) { TaskAttemptID taskAttemptID = new TaskAttemptID( http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java index dbc7748..b93e4ba 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java @@ -69,35 +69,30 @@ public class MRInputAMSplitGenerator extends InputInitializer { @Override public List<Event> initialize() throws Exception { - Stopwatch sw = null; - if (LOG.isDebugEnabled()) { - sw = new Stopwatch().start(); - } + Stopwatch sw = new Stopwatch().start(); MRInputUserPayloadProto userPayloadProto = MRInputHelpers .parseMRInputPayload(getContext().getInputUserPayload()); + sw.stop(); if (LOG.isDebugEnabled()) { - sw.stop(); LOG.debug("Time to parse MRInput payload into prot: " + sw.elapsedMillis()); } - if (LOG.isDebugEnabled()) { - sw.reset().start(); - } + sw.reset().start(); Configuration conf = TezUtils.createConfFromByteString(userPayloadProto .getConfigurationBytes()); sendSerializedEvents = conf.getBoolean( MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD, MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLOAD_DEFAULT); - LOG.info("Emitting serialized splits: " + sendSerializedEvents); + + sw.stop(); if (LOG.isDebugEnabled()) { - sw.stop(); + LOG.debug("Emitting serialized splits: " + sendSerializedEvents + " for input " + + getContext().getInputName()); LOG.debug("Time converting ByteString to configuration: " + sw.elapsedMillis()); } - if (LOG.isDebugEnabled()) { - sw.reset().start(); - } + 
sw.reset().start(); int totalResource = getContext().getTotalAvailableResource().getMemory(); int taskResource = getContext().getVertexTaskResource().getMemory(); @@ -107,24 +102,26 @@ public class MRInputAMSplitGenerator extends InputInitializer { int numTasks = (int)((totalResource*waves)/taskResource); + + + boolean groupSplits = userPayloadProto.getGroupingEnabled(); LOG.info("Input " + getContext().getInputName() + " asking for " + numTasks + " tasks. Headroom: " + totalResource + " Task Resource: " - + taskResource + " waves: " + waves); + + taskResource + " waves: " + waves + ", groupingEnabled: " + groupSplits); // Read all credentials into the credentials instance stored in JobConf. JobConf jobConf = new JobConf(conf); jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials()); InputSplitInfoMem inputSplitInfo = null; - boolean groupSplits = userPayloadProto.getGroupingEnabled(); + if (groupSplits) { - LOG.info("Grouping input splits"); inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, true, numTasks); } else { inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, false, 0); } + sw.stop(); if (LOG.isDebugEnabled()) { - sw.stop(); LOG.debug("Time to create splits to mem: " + sw.elapsedMillis()); } http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java index e6b70d2..28d108e 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java @@ -69,14 +69,11 @@ public class MRInputSplitDistributor extends InputInitializer { 
@Override public List<Event> initialize() throws IOException { - Stopwatch sw = null; - if (LOG.isDebugEnabled()) { - sw = new Stopwatch().start(); - } + Stopwatch sw = new Stopwatch().start(); MRInputUserPayloadProto userPayloadProto = MRInputHelpers .parseMRInputPayload(getContext().getInputUserPayload()); + sw.stop(); if (LOG.isDebugEnabled()) { - sw.stop(); LOG.debug("Time to parse MRInput payload into prot: " + sw.elapsedMillis()); } http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java index 7f5e0e3..30e4a8c 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java @@ -284,21 +284,27 @@ public class MRInputHelpers { InputSplitInfoMem splitInfoMem = null; JobConf jobConf = new JobConf(conf); if (jobConf.getUseNewMapper()) { - LOG.info("Generating mapreduce api input splits"); + if (LOG.isDebugEnabled()) { + LOG.debug("Generating mapreduce api input splits"); + } Job job = Job.getInstance(conf); org.apache.hadoop.mapreduce.InputSplit[] splits = generateNewSplits(job, groupSplits, targetTasks); splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits), splits.length, job.getCredentials(), job.getConfiguration()); } else { - LOG.info("Generating mapred api input splits"); + if (LOG.isDebugEnabled()) { + LOG.debug("Generating mapred api input splits"); + } org.apache.hadoop.mapred.InputSplit[] splits = generateOldSplits(jobConf, groupSplits, targetTasks); splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits), splits.length, jobConf.getCredentials(), jobConf); } 
- LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: " - + splitInfoMem.getSplitsProto().getSerializedSize()); + if (LOG.isDebugEnabled()) { + LOG.debug("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: " + + splitInfoMem.getSplitsProto().getSerializedSize()); + } return splitInfoMem; } http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java index 720af50..80828d4 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java @@ -51,11 +51,13 @@ public class MRPartitioner implements org.apache.tez.runtime.library.api.Partiti if (useNewApi) { oldPartitioner = null; if (partitions > 1) { + Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>> clazz = + (Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf + .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR, + org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class); + LOG.info("Using newApi, MRpartitionerClass=" + clazz.getName()); newPartitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils - .newInstance( - (Class<? 
extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf - .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR, - org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class), conf); + .newInstance(clazz, conf); } else { newPartitioner = new org.apache.hadoop.mapreduce.Partitioner() { @Override @@ -67,10 +69,12 @@ public class MRPartitioner implements org.apache.tez.runtime.library.api.Partiti } else { newPartitioner = null; if (partitions > 1) { - oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance( + Class<? extends org.apache.hadoop.mapred.Partitioner> clazz = (Class<? extends org.apache.hadoop.mapred.Partitioner>) conf.getClass( - "mapred.partitioner.class", org.apache.hadoop.mapred.lib.HashPartitioner.class), - new JobConf(conf)); + "mapred.partitioner.class", org.apache.hadoop.mapred.lib.HashPartitioner.class); + LOG.info("Using oldApi, MRpartitionerClass=" + clazz.getName()); + oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance( + clazz, new JobConf(conf)); } else { oldPartitioner = new org.apache.hadoop.mapred.Partitioner() { @Override http://git-wip-us.apache.org/repos/asf/tez/blob/f785ce8d/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java index d0e935f..6ea21e2 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java @@ -95,7 +95,6 @@ public class ATSHistoryLoggingService extends 
HistoryLoggingService { + TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED + " set to false"); return; } - LOG.info("Initializing ATSService"); if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { @@ -124,7 +123,12 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { } sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID); - LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs"); + LOG.info("Initializing " + ATSHistoryLoggingService.class.getSimpleName() + " with " + + "maxEventsPerBatch=" + maxEventsPerBatch + + ", maxPollingTime(ms)=" + maxPollingTimeMillis + + ", waitTimeForShutdown(ms)=" + maxTimeToWaitOnShutdown + + ", TimelineACLManagerClass=" + atsHistoryACLManagerClassName); + try { historyACLPolicyManager = ReflectionUtils.createClazzInstance( atsHistoryACLManagerClassName); @@ -146,7 +150,6 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { if (!historyLoggingEnabled || timelineClient == null) { return; } - LOG.info("Starting ATSService"); timelineClient.start(); eventHandlingThread = new Thread(new Runnable() {
