zentol commented on a change in pull request #12981:
URL: https://github.com/apache/flink/pull/12981#discussion_r459982395



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -37,9 +37,10 @@
 class UnsafeMemoryBudget {
        // max. number of sleeps during try-reserving with exponentially
        // increasing delay before throwing OutOfMemoryError:
-       // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 (total 1023 ms ~ 1 s)
+       // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, ... (total (2 x 1024) - 1 ms 
~ 2 s)
        // which means that MemoryReservationException will be thrown after 1 s 
of trying

Review comment:
       ```suggestion
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -147,8 +147,10 @@ void reserveMemory(long size, int maxSleeps) throws 
MemoryReservationException {
                        }
 
                        // no luck
-                       throw new MemoryReservationException(
-                               String.format("Could not allocate %d bytes, 
only %d bytes are remaining", size, availableOrReserved));
+                       throw new MemoryReservationException(String.format(
+                               "Could not allocate %d bytes, only %d bytes are 
remaining, try to upgrade to Java 8u72 or higher",

Review comment:
       I wouldn't change this message; at this point the cause of the failure is 
ambiguous and isn't necessarily the Java version.
   If the Java version is the problem, then we have already logged a warning 
when the process was started.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -37,9 +37,10 @@
 class UnsafeMemoryBudget {
        // max. number of sleeps during try-reserving with exponentially
        // increasing delay before throwing OutOfMemoryError:
-       // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 (total 1023 ms ~ 1 s)
+       // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, ... (total (2 x 1024) - 1 ms 
~ 2 s)
        // which means that MemoryReservationException will be thrown after 1 s 
of trying
-       private static final int MAX_SLEEPS = 10;
+       private static final int MAX_SLEEPS = 11;
+       private static final int MAX_SLEEPS_VERIFY_EMPTY = 17; // (total (128 x 
1024) - 1 ms ~ 2 min)

Review comment:
       ```suggestion
        private static final int MAX_SLEEPS_VERIFY_EMPTY = 17; // ((2^17) - 1 ms ~ 2 min total sleep duration)
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -37,9 +37,10 @@
 class UnsafeMemoryBudget {
        // max. number of sleeps during try-reserving with exponentially
        // increasing delay before throwing OutOfMemoryError:
-       // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 (total 1023 ms ~ 1 s)
+       // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, ... (total (2 x 1024) - 1 ms 
~ 2 s)
        // which means that MemoryReservationException will be thrown after 1 s 
of trying
-       private static final int MAX_SLEEPS = 10;
+       private static final int MAX_SLEEPS = 11;

Review comment:
       ```suggestion
        private static final int MAX_SLEEPS = 11; // ((2^11) - 1 ms ~ 2 s total sleep duration)
   ```
   

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -61,7 +62,8 @@ long getAvailableMemorySize() {
 
        boolean verifyEmpty() {
                try {
-                       reserveMemory(totalMemorySize);
+                       // we wait longer as we have to GC all memory, 
allocated by task, to perform the verification

Review comment:
       wait longer than _what_?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -134,6 +143,12 @@ public TaskSlotTableImpl(
                slotActions = null;
                state = State.CREATED;
                closingFuture = new CompletableFuture<>();
+
+               asyncExecutor = new ThreadPoolExecutor(

Review comment:
       All executors should use an 
`org.apache.flink.runtime.util.ExecutorThreadFactory`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -122,15 +128,15 @@ void reserveMemory(long size) throws 
MemoryReservationException {
                                if (availableOrReserved >= size) {
                                        return;
                                }
-                               if (sleeps >= MAX_SLEEPS) {
+                               if (sleeps >= maxSleeps) {
                                        break;
                                }
-                               if (sleeps >= RETRIGGER_GC_AFTER_SLEEPS) {
-                                       // trigger again VM's Reference 
processing if we have to wait longer
-                                       System.gc();
-                               }
                                try {
                                        if 
(!JavaGcCleanerWrapper.tryRunPendingCleaners()) {
+                                               if (sleeps >= 
RETRIGGER_GC_AFTER_SLEEPS) {

Review comment:
       Is this just an optimization to skip 1 GC call?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -318,7 +318,10 @@ private void 
verifyAllManagedMemoryIsReleasedAfter(CompletableFuture<Void> after
                        after,
                        () -> {
                                if (!memoryManager.verifyEmpty()) {
-                                       LOG.warn("Not all slot memory is freed, 
potential memory leak at {}", this);
+                                       LOG.warn(
+                                               "Not all slot managed memory is 
freed, potential memory leak at {}, " +

Review comment:
       same as above

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -37,9 +37,10 @@
 class UnsafeMemoryBudget {
        // max. number of sleeps during try-reserving with exponentially
        // increasing delay before throwing OutOfMemoryError:
-       // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 (total 1023 ms ~ 1 s)
+       // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, ... (total (2 x 1024) - 1 ms 
~ 2 s)

Review comment:
       move the total calculation to the `MAX_SLEEPS` line; this section is now 
about the general concept of exponential delays, and shouldn't use examples for 
a particular sleep count.
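
   The totals quoted in the comments can be checked with a small sketch. It 
assumes, as the code comment states, that the delay starts at 1 ms and doubles 
on every sleep; `BackoffTotals` and `totalSleepMillis` are hypothetical names 
used only for illustration and are not part of Flink.

```java
// Hypothetical helper, not part of Flink: totals the sleep time of an
// exponential backoff that sleeps 1, 2, 4, ... ms for maxSleeps rounds.
final class BackoffTotals {

    /** Sums 2^0 + 2^1 + ... + 2^(maxSleeps - 1) milliseconds, i.e. 2^maxSleeps - 1. */
    static long totalSleepMillis(int maxSleeps) {
        long total = 0;
        long delay = 1; // assumed initial delay of 1 ms
        for (int i = 0; i < maxSleeps; i++) {
            total += delay;
            delay <<= 1; // double the delay for the next round
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalSleepMillis(10)); // 1023 ms, roughly 1 s
        System.out.println(totalSleepMillis(11)); // 2047 ms, roughly 2 s
        System.out.println(totalSleepMillis(17)); // 131071 ms, roughly 2 min
    }
}
```

   Because the total depends only on the sleep count, it arguably belongs next 
to each constant rather than in the general description of the backoff.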

##########
File path: 
flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java
##########
@@ -303,32 +302,38 @@ private Runnable create(Object owner, Runnable 
cleanupOperation) {
        private static class PendingCleanersRunnerProvider {
                private static final String REFERENCE_CLASS = 
"java.lang.ref.Reference";
                private final String cleanerName;
-               private final ReflectionUtils reflectionUtils;
                private final String waitForReferenceProcessingName;
                private final Object[] waitForReferenceProcessingArgs;
                private final Class<?>[] waitForReferenceProcessingArgTypes;
 
                private PendingCleanersRunnerProvider(
                        String cleanerName,
-                       ReflectionUtils reflectionUtils,
                        String waitForReferenceProcessingName,
                        Object[] waitForReferenceProcessingArgs,
                        Class<?>[] waitForReferenceProcessingArgTypes) {
                        this.cleanerName = cleanerName;
-                       this.reflectionUtils = reflectionUtils;
                        this.waitForReferenceProcessingName = 
waitForReferenceProcessingName;
                        this.waitForReferenceProcessingArgs = 
waitForReferenceProcessingArgs;
                        this.waitForReferenceProcessingArgTypes = 
waitForReferenceProcessingArgTypes;
                }
 
+               @Nullable
                private PendingCleanersRunner createPendingCleanersRunner() {

Review comment:
       maybe use an `Optional` instead and use `Optional#orElse(null)` at the 
call site.
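
   A minimal sketch of what the `Optional` variant could look like. 
`OptionalLookup` and `findDeclaredMethod` are hypothetical names, and the 
lookup targets `java.lang.String` purely as a stand-in; this is not the code 
from the pull request.

```java
import java.lang.reflect.Method;
import java.util.Optional;

// Hypothetical sketch: a reflective lookup that reports "not available"
// through Optional instead of a @Nullable return value.
final class OptionalLookup {

    /** Returns the declared method, or empty if the class or the method is missing. */
    static Optional<Method> findDeclaredMethod(
            String className, String methodName, Class<?>... argTypes) {
        try {
            Class<?> clazz = Class.forName(className);
            return Optional.of(clazz.getDeclaredMethod(methodName, argTypes));
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // The call site decides how to treat absence, here by falling back to null.
        Method present = findDeclaredMethod("java.lang.String", "length").orElse(null);
        Method absent = findDeclaredMethod("java.lang.String", "noSuchMethod").orElse(null);
        System.out.println(present != null); // true
        System.out.println(absent == null);  // true
    }
}
```

   The return type then documents that the result may be missing, and the 
`null` handling stays confined to the one call site that needs it.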

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -295,22 +303,25 @@ public String toString() {
                                // and set the slot state to releasing so that 
it gets eventually freed
                                tasks.values().forEach(task -> 
task.failExternally(cause));
                        }
+
                        final CompletableFuture<Void> cleanupFuture = 
FutureUtils

Review comment:
       maybe rename this to `shutdownFuture`? We aren't quite sure whether the 
cleanup was successful.

##########
File path: 
flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java
##########
@@ -303,32 +302,38 @@ private Runnable create(Object owner, Runnable 
cleanupOperation) {
        private static class PendingCleanersRunnerProvider {
                private static final String REFERENCE_CLASS = 
"java.lang.ref.Reference";
                private final String cleanerName;
-               private final ReflectionUtils reflectionUtils;
                private final String waitForReferenceProcessingName;
                private final Object[] waitForReferenceProcessingArgs;
                private final Class<?>[] waitForReferenceProcessingArgTypes;
 
                private PendingCleanersRunnerProvider(
                        String cleanerName,
-                       ReflectionUtils reflectionUtils,
                        String waitForReferenceProcessingName,
                        Object[] waitForReferenceProcessingArgs,
                        Class<?>[] waitForReferenceProcessingArgTypes) {
                        this.cleanerName = cleanerName;
-                       this.reflectionUtils = reflectionUtils;
                        this.waitForReferenceProcessingName = 
waitForReferenceProcessingName;
                        this.waitForReferenceProcessingArgs = 
waitForReferenceProcessingArgs;
                        this.waitForReferenceProcessingArgTypes = 
waitForReferenceProcessingArgTypes;
                }
 
+               @Nullable
                private PendingCleanersRunner createPendingCleanersRunner() {
-                       Class<?> referenceClass = 
reflectionUtils.findClass(REFERENCE_CLASS);
-                       Method waitForReferenceProcessingMethod = 
reflectionUtils.findMethod(
-                               referenceClass,
-                               waitForReferenceProcessingName,
-                               waitForReferenceProcessingArgTypes);
-                       waitForReferenceProcessingMethod.setAccessible(true);
-                       return new 
PendingCleanersRunner(waitForReferenceProcessingMethod, 
waitForReferenceProcessingArgs);
+                       try {
+                               Class<?> referenceClass = 
Class.forName(REFERENCE_CLASS);
+                               Method waitForReferenceProcessingMethod = 
referenceClass.getDeclaredMethod(
+                                       waitForReferenceProcessingName,
+                                       waitForReferenceProcessingArgTypes);
+                               
waitForReferenceProcessingMethod.setAccessible(true);
+                               return new 
PendingCleanersRunner(waitForReferenceProcessingMethod, 
waitForReferenceProcessingArgs);
+                       } catch (ClassNotFoundException | NoSuchMethodException 
e) {

Review comment:
       Why did you opt not to use `ReflectionUtils`? The alternatives would be 
catching the `FlinkRuntimeException` and checking the type of its cause, or 
introducing some sort of optional failure handler to `findClass()`/`findMethod()` 
(like `findClass(String clazz, FunctionWithException<Class<?>, 
FlinkRuntimeException>)`).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -134,6 +143,12 @@ public TaskSlotTableImpl(
                slotActions = null;
                state = State.CREATED;
                closingFuture = new CompletableFuture<>();
+
+               asyncExecutor = new ThreadPoolExecutor(
+                       0,
+                       numberSlots,
+                       60L, TimeUnit.SECONDS,
+                       new SynchronousQueue<>());

Review comment:
       Whenever you use an executor that is not provided via some factory 
method in `java.util.concurrent.Executors`, I would recommend writing some 
tests for it to ensure it behaves exactly the way you expect it to.
   This one is probably fine since we know the upper limit of possible 
tasks to be queued (== 1 task per slot), but a test would also be good for 
documentation purposes.
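
   One behaviour such a test could pin down is sketched below, using only 
`java.util.concurrent`. `SlotExecutorCheck` and `rejectsBeyondLimit` are 
hypothetical names, and the inline thread factory is a plain stand-in rather 
than Flink's `ExecutorThreadFactory`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical check: with a SynchronousQueue the pool cannot buffer tasks,
// so a task submitted while all numberSlots threads are busy is rejected.
final class SlotExecutorCheck {

    /** Returns true if the (numberSlots + 1)-th concurrent task is rejected. */
    static boolean rejectsBeyondLimit(int numberSlots) {
        // Plain stand-in factory; the name and daemon flag are illustrative choices.
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "slot-async-sketch");
            thread.setDaemon(true);
            return thread;
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            0, numberSlots, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), factory);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker thread busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < numberSlots; i++) {
                executor.execute(blocking); // each task starts a new worker thread
            }
            try {
                executor.execute(blocking); // no idle worker and no queue capacity
                return false;
            } catch (RejectedExecutionException expected) {
                return true;
            }
        } finally {
            release.countDown();
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsBeyondLimit(2)); // true
    }
}
```

   If at most one task per slot is ever in flight, the rejection path should 
never trigger, and a test of this kind records that assumption explicitly.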
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

