This is an automated email from the ASF dual-hosted git repository.
cnauroth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 81bda42503b MAPREDUCE-7531. TestMRJobs.testThreadDumpOnTaskTimeout flaky due to thread dump delayed write.
81bda42503b is described below
commit 81bda42503be15c7742076f4dfe3faeddde1103a
Author: slfan1989 <[email protected]>
AuthorDate: Mon Feb 23 17:14:03 2026 +0000
MAPREDUCE-7531. TestMRJobs.testThreadDumpOnTaskTimeout flaky due to thread dump delayed write.
Closes #8263
Signed-off-by: Chris Nauroth <[email protected]>
---
.../org/apache/hadoop/mapreduce/v2/TestMRJobs.java | 176 ++++++++++++++-------
.../org/apache/hadoop/mapreduce/v2/TestUberAM.java | 5 +
2 files changed, 125 insertions(+), 56 deletions(-)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
index 8f1c1cf4dbd..2bdd0d8860e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
@@ -26,6 +26,7 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
+import java.io.UncheckedIOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
@@ -33,6 +34,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.jar.JarOutputStream;
+import java.util.concurrent.TimeoutException;
import java.util.zip.ZipEntry;
import org.apache.commons.io.FileUtils;
@@ -85,6 +87,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ApplicationClassLoader;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.JarFinder;
@@ -421,7 +424,7 @@ private void testSleepJobInternal(Configuration sleepConf,
}
@Test
- @Timeout(value = 3000)
+ @Timeout(value = 300)
public void testJobWithChangePriority() throws Exception {
Configuration sleepConf = new Configuration(mrCluster.getConfig());
// Assumption can be removed when FS priority support is implemented
@@ -1233,73 +1236,132 @@ public void testThreadDumpOnTaskTimeout() throws IOException,
final String syslogGlob = appIdStr
+ Path.SEPARATOR + containerGlob
+ Path.SEPARATOR + TaskLog.LogName.SYSLOG;
- int numAppMasters = 0;
- int numMapTasks = 0;
+ final int expectedMapTasks = sleepConf.getBoolean(
+ MRJobConfig.JOB_UBERTASK_ENABLE, false) ? 0 : 1;
+ final int expectedAppMasters = 1;
+ // Thread dumps are written asynchronously; poll up to 30s to avoid flakes.
+ final class ThreadDumpScan {
+ int numAppMasters;
+ int numMapTasks;
+ boolean missingMapDump;
+ boolean unexpectedAmDump;
+ boolean missingUberAmDump;
+
+ void reset() {
+ numAppMasters = 0;
+ numMapTasks = 0;
+ missingMapDump = false;
+ unexpectedAmDump = false;
+ missingUberAmDump = false;
+ }
- for (int i = 0; i < NUM_NODE_MGRS; i++) {
- final Configuration nmConf = mrCluster.getNodeManager(i).getConfig();
- for (String logDir :
- nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS)) {
- final Path absSyslogGlob =
- new Path(logDir + Path.SEPARATOR + syslogGlob);
- LOG.info("Checking for glob: " + absSyslogGlob);
- for (FileStatus syslog : localFs.globStatus(absSyslogGlob)) {
- boolean foundAppMaster = false;
- boolean foundThreadDump = false;
-
- // Determine the container type
- final BufferedReader syslogReader = new BufferedReader(
- new InputStreamReader(localFs.open(syslog.getPath())));
- try {
- for (String line; (line = syslogReader.readLine()) != null; ) {
- if (line.contains(MRAppMaster.class.getName())) {
- foundAppMaster = true;
- break;
- }
- }
- } finally {
- syslogReader.close();
- }
+ boolean countsReady() {
+ return numAppMasters == expectedAppMasters
+ && numMapTasks == expectedMapTasks;
+ }
- // Check for thread dump in stdout
- final Path stdoutPath = new Path(syslog.getPath().getParent(),
- TaskLog.LogName.STDOUT.toString());
- final BufferedReader stdoutReader = new BufferedReader(
- new InputStreamReader(localFs.open(stdoutPath)));
- try {
- for (String line; (line = stdoutReader.readLine()) != null; ) {
- if (line.contains("Full thread dump")) {
- foundThreadDump = true;
- break;
- }
- }
- } finally {
- stdoutReader.close();
- }
+ boolean dumpsReady() {
+ return !missingMapDump && !unexpectedAmDump && !missingUberAmDump;
+ }
+ }
- if (foundAppMaster) {
- numAppMasters++;
- if (this instanceof TestUberAM) {
- assertTrue(foundThreadDump, "No thread dump");
- } else {
- assertFalse(foundThreadDump, "Unexpected thread dump");
+ final ThreadDumpScan scan = new ThreadDumpScan();
+  // Re-scan logs until expected containers and dumps are observed (or timeout).
+ try {
+ GenericTestUtils.waitFor(() -> {
+ scan.reset();
+ try {
+ for (int i = 0; i < NUM_NODE_MGRS; i++) {
+        final Configuration nmConf = mrCluster.getNodeManager(i).getConfig();
+ for (String logDir :
+ nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS)) {
+ final Path absSyslogGlob =
+ new Path(logDir + Path.SEPARATOR + syslogGlob);
+ LOG.info("Checking for glob: " + absSyslogGlob);
+ for (FileStatus syslog : localFs.globStatus(absSyslogGlob)) {
+ boolean foundAppMaster = false;
+ boolean foundThreadDump = false;
+
+            // Determine the container type and look for thread dump markers.
+ final BufferedReader syslogReader = new BufferedReader(
+ new InputStreamReader(localFs.open(syslog.getPath())));
+ try {
+ for (String line;
+ (line = syslogReader.readLine()) != null; ) {
+ if (line.contains(MRAppMaster.class.getName())) {
+ foundAppMaster = true;
+ }
+ if (line.contains("Full thread dump")
+ || line.contains("Process Thread Dump")) {
+ foundThreadDump = true;
+ }
+ }
+ } finally {
+ syslogReader.close();
+ }
+
+ // Thread dump may be emitted to stdout depending on logging.
+ final Path stdoutPath = new Path(syslog.getPath().getParent(),
+ TaskLog.LogName.STDOUT.toString());
+ final BufferedReader stdoutReader = new BufferedReader(
+ new InputStreamReader(localFs.open(stdoutPath)));
+ try {
+ for (String line;
+ (line = stdoutReader.readLine()) != null; ) {
+ if (line.contains("Full thread dump")
+ || line.contains("Process Thread Dump")) {
+ foundThreadDump = true;
+ break;
+ }
+ }
+ } finally {
+ stdoutReader.close();
+ }
+
+ if (foundAppMaster) {
+ scan.numAppMasters++;
+ if (this instanceof TestUberAM) {
+ if (!foundThreadDump) {
+ scan.missingUberAmDump = true;
+ }
+ } else if (foundThreadDump) {
+ scan.unexpectedAmDump = true;
+ }
+ } else {
+ scan.numMapTasks++;
+ if (!foundThreadDump) {
+ scan.missingMapDump = true;
+ }
+ }
+ }
}
- } else {
- numMapTasks++;
- assertTrue(foundThreadDump, "No thread dump");
}
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
}
- }
+      // Both the container counts and expected dump presence must be satisfied.
+ return scan.countsReady() && scan.dumpsReady();
+ }, 1000, 30_000, "thread dump logs not ready");
+ } catch (TimeoutException e) {
+ LOG.warn("Timed out waiting for thread dump logs", e);
+ } catch (UncheckedIOException e) {
+ throw e.getCause();
}
// Make sure we checked non-empty set
//
- assertEquals(1, numAppMasters, "No AppMaster log found!");
- if (sleepConf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false)) {
- assertSame(0, numMapTasks, "MapTask log with uber found!");
+ assertEquals(expectedAppMasters, scan.numAppMasters,
+ "No AppMaster log found!");
+ assertSame(expectedMapTasks, scan.numMapTasks,
+ sleepConf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false)
+ ? "MapTask log with uber found!"
+ : "No MapTask log found!");
+ if (this instanceof TestUberAM) {
+ assertFalse(scan.missingUberAmDump, "No thread dump");
} else {
- assertSame(1, numMapTasks, "No MapTask log found!");
+ assertFalse(scan.unexpectedAmDump, "Unexpected thread dump");
}
+ assertFalse(scan.missingMapDump, "No thread dump");
}
private Path createTempFile(String filename, String contents)
@@ -1368,6 +1430,7 @@ private void createAndAddJarToJar(JarOutputStream jos, File jarFile)
}
@Test
+ @Timeout(value = 300)
public void testSharedCache() throws Exception {
Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString());
@@ -1450,6 +1513,7 @@ protected void setup(Context context)
}
@Test
+ @Timeout(value = 300)
public void testSleepJobName() throws IOException {
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(conf);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
index 7ecc856ee84..8b6b5f74c50 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.mapreduce.TaskType;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +54,7 @@ public static void setup() throws IOException {
@Override
@Test
+ @Timeout(value = 300)
public void testSleepJob()
throws Exception {
numSleepReducers = 1;
@@ -60,6 +62,7 @@ public void testSleepJob()
}
@Test
+ @Timeout(value = 300)
public void testSleepJobWithMultipleReducers()
throws Exception {
numSleepReducers = 3;
@@ -81,6 +84,7 @@ protected void verifySleepJobCounters(Job job) throws InterruptedException,
@Override
@Test
+ @Timeout(value = 300)
public void testRandomWriter()
throws IOException, InterruptedException, ClassNotFoundException {
super.testRandomWriter();
@@ -99,6 +103,7 @@ protected void verifyRandomWriterCounters(Job job)
@Override
@Test
+ @Timeout(value = 300)
public void testFailingMapper()
throws IOException, InterruptedException, ClassNotFoundException {
LOG.info("\n\n\nStarting uberized testFailingMapper().");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]