dustin12 commented on code in PR #30693:
URL: https://github.com/apache/beam/pull/30693#discussion_r1566540948
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java:
##########
@@ -335,6 +352,19 @@ protected void takeSampleOnce(long millisSinceLastSample) {
transitionsAtLastSample = transitionsAtThisSample;
}
updateMillisSinceLastTransition(millisSinceLastSample, state);
+ updateMillisSinceBundleStart(millisSinceLastSample);
+ }
+
+ // Override this to implement bundle level lull reporting.
+ protected void reportBundleLull(Thread trackedThread, long millisSinceBundleStart) {}
+
+ @SuppressWarnings("NonAtomicVolatileUpdate")
+ private void updateMillisSinceBundleStart(long millisSinceLastSample) {
+ millisSinceBundleStart += millisSinceLastSample;
+ if (millisSinceBundleStart > nextBundleLullReportMs) {
+ reportBundleLull(trackedThread, millisSinceBundleStart);
+ nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS;
+ }
}
Review Comment:
Add a comment explaining that the warning is suppressed because
updateMillisSinceBundleStart will only ever be called by the thread doing the
DoFn processing. Reset is where multiple threads may call in, and there they
only do blind writes.
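For illustration, a minimal sketch of how the requested comment could read in context. This is a standalone class, not Beam's ExecutionStateTracker: the class name, the 10-minute BUNDLE_LULL_REPORT_MS value, and the recording reportBundleLull body are assumptions, and trackedThread is passed as a parameter rather than read from a field for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Beam's ExecutionStateTracker. Field and method names mirror
// the diff; the constant's value and the recording callback are assumptions.
class BundleLullTrackerSketch {
  // Assumed report interval (10 minutes); the real constant's value is not in the diff.
  static final long BUNDLE_LULL_REPORT_MS = 10 * 60 * 1000L;

  private volatile long millisSinceBundleStart = 0;
  private volatile long nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;

  // Collects reported lull durations instead of logging them, so behavior is observable.
  final List<Long> reportedLulls = new ArrayList<>();

  // Override this to implement bundle-level lull reporting.
  protected void reportBundleLull(Thread trackedThread, long millisSinceBundleStart) {
    reportedLulls.add(millisSinceBundleStart);
  }

  // "+=" on a volatile field is a non-atomic read-modify-write, which Error Prone's
  // NonAtomicVolatileUpdate check flags. Suppressing it is safe because this method is
  // only ever called from a single thread; other threads touch these fields only through
  // the blind writes in reset().
  @SuppressWarnings("NonAtomicVolatileUpdate")
  void updateMillisSinceBundleStart(Thread trackedThread, long millisSinceLastSample) {
    millisSinceBundleStart += millisSinceLastSample;
    if (millisSinceBundleStart > nextBundleLullReportMs) {
      reportBundleLull(trackedThread, millisSinceBundleStart);
      nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS;
    }
  }

  // Blind writes: each store ignores the previous value, so several threads calling
  // reset() concurrently all leave the fields in the same state.
  void reset() {
    millisSinceBundleStart = 0;
    nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
  }
}
```

With 60-second samples and the assumed 10-minute interval, the first report fires at 11 minutes (the first total strictly above 10 minutes) and the next at 21 minutes, so a long-stuck bundle is reported roughly once per interval rather than on every sample.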
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java:
##########
@@ -139,8 +140,17 @@ public String getDescription() {
*/
private volatile long millisSinceLastTransition = 0;
+ /**
+ * The number of milliseconds since the {@link ExecutionStateTracker} initial state.
+ *
+ * <p>This variable is updated by the Sampling thread, and read by the Progress Reporting thread,
+ * thus it being marked volatile.
+ */
+ private volatile long millisSinceBundleStart = 0;
Review Comment:
It's because we want to write from multiple threads during reset.
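A small hypothetical sketch of that point: the field stays volatile so a reset() issued from any thread is visible to the thread that later reads it, and because reset() is a blind write, concurrent resets cannot lose updates the way concurrent "+=" could. ResettableBundleClock and resetFromThreads are illustrative names, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; names are illustrative and not part of Beam.
class ResettableBundleClock {
  // volatile: a reset() from any thread becomes visible to later readers.
  private volatile long millisSinceBundleStart = 0;

  // Single-writer accumulation: only one thread is assumed to call this.
  @SuppressWarnings("NonAtomicVolatileUpdate")
  void advance(long millis) {
    millisSinceBundleStart += millis;
  }

  // Blind write: the stored value does not depend on the previous one.
  void reset() {
    millisSinceBundleStart = 0;
  }

  long get() {
    return millisSinceBundleStart;
  }

  // Calls reset() concurrently from several threads, waits for them, returns the value.
  static long resetFromThreads(ResettableBundleClock clock, int threads)
      throws InterruptedException {
    List<Thread> workers = new ArrayList<>();
    for (int i = 0; i < threads; i++) {
      Thread worker = new Thread(clock::reset);
      workers.add(worker);
      worker.start();
    }
    for (Thread worker : workers) {
      worker.join();
    }
    return clock.get();
  }
}
```

However the resetting threads interleave, every one of them stores the same value, so the end state is deterministic.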
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java:
##########
@@ -296,55 +267,6 @@ public void reportLull(Thread trackedThread, long millis) {
DataflowWorkerLoggingHandler dataflowLoggingHandler =
DataflowWorkerLoggingInitializer.getLoggingHandler();
dataflowLoggingHandler.publish(this, logRecord);
-
- if (shouldLogFullThreadDump(lullDuration)) {
Review Comment:
I don't think we should change the full thread dump behavior for message-level
lulls. It only happens if a single message is stuck for 20 minutes, at which
point the pipeline seems fully broken anyway. I agree with leaving it off for
bundle-level lulls.
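A hypothetical sketch of the policy proposed here: message-level lulls of 20 minutes or more still trigger a full thread dump, and bundle-level lulls never do. LullLevel and shouldDumpAllThreads are illustrative names; it uses java.time.Duration rather than whatever Duration type the worker uses, and it ignores any rate limiting the real shouldLogFullThreadDump may apply.

```java
import java.time.Duration;

// Hypothetical sketch of the proposed thread-dump policy; not Dataflow worker code.
class ThreadDumpPolicySketch {
  enum LullLevel {
    MESSAGE,
    BUNDLE
  }

  // From the review discussion: a single message stuck this long suggests a broken pipeline.
  static final Duration FULL_THREAD_DUMP_LULL = Duration.ofMinutes(20);

  static boolean shouldDumpAllThreads(LullLevel level, Duration lullDuration) {
    if (level == LullLevel.BUNDLE) {
      return false; // Bundle-level lull reports never trigger a full thread dump.
    }
    // Message-level lulls dump threads once they reach the threshold.
    return lullDuration.compareTo(FULL_THREAD_DUMP_LULL) >= 0;
  }
}
```

Keeping the bundle-level check as an early return makes it explicit that only the message-level path retains the existing dump behavior.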
--
This is an automated message from the Apache Git Service.