zhijiangW commented on a change in pull request #10083:
[FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs.
URL: https://github.com/apache/flink/pull/10083#discussion_r342965035
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureSampleCoordinator.java
##########
@@ -63,57 +64,55 @@
/** Time out after the expected sampling duration. */
private final long sampleTimeout;
- /** In progress samples (guarded by lock). */
- private final Map<Integer, PendingStackTraceSample> pendingSamples =
new HashMap<>();
+ /** In progress samples. */
+ @GuardedBy("lock")
+ private final Map<Integer, PendingTaskBackPressureStats> pendingSamples
= new HashMap<>();
/** A list of recent sample IDs to identify late messages vs. invalid
ones. */
private final ArrayDeque<Integer> recentPendingSamples = new
ArrayDeque<>(NUM_GHOST_SAMPLE_IDS);
- /** Sample ID counter (guarded by lock). */
+ /** Sample ID counter. */
+ @GuardedBy("lock")
private int sampleIdCounter;
/**
- * Flag indicating whether the coordinator is still running (guarded by
- * lock).
+ * Flag indicating whether the coordinator is still running.
*/
+ @GuardedBy("lock")
private boolean isShutDown;
/**
* Creates a new coordinator for the job.
*
- * @param executor to use to execute the futures
+ * @param executor Used to execute the futures.
* @param sampleTimeout Time out after the expected sampling duration.
* This is added to the expected duration of a
* sample, which is determined by the number of
* samples and the delay between each sample.
*/
- public StackTraceSampleCoordinator(Executor executor, long
sampleTimeout) {
+ public BackPressureSampleCoordinator(Executor executor, long
sampleTimeout) {
checkArgument(sampleTimeout >= 0L);
- this.executor = Preconditions.checkNotNull(executor);
+ this.executor = checkNotNull(executor);
this.sampleTimeout = sampleTimeout;
}
/**
- * Triggers a stack trace sample to all tasks.
+ * Triggers a task back pressure stats sample to all tasks.
*
* @param tasksToSample Tasks to sample.
- * @param numSamples Number of stack trace samples to collect.
+ * @param numSamples Number of samples per task.
* @param delayBetweenSamples Delay between consecutive samples.
- * @param maxStackTraceDepth Maximum depth of the stack trace. 0
indicates
- * no maximum and keeps the complete stack
trace.
- * @return A future of the completed stack trace sample
+ * @return A future of the completed task back pressure stats sample
*/
@SuppressWarnings("unchecked")
- public CompletableFuture<StackTraceSample> triggerStackTraceSample(
+ CompletableFuture<BackPressureStats> triggerTaskBackPressureSample(
ExecutionVertex[] tasksToSample,
int numSamples,
Review comment:
`numSamples` and `delayBetweenSamples` should be passed into constructor of
this class instead of passing into constructor of
`BackPressureStatsTrackerImpl`. Because these arguments are only used inside
coordinator.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services