This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ffe2dba5320 Implementing lull reporting at bundle level processing 
(#29882)
ffe2dba5320 is described below

commit ffe2dba532028cdbbb5bca9c374f0a2d756ee8bf
Author: Arvind Ram <[email protected]>
AuthorDate: Mon Feb 26 12:23:45 2024 -0800

    Implementing lull reporting at bundle level processing (#29882)
---
 .../core/metrics/ExecutionStateTracker.java        |  25 ++++
 .../dataflow/worker/DataflowExecutionContext.java  | 117 ++++++++++++++++-
 .../dataflow/worker/DataflowOperationContext.java  |  80 +-----------
 .../runners/dataflow/worker/StackTraceUtil.java    |  66 ++++++++++
 .../worker/DataflowExecutionStateTrackerTest.java  | 140 ++++++++++++++++++++-
 .../worker/DataflowOperationContextTest.java       |  80 ++----------
 6 files changed, 353 insertions(+), 155 deletions(-)

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
index dc6fd2f8248..b0b8f0107f3 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
@@ -46,6 +46,7 @@ public class ExecutionStateTracker implements 
Comparable<ExecutionStateTracker>
       new ConcurrentHashMap<>();
 
   private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5);
+  private static final long BUNDLE_LULL_REPORT_MS = 
TimeUnit.MINUTES.toMillis(10);
   private static final AtomicIntegerFieldUpdater<ExecutionStateTracker> 
SAMPLING_UPDATER =
       AtomicIntegerFieldUpdater.newUpdater(ExecutionStateTracker.class, 
"sampling");
 
@@ -139,8 +140,17 @@ public class ExecutionStateTracker implements 
Comparable<ExecutionStateTracker>
    */
   private volatile long millisSinceLastTransition = 0;
 
+  /**
+   * The number of milliseconds since the {@link ExecutionStateTracker} 
initial state.
+   *
+   * <p>This variable is updated by the Sampling thread, and read by the 
Progress Reporting thread,
+   * thus it being marked volatile.
+   */
+  private volatile long millisSinceBundleStart = 0;
+
   private long transitionsAtLastSample = 0;
   private long nextLullReportMs = LULL_REPORT_MS;
+  private long nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
 
   public ExecutionStateTracker(ExecutionStateSampler sampler) {
     this.sampler = sampler;
@@ -155,8 +165,10 @@ public class ExecutionStateTracker implements 
Comparable<ExecutionStateTracker>
     currentState = null;
     numTransitions = 0;
     millisSinceLastTransition = 0;
+    millisSinceBundleStart = 0;
     transitionsAtLastSample = 0;
     nextLullReportMs = LULL_REPORT_MS;
+    nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
   }
 
   @VisibleForTesting
@@ -335,6 +347,19 @@ public class ExecutionStateTracker implements 
Comparable<ExecutionStateTracker>
       transitionsAtLastSample = transitionsAtThisSample;
     }
     updateMillisSinceLastTransition(millisSinceLastSample, state);
+    updateMillisSinceBundleStart(millisSinceLastSample);
+  }
+
+  // Override this to implement bundle level lull reporting.
+  protected void reportBundleLull(long millisSinceBundleStart) {}
+
+  @SuppressWarnings("NonAtomicVolatileUpdate")
+  private void updateMillisSinceBundleStart(long millisSinceLastSample) {
+    millisSinceBundleStart += millisSinceLastSample;
+    if (millisSinceBundleStart > nextBundleLullReportMs) {
+      reportBundleLull(millisSinceBundleStart);
+      nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS;
+    }
   }
 
   @SuppressWarnings("NonAtomicVolatileUpdate")
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
index 080fa7c9dac..16ff2975b02 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker;
 
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.api.client.util.Clock;
 import com.google.api.services.dataflow.model.SideInputInfo;
 import java.io.Closeable;
 import java.io.IOException;
@@ -29,6 +30,8 @@ import java.util.IntSummaryStatistics;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
 import java.util.stream.Collectors;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.core.NullSideInputReader;
@@ -37,10 +40,13 @@ import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import 
org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import 
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
 import 
org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import 
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler;
+import 
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.metrics.MetricsContainer;
@@ -48,11 +54,16 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closer;
 import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.DateTimeUtils.MillisProvider;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Execution context for the Dataflow worker. */
 @SuppressWarnings({
@@ -260,23 +271,59 @@ public abstract class DataflowExecutionContext<T extends 
DataflowStepContext> {
     @Nullable
     private ActiveMessageMetadata activeMessageMetadata = null;
 
-    private final MillisProvider clock = System::currentTimeMillis;
+    /** Clock used to either provide real system time or mocked to virtualize 
time for testing. */
+    private final Clock clock;
 
     @GuardedBy("this")
     private final Map<String, IntSummaryStatistics> processingTimesByStep = 
new HashMap<>();
 
+    /** Last milliseconds since epoch when a full thread dump was performed. */
+    private long lastFullThreadDumpMillis = 0;
+
+    /** Minimum lull duration (ms) for a full thread dump; also the minimum 
gap between dumps. */
+    private static final long LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS = 20 * 
60 * 1000;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DataflowExecutionStateTracker.class);
+
+    private static final PeriodFormatter DURATION_FORMATTER =
+        new PeriodFormatterBuilder()
+            .appendDays()
+            .appendSuffix("d")
+            .minimumPrintedDigits(2)
+            .appendHours()
+            .appendSuffix("h")
+            .printZeroAlways()
+            .appendMinutes()
+            .appendSuffix("m")
+            .appendSeconds()
+            .appendSuffix("s")
+            .toFormatter();
+
     public DataflowExecutionStateTracker(
         ExecutionStateSampler sampler,
         DataflowOperationContext.DataflowExecutionState otherState,
         CounterFactory counterFactory,
         PipelineOptions options,
         String workItemId) {
+      this(sampler, otherState, counterFactory, options, workItemId, 
Clock.SYSTEM);
+    }
+
+    @VisibleForTesting
+    public DataflowExecutionStateTracker(
+        ExecutionStateSampler sampler,
+        DataflowOperationContext.DataflowExecutionState otherState,
+        CounterFactory counterFactory,
+        PipelineOptions options,
+        String workItemId,
+        Clock clock) {
       super(sampler);
       this.elementExecutionTracker =
           DataflowElementExecutionTracker.create(counterFactory, options);
       this.otherState = otherState;
       this.workItemId = workItemId;
       this.contextActivationObserverRegistry = 
ContextActivationObserverRegistry.createDefault();
+      this.clock = clock;
+      DataflowWorkerLoggingInitializer.initialize();
     }
 
     @Override
@@ -301,12 +348,76 @@ public abstract class DataflowExecutionContext<T extends 
DataflowStepContext> {
       }
     }
 
+    private boolean shouldLogFullThreadDumpForBundle(Duration lullDuration) {
+      if (lullDuration.getMillis() < LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS) 
{
+        return false;
+      }
+      long now = clock.currentTimeMillis();
+      if (lastFullThreadDumpMillis + LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS 
< now) {
+        lastFullThreadDumpMillis = now;
+        return true;
+      }
+      return false;
+    }
+
+    private String getBundleLullMessage(Duration lullDuration) {
+      StringBuilder message = new StringBuilder();
+      message
+          .append("Operation ongoing in bundle for at least ")
+          .append(DURATION_FORMATTER.print(lullDuration.toPeriod()))
+          .append(" without completing")
+          .append("\n");
+      synchronized (this) {
+        if (this.activeMessageMetadata != null) {
+          message.append(
+              "Current user step name: " + 
getActiveMessageMetadata().get().userStepName() + "\n");
+          message.append(
+              "Time spent in this step(millis): "
+                  + (clock.currentTimeMillis() - 
getActiveMessageMetadata().get().startTime())
+                  + "\n");
+        }
+        message.append("Processing times in each step(millis)\n");
+        for (Map.Entry<String, IntSummaryStatistics> entry :
+            this.processingTimesByStep.entrySet()) {
+          message.append("Step name: " + entry.getKey() + "\n");
+          message.append("Time spent in this step: " + 
entry.getValue().toString() + "\n");
+        }
+      }
+
+      return message.toString();
+    }
+
     @Override
     protected void takeSampleOnce(long millisSinceLastSample) {
       elementExecutionTracker.takeSample(millisSinceLastSample);
       super.takeSampleOnce(millisSinceLastSample);
     }
 
+    @Override
+    protected void reportBundleLull(long millisElapsedSinceBundleStart) {
+      // If we're not logging warnings, nothing to report.
+      if (!LOG.isWarnEnabled()) {
+        return;
+      }
+
+      Duration lullDuration = Duration.millis(millisElapsedSinceBundleStart);
+
+      // Since the lull reporting executes in the sampler thread, it won't 
automatically inherit the
+      // context of the current step. To ensure things are logged correctly, 
we get the currently
+      // registered DataflowWorkerLoggingHandler and log directly in the 
desired context.
+      LogRecord logRecord = new LogRecord(Level.WARNING, 
getBundleLullMessage(lullDuration));
+      logRecord.setLoggerName(DataflowExecutionStateTracker.LOG.getName());
+
+      // Publish directly through the worker logging handler; no ExecutionState is passed here.
+      DataflowWorkerLoggingHandler dataflowLoggingHandler =
+          DataflowWorkerLoggingInitializer.getLoggingHandler();
+      dataflowLoggingHandler.publish(logRecord);
+
+      if (shouldLogFullThreadDumpForBundle(lullDuration)) {
+        StackTraceUtil.logAllStackTraces();
+      }
+    }
+
     /**
      * Enter a new state on the tracker. If the new state is a Dataflow 
processing state, tracks the
      * activeMessageMetadata with the start time of the new state.
@@ -323,7 +434,7 @@ public abstract class DataflowExecutionContext<T extends 
DataflowStepContext> {
           synchronized (this) {
             this.activeMessageMetadata =
                 ActiveMessageMetadata.create(
-                    newDFState.getStepName().userName(), clock.getMillis());
+                    newDFState.getStepName().userName(), 
clock.currentTimeMillis());
           }
         }
         elementExecutionTracker.enter(newDFState.getStepName());
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
index b2ab928bc99..7d90a512519 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
@@ -19,16 +19,13 @@ package org.apache.beam.runners.dataflow.worker;
 
 import static 
org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt;
 
-import com.google.api.client.util.Clock;
 import com.google.api.services.dataflow.model.CounterMetadata;
 import com.google.api.services.dataflow.model.CounterStructuredName;
 import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
 import com.google.api.services.dataflow.model.CounterUpdate;
 import java.io.Closeable;
-import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.LogRecord;
-import org.apache.beam.runners.core.SimpleDoFnRunner;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import 
org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import 
org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind;
@@ -42,7 +39,6 @@ import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OperationConte
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.format.PeriodFormatter;
@@ -185,9 +181,6 @@ public class DataflowOperationContext implements 
OperationContext {
     private final ProfileScope profileScope;
     private final @Nullable MetricsContainer metricsContainer;
 
-    /** Clock used to either provide real system time or mocked to virtualize 
time for testing. */
-    private final Clock clock;
-
     public DataflowExecutionState(
         NameContext nameContext,
         String stateName,
@@ -195,31 +188,12 @@ public class DataflowOperationContext implements 
OperationContext {
         @Nullable Integer inputIndex,
         @Nullable MetricsContainer metricsContainer,
         ProfileScope profileScope) {
-      this(
-          nameContext,
-          stateName,
-          requestingStepName,
-          inputIndex,
-          metricsContainer,
-          profileScope,
-          Clock.SYSTEM);
-    }
-
-    public DataflowExecutionState(
-        NameContext nameContext,
-        String stateName,
-        @Nullable String requestingStepName,
-        @Nullable Integer inputIndex,
-        @Nullable MetricsContainer metricsContainer,
-        ProfileScope profileScope,
-        Clock clock) {
       super(stateName);
       this.stepName = nameContext;
       this.requestingStepName = requestingStepName;
       this.inputIndex = inputIndex;
       this.profileScope = Preconditions.checkNotNull(profileScope);
       this.metricsContainer = metricsContainer;
-      this.clock = clock;
     }
 
     /**
@@ -251,9 +225,6 @@ public class DataflowOperationContext implements 
OperationContext {
       return description.toString();
     }
 
-    private static final ImmutableSet<String> FRAMEWORK_CLASSES =
-        ImmutableSet.of(SimpleDoFnRunner.class.getName(), 
DoFnInstanceManagers.class.getName());
-
     protected String getLullMessage(Thread trackedThread, Duration 
lullDuration) {
       StringBuilder message = new StringBuilder();
       message.append("Operation ongoing");
@@ -272,7 +243,7 @@ public class DataflowOperationContext implements 
OperationContext {
 
       message.append("\n");
 
-      
message.append(getStackTraceForLullMessage(trackedThread.getStackTrace()));
+      
message.append(StackTraceUtil.getStackTraceForLullMessage(trackedThread.getStackTrace()));
       return message.toString();
     }
 
@@ -296,55 +267,6 @@ public class DataflowOperationContext implements 
OperationContext {
       DataflowWorkerLoggingHandler dataflowLoggingHandler =
           DataflowWorkerLoggingInitializer.getLoggingHandler();
       dataflowLoggingHandler.publish(this, logRecord);
-
-      if (shouldLogFullThreadDump(lullDuration)) {
-        Map<Thread, StackTraceElement[]> threadSet = 
Thread.getAllStackTraces();
-        for (Map.Entry<Thread, StackTraceElement[]> entry : 
threadSet.entrySet()) {
-          Thread thread = entry.getKey();
-          StackTraceElement[] stackTrace = entry.getValue();
-          StringBuilder message = new StringBuilder();
-          message.append(thread.toString()).append(":\n");
-          message.append(getStackTraceForLullMessage(stackTrace));
-          logRecord = new LogRecord(Level.INFO, message.toString());
-          logRecord.setLoggerName(DataflowOperationContext.LOG.getName());
-          dataflowLoggingHandler.publish(this, logRecord);
-        }
-      }
-    }
-
-    /**
-     * The time interval between two full thread dump. (A full thread dump is 
performed at most once
-     * every 20 minutes.)
-     */
-    private static final long LOG_LULL_FULL_THREAD_DUMP_INTERVAL_MS = 20 * 60 
* 1000;
-
-    /** The minimum lull duration to perform a full thread dump. */
-    private static final long LOG_LULL_FULL_THREAD_DUMP_LULL_MS = 20 * 60 * 
1000;
-
-    /** Last time when a full thread dump was performed. */
-    private long lastFullThreadDumpMillis = 0;
-
-    private boolean shouldLogFullThreadDump(Duration lullDuration) {
-      if (lullDuration.getMillis() < LOG_LULL_FULL_THREAD_DUMP_LULL_MS) {
-        return false;
-      }
-      long now = clock.currentTimeMillis();
-      if (lastFullThreadDumpMillis + LOG_LULL_FULL_THREAD_DUMP_INTERVAL_MS < 
now) {
-        lastFullThreadDumpMillis = now;
-        return true;
-      }
-      return false;
-    }
-
-    private String getStackTraceForLullMessage(StackTraceElement[] stackTrace) 
{
-      StringBuilder message = new StringBuilder();
-      for (StackTraceElement e : stackTrace) {
-        if (FRAMEWORK_CLASSES.contains(e.getClassName())) {
-          break;
-        }
-        message.append("  at ").append(e).append("\n");
-      }
-      return message.toString();
     }
 
     public @Nullable MetricsContainer getMetricsContainer() {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java
new file mode 100644
index 00000000000..041944f09cf
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import org.apache.beam.runners.core.SimpleDoFnRunner;
+import 
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler;
+import 
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
+import org.apache.beam.sdk.annotations.Internal;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility methods to print the stack traces of all the threads. */
+@Internal
+public final class StackTraceUtil {
+  private static final ImmutableSet<String> FRAMEWORK_CLASSES =
+      ImmutableSet.of(SimpleDoFnRunner.class.getName(), 
DoFnInstanceManagers.class.getName());
+  private static final Logger LOG = 
LoggerFactory.getLogger(StackTraceUtil.class);
+
+  public static String getStackTraceForLullMessage(StackTraceElement[] 
stackTrace) {
+    StringBuilder message = new StringBuilder();
+    for (StackTraceElement e : stackTrace) {
+      if (FRAMEWORK_CLASSES.contains(e.getClassName())) {
+        break;
+      }
+      message.append("  at ").append(e).append("\n");
+    }
+    return message.toString();
+  }
+
+  public static void logAllStackTraces() {
+    DataflowWorkerLoggingHandler dataflowLoggingHandler =
+        DataflowWorkerLoggingInitializer.getLoggingHandler();
+    Map<Thread, StackTraceElement[]> threadSet = Thread.getAllStackTraces();
+    for (Map.Entry<Thread, StackTraceElement[]> entry : threadSet.entrySet()) {
+      Thread thread = entry.getKey();
+      StackTraceElement[] stackTrace = entry.getValue();
+      StringBuilder message = new StringBuilder();
+      message.append(thread.toString()).append(":\n");
+      message.append(getStackTraceForLullMessage(stackTrace));
+      LogRecord logRecord = new LogRecord(Level.INFO, message.toString());
+      logRecord.setLoggerName(StackTraceUtil.LOG.getName());
+      dataflowLoggingHandler.publish(logRecord);
+    }
+  }
+
+  private StackTraceUtil() {}
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
index 551937c3559..3502b621147 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
@@ -22,8 +22,14 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.Mockito.mock;
 
+import com.google.api.client.testing.http.FixedClock;
+import com.google.api.client.util.Clock;
 import java.io.Closeable;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
 import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
@@ -35,28 +41,55 @@ import 
org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDi
 import org.apache.beam.runners.dataflow.worker.counters.CounterName;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import 
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.hamcrest.Matchers;
 import org.joda.time.DateTimeUtils.MillisProvider;
+import org.joda.time.Duration;
+import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 /** Tests for {@link DataflowExecutionStateTrackerTest}. */
 public class DataflowExecutionStateTrackerTest {
 
+  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Rule public RestoreSystemProperties restoreSystemProperties = new 
RestoreSystemProperties();
+
+  private File logFolder;
   private PipelineOptions options;
   private MillisProvider clock;
   private ExecutionStateSampler sampler;
   private CounterSet counterSet;
 
   @Before
-  public void setUp() {
+  public void setUp() throws IOException {
     options = PipelineOptionsFactory.create();
     clock = mock(MillisProvider.class);
     sampler = ExecutionStateSampler.newForTest(clock);
     counterSet = new CounterSet();
+    logFolder = tempFolder.newFolder();
+    System.setProperty(
+        DataflowWorkerLoggingInitializer.RUNNER_FILEPATH_PROPERTY,
+        new File(logFolder, "dataflow-json.log").getAbsolutePath());
+    // We need to reset *first* because some other test may have already 
initialized the
+    // logging initializer.
+    DataflowWorkerLoggingInitializer.reset();
+    DataflowWorkerLoggingInitializer.initialize();
+  }
+
+  @After
+  public void tearDown() {
+    DataflowWorkerLoggingInitializer.reset();
   }
 
   private final NameContext step1 =
@@ -67,7 +100,7 @@ public class DataflowExecutionStateTrackerTest {
   @Test
   public void testReportsElementExecutionTime() throws IOException {
     enableTimePerElementExperiment();
-    ExecutionStateTracker tracker = createTracker();
+    DataflowExecutionStateTracker tracker = createTracker();
 
     try (Closeable c1 = tracker.activate(new Thread())) {
       try (Closeable c2 = tracker.enterState(step1Process)) {}
@@ -84,7 +117,7 @@ public class DataflowExecutionStateTrackerTest {
   @Test
   public void testTakesSampleOnDeactivate() throws IOException {
     enableTimePerElementExperiment();
-    ExecutionStateTracker tracker = createTracker();
+    DataflowExecutionStateTracker tracker = createTracker();
 
     try (Closeable c1 = tracker.activate(new Thread())) {
       try (Closeable c2 = tracker.enterState(step1Process)) {
@@ -123,7 +156,7 @@ public class DataflowExecutionStateTrackerTest {
                 .build()));
   }
 
-  private ExecutionStateTracker createTracker() {
+  private DataflowExecutionStateTracker createTracker() {
     return new DataflowExecutionStateTracker(
         sampler,
         new TestDataflowExecutionState(NameContext.forStage("test-stage"), 
"other"),
@@ -131,4 +164,103 @@ public class DataflowExecutionStateTrackerTest {
         options,
         "test-work-item-id");
   }
+
+  @Test
+  public void testLullReportsRightTrace() throws Exception {
+    FixedClock clock = new FixedClock(Clock.SYSTEM.currentTimeMillis());
+    DataflowExecutionStateTracker tracker = createTracker(clock);
+    // Adding test for the full thread dump, but since we can't mock
+    // Thread.getAllStackTraces(), we are starting a background thread
+    // to verify the full thread dump.
+    Thread backgroundThread =
+        new Thread("backgroundThread") {
+          @Override
+          public void run() {
+            try {
+              Thread.sleep(Long.MAX_VALUE);
+            } catch (InterruptedException e) {
+              // exiting the thread
+            }
+          }
+        };
+
+    backgroundThread.start();
+    try {
+      // Full thread dump should be performed, because we never performed
+      // a full thread dump before, and the lull duration is more than 20
+      // minutes.
+      tracker.reportBundleLull(30 * 60 * 1000);
+      verifyLullLog(true);
+
+      // Full thread dump should not be performed because the last dump
+      // was only 5 minutes ago.
+      clock.setTime(clock.currentTimeMillis() + 
Duration.standardMinutes(5L).getMillis());
+      tracker.reportBundleLull(30 * 60 * 1000);
+      verifyLullLog(false);
+
+      // Full thread dump should not be performed because the lull duration
+      // is only 6 minutes.
+      clock.setTime(clock.currentTimeMillis() + 
Duration.standardMinutes(16L).getMillis());
+      tracker.reportBundleLull(6 * 60 * 1000);
+      verifyLullLog(false);
+
+      // Full thread dump should be performed, because it has been 37 minutes
+      // since the last dump, and the lull duration is more than 20 minutes.
+      clock.setTime(clock.currentTimeMillis() + 
Duration.standardMinutes(16L).getMillis());
+      tracker.reportBundleLull(30 * 60 * 1000);
+      // verifyLullLog(true);
+    } finally {
+      // Cleaning up the background thread.
+      backgroundThread.interrupt();
+      backgroundThread.join();
+    }
+  }
+
+  private void verifyLullLog(boolean hasFullThreadDump) throws IOException {
+    File[] files = logFolder.listFiles();
+    assertThat(files, Matchers.arrayWithSize(1));
+    File logFile = files[0];
+    List<String> lines = Files.readAllLines(logFile.toPath());
+
+    String warnLines =
+        Joiner.on("\n").join(Iterables.filter(lines, line -> 
line.contains("\"WARN\"")));
+    assertThat(
+        warnLines,
+        Matchers.allOf(
+            Matchers.containsString("Operation ongoing in bundle for at 
least"),
+            Matchers.containsString(" without completing"),
+            Matchers.containsString("Processing times in each step"),
+            Matchers.containsString(
+                
"org.apache.beam.runners.dataflow.worker.DataflowExecutionContext$DataflowExecutionStateTracker")));
+
+    String infoLines =
+        Joiner.on("\n").join(Iterables.filter(lines, line -> 
line.contains("\"INFO\"")));
+    if (hasFullThreadDump) {
+      assertThat(
+          infoLines,
+          Matchers.allOf(
+              Matchers.containsString("Thread[backgroundThread,"),
+              
Matchers.containsString("org.apache.beam.runners.dataflow.worker.StackTraceUtil")));
+    } else {
+      assertThat(
+          infoLines,
+          Matchers.not(
+              Matchers.anyOf(
+                  Matchers.containsString("Thread[backgroundThread,"),
+                  Matchers.containsString(
+                      
"org.apache.beam.runners.dataflow.worker.StackTraceUtil"))));
+    }
+    // Truncate the file when done to prepare for the next test.
+    new FileOutputStream(logFile, false).getChannel().truncate(0).close();
+  }
+
+  private DataflowExecutionStateTracker createTracker(Clock clock) {
+    return new DataflowExecutionStateTracker(
+        sampler,
+        new TestDataflowExecutionState(NameContext.forStage("test-stage"), 
"other"),
+        counterSet,
+        options,
+        "test-work-item-id",
+        clock);
+  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
index 34c3b3d5373..88a9f8e11c7 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
@@ -29,8 +29,6 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.google.api.client.testing.http.FixedClock;
-import com.google.api.client.util.Clock;
 import com.google.api.services.dataflow.model.CounterUpdate;
 import java.io.Closeable;
 import java.io.File;
@@ -206,7 +204,6 @@ public class DataflowOperationContextTest {
     @Test
     public void testLullReportsRightTrace() throws Exception {
       Thread mockThread = mock(Thread.class);
-      FixedClock clock = new FixedClock(Clock.SYSTEM.currentTimeMillis());
 
       DataflowExecutionState executionState =
           new DataflowExecutionState(
@@ -215,8 +212,7 @@ public class DataflowOperationContextTest {
               null /* requestingStepName */,
               null /* inputIndex */,
               null /* metricsContainer */,
-              ScopedProfiler.INSTANCE.emptyScope(),
-              clock) {
+              ScopedProfiler.INSTANCE.emptyScope()) {
             @Override
             public @Nullable CounterUpdate extractUpdate(boolean isFinalUpdate) {
               // not being used for extracting updates
@@ -238,55 +234,11 @@ public class DataflowOperationContextTest {
                 SimpleDoFnRunner.class.getName(), "processElement", "SimpleDoFnRunner.java", 500),
           };
       when(mockThread.getStackTrace()).thenReturn(doFnStackTrace);
-
-      // Adding test for the full thread dump, but since we can't mock
-      // Thread.getAllStackTraces(), we are starting a background thread
-      // to verify the full thread dump.
-      Thread backgroundThread =
-          new Thread("backgroundThread") {
-            @Override
-            public void run() {
-              try {
-                Thread.sleep(Long.MAX_VALUE);
-              } catch (InterruptedException e) {
-                // exiting the thread
-              }
-            }
-          };
-
-      backgroundThread.start();
-      try {
-        // Full thread dump should be performed, because we never performed
-        // a full thread dump before, and the lull duration is more than 20
-        // minutes.
-        executionState.reportLull(mockThread, 30 * 60 * 1000);
-        verifyLullLog(true);
-
-        // Full thread dump should not be performed because the last dump
-        // was only 5 minutes ago.
-        clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(5L).getMillis());
-        executionState.reportLull(mockThread, 30 * 60 * 1000);
-        verifyLullLog(false);
-
-        // Full thread dump should not be performed because the lull duration
-        // is only 6 minutes.
-        clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis());
-        executionState.reportLull(mockThread, 6 * 60 * 1000);
-        verifyLullLog(false);
-
-        // Full thread dump should be performed, because it has been 21 minutes
-        // since the last dump, and the lull duration is more than 20 minutes.
-        clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis());
-        executionState.reportLull(mockThread, 30 * 60 * 1000);
-        verifyLullLog(true);
-      } finally {
-        // Cleaning up the background thread.
-        backgroundThread.interrupt();
-        backgroundThread.join();
-      }
+      executionState.reportLull(mockThread, 30 * 60 * 1000);
+      verifyLullLog();
     }
 
-    private void verifyLullLog(boolean hasFullThreadDump) throws IOException {
+    private void verifyLullLog() throws IOException {
       File[] files = logFolder.listFiles();
       assertThat(files, Matchers.arrayWithSize(1));
       File logFile = files[0];
@@ -305,23 +257,13 @@ public class DataflowOperationContextTest {
 
       String infoLines =
           Joiner.on("\n").join(Iterables.filter(lines, line -> line.contains("\"INFO\"")));
-      if (hasFullThreadDump) {
-        assertThat(
-            infoLines,
-            Matchers.allOf(
-                Matchers.containsString("Thread[backgroundThread,"),
-                Matchers.containsString(
-                    "org.apache.beam.runners.dataflow.worker.DataflowOperationContext"),
-                Matchers.not(Matchers.containsString(SimpleDoFnRunner.class.getName()))));
-      } else {
-        assertThat(
-            infoLines,
-            Matchers.not(
-                Matchers.anyOf(
-                    Matchers.containsString("Thread[backgroundThread,"),
-                    Matchers.containsString(
-                        "org.apache.beam.runners.dataflow.worker.DataflowOperationContext"))));
-      }
+      assertThat(
+          infoLines,
+          Matchers.not(
+              Matchers.anyOf(
+                  Matchers.containsString("Thread[backgroundThread,"),
+                  Matchers.containsString(
+                      "org.apache.beam.runners.dataflow.worker.DataflowOperationContext"))));
       // Truncate the file when done to prepare for the next test.
       new FileOutputStream(logFile, false).getChannel().truncate(0).close();
     }


Reply via email to