scwhittle commented on code in PR #29222:
URL: https://github.com/apache/beam/pull/29222#discussion_r1378617295


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -250,6 +250,8 @@ public class ExecutionStateTracker implements BundleProgressReporter {
     private final AtomicReference<@Nullable Thread> trackedThread;
     // Read by multiple threads, read and written by the ExecutionStateSampler thread lazily.
     private final AtomicLong lastTransitionTime;
+    // Used to throttle lull logging.
+    private long lastLullReport;

Review Comment:
   Can you make it clearer whether this is a duration or a point in time?



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -333,31 +335,41 @@ private void takeSample(long currentTimeMillis, long millisSinceLastSample) {
         transitionsAtLastSample = transitionsAtThisSample;
       } else {
         long lullTimeMs = currentTimeMillis - lastTransitionTime.get();
-        Thread thread = trackedThread.get();
         if (lullTimeMs > MAX_LULL_TIME_MS) {
-          if (thread == null) {
-            LOG.warn(
-                String.format(
-                    "Operation ongoing in bundle %s for at least %s without outputting or completing (stack trace unable to be generated).",
-                    processBundleId.get(),
-                    DURATION_FORMATTER.print(Duration.millis(lullTimeMs).toPeriod())));
-          } else if (currentExecutionState == null) {
-            LOG.warn(
-                String.format(
-                    "Operation ongoing in bundle %s for at least %s without outputting or completing:%n  at %s",
-                    processBundleId.get(),
-                    DURATION_FORMATTER.print(Duration.millis(lullTimeMs).toPeriod()),
-                    Joiner.on("\n  at ").join(thread.getStackTrace())));
-          } else {
-            LOG.warn(
-                String.format(
-                    "Operation ongoing in bundle %s for PTransform{id=%s, name=%s, state=%s} for at least %s without outputting or completing:%n  at %s",
-                    processBundleId.get(),
-                    currentExecutionState.ptransformId,
-                    currentExecutionState.ptransformUniqueName,
-                    currentExecutionState.stateName,
-                    DURATION_FORMATTER.print(Duration.millis(lullTimeMs).toPeriod()),
-                    Joiner.on("\n  at ").join(thread.getStackTrace())));
+          if (lullTimeMs < lastLullReport // This must be a new report.
+              || lullTimeMs > 1.2 * lastLullReport // Exponential backoff.
+              || lullTimeMs
+                  > MAX_LULL_TIME_MS + lastLullReport // At least once every MAX_LULL_TIME_MS.
+          ) {
+            lastLullReport = lullTimeMs;

Review Comment:
   It seems like this should be reset for each processed bundle (i.e. in start) or
   when the state changes (i.e. where transitionsAtLastSample is set). Otherwise,
   in long-lived streaming pipelines, a long operation that happened long ago may
   have set a high-water mark that prevents future logs for things stuck for
   5 minutes.
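
   To illustrate the reset suggested above, here is a minimal sketch, not the
   actual ExecutionStateTracker code. The class name, the method names, the
   `Ms` suffix on the fields, and the 5-minute value of MAX_LULL_TIME_MS are
   assumptions made for this example; only the three-clause condition is
   taken from the diff. Clearing the last-report value whenever a transition
   is observed means a report from an earlier lull cannot suppress the first
   warning of a later one.

```java
// Hypothetical, simplified stand-in for the lull-throttling logic in the diff.
final class LullResetSketch {
  // Assumed value: the diff shows only the constant's name.
  static final long MAX_LULL_TIME_MS = 5 * 60 * 1000L;

  private long lastTransitionTimeMs;
  // Lull duration (not a timestamp) at which the last warning was logged.
  // Zero means no warning has been logged for the current lull.
  private long lastLullReportMs = 0;

  LullResetSketch(long startTimeMs) {
    this.lastTransitionTimeMs = startTimeMs;
  }

  // Call when a state transition is observed or a new bundle starts.
  void onTransition(long currentTimeMillis) {
    lastTransitionTimeMs = currentTimeMillis;
    lastLullReportMs = 0; // Forget reports that belong to earlier lulls.
  }

  // Returns true if a lull warning should be logged for this sample.
  boolean shouldLogLull(long currentTimeMillis) {
    long lullTimeMs = currentTimeMillis - lastTransitionTimeMs;
    if (lullTimeMs <= MAX_LULL_TIME_MS) {
      return false;
    }
    // Same three clauses as in the diff.
    if (lullTimeMs < lastLullReportMs
        || lullTimeMs > 1.2 * lastLullReportMs
        || lullTimeMs > MAX_LULL_TIME_MS + lastLullReportMs) {
      lastLullReportMs = lullTimeMs;
      return true;
    }
    return false;
  }
}
```

   With the reset in place, a new lull is reported on the first sample past
   MAX_LULL_TIME_MS regardless of how long the previous lull lasted.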



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -333,31 +335,41 @@ private void takeSample(long currentTimeMillis, long millisSinceLastSample) {
         transitionsAtLastSample = transitionsAtThisSample;
       } else {
         long lullTimeMs = currentTimeMillis - lastTransitionTime.get();
-        Thread thread = trackedThread.get();
         if (lullTimeMs > MAX_LULL_TIME_MS) {
-          if (thread == null) {
-            LOG.warn(
-                String.format(
-                    "Operation ongoing in bundle %s for at least %s without outputting or completing (stack trace unable to be generated).",
-                    processBundleId.get(),
-                    DURATION_FORMATTER.print(Duration.millis(lullTimeMs).toPeriod())));
-          } else if (currentExecutionState == null) {
-            LOG.warn(
-                String.format(
-                    "Operation ongoing in bundle %s for at least %s without outputting or completing:%n  at %s",
-                    processBundleId.get(),
-                    DURATION_FORMATTER.print(Duration.millis(lullTimeMs).toPeriod()),
-                    Joiner.on("\n  at ").join(thread.getStackTrace())));
-          } else {
-            LOG.warn(
-                String.format(
-                    "Operation ongoing in bundle %s for PTransform{id=%s, name=%s, state=%s} for at least %s without outputting or completing:%n  at %s",
-                    processBundleId.get(),
-                    currentExecutionState.ptransformId,
-                    currentExecutionState.ptransformUniqueName,
-                    currentExecutionState.stateName,
-                    DURATION_FORMATTER.print(Duration.millis(lullTimeMs).toPeriod()),
-                    Joiner.on("\n  at ").join(thread.getStackTrace())));
+          if (lullTimeMs < lastLullReport // This must be a new report.
+              || lullTimeMs > 1.2 * lastLullReport // Exponential backoff.

Review Comment:
   Do we want to bother with the exponential backoff? It seems more complicated
   than necessary and just adds 3 more logs, which don't seem particularly
   valuable and are perhaps spammier than desired:
   5 min, 6 min, 7.2 min, 8.64 min, 10 min, 15 min, 20 min
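
   To compare the two schedules concretely, the following is a rough
   simulation sketch rather than code from the PR. It assumes
   MAX_LULL_TIME_MS is 5 minutes, that a single lull is sampled at a fixed
   period, and that lastLullReport starts at 0. The class and method names
   are hypothetical. `backoffSchedule` applies the condition from the diff;
   `fixedIntervalSchedule` applies only the "at least once every
   MAX_LULL_TIME_MS" clause. Note that 1.2 * x < x + MAX_LULL_TIME_MS
   whenever x < 5 * MAX_LULL_TIME_MS, so under these assumptions the backoff
   clause is the one that fires until the last report exceeds 25 minutes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that lists the lull durations at which a warning fires.
final class LullScheduleSketch {
  // Assumed value: the diff shows only the constant's name.
  static final long MAX_LULL_TIME_MS = 5 * 60 * 1000L;

  // Condition as written in the diff (backoff plus fixed-interval fallback).
  static List<Long> backoffSchedule(long samplePeriodMs, long horizonMs) {
    List<Long> reports = new ArrayList<>();
    long lastLullReport = 0;
    for (long lull = samplePeriodMs; lull <= horizonMs; lull += samplePeriodMs) {
      if (lull > MAX_LULL_TIME_MS
          && (lull < lastLullReport
              || lull > 1.2 * lastLullReport
              || lull > MAX_LULL_TIME_MS + lastLullReport)) {
        lastLullReport = lull;
        reports.add(lull);
      }
    }
    return reports;
  }

  // Simpler alternative: report once per MAX_LULL_TIME_MS of additional lull.
  static List<Long> fixedIntervalSchedule(long samplePeriodMs, long horizonMs) {
    List<Long> reports = new ArrayList<>();
    long lastLullReport = 0;
    for (long lull = samplePeriodMs; lull <= horizonMs; lull += samplePeriodMs) {
      if (lull > MAX_LULL_TIME_MS + lastLullReport) {
        lastLullReport = lull;
        reports.add(lull);
      }
    }
    return reports;
  }

  public static void main(String[] args) {
    long periodMs = 1_000L; // Assumed sampling period.
    long horizonMs = 20 * 60 * 1000L; // Simulate a 20-minute lull.
    System.out.println("backoff: " + backoffSchedule(periodMs, horizonMs));
    System.out.println("fixed:   " + fixedIntervalSchedule(periodMs, horizonMs));
  }
}
```

   The exact report times depend on the sampling period, so the printed
   values are only indicative of how many extra warnings the backoff clause
   produces.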



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
