This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ffe2dba5320 Implementing lull reporting at bundle level processing (#29882)
ffe2dba5320 is described below
commit ffe2dba532028cdbbb5bca9c374f0a2d756ee8bf
Author: Arvind Ram <[email protected]>
AuthorDate: Mon Feb 26 12:23:45 2024 -0800
Implementing lull reporting at bundle level processing (#29882)
---
.../core/metrics/ExecutionStateTracker.java | 25 ++++
.../dataflow/worker/DataflowExecutionContext.java | 117 ++++++++++++++++-
.../dataflow/worker/DataflowOperationContext.java | 80 +-----------
.../runners/dataflow/worker/StackTraceUtil.java | 66 ++++++++++
.../worker/DataflowExecutionStateTrackerTest.java | 140 ++++++++++++++++++++-
.../worker/DataflowOperationContextTest.java | 80 ++----------
6 files changed, 353 insertions(+), 155 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
index dc6fd2f8248..b0b8f0107f3 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
@@ -46,6 +46,7 @@ public class ExecutionStateTracker implements
Comparable<ExecutionStateTracker>
new ConcurrentHashMap<>();
private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5);
+ private static final long BUNDLE_LULL_REPORT_MS =
TimeUnit.MINUTES.toMillis(10);
private static final AtomicIntegerFieldUpdater<ExecutionStateTracker>
SAMPLING_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ExecutionStateTracker.class,
"sampling");
@@ -139,8 +140,17 @@ public class ExecutionStateTracker implements
Comparable<ExecutionStateTracker>
*/
private volatile long millisSinceLastTransition = 0;
+ /**
+ * Milliseconds accumulated from samples since the tracker's state was last cleared (bundle start).
+ *
+ * <p>This variable is updated by the Sampling thread and is marked volatile for cross-thread
+ * visibility.
+ */
+ private volatile long millisSinceBundleStart = 0;
+
private long transitionsAtLastSample = 0;
private long nextLullReportMs = LULL_REPORT_MS;
+ private long nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
public ExecutionStateTracker(ExecutionStateSampler sampler) {
this.sampler = sampler;
@@ -155,8 +165,10 @@ public class ExecutionStateTracker implements
Comparable<ExecutionStateTracker>
currentState = null;
numTransitions = 0;
millisSinceLastTransition = 0;
+ millisSinceBundleStart = 0;
transitionsAtLastSample = 0;
nextLullReportMs = LULL_REPORT_MS;
+ nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
}
@VisibleForTesting
@@ -335,6 +347,19 @@ public class ExecutionStateTracker implements
Comparable<ExecutionStateTracker>
transitionsAtLastSample = transitionsAtThisSample;
}
updateMillisSinceLastTransition(millisSinceLastSample, state);
+ updateMillisSinceBundleStart(millisSinceLastSample);
+ }
+
+ // Hook for subclasses: called each time the bundle passes the next BUNDLE_LULL_REPORT_MS mark.
+ protected void reportBundleLull(long millisSinceBundleStart) {}
+
+ @SuppressWarnings("NonAtomicVolatileUpdate")
+ private void updateMillisSinceBundleStart(long millisSinceLastSample) {
+ millisSinceBundleStart += millisSinceLastSample; // Assumes a single writer (the sampler).
+ if (millisSinceBundleStart > nextBundleLullReportMs) {
+ reportBundleLull(millisSinceBundleStart);
+ nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS; // Re-arm for the next interval.
+ }
}
@SuppressWarnings("NonAtomicVolatileUpdate")
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
index 080fa7c9dac..16ff2975b02 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import com.google.api.client.util.Clock;
import com.google.api.services.dataflow.model.SideInputInfo;
import java.io.Closeable;
import java.io.IOException;
@@ -29,6 +30,8 @@ import java.util.IntSummaryStatistics;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.core.NullSideInputReader;
@@ -37,10 +40,13 @@ import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import
org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
import
org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler;
+import
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.MetricsContainer;
@@ -48,11 +54,16 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closer;
import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.DateTimeUtils.MillisProvider;
+import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Execution context for the Dataflow worker. */
@SuppressWarnings({
@@ -260,23 +271,59 @@ public abstract class DataflowExecutionContext<T extends
DataflowStepContext> {
@Nullable
private ActiveMessageMetadata activeMessageMetadata = null;
- private final MillisProvider clock = System::currentTimeMillis;
+ /** Clock used to either provide real system time or mocked to virtualize
time for testing. */
+ private final Clock clock;
@GuardedBy("this")
private final Map<String, IntSummaryStatistics> processingTimesByStep =
new HashMap<>();
+ /** Epoch millis (from {@code clock}) of the last full thread dump; 0 if none was taken yet. */
+ private long lastFullThreadDumpMillis = 0;
+
+ /** Minimum lull duration, and minimum interval between full thread dumps, in milliseconds. */
+ private static final long LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS = 20 * 60 * 1000;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DataflowExecutionStateTracker.class);
+
+ private static final PeriodFormatter DURATION_FORMATTER =
+ new PeriodFormatterBuilder()
+ .appendDays()
+ .appendSuffix("d")
+ .minimumPrintedDigits(2)
+ .appendHours()
+ .appendSuffix("h")
+ .printZeroAlways()
+ .appendMinutes()
+ .appendSuffix("m")
+ .appendSeconds()
+ .appendSuffix("s")
+ .toFormatter();
+
public DataflowExecutionStateTracker(
ExecutionStateSampler sampler,
DataflowOperationContext.DataflowExecutionState otherState,
CounterFactory counterFactory,
PipelineOptions options,
String workItemId) {
+ this(sampler, otherState, counterFactory, options, workItemId,
Clock.SYSTEM);
+ }
+
+ @VisibleForTesting
+ public DataflowExecutionStateTracker(
+ ExecutionStateSampler sampler,
+ DataflowOperationContext.DataflowExecutionState otherState,
+ CounterFactory counterFactory,
+ PipelineOptions options,
+ String workItemId,
+ Clock clock) {
super(sampler);
this.elementExecutionTracker =
DataflowElementExecutionTracker.create(counterFactory, options);
this.otherState = otherState;
this.workItemId = workItemId;
this.contextActivationObserverRegistry =
ContextActivationObserverRegistry.createDefault();
+ this.clock = clock;
+ DataflowWorkerLoggingInitializer.initialize();
}
@Override
@@ -301,12 +348,76 @@ public abstract class DataflowExecutionContext<T extends
DataflowStepContext> {
}
}
+ private boolean shouldLogFullThreadDumpForBundle(Duration lullDuration) {
+ if (lullDuration.getMillis() < LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS)
{
+ return false;
+ }
+ long now = clock.currentTimeMillis();
+ if (lastFullThreadDumpMillis + LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS
< now) {
+ lastFullThreadDumpMillis = now;
+ return true;
+ }
+ return false;
+ }
+
+ /** Builds the bundle lull warning, including the active step and per-step processing times. */
+ private String getBundleLullMessage(Duration lullDuration) {
+ StringBuilder message = new StringBuilder();
+ message
+ .append("Operation ongoing in bundle for at least ")
+ .append(DURATION_FORMATTER.print(lullDuration.toPeriod()))
+ .append(" without completing")
+ .append("\n");
+ synchronized (this) {
+ // Read the guarded field once instead of calling getActiveMessageMetadata().get() twice.
+ ActiveMessageMetadata metadata = this.activeMessageMetadata;
+ if (metadata != null) {
+ message.append("Current user step name: " + metadata.userStepName() + "\n");
+ long millisInStep = clock.currentTimeMillis() - metadata.startTime();
+ message.append("Time spent in this step(millis): " + millisInStep + "\n");
+ }
+ message.append("Processing times in each step(millis)\n");
+ for (Map.Entry<String, IntSummaryStatistics> entry :
+ this.processingTimesByStep.entrySet()) {
+ message.append("Step name: " + entry.getKey() + "\n");
+ message.append("Time spent in this step: " + entry.getValue().toString() + "\n");
+ }
+ }
+
+ return message.toString();
+ }
+
@Override
protected void takeSampleOnce(long millisSinceLastSample) {
elementExecutionTracker.takeSample(millisSinceLastSample);
super.takeSampleOnce(millisSinceLastSample);
}
+ @Override
+ protected void reportBundleLull(long millisElapsedSinceBundleStart) {
+ // If we're not logging warnings, nothing to report.
+ if (!LOG.isWarnEnabled()) {
+ return;
+ }
+
+ Duration lullDuration = Duration.millis(millisElapsedSinceBundleStart);
+
+ // Lull reporting runs on the sampler thread, which does not inherit any step's logging
+ // context, and a bundle-level lull is not tied to a single step. So we fetch the registered
+ // DataflowWorkerLoggingHandler and hand the record to it directly.
+ LogRecord logRecord = new LogRecord(Level.WARNING, getBundleLullMessage(lullDuration));
+ logRecord.setLoggerName(DataflowExecutionStateTracker.LOG.getName());
+
+ // Publish without an ExecutionState, unlike the per-operation lull report.
+ DataflowWorkerLoggingHandler dataflowLoggingHandler =
+ DataflowWorkerLoggingInitializer.getLoggingHandler();
+ dataflowLoggingHandler.publish(logRecord);
+
+ if (shouldLogFullThreadDumpForBundle(lullDuration)) {
+ StackTraceUtil.logAllStackTraces();
+ }
+ }
+
/**
* Enter a new state on the tracker. If the new state is a Dataflow
processing state, tracks the
* activeMessageMetadata with the start time of the new state.
@@ -323,7 +434,7 @@ public abstract class DataflowExecutionContext<T extends
DataflowStepContext> {
synchronized (this) {
this.activeMessageMetadata =
ActiveMessageMetadata.create(
- newDFState.getStepName().userName(), clock.getMillis());
+ newDFState.getStepName().userName(),
clock.currentTimeMillis());
}
}
elementExecutionTracker.enter(newDFState.getStepName());
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
index b2ab928bc99..7d90a512519 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
@@ -19,16 +19,13 @@ package org.apache.beam.runners.dataflow.worker;
import static
org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt;
-import com.google.api.client.util.Clock;
import com.google.api.services.dataflow.model.CounterMetadata;
import com.google.api.services.dataflow.model.CounterStructuredName;
import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
import com.google.api.services.dataflow.model.CounterUpdate;
import java.io.Closeable;
-import java.util.Map;
import java.util.logging.Level;
import java.util.logging.LogRecord;
-import org.apache.beam.runners.core.SimpleDoFnRunner;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import
org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import
org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind;
@@ -42,7 +39,6 @@ import
org.apache.beam.runners.dataflow.worker.util.common.worker.OperationConte
import org.apache.beam.sdk.metrics.MetricsContainer;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.format.PeriodFormatter;
@@ -185,9 +181,6 @@ public class DataflowOperationContext implements
OperationContext {
private final ProfileScope profileScope;
private final @Nullable MetricsContainer metricsContainer;
- /** Clock used to either provide real system time or mocked to virtualize
time for testing. */
- private final Clock clock;
-
public DataflowExecutionState(
NameContext nameContext,
String stateName,
@@ -195,31 +188,12 @@ public class DataflowOperationContext implements
OperationContext {
@Nullable Integer inputIndex,
@Nullable MetricsContainer metricsContainer,
ProfileScope profileScope) {
- this(
- nameContext,
- stateName,
- requestingStepName,
- inputIndex,
- metricsContainer,
- profileScope,
- Clock.SYSTEM);
- }
-
- public DataflowExecutionState(
- NameContext nameContext,
- String stateName,
- @Nullable String requestingStepName,
- @Nullable Integer inputIndex,
- @Nullable MetricsContainer metricsContainer,
- ProfileScope profileScope,
- Clock clock) {
super(stateName);
this.stepName = nameContext;
this.requestingStepName = requestingStepName;
this.inputIndex = inputIndex;
this.profileScope = Preconditions.checkNotNull(profileScope);
this.metricsContainer = metricsContainer;
- this.clock = clock;
}
/**
@@ -251,9 +225,6 @@ public class DataflowOperationContext implements
OperationContext {
return description.toString();
}
- private static final ImmutableSet<String> FRAMEWORK_CLASSES =
- ImmutableSet.of(SimpleDoFnRunner.class.getName(),
DoFnInstanceManagers.class.getName());
-
protected String getLullMessage(Thread trackedThread, Duration
lullDuration) {
StringBuilder message = new StringBuilder();
message.append("Operation ongoing");
@@ -272,7 +243,7 @@ public class DataflowOperationContext implements
OperationContext {
message.append("\n");
-
message.append(getStackTraceForLullMessage(trackedThread.getStackTrace()));
+
message.append(StackTraceUtil.getStackTraceForLullMessage(trackedThread.getStackTrace()));
return message.toString();
}
@@ -296,55 +267,6 @@ public class DataflowOperationContext implements
OperationContext {
DataflowWorkerLoggingHandler dataflowLoggingHandler =
DataflowWorkerLoggingInitializer.getLoggingHandler();
dataflowLoggingHandler.publish(this, logRecord);
-
- if (shouldLogFullThreadDump(lullDuration)) {
- Map<Thread, StackTraceElement[]> threadSet =
Thread.getAllStackTraces();
- for (Map.Entry<Thread, StackTraceElement[]> entry :
threadSet.entrySet()) {
- Thread thread = entry.getKey();
- StackTraceElement[] stackTrace = entry.getValue();
- StringBuilder message = new StringBuilder();
- message.append(thread.toString()).append(":\n");
- message.append(getStackTraceForLullMessage(stackTrace));
- logRecord = new LogRecord(Level.INFO, message.toString());
- logRecord.setLoggerName(DataflowOperationContext.LOG.getName());
- dataflowLoggingHandler.publish(this, logRecord);
- }
- }
- }
-
- /**
- * The time interval between two full thread dump. (A full thread dump is
performed at most once
- * every 20 minutes.)
- */
- private static final long LOG_LULL_FULL_THREAD_DUMP_INTERVAL_MS = 20 * 60
* 1000;
-
- /** The minimum lull duration to perform a full thread dump. */
- private static final long LOG_LULL_FULL_THREAD_DUMP_LULL_MS = 20 * 60 *
1000;
-
- /** Last time when a full thread dump was performed. */
- private long lastFullThreadDumpMillis = 0;
-
- private boolean shouldLogFullThreadDump(Duration lullDuration) {
- if (lullDuration.getMillis() < LOG_LULL_FULL_THREAD_DUMP_LULL_MS) {
- return false;
- }
- long now = clock.currentTimeMillis();
- if (lastFullThreadDumpMillis + LOG_LULL_FULL_THREAD_DUMP_INTERVAL_MS <
now) {
- lastFullThreadDumpMillis = now;
- return true;
- }
- return false;
- }
-
- private String getStackTraceForLullMessage(StackTraceElement[] stackTrace)
{
- StringBuilder message = new StringBuilder();
- for (StackTraceElement e : stackTrace) {
- if (FRAMEWORK_CLASSES.contains(e.getClassName())) {
- break;
- }
- message.append(" at ").append(e).append("\n");
- }
- return message.toString();
}
public @Nullable MetricsContainer getMetricsContainer() {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java
new file mode 100644
index 00000000000..041944f09cf
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import org.apache.beam.runners.core.SimpleDoFnRunner;
+import
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler;
+import
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
+import org.apache.beam.sdk.annotations.Internal;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for formatting and logging thread stack traces in lull reports. */
+@Internal
+public final class StackTraceUtil {
+ private static final ImmutableSet<String> FRAMEWORK_CLASSES =
+ ImmutableSet.of(SimpleDoFnRunner.class.getName(), DoFnInstanceManagers.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(StackTraceUtil.class);
+
+ public static String getStackTraceForLullMessage(StackTraceElement[] stackTrace) {
+ StringBuilder message = new StringBuilder();
+ for (StackTraceElement e : stackTrace) {
+ if (FRAMEWORK_CLASSES.contains(e.getClassName())) {
+ break; // Stop at the first framework frame; the remaining frames are its callers.
+ }
+ message.append(" at ").append(e).append("\n");
+ }
+ return message.toString();
+ }
+
+ public static void logAllStackTraces() {
+ DataflowWorkerLoggingHandler dataflowLoggingHandler =
+ DataflowWorkerLoggingInitializer.getLoggingHandler();
+ Map<Thread, StackTraceElement[]> threadSet = Thread.getAllStackTraces();
+ for (Map.Entry<Thread, StackTraceElement[]> entry : threadSet.entrySet()) {
+ Thread thread = entry.getKey();
+ StackTraceElement[] stackTrace = entry.getValue();
+ StringBuilder message = new StringBuilder();
+ message.append(thread.toString()).append(":\n");
+ message.append(getStackTraceForLullMessage(stackTrace));
+ LogRecord logRecord = new LogRecord(Level.INFO, message.toString());
+ logRecord.setLoggerName(StackTraceUtil.LOG.getName());
+ dataflowLoggingHandler.publish(logRecord);
+ }
+ }
+
+ private StackTraceUtil() {}
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
index 551937c3559..3502b621147 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
@@ -22,8 +22,14 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
+import com.google.api.client.testing.http.FixedClock;
+import com.google.api.client.util.Clock;
import java.io.Closeable;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
@@ -35,28 +41,55 @@ import
org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDi
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.hamcrest.Matchers;
import org.joda.time.DateTimeUtils.MillisProvider;
+import org.joda.time.Duration;
+import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
/** Tests for {@link DataflowExecutionStateTrackerTest}. */
public class DataflowExecutionStateTrackerTest {
+ @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Rule public RestoreSystemProperties restoreSystemProperties = new
RestoreSystemProperties();
+
+ private File logFolder;
private PipelineOptions options;
private MillisProvider clock;
private ExecutionStateSampler sampler;
private CounterSet counterSet;
@Before
- public void setUp() {
+ public void setUp() throws IOException {
options = PipelineOptionsFactory.create();
clock = mock(MillisProvider.class);
sampler = ExecutionStateSampler.newForTest(clock);
counterSet = new CounterSet();
+ logFolder = tempFolder.newFolder();
+ System.setProperty(
+ DataflowWorkerLoggingInitializer.RUNNER_FILEPATH_PROPERTY,
+ new File(logFolder, "dataflow-json.log").getAbsolutePath());
+ // We need to reset *first* because some other test may have already
initialized the
+ // logging initializer.
+ DataflowWorkerLoggingInitializer.reset();
+ DataflowWorkerLoggingInitializer.initialize();
+ }
+
+ @After
+ public void tearDown() {
+ DataflowWorkerLoggingInitializer.reset();
}
private final NameContext step1 =
@@ -67,7 +100,7 @@ public class DataflowExecutionStateTrackerTest {
@Test
public void testReportsElementExecutionTime() throws IOException {
enableTimePerElementExperiment();
- ExecutionStateTracker tracker = createTracker();
+ DataflowExecutionStateTracker tracker = createTracker();
try (Closeable c1 = tracker.activate(new Thread())) {
try (Closeable c2 = tracker.enterState(step1Process)) {}
@@ -84,7 +117,7 @@ public class DataflowExecutionStateTrackerTest {
@Test
public void testTakesSampleOnDeactivate() throws IOException {
enableTimePerElementExperiment();
- ExecutionStateTracker tracker = createTracker();
+ DataflowExecutionStateTracker tracker = createTracker();
try (Closeable c1 = tracker.activate(new Thread())) {
try (Closeable c2 = tracker.enterState(step1Process)) {
@@ -123,7 +156,7 @@ public class DataflowExecutionStateTrackerTest {
.build()));
}
- private ExecutionStateTracker createTracker() {
+ private DataflowExecutionStateTracker createTracker() {
return new DataflowExecutionStateTracker(
sampler,
new TestDataflowExecutionState(NameContext.forStage("test-stage"),
"other"),
@@ -131,4 +164,103 @@ public class DataflowExecutionStateTrackerTest {
options,
"test-work-item-id");
}
+
+ @Test
+ public void testLullReportsRightTrace() throws Exception {
+ FixedClock clock = new FixedClock(Clock.SYSTEM.currentTimeMillis());
+ DataflowExecutionStateTracker tracker = createTracker(clock);
+ // Adding test for the full thread dump, but since we can't mock
+ // Thread.getAllStackTraces(), we are starting a background thread
+ // to verify the full thread dump.
+ Thread backgroundThread =
+ new Thread("backgroundThread") {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException e) {
+ // exiting the thread
+ }
+ }
+ };
+
+ backgroundThread.start();
+ try {
+ // Full thread dump should be performed, because we never performed
+ // a full thread dump before, and the lull duration is more than 20
+ // minutes.
+ tracker.reportBundleLull(30 * 60 * 1000);
+ verifyLullLog(true);
+
+ // Full thread dump should not be performed because the last dump
+ // was only 5 minutes ago.
+ clock.setTime(clock.currentTimeMillis() +
Duration.standardMinutes(5L).getMillis());
+ tracker.reportBundleLull(30 * 60 * 1000);
+ verifyLullLog(false);
+
+ // Full thread dump should not be performed because the lull duration
+ // is only 6 minutes.
+ clock.setTime(clock.currentTimeMillis() +
Duration.standardMinutes(16L).getMillis());
+ tracker.reportBundleLull(6 * 60 * 1000);
+ verifyLullLog(false);
+
+ // Full thread dump should be performed, because it has been 37 minutes
+ // since the last dump, and the lull duration is more than 20 minutes.
+ clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis());
+ tracker.reportBundleLull(30 * 60 * 1000);
+ verifyLullLog(true);
+ } finally {
+ // Cleaning up the background thread.
+ backgroundThread.interrupt();
+ backgroundThread.join();
+ }
+ }
+
+ private void verifyLullLog(boolean hasFullThreadDump) throws IOException {
+ File[] files = logFolder.listFiles();
+ assertThat(files, Matchers.arrayWithSize(1));
+ File logFile = files[0];
+ List<String> lines = Files.readAllLines(logFile.toPath());
+
+ String warnLines =
+ Joiner.on("\n").join(Iterables.filter(lines, line ->
line.contains("\"WARN\"")));
+ assertThat(
+ warnLines,
+ Matchers.allOf(
+ Matchers.containsString("Operation ongoing in bundle for at
least"),
+ Matchers.containsString(" without completing"),
+ Matchers.containsString("Processing times in each step"),
+ Matchers.containsString(
+
"org.apache.beam.runners.dataflow.worker.DataflowExecutionContext$DataflowExecutionStateTracker")));
+
+ String infoLines =
+ Joiner.on("\n").join(Iterables.filter(lines, line ->
line.contains("\"INFO\"")));
+ if (hasFullThreadDump) {
+ assertThat(
+ infoLines,
+ Matchers.allOf(
+ Matchers.containsString("Thread[backgroundThread,"),
+
Matchers.containsString("org.apache.beam.runners.dataflow.worker.StackTraceUtil")));
+ } else {
+ assertThat(
+ infoLines,
+ Matchers.not(
+ Matchers.anyOf(
+ Matchers.containsString("Thread[backgroundThread,"),
+ Matchers.containsString(
+
"org.apache.beam.runners.dataflow.worker.StackTraceUtil"))));
+ }
+ // Truncate the file when done to prepare for the next test.
+ new FileOutputStream(logFile, false).getChannel().truncate(0).close();
+ }
+
+ private DataflowExecutionStateTracker createTracker(Clock clock) {
+ return new DataflowExecutionStateTracker(
+ sampler,
+ new TestDataflowExecutionState(NameContext.forStage("test-stage"),
"other"),
+ counterSet,
+ options,
+ "test-work-item-id",
+ clock);
+ }
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
index 34c3b3d5373..88a9f8e11c7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
@@ -29,8 +29,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.google.api.client.testing.http.FixedClock;
-import com.google.api.client.util.Clock;
import com.google.api.services.dataflow.model.CounterUpdate;
import java.io.Closeable;
import java.io.File;
@@ -206,7 +204,6 @@ public class DataflowOperationContextTest {
@Test
public void testLullReportsRightTrace() throws Exception {
Thread mockThread = mock(Thread.class);
- FixedClock clock = new FixedClock(Clock.SYSTEM.currentTimeMillis());
DataflowExecutionState executionState =
new DataflowExecutionState(
@@ -215,8 +212,7 @@ public class DataflowOperationContextTest {
null /* requestingStepName */,
null /* inputIndex */,
null /* metricsContainer */,
- ScopedProfiler.INSTANCE.emptyScope(),
- clock) {
+ ScopedProfiler.INSTANCE.emptyScope()) {
@Override
public @Nullable CounterUpdate extractUpdate(boolean
isFinalUpdate) {
// not being used for extracting updates
@@ -238,55 +234,11 @@ public class DataflowOperationContextTest {
SimpleDoFnRunner.class.getName(), "processElement",
"SimpleDoFnRunner.java", 500),
};
when(mockThread.getStackTrace()).thenReturn(doFnStackTrace);
-
- // Adding test for the full thread dump, but since we can't mock
- // Thread.getAllStackTraces(), we are starting a background thread
- // to verify the full thread dump.
- Thread backgroundThread =
- new Thread("backgroundThread") {
- @Override
- public void run() {
- try {
- Thread.sleep(Long.MAX_VALUE);
- } catch (InterruptedException e) {
- // exiting the thread
- }
- }
- };
-
- backgroundThread.start();
- try {
- // Full thread dump should be performed, because we never performed
- // a full thread dump before, and the lull duration is more than 20
- // minutes.
- executionState.reportLull(mockThread, 30 * 60 * 1000);
- verifyLullLog(true);
-
- // Full thread dump should not be performed because the last dump
- // was only 5 minutes ago.
- clock.setTime(clock.currentTimeMillis() +
Duration.standardMinutes(5L).getMillis());
- executionState.reportLull(mockThread, 30 * 60 * 1000);
- verifyLullLog(false);
-
- // Full thread dump should not be performed because the lull duration
- // is only 6 minutes.
- clock.setTime(clock.currentTimeMillis() +
Duration.standardMinutes(16L).getMillis());
- executionState.reportLull(mockThread, 6 * 60 * 1000);
- verifyLullLog(false);
-
- // Full thread dump should be performed, because it has been 21 minutes
- // since the last dump, and the lull duration is more than 20 minutes.
- clock.setTime(clock.currentTimeMillis() +
Duration.standardMinutes(16L).getMillis());
- executionState.reportLull(mockThread, 30 * 60 * 1000);
- verifyLullLog(true);
- } finally {
- // Cleaning up the background thread.
- backgroundThread.interrupt();
- backgroundThread.join();
- }
+ executionState.reportLull(mockThread, 30 * 60 * 1000);
+ verifyLullLog();
}
- private void verifyLullLog(boolean hasFullThreadDump) throws IOException {
+ private void verifyLullLog() throws IOException {
File[] files = logFolder.listFiles();
assertThat(files, Matchers.arrayWithSize(1));
File logFile = files[0];
@@ -305,23 +257,13 @@ public class DataflowOperationContextTest {
String infoLines =
Joiner.on("\n").join(Iterables.filter(lines, line ->
line.contains("\"INFO\"")));
- if (hasFullThreadDump) {
- assertThat(
- infoLines,
- Matchers.allOf(
- Matchers.containsString("Thread[backgroundThread,"),
- Matchers.containsString(
-
"org.apache.beam.runners.dataflow.worker.DataflowOperationContext"),
-
Matchers.not(Matchers.containsString(SimpleDoFnRunner.class.getName()))));
- } else {
- assertThat(
- infoLines,
- Matchers.not(
- Matchers.anyOf(
- Matchers.containsString("Thread[backgroundThread,"),
- Matchers.containsString(
-
"org.apache.beam.runners.dataflow.worker.DataflowOperationContext"))));
- }
+ assertThat(
+ infoLines,
+ Matchers.not(
+ Matchers.anyOf(
+ Matchers.containsString("Thread[backgroundThread,"),
+ Matchers.containsString(
+
"org.apache.beam.runners.dataflow.worker.DataflowOperationContext"))));
// Truncate the file when done to prepare for the next test.
new FileOutputStream(logFile, false).getChannel().truncate(0).close();
}