tillrohrmann commented on a change in pull request #12980:
URL: https://github.com/apache/flink/pull/12980#discussion_r460156112



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -84,15 +85,22 @@
        /** The closing future is completed when the slot is freed and closed. */
        private final CompletableFuture<Void> closingFuture;
 
+       /**
+        * {@link ExecutorService} for background actions, e.g. verify all managed memory released.
+        */
+       private final ExecutorService asyncExecutor;

Review comment:
       ```suggestion
        private final Executor asyncExecutor;
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -37,9 +37,10 @@
 class UnsafeMemoryBudget {
        // max. number of sleeps during try-reserving with exponentially
        // increasing delay before throwing OutOfMemoryError:
-       // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 (total 1023 ms ~ 1 s)
+       // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, ... (total (2 x 1024) - 1 ms ~ 2 s)
        // which means that MemoryReservationException will be thrown after 1 s of trying

Review comment:
       ```suggestion
        // which means that MemoryReservationException will be thrown after 2 s of trying
   ```
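The corrected figure follows from a geometric series: with a delay that starts at 1 ms and doubles after every sleep, n sleeps add up to 2^n - 1 ms. Ten sleeps give 1023 ms (about 1 s); the revised total of (2 x 1024) - 1 = 2047 ms corresponds to eleven sleeps (about 2 s), which is why the follow-up comment line has to say 2 s as well. The sleep counts here are inferred from the comment text, not read from the `UnsafeMemoryBudget` source; `BackoffTotal` is a hypothetical helper for the arithmetic only.

```java
// Sketch: total time spent sleeping in an exponential back-off loop
// that starts at 1 ms and doubles the delay after each failed attempt.
public class BackoffTotal {

    // Sum of 1 + 2 + 4 + ... + 2^(maxSleeps - 1) milliseconds.
    static long totalSleepMillis(int maxSleeps) {
        long total = 0;
        long delay = 1;
        for (int i = 0; i < maxSleeps; i++) {
            total += delay;
            delay <<= 1; // double the delay for the next attempt
        }
        return total; // equals 2^maxSleeps - 1
    }

    public static void main(String[] args) {
        System.out.println(totalSleepMillis(10)); // 1023 ms, ~1 s
        System.out.println(totalSleepMillis(11)); // 2047 ms, ~2 s
    }
}
```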

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -84,15 +85,22 @@
        /** The closing future is completed when the slot is freed and closed. */
        private final CompletableFuture<Void> closingFuture;
 
+       /**
+        * {@link ExecutorService} for background actions, e.g. verify all managed memory released.

Review comment:
       ```suggestion
         * {@link Executor} for background actions, e.g. verify all managed memory released.
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -134,6 +143,12 @@ public TaskSlotTableImpl(
                slotActions = null;
                state = State.CREATED;
                closingFuture = new CompletableFuture<>();
+
+               asyncExecutor = new ThreadPoolExecutor(
+                       0,
+                       numberSlots,
+                       60L, TimeUnit.SECONDS,
+                       new SynchronousQueue<>());

Review comment:
       ```suggestion
                        new SynchronousQueue<>(),
                        new ExecutorThreadFactory("task-slot-memory-verifier"));
   ```
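A JDK-only sketch of the suggested pool, to make its behaviour concrete. `namedFactory` is a hypothetical stand-in for Flink's `ExecutorThreadFactory` (it only names threads and marks them as daemons), and `SlotVerifierPool` is an illustrative name, not code from the PR.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class SlotVerifierPool {

    // Hypothetical stand-in for ExecutorThreadFactory: gives threads a
    // recognisable name prefix so they are easy to spot in thread dumps.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return r -> {
            Thread t = new Thread(r, prefix + "-thread-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
    }

    static ThreadPoolExecutor create(int numberSlots) {
        return new ThreadPoolExecutor(
                0,                         // no core threads: an idle pool holds no threads
                numberSlots,               // at most one concurrent verification per slot
                60L, TimeUnit.SECONDS,     // idle threads die after 60 s
                new SynchronousQueue<>(),  // no buffering: hand each task straight to a thread
                namedFactory("task-slot-memory-verifier"));
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = create(2);
        Future<String> name = pool.submit(() -> Thread.currentThread().getName());
        System.out.println(name.get()); // task-slot-memory-verifier-thread-1
        pool.shutdown();
    }
}
```

Because the queue is a `SynchronousQueue`, a submission that arrives while all `numberSlots` threads are busy is not queued; with the default `AbortPolicy` it is rejected with a `RejectedExecutionException`.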

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -295,22 +303,25 @@ public String toString() {
                                // and set the slot state to releasing so that it gets eventually freed
                                tasks.values().forEach(task -> task.failExternally(cause));
                        }
+
                        final CompletableFuture<Void> cleanupFuture = FutureUtils
                                .waitForAll(tasks.values().stream().map(TaskSlotPayload::getTerminationFuture).collect(Collectors.toList()))
-                               .thenRun(() -> {
-                                       verifyMemoryFreed();
-                                       this.memoryManager.shutdown();
-                               });
-
+                               .thenRun(memoryManager::shutdown);
+                       verifyAllManagedMemoryIsReleasedAfter(cleanupFuture);

Review comment:
       Maybe add a test which ensures that we can call `MemoryManager.verifyEmpty()` after it has been shut down. That way we explicitly state that this is a contract we want to guard.
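The shape of such a test can be sketched without Flink on the classpath. `ToyMemoryManager` below is a hypothetical stand-in, not Flink's `MemoryManager`: it models only the contract in question, namely that `verifyEmpty()` may be called after `shutdown()` without throwing and still reports outstanding reservations. Whether the real `shutdown()` releases outstanding reservations itself is an open assumption; the toy deliberately leaves them alone. A real test would exercise `MemoryManager` directly.

```java
import java.util.concurrent.atomic.AtomicLong;

public class VerifyAfterShutdownSketch {

    // Hypothetical stand-in; NOT Flink's MemoryManager API.
    static final class ToyMemoryManager {
        private final long totalBytes;
        private final AtomicLong availableBytes;
        private volatile boolean shutDown;

        ToyMemoryManager(long totalBytes) {
            this.totalBytes = totalBytes;
            this.availableBytes = new AtomicLong(totalBytes);
        }

        void reserve(long bytes) {
            if (shutDown) {
                throw new IllegalStateException("memory manager is shut down");
            }
            availableBytes.addAndGet(-bytes);
        }

        void release(long bytes) {
            availableBytes.addAndGet(bytes);
        }

        void shutdown() {
            shutDown = true; // reservations are intentionally left untouched
        }

        // Contract under test: callable before and after shutdown(), never
        // throws, and reports whether every reservation was released.
        boolean verifyEmpty() {
            return availableBytes.get() == totalBytes;
        }
    }

    static void verifyEmptyWorksAfterShutdown() {
        ToyMemoryManager clean = new ToyMemoryManager(1024);
        clean.reserve(512);
        clean.release(512);
        clean.shutdown();
        if (!clean.verifyEmpty()) throw new AssertionError("expected no leak");

        ToyMemoryManager leaky = new ToyMemoryManager(1024);
        leaky.reserve(512);
        leaky.shutdown();
        if (leaky.verifyEmpty()) throw new AssertionError("expected leak to be reported");
    }

    public static void main(String[] args) {
        verifyEmptyWorksAfterShutdown();
        System.out.println("verifyEmpty() contract holds after shutdown()");
    }
}
```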

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -134,6 +143,12 @@ public TaskSlotTableImpl(
                slotActions = null;
                state = State.CREATED;
                closingFuture = new CompletableFuture<>();
+
+               asyncExecutor = new ThreadPoolExecutor(

Review comment:
       What spoke against using the `ioExecutor` in `TaskManagerSharedServices` when creating the `TaskSlotTableImpl`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -295,22 +303,25 @@ public String toString() {
                                // and set the slot state to releasing so that it gets eventually freed
                                tasks.values().forEach(task -> task.failExternally(cause));
                        }
+
                        final CompletableFuture<Void> cleanupFuture = FutureUtils
                                .waitForAll(tasks.values().stream().map(TaskSlotPayload::getTerminationFuture).collect(Collectors.toList()))
-                               .thenRun(() -> {
-                                       verifyMemoryFreed();
-                                       this.memoryManager.shutdown();
-                               });
-
+                               .thenRun(memoryManager::shutdown);
+                       verifyAllManagedMemoryIsReleasedAfter(cleanupFuture);
                        FutureUtils.forward(cleanupFuture, closingFuture);
                }
                return closingFuture;
        }
 
-       private void verifyMemoryFreed() {
-               if (!memoryManager.verifyEmpty()) {
-                       LOG.warn("Not all slot memory is freed, potential memory leak at {}", this);
-               }
+       private void verifyAllManagedMemoryIsReleasedAfter(CompletableFuture<Void> after) {
+               FutureUtils.runAfterwardsAsync(
+                       after,
+                       () -> {
+                               if (!memoryManager.verifyEmpty()) {
+                                       LOG.warn("Not all slot memory is freed, potential memory leak at {}", this);
+                               }
+                       },
+                       asyncExecutor);

Review comment:
       Does it make sense to only run this action if `after` has completed normally, e.g. by using `after.thenRunAsync(() -> ..., asyncExecutor)`?
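The difference the question turns on can be shown with plain `CompletableFuture`s. As its name suggests, `FutureUtils.runAfterwardsAsync` appears to run the action however `after` completed; the sketch uses the JDK's `whenCompleteAsync` as a stand-in for that behaviour, and contrasts it with `thenRunAsync`, which skips the action when `after` completed exceptionally. `RunOnlyOnSuccess` and `ranAfter` are hypothetical names for illustration.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class RunOnlyOnSuccess {

    // Returns whether the follow-up action ran once `after` had completed.
    static boolean ranAfter(CompletableFuture<Void> after, boolean onlyOnSuccess, Executor executor) {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Void> result = onlyOnSuccess
                ? after.thenRunAsync(() -> ran.set(true), executor)                       // skipped on failure
                : after.whenCompleteAsync((ignored, error) -> ran.set(true), executor);   // always runs
        try {
            result.join();
        } catch (CompletionException expected) {
            // `after` failed; both variants propagate that failure to `result`
        }
        return ran.get();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("shutdown failed"));

        System.out.println(ranAfter(ok, true, executor));      // true
        System.out.println(ranAfter(failed, true, executor));  // false
        System.out.println(ranAfter(failed, false, executor)); // true
        executor.shutdown();
    }
}
```

If a failed cleanup already means the memory state is unreliable, the `thenRunAsync` variant avoids logging a misleading leak warning on top of the original error.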

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -84,15 +85,22 @@
        /** The closing future is completed when the slot is freed and closed. */
        private final CompletableFuture<Void> closingFuture;
 
+       /**
+        * {@link ExecutorService} for background actions, e.g. verify all managed memory released.
+        */
+       private final ExecutorService asyncExecutor;

Review comment:
       ```suggestion
        private final ExecutorService memoryVerificationExecutor;
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -107,6 +111,11 @@
                "TaskSlotTableImpl is not initialized with proper main thread executor, " +
                        "call to TaskSlotTableImpl#start is required");
 
+       /**
+        * {@link ExecutorService} for background actions, e.g. verify all managed memory released.
+        */
+       private final ExecutorService asyncExecutor;

Review comment:
       ```suggestion
        private final ExecutorService memoryVerificationExecutor;
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

