Repository: hadoop Updated Branches: refs/heads/trunk 22afae890 -> 03ab24aa0
MAPREDUCE-5932. Provide an option to use a dedicated reduce-side shuffle log. Contributed by Gera Shegalov Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/03ab24aa Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/03ab24aa Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/03ab24aa Branch: refs/heads/trunk Commit: 03ab24aa01ffea1cacf1fa9cbbf73c3f2904d981 Parents: 22afae8 Author: Jason Lowe <[email protected]> Authored: Wed Dec 3 17:02:14 2014 +0000 Committer: Jason Lowe <[email protected]> Committed: Wed Dec 3 17:02:14 2014 +0000 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 + .../apache/hadoop/mapred/MapReduceChildJVM.java | 34 +-------- .../v2/app/job/impl/TestMapReduceChildJVM.java | 71 ++++++++++++++++- .../apache/hadoop/mapreduce/v2/util/MRApps.java | 80 +++++++++++++++++--- .../apache/hadoop/mapred/FileOutputFormat.java | 4 +- .../java/org/apache/hadoop/mapred/TaskLog.java | 4 + .../apache/hadoop/mapreduce/MRJobConfig.java | 14 ++++ .../src/main/resources/mapred-default.xml | 28 +++++++ .../org/apache/hadoop/mapred/YARNRunner.java | 9 +-- .../hadoop/yarn/ContainerLogAppender.java | 11 ++- .../yarn/ContainerRollingLogAppender.java | 11 ++- .../hadoop/yarn/TestContainerLogAppender.java | 1 + .../main/resources/container-log4j.properties | 29 ++++++- 13 files changed, 243 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/03ab24aa/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5417c3e..3f34acd 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -235,6 +235,9 @@ Release 2.7.0 - UNRELEASED IMPROVEMENTS + 
MAPREDUCE-5932. Provide an option to use a dedicated reduce-side shuffle + log (Gera Shegalov via jlowe) + OPTIMIZATIONS MAPREDUCE-6169. MergeQueue should release reference to the current item http://git-wip-us.apache.org/repos/asf/hadoop/blob/03ab24aa/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java index c790c57..817b3a5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java @@ -20,16 +20,14 @@ package org.apache.hadoop.mapred; import java.net.InetSocketAddress; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Vector; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.TaskLog.LogName; -import org.apache.hadoop.mapreduce.ID; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; @@ -52,20 +50,6 @@ public class MapReduceChildJVM { jobConf.get(JobConf.MAPRED_TASK_ENV)); } - private static String getChildLogLevel(JobConf conf, boolean isMap) { - if (isMap) { - return conf.get( - MRJobConfig.MAP_LOG_LEVEL, - JobConf.DEFAULT_LOG_LEVEL.toString() 
- ); - } else { - return conf.get( - MRJobConfig.REDUCE_LOG_LEVEL, - JobConf.DEFAULT_LOG_LEVEL.toString() - ); - } - } - public static void setVMEnv(Map<String, String> environment, Task task) { @@ -79,7 +63,7 @@ public class MapReduceChildJVM { // streaming) it will have the correct loglevel. environment.put( "HADOOP_ROOT_LOGGER", - getChildLogLevel(conf, task.isMapTask()) + ",console"); + MRApps.getChildLogLevel(conf, task.isMapTask()) + ",console"); // TODO: The following is useful for instance in streaming tasks. Should be // set in ApplicationMaster's env by the RM. @@ -147,15 +131,6 @@ public class MapReduceChildJVM { return adminClasspath + " " + userClasspath; } - private static void setupLog4jProperties(Task task, - Vector<String> vargs, - long logSize, Configuration conf) { - String logLevel = getChildLogLevel(task.conf, task.isMapTask()); - int numBackups = task.conf.getInt(MRJobConfig.TASK_LOG_BACKUPS, - MRJobConfig.DEFAULT_TASK_LOG_BACKUPS); - MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs, conf); - } - public static List<String> getVMCommand( InetSocketAddress taskAttemptListenerAddr, Task task, JVMId jvmID) { @@ -206,10 +181,7 @@ public class MapReduceChildJVM { Path childTmpDir = new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR); vargs.add("-Djava.io.tmpdir=" + childTmpDir); - - // Setup the log4j prop - long logSize = TaskLog.getTaskLogLength(conf); - setupLog4jProperties(task, vargs, logSize, conf); + MRApps.addLog4jSystemProperties(task, vargs, conf); if (conf.getProfileEnabled()) { if (conf.getProfileTaskRange(task.isMapTask() http://git-wip-us.apache.org/repos/asf/hadoop/blob/03ab24aa/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java ---------------------------------------------------------------------- diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java index 8e146b9..b1e9cf0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java @@ -61,7 +61,7 @@ public class TestMapReduceChildJVM { " -Dlog4j.configuration=container-log4j.properties" + " -Dyarn.app.container.log.dir=<LOG_DIR>" + " -Dyarn.app.container.log.filesize=0" + - " -Dhadoop.root.logger=INFO,CLA" + + " -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog" + " org.apache.hadoop.mapred.YarnChild 127.0.0.1" + " 54321" + " attempt_0_0000_m_000000_0" + @@ -77,6 +77,73 @@ public class TestMapReduceChildJVM { app.cmdEnvironment.containsKey("HADOOP_CLIENT_OPTS")); Assert.assertEquals("", app.cmdEnvironment.get("HADOOP_CLIENT_OPTS")); } + + @Test (timeout = 30000) + public void testReduceCommandLineWithSeparateShuffle() throws Exception { + final Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.REDUCE_SEPARATE_SHUFFLE_LOG, true); + testReduceCommandLine(conf); + } + + @Test (timeout = 30000) + public void testReduceCommandLineWithSeparateCRLAShuffle() throws Exception { + final Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.REDUCE_SEPARATE_SHUFFLE_LOG, true); + conf.setLong(MRJobConfig.SHUFFLE_LOG_KB, 1L); + conf.setInt(MRJobConfig.SHUFFLE_LOG_BACKUPS, 3); + testReduceCommandLine(conf); + } + + @Test (timeout = 30000) + public void testReduceCommandLine() throws Exception { + final Configuration conf = new 
Configuration(); + testReduceCommandLine(conf); + } + + private void testReduceCommandLine(Configuration conf) + throws Exception { + + MyMRApp app = new MyMRApp(0, 1, true, this.getClass().getName(), true); + conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true); + Job job = app.submit(conf); + app.waitForState(job, JobState.SUCCEEDED); + app.verifyCompleted(); + + final long shuffleLogSize = + conf.getLong(MRJobConfig.SHUFFLE_LOG_KB, 0L) * 1024L; + final int shuffleBackups = conf.getInt(MRJobConfig.SHUFFLE_LOG_BACKUPS, 0); + final String appenderName = shuffleLogSize > 0L && shuffleBackups > 0 + ? "shuffleCRLA" + : "shuffleCLA"; + + Assert.assertEquals( + "[" + MRApps.crossPlatformify("JAVA_HOME") + "/bin/java" + + " -Djava.net.preferIPv4Stack=true" + + " -Dhadoop.metrics.log.level=WARN" + + " -Xmx200m -Djava.io.tmpdir=" + MRApps.crossPlatformify("PWD") + "/tmp" + + " -Dlog4j.configuration=container-log4j.properties" + + " -Dyarn.app.container.log.dir=<LOG_DIR>" + + " -Dyarn.app.container.log.filesize=0" + + " -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog" + + " -Dyarn.app.mapreduce.shuffle.logger=INFO," + appenderName + + " -Dyarn.app.mapreduce.shuffle.logfile=syslog.shuffle" + + " -Dyarn.app.mapreduce.shuffle.log.filesize=" + shuffleLogSize + + " -Dyarn.app.mapreduce.shuffle.log.backups=" + shuffleBackups + + " org.apache.hadoop.mapred.YarnChild 127.0.0.1" + + " 54321" + + " attempt_0_0000_r_000000_0" + + " 0" + + " 1><LOG_DIR>/stdout" + + " 2><LOG_DIR>/stderr ]", app.myCommandLine); + + Assert.assertTrue("HADOOP_ROOT_LOGGER not set for job", + app.cmdEnvironment.containsKey("HADOOP_ROOT_LOGGER")); + Assert.assertEquals("INFO,console", + app.cmdEnvironment.get("HADOOP_ROOT_LOGGER")); + Assert.assertTrue("HADOOP_CLIENT_OPTS not set for job", + app.cmdEnvironment.containsKey("HADOOP_CLIENT_OPTS")); + Assert.assertEquals("", app.cmdEnvironment.get("HADOOP_CLIENT_OPTS")); + } @Test (timeout = 30000) public void 
testCommandLineWithLog4JConifg() throws Exception { @@ -99,7 +166,7 @@ public class TestMapReduceChildJVM { " -Dlog4j.configuration=" + testLogPropertieFile + " -Dyarn.app.container.log.dir=<LOG_DIR>" + " -Dyarn.app.container.log.filesize=0" + - " -Dhadoop.root.logger=INFO,CLA" + + " -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog" + " org.apache.hadoop.mapred.YarnChild 127.0.0.1" + " 54321" + " attempt_0_0000_m_000000_0" + http://git-wip-us.apache.org/repos/asf/hadoop/blob/03ab24aa/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index 4484e6a..08b44f8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -43,6 +43,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.InvalidJobConfException; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Task; +import org.apache.hadoop.mapred.TaskLog; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -59,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.util.ApplicationClassLoader; import org.apache.hadoop.util.StringUtils; import 
org.apache.hadoop.yarn.ContainerLogAppender; +import org.apache.hadoop.yarn.ContainerRollingLogAppender; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.LocalResource; @@ -68,7 +72,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.log4j.RollingFileAppender; /** * Helper class for MR applications @@ -592,18 +595,32 @@ public class MRApps extends Apps { } return result; } + + public static String getChildLogLevel(Configuration conf, boolean isMap) { + if (isMap) { + return conf.get( + MRJobConfig.MAP_LOG_LEVEL, + JobConf.DEFAULT_LOG_LEVEL.toString() + ); + } else { + return conf.get( + MRJobConfig.REDUCE_LOG_LEVEL, + JobConf.DEFAULT_LOG_LEVEL.toString() + ); + } + } /** - * Add the JVM system properties necessary to configure {@link ContainerLogAppender}. - * @param logLevel the desired log level (eg INFO/WARN/DEBUG) - * @param logSize See {@link ContainerLogAppender#setTotalLogFileSize(long)} - * @param numBackups See {@link RollingFileAppender#setMaxBackupIndex(int)} + * Add the JVM system properties necessary to configure + * {@link ContainerLogAppender} or + * {@link ContainerRollingLogAppender}. 
+ * + * @param task for map/reduce, or null for app master * @param vargs the argument list to append to * @param conf configuration of MR job */ - public static void addLog4jSystemProperties( - String logLevel, long logSize, int numBackups, List<String> vargs, - Configuration conf) { + public static void addLog4jSystemProperties(Task task, + List<String> vargs, Configuration conf) { String log4jPropertyFile = conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, ""); if (log4jPropertyFile.isEmpty()) { @@ -618,11 +635,30 @@ public class MRApps extends Apps { Path log4jPath = new Path(log4jURI); vargs.add("-Dlog4j.configuration="+log4jPath.getName()); } - + + long logSize; + String logLevel; + int numBackups; + + if (task == null) { + logSize = conf.getLong(MRJobConfig.MR_AM_LOG_KB, + MRJobConfig.DEFAULT_MR_AM_LOG_KB) << 10; + logLevel = conf.get( + MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL); + numBackups = conf.getInt(MRJobConfig.MR_AM_LOG_BACKUPS, + MRJobConfig.DEFAULT_MR_AM_LOG_BACKUPS); + } else { + logSize = TaskLog.getTaskLogLimitBytes(conf); + logLevel = getChildLogLevel(conf, task.isMapTask()); + numBackups = conf.getInt(MRJobConfig.TASK_LOG_BACKUPS, + MRJobConfig.DEFAULT_TASK_LOG_BACKUPS); + } + vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR); vargs.add( "-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + logSize); + if (logSize > 0L && numBackups > 0) { // log should be rolled vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_BACKUPS + "=" @@ -631,6 +667,30 @@ public class MRApps extends Apps { } else { vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA"); } + vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG); + + if ( task != null + && !task.isMapTask() + && conf.getBoolean(MRJobConfig.REDUCE_SEPARATE_SHUFFLE_LOG, + MRJobConfig.DEFAULT_REDUCE_SEPARATE_SHUFFLE_LOG)) { + final int numShuffleBackups = 
conf.getInt(MRJobConfig.SHUFFLE_LOG_BACKUPS, + MRJobConfig.DEFAULT_SHUFFLE_LOG_BACKUPS); + final long shuffleLogSize = conf.getLong(MRJobConfig.SHUFFLE_LOG_KB, + MRJobConfig.DEFAULT_SHUFFLE_LOG_KB) << 10; + final String shuffleLogger = logLevel + + (shuffleLogSize > 0L && numShuffleBackups > 0 + ? ",shuffleCRLA" + : ",shuffleCLA"); + + vargs.add("-D" + MRJobConfig.MR_PREFIX + + "shuffle.logger=" + shuffleLogger); + vargs.add("-D" + MRJobConfig.MR_PREFIX + + "shuffle.logfile=" + TaskLog.LogName.SYSLOG + ".shuffle"); + vargs.add("-D" + MRJobConfig.MR_PREFIX + + "shuffle.log.filesize=" + shuffleLogSize); + vargs.add("-D" + MRJobConfig.MR_PREFIX + + "shuffle.log.backups=" + numShuffleBackups); + } } /** @@ -687,7 +747,7 @@ public class MRApps extends Apps { public static String crossPlatformifyMREnv(Configuration conf, Environment env) { boolean crossPlatform = conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, - MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM); + MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM); return crossPlatform ? 
env.$$() : env.$(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/03ab24aa/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputFormat.java index 0efcf9d..721c8a8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputFormat.java @@ -27,7 +27,6 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.mapred.FileAlreadyExistsException; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.util.Progressable; @@ -287,7 +286,8 @@ public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> { "This method can only be called from within a Job"); } - String taskType = (conf.getBoolean(JobContext.TASK_ISMAP, true)) ? "m" : "r"; + String taskType = conf.getBoolean(JobContext.TASK_ISMAP, + JobContext.DEFAULT_TASK_ISMAP) ? 
"m" : "r"; NumberFormat numberFormat = NumberFormat.getInstance(); numberFormat.setMinimumIntegerDigits(5); http://git-wip-us.apache.org/repos/asf/hadoop/blob/03ab24aa/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java index a86e76a..e07b5be 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java @@ -473,6 +473,10 @@ public class TaskLog { * @return the number of bytes to cap the log files at */ public static long getTaskLogLength(JobConf conf) { + return getTaskLogLimitBytes(conf); + } + + public static long getTaskLogLimitBytes(Configuration conf) { return conf.getLong(JobContext.TASK_USERLOG_LIMIT, 0) * 1024; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/03ab24aa/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 691074a..230361c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java 
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -177,6 +177,7 @@ public interface MRJobConfig { public static final String TASK_ATTEMPT_ID = "mapreduce.task.attempt.id"; public static final String TASK_ISMAP = "mapreduce.task.ismap"; + public static final boolean DEFAULT_TASK_ISMAP = true; public static final String TASK_PARTITION = "mapreduce.task.partition"; @@ -773,6 +774,18 @@ public interface MRJobConfig { MR_PREFIX + "task.container.log.backups"; public static final int DEFAULT_TASK_LOG_BACKUPS = 0; // don't roll + public static final String REDUCE_SEPARATE_SHUFFLE_LOG = + MR_PREFIX + "shuffle.log.separate"; + public static final boolean DEFAULT_REDUCE_SEPARATE_SHUFFLE_LOG = true; + + public static final String SHUFFLE_LOG_BACKUPS = + MR_PREFIX + "shuffle.log.backups"; + public static final int DEFAULT_SHUFFLE_LOG_BACKUPS = 0; // don't roll + + public static final String SHUFFLE_LOG_KB = + MR_PREFIX + "shuffle.log.limit.kb"; + public static final long DEFAULT_SHUFFLE_LOG_KB = 0L; + public static final String WORKFLOW_NAME = "mapreduce.workflow.name"; public static final String WORKFLOW_NODE_NAME = @@ -812,4 +825,5 @@ public interface MRJobConfig { "mapreduce.job.encrypted-intermediate-data.buffer.kb"; public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB = 128; + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/03ab24aa/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 6be62ec..43ddb13 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -714,6 +714,34 @@ </property> <property> + <name>yarn.app.mapreduce.shuffle.log.separate</name> + <value>true</value> + <description>If enabled ('true') logging generated by the client-side shuffle + classes in a reducer will be written in a dedicated log file + 'syslog.shuffle' instead of 'syslog'. + </description> +</property> + +<property> + <name>yarn.app.mapreduce.shuffle.log.limit.kb</name> + <value>0</value> + <description>Maximum size of the syslog.shuffle file in kilobytes + (0 for no limit). + </description> +</property> + +<property> + <name>yarn.app.mapreduce.shuffle.log.backups</name> + <value>0</value> + <description>If yarn.app.mapreduce.shuffle.log.limit.kb and + yarn.app.mapreduce.shuffle.log.backups are greater than zero + then a ContainerRollingLogAppender is used instead of ContainerLogAppender + for syslog.shuffle. 
See + org.apache.log4j.RollingFileAppender.maxBackupIndex + </description> +</property> + +<property> <name>mapreduce.job.maxtaskfailures.per.tracker</name> <value>3</value> <description>The number of task-failures on a node manager of a given job http://git-wip-us.apache.org/repos/asf/hadoop/blob/03ab24aa/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 7b2cf53..41dc72f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -392,14 +392,7 @@ public class YARNRunner implements ClientProtocol { vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME) + "/bin/java"); - // TODO: why do we use 'conf' some places and 'jobConf' others? 
- long logSize = jobConf.getLong(MRJobConfig.MR_AM_LOG_KB, - MRJobConfig.DEFAULT_MR_AM_LOG_KB) << 10; - String logLevel = jobConf.get( - MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL); - int numBackups = jobConf.getInt(MRJobConfig.MR_AM_LOG_BACKUPS, - MRJobConfig.DEFAULT_MR_AM_LOG_BACKUPS); - MRApps.addLog4jSystemProperties(logLevel, logSize, numBackups, vargs, conf); + MRApps.addLog4jSystemProperties(null, vargs, conf); // Check for Java Lib Path usage in MAP and REDUCE configs warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map", http://git-wip-us.apache.org/repos/asf/hadoop/blob/03ab24aa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java index 0aba8b5..c49a1ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerLogAppender.java @@ -38,6 +38,7 @@ public class ContainerLogAppender extends FileAppender implements Flushable { private String containerLogDir; + private String containerLogFile; //so that log4j can configure it from the configuration(log4j.properties). 
private int maxEvents; private Queue<LoggingEvent> tail = null; @@ -49,7 +50,7 @@ public class ContainerLogAppender extends FileAppender if (maxEvents > 0) { tail = new LinkedList<LoggingEvent>(); } - setFile(new File(this.containerLogDir, "syslog").toString()); + setFile(new File(this.containerLogDir, containerLogFile).toString()); setAppend(true); super.activateOptions(); } @@ -102,6 +103,14 @@ public class ContainerLogAppender extends FileAppender this.containerLogDir = containerLogDir; } + public String getContainerLogFile() { + return containerLogFile; + } + + public void setContainerLogFile(String containerLogFile) { + this.containerLogFile = containerLogFile; + } + private static final int EVENT_SIZE = 100; public long getTotalLogFileSize() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/03ab24aa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerRollingLogAppender.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerRollingLogAppender.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerRollingLogAppender.java index bdf1b09..7dd712e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerRollingLogAppender.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ContainerRollingLogAppender.java @@ -34,11 +34,12 @@ import java.io.Flushable; public class ContainerRollingLogAppender extends RollingFileAppender implements Flushable { private String containerLogDir; + private String containerLogFile; @Override public void activateOptions() { synchronized (this) { - setFile(new File(this.containerLogDir, "syslog").toString()); + setFile(new File(this.containerLogDir, containerLogFile).toString()); setAppend(true); super.activateOptions(); 
} @@ -62,4 +63,12 @@ public class ContainerRollingLogAppender extends RollingFileAppender public void setContainerLogDir(String containerLogDir) { this.containerLogDir = containerLogDir; } + + public String getContainerLogFile() { + return containerLogFile; + } + + public void setContainerLogFile(String containerLogFile) { + this.containerLogFile = containerLogFile; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/03ab24aa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLogAppender.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLogAppender.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLogAppender.java index 3b20d18..6b8e537 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLogAppender.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLogAppender.java @@ -30,6 +30,7 @@ public class TestContainerLogAppender { claAppender.setName("testCLA"); claAppender.setLayout(new PatternLayout("%-5p [%t]: %m%n")); claAppender.setContainerLogDir("target/testAppendInClose/logDir"); + claAppender.setContainerLogFile("syslog"); claAppender.setTotalLogFileSize(1000); claAppender.activateOptions(); final Logger claLog = Logger.getLogger("testAppendInClose-catergory"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/03ab24aa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/resources/container-log4j.properties ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/resources/container-log4j.properties 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/resources/container-log4j.properties index 06dec52..cf499b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/resources/container-log4j.properties +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/resources/container-log4j.properties @@ -13,6 +13,7 @@ # # Define some default values that can be overridden by system properties hadoop.root.logger=DEBUG,CLA +yarn.app.mapreduce.shuffle.logger=${hadoop.root.logger} # Define the root logger to the system property "hadoop.root.logger". log4j.rootLogger=${hadoop.root.logger}, EventCounter @@ -30,18 +31,44 @@ yarn.app.container.log.filesize=100 log4j.appender.CLA=org.apache.hadoop.yarn.ContainerLogAppender log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir} +log4j.appender.CLA.containerLogFile=${hadoop.root.logfile} log4j.appender.CLA.totalLogFileSize=${yarn.app.container.log.filesize} - log4j.appender.CLA.layout=org.apache.log4j.PatternLayout log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c: %m%n log4j.appender.CRLA=org.apache.hadoop.yarn.ContainerRollingLogAppender log4j.appender.CRLA.containerLogDir=${yarn.app.container.log.dir} +log4j.appender.CRLA.containerLogFile=${hadoop.root.logfile} log4j.appender.CRLA.maximumFileSize=${yarn.app.container.log.filesize} log4j.appender.CRLA.maxBackupIndex=${yarn.app.container.log.backups} log4j.appender.CRLA.layout=org.apache.log4j.PatternLayout log4j.appender.CRLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c: %m%n +log4j.appender.shuffleCLA=org.apache.hadoop.yarn.ContainerLogAppender +log4j.appender.shuffleCLA.containerLogDir=${yarn.app.container.log.dir} +log4j.appender.shuffleCLA.containerLogFile=${yarn.app.mapreduce.shuffle.logfile} +log4j.appender.shuffleCLA.totalLogFileSize=${yarn.app.mapreduce.shuffle.log.filesize} 
+log4j.appender.shuffleCLA.layout=org.apache.log4j.PatternLayout +log4j.appender.shuffleCLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c: %m%n + +log4j.appender.shuffleCRLA=org.apache.hadoop.yarn.ContainerRollingLogAppender +log4j.appender.shuffleCRLA.containerLogDir=${yarn.app.container.log.dir} +log4j.appender.shuffleCRLA.containerLogFile=${yarn.app.mapreduce.shuffle.logfile} +log4j.appender.shuffleCRLA.maximumFileSize=${yarn.app.mapreduce.shuffle.log.filesize} +log4j.appender.shuffleCRLA.maxBackupIndex=${yarn.app.mapreduce.shuffle.log.backups} +log4j.appender.shuffleCRLA.layout=org.apache.log4j.PatternLayout +log4j.appender.shuffleCRLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c: %m%n + +################################################################################ +# Shuffle Logger +# +log4j.logger.org.apache.hadoop.mapreduce.task.reduce=${yarn.app.mapreduce.shuffle.logger} +log4j.additivity.org.apache.hadoop.mapreduce.task.reduce=false +# Merger is used for both map-side and reduce-side spill merging. On the map +# side yarn.app.mapreduce.shuffle.logger == hadoop.root.logger +# +log4j.logger.org.apache.hadoop.mapred.Merger=${yarn.app.mapreduce.shuffle.logger} +log4j.additivity.org.apache.hadoop.mapred.Merger=false # # Event Counter Appender # Sends counts of logging messages at different severity levels to Hadoop Metrics.
