TEZ-32. Fix logging support for MapReduce tasks and AM. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/268f3ac9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/268f3ac9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/268f3ac9 Branch: refs/heads/master Commit: 268f3ac97b081294fcdc22f7258890f219285fca Parents: 36198a6 Author: Siddharth Seth <[email protected]> Authored: Thu May 23 11:17:43 2013 -0700 Committer: Siddharth Seth <[email protected]> Committed: Thu May 23 11:17:43 2013 -0700 ---------------------------------------------------------------------- .../org/apache/tez/dag/api/TezConfiguration.java | 4 +- .../apache/tez/dag/utils/TezEngineChildJVM.java | 1 - .../java/org/apache/tez/mapreduce/YARNRunner.java | 113 ++++++++------- 3 files changed, 61 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/268f3ac9/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 4a407b3..3018ea1 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -45,7 +45,9 @@ public class TezConfiguration extends Configuration { // TODO Should not be required once all tokens are handled via AppSubmissionContext public static final String JOB_SUBMIT_DIR = TEZ_PREFIX + "jobSubmitDir"; public static final String APPLICATION_TOKENS_FILE = "appTokens"; - + public static final String DAG_APPLICATION_MASTER_CLASS = + "org.apache.tez.dag.app.DAGAppMaster"; + public static final String DAG_AM_TASK_LISTENER_THREAD_COUNT = TEZ_PREFIX + "task.listener.thread-count"; public static final int DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT = 30; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/268f3ac9/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java index 727ded5..bc92ed8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java @@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.app.rm.container.AMContainerHelpers; import org.apache.tez.dag.records.TezVertexID; public class TezEngineChildJVM { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/268f3ac9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java ---------------------------------------------------------------------- diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java index 7582a5e..9ece649 100644 --- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java +++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -45,8 +44,6 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.TaskLog; -import org.apache.hadoop.mapred.TaskLog.LogName; import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus; import org.apache.hadoop.mapreduce.ClusterMetrics; import org.apache.hadoop.mapreduce.Counters; @@ -70,7 +67,6 @@ import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIden import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; import org.apache.hadoop.mapreduce.v2.LogParams; -import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.Credentials; @@ -93,11 +89,10 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.Edge; @@ -120,8 +115,6 @@ public class YARNRunner implements ClientProtocol { private static final Log LOG = LogFactory.getLog(YARNRunner.class); - private final RecordFactory recordFactory = - RecordFactoryProvider.getRecordFactory(null); private ResourceMgrDelegate resMgrDelegate; private ClientCache clientCache; private Configuration conf; @@ -172,6 +165,7 @@ public class YARNRunner implements ClientProtocol { } } + @VisibleForTesting @Private /** * Used for testing mostly. @@ -210,12 +204,6 @@ public class YARNRunner implements ClientProtocol { return resMgrDelegate.getClusterMetrics(); } - @VisibleForTesting - Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy) - throws IOException, InterruptedException { - throw new UnsupportedOperationException("No HistoryServer for Tez"); - } - @Override public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException, InterruptedException { @@ -384,17 +372,6 @@ public class YARNRunner implements ClientProtocol { MRJobConfig.MAPRED_ADMIN_USER_ENV, MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)); - // FIXME is this really required? - // Add stdout/stderr env - environment.put( - MRJobConfig.STDOUT_LOGFILE_ENV, - getTaskLogFile(TaskLog.LogName.STDOUT) - ); - environment.put( - MRJobConfig.STDERR_LOGFILE_ENV, - getTaskLogFile(TaskLog.LogName.STDERR) - ); - } private static String getChildEnv(Configuration jobConf, boolean isMap) { @@ -418,19 +395,7 @@ public class YARNRunner implements ClientProtocol { } } - private static String getTaskLogFile(LogName filter) { - return ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR - + filter.toString(); - } - - private static void setupLog4jProperties(Configuration jobConf, - boolean isMap, - Vector<String> vargs, - long logSize) { - String logLevel = getChildLogLevel(jobConf, isMap); - MRApps.addLog4jSystemProperties(logLevel, logSize, vargs); - } - + private void setupMapReduceEnv(Configuration jobConf, Map<String, String> environment, boolean isMap) throws IOException { @@ -474,13 +439,10 @@ public class YARNRunner implements ClientProtocol { // FIXME: don't think this is also needed given we already set java // properties. // TODO Change this not to use JobConf. - long logSize = TaskLog.getTaskLogLength(new JobConf(jobConf)); - Vector<String> logProps = new Vector<String>(4); - setupLog4jProperties(jobConf, isMap, logProps, logSize); - Iterator<String> it = logProps.iterator(); + String log4jCmdLineProperties = getLog4jCmdLineProperties(jobConf, isMap); StringBuffer buffer = new StringBuffer(); - while (it.hasNext()) { - buffer.append(" " + it.next()); + if (log4jCmdLineProperties != null && log4jCmdLineProperties != "") { + buffer.append(" " + log4jCmdLineProperties); } // FIXME supposedly required for streaming, should we remove it and let @@ -492,7 +454,7 @@ public class YARNRunner implements ClientProtocol { hadoopClientOpts = hadoopClientOpts + " "; } hadoopClientOpts = hadoopClientOpts + buffer.toString(); - // environment.put("HADOOP_CLIENT_OPTS", hadoopClientOpts); + //environment.put("HADOOP_CLIENT_OPTS", hadoopClientOpts); // FIXME for this to work, we need YARN-561 and the task runtime changed // to use YARN-561 @@ -730,7 +692,7 @@ public class YARNRunner implements ClientProtocol { ApplicationId applicationId = resMgrDelegate.getApplicationId(); // Setup resource requirements - Resource capability = recordFactory.newRecordInstance(Resource.class); + Resource capability = Records.newRecord(Resource.class); capability.setMemory( conf.getInt(TezConfiguration.DAG_AM_RESOURCE_MEMORY_MB, TezConfiguration.DEFAULT_DAG_AM_RESOURCE_MEMORY_MB)); @@ -757,9 +719,10 @@ public class YARNRunner implements ClientProtocol { // LOG.error(" !!!!!!!!!"); // vargs.add("-Xdebug -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=8002,server=y,suspend=y"); - // FIXME set up logging related properties // TODO -Dtez.root.logger?? - // MRApps.addLog4jSystemProperties(logLevel, logSize, vargs); + String amLogLevel = jobConf.get(MRJobConfig.MR_AM_LOG_LEVEL, + MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL); + addLog4jSystemProperties(amLogLevel, vargs); // FIXME admin command opts and user command opts for tez? String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, @@ -775,7 +738,7 @@ public class YARNRunner implements ClientProtocol { MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV); vargs.add(mrAppMasterUserOptions); - vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS); + vargs.add(TezConfiguration.DAG_APPLICATION_MASTER_CLASS); vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDOUT); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + @@ -859,8 +822,9 @@ public class YARNRunner implements ClientProtocol { environment, vargsFinal, null, securityTokens, acls); // Set up the ApplicationSubmissionContext - ApplicationSubmissionContext appContext = - recordFactory.newRecordInstance(ApplicationSubmissionContext.class); + ApplicationSubmissionContext appContext = Records + .newRecord(ApplicationSubmissionContext.class); + appContext.setApplicationId(applicationId); // ApplicationId appContext.setResource(capability); // resource appContext.setQueue( // Queue name @@ -975,7 +939,7 @@ public class YARNRunner implements ClientProtocol { private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type) throws IOException { - LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class); + LocalResource rsrc = Records.newRecord(LocalResource.class); FileStatus rsrcStat = fs.getFileStatus(p); rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs .getDefaultFileSystem().resolvePath(rsrcStat.getPath()))); @@ -1127,6 +1091,7 @@ public class YARNRunner implements ClientProtocol { } } + @SuppressWarnings("deprecation") private String getMapJavaOpts(Configuration jobConf) { // follows pattern from YARN MapReduceChildJVM.java String adminOpts = ""; @@ -1142,9 +1107,11 @@ public class YARNRunner implements ClientProtocol { JobConf.MAPRED_TASK_JAVA_OPTS, JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)); - return adminOpts + " " + userOpts; + return adminOpts.trim() + " " + userOpts.trim() + " " + + getLog4jCmdLineProperties(jobConf, true); } + @SuppressWarnings("deprecation") private String getReduceJavaOpts(Configuration jobConf) { // follows pattern from YARN MapReduceChildJVM.java String adminOpts = ""; @@ -1159,6 +1126,42 @@ public class YARNRunner implements ClientProtocol { jobConf.get( JobConf.MAPRED_TASK_JAVA_OPTS, JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)); - return adminOpts + " " + userOpts; + + return adminOpts.trim() + " " + userOpts.trim() + " " + + getLog4jCmdLineProperties(jobConf, false); } -} + + private static String getLog4jCmdLineProperties(Configuration jobConf, + boolean isMap) { + Vector<String> logProps = new Vector<String>(4); + addLog4jSystemProperties(getChildLogLevel(jobConf, isMap), logProps); + StringBuilder sb = new StringBuilder(); + for (String str : logProps) { + sb.append(str).append(" "); + } + return sb.toString(); + } + + /** + * Add the JVM system properties necessary to configure + * {@link ContainerLogAppender}. + * + * @param logLevel + * the desired log level (eg INFO/WARN/DEBUG) + * @param vargs + * the argument list to append to + */ + private static void addLog4jSystemProperties(String logLevel, + List<String> vargs) { + vargs.add("-Dlog4j.configuration=container-log4j.properties"); + // TODO Fix the remaining properties after YARN-720 is fixed. + // May need to introduce a log4j properties file for tez tasks. + vargs.add("-D" + MRJobConfig.TASK_LOG_DIR + "=" + + ApplicationConstants.LOG_DIR_EXPANSION_VAR); + // Setting this to 0 to avoid log size restrictions. + // Should be enforced by YARN. + vargs.add("-D" + MRJobConfig.TASK_LOG_SIZE + "=" + 0); + vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA"); + } + +} \ No newline at end of file
