scwhittle commented on code in PR #29882:
URL: https://github.com/apache/beam/pull/29882#discussion_r1497343785
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java:
##########
@@ -260,23 +270,69 @@ public static class DataflowExecutionStateTracker extends
ExecutionStateTracker
@Nullable
private ActiveMessageMetadata activeMessageMetadata = null;
- private final MillisProvider clock = System::currentTimeMillis;
+ /** Clock used to either provide real system time or mocked to virtualize
time for testing. */
+ private final Clock clock;
@GuardedBy("this")
private final Map<String, IntSummaryStatistics> processingTimesByStep =
new HashMap<>();
+ /** Last milliseconds since epoch when a full thread dump was performed at
bundle level. */
+ private long lastFullThreadDumpMillisForBundleLull = 0;
+
+ /** The minimum lull duration in milliseconds to perform a full thread
dump. */
+ private static final long LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS = 20 *
60 * 1000;
+
+ private long logFullThreadDumpMillisForBundleLull;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DataflowExecutionStateTracker.class);
+
+ private static final PeriodFormatter DURATION_FORMATTER =
+ new PeriodFormatterBuilder()
+ .appendDays()
+ .appendSuffix("d")
+ .minimumPrintedDigits(2)
+ .appendHours()
+ .appendSuffix("h")
+ .printZeroAlways()
+ .appendMinutes()
+ .appendSuffix("m")
+ .appendSeconds()
+ .appendSuffix("s")
+ .toFormatter();
+
public DataflowExecutionStateTracker(
ExecutionStateSampler sampler,
DataflowOperationContext.DataflowExecutionState otherState,
CounterFactory counterFactory,
PipelineOptions options,
String workItemId) {
+ this(
+ sampler,
+ otherState,
+ counterFactory,
+ options,
+ workItemId,
+ Clock.SYSTEM,
+ LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS);
+ }
+
+ public DataflowExecutionStateTracker(
Review Comment:
This constructor can be annotated with @VisibleForTesting.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java:
##########
@@ -260,23 +270,69 @@ public static class DataflowExecutionStateTracker extends
ExecutionStateTracker
@Nullable
private ActiveMessageMetadata activeMessageMetadata = null;
- private final MillisProvider clock = System::currentTimeMillis;
+ /** Clock used to either provide real system time or mocked to virtualize
time for testing. */
+ private final Clock clock;
@GuardedBy("this")
private final Map<String, IntSummaryStatistics> processingTimesByStep =
new HashMap<>();
+ /** Last milliseconds since epoch when a full thread dump was performed at
bundle level. */
+ private long lastFullThreadDumpMillisForBundleLull = 0;
Review Comment:
How about naming this lastFullThreadDumpMillis? The comment can also be
changed so that it no longer refers to "at bundle level".
The current name is too similar to logFullThreadDumpMillisForBundleLull and
is confusing.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java:
##########
@@ -260,23 +270,69 @@ public static class DataflowExecutionStateTracker extends
ExecutionStateTracker
@Nullable
private ActiveMessageMetadata activeMessageMetadata = null;
- private final MillisProvider clock = System::currentTimeMillis;
+ /** Clock used to either provide real system time or mocked to virtualize
time for testing. */
+ private final Clock clock;
@GuardedBy("this")
private final Map<String, IntSummaryStatistics> processingTimesByStep =
new HashMap<>();
+ /** Last milliseconds since epoch when a full thread dump was performed at
bundle level. */
+ private long lastFullThreadDumpMillisForBundleLull = 0;
+
+ /** The minimum lull duration in milliseconds to perform a full thread
dump. */
+ private static final long LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS = 20 *
60 * 1000;
+
+ private long logFullThreadDumpMillisForBundleLull;
Review Comment:
If you're injecting the clock, it seems you could remove this variable and
just use the constant (LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS) directly.
In the test, just advance the clock by more than the constant.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java:
##########
@@ -123,12 +157,112 @@ private void
assertElementProcessingTimeCounter(NameContext step, int millis, in
.build()));
}
- private ExecutionStateTracker createTracker() {
+ private DataflowExecutionStateTracker createTracker() {
return new DataflowExecutionStateTracker(
sampler,
new TestDataflowExecutionState(NameContext.forStage("test-stage"),
"other"),
counterSet,
options,
"test-work-item-id");
}
+
+ @Test
+ public void testLullReportsRightTrace() throws Exception {
+ FixedClock clock = new FixedClock(Clock.SYSTEM.currentTimeMillis());
+ DataflowExecutionStateTracker tracker = createTracker(clock);
+ // Adding test for the full thread dump, but since we can't mock
+ // Thread.getAllStackTraces(), we are starting a background thread
+ // to verify the full thread dump.
+ Thread backgroundThread =
+ new Thread("backgroundThread") {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException e) {
+ // exiting the thread
+ }
+ }
+ };
+
+ backgroundThread.start();
+ try {
+ // Full thread dump should be performed, because we never performed
+ // a full thread dump before, and the lull duration is more than 20
+ // minutes.
+ tracker.reportBundleLull(30 * 60 * 1000);
+ verifyLullLog(true);
+
+ // Full thread dump should not be performed because the last dump
+ // was only 5 minutes ago.
+ clock.setTime(clock.currentTimeMillis() +
Duration.standardMinutes(5L).getMillis());
+ tracker.reportBundleLull(30 * 60 * 1000);
+ verifyLullLog(false);
+
+ // Full thread dump should not be performed because the lull duration
+ // is only 6 minutes.
+ clock.setTime(clock.currentTimeMillis() +
Duration.standardMinutes(16L).getMillis());
+ tracker.reportBundleLull(6 * 60 * 1000);
+ verifyLullLog(false);
+
+ // Full thread dump should be performed, because it has been 21 minutes
+ // since the last dump, and the lull duration is more than 20 minutes.
+ clock.setTime(clock.currentTimeMillis() +
Duration.standardMinutes(16L).getMillis());
+ tracker.reportBundleLull(30 * 60 * 1000);
+ // verifyLullLog(true);
+ } finally {
+ // Cleaning up the background thread.
+ backgroundThread.interrupt();
+ backgroundThread.join();
+ }
+ }
+
+ private void verifyLullLog(boolean hasFullThreadDump) throws IOException {
+ File[] files = logFolder.listFiles();
+ assertThat(files, Matchers.arrayWithSize(1));
+ File logFile = files[0];
+ List<String> lines = Files.readAllLines(logFile.toPath());
+
+ String warnLines =
+ Joiner.on("\n").join(Iterables.filter(lines, line ->
line.contains("\"WARN\"")));
+ assertThat(
+ warnLines,
+ Matchers.allOf(
+ Matchers.containsString("Operation ongoing in bundle for at
least"),
+ Matchers.containsString(" without completing"),
+ Matchers.containsString("Processing times in each step"),
+ Matchers.containsString(
+
"org.apache.beam.runners.dataflow.worker.DataflowExecutionContext$DataflowExecutionStateTracker")));
+
+ String infoLines =
+ Joiner.on("\n").join(Iterables.filter(lines, line ->
line.contains("\"INFO\"")));
+ if (hasFullThreadDump) {
+ assertThat(
+ infoLines,
+ Matchers.anyOf(
Review Comment:
Should this be allOf instead of anyOf?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]