zhijiangW commented on a change in pull request #10083:
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r343999496
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImpl.java
##########
@@ -47,108 +47,63 @@
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * Back pressure statistics tracker.
- *
- * <p>Back pressure is determined by sampling running tasks. If a task is
- * slowed down by back pressure it will be stuck in memory requests to a
- * {@link org.apache.flink.runtime.io.network.buffer.LocalBufferPool}.
- *
- * <p>The back pressured stack traces look like this:
- *
- * <pre>
- * java.lang.Object.wait(Native Method)
- * o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
- * o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
<--- BLOCKING
- * request
- * [...]
- * </pre>
+ * Back pressure statistics tracker. See {@link
org.apache.flink.runtime.taskexecutor.BackPressureSampleService}
+ * for more details about how back pressure ratio of a task is calculated.
*/
public class BackPressureStatsTrackerImpl implements BackPressureStatsTracker {
private static final Logger LOG =
LoggerFactory.getLogger(BackPressureStatsTrackerImpl.class);
- /** Maximum stack trace depth for samples. */
- static final int MAX_STACK_TRACE_DEPTH = 8;
-
- /** Expected class name for back pressure indicating stack trace
element. */
- static final String EXPECTED_CLASS_NAME =
"org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
-
- /** Expected method name for back pressure indicating stack trace
element. */
- static final String EXPECTED_METHOD_NAME =
"requestBufferBuilderBlocking";
-
/** Lock guarding trigger operations. */
private final Object lock = new Object();
- /* Stack trace sample coordinator. */
- private final StackTraceSampleCoordinator coordinator;
+ private final BackPressureRequestCoordinator coordinator;
/**
* Completed stats. Important: Job vertex IDs need to be scoped by job
ID,
- * because they are potentially constant across runs messing up the
cached
- * data.
+ * because they are potentially constant across runs which may mess up
the
+ * cached data.
*/
private final Cache<ExecutionJobVertex, OperatorBackPressureStats>
operatorStatsCache;
- /** Pending in progress stats. Important: Job vertex IDs need to be
scoped
- * by job ID, because they are potentially constant across runs messing
up
- * the cached data.*/
+ /**
+ * Pending in progress stats. Important: Job vertex IDs need to be
scoped
+ * by job ID, because they are potentially constant across runs which
may
+ * mess up the cached data.
+ */
private final Set<ExecutionJobVertex> pendingStats = new HashSet<>();
- /** Cleanup interval for completed stats cache. */
- private final int cleanUpInterval;
-
- private final int numSamples;
-
private final int backPressureStatsRefreshInterval;
- private final Time delayBetweenSamples;
-
/** Flag indicating whether the stats tracker has been shut down. */
+ @GuardedBy("lock")
private boolean shutDown;
/**
* Creates a back pressure statistics tracker.
*
* @param cleanUpInterval Clean up interval for completed stats.
- * @param numSamples Number of stack trace samples when
determining back pressure.
- * @param delayBetweenSamples Delay between samples when determining
back pressure.
+ * @param backPressureStatsRefreshInterval
Review comment:
Please add a `@param` entry for `coordinator` to this Javadoc, and add a
description for the `backPressureStatsRefreshInterval` parameter.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services