This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 50c59912bc0 Revert "Implementing lull reporting at bundle level
processing (#29882)" (#30648)
50c59912bc0 is described below
commit 50c59912bc002947c335170b42827b278b78aae1
Author: Arvind Ram <[email protected]>
AuthorDate: Mon Mar 18 03:31:49 2024 -0700
Revert "Implementing lull reporting at bundle level processing (#29882)"
(#30648)
This reverts commit ffe2dba532028cdbbb5bca9c374f0a2d756ee8bf.
---
.../core/metrics/ExecutionStateTracker.java | 25 ----
.../dataflow/worker/DataflowExecutionContext.java | 117 +----------------
.../dataflow/worker/DataflowOperationContext.java | 80 +++++++++++-
.../runners/dataflow/worker/StackTraceUtil.java | 66 ----------
.../worker/DataflowExecutionStateTrackerTest.java | 140 +--------------------
.../worker/DataflowOperationContextTest.java | 80 ++++++++++--
6 files changed, 155 insertions(+), 353 deletions(-)
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
index b0b8f0107f3..dc6fd2f8248 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
@@ -46,7 +46,6 @@ public class ExecutionStateTracker implements
Comparable<ExecutionStateTracker>
new ConcurrentHashMap<>();
private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5);
- private static final long BUNDLE_LULL_REPORT_MS =
TimeUnit.MINUTES.toMillis(10);
private static final AtomicIntegerFieldUpdater<ExecutionStateTracker>
SAMPLING_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ExecutionStateTracker.class,
"sampling");
@@ -140,17 +139,8 @@ public class ExecutionStateTracker implements
Comparable<ExecutionStateTracker>
*/
private volatile long millisSinceLastTransition = 0;
- /**
- * The number of milliseconds since the {@link ExecutionStateTracker}
initial state.
- *
- * <p>This variable is updated by the Sampling thread, and read by the
Progress Reporting thread,
- * thus it being marked volatile.
- */
- private volatile long millisSinceBundleStart = 0;
-
private long transitionsAtLastSample = 0;
private long nextLullReportMs = LULL_REPORT_MS;
- private long nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
public ExecutionStateTracker(ExecutionStateSampler sampler) {
this.sampler = sampler;
@@ -165,10 +155,8 @@ public class ExecutionStateTracker implements
Comparable<ExecutionStateTracker>
currentState = null;
numTransitions = 0;
millisSinceLastTransition = 0;
- millisSinceBundleStart = 0;
transitionsAtLastSample = 0;
nextLullReportMs = LULL_REPORT_MS;
- nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
}
@VisibleForTesting
@@ -347,19 +335,6 @@ public class ExecutionStateTracker implements
Comparable<ExecutionStateTracker>
transitionsAtLastSample = transitionsAtThisSample;
}
updateMillisSinceLastTransition(millisSinceLastSample, state);
- updateMillisSinceBundleStart(millisSinceLastSample);
- }
-
- // Override this to implement bundle level lull reporting.
- protected void reportBundleLull(long millisSinceBundleStart) {}
-
- @SuppressWarnings("NonAtomicVolatileUpdate")
- private void updateMillisSinceBundleStart(long millisSinceLastSample) {
- millisSinceBundleStart += millisSinceLastSample;
- if (millisSinceBundleStart > nextBundleLullReportMs) {
- reportBundleLull(millisSinceBundleStart);
- nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS;
- }
}
@SuppressWarnings("NonAtomicVolatileUpdate")
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
index 16ff2975b02..080fa7c9dac 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow.worker;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
-import com.google.api.client.util.Clock;
import com.google.api.services.dataflow.model.SideInputInfo;
import java.io.Closeable;
import java.io.IOException;
@@ -30,8 +29,6 @@ import java.util.IntSummaryStatistics;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
-import java.util.logging.Level;
-import java.util.logging.LogRecord;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.core.NullSideInputReader;
@@ -40,13 +37,10 @@ import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
-import
org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
import
org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
-import
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler;
-import
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.MetricsContainer;
@@ -54,16 +48,11 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closer;
import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.Duration;
+import org.joda.time.DateTimeUtils.MillisProvider;
import org.joda.time.Instant;
-import org.joda.time.format.PeriodFormatter;
-import org.joda.time.format.PeriodFormatterBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/** Execution context for the Dataflow worker. */
@SuppressWarnings({
@@ -271,59 +260,23 @@ public abstract class DataflowExecutionContext<T extends
DataflowStepContext> {
@Nullable
private ActiveMessageMetadata activeMessageMetadata = null;
- /** Clock used to either provide real system time or mocked to virtualize
time for testing. */
- private final Clock clock;
+ private final MillisProvider clock = System::currentTimeMillis;
@GuardedBy("this")
private final Map<String, IntSummaryStatistics> processingTimesByStep =
new HashMap<>();
- /** Last milliseconds since epoch when a full thread dump was performed. */
- private long lastFullThreadDumpMillis = 0;
-
- /** The minimum lull duration in milliseconds to perform a full thread
dump. */
- private static final long LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS = 20 *
60 * 1000;
-
- private static final Logger LOG =
LoggerFactory.getLogger(DataflowExecutionStateTracker.class);
-
- private static final PeriodFormatter DURATION_FORMATTER =
- new PeriodFormatterBuilder()
- .appendDays()
- .appendSuffix("d")
- .minimumPrintedDigits(2)
- .appendHours()
- .appendSuffix("h")
- .printZeroAlways()
- .appendMinutes()
- .appendSuffix("m")
- .appendSeconds()
- .appendSuffix("s")
- .toFormatter();
-
public DataflowExecutionStateTracker(
ExecutionStateSampler sampler,
DataflowOperationContext.DataflowExecutionState otherState,
CounterFactory counterFactory,
PipelineOptions options,
String workItemId) {
- this(sampler, otherState, counterFactory, options, workItemId,
Clock.SYSTEM);
- }
-
- @VisibleForTesting
- public DataflowExecutionStateTracker(
- ExecutionStateSampler sampler,
- DataflowOperationContext.DataflowExecutionState otherState,
- CounterFactory counterFactory,
- PipelineOptions options,
- String workItemId,
- Clock clock) {
super(sampler);
this.elementExecutionTracker =
DataflowElementExecutionTracker.create(counterFactory, options);
this.otherState = otherState;
this.workItemId = workItemId;
this.contextActivationObserverRegistry =
ContextActivationObserverRegistry.createDefault();
- this.clock = clock;
- DataflowWorkerLoggingInitializer.initialize();
}
@Override
@@ -348,76 +301,12 @@ public abstract class DataflowExecutionContext<T extends
DataflowStepContext> {
}
}
- private boolean shouldLogFullThreadDumpForBundle(Duration lullDuration) {
- if (lullDuration.getMillis() < LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS)
{
- return false;
- }
- long now = clock.currentTimeMillis();
- if (lastFullThreadDumpMillis + LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS
< now) {
- lastFullThreadDumpMillis = now;
- return true;
- }
- return false;
- }
-
- private String getBundleLullMessage(Duration lullDuration) {
- StringBuilder message = new StringBuilder();
- message
- .append("Operation ongoing in bundle for at least ")
- .append(DURATION_FORMATTER.print(lullDuration.toPeriod()))
- .append(" without completing")
- .append("\n");
- synchronized (this) {
- if (this.activeMessageMetadata != null) {
- message.append(
- "Current user step name: " +
getActiveMessageMetadata().get().userStepName() + "\n");
- message.append(
- "Time spent in this step(millis): "
- + (clock.currentTimeMillis() -
getActiveMessageMetadata().get().startTime())
- + "\n");
- }
- message.append("Processing times in each step(millis)\n");
- for (Map.Entry<String, IntSummaryStatistics> entry :
- this.processingTimesByStep.entrySet()) {
- message.append("Step name: " + entry.getKey() + "\n");
- message.append("Time spent in this step: " +
entry.getValue().toString() + "\n");
- }
- }
-
- return message.toString();
- }
-
@Override
protected void takeSampleOnce(long millisSinceLastSample) {
elementExecutionTracker.takeSample(millisSinceLastSample);
super.takeSampleOnce(millisSinceLastSample);
}
- @Override
- protected void reportBundleLull(long millisElapsedSinceBundleStart) {
- // If we're not logging warnings, nothing to report.
- if (!LOG.isWarnEnabled()) {
- return;
- }
-
- Duration lullDuration = Duration.millis(millisElapsedSinceBundleStart);
-
- // Since the lull reporting executes in the sampler thread, it won't
automatically inherit the
- // context of the current step. To ensure things are logged correctly,
we get the currently
- // registered DataflowWorkerLoggingHandler and log directly in the
desired context.
- LogRecord logRecord = new LogRecord(Level.WARNING,
getBundleLullMessage(lullDuration));
- logRecord.setLoggerName(DataflowExecutionStateTracker.LOG.getName());
-
- // Publish directly in the context of this specific ExecutionState.
- DataflowWorkerLoggingHandler dataflowLoggingHandler =
- DataflowWorkerLoggingInitializer.getLoggingHandler();
- dataflowLoggingHandler.publish(logRecord);
-
- if (shouldLogFullThreadDumpForBundle(lullDuration)) {
- StackTraceUtil.logAllStackTraces();
- }
- }
-
/**
* Enter a new state on the tracker. If the new state is a Dataflow
processing state, tracks the
* activeMessageMetadata with the start time of the new state.
@@ -434,7 +323,7 @@ public abstract class DataflowExecutionContext<T extends
DataflowStepContext> {
synchronized (this) {
this.activeMessageMetadata =
ActiveMessageMetadata.create(
- newDFState.getStepName().userName(),
clock.currentTimeMillis());
+ newDFState.getStepName().userName(), clock.getMillis());
}
}
elementExecutionTracker.enter(newDFState.getStepName());
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
index 7d90a512519..b2ab928bc99 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
@@ -19,13 +19,16 @@ package org.apache.beam.runners.dataflow.worker;
import static
org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt;
+import com.google.api.client.util.Clock;
import com.google.api.services.dataflow.model.CounterMetadata;
import com.google.api.services.dataflow.model.CounterStructuredName;
import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
import com.google.api.services.dataflow.model.CounterUpdate;
import java.io.Closeable;
+import java.util.Map;
import java.util.logging.Level;
import java.util.logging.LogRecord;
+import org.apache.beam.runners.core.SimpleDoFnRunner;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import
org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import
org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind;
@@ -39,6 +42,7 @@ import
org.apache.beam.runners.dataflow.worker.util.common.worker.OperationConte
import org.apache.beam.sdk.metrics.MetricsContainer;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.format.PeriodFormatter;
@@ -181,6 +185,9 @@ public class DataflowOperationContext implements
OperationContext {
private final ProfileScope profileScope;
private final @Nullable MetricsContainer metricsContainer;
+ /** Clock used to either provide real system time or mocked to virtualize
time for testing. */
+ private final Clock clock;
+
public DataflowExecutionState(
NameContext nameContext,
String stateName,
@@ -188,12 +195,31 @@ public class DataflowOperationContext implements
OperationContext {
@Nullable Integer inputIndex,
@Nullable MetricsContainer metricsContainer,
ProfileScope profileScope) {
+ this(
+ nameContext,
+ stateName,
+ requestingStepName,
+ inputIndex,
+ metricsContainer,
+ profileScope,
+ Clock.SYSTEM);
+ }
+
+ public DataflowExecutionState(
+ NameContext nameContext,
+ String stateName,
+ @Nullable String requestingStepName,
+ @Nullable Integer inputIndex,
+ @Nullable MetricsContainer metricsContainer,
+ ProfileScope profileScope,
+ Clock clock) {
super(stateName);
this.stepName = nameContext;
this.requestingStepName = requestingStepName;
this.inputIndex = inputIndex;
this.profileScope = Preconditions.checkNotNull(profileScope);
this.metricsContainer = metricsContainer;
+ this.clock = clock;
}
/**
@@ -225,6 +251,9 @@ public class DataflowOperationContext implements
OperationContext {
return description.toString();
}
+ private static final ImmutableSet<String> FRAMEWORK_CLASSES =
+ ImmutableSet.of(SimpleDoFnRunner.class.getName(),
DoFnInstanceManagers.class.getName());
+
protected String getLullMessage(Thread trackedThread, Duration
lullDuration) {
StringBuilder message = new StringBuilder();
message.append("Operation ongoing");
@@ -243,7 +272,7 @@ public class DataflowOperationContext implements
OperationContext {
message.append("\n");
-
message.append(StackTraceUtil.getStackTraceForLullMessage(trackedThread.getStackTrace()));
+
message.append(getStackTraceForLullMessage(trackedThread.getStackTrace()));
return message.toString();
}
@@ -267,6 +296,55 @@ public class DataflowOperationContext implements
OperationContext {
DataflowWorkerLoggingHandler dataflowLoggingHandler =
DataflowWorkerLoggingInitializer.getLoggingHandler();
dataflowLoggingHandler.publish(this, logRecord);
+
+ if (shouldLogFullThreadDump(lullDuration)) {
+ Map<Thread, StackTraceElement[]> threadSet =
Thread.getAllStackTraces();
+ for (Map.Entry<Thread, StackTraceElement[]> entry :
threadSet.entrySet()) {
+ Thread thread = entry.getKey();
+ StackTraceElement[] stackTrace = entry.getValue();
+ StringBuilder message = new StringBuilder();
+ message.append(thread.toString()).append(":\n");
+ message.append(getStackTraceForLullMessage(stackTrace));
+ logRecord = new LogRecord(Level.INFO, message.toString());
+ logRecord.setLoggerName(DataflowOperationContext.LOG.getName());
+ dataflowLoggingHandler.publish(this, logRecord);
+ }
+ }
+ }
+
+ /**
+ * The time interval between two full thread dumps. (A full thread dump is
performed at most once
+ * every 20 minutes.)
+ */
+ private static final long LOG_LULL_FULL_THREAD_DUMP_INTERVAL_MS = 20 * 60
* 1000;
+
+ /** The minimum lull duration to perform a full thread dump. */
+ private static final long LOG_LULL_FULL_THREAD_DUMP_LULL_MS = 20 * 60 *
1000;
+
+ /** Last time when a full thread dump was performed. */
+ private long lastFullThreadDumpMillis = 0;
+
+ private boolean shouldLogFullThreadDump(Duration lullDuration) {
+ if (lullDuration.getMillis() < LOG_LULL_FULL_THREAD_DUMP_LULL_MS) {
+ return false;
+ }
+ long now = clock.currentTimeMillis();
+ if (lastFullThreadDumpMillis + LOG_LULL_FULL_THREAD_DUMP_INTERVAL_MS <
now) {
+ lastFullThreadDumpMillis = now;
+ return true;
+ }
+ return false;
+ }
+
+ private String getStackTraceForLullMessage(StackTraceElement[] stackTrace)
{
+ StringBuilder message = new StringBuilder();
+ for (StackTraceElement e : stackTrace) {
+ if (FRAMEWORK_CLASSES.contains(e.getClassName())) {
+ break;
+ }
+ message.append(" at ").append(e).append("\n");
+ }
+ return message.toString();
}
public @Nullable MetricsContainer getMetricsContainer() {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java
deleted file mode 100644
index 041944f09cf..00000000000
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker;
-
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.LogRecord;
-import org.apache.beam.runners.core.SimpleDoFnRunner;
-import
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler;
-import
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
-import org.apache.beam.sdk.annotations.Internal;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Utility methods to print the stack traces of all the threads. */
-@Internal
-public final class StackTraceUtil {
- private static final ImmutableSet<String> FRAMEWORK_CLASSES =
- ImmutableSet.of(SimpleDoFnRunner.class.getName(),
DoFnInstanceManagers.class.getName());
- private static final Logger LOG =
LoggerFactory.getLogger(StackTraceUtil.class);
-
- public static String getStackTraceForLullMessage(StackTraceElement[]
stackTrace) {
- StringBuilder message = new StringBuilder();
- for (StackTraceElement e : stackTrace) {
- if (FRAMEWORK_CLASSES.contains(e.getClassName())) {
- break;
- }
- message.append(" at ").append(e).append("\n");
- }
- return message.toString();
- }
-
- public static void logAllStackTraces() {
- DataflowWorkerLoggingHandler dataflowLoggingHandler =
- DataflowWorkerLoggingInitializer.getLoggingHandler();
- Map<Thread, StackTraceElement[]> threadSet = Thread.getAllStackTraces();
- for (Map.Entry<Thread, StackTraceElement[]> entry : threadSet.entrySet()) {
- Thread thread = entry.getKey();
- StackTraceElement[] stackTrace = entry.getValue();
- StringBuilder message = new StringBuilder();
- message.append(thread.toString()).append(":\n");
- message.append(getStackTraceForLullMessage(stackTrace));
- LogRecord logRecord = new LogRecord(Level.INFO, message.toString());
- logRecord.setLoggerName(StackTraceUtil.LOG.getName());
- dataflowLoggingHandler.publish(logRecord);
- }
- }
-
- private StackTraceUtil() {}
-}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
index 3502b621147..551937c3559 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
@@ -22,14 +22,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
-import com.google.api.client.testing.http.FixedClock;
-import com.google.api.client.util.Clock;
import java.io.Closeable;
-import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.nio.file.Files;
-import java.util.List;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
@@ -41,55 +35,28 @@ import
org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDi
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
-import
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.RestoreSystemProperties;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
-import org.hamcrest.Matchers;
import org.joda.time.DateTimeUtils.MillisProvider;
-import org.joda.time.Duration;
-import org.junit.After;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
/** Tests for {@link DataflowExecutionStateTrackerTest}. */
public class DataflowExecutionStateTrackerTest {
- @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Rule public RestoreSystemProperties restoreSystemProperties = new
RestoreSystemProperties();
-
- private File logFolder;
private PipelineOptions options;
private MillisProvider clock;
private ExecutionStateSampler sampler;
private CounterSet counterSet;
@Before
- public void setUp() throws IOException {
+ public void setUp() {
options = PipelineOptionsFactory.create();
clock = mock(MillisProvider.class);
sampler = ExecutionStateSampler.newForTest(clock);
counterSet = new CounterSet();
- logFolder = tempFolder.newFolder();
- System.setProperty(
- DataflowWorkerLoggingInitializer.RUNNER_FILEPATH_PROPERTY,
- new File(logFolder, "dataflow-json.log").getAbsolutePath());
- // We need to reset *first* because some other test may have already
initialized the
- // logging initializer.
- DataflowWorkerLoggingInitializer.reset();
- DataflowWorkerLoggingInitializer.initialize();
- }
-
- @After
- public void tearDown() {
- DataflowWorkerLoggingInitializer.reset();
}
private final NameContext step1 =
@@ -100,7 +67,7 @@ public class DataflowExecutionStateTrackerTest {
@Test
public void testReportsElementExecutionTime() throws IOException {
enableTimePerElementExperiment();
- DataflowExecutionStateTracker tracker = createTracker();
+ ExecutionStateTracker tracker = createTracker();
try (Closeable c1 = tracker.activate(new Thread())) {
try (Closeable c2 = tracker.enterState(step1Process)) {}
@@ -117,7 +84,7 @@ public class DataflowExecutionStateTrackerTest {
@Test
public void testTakesSampleOnDeactivate() throws IOException {
enableTimePerElementExperiment();
- DataflowExecutionStateTracker tracker = createTracker();
+ ExecutionStateTracker tracker = createTracker();
try (Closeable c1 = tracker.activate(new Thread())) {
try (Closeable c2 = tracker.enterState(step1Process)) {
@@ -156,7 +123,7 @@ public class DataflowExecutionStateTrackerTest {
.build()));
}
- private DataflowExecutionStateTracker createTracker() {
+ private ExecutionStateTracker createTracker() {
return new DataflowExecutionStateTracker(
sampler,
new TestDataflowExecutionState(NameContext.forStage("test-stage"),
"other"),
@@ -164,103 +131,4 @@ public class DataflowExecutionStateTrackerTest {
options,
"test-work-item-id");
}
-
- @Test
- public void testLullReportsRightTrace() throws Exception {
- FixedClock clock = new FixedClock(Clock.SYSTEM.currentTimeMillis());
- DataflowExecutionStateTracker tracker = createTracker(clock);
- // Adding test for the full thread dump, but since we can't mock
- // Thread.getAllStackTraces(), we are starting a background thread
- // to verify the full thread dump.
- Thread backgroundThread =
- new Thread("backgroundThread") {
- @Override
- public void run() {
- try {
- Thread.sleep(Long.MAX_VALUE);
- } catch (InterruptedException e) {
- // exiting the thread
- }
- }
- };
-
- backgroundThread.start();
- try {
- // Full thread dump should be performed, because we never performed
- // a full thread dump before, and the lull duration is more than 20
- // minutes.
- tracker.reportBundleLull(30 * 60 * 1000);
- verifyLullLog(true);
-
- // Full thread dump should not be performed because the last dump
- // was only 5 minutes ago.
- clock.setTime(clock.currentTimeMillis() +
Duration.standardMinutes(5L).getMillis());
- tracker.reportBundleLull(30 * 60 * 1000);
- verifyLullLog(false);
-
- // Full thread dump should not be performed because the lull duration
- // is only 6 minutes.
- clock.setTime(clock.currentTimeMillis() +
Duration.standardMinutes(16L).getMillis());
- tracker.reportBundleLull(6 * 60 * 1000);
- verifyLullLog(false);
-
- // Full thread dump should be performed, because it has been 21 minutes
- // since the last dump, and the lull duration is more than 20 minutes.
- clock.setTime(clock.currentTimeMillis() +
Duration.standardMinutes(16L).getMillis());
- tracker.reportBundleLull(30 * 60 * 1000);
- // verifyLullLog(true);
- } finally {
- // Cleaning up the background thread.
- backgroundThread.interrupt();
- backgroundThread.join();
- }
- }
-
- private void verifyLullLog(boolean hasFullThreadDump) throws IOException {
- File[] files = logFolder.listFiles();
- assertThat(files, Matchers.arrayWithSize(1));
- File logFile = files[0];
- List<String> lines = Files.readAllLines(logFile.toPath());
-
- String warnLines =
- Joiner.on("\n").join(Iterables.filter(lines, line ->
line.contains("\"WARN\"")));
- assertThat(
- warnLines,
- Matchers.allOf(
- Matchers.containsString("Operation ongoing in bundle for at
least"),
- Matchers.containsString(" without completing"),
- Matchers.containsString("Processing times in each step"),
- Matchers.containsString(
-
"org.apache.beam.runners.dataflow.worker.DataflowExecutionContext$DataflowExecutionStateTracker")));
-
- String infoLines =
- Joiner.on("\n").join(Iterables.filter(lines, line ->
line.contains("\"INFO\"")));
- if (hasFullThreadDump) {
- assertThat(
- infoLines,
- Matchers.allOf(
- Matchers.containsString("Thread[backgroundThread,"),
-
Matchers.containsString("org.apache.beam.runners.dataflow.worker.StackTraceUtil")));
- } else {
- assertThat(
- infoLines,
- Matchers.not(
- Matchers.anyOf(
- Matchers.containsString("Thread[backgroundThread,"),
- Matchers.containsString(
-
"org.apache.beam.runners.dataflow.worker.StackTraceUtil"))));
- }
- // Truncate the file when done to prepare for the next test.
- new FileOutputStream(logFile, false).getChannel().truncate(0).close();
- }
-
- private DataflowExecutionStateTracker createTracker(Clock clock) {
- return new DataflowExecutionStateTracker(
- sampler,
- new TestDataflowExecutionState(NameContext.forStage("test-stage"),
"other"),
- counterSet,
- options,
- "test-work-item-id",
- clock);
- }
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
index 88a9f8e11c7..34c3b3d5373 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
@@ -29,6 +29,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.api.client.testing.http.FixedClock;
+import com.google.api.client.util.Clock;
import com.google.api.services.dataflow.model.CounterUpdate;
import java.io.Closeable;
import java.io.File;
@@ -204,6 +206,7 @@ public class DataflowOperationContextTest {
@Test
public void testLullReportsRightTrace() throws Exception {
Thread mockThread = mock(Thread.class);
+ FixedClock clock = new FixedClock(Clock.SYSTEM.currentTimeMillis());
DataflowExecutionState executionState =
new DataflowExecutionState(
@@ -212,7 +215,8 @@ public class DataflowOperationContextTest {
null /* requestingStepName */,
null /* inputIndex */,
null /* metricsContainer */,
- ScopedProfiler.INSTANCE.emptyScope()) {
+ ScopedProfiler.INSTANCE.emptyScope(),
+ clock) {
@Override
public @Nullable CounterUpdate extractUpdate(boolean
isFinalUpdate) {
// not being used for extracting updates
@@ -234,11 +238,55 @@ public class DataflowOperationContextTest {
SimpleDoFnRunner.class.getName(), "processElement",
"SimpleDoFnRunner.java", 500),
};
when(mockThread.getStackTrace()).thenReturn(doFnStackTrace);
- executionState.reportLull(mockThread, 30 * 60 * 1000);
- verifyLullLog();
+
+ // Adding test for the full thread dump, but since we can't mock
+ // Thread.getAllStackTraces(), we are starting a background thread
+ // to verify the full thread dump.
+ Thread backgroundThread =
+ new Thread("backgroundThread") {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException e) {
+ // exiting the thread
+ }
+ }
+ };
+
+ backgroundThread.start();
+ try {
+ // Full thread dump should be performed, because we never performed
+ // a full thread dump before, and the lull duration is more than 20
+ // minutes.
+ executionState.reportLull(mockThread, 30 * 60 * 1000);
+ verifyLullLog(true);
+
+ // Full thread dump should not be performed because the last dump
+ // was only 5 minutes ago.
+ clock.setTime(clock.currentTimeMillis() +
Duration.standardMinutes(5L).getMillis());
+ executionState.reportLull(mockThread, 30 * 60 * 1000);
+ verifyLullLog(false);
+
+ // Full thread dump should not be performed because the lull duration
+ // is only 6 minutes.
+ clock.setTime(clock.currentTimeMillis() +
Duration.standardMinutes(16L).getMillis());
+ executionState.reportLull(mockThread, 6 * 60 * 1000);
+ verifyLullLog(false);
+
+ // Full thread dump should be performed, because it has been 21 minutes
+ // since the last dump, and the lull duration is more than 20 minutes.
+ clock.setTime(clock.currentTimeMillis() +
Duration.standardMinutes(16L).getMillis());
+ executionState.reportLull(mockThread, 30 * 60 * 1000);
+ verifyLullLog(true);
+ } finally {
+ // Cleaning up the background thread.
+ backgroundThread.interrupt();
+ backgroundThread.join();
+ }
}
- private void verifyLullLog() throws IOException {
+ private void verifyLullLog(boolean hasFullThreadDump) throws IOException {
File[] files = logFolder.listFiles();
assertThat(files, Matchers.arrayWithSize(1));
File logFile = files[0];
@@ -257,13 +305,23 @@ public class DataflowOperationContextTest {
String infoLines =
Joiner.on("\n").join(Iterables.filter(lines, line ->
line.contains("\"INFO\"")));
- assertThat(
- infoLines,
- Matchers.not(
- Matchers.anyOf(
- Matchers.containsString("Thread[backgroundThread,"),
- Matchers.containsString(
-
"org.apache.beam.runners.dataflow.worker.DataflowOperationContext"))));
+ if (hasFullThreadDump) {
+ assertThat(
+ infoLines,
+ Matchers.allOf(
+ Matchers.containsString("Thread[backgroundThread,"),
+ Matchers.containsString(
+
"org.apache.beam.runners.dataflow.worker.DataflowOperationContext"),
+
Matchers.not(Matchers.containsString(SimpleDoFnRunner.class.getName()))));
+ } else {
+ assertThat(
+ infoLines,
+ Matchers.not(
+ Matchers.anyOf(
+ Matchers.containsString("Thread[backgroundThread,"),
+ Matchers.containsString(
+
"org.apache.beam.runners.dataflow.worker.DataflowOperationContext"))));
+ }
// Truncate the file when done to prepare for the next test.
new FileOutputStream(logFile, false).getChannel().truncate(0).close();
}