Repository: flink
Updated Branches:
  refs/heads/master 92026ff39 -> 24c30878e


[FLINK-8626] Introduce BackPressureStatsTracker interface

Renames BackPressureStatsTracker into BackPressureStatsTrackerImpl and introduce
a BackPressureStatsTracker interface. This will make testing easier when we 
don't
have to set up all the different components.

This closes #5443.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f719befa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f719befa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f719befa

Branch: refs/heads/master
Commit: f719befaa1c00997ef34809e121f6192f44f7164
Parents: 92026ff
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Fri Feb 9 14:07:31 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Feb 13 08:50:50 2018 +0100

----------------------------------------------------------------------
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  14 +-
 .../runtime/jobmaster/JobManagerRunner.java     |   2 +-
 .../runtime/jobmaster/JobManagerServices.java   |  20 +-
 .../legacy/JobVertexBackPressureHandler.java    |  14 +-
 .../backpressure/BackPressureStatsTracker.java  | 331 +----------------
 .../BackPressureStatsTrackerImpl.java           | 363 +++++++++++++++++++
 .../VoidBackPressureStatsTracker.java           |  36 ++
 .../flink/runtime/jobmaster/JobMasterTest.java  |  11 +-
 .../JobVertexBackPressureHandlerTest.java       |  10 +-
 .../BackPressureStatsTrackerITCase.java         | 333 -----------------
 .../BackPressureStatsTrackerImplITCase.java     | 333 +++++++++++++++++
 .../BackPressureStatsTrackerImplTest.java       | 183 ++++++++++
 .../BackPressureStatsTrackerTest.java           | 183 ----------
 13 files changed, 951 insertions(+), 882 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f719befa/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 3583763..c0c6ac1 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -56,7 +56,7 @@ import 
org.apache.flink.runtime.rest.handler.legacy.SubtasksAllAccumulatorsHandl
 import org.apache.flink.runtime.rest.handler.legacy.SubtasksTimesHandler;
 import org.apache.flink.runtime.rest.handler.legacy.TaskManagerLogHandler;
 import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler;
-import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
 import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointConfigHandler;
 import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsHandler;
@@ -143,7 +143,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 
        private final StackTraceSampleCoordinator stackTraceSamples;
 
-       private final BackPressureStatsTracker backPressureStatsTracker;
+       private final BackPressureStatsTrackerImpl backPressureStatsTrackerImpl;
 
        private final WebMonitorConfig cfg;
 
@@ -223,7 +223,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 
                Time delayBetweenSamples = Time.milliseconds(delay);
 
-               backPressureStatsTracker = new BackPressureStatsTracker(
+               backPressureStatsTrackerImpl = new BackPressureStatsTrackerImpl(
                        stackTraceSamples,
                        cleanUpInterval,
                        numSamples,
@@ -288,7 +288,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                get(router, new SubtasksTimesHandler(executionGraphCache, 
scheduledExecutor));
                get(router, new 
JobVertexTaskManagersHandler(executionGraphCache, scheduledExecutor, 
metricFetcher));
                get(router, new 
JobVertexAccumulatorsHandler(executionGraphCache, scheduledExecutor));
-               get(router, new 
JobVertexBackPressureHandler(executionGraphCache, scheduledExecutor, 
backPressureStatsTracker, refreshInterval));
+               get(router, new 
JobVertexBackPressureHandler(executionGraphCache, scheduledExecutor, 
backPressureStatsTrackerImpl, refreshInterval));
                get(router, new 
SubtasksAllAccumulatorsHandler(executionGraphCache, scheduledExecutor));
                get(router, new 
SubtaskCurrentAttemptDetailsHandler(executionGraphCache, scheduledExecutor, 
metricFetcher));
                get(router, new 
SubtaskExecutionAttemptDetailsHandler(executionGraphCache, scheduledExecutor, 
metricFetcher));
@@ -447,7 +447,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                synchronized (startupShutdownLock) {
                        leaderRetrievalService.start(retriever);
 
-                       long delay = 
backPressureStatsTracker.getCleanUpInterval();
+                       long delay = 
backPressureStatsTrackerImpl.getCleanUpInterval();
 
                        // Scheduled back pressure stats tracker cache cleanup. 
We schedule
                        // this here repeatedly, because cache clean up only 
happens on
@@ -457,7 +457,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                                @Override
                                public void run() {
                                        try {
-                                               
backPressureStatsTracker.cleanUpOperatorStatsCache();
+                                               
backPressureStatsTrackerImpl.cleanUpOperatorStatsCache();
                                        } catch (Throwable t) {
                                                LOG.error("Error during back 
pressure stats cache cleanup.", t);
                                        }
@@ -480,7 +480,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 
                        stackTraceSamples.shutDown();
 
-                       backPressureStatsTracker.shutDown();
+                       backPressureStatsTrackerImpl.shutDown();
 
                        cleanup();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/f719befa/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index bca135a..06700d9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -172,7 +172,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                                userCodeLoader,
                                restAddress,
                                metricRegistry.getMetricQueryServicePath(),
-                               jobManagerServices.backPressureStatsTracker);
+                               
jobManagerServices.backPressureStatsTrackerImpl);
 
                        this.timeout = jobManagerServices.rpcAskTimeout;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/f719befa/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index e363265..2295b5b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
@@ -56,7 +56,7 @@ public class JobManagerServices {
        public final Time rpcAskTimeout;
 
        private final StackTraceSampleCoordinator stackTraceSampleCoordinator;
-       public final BackPressureStatsTracker backPressureStatsTracker;
+       public final BackPressureStatsTrackerImpl backPressureStatsTrackerImpl;
 
        public JobManagerServices(
                        ScheduledExecutorService executorService,
@@ -64,19 +64,19 @@ public class JobManagerServices {
                        RestartStrategyFactory restartStrategyFactory,
                        Time rpcAskTimeout,
                        StackTraceSampleCoordinator stackTraceSampleCoordinator,
-                       BackPressureStatsTracker backPressureStatsTracker) {
+                       BackPressureStatsTrackerImpl 
backPressureStatsTrackerImpl) {
 
                this.executorService = checkNotNull(executorService);
                this.libraryCacheManager = checkNotNull(libraryCacheManager);
                this.restartStrategyFactory = 
checkNotNull(restartStrategyFactory);
                this.rpcAskTimeout = checkNotNull(rpcAskTimeout);
                this.stackTraceSampleCoordinator = 
checkNotNull(stackTraceSampleCoordinator);
-               this.backPressureStatsTracker = 
checkNotNull(backPressureStatsTracker);
+               this.backPressureStatsTrackerImpl = 
checkNotNull(backPressureStatsTrackerImpl);
 
                executorService.scheduleWithFixedDelay(
-                       backPressureStatsTracker::cleanUpOperatorStatsCache,
-                       backPressureStatsTracker.getCleanUpInterval(),
-                       backPressureStatsTracker.getCleanUpInterval(),
+                       backPressureStatsTrackerImpl::cleanUpOperatorStatsCache,
+                       backPressureStatsTrackerImpl.getCleanUpInterval(),
+                       backPressureStatsTrackerImpl.getCleanUpInterval(),
                        TimeUnit.MILLISECONDS);
        }
 
@@ -101,7 +101,7 @@ public class JobManagerServices {
                libraryCacheManager.shutdown();
 
                stackTraceSampleCoordinator.shutDown();
-               backPressureStatsTracker.shutDown();
+               backPressureStatsTrackerImpl.shutDown();
 
                if (firstException != null) {
                        ExceptionUtils.rethrowException(firstException, "Error 
while shutting down JobManager services");
@@ -145,7 +145,7 @@ public class JobManagerServices {
 
                final StackTraceSampleCoordinator stackTraceSampleCoordinator =
                        new StackTraceSampleCoordinator(futureExecutor, 
timeout.toMillis());
-               final BackPressureStatsTracker backPressureStatsTracker = new 
BackPressureStatsTracker(
+               final BackPressureStatsTrackerImpl backPressureStatsTrackerImpl 
= new BackPressureStatsTrackerImpl(
                        stackTraceSampleCoordinator,
                        
config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL),
                        config.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
@@ -158,6 +158,6 @@ public class JobManagerServices {
                        
RestartStrategyFactory.createRestartStrategyFactory(config),
                        Time.of(timeout.length(), timeout.unit()),
                        stackTraceSampleCoordinator,
-                       backPressureStatsTracker);
+                       backPressureStatsTrackerImpl);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f719befa/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java
index 4538650..b733434 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -46,7 +46,7 @@ public class JobVertexBackPressureHandler extends 
AbstractJobVertexRequestHandle
        private static final String JOB_VERTEX_BACKPRESSURE_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/backpressure";
 
        /** Back pressure stats tracker. */
-       private final BackPressureStatsTracker backPressureStatsTracker;
+       private final BackPressureStatsTrackerImpl backPressureStatsTrackerImpl;
 
        /** Time after which stats are considered outdated. */
        private final int refreshInterval;
@@ -54,11 +54,11 @@ public class JobVertexBackPressureHandler extends 
AbstractJobVertexRequestHandle
        public JobVertexBackPressureHandler(
                        ExecutionGraphCache executionGraphHolder,
                        Executor executor,
-                       BackPressureStatsTracker backPressureStatsTracker,
+                       BackPressureStatsTrackerImpl 
backPressureStatsTrackerImpl,
                        int refreshInterval) {
 
                super(executionGraphHolder, executor);
-               this.backPressureStatsTracker = 
checkNotNull(backPressureStatsTracker, "Stats tracker");
+               this.backPressureStatsTrackerImpl = 
checkNotNull(backPressureStatsTrackerImpl, "Stats tracker");
                checkArgument(refreshInterval >= 0, "Negative timeout");
                this.refreshInterval = refreshInterval;
        }
@@ -81,7 +81,7 @@ public class JobVertexBackPressureHandler extends 
AbstractJobVertexRequestHandle
 
                        gen.writeStartObject();
 
-                       Optional<OperatorBackPressureStats> statsOption = 
backPressureStatsTracker
+                       Optional<OperatorBackPressureStats> statsOption = 
backPressureStatsTrackerImpl
                                        
.getOperatorBackPressureStats(jobVertex);
 
                        if (statsOption.isPresent()) {
@@ -89,7 +89,7 @@ public class JobVertexBackPressureHandler extends 
AbstractJobVertexRequestHandle
 
                                // Check whether we need to refresh
                                if (refreshInterval <= 
System.currentTimeMillis() - stats.getEndTimestamp()) {
-                                       
backPressureStatsTracker.triggerStackTraceSample(jobVertex);
+                                       
backPressureStatsTrackerImpl.triggerStackTraceSample(jobVertex);
                                        gen.writeStringField("status", 
"deprecated");
                                } else {
                                        gen.writeStringField("status", "ok");
@@ -112,7 +112,7 @@ public class JobVertexBackPressureHandler extends 
AbstractJobVertexRequestHandle
                                }
                                gen.writeEndArray();
                        } else {
-                               
backPressureStatsTracker.triggerStackTraceSample(jobVertex);
+                               
backPressureStatsTrackerImpl.triggerStackTraceSample(jobVertex);
                                gen.writeStringField("status", "deprecated");
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f719befa/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
index ec8a451..356c8b8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
@@ -18,133 +18,14 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
 
-import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Back pressure statistics tracker.
- *
- * <p>Back pressure is determined by sampling running tasks. If a task is
- * slowed down by back pressure it will be stuck in memory requests to a
- * {@link org.apache.flink.runtime.io.network.buffer.LocalBufferPool}.
- *
- * <p>The back pressured stack traces look like this:
- *
- * <pre>
- * java.lang.Object.wait(Native Method)
- * o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
- * o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) 
<--- BLOCKING
- * request
- * [...]
- * </pre>
+ * Interface for a tracker of back pressure statistics for {@link 
ExecutionJobVertex}.
  */
-public class BackPressureStatsTracker {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(BackPressureStatsTracker.class);
-
-       /** Maximum stack trace depth for samples. */
-       static final int MAX_STACK_TRACE_DEPTH = 3;
-
-       /** Expected class name for back pressure indicating stack trace 
element. */
-       static final String EXPECTED_CLASS_NAME = 
"org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
-
-       /** Expected method name for back pressure indicating stack trace 
element. */
-       static final String EXPECTED_METHOD_NAME = 
"requestBufferBuilderBlocking";
-
-       /** Lock guarding trigger operations. */
-       private final Object lock = new Object();
-
-       /* Stack trace sample coordinator. */
-       private final StackTraceSampleCoordinator coordinator;
-
-       /**
-        * Completed stats. Important: Job vertex IDs need to be scoped by job 
ID,
-        * because they are potentially constant across runs messing up the 
cached
-        * data.
-        */
-       private final Cache<ExecutionJobVertex, OperatorBackPressureStats> 
operatorStatsCache;
-
-       /** Pending in progress stats. Important: Job vertex IDs need to be 
scoped
-        * by job ID, because they are potentially constant across runs messing 
up
-        * the cached data.*/
-       private final Set<ExecutionJobVertex> pendingStats = new HashSet<>();
-
-       /** Cleanup interval for completed stats cache. */
-       private final int cleanUpInterval;
-
-       private final int numSamples;
-
-       private final int backPressureStatsRefreshInterval;
-
-       private final Time delayBetweenSamples;
-
-       /** Flag indicating whether the stats tracker has been shut down. */
-       private boolean shutDown;
-
-       /**
-        * Creates a back pressure statistics tracker.
-        *
-        * @param cleanUpInterval     Clean up interval for completed stats.
-        * @param numSamples          Number of stack trace samples when 
determining back pressure.
-        * @param delayBetweenSamples Delay between samples when determining 
back pressure.
-        */
-       public BackPressureStatsTracker(
-                       StackTraceSampleCoordinator coordinator,
-                       int cleanUpInterval,
-                       int numSamples,
-                       int backPressureStatsRefreshInterval,
-                       Time delayBetweenSamples) {
-
-               this.coordinator = checkNotNull(coordinator, "Stack trace 
sample coordinator");
-
-               checkArgument(cleanUpInterval >= 0, "Clean up interval");
-               this.cleanUpInterval = cleanUpInterval;
-
-               checkArgument(numSamples >= 1, "Number of samples");
-               this.numSamples = numSamples;
-
-               checkArgument(
-                       backPressureStatsRefreshInterval >= 0,
-                       "backPressureStatsRefreshInterval must be greater than 
or equal to 0");
-               this.backPressureStatsRefreshInterval = 
backPressureStatsRefreshInterval;
-
-               this.delayBetweenSamples = checkNotNull(delayBetweenSamples, 
"Delay between samples");
-
-               this.operatorStatsCache = CacheBuilder.newBuilder()
-                               .concurrencyLevel(1)
-                               .expireAfterAccess(cleanUpInterval, 
TimeUnit.MILLISECONDS)
-                               .build();
-       }
-
-       /** Cleanup interval for completed stats cache. */
-       public long getCleanUpInterval() {
-               return cleanUpInterval;
-       }
+public interface BackPressureStatsTracker {
 
        /**
         * Returns back pressure statistics for a operator. Automatically 
triggers stack trace sampling
@@ -153,211 +34,5 @@ public class BackPressureStatsTracker {
         * @param vertex Operator to get the stats for.
         * @return Back pressure statistics for an operator
         */
-       public Optional<OperatorBackPressureStats> 
getOperatorBackPressureStats(ExecutionJobVertex vertex) {
-               synchronized (lock) {
-                       final OperatorBackPressureStats stats = 
operatorStatsCache.getIfPresent(vertex);
-                       if (stats == null || backPressureStatsRefreshInterval 
<= System.currentTimeMillis() - stats.getEndTimestamp()) {
-                               triggerStackTraceSampleInternal(vertex);
-                       }
-                       return Optional.ofNullable(stats);
-               }
-       }
-
-       /**
-        * Triggers a stack trace sample for a operator to gather the back 
pressure
-        * statistics. If there is a sample in progress for the operator, the 
call
-        * is ignored.
-        *
-        * @param vertex Operator to get the stats for.
-        * @return Flag indicating whether a sample with triggered.
-        */
-       private boolean triggerStackTraceSampleInternal(final 
ExecutionJobVertex vertex) {
-               assert(Thread.holdsLock(lock));
-
-               if (shutDown) {
-                       return false;
-               }
-
-               if (!pendingStats.contains(vertex) &&
-                       
!vertex.getGraph().getState().isGloballyTerminalState()) {
-
-                       Executor executor = 
vertex.getGraph().getFutureExecutor();
-
-                       // Only trigger if still active job
-                       if (executor != null) {
-                               pendingStats.add(vertex);
-
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Triggering stack trace 
sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
-                               }
-
-                               CompletableFuture<StackTraceSample> sample = 
coordinator.triggerStackTraceSample(
-                                       vertex.getTaskVertices(),
-                                       numSamples,
-                                       delayBetweenSamples,
-                                       MAX_STACK_TRACE_DEPTH);
-
-                               sample.handleAsync(new 
StackTraceSampleCompletionCallback(vertex), executor);
-
-                               return true;
-                       }
-               }
-
-               return false;
-       }
-
-       /**
-        * Triggers a stack trace sample for a operator to gather the back 
pressure
-        * statistics. If there is a sample in progress for the operator, the 
call
-        * is ignored.
-        *
-        * @param vertex Operator to get the stats for.
-        * @return Flag indicating whether a sample with triggered.
-        * @deprecated {@link 
#getOperatorBackPressureStats(ExecutionJobVertex)} will trigger
-        * stack trace sampling automatically.
-        */
-       @Deprecated
-       public boolean triggerStackTraceSample(ExecutionJobVertex vertex) {
-               synchronized (lock) {
-                       return triggerStackTraceSampleInternal(vertex);
-               }
-       }
-
-       /**
-        * Cleans up the operator stats cache if it contains timed out entries.
-        *
-        * <p>The Guava cache only evicts as maintenance during normal 
operations.
-        * If this handler is inactive, it will never be cleaned.
-        */
-       public void cleanUpOperatorStatsCache() {
-               operatorStatsCache.cleanUp();
-       }
-
-       /**
-        * Shuts down the stats tracker.
-        *
-        * <p>Invalidates the cache and clears all pending stats.
-        */
-       public void shutDown() {
-               synchronized (lock) {
-                       if (!shutDown) {
-                               operatorStatsCache.invalidateAll();
-                               pendingStats.clear();
-
-                               shutDown = true;
-                       }
-               }
-       }
-
-       /**
-        * Invalidates the cache (irrespective of clean up interval).
-        */
-       void invalidateOperatorStatsCache() {
-               operatorStatsCache.invalidateAll();
-       }
-
-       /**
-        * Callback on completed stack trace sample.
-        */
-       class StackTraceSampleCompletionCallback implements 
BiFunction<StackTraceSample, Throwable, Void> {
-
-               private final ExecutionJobVertex vertex;
-
-               public StackTraceSampleCompletionCallback(ExecutionJobVertex 
vertex) {
-                       this.vertex = vertex;
-               }
-
-               @Override
-               public Void apply(StackTraceSample stackTraceSample, Throwable 
throwable) {
-                       synchronized (lock) {
-                               try {
-                                       if (shutDown) {
-                                               return null;
-                                       }
-
-                                       // Job finished, ignore.
-                                       JobStatus jobState = 
vertex.getGraph().getState();
-                                       if (jobState.isGloballyTerminalState()) 
{
-                                               LOG.debug("Ignoring sample, 
because job is in state " + jobState + ".");
-                                       } else if (stackTraceSample != null) {
-                                               OperatorBackPressureStats stats 
= createStatsFromSample(stackTraceSample);
-                                               operatorStatsCache.put(vertex, 
stats);
-                                       } else {
-                                               LOG.debug("Failed to gather 
stack trace sample.", throwable);
-                                       }
-                               } catch (Throwable t) {
-                                       LOG.error("Error during stats 
completion.", t);
-                               } finally {
-                                       pendingStats.remove(vertex);
-                               }
-
-                               return null;
-                       }
-               }
-
-               /**
-                * Creates the back pressure stats from a stack trace sample.
-                *
-                * @param sample Stack trace sample to base stats on.
-                *
-                * @return Back pressure stats
-                */
-               private OperatorBackPressureStats 
createStatsFromSample(StackTraceSample sample) {
-                       Map<ExecutionAttemptID, List<StackTraceElement[]>> 
traces = sample.getStackTraces();
-
-                       // Map task ID to subtask index, because the web 
interface expects
-                       // it like that.
-                       Map<ExecutionAttemptID, Integer> subtaskIndexMap = Maps
-                                       
.newHashMapWithExpectedSize(traces.size());
-
-                       Set<ExecutionAttemptID> sampledTasks = 
sample.getStackTraces().keySet();
-
-                       for (ExecutionVertex task : vertex.getTaskVertices()) {
-                               ExecutionAttemptID taskId = 
task.getCurrentExecutionAttempt().getAttemptId();
-                               if (sampledTasks.contains(taskId)) {
-                                       subtaskIndexMap.put(taskId, 
task.getParallelSubtaskIndex());
-                               } else {
-                                       LOG.debug("Outdated sample. A task, 
which is part of the " +
-                                                       "sample has been 
reset.");
-                               }
-                       }
-
-                       // Ratio of blocked samples to total samples per sub 
task. Array
-                       // position corresponds to sub task index.
-                       double[] backPressureRatio = new double[traces.size()];
-
-                       for (Entry<ExecutionAttemptID, 
List<StackTraceElement[]>> entry : traces.entrySet()) {
-                               int backPressureSamples = 0;
-
-                               List<StackTraceElement[]> taskTraces = 
entry.getValue();
-
-                               for (StackTraceElement[] trace : taskTraces) {
-                                       for (int i = trace.length - 1; i >= 0; 
i--) {
-                                               StackTraceElement elem = 
trace[i];
-
-                                               if 
(elem.getClassName().equals(EXPECTED_CLASS_NAME) &&
-                                                               
elem.getMethodName().equals(EXPECTED_METHOD_NAME)) {
-
-                                                       backPressureSamples++;
-                                                       break; // Continue with 
next stack trace
-                                               }
-                                       }
-                               }
-
-                               int subtaskIndex = 
subtaskIndexMap.get(entry.getKey());
-
-                               int size = taskTraces.size();
-                               double ratio = (size > 0)
-                                               ? ((double) 
backPressureSamples) / size
-                                               : 0;
-
-                               backPressureRatio[subtaskIndex] = ratio;
-                       }
-
-                       return new OperatorBackPressureStats(
-                                       sample.getSampleId(),
-                                       sample.getEndTime(),
-                                       backPressureRatio);
-               }
-       }
+       Optional<OperatorBackPressureStats> 
getOperatorBackPressureStats(ExecutionJobVertex vertex);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f719befa/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImpl.java
new file mode 100644
index 0000000..b1409b1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImpl.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Back pressure statistics tracker.
+ *
+ * <p>Back pressure is determined by sampling running tasks. If a task is
+ * slowed down by back pressure it will be stuck in memory requests to a
+ * {@link org.apache.flink.runtime.io.network.buffer.LocalBufferPool}.
+ *
+ * <p>The back pressured stack traces look like this:
+ *
+ * <pre>
+ * java.lang.Object.wait(Native Method)
+ * o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
+ * o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) 
<--- BLOCKING
+ * request
+ * [...]
+ * </pre>
+ */
+public class BackPressureStatsTrackerImpl implements BackPressureStatsTracker {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(BackPressureStatsTrackerImpl.class);
+
+       /** Maximum stack trace depth for samples. */
+       static final int MAX_STACK_TRACE_DEPTH = 3;
+
+       /** Expected class name for back pressure indicating stack trace 
element. */
+       static final String EXPECTED_CLASS_NAME = 
"org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
+
+       /** Expected method name for back pressure indicating stack trace 
element. */
+       static final String EXPECTED_METHOD_NAME = 
"requestBufferBuilderBlocking";
+
+       /** Lock guarding trigger operations. */
+       private final Object lock = new Object();
+
+       /* Stack trace sample coordinator. */
+       private final StackTraceSampleCoordinator coordinator;
+
+       /**
+        * Completed stats. Important: Job vertex IDs need to be scoped by job 
ID,
+        * because they are potentially constant across runs messing up the 
cached
+        * data.
+        */
+       private final Cache<ExecutionJobVertex, OperatorBackPressureStats> 
operatorStatsCache;
+
+       /** Pending in progress stats. Important: Job vertex IDs need to be 
scoped
+        * by job ID, because they are potentially constant across runs messing 
up
+        * the cached data.*/
+       private final Set<ExecutionJobVertex> pendingStats = new HashSet<>();
+
+       /** Cleanup interval for completed stats cache. */
+       private final int cleanUpInterval;
+
+       private final int numSamples;
+
+       private final int backPressureStatsRefreshInterval;
+
+       private final Time delayBetweenSamples;
+
+       /** Flag indicating whether the stats tracker has been shut down. */
+       private boolean shutDown;
+
+       /**
+        * Creates a back pressure statistics tracker.
+        *
+        * @param cleanUpInterval     Clean up interval for completed stats.
+        * @param numSamples          Number of stack trace samples when 
determining back pressure.
+        * @param delayBetweenSamples Delay between samples when determining 
back pressure.
+        */
+       public BackPressureStatsTrackerImpl(
+                       StackTraceSampleCoordinator coordinator,
+                       int cleanUpInterval,
+                       int numSamples,
+                       int backPressureStatsRefreshInterval,
+                       Time delayBetweenSamples) {
+
+               this.coordinator = checkNotNull(coordinator, "Stack trace 
sample coordinator");
+
+               checkArgument(cleanUpInterval >= 0, "Clean up interval");
+               this.cleanUpInterval = cleanUpInterval;
+
+               checkArgument(numSamples >= 1, "Number of samples");
+               this.numSamples = numSamples;
+
+               checkArgument(
+                       backPressureStatsRefreshInterval >= 0,
+                       "backPressureStatsRefreshInterval must be greater than 
or equal to 0");
+               this.backPressureStatsRefreshInterval = 
backPressureStatsRefreshInterval;
+
+               this.delayBetweenSamples = checkNotNull(delayBetweenSamples, 
"Delay between samples");
+
+               this.operatorStatsCache = CacheBuilder.newBuilder()
+                               .concurrencyLevel(1)
+                               .expireAfterAccess(cleanUpInterval, 
TimeUnit.MILLISECONDS)
+                               .build();
+       }
+
+       /** Cleanup interval for completed stats cache. */
+       public long getCleanUpInterval() {
+               return cleanUpInterval;
+       }
+
+       /**
+        * Returns back pressure statistics for a operator. Automatically 
triggers stack trace sampling
+        * if statistics are not available or outdated.
+        *
+        * @param vertex Operator to get the stats for.
+        * @return Back pressure statistics for an operator
+        */
+       public Optional<OperatorBackPressureStats> 
getOperatorBackPressureStats(ExecutionJobVertex vertex) {
+               synchronized (lock) {
+                       final OperatorBackPressureStats stats = 
operatorStatsCache.getIfPresent(vertex);
+                       if (stats == null || backPressureStatsRefreshInterval 
<= System.currentTimeMillis() - stats.getEndTimestamp()) {
+                               triggerStackTraceSampleInternal(vertex);
+                       }
+                       return Optional.ofNullable(stats);
+               }
+       }
+
+       /**
+        * Triggers a stack trace sample for a operator to gather the back 
pressure
+        * statistics. If there is a sample in progress for the operator, the 
call
+        * is ignored.
+        *
+        * @param vertex Operator to get the stats for.
+        * @return Flag indicating whether a sample with triggered.
+        */
+       private boolean triggerStackTraceSampleInternal(final 
ExecutionJobVertex vertex) {
+               assert(Thread.holdsLock(lock));
+
+               if (shutDown) {
+                       return false;
+               }
+
+               if (!pendingStats.contains(vertex) &&
+                       
!vertex.getGraph().getState().isGloballyTerminalState()) {
+
+                       Executor executor = 
vertex.getGraph().getFutureExecutor();
+
+                       // Only trigger if still active job
+                       if (executor != null) {
+                               pendingStats.add(vertex);
+
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Triggering stack trace 
sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
+                               }
+
+                               CompletableFuture<StackTraceSample> sample = 
coordinator.triggerStackTraceSample(
+                                       vertex.getTaskVertices(),
+                                       numSamples,
+                                       delayBetweenSamples,
+                                       MAX_STACK_TRACE_DEPTH);
+
+                               sample.handleAsync(new 
StackTraceSampleCompletionCallback(vertex), executor);
+
+                               return true;
+                       }
+               }
+
+               return false;
+       }
+
+       /**
+        * Triggers a stack trace sample for a operator to gather the back 
pressure
+        * statistics. If there is a sample in progress for the operator, the 
call
+        * is ignored.
+        *
+        * @param vertex Operator to get the stats for.
+        * @return Flag indicating whether a sample with triggered.
+        * @deprecated {@link 
#getOperatorBackPressureStats(ExecutionJobVertex)} will trigger
+        * stack trace sampling automatically.
+        */
+       @Deprecated
+       public boolean triggerStackTraceSample(ExecutionJobVertex vertex) {
+               synchronized (lock) {
+                       return triggerStackTraceSampleInternal(vertex);
+               }
+       }
+
+       /**
+        * Cleans up the operator stats cache if it contains timed out entries.
+        *
+        * <p>The Guava cache only evicts as maintenance during normal 
operations.
+        * If this handler is inactive, it will never be cleaned.
+        */
+       public void cleanUpOperatorStatsCache() {
+               operatorStatsCache.cleanUp();
+       }
+
+       /**
+        * Shuts down the stats tracker.
+        *
+        * <p>Invalidates the cache and clears all pending stats.
+        */
+       public void shutDown() {
+               synchronized (lock) {
+                       if (!shutDown) {
+                               operatorStatsCache.invalidateAll();
+                               pendingStats.clear();
+
+                               shutDown = true;
+                       }
+               }
+       }
+
+       /**
+        * Invalidates the cache (irrespective of clean up interval).
+        */
+       void invalidateOperatorStatsCache() {
+               operatorStatsCache.invalidateAll();
+       }
+
+       /**
+        * Callback on completed stack trace sample.
+        */
+       class StackTraceSampleCompletionCallback implements 
BiFunction<StackTraceSample, Throwable, Void> {
+
+               private final ExecutionJobVertex vertex;
+
+               public StackTraceSampleCompletionCallback(ExecutionJobVertex 
vertex) {
+                       this.vertex = vertex;
+               }
+
+               @Override
+               public Void apply(StackTraceSample stackTraceSample, Throwable 
throwable) {
+                       synchronized (lock) {
+                               try {
+                                       if (shutDown) {
+                                               return null;
+                                       }
+
+                                       // Job finished, ignore.
+                                       JobStatus jobState = 
vertex.getGraph().getState();
+                                       if (jobState.isGloballyTerminalState()) 
{
+                                               LOG.debug("Ignoring sample, 
because job is in state " + jobState + ".");
+                                       } else if (stackTraceSample != null) {
+                                               OperatorBackPressureStats stats 
= createStatsFromSample(stackTraceSample);
+                                               operatorStatsCache.put(vertex, 
stats);
+                                       } else {
+                                               LOG.debug("Failed to gather 
stack trace sample.", throwable);
+                                       }
+                               } catch (Throwable t) {
+                                       LOG.error("Error during stats 
completion.", t);
+                               } finally {
+                                       pendingStats.remove(vertex);
+                               }
+
+                               return null;
+                       }
+               }
+
+               /**
+                * Creates the back pressure stats from a stack trace sample.
+                *
+                * @param sample Stack trace sample to base stats on.
+                *
+                * @return Back pressure stats
+                */
+               private OperatorBackPressureStats 
createStatsFromSample(StackTraceSample sample) {
+                       Map<ExecutionAttemptID, List<StackTraceElement[]>> 
traces = sample.getStackTraces();
+
+                       // Map task ID to subtask index, because the web 
interface expects
+                       // it like that.
+                       Map<ExecutionAttemptID, Integer> subtaskIndexMap = Maps
+                                       
.newHashMapWithExpectedSize(traces.size());
+
+                       Set<ExecutionAttemptID> sampledTasks = 
sample.getStackTraces().keySet();
+
+                       for (ExecutionVertex task : vertex.getTaskVertices()) {
+                               ExecutionAttemptID taskId = 
task.getCurrentExecutionAttempt().getAttemptId();
+                               if (sampledTasks.contains(taskId)) {
+                                       subtaskIndexMap.put(taskId, 
task.getParallelSubtaskIndex());
+                               } else {
+                                       LOG.debug("Outdated sample. A task, 
which is part of the " +
+                                                       "sample has been 
reset.");
+                               }
+                       }
+
+                       // Ratio of blocked samples to total samples per sub 
task. Array
+                       // position corresponds to sub task index.
+                       double[] backPressureRatio = new double[traces.size()];
+
+                       for (Entry<ExecutionAttemptID, 
List<StackTraceElement[]>> entry : traces.entrySet()) {
+                               int backPressureSamples = 0;
+
+                               List<StackTraceElement[]> taskTraces = 
entry.getValue();
+
+                               for (StackTraceElement[] trace : taskTraces) {
+                                       for (int i = trace.length - 1; i >= 0; 
i--) {
+                                               StackTraceElement elem = 
trace[i];
+
+                                               if 
(elem.getClassName().equals(EXPECTED_CLASS_NAME) &&
+                                                               
elem.getMethodName().equals(EXPECTED_METHOD_NAME)) {
+
+                                                       backPressureSamples++;
+                                                       break; // Continue with 
next stack trace
+                                               }
+                                       }
+                               }
+
+                               int subtaskIndex = 
subtaskIndexMap.get(entry.getKey());
+
+                               int size = taskTraces.size();
+                               double ratio = (size > 0)
+                                               ? ((double) 
backPressureSamples) / size
+                                               : 0;
+
+                               backPressureRatio[subtaskIndex] = ratio;
+                       }
+
+                       return new OperatorBackPressureStats(
+                                       sample.getSampleId(),
+                                       sample.getEndTime(),
+                                       backPressureRatio);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f719befa/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/VoidBackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/VoidBackPressureStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/VoidBackPressureStatsTracker.java
new file mode 100644
index 0000000..2a55d47
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/VoidBackPressureStatsTracker.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+
+import java.util.Optional;
+
+/**
+ * {@link BackPressureStatsTracker} implementation which always returns no 
back pressure statistics.
+ */
+public enum VoidBackPressureStatsTracker implements BackPressureStatsTracker {
+
+       INSTANCE {
+               @Override
+               public Optional<OperatorBackPressureStats> 
getOperatorBackPressureStats(ExecutionJobVertex vertex) {
+                       return Optional.empty();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f719befa/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 945f05f..027ba5e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -40,8 +40,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
-import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
-import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
@@ -125,9 +124,7 @@ public class JobMasterTest extends TestLogger {
                                FlinkUserCodeClassLoaders.parentFirst(new 
URL[0], JobMasterTest.class.getClassLoader()),
                                null,
                                null,
-                               new BackPressureStatsTracker(
-                                       new 
StackTraceSampleCoordinator(scheduledExecutor, testingTimeout.toMilliseconds()),
-                                       60000, 100, 60000, 
Time.milliseconds(50)));
+                               VoidBackPressureStatsTracker.INSTANCE);
 
                        CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
 
@@ -226,9 +223,7 @@ public class JobMasterTest extends TestLogger {
                                FlinkUserCodeClassLoaders.parentFirst(new 
URL[0], JobMasterTest.class.getClassLoader()),
                                null,
                                null,
-                               new BackPressureStatsTracker(
-                                       new 
StackTraceSampleCoordinator(scheduledExecutor, testingTimeout.toMilliseconds()),
-                                       60000, 100, 60000, 
Time.milliseconds(50)));
+                               VoidBackPressureStatsTracker.INSTANCE);
 
                        CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f719befa/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
index 4abaef3..8331d24 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
 import org.apache.flink.util.TestLogger;
 
@@ -47,7 +47,7 @@ import static org.mockito.Mockito.when;
 public class JobVertexBackPressureHandlerTest extends TestLogger {
        @Test
        public void testGetPaths() {
-               JobVertexBackPressureHandler handler = new 
JobVertexBackPressureHandler(mock(ExecutionGraphCache.class), 
Executors.directExecutor(), mock(BackPressureStatsTracker.class), 0);
+               JobVertexBackPressureHandler handler = new 
JobVertexBackPressureHandler(mock(ExecutionGraphCache.class), 
Executors.directExecutor(), mock(BackPressureStatsTrackerImpl.class), 0);
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]);
@@ -57,7 +57,7 @@ public class JobVertexBackPressureHandlerTest extends 
TestLogger {
        @Test
        public void testResponseNoStatsAvailable() throws Exception {
                ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-               BackPressureStatsTracker statsTracker = 
mock(BackPressureStatsTracker.class);
+               BackPressureStatsTrackerImpl statsTracker = 
mock(BackPressureStatsTrackerImpl.class);
 
                
when(statsTracker.getOperatorBackPressureStats(any(ExecutionJobVertex.class)))
                                .thenReturn(Optional.empty());
@@ -88,7 +88,7 @@ public class JobVertexBackPressureHandlerTest extends 
TestLogger {
        @Test
        public void testResponseStatsAvailable() throws Exception {
                ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-               BackPressureStatsTracker statsTracker = 
mock(BackPressureStatsTracker.class);
+               BackPressureStatsTrackerImpl statsTracker = 
mock(BackPressureStatsTrackerImpl.class);
 
                OperatorBackPressureStats stats = new OperatorBackPressureStats(
                                0, System.currentTimeMillis(), new double[] { 
0.31, 0.48, 1.0, 0.0 });
@@ -150,7 +150,7 @@ public class JobVertexBackPressureHandlerTest extends 
TestLogger {
        @Test
        public void testResponsePassedRefreshInterval() throws Exception {
                ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
-               BackPressureStatsTracker statsTracker = 
mock(BackPressureStatsTracker.class);
+               BackPressureStatsTrackerImpl statsTracker = 
mock(BackPressureStatsTrackerImpl.class);
 
                OperatorBackPressureStats stats = new OperatorBackPressureStats(
                                0, System.currentTimeMillis(), new double[] { 
0.31, 0.48, 1.0, 0.0 });

http://git-wip-us.apache.org/repos/asf/flink/blob/f719befa/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
deleted file mode 100644
index 9a0e9b6..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.backpressure;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.client.JobClient;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
-import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
-import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
-import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
-
-/**
- * Simple back pressured task test.
- */
-public class BackPressureStatsTrackerITCase extends TestLogger {
-
-       private static NetworkBufferPool networkBufferPool;
-       private static ActorSystem testActorSystem;
-
-       /** Shared as static variable with the test task. */
-       private static BufferPool testBufferPool;
-
-       @BeforeClass
-       public static void setup() {
-               testActorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
-               networkBufferPool = new NetworkBufferPool(100, 8192);
-       }
-
-       @AfterClass
-       public static void teardown() {
-               JavaTestKit.shutdownActorSystem(testActorSystem);
-               networkBufferPool.destroyAllBufferPools();
-               networkBufferPool.destroy();
-       }
-
-       /**
-        * Tests a simple fake-back pressured task. Back pressure is assumed 
when
-        * sampled stack traces are in blocking buffer requests.
-        */
-       @Test
-       public void testBackPressuredProducer() throws Exception {
-               new JavaTestKit(testActorSystem) {{
-                       final FiniteDuration deadline = new FiniteDuration(60, 
TimeUnit.SECONDS);
-
-                       // The JobGraph
-                       final JobGraph jobGraph = new JobGraph();
-                       final int parallelism = 4;
-
-                       final JobVertex task = new JobVertex("Task");
-                       task.setInvokableClass(BackPressuredTask.class);
-                       task.setParallelism(parallelism);
-
-                       jobGraph.addVertex(task);
-
-                       final Configuration config = new Configuration();
-
-                       final HighAvailabilityServices highAvailabilityServices 
= HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
-                               config,
-                               TestingUtils.defaultExecutor());
-
-                       ActorGateway jobManger = null;
-                       ActorGateway taskManager = null;
-
-                       //
-                       // 1) Consume all buffers at first (no buffers for the 
test task)
-                       //
-                       testBufferPool = networkBufferPool.createBufferPool(1, 
Integer.MAX_VALUE);
-                       final List<Buffer> buffers = new ArrayList<>();
-                       while (true) {
-                               Buffer buffer = testBufferPool.requestBuffer();
-                               if (buffer != null) {
-                                       buffers.add(buffer);
-                               } else {
-                                       break;
-                               }
-                       }
-
-                       try {
-                               jobManger = TestingUtils.createJobManager(
-                                       testActorSystem,
-                                       TestingUtils.defaultExecutor(),
-                                       TestingUtils.defaultExecutor(),
-                                       config,
-                                       highAvailabilityServices);
-
-                               
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
-
-                               taskManager = TestingUtils.createTaskManager(
-                                       testActorSystem,
-                                       highAvailabilityServices,
-                                       config,
-                                       true,
-                                       true);
-
-                               final ActorGateway jm = jobManger;
-
-                               new Within(deadline) {
-                                       @Override
-                                       protected void run() {
-                                               try {
-                                                       ActorGateway testActor 
= new AkkaActorGateway(getTestActor(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-                                                       // Submit the job and 
wait until it is running
-                                                       
JobClient.submitJobDetached(
-                                                                       new 
AkkaJobManagerGateway(jm),
-                                                                       config,
-                                                                       
jobGraph,
-                                                                       
Time.milliseconds(deadline.toMillis()),
-                                                                       
ClassLoader.getSystemClassLoader());
-
-                                                       jm.tell(new 
WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor);
-
-                                                       expectMsgEquals(new 
AllVerticesRunning(jobGraph.getJobID()));
-
-                                                       // Get the 
ExecutionGraph
-                                                       jm.tell(new 
RequestExecutionGraph(jobGraph.getJobID()), testActor);
-
-                                                       ExecutionGraphFound 
executionGraphResponse =
-                                                                       
expectMsgClass(ExecutionGraphFound.class);
-
-                                                       ExecutionGraph 
executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph();
-                                                       ExecutionJobVertex 
vertex = executionGraph.getJobVertex(task.getID());
-
-                                                       
StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(
-                                                                       
testActorSystem.dispatcher(), 60000);
-
-                                                       // Verify back pressure 
(clean up interval can be ignored)
-                                                       
BackPressureStatsTracker statsTracker = new BackPressureStatsTracker(
-                                                               coordinator,
-                                                               100 * 1000,
-                                                               20,
-                                                               
Integer.MAX_VALUE,
-                                                               
Time.milliseconds(10L));
-
-                                                       int numAttempts = 10;
-
-                                                       int nextSampleId = 0;
-
-                                                       // Verify that all 
tasks are back pressured. This
-                                                       // can fail if the task 
takes longer to request
-                                                       // the buffer.
-                                                       for (int attempt = 0; 
attempt < numAttempts; attempt++) {
-                                                               try {
-                                                                       
OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);
-
-                                                                       
Assert.assertEquals(nextSampleId + attempt, stats.getSampleId());
-                                                                       
Assert.assertEquals(parallelism, stats.getNumberOfSubTasks());
-                                                                       
Assert.assertEquals(1.0, stats.getMaxBackPressureRatio(), 0.0);
-
-                                                                       for 
(int i = 0; i < parallelism; i++) {
-                                                                               
Assert.assertEquals(1.0, stats.getBackPressureRatio(i), 0.0);
-                                                                       }
-
-                                                                       
nextSampleId = stats.getSampleId() + 1;
-
-                                                                       break;
-                                                               } catch 
(Throwable t) {
-                                                                       if 
(attempt == numAttempts - 1) {
-                                                                               
throw t;
-                                                                       } else {
-                                                                               
Thread.sleep(500);
-                                                                       }
-                                                               }
-                                                       }
-
-                                                       //
-                                                       // 2) Release all 
buffers and let the tasks grab one
-                                                       //
-                                                       for (Buffer buf : 
buffers) {
-                                                               
buf.recycleBuffer();
-                                                               
Assert.assertTrue(buf.isRecycled());
-                                                       }
-
-                                                       // Wait for all buffers 
to be available. The tasks
-                                                       // grab them and then 
immediately release them.
-                                                       while 
(testBufferPool.getNumberOfAvailableMemorySegments() < 100) {
-                                                               
Thread.sleep(100);
-                                                       }
-
-                                                       // Verify that no task 
is back pressured any more.
-                                                       for (int attempt = 0; 
attempt < numAttempts; attempt++) {
-                                                               try {
-                                                                       
OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);
-
-                                                                       
Assert.assertEquals(nextSampleId + attempt, stats.getSampleId());
-                                                                       
Assert.assertEquals(parallelism, stats.getNumberOfSubTasks());
-
-                                                                       // 
Verify that no task is back pressured
-                                                                       for 
(int i = 0; i < parallelism; i++) {
-                                                                               
Assert.assertEquals(0.0, stats.getBackPressureRatio(i), 0.0);
-                                                                       }
-
-                                                                       break;
-                                                               } catch 
(Throwable t) {
-                                                                       if 
(attempt == numAttempts - 1) {
-                                                                               
throw t;
-                                                                       } else {
-                                                                               
Thread.sleep(500);
-                                                                       }
-                                                               }
-                                                       }
-
-                                                       // Shut down
-                                                       jm.tell(new 
TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), testActor);
-
-                                                       // Cancel job
-                                                       jm.tell(new 
JobManagerMessages.CancelJob(jobGraph.getJobID()));
-
-                                                       // Response to removal 
notification
-                                                       expectMsgEquals(true);
-
-                                                       //
-                                                       // 3) Trigger stats for 
archived job
-                                                       //
-                                                       
statsTracker.invalidateOperatorStatsCache();
-                                                       
Assert.assertFalse("Unexpected trigger", 
statsTracker.triggerStackTraceSample(vertex));
-
-                                               } catch (Exception e) {
-                                                       e.printStackTrace();
-                                                       
Assert.fail(e.getMessage());
-                                               }
-                                       }
-                               };
-                       } finally {
-                               TestingUtils.stopActor(jobManger);
-                               TestingUtils.stopActor(taskManager);
-
-                               
highAvailabilityServices.closeAndCleanupAllData();
-
-                               testBufferPool.lazyDestroy();
-                       }
-               }};
-       }
-
-       /**
-        * Triggers a new stats sample.
-        */
-       private OperatorBackPressureStats triggerStatsSample(
-                       BackPressureStatsTracker statsTracker,
-                       ExecutionJobVertex vertex) throws InterruptedException {
-
-               statsTracker.invalidateOperatorStatsCache();
-               Assert.assertTrue("Failed to trigger", 
statsTracker.triggerStackTraceSample(vertex));
-
-               // Sleep minimum duration
-               Thread.sleep(20 * 10);
-
-               Optional<OperatorBackPressureStats> stats;
-
-               // Get the stats
-               while (!(stats = 
statsTracker.getOperatorBackPressureStats(vertex)).isPresent()) {
-                       Thread.sleep(10);
-               }
-
-               return stats.get();
-       }
-
-       /**
-        * A back pressured producer sharing a {@link BufferPool} with the
-        * test driver.
-        */
-       public static class BackPressuredTask extends AbstractInvokable {
-
-               public BackPressuredTask(Environment environment) {
-                       super(environment);
-               }
-
-               @Override
-               public void invoke() throws Exception {
-                       while (true) {
-                               final BufferBuilder bufferBuilder = 
testBufferPool.requestBufferBuilderBlocking();
-                               // Got a buffer, yay!
-                               bufferBuilder.build().recycleBuffer();
-
-                               new CountDownLatch(1).await();
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f719befa/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
new file mode 100644
index 0000000..cb67d48
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
+import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
+import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
+import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+
+/**
+ * Simple back pressured task test.
+ */
+public class BackPressureStatsTrackerImplITCase extends TestLogger {
+
+       private static NetworkBufferPool networkBufferPool;
+       private static ActorSystem testActorSystem;
+
+       /** Shared as static variable with the test task. */
+       private static BufferPool testBufferPool;
+
+       @BeforeClass
+       public static void setup() {
+               testActorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
+               networkBufferPool = new NetworkBufferPool(100, 8192);
+       }
+
+       @AfterClass
+       public static void teardown() {
+               JavaTestKit.shutdownActorSystem(testActorSystem);
+               networkBufferPool.destroyAllBufferPools();
+               networkBufferPool.destroy();
+       }
+
+       /**
+        * Tests a simple fake-back pressured task. Back pressure is assumed 
when
+        * sampled stack traces are in blocking buffer requests.
+        */
+       @Test
+       public void testBackPressuredProducer() throws Exception {
+               new JavaTestKit(testActorSystem) {{
+                       final FiniteDuration deadline = new FiniteDuration(60, 
TimeUnit.SECONDS);
+
+                       // The JobGraph
+                       final JobGraph jobGraph = new JobGraph();
+                       final int parallelism = 4;
+
+                       final JobVertex task = new JobVertex("Task");
+                       task.setInvokableClass(BackPressuredTask.class);
+                       task.setParallelism(parallelism);
+
+                       jobGraph.addVertex(task);
+
+                       final Configuration config = new Configuration();
+
+                       final HighAvailabilityServices highAvailabilityServices 
= HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+                               config,
+                               TestingUtils.defaultExecutor());
+
+                       ActorGateway jobManger = null;
+                       ActorGateway taskManager = null;
+
+                       //
+                       // 1) Consume all buffers at first (no buffers for the 
test task)
+                       //
+                       testBufferPool = networkBufferPool.createBufferPool(1, 
Integer.MAX_VALUE);
+                       final List<Buffer> buffers = new ArrayList<>();
+                       while (true) {
+                               Buffer buffer = testBufferPool.requestBuffer();
+                               if (buffer != null) {
+                                       buffers.add(buffer);
+                               } else {
+                                       break;
+                               }
+                       }
+
+                       try {
+                               jobManger = TestingUtils.createJobManager(
+                                       testActorSystem,
+                                       TestingUtils.defaultExecutor(),
+                                       TestingUtils.defaultExecutor(),
+                                       config,
+                                       highAvailabilityServices);
+
+                               
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+
+                               taskManager = TestingUtils.createTaskManager(
+                                       testActorSystem,
+                                       highAvailabilityServices,
+                                       config,
+                                       true,
+                                       true);
+
+                               final ActorGateway jm = jobManger;
+
+                               new Within(deadline) {
+                                       @Override
+                                       protected void run() {
+                                               try {
+                                                       ActorGateway testActor 
= new AkkaActorGateway(getTestActor(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+                                                       // Submit the job and 
wait until it is running
+                                                       
JobClient.submitJobDetached(
+                                                                       new 
AkkaJobManagerGateway(jm),
+                                                                       config,
+                                                                       
jobGraph,
+                                                                       
Time.milliseconds(deadline.toMillis()),
+                                                                       
ClassLoader.getSystemClassLoader());
+
+                                                       jm.tell(new 
WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor);
+
+                                                       expectMsgEquals(new 
AllVerticesRunning(jobGraph.getJobID()));
+
+                                                       // Get the 
ExecutionGraph
+                                                       jm.tell(new 
RequestExecutionGraph(jobGraph.getJobID()), testActor);
+
+                                                       ExecutionGraphFound 
executionGraphResponse =
+                                                                       
expectMsgClass(ExecutionGraphFound.class);
+
+                                                       ExecutionGraph 
executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph();
+                                                       ExecutionJobVertex 
vertex = executionGraph.getJobVertex(task.getID());
+
+                                                       
StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(
+                                                                       
testActorSystem.dispatcher(), 60000);
+
+                                                       // Verify back pressure 
(clean up interval can be ignored)
+                                                       
BackPressureStatsTrackerImpl statsTracker = new BackPressureStatsTrackerImpl(
+                                                               coordinator,
+                                                               100 * 1000,
+                                                               20,
+                                                               
Integer.MAX_VALUE,
+                                                               
Time.milliseconds(10L));
+
+                                                       int numAttempts = 10;
+
+                                                       int nextSampleId = 0;
+
+                                                       // Verify that all 
tasks are back pressured. This
+                                                       // can fail if the task 
takes longer to request
+                                                       // the buffer.
+                                                       for (int attempt = 0; 
attempt < numAttempts; attempt++) {
+                                                               try {
+                                                                       
OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);
+
+                                                                       
Assert.assertEquals(nextSampleId + attempt, stats.getSampleId());
+                                                                       
Assert.assertEquals(parallelism, stats.getNumberOfSubTasks());
+                                                                       
Assert.assertEquals(1.0, stats.getMaxBackPressureRatio(), 0.0);
+
+                                                                       for 
(int i = 0; i < parallelism; i++) {
+                                                                               
Assert.assertEquals(1.0, stats.getBackPressureRatio(i), 0.0);
+                                                                       }
+
+                                                                       
nextSampleId = stats.getSampleId() + 1;
+
+                                                                       break;
+                                                               } catch 
(Throwable t) {
+                                                                       if 
(attempt == numAttempts - 1) {
+                                                                               
throw t;
+                                                                       } else {
+                                                                               
Thread.sleep(500);
+                                                                       }
+                                                               }
+                                                       }
+
+                                                       //
+                                                       // 2) Release all 
buffers and let the tasks grab one
+                                                       //
+                                                       for (Buffer buf : 
buffers) {
+                                                               
buf.recycleBuffer();
+                                                               
Assert.assertTrue(buf.isRecycled());
+                                                       }
+
+                                                       // Wait for all buffers 
to be available. The tasks
+                                                       // grab them and then 
immediately release them.
+                                                       while 
(testBufferPool.getNumberOfAvailableMemorySegments() < 100) {
+                                                               
Thread.sleep(100);
+                                                       }
+
+                                                       // Verify that no task 
is back pressured any more.
+                                                       for (int attempt = 0; 
attempt < numAttempts; attempt++) {
+                                                               try {
+                                                                       
OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);
+
+                                                                       
Assert.assertEquals(nextSampleId + attempt, stats.getSampleId());
+                                                                       
Assert.assertEquals(parallelism, stats.getNumberOfSubTasks());
+
+                                                                       // 
Verify that no task is back pressured
+                                                                       for 
(int i = 0; i < parallelism; i++) {
+                                                                               
Assert.assertEquals(0.0, stats.getBackPressureRatio(i), 0.0);
+                                                                       }
+
+                                                                       break;
+                                                               } catch 
(Throwable t) {
+                                                                       if 
(attempt == numAttempts - 1) {
+                                                                               
throw t;
+                                                                       } else {
+                                                                               
Thread.sleep(500);
+                                                                       }
+                                                               }
+                                                       }
+
+                                                       // Shut down
+                                                       jm.tell(new 
TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), testActor);
+
+                                                       // Cancel job
+                                                       jm.tell(new 
JobManagerMessages.CancelJob(jobGraph.getJobID()));
+
+                                                       // Response to removal 
notification
+                                                       expectMsgEquals(true);
+
+                                                       //
+                                                       // 3) Trigger stats for 
archived job
+                                                       //
+                                                       
statsTracker.invalidateOperatorStatsCache();
+                                                       
Assert.assertFalse("Unexpected trigger", 
statsTracker.triggerStackTraceSample(vertex));
+
+                                               } catch (Exception e) {
+                                                       e.printStackTrace();
+                                                       
Assert.fail(e.getMessage());
+                                               }
+                                       }
+                               };
+                       } finally {
+                               TestingUtils.stopActor(jobManger);
+                               TestingUtils.stopActor(taskManager);
+
+                               
highAvailabilityServices.closeAndCleanupAllData();
+
+                               testBufferPool.lazyDestroy();
+                       }
+               }};
+       }
+
+       /**
+        * Triggers a new stats sample.
+        */
+       private OperatorBackPressureStats triggerStatsSample(
+                       BackPressureStatsTrackerImpl statsTracker,
+                       ExecutionJobVertex vertex) throws InterruptedException {
+
+               statsTracker.invalidateOperatorStatsCache();
+               Assert.assertTrue("Failed to trigger", 
statsTracker.triggerStackTraceSample(vertex));
+
+               // Sleep minimum duration
+               Thread.sleep(20 * 10);
+
+               Optional<OperatorBackPressureStats> stats;
+
+               // Get the stats
+               while (!(stats = 
statsTracker.getOperatorBackPressureStats(vertex)).isPresent()) {
+                       Thread.sleep(10);
+               }
+
+               return stats.get();
+       }
+
+       /**
+        * A back pressured producer sharing a {@link BufferPool} with the
+        * test driver.
+        */
+       public static class BackPressuredTask extends AbstractInvokable {
+
+               public BackPressuredTask(Environment environment) {
+                       super(environment);
+               }
+
+               @Override
+               public void invoke() throws Exception {
+                       while (true) {
+                               final BufferBuilder bufferBuilder = 
testBufferPool.requestBufferBuilderBlocking();
+                               // Got a buffer, yay!
+                               bufferBuilder.build().recycleBuffer();
+
+                               new CountDownLatch(1).await();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f719befa/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
new file mode 100644
index 0000000..e418980
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Tests for the BackPressureStatsTrackerImpl.
+ */
+public class BackPressureStatsTrackerImplTest extends TestLogger {
+
+       /** Tests simple statistics with fake stack traces. */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testTriggerStackTraceSample() throws Exception {
+               CompletableFuture<StackTraceSample> sampleFuture = new 
CompletableFuture<>();
+
+               StackTraceSampleCoordinator sampleCoordinator = 
Mockito.mock(StackTraceSampleCoordinator.class);
+               Mockito.when(sampleCoordinator.triggerStackTraceSample(
+                               Matchers.any(ExecutionVertex[].class),
+                               Matchers.anyInt(),
+                               Matchers.any(Time.class),
+                               Matchers.anyInt())).thenReturn(sampleFuture);
+
+               ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
+               Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
+
+               // Same Thread execution context
+               Mockito.when(graph.getFutureExecutor()).thenReturn(new 
Executor() {
+
+                       @Override
+                       public void execute(Runnable runnable) {
+                               runnable.run();
+                       }
+               });
+
+               ExecutionVertex[] taskVertices = new ExecutionVertex[4];
+
+               ExecutionJobVertex jobVertex = 
Mockito.mock(ExecutionJobVertex.class);
+               Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
+               Mockito.when(jobVertex.getJobVertexId()).thenReturn(new 
JobVertexID());
+               Mockito.when(jobVertex.getGraph()).thenReturn(graph);
+               
Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
+
+               taskVertices[0] = mockExecutionVertex(jobVertex, 0);
+               taskVertices[1] = mockExecutionVertex(jobVertex, 1);
+               taskVertices[2] = mockExecutionVertex(jobVertex, 2);
+               taskVertices[3] = mockExecutionVertex(jobVertex, 3);
+
+               int numSamples = 100;
+               Time delayBetweenSamples = Time.milliseconds(100L);
+
+               BackPressureStatsTrackerImpl tracker = new 
BackPressureStatsTrackerImpl(
+                               sampleCoordinator, 9999, numSamples, 
Integer.MAX_VALUE, delayBetweenSamples);
+
+               // getOperatorBackPressureStats triggers stack trace sampling
+               
Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+
+               Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
+                               Matchers.eq(taskVertices),
+                               Matchers.eq(numSamples),
+                               Matchers.eq(delayBetweenSamples),
+                               
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
+
+               // Request back pressure stats again. This should not trigger 
another sample request
+               
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+
+               Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
+                               Matchers.eq(taskVertices),
+                               Matchers.eq(numSamples),
+                               Matchers.eq(delayBetweenSamples),
+                               
Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH));
+
+               
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+
+               // Complete the future
+               Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = new 
HashMap<>();
+               for (ExecutionVertex vertex : taskVertices) {
+                       List<StackTraceElement[]> taskTraces = new 
ArrayList<>();
+
+                       for (int i = 0; i < taskVertices.length; i++) {
+                               // Traces until sub task index are back 
pressured
+                               taskTraces.add(createStackTrace(i <= 
vertex.getParallelSubtaskIndex()));
+                       }
+
+                       
traces.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskTraces);
+               }
+
+               int sampleId = 1231;
+               int endTime = 841;
+
+               StackTraceSample sample = new StackTraceSample(
+                               sampleId,
+                               0,
+                               endTime,
+                               traces);
+
+               // Succeed the promise
+               sampleFuture.complete(sample);
+
+               
Assert.assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+
+               OperatorBackPressureStats stats = 
tracker.getOperatorBackPressureStats(jobVertex).get();
+
+               // Verify the stats
+               Assert.assertEquals(sampleId, stats.getSampleId());
+               Assert.assertEquals(endTime, stats.getEndTimestamp());
+               Assert.assertEquals(taskVertices.length, 
stats.getNumberOfSubTasks());
+
+               for (int i = 0; i < taskVertices.length; i++) {
+                       double ratio = stats.getBackPressureRatio(i);
+                       // Traces until sub task index are back pressured
+                       Assert.assertEquals((i + 1) / ((double) 4), ratio, 0.0);
+               }
+       }
+
+       private StackTraceElement[] createStackTrace(boolean isBackPressure) {
+               if (isBackPressure) {
+                       return new StackTraceElement[] { new StackTraceElement(
+                                       
BackPressureStatsTrackerImpl.EXPECTED_CLASS_NAME,
+                                       
BackPressureStatsTrackerImpl.EXPECTED_METHOD_NAME,
+                                       "LocalBufferPool.java",
+                                       133) };
+               } else {
+                       return Thread.currentThread().getStackTrace();
+               }
+       }
+
+       private ExecutionVertex mockExecutionVertex(
+                       ExecutionJobVertex jobVertex,
+                       int subTaskIndex) {
+
+               Execution exec = Mockito.mock(Execution.class);
+               Mockito.when(exec.getAttemptId()).thenReturn(new 
ExecutionAttemptID());
+
+               JobVertexID id = jobVertex.getJobVertexId();
+
+               ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class);
+               Mockito.when(vertex.getJobvertexId()).thenReturn(id);
+               
Mockito.when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+               
Mockito.when(vertex.getParallelSubtaskIndex()).thenReturn(subTaskIndex);
+
+               return vertex;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f719befa/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
deleted file mode 100644
index 0bbf5f1..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.backpressure;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Tests for the BackPressureStatsTracker.
- */
-public class BackPressureStatsTrackerTest extends TestLogger {
-
-       /** Tests simple statistics with fake stack traces. */
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testTriggerStackTraceSample() throws Exception {
-               CompletableFuture<StackTraceSample> sampleFuture = new 
CompletableFuture<>();
-
-               StackTraceSampleCoordinator sampleCoordinator = 
Mockito.mock(StackTraceSampleCoordinator.class);
-               Mockito.when(sampleCoordinator.triggerStackTraceSample(
-                               Matchers.any(ExecutionVertex[].class),
-                               Matchers.anyInt(),
-                               Matchers.any(Time.class),
-                               Matchers.anyInt())).thenReturn(sampleFuture);
-
-               ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
-               Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
-
-               // Same Thread execution context
-               Mockito.when(graph.getFutureExecutor()).thenReturn(new 
Executor() {
-
-                       @Override
-                       public void execute(Runnable runnable) {
-                               runnable.run();
-                       }
-               });
-
-               ExecutionVertex[] taskVertices = new ExecutionVertex[4];
-
-               ExecutionJobVertex jobVertex = 
Mockito.mock(ExecutionJobVertex.class);
-               Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
-               Mockito.when(jobVertex.getJobVertexId()).thenReturn(new 
JobVertexID());
-               Mockito.when(jobVertex.getGraph()).thenReturn(graph);
-               
Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
-
-               taskVertices[0] = mockExecutionVertex(jobVertex, 0);
-               taskVertices[1] = mockExecutionVertex(jobVertex, 1);
-               taskVertices[2] = mockExecutionVertex(jobVertex, 2);
-               taskVertices[3] = mockExecutionVertex(jobVertex, 3);
-
-               int numSamples = 100;
-               Time delayBetweenSamples = Time.milliseconds(100L);
-
-               BackPressureStatsTracker tracker = new BackPressureStatsTracker(
-                               sampleCoordinator, 9999, numSamples, 
Integer.MAX_VALUE, delayBetweenSamples);
-
-               // getOperatorBackPressureStats triggers stack trace sampling
-               
Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-               Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-                               Matchers.eq(taskVertices),
-                               Matchers.eq(numSamples),
-                               Matchers.eq(delayBetweenSamples),
-                               
Matchers.eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
-
-               // Request back pressure stats again. This should not trigger 
another sample request
-               
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-               Mockito.verify(sampleCoordinator, 
Mockito.times(1)).triggerStackTraceSample(
-                               Matchers.eq(taskVertices),
-                               Matchers.eq(numSamples),
-                               Matchers.eq(delayBetweenSamples),
-                               
Matchers.eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
-
-               
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-               // Complete the future
-               Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = new 
HashMap<>();
-               for (ExecutionVertex vertex : taskVertices) {
-                       List<StackTraceElement[]> taskTraces = new 
ArrayList<>();
-
-                       for (int i = 0; i < taskVertices.length; i++) {
-                               // Traces until sub task index are back 
pressured
-                               taskTraces.add(createStackTrace(i <= 
vertex.getParallelSubtaskIndex()));
-                       }
-
-                       
traces.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskTraces);
-               }
-
-               int sampleId = 1231;
-               int endTime = 841;
-
-               StackTraceSample sample = new StackTraceSample(
-                               sampleId,
-                               0,
-                               endTime,
-                               traces);
-
-               // Succeed the promise
-               sampleFuture.complete(sample);
-
-               
Assert.assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
-
-               OperatorBackPressureStats stats = 
tracker.getOperatorBackPressureStats(jobVertex).get();
-
-               // Verify the stats
-               Assert.assertEquals(sampleId, stats.getSampleId());
-               Assert.assertEquals(endTime, stats.getEndTimestamp());
-               Assert.assertEquals(taskVertices.length, 
stats.getNumberOfSubTasks());
-
-               for (int i = 0; i < taskVertices.length; i++) {
-                       double ratio = stats.getBackPressureRatio(i);
-                       // Traces until sub task index are back pressured
-                       Assert.assertEquals((i + 1) / ((double) 4), ratio, 0.0);
-               }
-       }
-
-       private StackTraceElement[] createStackTrace(boolean isBackPressure) {
-               if (isBackPressure) {
-                       return new StackTraceElement[] { new StackTraceElement(
-                                       
BackPressureStatsTracker.EXPECTED_CLASS_NAME,
-                                       
BackPressureStatsTracker.EXPECTED_METHOD_NAME,
-                                       "LocalBufferPool.java",
-                                       133) };
-               } else {
-                       return Thread.currentThread().getStackTrace();
-               }
-       }
-
-       private ExecutionVertex mockExecutionVertex(
-                       ExecutionJobVertex jobVertex,
-                       int subTaskIndex) {
-
-               Execution exec = Mockito.mock(Execution.class);
-               Mockito.when(exec.getAttemptId()).thenReturn(new 
ExecutionAttemptID());
-
-               JobVertexID id = jobVertex.getJobVertexId();
-
-               ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class);
-               Mockito.when(vertex.getJobvertexId()).thenReturn(id);
-               
Mockito.when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
-               
Mockito.when(vertex.getParallelSubtaskIndex()).thenReturn(subTaskIndex);
-
-               return vertex;
-       }
-
-}

Reply via email to