arvindram03 commented on code in PR #30693:
URL: https://github.com/apache/beam/pull/30693#discussion_r1566528993
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java:
##########
@@ -310,12 +350,64 @@ public Closeable activate() {
}
}
+ private String getBundleLullMessage(Thread trackedThread, Duration
lullDuration) {
+ StringBuilder message = new StringBuilder();
+ message
+ .append("Operation ongoing in bundle for at least ")
+ .append(DURATION_FORMATTER.print(lullDuration.toPeriod()))
+ .append(" without completing")
+ .append("\n");
+ synchronized (this) {
+ if (this.activeMessageMetadata != null) {
+ message.append(
+ "Current user step name: " +
getActiveMessageMetadata().get().userStepName() + "\n");
+ message.append(
+ "Time spent in this step(millis): "
+ + (clock.currentTimeMillis() -
getActiveMessageMetadata().get().startTime())
+ + "\n");
+ }
+ message.append("Processing times in each step(millis)\n");
+ for (Map.Entry<String, IntSummaryStatistics> entry :
+ this.processingTimesByStep.entrySet()) {
+ message.append("Step name: " + entry.getKey() + "\n");
+ message.append("Time spent in this step: " +
entry.getValue().toString() + "\n");
+ }
+ }
+
+ if (trackedThread != null) {
+
message.append(StackTraceUtil.getStackTraceForLullMessage(trackedThread.getStackTrace()));
+ }
+ return message.toString();
+ }
+
@Override
protected void takeSampleOnce(long millisSinceLastSample) {
elementExecutionTracker.takeSample(millisSinceLastSample);
super.takeSampleOnce(millisSinceLastSample);
}
+ @Override
+ protected void reportBundleLull(Thread trackedThread, long
millisElapsedSinceBundleStart) {
+ // If we're not logging warnings, nothing to report.
+ if (!LOG.isWarnEnabled()) {
+ return;
+ }
+
+ Duration lullDuration = Duration.millis(millisElapsedSinceBundleStart);
+
+ // Since the lull reporting executes in the sampler thread, it won't
automatically inherit the
+ // context of the current step. To ensure things are logged correctly,
we get the currently
+ // registered DataflowWorkerLoggingHandler and log directly in the
desired context.
+ LogRecord logRecord =
+ new LogRecord(Level.WARNING, getBundleLullMessage(trackedThread,
lullDuration));
+ logRecord.setLoggerName(DataflowExecutionStateTracker.LOG.getName());
+
+ // Publish directly in the context of this specific ExecutionState.
+ DataflowWorkerLoggingHandler dataflowLoggingHandler =
+ DataflowWorkerLoggingInitializer.getLoggingHandler();
+ dataflowLoggingHandler.publish(logRecord);
Review Comment:
We initially add full thread dump here which spammed the logs. We removed as
part of this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]