This is an automated email from the ASF dual-hosted git repository. epayne pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push: new 48a8d76 MAPREDUCE-7272. TaskAttemptListenerImpl excessive log messages. Contributed by Ahmed Hussein (ahussein) 48a8d76 is described below commit 48a8d7691ead49911a8ff56ba678a093342cabdd Author: Eric E Payne <er...@verizonmedia.com> AuthorDate: Mon Apr 13 19:47:27 2020 +0000 MAPREDUCE-7272. TaskAttemptListenerImpl excessive log messages. Contributed by Ahmed Hussein (ahussein) --- .../hadoop/mapred/TaskAttemptListenerImpl.java | 94 ++++++++++++++++++++-- .../mapred/TestTaskAttemptFinishingMonitor.java | 2 +- .../hadoop/mapred/TestTaskAttemptListenerImpl.java | 2 + .../apache/hadoop/mapreduce/v2/app/TestFail.java | 1 + .../mapreduce/v2/app/TestTaskHeartbeatHandler.java | 2 + .../org/apache/hadoop/mapreduce/MRJobConfig.java | 23 ++++++ .../hadoop/mapreduce/util/MRJobConfUtil.java | 72 +++++++++++++++++ 7 files changed, 189 insertions(+), 7 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 8151286..6d0e781 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.mapred.SortedRanges.Range; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.util.MRJobConfUtil; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.app.AppContext; 
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; @@ -56,6 +57,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringInterner; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import com.google.common.annotations.VisibleForTesting; @@ -89,6 +91,11 @@ public class TaskAttemptListenerImpl extends CompositeService private ConcurrentMap<TaskAttemptId, AtomicReference<TaskAttemptStatus>> attemptIdToStatus = new ConcurrentHashMap<>(); + /** + * A Map to keep track of the history of logging each task attempt. + */ + private ConcurrentHashMap<TaskAttemptID, TaskProgressLogPair> + taskAttemptLogProgressStamps = new ConcurrentHashMap<>(); private Set<WrappedJvmID> launchedJVMs = Collections .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); @@ -110,10 +117,12 @@ public class TaskAttemptListenerImpl extends CompositeService @Override protected void serviceInit(Configuration conf) throws Exception { - registerHeartbeatHandler(conf); - commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS, - MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS); - super.serviceInit(conf); + registerHeartbeatHandler(conf); + commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS, + MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS); + // initialize the delta threshold for logging the task progress. 
+ MRJobConfUtil.setTaskLogProgressDeltaThresholds(conf); + super.serviceInit(conf); } @Override @@ -164,6 +173,9 @@ public class TaskAttemptListenerImpl extends CompositeService @Override protected void serviceStop() throws Exception { stopRpcServer(); + if (taskAttemptLogProgressStamps != null) { + taskAttemptLogProgressStamps.clear(); + } super.serviceStop(); } @@ -359,8 +371,15 @@ public class TaskAttemptListenerImpl extends CompositeService taskAttemptStatus.id = yarnAttemptID; // Task sends the updated progress to the TT. taskAttemptStatus.progress = taskStatus.getProgress(); - LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : " - + taskStatus.getProgress()); + // log the new progress + TaskProgressLogPair logPair = + taskAttemptLogProgressStamps.get(taskAttemptID); + if (logPair == null) { + taskAttemptLogProgressStamps.putIfAbsent(taskAttemptID, + new TaskProgressLogPair(taskAttemptID)); + logPair = taskAttemptLogProgressStamps.get(taskAttemptID); + } + logPair.update(taskStatus.getProgress()); // Task sends the updated state-string to the TT. taskAttemptStatus.stateString = taskStatus.getStateString(); // Task sends the updated phase to the TT. @@ -574,4 +593,67 @@ public class TaskAttemptListenerImpl extends CompositeService AtomicReference<TaskAttemptStatus>> getAttemptIdToStatus() { return attemptIdToStatus; } + + /** + * Entity to keep track of the taskAttempt, last time it was logged, + * and the progress that has been logged. + */ + class TaskProgressLogPair { + + /** + * The taskAttemptId of that history record. + */ + private final TaskAttemptID taskAttemptID; + /** + * Timestamp of last time the progress was logged. + */ + private volatile long logTimeStamp; + /** + * Snapshot of the last logged progress. 
+ */ + private volatile double prevProgress; + + TaskProgressLogPair(final TaskAttemptID attemptID) { + taskAttemptID = attemptID; + prevProgress = 0.0; + logTimeStamp = 0; + } + + private void resetLog(final boolean doLog, + final float progress, final double processedProgress, + final long timestamp) { + if (doLog) { + prevProgress = processedProgress; + logTimeStamp = timestamp; + LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : " + + progress); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Progress of TaskAttempt " + taskAttemptID + " is : " + + progress); + } + } + } + + public void update(final float progress) { + final double processedProgress = + MRJobConfUtil.convertTaskProgressToFactor(progress); + final double diffProgress = processedProgress - prevProgress; + final long currentTime = Time.monotonicNow(); + boolean result = + (Double.compare(diffProgress, + MRJobConfUtil.getTaskProgressMinDeltaThreshold()) >= 0); + if (!result) { + // check if time has expired. + result = ((currentTime - logTimeStamp) + >= MRJobConfUtil.getTaskProgressWaitDeltaTimeThreshold()); + } + // It is helpful to log the progress when it reaches 1.0F. 
+ if (Float.compare(progress, 1.0f) == 0) { + result = true; + taskAttemptLogProgressStamps.remove(taskAttemptID); + } + resetLog(result, progress, processedProgress, currentTime); + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java index 32e6867..46b40bd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java @@ -50,7 +50,7 @@ public class TestTaskAttemptFinishingMonitor { Configuration conf = new Configuration(); conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 100); conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10); - + conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01); AppContext appCtx = mock(AppContext.class); JobTokenSecretManager secret = mock(JobTokenSecretManager.class); RMHeartbeatHandler rmHeartbeatHandler = diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index 53a2ba0..023ca4d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -423,6 +423,8 @@ public class 
TestTaskAttemptListenerImpl { Configuration conf = new Configuration(); conf.setLong(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1); + conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01); + conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS, 1); tal.init(conf); tal.start(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java index 75c78fd..81cad82 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java @@ -268,6 +268,7 @@ public class TestFail { protected void serviceInit(Configuration conf) throws Exception { conf.setInt(MRJobConfig.TASK_TIMEOUT, 1*1000);//reduce timeout conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1*1000); + conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01); super.serviceInit(conf); } }; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java index 5d86479..ca03958 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestTaskHeartbeatHandler.java @@ -58,6 +58,7 @@ public class TestTaskHeartbeatHandler 
{ // so that TASK_TIMEOUT is not overridden conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 5); conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms + conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01); hb.init(conf); hb.start(); @@ -117,6 +118,7 @@ public class TestTaskHeartbeatHandler { new TaskHeartbeatHandler(mockHandler, clock, 1); Configuration conf = new Configuration(); conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1); + conf.setDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, 0.01); hb.init(conf); hb.start(); try { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 37be797..d22b9ef 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -346,6 +346,29 @@ public interface MRJobConfig { public static final int TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT = 20 * 1000; + /** + * TaskAttemptListenerImpl will log the task progress when the delta progress + * is larger than or equal to the defined value. + * The double value has to be between 0 and 1 with two decimals. + */ + String TASK_LOG_PROGRESS_DELTA_THRESHOLD = + "mapreduce.task.log.progress.delta.threshold"; + /** + * Default delta progress is set to 5%. + */ + double TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT = 0.05; + /** + * TaskAttemptListenerImpl will log the task progress when the defined value + * in seconds expires. + * This helps to debug task attempts that are doing very slow progress. 
+ */ + String TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS = + "mapreduce.task.log.progress.wait.interval-seconds"; + /** + * Default period to log the task attempt progress is 60 seconds. + */ + long TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT = 60L; + public static final String TASK_ID = "mapreduce.task.id"; public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java index afedef3..4e4e78e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.mapreduce.util; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -58,4 +59,75 @@ public final class MRJobConfUtil { } public static final float TASK_REPORT_INTERVAL_TO_TIMEOUT_RATIO = 0.01f; + + /** + * Configurations to control the frequency of logging of task Attempt. + */ + public static final double PROGRESS_MIN_DELTA_FACTOR = 100.0; + private static volatile Double progressMinDeltaThreshold = null; + private static volatile Long progressMaxWaitDeltaTimeThreshold = null; + + /** + * load the values defined from a configuration file including the delta + * progress and the maximum time between each log message. 
+ * @param conf + */ + public static void setTaskLogProgressDeltaThresholds( + final Configuration conf) { + if (progressMinDeltaThreshold == null) { + progressMinDeltaThreshold = + new Double(PROGRESS_MIN_DELTA_FACTOR + * conf.getDouble(MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD, + MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT)); + } + + if (progressMaxWaitDeltaTimeThreshold == null) { + progressMaxWaitDeltaTimeThreshold = + TimeUnit.SECONDS.toMillis(conf + .getLong( + MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS, + MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT)); + } + } + + /** + * Retrieves the min delta progress required to log the task attempt current + * progress. + * @return the defined threshold in the conf. + * returns the default value if + * {@link #setTaskLogProgressDeltaThresholds} has not been called. + */ + public static double getTaskProgressMinDeltaThreshold() { + if (progressMinDeltaThreshold == null) { + return PROGRESS_MIN_DELTA_FACTOR + * MRJobConfig.TASK_LOG_PROGRESS_DELTA_THRESHOLD_DEFAULT; + } + return progressMinDeltaThreshold.doubleValue(); + } + + /** + * Retrieves the min time required to log the task attempt current + * progress. + * @return the defined threshold in the conf. + * returns the default value if + * {@link #setTaskLogProgressDeltaThresholds} has not been called. + */ + public static long getTaskProgressWaitDeltaTimeThreshold() { + if (progressMaxWaitDeltaTimeThreshold == null) { + return TimeUnit.SECONDS.toMillis( + MRJobConfig.TASK_LOG_PROGRESS_WAIT_INTERVAL_SECONDS_DEFAULT); + } + return progressMaxWaitDeltaTimeThreshold.longValue(); + } + + /** + * Converts a progress between 0.0 to 1.0 to double format used to log the + * task attempt. + * @param progress of the task which is a value between 0.0 and 1.0. + * @return the double value that is less than or equal to the argument + * multiplied by {@link #PROGRESS_MIN_DELTA_FACTOR}. 
+ */ + public static double convertTaskProgressToFactor(final float progress) { + return Math.floor(progress * MRJobConfUtil.PROGRESS_MIN_DELTA_FACTOR); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org