scwhittle commented on code in PR #29882:
URL: https://github.com/apache/beam/pull/29882#discussion_r1445985304
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java:
##########
@@ -97,6 +98,17 @@ public void onActivate(boolean pushing) {}
*/
public abstract void reportLull(Thread trackedThread, long millis);
+ /**
+ * Called when a lull has been detected since the start of bundle
processing. This indicates
+ * that more than {@link #BUNDLE_LULL_REPORT_MS} has been spent processing
the same bundle (ie
+ * between startBundle/finishBundle).
+ *
+ * @param trackedThread The execution thread that is in a lull.
+ * @param millis The milliseconds since the state was most recently
entered.
Review Comment:
Should the comment be "the milliseconds since the bundle started processing"?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java:
##########
@@ -276,6 +276,23 @@ protected String getLullMessage(Thread trackedThread,
Duration lullDuration) {
return message.toString();
}
+ protected String getBundleLullMessage(Thread trackedThread, Duration
lullDuration) {
+ StringBuilder message = new StringBuilder();
+ message.append("Operation ongoing");
+ if (getStepName() != null) {
Review Comment:
Remove the step name check and always log "in bundle".
Below, I think the message should be something like
"Bundle processing ongoing for at least X without completing".
Steps may be outputting and processing; we just haven't finished processing
all the input.
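A minimal sketch of the suggested message, with no step name check. The class name BundleLullMessages is hypothetical, and java.time.Duration and the local formatDuration stand in for the Joda Duration and the existing formatDuration helper so that the sketch is self-contained:

```java
import java.time.Duration;

/** Hypothetical helper, not part of the PR; it only illustrates the message shape. */
final class BundleLullMessages {
  private BundleLullMessages() {}

  /** Builds the header line followed by the tracked thread's stack trace. */
  static String bundleLullMessage(Thread trackedThread, Duration lullDuration) {
    StringBuilder message = new StringBuilder();
    message
        .append("Bundle processing ongoing for at least ")
        .append(formatDuration(lullDuration))
        .append(" without completing\n");
    for (StackTraceElement e : trackedThread.getStackTrace()) {
      message.append("  at ").append(e).append("\n");
    }
    return message.toString();
  }

  /** Stand-in formatter (assumption): renders minutes and seconds, e.g. 10m00s. */
  static String formatDuration(Duration d) {
    long totalSeconds = d.getSeconds();
    return String.format("%dm%02ds", totalSeconds / 60, totalSeconds % 60);
  }
}
```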
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java:
##########
@@ -335,6 +375,12 @@ protected void takeSampleOnce(long millisSinceLastSample) {
transitionsAtLastSample = transitionsAtThisSample;
}
updateMillisSinceLastTransition(millisSinceLastSample, state);
+ updateMillisSinceBundleStart(millisSinceLastSample);
+ }
+
+ @SuppressWarnings("NonAtomicVolatileUpdate")
+ private void updateMillisSinceBundleStart(long millisSinceLastSample) {
+ millisSinceBundleStart += millisSinceLastSample;
Review Comment:
I think you could put the logic here to check whether the bundle lull should be
reported (similar to the check in updateMillisSinceLastTransition).
Then you could call a virtual reportBundleLull(), which
DataflowExecutionStateTracker could implement with its own custom message.
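Roughly, the check could look like the sketch below. BundleLullTrackerSketch is a hypothetical stand-in for ExecutionStateTracker, and the threshold value, the nextBundleLullReportMs field, and onBundleStart() are assumptions made for illustration rather than code from the PR:

```java
/**
 * Hypothetical stand-in for ExecutionStateTracker. Only the bundle-lull
 * bookkeeping is shown, and it is not thread-safe.
 */
abstract class BundleLullTrackerSketch {
  // Assumed value; the PR references BUNDLE_LULL_REPORT_MS without showing it.
  static final long BUNDLE_LULL_REPORT_MS = 10 * 60 * 1000L;

  private final Thread trackedThread;
  private long millisSinceBundleStart = 0;
  private long nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;

  BundleLullTrackerSketch(Thread trackedThread) {
    this.trackedThread = trackedThread;
  }

  /** Resets the bookkeeping when a new bundle starts. */
  void onBundleStart() {
    millisSinceBundleStart = 0;
    nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
  }

  /** Called once per sample; reports each time another threshold is crossed. */
  void updateMillisSinceBundleStart(long millisSinceLastSample) {
    millisSinceBundleStart += millisSinceLastSample;
    if (millisSinceBundleStart > nextBundleLullReportMs) {
      reportBundleLull(trackedThread, millisSinceBundleStart);
      nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS;
    }
  }

  /** Subclasses decide how the lull is logged. */
  protected abstract void reportBundleLull(Thread trackedThread, long millis);
}
```

Advancing the next-report threshold after each report keeps the sampler from logging on every sample once the bundle has passed the first threshold.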
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java:
##########
@@ -312,6 +329,45 @@ public void reportLull(Thread trackedThread, long millis) {
}
}
+ @Override
+ public void reportBundleLull(Thread trackedThread, String
customLogMessage, long millis) {
+ // If we're not logging warnings, nothing to report.
+ if (!LOG.isWarnEnabled()) {
+ return;
+ }
+
+ Duration lullDuration = Duration.millis(millis);
+
+ // Since the lull reporting executes in the sampler thread, it won't
automatically inherit the
+ // context of the current step. To ensure things are logged correctly,
we get the currently
+ // registered DataflowWorkerLoggingHandler and log directly in the
desired context.
+ LogRecord logRecord =
+ new LogRecord(Level.WARNING, getBundleLullMessage(trackedThread,
lullDuration));
+ logRecord.setLoggerName(DataflowOperationContext.LOG.getName());
+ LogRecord customLogRecord = new LogRecord(Level.WARNING,
customLogMessage);
+ customLogRecord.setLoggerName(DataflowOperationContext.LOG.getName());
+
+ // Publish directly in the context of this specific ExecutionState.
+ DataflowWorkerLoggingHandler dataflowLoggingHandler =
+ DataflowWorkerLoggingInitializer.getLoggingHandler();
+ dataflowLoggingHandler.publish(this, logRecord);
+ dataflowLoggingHandler.publish(this, customLogRecord);
+
+ if (shouldLogFullThreadDumpForBundle(lullDuration)) {
+ Map<Thread, StackTraceElement[]> threadSet =
Thread.getAllStackTraces();
Review Comment:
I think you should first log a message saying that all threads are being dumped
because the bundle has been processing for X duration, before logging the stack traces.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java:
##########
@@ -312,6 +329,45 @@ public void reportLull(Thread trackedThread, long millis) {
}
}
+ @Override
+ public void reportBundleLull(Thread trackedThread, String
customLogMessage, long millis) {
+ // If we're not logging warnings, nothing to report.
+ if (!LOG.isWarnEnabled()) {
+ return;
+ }
+
+ Duration lullDuration = Duration.millis(millis);
+
+ // Since the lull reporting executes in the sampler thread, it won't
automatically inherit the
+ // context of the current step. To ensure things are logged correctly,
we get the currently
+ // registered DataflowWorkerLoggingHandler and log directly in the
desired context.
+ LogRecord logRecord =
+ new LogRecord(Level.WARNING, getBundleLullMessage(trackedThread,
lullDuration));
+ logRecord.setLoggerName(DataflowOperationContext.LOG.getName());
+ LogRecord customLogRecord = new LogRecord(Level.WARNING,
customLogMessage);
+ customLogRecord.setLoggerName(DataflowOperationContext.LOG.getName());
+
+ // Publish directly in the context of this specific ExecutionState.
+ DataflowWorkerLoggingHandler dataflowLoggingHandler =
+ DataflowWorkerLoggingInitializer.getLoggingHandler();
+ dataflowLoggingHandler.publish(this, logRecord);
+ dataflowLoggingHandler.publish(this, customLogRecord);
+
+ if (shouldLogFullThreadDumpForBundle(lullDuration)) {
+ Map<Thread, StackTraceElement[]> threadSet =
Thread.getAllStackTraces();
+ for (Map.Entry<Thread, StackTraceElement[]> entry :
threadSet.entrySet()) {
Review Comment:
Avoid code duplication with reportLull; you can factor out a method to log
all the threads.
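One possible shape for such a shared helper is sketched below. ThreadDumpSketch, logAllStackTraces, and the Consumer<String> sink are hypothetical; in the worker the sink would presumably be the existing logging handler, which is not modeled here:

```java
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical helper that reportLull and reportBundleLull could both call. */
final class ThreadDumpSketch {
  private ThreadDumpSketch() {}

  /**
   * Emits one header message stating why the dump is happening, then one
   * message per live thread containing that thread's stack trace.
   */
  static void logAllStackTraces(String reason, Consumer<String> sink) {
    Map<Thread, StackTraceElement[]> threads = Thread.getAllStackTraces();
    sink.accept("Dumping stack traces of all threads: " + reason);
    for (Map.Entry<Thread, StackTraceElement[]> entry : threads.entrySet()) {
      StringBuilder message = new StringBuilder();
      message.append("Thread[").append(entry.getKey().getName()).append("]\n");
      for (StackTraceElement e : entry.getValue()) {
        message.append("  at ").append(e).append("\n");
      }
      sink.accept(message.toString());
    }
  }
}
```

Passing the sink as a parameter is only a device to keep the sketch runnable on its own; a real refactoring would more likely publish LogRecords the way reportLull already does.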
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java:
##########
@@ -97,6 +98,17 @@ public void onActivate(boolean pushing) {}
*/
public abstract void reportLull(Thread trackedThread, long millis);
+ /**
+ * Called when a lull has been detected since the start of bundle
processing. This indicates
+ * that more than {@link #BUNDLE_LULL_REPORT_MS} has been spent processing
the same bundle (ie
+ * between startBundle/finishBundle).
+ *
+ * @param trackedThread The execution thread that is in a lull.
+ * @param millis The milliseconds since the state was most recently
entered.
+ */
+ public abstract void reportBundleLull(
Review Comment:
I don't think this needs to be a method on the ExecutionState, as it is not
state specific. It could just be a method within ExecutionStateTracker which
represents the execution across states.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java:
##########
@@ -312,6 +329,45 @@ public void reportLull(Thread trackedThread, long millis) {
}
}
+ @Override
+ public void reportBundleLull(Thread trackedThread, String
customLogMessage, long millis) {
+ // If we're not logging warnings, nothing to report.
+ if (!LOG.isWarnEnabled()) {
+ return;
+ }
+
+ Duration lullDuration = Duration.millis(millis);
+
+ // Since the lull reporting executes in the sampler thread, it won't
automatically inherit the
+ // context of the current step. To ensure things are logged correctly,
we get the currently
+ // registered DataflowWorkerLoggingHandler and log directly in the
desired context.
+ LogRecord logRecord =
+ new LogRecord(Level.WARNING, getBundleLullMessage(trackedThread,
lullDuration));
+ logRecord.setLoggerName(DataflowOperationContext.LOG.getName());
+ LogRecord customLogRecord = new LogRecord(Level.WARNING,
customLogMessage);
+ customLogRecord.setLoggerName(DataflowOperationContext.LOG.getName());
+
+ // Publish directly in the context of this specific ExecutionState.
+ DataflowWorkerLoggingHandler dataflowLoggingHandler =
+ DataflowWorkerLoggingInitializer.getLoggingHandler();
+ dataflowLoggingHandler.publish(this, logRecord);
+ dataflowLoggingHandler.publish(this, customLogRecord);
+
+ if (shouldLogFullThreadDumpForBundle(lullDuration)) {
Review Comment:
Add "&& LOG.isInfoEnabled()" to this condition,
since getting the stack traces could be expensive.
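A small sketch of the guard, using java.util.logging in place of the worker's LOG so that it compiles on its own. GuardedThreadDumpSketch and collectIfEnabled are hypothetical names, and the boolean parameter stands in for shouldLogFullThreadDumpForBundle(lullDuration):

```java
import java.util.Collections;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical illustration of checking the log level before collecting traces. */
final class GuardedThreadDumpSketch {
  private GuardedThreadDumpSketch() {}

  /**
   * Collects stack traces only when the lull is long enough AND info logging
   * is enabled, so the expensive call is skipped when nothing would be logged.
   */
  static Map<Thread, StackTraceElement[]> collectIfEnabled(
      Logger logger, boolean lullLongEnough) {
    if (lullLongEnough && logger.isLoggable(Level.INFO)) {
      return Thread.getAllStackTraces();
    }
    return Collections.emptyMap();
  }
}
```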
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleExecutionState.java:
##########
@@ -149,11 +149,40 @@ public String getLullMessage(Thread trackedThread,
Duration millis) {
return message.toString();
}
+ @VisibleForTesting
+ public String getBundleLullMessage(Thread trackedThread, Duration millis) {
+ // TODO(ajamato): Share getBunldeLullMessage code with
DataflowExecutionState.
+ String userStepName =
+
this.labelsMetadata.getOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM,
null);
+ StringBuilder message = new StringBuilder();
+ message.append("Operation ongoing");
+ if (userStepName != null) {
+ message.append(" in bundle ");
+ }
+ message
+ .append(" for at least ")
+ .append(formatDuration(millis))
+ .append(" without outputting or completing ")
+ .append(getStateName());
+ message.append("\n");
+
+ StackTraceElement[] fullTrace = trackedThread.getStackTrace();
+ for (StackTraceElement e : fullTrace) {
+ message.append(" at ").append(e).append("\n");
+ }
+ return message.toString();
+ }
+
@Override
public void reportLull(Thread trackedThread, long millis) {
LOG.warn(getLullMessage(trackedThread, Duration.millis(millis)));
}
+ @Override
+ public void reportBundleLull(Thread trackedThread, String customLogMessage,
long millis) {
Review Comment:
If this is a method on ExecutionStateTracker, you could get rid of the
customLogMessage by having the DataflowExecutionStateTracker override log its
message and then call the super method.
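The override pattern could look roughly like this. BaseTrackerSketch and DataflowTrackerSketch are hypothetical stand-ins for ExecutionStateTracker and DataflowExecutionStateTracker, the message text is made up, and an in-memory list replaces the real loggers:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for ExecutionStateTracker. */
class BaseTrackerSketch {
  // In-memory log used only so the sketch can be run and inspected.
  final List<String> log = new ArrayList<>();

  /** Default report shared by all runners. */
  protected void reportBundleLull(Thread trackedThread, long millis) {
    log.add("Bundle processing ongoing for at least " + millis + " ms without completing");
  }
}

/** Hypothetical stand-in for DataflowExecutionStateTracker. */
class DataflowTrackerSketch extends BaseTrackerSketch {
  @Override
  protected void reportBundleLull(Thread trackedThread, long millis) {
    // Log the runner-specific message first, then defer to the shared report.
    log.add("Dataflow-specific context for the current work item");
    super.reportBundleLull(trackedThread, millis);
  }
}
```

With this shape the base method no longer needs a customLogMessage parameter, because each subclass contributes its own message before delegating.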