scwhittle commented on code in PR #29882:
URL: https://github.com/apache/beam/pull/29882#discussion_r1452359816


##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java:
##########
@@ -321,6 +354,13 @@ public void takeSample(long millisSinceLastSample) {
     }
   }
 
+  protected void reportBundleLull(String customLogMessage) {
+    if (currentState != null) {
+      currentState.reportBundleLull(trackedThread, customLogMessage, 
millisSinceBundleStart);

Review Comment:
   example is here. This wouldn't log if there wasn't a current state. But 
we're logging about the bundle, not anything about the particular state.



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java:
##########
@@ -97,6 +98,17 @@ public void onActivate(boolean pushing) {}
      */
     public abstract void reportLull(Thread trackedThread, long millis);
 
+    /**
+     * Called when a lull has been detected since the start of bundle 
processing. This indicates
+     * that more than {@link #BUNDLE_LULL_REPORT_MS} has been spent processing 
the same bundle (ie
+     * between startBundle/finishBundle).
+     *
+     * @param trackedThread The execution thread that is in a lull.
+     * @param millis The milliseconds since the state was most recently 
entered.
+     */
+    public abstract void reportBundleLull(

Review Comment:
   I think that is worthwhile to avoid having this rely on the step context 
when it should be logged regardless of if there is a step context or not.
   
   You can extract whatever shared logic to print the thread stacks static 
method in ExecutionStateTracker to share it.



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java:
##########
@@ -335,6 +375,12 @@ protected void takeSampleOnce(long millisSinceLastSample) {
       transitionsAtLastSample = transitionsAtThisSample;
     }
     updateMillisSinceLastTransition(millisSinceLastSample, state);
+    updateMillisSinceBundleStart(millisSinceLastSample);
+  }
+
+  @SuppressWarnings("NonAtomicVolatileUpdate")
+  private void updateMillisSinceBundleStart(long millisSinceLastSample) {
+    millisSinceBundleStart += millisSinceLastSample;

Review Comment:
   I think you could only construct the message when logging with my suggestion.
   ```
     private void updateMillisSinceBundleStart(long millisSinceLastSample) {
       // This variable is written by the Sampler thread, and read by the 
Progress Reporting thread.
       // Because only one thread modifies it, volatile provides enough 
synchronization.
       millisSinceLastStart += millisSinceLastSample;
       if (millisSinceLastStart > nextBundleLullReportMs) {
         // virtual reportBundleLull woudl construct the expensive message 
itself.
         reportBundleLull(trackedThread, millisSinceLastTransition);
         nextBundleLullReportMs += LULL_REPORT_MS;
       }
     }
   ```



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleExecutionState.java:
##########
@@ -149,11 +149,40 @@ public String getLullMessage(Thread trackedThread, 
Duration millis) {
     return message.toString();
   }
 
+  @VisibleForTesting
+  public String getBundleLullMessage(Thread trackedThread, Duration millis) {
+    // TODO(ajamato): Share getBunldeLullMessage code with 
DataflowExecutionState.
+    String userStepName =
+        
this.labelsMetadata.getOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, 
null);
+    StringBuilder message = new StringBuilder();
+    message.append("Operation ongoing");
+    if (userStepName != null) {
+      message.append(" in bundle ");
+    }
+    message
+        .append(" for at least ")
+        .append(formatDuration(millis))
+        .append(" without outputting or completing ")
+        .append(getStateName());
+    message.append("\n");
+
+    StackTraceElement[] fullTrace = trackedThread.getStackTrace();
+    for (StackTraceElement e : fullTrace) {
+      message.append("  at ").append(e).append("\n");
+    }
+    return message.toString();
+  }
+
   @Override
   public void reportLull(Thread trackedThread, long millis) {
     LOG.warn(getLullMessage(trackedThread, Duration.millis(millis)));
   }
 
+  @Override
+  public void reportBundleLull(Thread trackedThread, String customLogMessage, 
long millis) {

Review Comment:
   see above, seems worthwhile changing even if it makes PR bigger.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to