This is an automated email from the ASF dual-hosted git repository.

cnauroth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 81bda42503b MAPREDUCE-7531. TestMRJobs.testThreadDumpOnTaskTimeout flaky due to thread dump delayed write.
81bda42503b is described below

commit 81bda42503be15c7742076f4dfe3faeddde1103a
Author: slfan1989 <[email protected]>
AuthorDate: Mon Feb 23 17:14:03 2026 +0000

    MAPREDUCE-7531. TestMRJobs.testThreadDumpOnTaskTimeout flaky due to thread dump delayed write.
    
    Closes #8263
    
    Signed-off-by: Chris Nauroth <[email protected]>
---
 .../org/apache/hadoop/mapreduce/v2/TestMRJobs.java | 176 ++++++++++++++-------
 .../org/apache/hadoop/mapreduce/v2/TestUberAM.java |   5 +
 2 files changed, 125 insertions(+), 56 deletions(-)

diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
index 8f1c1cf4dbd..2bdd0d8860e 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
@@ -26,6 +26,7 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.StringReader;
+import java.io.UncheckedIOException;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
@@ -33,6 +34,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.jar.JarOutputStream;
+import java.util.concurrent.TimeoutException;
 import java.util.zip.ZipEntry;
 
 import org.apache.commons.io.FileUtils;
@@ -85,6 +87,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ApplicationClassLoader;
 import org.apache.hadoop.util.ClassUtil;
 import org.apache.hadoop.util.JarFinder;
@@ -421,7 +424,7 @@ private void testSleepJobInternal(Configuration sleepConf,
   }
 
   @Test
-  @Timeout(value = 3000)
+  @Timeout(value = 300)
   public void testJobWithChangePriority() throws Exception {
     Configuration sleepConf = new Configuration(mrCluster.getConfig());
     // Assumption can be removed when FS priority support is implemented
@@ -1233,73 +1236,132 @@ public void testThreadDumpOnTaskTimeout() throws 
IOException,
     final String syslogGlob = appIdStr
         + Path.SEPARATOR + containerGlob
         + Path.SEPARATOR + TaskLog.LogName.SYSLOG;
-    int numAppMasters = 0;
-    int numMapTasks = 0;
+    final int expectedMapTasks = sleepConf.getBoolean(
+        MRJobConfig.JOB_UBERTASK_ENABLE, false) ? 0 : 1;
+    final int expectedAppMasters = 1;
+    // Thread dumps are written asynchronously; poll up to 30s to avoid flakes.
+    final class ThreadDumpScan {
+      int numAppMasters;
+      int numMapTasks;
+      boolean missingMapDump;
+      boolean unexpectedAmDump;
+      boolean missingUberAmDump;
+
+      void reset() {
+        numAppMasters = 0;
+        numMapTasks = 0;
+        missingMapDump = false;
+        unexpectedAmDump = false;
+        missingUberAmDump = false;
+      }
 
-    for (int i = 0; i < NUM_NODE_MGRS; i++) {
-      final Configuration nmConf = mrCluster.getNodeManager(i).getConfig();
-      for (String logDir :
-               nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS)) {
-        final Path absSyslogGlob =
-            new Path(logDir + Path.SEPARATOR + syslogGlob);
-        LOG.info("Checking for glob: " + absSyslogGlob);
-        for (FileStatus syslog : localFs.globStatus(absSyslogGlob)) {
-          boolean foundAppMaster = false;
-          boolean foundThreadDump = false;
-
-          // Determine the container type
-          final BufferedReader syslogReader = new BufferedReader(
-              new InputStreamReader(localFs.open(syslog.getPath())));
-          try {
-            for (String line; (line = syslogReader.readLine()) != null; ) {
-              if (line.contains(MRAppMaster.class.getName())) {
-                foundAppMaster = true;
-                break;
-              }
-            }
-          } finally {
-            syslogReader.close();
-          }
+      boolean countsReady() {
+        return numAppMasters == expectedAppMasters
+            && numMapTasks == expectedMapTasks;
+      }
 
-          // Check for thread dump in stdout
-          final Path stdoutPath = new Path(syslog.getPath().getParent(),
-              TaskLog.LogName.STDOUT.toString());
-          final BufferedReader stdoutReader = new BufferedReader(
-              new InputStreamReader(localFs.open(stdoutPath)));
-          try {
-            for (String line; (line = stdoutReader.readLine()) != null; ) {
-              if (line.contains("Full thread dump")) {
-                foundThreadDump = true;
-                break;
-              }
-            }
-          } finally {
-            stdoutReader.close();
-          }
+      boolean dumpsReady() {
+        return !missingMapDump && !unexpectedAmDump && !missingUberAmDump;
+      }
+    }
 
-          if (foundAppMaster) {
-            numAppMasters++;
-            if (this instanceof TestUberAM) {
-              assertTrue(foundThreadDump, "No thread dump");
-            } else {
-              assertFalse(foundThreadDump, "Unexpected thread dump");
+    final ThreadDumpScan scan = new ThreadDumpScan();
+    // Re-scan logs until expected containers and dumps are observed (or timeout).
+    try {
+      GenericTestUtils.waitFor(() -> {
+        scan.reset();
+        try {
+          for (int i = 0; i < NUM_NODE_MGRS; i++) {
+            final Configuration nmConf = 
mrCluster.getNodeManager(i).getConfig();
+            for (String logDir :
+                     nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS)) {
+              final Path absSyslogGlob =
+                  new Path(logDir + Path.SEPARATOR + syslogGlob);
+              LOG.info("Checking for glob: " + absSyslogGlob);
+              for (FileStatus syslog : localFs.globStatus(absSyslogGlob)) {
+                boolean foundAppMaster = false;
+                boolean foundThreadDump = false;
+
+                // Determine the container type and look for thread dump markers.
+                final BufferedReader syslogReader = new BufferedReader(
+                    new InputStreamReader(localFs.open(syslog.getPath())));
+                try {
+                  for (String line;
+                       (line = syslogReader.readLine()) != null; ) {
+                    if (line.contains(MRAppMaster.class.getName())) {
+                      foundAppMaster = true;
+                    }
+                    if (line.contains("Full thread dump")
+                        || line.contains("Process Thread Dump")) {
+                      foundThreadDump = true;
+                    }
+                  }
+                } finally {
+                  syslogReader.close();
+                }
+
+                // Thread dump may be emitted to stdout depending on logging.
+                final Path stdoutPath = new Path(syslog.getPath().getParent(),
+                    TaskLog.LogName.STDOUT.toString());
+                final BufferedReader stdoutReader = new BufferedReader(
+                    new InputStreamReader(localFs.open(stdoutPath)));
+                try {
+                  for (String line;
+                       (line = stdoutReader.readLine()) != null; ) {
+                    if (line.contains("Full thread dump")
+                        || line.contains("Process Thread Dump")) {
+                      foundThreadDump = true;
+                      break;
+                    }
+                  }
+                } finally {
+                  stdoutReader.close();
+                }
+
+                if (foundAppMaster) {
+                  scan.numAppMasters++;
+                  if (this instanceof TestUberAM) {
+                    if (!foundThreadDump) {
+                      scan.missingUberAmDump = true;
+                    }
+                  } else if (foundThreadDump) {
+                    scan.unexpectedAmDump = true;
+                  }
+                } else {
+                  scan.numMapTasks++;
+                  if (!foundThreadDump) {
+                    scan.missingMapDump = true;
+                  }
+                }
+              }
             }
-          } else {
-            numMapTasks++;
-            assertTrue(foundThreadDump, "No thread dump");
           }
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
         }
-      }
+        // Both the container counts and expected dump presence must be satisfied.
+        return scan.countsReady() && scan.dumpsReady();
+      }, 1000, 30_000, "thread dump logs not ready");
+    } catch (TimeoutException e) {
+      LOG.warn("Timed out waiting for thread dump logs", e);
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
     }
 
     // Make sure we checked non-empty set
     //
-    assertEquals(1, numAppMasters, "No AppMaster log found!");
-    if (sleepConf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false)) {
-      assertSame(0, numMapTasks, "MapTask log with uber found!");
+    assertEquals(expectedAppMasters, scan.numAppMasters,
+        "No AppMaster log found!");
+    assertSame(expectedMapTasks, scan.numMapTasks,
+        sleepConf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false)
+            ? "MapTask log with uber found!"
+            : "No MapTask log found!");
+    if (this instanceof TestUberAM) {
+      assertFalse(scan.missingUberAmDump, "No thread dump");
     } else {
-      assertSame(1, numMapTasks, "No MapTask log found!");
+      assertFalse(scan.unexpectedAmDump, "Unexpected thread dump");
     }
+    assertFalse(scan.missingMapDump, "No thread dump");
   }
 
   private Path createTempFile(String filename, String contents)
@@ -1368,6 +1430,7 @@ private void createAndAddJarToJar(JarOutputStream jos, 
File jarFile)
   }
 
   @Test
+  @Timeout(value = 300)
   public void testSharedCache() throws Exception {
     Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString());
 
@@ -1450,6 +1513,7 @@ protected void setup(Context context)
   }
 
   @Test
+  @Timeout(value = 300)
   public void testSleepJobName() throws IOException {
     SleepJob sleepJob = new SleepJob();
     sleepJob.setConf(conf);
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
index 7ecc856ee84..8b6b5f74c50 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.mapreduce.TaskType;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +54,7 @@ public static void setup() throws IOException {
 
   @Override
   @Test
+  @Timeout(value = 300)
   public void testSleepJob()
   throws Exception {
     numSleepReducers = 1;
@@ -60,6 +62,7 @@ public void testSleepJob()
   }
   
   @Test
+  @Timeout(value = 300)
   public void testSleepJobWithMultipleReducers()
   throws Exception {
     numSleepReducers = 3;
@@ -81,6 +84,7 @@ protected void verifySleepJobCounters(Job job) throws 
InterruptedException,
 
   @Override
   @Test
+  @Timeout(value = 300)
   public void testRandomWriter()
   throws IOException, InterruptedException, ClassNotFoundException {
     super.testRandomWriter();
@@ -99,6 +103,7 @@ protected void verifyRandomWriterCounters(Job job)
 
   @Override
   @Test
+  @Timeout(value = 300)
   public void testFailingMapper()
   throws IOException, InterruptedException, ClassNotFoundException {
     LOG.info("\n\n\nStarting uberized testFailingMapper().");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to