tillrohrmann commented on a change in pull request #12980:
URL: https://github.com/apache/flink/pull/12980#discussion_r460156112
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -84,15 +85,22 @@
/** The closing future is completed when the slot is freed and closed. */
private final CompletableFuture<Void> closingFuture;
+ /**
+ * {@link ExecutorService} for background actions, e.g. verify all managed memory released.
+ */
+ private final ExecutorService asyncExecutor;
Review comment:
```suggestion
private final Executor asyncExecutor;
```
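Narrowing the field to `Executor` fits, assuming the slot only submits work and leaves shutting the pool down to its owner. A minimal sketch, with `VerifyingSlot` and `ExecutorNarrowingDemo` as hypothetical stand-ins (not Flink classes): any `ExecutorService` still satisfies the narrower type, and a test can pass `Runnable::run` so the check runs on the calling thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for TaskSlot: it only submits work, so Executor suffices. */
class VerifyingSlot {
    // Narrowest type covering the usage; shutdown is left to whoever owns the pool.
    private final Executor asyncExecutor;

    VerifyingSlot(Executor asyncExecutor) {
        this.asyncExecutor = asyncExecutor;
    }

    /** Schedules a background check and returns a future that completes after it ran. */
    CompletableFuture<Void> runCheck(Runnable check) {
        return CompletableFuture.runAsync(check, asyncExecutor);
    }
}

class ExecutorNarrowingDemo {
    /** Returns true if the check was executed via the given executor. */
    static boolean runWith(Executor executor) {
        AtomicBoolean ran = new AtomicBoolean(false);
        new VerifyingSlot(executor).runCheck(() -> ran.set(true)).join();
        return ran.get();
    }

    public static void main(String[] args) {
        // Production-like: a pool owned (and shut down) by someone else.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(runWith(pool)); // prints "true"
        } finally {
            pool.shutdown();
        }
        // Test-like: a direct executor that runs the check on the calling thread.
        System.out.println(runWith(Runnable::run)); // prints "true"
    }
}
```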
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -37,9 +37,10 @@
class UnsafeMemoryBudget {
// max. number of sleeps during try-reserving with exponentially
// increasing delay before throwing OutOfMemoryError:
- // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 (total 1023 ms ~ 1 s)
+ // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, ... (total (2 x 1024) - 1 ms ~ 2 s)
// which means that MemoryReservationException will be thrown after 1 s of trying
Review comment:
```suggestion
// which means that MemoryReservationException will be thrown after 2 s of trying
```
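The corrected figure follows from a geometric series: n sleeps that start at 1 ms and double each time add up to 2^n - 1 ms. Ten sleeps (1 to 512 ms) give 1023 ms, roughly 1 s; the `(2 x 1024) - 1` in the new comment corresponds to eleven sleeps (up to 1024 ms), i.e. 2047 ms, roughly 2 s. The helper below is a hypothetical illustration of that arithmetic, not the code in `UnsafeMemoryBudget`.

```java
/** Hypothetical helper: total delay of an exponential backoff starting at 1 ms. */
class BackoffBudget {
    /** Sum of 1 + 2 + 4 + ... + 2^(sleeps - 1) ms, which equals 2^sleeps - 1 ms. */
    static long totalSleepMillis(int sleeps) {
        long total = 0;
        long delay = 1;
        for (int i = 0; i < sleeps; i++) {
            total += delay;
            delay <<= 1; // double the delay for the next attempt
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalSleepMillis(10)); // 1023 (sleeps 1 .. 512 ms, ~1 s)
        System.out.println(totalSleepMillis(11)); // 2047 (sleeps 1 .. 1024 ms, ~2 s)
    }
}
```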
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -84,15 +85,22 @@
/** The closing future is completed when the slot is freed and closed. */
private final CompletableFuture<Void> closingFuture;
+ /**
+ * {@link ExecutorService} for background actions, e.g. verify all managed memory released.
Review comment:
```suggestion
* {@link Executor} for background actions, e.g. verify all managed memory released.
```
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -134,6 +143,12 @@ public TaskSlotTableImpl(
slotActions = null;
state = State.CREATED;
closingFuture = new CompletableFuture<>();
+
+ asyncExecutor = new ThreadPoolExecutor(
+ 0,
+ numberSlots,
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<>());
Review comment:
```suggestion
new SynchronousQueue<>(),
new ExecutorThreadFactory("task-slot-memory-verifier"));
```
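A JDK-only sketch of what the suggestion buys: threads created for the verifier pool carry a recognizable name, which makes them easy to spot in thread dumps. `NamedThreadFactory` and `VerifierPool` are hypothetical; Flink's `ExecutorThreadFactory` is not reproduced here, and the `<pool>-thread-N` naming and the daemon flag are assumptions of this sketch.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical JDK-only factory giving every pool thread a recognizable name. */
class NamedThreadFactory implements ThreadFactory {
    private final String poolName;
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    NamedThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable, poolName + "-thread-" + threadNumber.getAndIncrement());
        // Assumption of this sketch: verifier threads should not keep the JVM alive.
        thread.setDaemon(true);
        return thread;
    }
}

class VerifierPool {
    /** Same shape as the pool in the diff: no core threads, up to numberSlots, 60 s keep-alive. */
    static ThreadPoolExecutor create(int numberSlots) {
        return new ThreadPoolExecutor(
                0,
                numberSlots,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new NamedThreadFactory("task-slot-memory-verifier"));
    }
}
```

With a `SynchronousQueue` and at most `numberSlots` threads, a task submitted while all threads are busy is rejected with a `RejectedExecutionException` under the default `AbortPolicy`.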
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -295,22 +303,25 @@ public String toString() {
// and set the slot state to releasing so that it gets eventually freed
tasks.values().forEach(task -> task.failExternally(cause));
}
+
final CompletableFuture<Void> cleanupFuture = FutureUtils
.waitForAll(tasks.values().stream().map(TaskSlotPayload::getTerminationFuture).collect(Collectors.toList()))
- .thenRun(() -> {
- verifyMemoryFreed();
- this.memoryManager.shutdown();
- });
-
+ .thenRun(memoryManager::shutdown);
+ verifyAllManagedMemoryIsReleasedAfter(cleanupFuture);
Review comment:
Maybe add a test which ensures that we can call `MemoryManager.verifyEmpty()` after it has been shut down. That way we explicitly state that this is a contract we want to guard.
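A minimal, dependency-free sketch of the contract such a test would guard. `ToyMemoryManager` and `ToyMemoryManagerContractTest` are hypothetical and much simpler than Flink's `MemoryManager`; the toy keeps its bookkeeping across `shutdown()` so that a leak stays visible to `verifyEmpty()`. Whether the real `MemoryManager` behaves that way is exactly what a real (JUnit) test against it would need to pin down.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, heavily simplified memory manager used only to illustrate the contract. */
class ToyMemoryManager {
    private final Set<Object> owners = new HashSet<>();
    private boolean shutdown;

    synchronized void allocate(Object owner) {
        if (shutdown) {
            throw new IllegalStateException("memory manager has been shut down");
        }
        owners.add(owner);
    }

    synchronized void release(Object owner) {
        owners.remove(owner);
    }

    synchronized void shutdown() {
        // Bookkeeping is intentionally kept so that verifyEmpty() stays meaningful.
        shutdown = true;
    }

    /** Must remain callable after shutdown(): it only reads the bookkeeping. */
    synchronized boolean verifyEmpty() {
        return owners.isEmpty();
    }
}

class ToyMemoryManagerContractTest {
    /** Mirrors the suggested test: verifyEmpty() after shutdown() must not throw. */
    static void verifyEmptyWorksAfterShutdown() {
        ToyMemoryManager released = new ToyMemoryManager();
        Object owner = new Object();
        released.allocate(owner);
        released.release(owner);
        released.shutdown();
        check(released.verifyEmpty(), "all memory released before shutdown");

        ToyMemoryManager leaking = new ToyMemoryManager();
        leaking.allocate(new Object());
        leaking.shutdown();
        check(!leaking.verifyEmpty(), "a leak must still be detectable after shutdown");
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        verifyEmptyWorksAfterShutdown();
        System.out.println("contract holds");
    }
}
```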
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -134,6 +143,12 @@ public TaskSlotTableImpl(
slotActions = null;
state = State.CREATED;
closingFuture = new CompletableFuture<>();
+
+ asyncExecutor = new ThreadPoolExecutor(
Review comment:
What spoke against using the `ioExecutor` in `TaskManagerSharedServices`
when creating the `TaskSlotTableImpl`?
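To illustrate the alternative the question points at, here is a sketch of the table borrowing an executor from a shared-services holder instead of creating its own pool. `SharedServices`, `getIoExecutor()` and `SlotTable` are hypothetical names, not the actual Flink API; the point is that the holder owns the pool's lifecycle, so the table has nothing to shut down.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical holder in the role of TaskManagerSharedServices: it owns the pool. */
final class SharedServices implements AutoCloseable {
    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(2);

    Executor getIoExecutor() {
        return ioExecutor;
    }

    @Override
    public void close() {
        // The owner, not the slot table, controls the executor's lifecycle.
        ioExecutor.shutdown();
    }
}

/** Hypothetical slot table that borrows an executor instead of creating its own pool. */
final class SlotTable {
    private final Executor memoryVerificationExecutor;

    SlotTable(Executor memoryVerificationExecutor) {
        this.memoryVerificationExecutor = memoryVerificationExecutor;
    }

    CompletableFuture<Void> verifyInBackground(Runnable check) {
        return CompletableFuture.runAsync(check, memoryVerificationExecutor);
    }

    /** Nothing to shut down here, because the table does not own the executor. */
    CompletableFuture<Void> closeAsync() {
        return CompletableFuture.completedFuture(null);
    }
}
```

A trade-off worth weighing: a shared executor may be busy with other I/O work, so verification can be delayed, whereas a dedicated pool isolates it at the cost of extra threads and an explicit shutdown.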
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -295,22 +303,25 @@ public String toString() {
// and set the slot state to releasing so that it gets eventually freed
tasks.values().forEach(task -> task.failExternally(cause));
}
+
final CompletableFuture<Void> cleanupFuture = FutureUtils
.waitForAll(tasks.values().stream().map(TaskSlotPayload::getTerminationFuture).collect(Collectors.toList()))
- .thenRun(() -> {
- verifyMemoryFreed();
- this.memoryManager.shutdown();
- });
-
+ .thenRun(memoryManager::shutdown);
+ verifyAllManagedMemoryIsReleasedAfter(cleanupFuture);
FutureUtils.forward(cleanupFuture, closingFuture);
}
return closingFuture;
}
- private void verifyMemoryFreed() {
- if (!memoryManager.verifyEmpty()) {
- LOG.warn("Not all slot memory is freed, potential memory leak at {}", this);
- }
+ private void verifyAllManagedMemoryIsReleasedAfter(CompletableFuture<Void> after) {
+ FutureUtils.runAfterwardsAsync(
+ after,
+ () -> {
+ if (!memoryManager.verifyEmpty()) {
+ LOG.warn("Not all slot memory is freed, potential memory leak at {}", this);
+ }
+ },
+ asyncExecutor);
Review comment:
Does it make sense to only run this action if `after` has completed normally, e.g. using `after.thenRunAsync(() -> ..., asyncExecutor)`?
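The difference can be seen with plain JDK `CompletableFuture`s: `thenRunAsync` skips its action when the source future completes exceptionally, whereas `whenCompleteAsync` runs it either way. `FutureUtils.runAfterwardsAsync` is assumed here to correspond to the unconditional variant; `AfterCompletionDemo` is a hypothetical demo class, and a direct executor keeps it deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

class AfterCompletionDemo {
    // Direct executor: runs dependent actions on the completing thread, keeping the demo deterministic.
    private static final Executor DIRECT = Runnable::run;

    /**
     * Returns whether the verification action ran.
     *
     * @param cleanupFails  whether the "after" future completes exceptionally
     * @param onlyOnSuccess true: thenRunAsync (suggested); false: whenCompleteAsync (runs either way)
     */
    static boolean checkRan(boolean cleanupFails, boolean onlyOnSuccess) {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Void> after = new CompletableFuture<>();
        CompletableFuture<Void> verification = onlyOnSuccess
                ? after.thenRunAsync(() -> ran.set(true), DIRECT)
                : after.whenCompleteAsync((ignored, failure) -> ran.set(true), DIRECT);

        if (cleanupFails) {
            after.completeExceptionally(new IllegalStateException("cleanup failed"));
        } else {
            after.complete(null);
        }
        // Wait for the dependent stage and swallow the propagated failure, if any.
        verification.handle((ignored, failure) -> null).join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(checkRan(false, true));  // true
        System.out.println(checkRan(true, true));   // false: action skipped
        System.out.println(checkRan(false, false)); // true
        System.out.println(checkRan(true, false));  // true: action runs anyway
    }
}
```

With the conditional variant, a failed cleanup produces no leak warning at all; with the unconditional one, the check still runs and may report memory that the failed cleanup never got to release.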
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -84,15 +85,22 @@
/** The closing future is completed when the slot is freed and closed. */
private final CompletableFuture<Void> closingFuture;
+ /**
+ * {@link ExecutorService} for background actions, e.g. verify all managed memory released.
+ */
+ private final ExecutorService asyncExecutor;
Review comment:
```suggestion
private final ExecutorService memoryVerificationExecutor;
```
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -107,6 +111,11 @@
"TaskSlotTableImpl is not initialized with proper main thread executor, " +
"call to TaskSlotTableImpl#start is required");
+ /**
+ * {@link ExecutorService} for background actions, e.g. verify all managed memory released.
+ */
+ private final ExecutorService asyncExecutor;
Review comment:
```suggestion
private final ExecutorService memoryVerificationExecutor;
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.