scwhittle commented on code in PR #29882:
URL: https://github.com/apache/beam/pull/29882#discussion_r1445985304


##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java:
##########
@@ -97,6 +98,17 @@ public void onActivate(boolean pushing) {}
      */
     public abstract void reportLull(Thread trackedThread, long millis);
 
+    /**
+     * Called when a lull has been detected since the start of bundle 
processing. This indicates
+     * that more than {@link #BUNDLE_LULL_REPORT_MS} has been spent processing 
the same bundle (ie
+     * between startBundle/finishBundle).
+     *
+     * @param trackedThread The execution thread that is in a lull.
+     * @param millis The milliseconds since the state was most recently 
entered.

Review Comment:
   Should the comment be "The milliseconds since the bundle started processing"?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java:
##########
@@ -276,6 +276,23 @@ protected String getLullMessage(Thread trackedThread, 
Duration lullDuration) {
       return message.toString();
     }
 
+    protected String getBundleLullMessage(Thread trackedThread, Duration 
lullDuration) {
+      StringBuilder message = new StringBuilder();
+      message.append("Operation ongoing");
+      if (getStepName() != null) {

Review Comment:
   Remove the step name check and always log "in bundle".
   
   Below, I think the message should be something like
   "Bundle processing ongoing for at least X without completing"
   
   Steps may be outputting and processing; we just haven't finished processing 
all the input.
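
A minimal sketch of the suggested wording, to make the proposal concrete. The class name `BundleLullMessages` and its `formatDuration` helper are hypothetical, and `java.time.Duration` stands in for the Joda `Duration` used in the PR so that the snippet is self-contained.

```java
import java.time.Duration;

// Hypothetical helper class, not part of Beam.
final class BundleLullMessages {
  private BundleLullMessages() {}

  // Assumed formatting: minutes and zero-padded seconds, e.g. "10m00s".
  static String formatDuration(Duration d) {
    long totalSeconds = d.getSeconds();
    return String.format("%dm%02ds", totalSeconds / 60, totalSeconds % 60);
  }

  // Always uses the bundle wording; there is no step name check.
  static String bundleLullMessage(Duration lull, StackTraceElement[] trace) {
    StringBuilder message = new StringBuilder();
    message
        .append("Bundle processing ongoing for at least ")
        .append(formatDuration(lull))
        .append(" without completing\n");
    for (StackTraceElement e : trace) {
      message.append("  at ").append(e).append("\n");
    }
    return message.toString();
  }
}
```

The message no longer says "without outputting", since steps inside the bundle may still be producing output.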



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java:
##########
@@ -335,6 +375,12 @@ protected void takeSampleOnce(long millisSinceLastSample) {
       transitionsAtLastSample = transitionsAtThisSample;
     }
     updateMillisSinceLastTransition(millisSinceLastSample, state);
+    updateMillisSinceBundleStart(millisSinceLastSample);
+  }
+
+  @SuppressWarnings("NonAtomicVolatileUpdate")
+  private void updateMillisSinceBundleStart(long millisSinceLastSample) {
+    millisSinceBundleStart += millisSinceLastSample;

Review Comment:
   I think you could put the logic here to check whether the bundle lull should 
be reported (similar to the check in updateMillisSinceLastTransition).
   
   Then you could call a virtual reportBundleLull(), which 
DataflowExecutionStateTracker could implement with its own custom message.
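
Roughly what I have in mind, as a standalone sketch rather than a patch against the real class. `BundleLullSampler`, `startBundle`, and `nextBundleLullReportMs` are hypothetical names, and the threshold value is an assumption, since the PR's `BUNDLE_LULL_REPORT_MS` value is not shown in this hunk.

```java
// Hypothetical stand-in for the bundle-level part of ExecutionStateTracker.
class BundleLullSampler {
  // Assumed value for illustration only.
  static final long BUNDLE_LULL_REPORT_MS = 10 * 60 * 1000L;

  private long millisSinceBundleStart = 0;
  private long nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;

  // Resets the counters when a new bundle begins.
  void startBundle() {
    millisSinceBundleStart = 0;
    nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
  }

  // Called once per sampling period; decides here whether to report.
  void updateMillisSinceBundleStart(long millisSinceLastSample) {
    millisSinceBundleStart += millisSinceLastSample;
    if (millisSinceBundleStart > nextBundleLullReportMs) {
      reportBundleLull(millisSinceBundleStart);
      // Wait for another full period before reporting again.
      nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS;
    }
  }

  // Overridable hook; a subclass can log its own message.
  protected void reportBundleLull(long millisSinceBundleStart) {
    System.err.println(
        "Bundle processing ongoing for at least " + millisSinceBundleStart + " ms");
  }
}
```

The sketch is single-threaded; the real tracker updates its counters from the sampler thread, so visibility of these fields would still need the same care as the existing ones.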



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java:
##########
@@ -312,6 +329,45 @@ public void reportLull(Thread trackedThread, long millis) {
       }
     }
 
+    @Override
+    public void reportBundleLull(Thread trackedThread, String 
customLogMessage, long millis) {
+      // If we're not logging warnings, nothing to report.
+      if (!LOG.isWarnEnabled()) {
+        return;
+      }
+
+      Duration lullDuration = Duration.millis(millis);
+
+      // Since the lull reporting executes in the sampler thread, it won't 
automatically inherit the
+      // context of the current step. To ensure things are logged correctly, 
we get the currently
+      // registered DataflowWorkerLoggingHandler and log directly in the 
desired context.
+      LogRecord logRecord =
+          new LogRecord(Level.WARNING, getBundleLullMessage(trackedThread, 
lullDuration));
+      logRecord.setLoggerName(DataflowOperationContext.LOG.getName());
+      LogRecord customLogRecord = new LogRecord(Level.WARNING, 
customLogMessage);
+      customLogRecord.setLoggerName(DataflowOperationContext.LOG.getName());
+
+      // Publish directly in the context of this specific ExecutionState.
+      DataflowWorkerLoggingHandler dataflowLoggingHandler =
+          DataflowWorkerLoggingInitializer.getLoggingHandler();
+      dataflowLoggingHandler.publish(this, logRecord);
+      dataflowLoggingHandler.publish(this, customLogRecord);
+
+      if (shouldLogFullThreadDumpForBundle(lullDuration)) {
+        Map<Thread, StackTraceElement[]> threadSet = 
Thread.getAllStackTraces();

Review Comment:
   I think you should log that all threads are being dumped because of the X 
duration before logging them.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java:
##########
@@ -312,6 +329,45 @@ public void reportLull(Thread trackedThread, long millis) {
       }
     }
 
+    @Override
+    public void reportBundleLull(Thread trackedThread, String 
customLogMessage, long millis) {
+      // If we're not logging warnings, nothing to report.
+      if (!LOG.isWarnEnabled()) {
+        return;
+      }
+
+      Duration lullDuration = Duration.millis(millis);
+
+      // Since the lull reporting executes in the sampler thread, it won't 
automatically inherit the
+      // context of the current step. To ensure things are logged correctly, 
we get the currently
+      // registered DataflowWorkerLoggingHandler and log directly in the 
desired context.
+      LogRecord logRecord =
+          new LogRecord(Level.WARNING, getBundleLullMessage(trackedThread, 
lullDuration));
+      logRecord.setLoggerName(DataflowOperationContext.LOG.getName());
+      LogRecord customLogRecord = new LogRecord(Level.WARNING, 
customLogMessage);
+      customLogRecord.setLoggerName(DataflowOperationContext.LOG.getName());
+
+      // Publish directly in the context of this specific ExecutionState.
+      DataflowWorkerLoggingHandler dataflowLoggingHandler =
+          DataflowWorkerLoggingInitializer.getLoggingHandler();
+      dataflowLoggingHandler.publish(this, logRecord);
+      dataflowLoggingHandler.publish(this, customLogRecord);
+
+      if (shouldLogFullThreadDumpForBundle(lullDuration)) {
+        Map<Thread, StackTraceElement[]> threadSet = 
Thread.getAllStackTraces();
+        for (Map.Entry<Thread, StackTraceElement[]> entry : 
threadSet.entrySet()) {

Review Comment:
   Avoid code duplication with reportLull; you can factor out a method to log 
all the threads.
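
For illustration, the shared helper could look something like the sketch below. `ThreadDumps` and `formatAllThreads` are hypothetical names, and the sketch returns a string instead of publishing through `DataflowWorkerLoggingHandler`, so it does not depend on the worker classes.

```java
import java.util.Map;

// Hypothetical utility; both reportLull and reportBundleLull could call it.
final class ThreadDumps {
  private ThreadDumps() {}

  // Renders every thread and its stack frames, one frame per line.
  static String formatAllThreads(Map<Thread, StackTraceElement[]> stacks) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<Thread, StackTraceElement[]> entry : stacks.entrySet()) {
      sb.append("Thread \"").append(entry.getKey().getName()).append("\":\n");
      for (StackTraceElement e : entry.getValue()) {
        sb.append("  at ").append(e).append("\n");
      }
    }
    return sb.toString();
  }
}
```

A caller would pass `Thread.getAllStackTraces()` and log the result, so the loop exists in one place only.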



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java:
##########
@@ -97,6 +98,17 @@ public void onActivate(boolean pushing) {}
      */
     public abstract void reportLull(Thread trackedThread, long millis);
 
+    /**
+     * Called when a lull has been detected since the start of bundle 
processing. This indicates
+     * that more than {@link #BUNDLE_LULL_REPORT_MS} has been spent processing 
the same bundle (ie
+     * between startBundle/finishBundle).
+     *
+     * @param trackedThread The execution thread that is in a lull.
+     * @param millis The milliseconds since the state was most recently 
entered.
+     */
+    public abstract void reportBundleLull(

Review Comment:
   I don't think this needs to be a method on the ExecutionState, as it is not 
state specific. It could just be a method within ExecutionStateTracker which 
represents the execution across states.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java:
##########
@@ -312,6 +329,45 @@ public void reportLull(Thread trackedThread, long millis) {
       }
     }
 
+    @Override
+    public void reportBundleLull(Thread trackedThread, String 
customLogMessage, long millis) {
+      // If we're not logging warnings, nothing to report.
+      if (!LOG.isWarnEnabled()) {
+        return;
+      }
+
+      Duration lullDuration = Duration.millis(millis);
+
+      // Since the lull reporting executes in the sampler thread, it won't 
automatically inherit the
+      // context of the current step. To ensure things are logged correctly, 
we get the currently
+      // registered DataflowWorkerLoggingHandler and log directly in the 
desired context.
+      LogRecord logRecord =
+          new LogRecord(Level.WARNING, getBundleLullMessage(trackedThread, 
lullDuration));
+      logRecord.setLoggerName(DataflowOperationContext.LOG.getName());
+      LogRecord customLogRecord = new LogRecord(Level.WARNING, 
customLogMessage);
+      customLogRecord.setLoggerName(DataflowOperationContext.LOG.getName());
+
+      // Publish directly in the context of this specific ExecutionState.
+      DataflowWorkerLoggingHandler dataflowLoggingHandler =
+          DataflowWorkerLoggingInitializer.getLoggingHandler();
+      dataflowLoggingHandler.publish(this, logRecord);
+      dataflowLoggingHandler.publish(this, customLogRecord);
+
+      if (shouldLogFullThreadDumpForBundle(lullDuration)) {

Review Comment:
   Add `&& LOG.isInfoEnabled()` to this condition,
   since getting the stack traces could be expensive.
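
A small sketch of the guard, assuming `java.util.logging` in place of the SLF4J `LOG` used in the PR so that it runs standalone. `GuardedThreadDump` and `maybeDump` are hypothetical names. The stack traces are taken through a `Supplier` so that nothing is collected when the check fails.

```java
import java.util.Map;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper showing the short-circuit before the expensive call.
final class GuardedThreadDump {
  private GuardedThreadDump() {}

  // Returns true only if the stack traces were actually collected.
  static boolean maybeDump(
      Logger log,
      boolean lullLongEnough,
      Supplier<Map<Thread, StackTraceElement[]>> stacks) {
    if (!lullLongEnough || !log.isLoggable(Level.INFO)) {
      return false; // The supplier is never invoked on this path.
    }
    Map<Thread, StackTraceElement[]> all = stacks.get();
    // Announce the dump before logging the individual threads.
    log.info("Dumping " + all.size() + " threads because of a long bundle lull");
    return true;
  }
}
```

In the worker, the supplier would be `Thread::getAllStackTraces`.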



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleExecutionState.java:
##########
@@ -149,11 +149,40 @@ public String getLullMessage(Thread trackedThread, 
Duration millis) {
     return message.toString();
   }
 
+  @VisibleForTesting
+  public String getBundleLullMessage(Thread trackedThread, Duration millis) {
+    // TODO(ajamato): Share getBunldeLullMessage code with 
DataflowExecutionState.
+    String userStepName =
+        
this.labelsMetadata.getOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, 
null);
+    StringBuilder message = new StringBuilder();
+    message.append("Operation ongoing");
+    if (userStepName != null) {
+      message.append(" in bundle ");
+    }
+    message
+        .append(" for at least ")
+        .append(formatDuration(millis))
+        .append(" without outputting or completing ")
+        .append(getStateName());
+    message.append("\n");
+
+    StackTraceElement[] fullTrace = trackedThread.getStackTrace();
+    for (StackTraceElement e : fullTrace) {
+      message.append("  at ").append(e).append("\n");
+    }
+    return message.toString();
+  }
+
   @Override
   public void reportLull(Thread trackedThread, long millis) {
     LOG.warn(getLullMessage(trackedThread, Duration.millis(millis)));
   }
 
+  @Override
+  public void reportBundleLull(Thread trackedThread, String customLogMessage, 
long millis) {

Review Comment:
   If this is a method on ExecutionStateTracker, you could get rid of 
customLogMessage by having the DataflowExecutionStateTracker override log its 
message and then call the super method.
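
Something along these lines, as a sketch only. `SketchTracker` and `DataflowSketchTracker` are hypothetical stand-ins for `ExecutionStateTracker` and `DataflowExecutionStateTracker`, the `workItemId` field is an invented example of Dataflow-specific context, and a `Consumer<String>` replaces the real logger.

```java
import java.util.function.Consumer;

// Hypothetical base tracker: owns the generic bundle lull message.
class SketchTracker {
  protected final Consumer<String> log;

  SketchTracker(Consumer<String> log) {
    this.log = log;
  }

  // No customLogMessage parameter; subclasses add their own output instead.
  protected void reportBundleLull(Thread trackedThread, long millis) {
    log.accept(
        "Bundle processing ongoing for at least " + millis + " ms without completing");
  }
}

// Hypothetical subclass: logs its own message, then defers to the base class.
class DataflowSketchTracker extends SketchTracker {
  private final String workItemId; // Assumed context, for illustration only.

  DataflowSketchTracker(Consumer<String> log, String workItemId) {
    super(log);
    this.workItemId = workItemId;
  }

  @Override
  protected void reportBundleLull(Thread trackedThread, long millis) {
    log.accept("Bundle lull while processing work item " + workItemId);
    super.reportBundleLull(trackedThread, millis);
  }
}
```

With this shape the base class never needs to know about the custom message.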



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
