arvindram03 commented on code in PR #30693:
URL: https://github.com/apache/beam/pull/30693#discussion_r1566527849


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java:
##########
@@ -123,12 +158,115 @@ private void assertElementProcessingTimeCounter(NameContext step, int millis, in
                 .build()));
   }
 
-  private ExecutionStateTracker createTracker() {
+  private DataflowExecutionStateTracker createTracker() {
     return new DataflowExecutionStateTracker(
         sampler,
         new TestDataflowExecutionState(NameContext.forStage("test-stage"), "other"),
         counterSet,
         options,
         "test-work-item-id");
   }
+
+  @Test
+  public void testLullReportsRightTrace() throws Exception {
+    Thread mockThread = mock(Thread.class);
+    StackTraceElement[] doFnStackTrace =
+        new StackTraceElement[] {
+          new StackTraceElement(
+              "userpackage.SomeUserDoFn", "helperMethod", "SomeUserDoFn.java", 250),
+          new StackTraceElement("userpackage.SomeUserDoFn", "process", "SomeUserDoFn.java", 450),
+          new StackTraceElement(
+              SimpleDoFnRunner.class.getName(), "processElement", "SimpleDoFnRunner.java", 500),
+        };
+    when(mockThread.getStackTrace()).thenReturn(doFnStackTrace);
+    FixedClock clock = new FixedClock(Clock.SYSTEM.currentTimeMillis());
+    DataflowExecutionStateTracker tracker = createTracker(clock);
+    // Adding test for the full thread dump, but since we can't mock
+    // Thread.getAllStackTraces(), we are starting a background thread
+    // to verify the full thread dump.
+    Thread backgroundThread =
+        new Thread("backgroundThread") {
+          @Override
+          public void run() {
+            try {
+              Thread.sleep(Long.MAX_VALUE);
+            } catch (InterruptedException e) {
+              // exiting the thread
+            }
+          }
+        };
+
+    backgroundThread.start();
+    try {
+      // Full thread dump should be performed, because we never performed
+      // a full thread dump before, and the lull duration is more than 20
+      // minutes.
+      tracker.reportBundleLull(mockThread, 30 * 60 * 1000);
+      verifyLullLog();
+
+      // Full thread dump should not be performed because the last dump
+      // was only 5 minutes ago.
+      clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(5L).getMillis());
+      tracker.reportBundleLull(mockThread, 30 * 60 * 1000);
+      verifyLullLog();
+
+      // Full thread dump should not be performed because the lull duration
+      // is only 6 minutes.
+      clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis());
+      tracker.reportBundleLull(mockThread, 6 * 60 * 1000);
+      verifyLullLog();
+
+      // Full thread dump should be performed, because it has been 37 minutes
+      // since the last dump, and the lull duration is more than 20 minutes.
+      clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis());
+      tracker.reportBundleLull(mockThread, 30 * 60 * 1000);
+      verifyLullLog();
+    } finally {
+      // Cleaning up the background thread.
+      backgroundThread.interrupt();
+      backgroundThread.join();
+    }
+  }
+
+  private void verifyLullLog() throws IOException {
+    File[] files = logFolder.listFiles();
+    assertThat(files, Matchers.arrayWithSize(1));
+    File logFile = files[0];
+    List<String> lines = Files.readAllLines(logFile.toPath());
+
+    String warnLines =
+        Joiner.on("\n").join(Iterables.filter(lines, line -> line.contains("\"WARN\"")));
+    assertThat(
+        warnLines,
+        Matchers.allOf(
+            Matchers.containsString("Operation ongoing in bundle for at least"),
+            Matchers.containsString(" without completing"),
+            Matchers.containsString("Processing times in each step"),
+            Matchers.containsString(
+                "org.apache.beam.runners.dataflow.worker.DataflowExecutionContext$DataflowExecutionStateTracker"),
+            Matchers.containsString("userpackage.SomeUserDoFn.helperMethod"),
+            Matchers.not(Matchers.containsString(SimpleDoFnRunner.class.getName()))));
+
+    String infoLines =
+        Joiner.on("\n").join(Iterables.filter(lines, line -> line.contains("\"INFO\"")));
+    assertThat(
+        infoLines,
+        Matchers.not(

Review Comment:
   Fixed the comments. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to