Repository: flink Updated Branches: refs/heads/master c212701d5 -> 9435370e7
[FLINK-8575][runtime] Add missing synchronization in BackPressureStatsTracker. Make triggerStackTraceSampleInternal private again and add locking to triggerStackTraceSample. This closes #5422. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9435370e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9435370e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9435370e Branch: refs/heads/master Commit: 9435370e76098f8ea3b689411c085c82a253a6d3 Parents: c85d5e3 Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu Feb 8 12:38:53 2018 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu Feb 8 12:40:29 2018 +0100 ---------------------------------------------------------------------- .../backpressure/BackPressureStatsTracker.java | 54 ++++++++++---------- .../BackPressureStatsTrackerTest.java | 8 ++- 2 files changed, 30 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9435370e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java index 8c130e6..ec8a451 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; @@ -172,40 +171,39 @@ public class BackPressureStatsTracker { * @param vertex Operator to get the stats for. * @return Flag indicating whether a sample with triggered. */ - @VisibleForTesting - boolean triggerStackTraceSampleInternal(final ExecutionJobVertex vertex) { - synchronized (lock) { - if (shutDown) { - return false; - } + private boolean triggerStackTraceSampleInternal(final ExecutionJobVertex vertex) { + assert(Thread.holdsLock(lock)); - if (!pendingStats.contains(vertex) && - !vertex.getGraph().getState().isGloballyTerminalState()) { + if (shutDown) { + return false; + } - Executor executor = vertex.getGraph().getFutureExecutor(); + if (!pendingStats.contains(vertex) && + !vertex.getGraph().getState().isGloballyTerminalState()) { - // Only trigger if still active job - if (executor != null) { - pendingStats.add(vertex); + Executor executor = vertex.getGraph().getFutureExecutor(); - if (LOG.isDebugEnabled()) { - LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices())); - } + // Only trigger if still active job + if (executor != null) { + pendingStats.add(vertex); - CompletableFuture<StackTraceSample> sample = coordinator.triggerStackTraceSample( - vertex.getTaskVertices(), - numSamples, - delayBetweenSamples, - MAX_STACK_TRACE_DEPTH); + if (LOG.isDebugEnabled()) { + LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices())); + } - sample.handleAsync(new StackTraceSampleCompletionCallback(vertex), executor); + CompletableFuture<StackTraceSample> sample = coordinator.triggerStackTraceSample( + vertex.getTaskVertices(), + numSamples, + delayBetweenSamples, + MAX_STACK_TRACE_DEPTH); - return true; - } - } + sample.handleAsync(new StackTraceSampleCompletionCallback(vertex), executor); - return false; + return true; + } } + + return false; } /** @@ -220,7 
+218,9 @@ public class BackPressureStatsTracker { */ @Deprecated public boolean triggerStackTraceSample(ExecutionJobVertex vertex) { - return triggerStackTraceSampleInternal(vertex); + synchronized (lock) { + return triggerStackTraceSampleInternal(vertex); + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/9435370e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java index debf71d..0bbf5f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java @@ -93,18 +93,16 @@ public class BackPressureStatsTrackerTest extends TestLogger { // getOperatorBackPressureStats triggers stack trace sampling Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - Mockito.verify(sampleCoordinator).triggerStackTraceSample( + Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( Matchers.eq(taskVertices), Matchers.eq(numSamples), Matchers.eq(delayBetweenSamples), Matchers.eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH)); - // Trigger again for pending request, should not fire - Assert.assertFalse("Unexpected trigger", tracker.triggerStackTraceSampleInternal(jobVertex)); - + // Request back pressure stats again. 
This should not trigger another sample request Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - Mockito.verify(sampleCoordinator).triggerStackTraceSample( + Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( Matchers.eq(taskVertices), Matchers.eq(numSamples), Matchers.eq(delayBetweenSamples),