dustin12 commented on code in PR #30693:
URL: https://github.com/apache/beam/pull/30693#discussion_r1552236808


##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java:
##########
@@ -335,6 +352,19 @@ protected void takeSampleOnce(long millisSinceLastSample) {
       transitionsAtLastSample = transitionsAtThisSample;
     }
     updateMillisSinceLastTransition(millisSinceLastSample, state);
+    updateMillisSinceBundleStart(millisSinceLastSample);
+  }
+
+  // Override this to implement bundle level lull reporting.
+  protected void reportBundleLull(Thread trackedThread, long millisSinceBundleStart) {}
+
+  @SuppressWarnings("NonAtomicVolatileUpdate")
+  private void updateMillisSinceBundleStart(long millisSinceLastSample) {
+    millisSinceBundleStart += millisSinceLastSample;
+    if (millisSinceBundleStart > nextBundleLullReportMs) {
+      reportBundleLull(trackedThread, millisSinceBundleStart);
+      nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS;
+    }
   }
 

Review Comment:
   Maybe a Beam/Java developer can help here. Is there a reason we suppress these warnings? Is it safe to do so for some reason? It seems to indicate a possible data race; should we fix it?
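For reference: `millisSinceBundleStart += millisSinceLastSample` on a `volatile` field is a read followed by a separate write, so if two threads ever updated it concurrently, one update could be lost. That read-modify-write is what Error Prone's `NonAtomicVolatileUpdate` check flags. If the sampler thread is the only writer, as the field's Javadoc states, the update cannot race with another write and `volatile` only provides visibility to readers. Below is a minimal standalone model of the cadence logic in this hunk. It assumes a single writer, a hypothetical 5-minute `BUNDLE_LULL_REPORT_MS`, and that `nextBundleLullReportMs` starts at one interval; neither value is shown in the diff.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical standalone model of the bundle-lull cadence in the hunk above.
 * Assumes exactly one writer thread (the sampler) and a made-up 5-minute interval.
 */
class BundleLullCadence {
  // Hypothetical value; the real BUNDLE_LULL_REPORT_MS is not shown in this diff.
  static final long BUNDLE_LULL_REPORT_MS = 5 * 60 * 1000L;

  // Written only by the sampler thread; volatile so other threads see fresh values.
  private volatile long millisSinceBundleStart = 0;
  // Assumed starting point: the first report fires after one full interval.
  private long nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;

  // Records the elapsed times at which a lull report would have been emitted.
  final List<Long> reports = new ArrayList<>();

  /** Must only be called from the single sampler thread. */
  void onSample(long millisSinceLastSample) {
    // Read-modify-write: safe only because no other thread writes this field.
    millisSinceBundleStart += millisSinceLastSample;
    if (millisSinceBundleStart > nextBundleLullReportMs) {
      reports.add(millisSinceBundleStart);
      nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS;
    }
  }

  long millisSinceBundleStart() {
    return millisSinceBundleStart;
  }
}
```

Twelve 1-minute samples would produce reports at 6 and 11 minutes: the comparison is strict, so reaching exactly 5 minutes does not report, and each report pushes the threshold out by one interval.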



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import org.apache.beam.runners.core.SimpleDoFnRunner;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+
+/** Utility methods to print the stack traces of all the threads. */
+@Internal
+public final class StackTraceUtil {
+  private static final ImmutableSet<String> FRAMEWORK_CLASSES =
+      ImmutableSet.of(SimpleDoFnRunner.class.getName(), DoFnInstanceManagers.class.getName());
+
+  public static String getStackTraceForLullMessage(StackTraceElement[] stackTrace) {
+    StringBuilder message = new StringBuilder();
+    for (StackTraceElement e : stackTrace) {
+      if (FRAMEWORK_CLASSES.contains(e.getClassName())) {
+        break;

Review Comment:
   Where is this code from? It doesn't seem to have moved from another file, and I'm not sure why it's being added. I'm also a bit confused about why we would stop the stack trace as soon as we reach a framework class; that doesn't seem related to bundle lulls.
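As excerpted, the loop walks the tracked thread's frames from the innermost outward and stops at the first frame whose class is in `FRAMEWORK_CLASSES`, so the message would show only the user frames above `SimpleDoFnRunner`. The loop body after `break` is cut off in the hunk; the sketch below assumes each kept frame is appended as `  at <frame>` on its own line, and the class and framework names in it are hypothetical.

```java
import java.util.Set;

/** Hypothetical sketch of truncating a stack trace at the first framework frame. */
final class TruncatedStackTrace {
  private TruncatedStackTrace() {}

  /**
   * Appends frames from innermost to outermost, stopping at the first frame whose
   * declaring class is a framework class. The output format is an assumption.
   */
  static String format(StackTraceElement[] stackTrace, Set<String> frameworkClasses) {
    StringBuilder message = new StringBuilder();
    for (StackTraceElement e : stackTrace) {
      if (frameworkClasses.contains(e.getClassName())) {
        // Frames from here outward are runner plumbing, not user code.
        break;
      }
      message.append("  at ").append(e).append('\n');
    }
    return message.toString();
  }
}
```

The effect is to drop the framework frames and everything that called them, which is useful for pointing at user code but also hides where in the runner the thread is stuck.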



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java:
##########
@@ -139,8 +140,17 @@ public String getDescription() {
    */
   private volatile long millisSinceLastTransition = 0;
 
+  /**
+   * The number of milliseconds since the {@link ExecutionStateTracker} initial state.
+   *
+   * <p>This variable is updated by the Sampling thread, and read by the Progress Reporting thread,
+   * thus it being marked volatile.
+   */
+  private volatile long millisSinceBundleStart = 0;

Review Comment:
   Why is this marked as volatile? That is what makes the warning suppression on line 361 necessary.
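`volatile` alone guarantees that the progress-reporting thread sees the latest value the sampler wrote; it does not make `+=` atomic. If more than one thread might ever update the counter, an `AtomicLong` avoids both the lost-update race and the need for the suppression. A minimal sketch with hypothetical names, not a drop-in change to `ExecutionStateTracker`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical counter that stays correct even with several concurrent writers. */
class BundleElapsedCounter {
  private final AtomicLong millisSinceBundleStart = new AtomicLong();

  /** Atomically adds the sample delta and returns the new total. */
  long add(long millisSinceLastSample) {
    return millisSinceBundleStart.addAndGet(millisSinceLastSample);
  }

  /** Safe to call from any thread, e.g. a progress reporter. */
  long get() {
    return millisSinceBundleStart.get();
  }

  /** Runs {@code threads} writers that each add 1 ms {@code addsPerThread} times. */
  static long hammer(int threads, int addsPerThread) {
    BundleElapsedCounter counter = new BundleElapsedCounter();
    List<Thread> writers = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      Thread w = new Thread(() -> {
        for (int i = 0; i < addsPerThread; i++) {
          counter.add(1);
        }
      });
      writers.add(w);
      w.start();
    }
    try {
      for (Thread w : writers) {
        w.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while joining writers", e);
    }
    return counter.get();
  }
}
```

With `AtomicLong`, `hammer(4, 100_000)` always totals 400,000; the same loop over a plain `volatile long` with `+=` can come up short. If the sampler really is the sole writer, keeping `volatile` plus the suppression (with a comment saying so) is also defensible.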



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java:
##########
@@ -310,12 +350,64 @@ public Closeable activate() {
       }
     }
 
+    private String getBundleLullMessage(Thread trackedThread, Duration lullDuration) {
+      StringBuilder message = new StringBuilder();
+      message
+          .append("Operation ongoing in bundle for at least ")
+          .append(DURATION_FORMATTER.print(lullDuration.toPeriod()))
+          .append(" without completing")
+          .append("\n");
+      synchronized (this) {
+        if (this.activeMessageMetadata != null) {
+          message.append(
+              "Current user step name: " + getActiveMessageMetadata().get().userStepName() + "\n");
+          message.append(
+              "Time spent in this step(millis): "
+                  + (clock.currentTimeMillis() - getActiveMessageMetadata().get().startTime())
+                  + "\n");
+        }
+        message.append("Processing times in each step(millis)\n");
+        for (Map.Entry<String, IntSummaryStatistics> entry :
+            this.processingTimesByStep.entrySet()) {
+          message.append("Step name: " + entry.getKey() + "\n");
+          message.append("Time spent in this step: " + entry.getValue().toString() + "\n");
+        }
+      }
+
+      if (trackedThread != null) {
+        message.append(StackTraceUtil.getStackTraceForLullMessage(trackedThread.getStackTrace()));
+      }
+      return message.toString();
+    }
+
     @Override
     protected void takeSampleOnce(long millisSinceLastSample) {
       elementExecutionTracker.takeSample(millisSinceLastSample);
       super.takeSampleOnce(millisSinceLastSample);
     }
 
+    @Override
+    protected void reportBundleLull(Thread trackedThread, long millisElapsedSinceBundleStart) {
+      // If we're not logging warnings, nothing to report.
+      if (!LOG.isWarnEnabled()) {
+        return;
+      }
+
+      Duration lullDuration = Duration.millis(millisElapsedSinceBundleStart);
+
+      // Since the lull reporting executes in the sampler thread, it won't automatically inherit the
+      // context of the current step. To ensure things are logged correctly, we get the currently
+      // registered DataflowWorkerLoggingHandler and log directly in the desired context.
+      LogRecord logRecord =
+          new LogRecord(Level.WARNING, getBundleLullMessage(trackedThread, lullDuration));
+      logRecord.setLoggerName(DataflowExecutionStateTracker.LOG.getName());
+
+      // Publish directly in the context of this specific ExecutionState.
+      DataflowWorkerLoggingHandler dataflowLoggingHandler =
+          DataflowWorkerLoggingInitializer.getLoggingHandler();
+      dataflowLoggingHandler.publish(logRecord);

Review Comment:
   Should we call logFullThreadDump when shouldLogFullThreadDump is true? I'm not sure who added that; maybe Sam knows. If no one knows, let's not add it, but since it exists for the single-message lull report (reportLull), it seems like someone added it on purpose.
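If the full thread dump is carried over, the comments in `testLullReportsRightTrace` suggest the intended rule: dump only when the lull is more than 20 minutes and no dump has happened in the last 20 minutes. The sketch below models that rule with hypothetical names and thresholds inferred from those comments; it is not the existing `shouldLogFullThreadDump` implementation. It uses `Thread.getAllStackTraces()` for the dump itself.

```java
import java.util.Map;

/** Hypothetical rate-limited full-thread-dump gate; thresholds are assumptions. */
class FullThreadDumpGate {
  static final long MIN_LULL_MS = 20 * 60 * 1000L;
  static final long MIN_MS_BETWEEN_DUMPS = 20 * 60 * 1000L;

  // Negative means "no dump has been taken yet".
  private long lastDumpMillis = -1;

  /** Returns true, and records the dump time, if a full dump should be logged now. */
  synchronized boolean shouldDump(long nowMillis, long lullMillis) {
    if (lullMillis <= MIN_LULL_MS) {
      return false; // Lull not long enough to justify an expensive dump.
    }
    if (lastDumpMillis >= 0 && nowMillis - lastDumpMillis < MIN_MS_BETWEEN_DUMPS) {
      return false; // Dumped too recently.
    }
    lastDumpMillis = nowMillis;
    return true;
  }

  /** Formats every live thread's stack, e.g. for an INFO-level log record. */
  static String fullThreadDump() {
    StringBuilder dump = new StringBuilder();
    for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
      dump.append("Thread: ").append(entry.getKey().getName()).append('\n');
      for (StackTraceElement frame : entry.getValue()) {
        dump.append("  at ").append(frame).append('\n');
      }
    }
    return dump.toString();
  }
}
```

Passing `nowMillis` in explicitly, rather than reading a clock inside the gate, keeps it testable with a fixed clock like the one the test already uses.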



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java:
##########
@@ -123,12 +158,115 @@ private void assertElementProcessingTimeCounter(NameContext step, int millis, in
                 .build()));
   }
 
-  private ExecutionStateTracker createTracker() {
+  private DataflowExecutionStateTracker createTracker() {
     return new DataflowExecutionStateTracker(
         sampler,
         new TestDataflowExecutionState(NameContext.forStage("test-stage"), "other"),
         counterSet,
         options,
         "test-work-item-id");
   }
+
+  @Test
+  public void testLullReportsRightTrace() throws Exception {
+    Thread mockThread = mock(Thread.class);
+    StackTraceElement[] doFnStackTrace =
+        new StackTraceElement[] {
+          new StackTraceElement(
+              "userpackage.SomeUserDoFn", "helperMethod", "SomeUserDoFn.java", 250),
+          new StackTraceElement("userpackage.SomeUserDoFn", "process", "SomeUserDoFn.java", 450),
+          new StackTraceElement(
+              SimpleDoFnRunner.class.getName(), "processElement", "SimpleDoFnRunner.java", 500),
+        };
+    when(mockThread.getStackTrace()).thenReturn(doFnStackTrace);
+    FixedClock clock = new FixedClock(Clock.SYSTEM.currentTimeMillis());
+    DataflowExecutionStateTracker tracker = createTracker(clock);
+    // Adding test for the full thread dump, but since we can't mock
+    // Thread.getAllStackTraces(), we are starting a background thread
+    // to verify the full thread dump.
+    Thread backgroundThread =
+        new Thread("backgroundThread") {
+          @Override
+          public void run() {
+            try {
+              Thread.sleep(Long.MAX_VALUE);
+            } catch (InterruptedException e) {
+              // exiting the thread
+            }
+          }
+        };
+
+    backgroundThread.start();
+    try {
+      // Full thread dump should be performed, because we never performed
+      // a full thread dump before, and the lull duration is more than 20
+      // minutes.
+      tracker.reportBundleLull(mockThread, 30 * 60 * 1000);
+      verifyLullLog();
+
+      // Full thread dump should not be performed because the last dump
+      // was only 5 minutes ago.
+      clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(5L).getMillis());
+      tracker.reportBundleLull(mockThread, 30 * 60 * 1000);
+      verifyLullLog();
+
+      // Full thread dump should not be performed because the lull duration
+      // is only 6 minutes.
+      clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis());
+      tracker.reportBundleLull(mockThread, 6 * 60 * 1000);
+      verifyLullLog();
+
+      // Full thread dump should be performed, because it has been 21 minutes
+      // since the last dump, and the lull duration is more than 20 minutes.
+      clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis());
+      tracker.reportBundleLull(mockThread, 30 * 60 * 1000);
+      verifyLullLog();
+    } finally {
+      // Cleaning up the background thread.
+      backgroundThread.interrupt();
+      backgroundThread.join();
+    }
+  }
+
+  private void verifyLullLog() throws IOException {
+    File[] files = logFolder.listFiles();
+    assertThat(files, Matchers.arrayWithSize(1));
+    File logFile = files[0];
+    List<String> lines = Files.readAllLines(logFile.toPath());
+
+    String warnLines =
+        Joiner.on("\n").join(Iterables.filter(lines, line -> line.contains("\"WARN\"")));
+    assertThat(
+        warnLines,
+        Matchers.allOf(
+            Matchers.containsString("Operation ongoing in bundle for at least"),
+            Matchers.containsString(" without completing"),
+            Matchers.containsString("Processing times in each step"),
+            Matchers.containsString(
+                "org.apache.beam.runners.dataflow.worker.DataflowExecutionContext$DataflowExecutionStateTracker"),
+            Matchers.containsString("userpackage.SomeUserDoFn.helperMethod"),
+            Matchers.not(Matchers.containsString(SimpleDoFnRunner.class.getName()))));
+
+    String infoLines =
+        Joiner.on("\n").join(Iterables.filter(lines, line -> line.contains("\"INFO\"")));
+    assertThat(
+        infoLines,
+        Matchers.not(

Review Comment:
   It seems from this that we always verify that a full thread dump is not performed. The comments in the test sometimes say we expect a full thread dump, although looking at the code I don't see when we'd ever get one.
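One way to make each step assert what its comment says is to pass the expectation into the check. A stdlib-only sketch, assuming (as the truncated INFO-line assertion at the end of `verifyLullLog` appears to) that full-dump output lands in `"INFO"` lines and would include the `backgroundThread` name; `LullLogChecks.checkFullThreadDump` is a hypothetical helper:

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: asserts presence or absence of a full thread dump in log lines. */
final class LullLogChecks {
  private LullLogChecks() {}

  static void checkFullThreadDump(List<String> logLines, boolean expectFullThreadDump) {
    // Keep only INFO-level JSON log lines, mirroring the filter in verifyLullLog.
    String infoLines =
        logLines.stream()
            .filter(line -> line.contains("\"INFO\""))
            .collect(Collectors.joining("\n"));
    boolean sawDump = infoLines.contains("backgroundThread");
    if (sawDump != expectFullThreadDump) {
      throw new AssertionError(
          (expectFullThreadDump ? "Expected" : "Did not expect")
              + " a full thread dump in INFO lines:\n"
              + infoLines);
    }
  }
}
```

Because `verifyLullLog` rereads the same single log file on every call, a dump written in the first step would still be visible in later steps; a check like this would need to look only at the lines appended since the previous step.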



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java:
##########
@@ -296,55 +267,6 @@ public void reportLull(Thread trackedThread, long millis) {
       DataflowWorkerLoggingHandler dataflowLoggingHandler =
           DataflowWorkerLoggingInitializer.getLoggingHandler();
       dataflowLoggingHandler.publish(this, logRecord);
-
-      if (shouldLogFullThreadDump(lullDuration)) {

Review Comment:
   Why is reportLull changing so much? I don't think you call it, since you're using reportBundleLull instead. It seems like it could just remain the same.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java:
##########
@@ -185,41 +181,19 @@ public abstract static class DataflowExecutionState extends ExecutionState {
     private final ProfileScope profileScope;
     private final @Nullable MetricsContainer metricsContainer;
 
-    /** Clock used to either provide real system time or mocked to virtualize time for testing. */
-    private final Clock clock;
-
     public DataflowExecutionState(
         NameContext nameContext,
         String stateName,
         @Nullable String requestingStepName,
         @Nullable Integer inputIndex,
         @Nullable MetricsContainer metricsContainer,
         ProfileScope profileScope) {
-      this(
-          nameContext,
-          stateName,
-          requestingStepName,
-          inputIndex,
-          metricsContainer,
-          profileScope,
-          Clock.SYSTEM);
-    }
-
-    public DataflowExecutionState(
-        NameContext nameContext,
-        String stateName,
-        @Nullable String requestingStepName,
-        @Nullable Integer inputIndex,
-        @Nullable MetricsContainer metricsContainer,
-        ProfileScope profileScope,
-        Clock clock) {
       super(stateName);

Review Comment:
   Why the change here from delegating to the other constructor to calling super and then setting each field directly? Is this a Java convention?
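For context on the pattern being removed: Java requires `this(...)` or `super(...)` to be the first statement of a constructor, so a common way to offer a convenience constructor is to delegate to one canonical constructor that assigns every field, with the extra `Clock` parameter letting tests substitute a fake clock. A minimal sketch with hypothetical names (`MillisClock`, `TimedState`), not Beam's classes:

```java
/** Hypothetical clock abstraction, standing in for the Clock used in the diff. */
interface MillisClock {
  MillisClock SYSTEM = System::currentTimeMillis;

  long currentTimeMillis();
}

/** Hypothetical state class showing constructor delegation to a canonical constructor. */
class TimedState {
  private final String stateName;
  private final MillisClock clock;

  /** Convenience constructor for production code: delegates with the real clock. */
  TimedState(String stateName) {
    this(stateName, MillisClock.SYSTEM);
  }

  /** Canonical constructor: the only place fields are assigned; tests inject a clock. */
  TimedState(String stateName, MillisClock clock) {
    this.stateName = stateName;
    this.clock = clock;
  }

  String describe() {
    return stateName + "@" + clock.currentTimeMillis();
  }
}
```

Either style is valid Java. Dropping the overload removes this injection point from the state class, which appears consistent with the new `getBundleLullMessage` reading `clock` from the tracker instead.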



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.