zentol commented on a change in pull request #12981:
URL: https://github.com/apache/flink/pull/12981#discussion_r459982395
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -37,9 +37,10 @@
class UnsafeMemoryBudget {
// max. number of sleeps during try-reserving with exponentially
// increasing delay before throwing OutOfMemoryError:
- // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 (total 1023 ms ~ 1 s)
+ // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, ... (total (2 x 1024) - 1 ms ~ 2 s)
// which means that MemoryReservationException will be thrown after 1 s of trying
Review comment:
```suggestion
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -147,8 +147,10 @@ void reserveMemory(long size, int maxSleeps) throws MemoryReservationException {
}
// no luck
- throw new MemoryReservationException(
- String.format("Could not allocate %d bytes, only %d bytes are remaining", size, availableOrReserved));
+ throw new MemoryReservationException(String.format(
+ "Could not allocate %d bytes, only %d bytes are remaining, try to upgrade to Java 8u72 or higher",
Review comment:
I wouldn't change these; right here the result is ambiguous and isn't
necessarily due to the Java version.
If the Java version is the problem, then we have already logged a warning
when the process was started.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -37,9 +37,10 @@
class UnsafeMemoryBudget {
// max. number of sleeps during try-reserving with exponentially
// increasing delay before throwing OutOfMemoryError:
- // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 (total 1023 ms ~ 1 s)
+ // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, ... (total (2 x 1024) - 1 ms ~ 2 s)
// which means that MemoryReservationException will be thrown after 1 s of trying
- private static final int MAX_SLEEPS = 10;
+ private static final int MAX_SLEEPS = 11;
+ private static final int MAX_SLEEPS_VERIFY_EMPTY = 17; // (total (128 x 1024) - 1 ms ~ 2 min)
Review comment:
```suggestion
private static final int MAX_SLEEPS_VERIFY_EMPTY = 17; // ((2^17) - 1 ms ~ 2 min total sleep duration)
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -37,9 +37,10 @@
class UnsafeMemoryBudget {
// max. number of sleeps during try-reserving with exponentially
// increasing delay before throwing OutOfMemoryError:
- // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 (total 1023 ms ~ 1 s)
+ // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, ... (total (2 x 1024) - 1 ms ~ 2 s)
// which means that MemoryReservationException will be thrown after 1 s of trying
- private static final int MAX_SLEEPS = 10;
+ private static final int MAX_SLEEPS = 11;
Review comment:
```suggestion
private static final int MAX_SLEEPS = 11; // ((2^11) - 1 ms ~ 2 s total sleep duration)
```
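As a quick check of the totals quoted in these comments, here is a minimal sketch that sums the delays, assuming (as the code comment in the diff describes) that the first sleep is 1 ms and each following sleep doubles. `BackoffTotals` and `totalSleepMillis` are hypothetical names used only for illustration; they are not part of the PR.

```java
// Hypothetical helper, not part of Flink: totals the sleep time of an
// exponential backoff that starts at 1 ms and doubles after every sleep.
final class BackoffTotals {

    // Sums 1 + 2 + 4 + ... over maxSleeps terms, which equals 2^maxSleeps - 1.
    static long totalSleepMillis(int maxSleeps) {
        long total = 0;
        long delay = 1;
        for (int i = 0; i < maxSleeps; i++) {
            total += delay;
            delay *= 2;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalSleepMillis(10)); // 1023 ms, roughly 1 s
        System.out.println(totalSleepMillis(11)); // 2047 ms, roughly 2 s
        System.out.println(totalSleepMillis(17)); // 131071 ms, roughly 2 min
    }
}
```

Writing the total as `(2^n) - 1` next to each constant keeps the comment valid for any sleep count, which is the point of the suggested wording.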
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -61,7 +62,8 @@ long getAvailableMemorySize() {
boolean verifyEmpty() {
try {
- reserveMemory(totalMemorySize);
+ // we wait longer as we have to GC all memory, allocated by task, to perform the verification
Review comment:
wait longer than _what_?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -134,6 +143,12 @@ public TaskSlotTableImpl(
slotActions = null;
state = State.CREATED;
closingFuture = new CompletableFuture<>();
+
+ asyncExecutor = new ThreadPoolExecutor(
Review comment:
All executors should use a
`org.apache.flink.runtime.util.ExecutorThreadFactory`.
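To illustrate where a thread factory plugs in, here is a rough sketch using only JDK classes. `namedDaemonFactory` is a hypothetical stand-in, not Flink's `ExecutorThreadFactory`, and the `slot-async` prefix and the daemon flag are assumptions; in the PR the Flink factory would be passed in the same constructor position.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedThreadFactorySketch {

    // Hypothetical stand-in for a project-wide factory: gives threads a
    // recognizable name and marks them as daemon threads.
    static ThreadFactory namedDaemonFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-thread-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    // Same pool parameters as in the diff, plus the factory as the last argument.
    static ThreadPoolExecutor newExecutor(int numberSlots) {
        return new ThreadPoolExecutor(
                0,
                numberSlots,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                namedDaemonFactory("slot-async"));
    }

    // Runs one task and reports the name of the thread that executed it.
    static String threadNameOfOneTask(int numberSlots) {
        ThreadPoolExecutor executor = newExecutor(numberSlots);
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return executor.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadNameOfOneTask(2));
    }
}
```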
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -122,15 +128,15 @@ void reserveMemory(long size) throws
MemoryReservationException {
if (availableOrReserved >= size) {
return;
}
- if (sleeps >= MAX_SLEEPS) {
+ if (sleeps >= maxSleeps) {
break;
}
- if (sleeps >= RETRIGGER_GC_AFTER_SLEEPS) {
- // trigger again VM's Reference processing if we have to wait longer
- System.gc();
- }
try {
if (!JavaGcCleanerWrapper.tryRunPendingCleaners()) {
+ if (sleeps >= RETRIGGER_GC_AFTER_SLEEPS) {
Review comment:
Is this just an optimization to skip 1 GC call?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -318,7 +318,10 @@ private void
verifyAllManagedMemoryIsReleasedAfter(CompletableFuture<Void> after
after,
() -> {
if (!memoryManager.verifyEmpty()) {
- LOG.warn("Not all slot memory is freed, potential memory leak at {}", this);
+ LOG.warn(
+ "Not all slot managed memory is freed, potential memory leak at {}, " +
Review comment:
same as above
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
##########
@@ -37,9 +37,10 @@
class UnsafeMemoryBudget {
// max. number of sleeps during try-reserving with exponentially
// increasing delay before throwing OutOfMemoryError:
- // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 (total 1023 ms ~ 1 s)
+ // 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, ... (total (2 x 1024) - 1 ms ~ 2 s)
Review comment:
move the total calculation to the `MAX_SLEEPS` line; this section is now
about the general concept of exponential delays, and shouldn't use examples for
a particular sleep count.
##########
File path:
flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java
##########
@@ -303,32 +302,38 @@ private Runnable create(Object owner, Runnable
cleanupOperation) {
private static class PendingCleanersRunnerProvider {
private static final String REFERENCE_CLASS =
"java.lang.ref.Reference";
private final String cleanerName;
- private final ReflectionUtils reflectionUtils;
private final String waitForReferenceProcessingName;
private final Object[] waitForReferenceProcessingArgs;
private final Class<?>[] waitForReferenceProcessingArgTypes;
private PendingCleanersRunnerProvider(
String cleanerName,
- ReflectionUtils reflectionUtils,
String waitForReferenceProcessingName,
Object[] waitForReferenceProcessingArgs,
Class<?>[] waitForReferenceProcessingArgTypes) {
this.cleanerName = cleanerName;
- this.reflectionUtils = reflectionUtils;
this.waitForReferenceProcessingName =
waitForReferenceProcessingName;
this.waitForReferenceProcessingArgs =
waitForReferenceProcessingArgs;
this.waitForReferenceProcessingArgTypes =
waitForReferenceProcessingArgTypes;
}
+ @Nullable
private PendingCleanersRunner createPendingCleanersRunner() {
Review comment:
maybe use an `Optional` instead and use `Optional#orElse(null)` at the
call site.
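A minimal sketch of that shape, assuming a simplified lookup: `findDeclaredMethod` is a hypothetical helper that returns the reflected `Method` rather than a `PendingCleanersRunner`, and the `String#length` lookup in `main` is only there to make the example runnable.

```java
import java.lang.reflect.Method;
import java.util.Optional;

final class OptionalLookupSketch {

    // Hypothetical helper: an empty Optional signals "not available on this JVM"
    // instead of a @Nullable return value.
    static Optional<Method> findDeclaredMethod(String className, String methodName, Class<?>... argTypes) {
        try {
            return Optional.of(Class.forName(className).getDeclaredMethod(methodName, argTypes));
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // A call site that still needs a nullable reference unwraps explicitly.
        Method found = findDeclaredMethod("java.lang.String", "length").orElse(null);
        Method missing = findDeclaredMethod("java.lang.String", "noSuchMethod").orElse(null);
        System.out.println(found != null);
        System.out.println(missing == null);
    }
}
```

The method signature then documents the "may be absent" case, and the `null` only appears where the caller chooses to have it.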
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
##########
@@ -295,22 +303,25 @@ public String toString() {
// and set the slot state to releasing so that
it gets eventually freed
tasks.values().forEach(task ->
task.failExternally(cause));
}
+
final CompletableFuture<Void> cleanupFuture =
FutureUtils
Review comment:
maybe rename this to `shutdownFuture`? We aren't quite sure whether the
cleanup was successful.
##########
File path:
flink-core/src/main/java/org/apache/flink/util/JavaGcCleanerWrapper.java
##########
@@ -303,32 +302,38 @@ private Runnable create(Object owner, Runnable
cleanupOperation) {
private static class PendingCleanersRunnerProvider {
private static final String REFERENCE_CLASS =
"java.lang.ref.Reference";
private final String cleanerName;
- private final ReflectionUtils reflectionUtils;
private final String waitForReferenceProcessingName;
private final Object[] waitForReferenceProcessingArgs;
private final Class<?>[] waitForReferenceProcessingArgTypes;
private PendingCleanersRunnerProvider(
String cleanerName,
- ReflectionUtils reflectionUtils,
String waitForReferenceProcessingName,
Object[] waitForReferenceProcessingArgs,
Class<?>[] waitForReferenceProcessingArgTypes) {
this.cleanerName = cleanerName;
- this.reflectionUtils = reflectionUtils;
this.waitForReferenceProcessingName =
waitForReferenceProcessingName;
this.waitForReferenceProcessingArgs =
waitForReferenceProcessingArgs;
this.waitForReferenceProcessingArgTypes =
waitForReferenceProcessingArgTypes;
}
+ @Nullable
private PendingCleanersRunner createPendingCleanersRunner() {
- Class<?> referenceClass =
reflectionUtils.findClass(REFERENCE_CLASS);
- Method waitForReferenceProcessingMethod =
reflectionUtils.findMethod(
- referenceClass,
- waitForReferenceProcessingName,
- waitForReferenceProcessingArgTypes);
- waitForReferenceProcessingMethod.setAccessible(true);
- return new
PendingCleanersRunner(waitForReferenceProcessingMethod,
waitForReferenceProcessingArgs);
+ try {
+ Class<?> referenceClass =
Class.forName(REFERENCE_CLASS);
+ Method waitForReferenceProcessingMethod =
referenceClass.getDeclaredMethod(
+ waitForReferenceProcessingName,
+ waitForReferenceProcessingArgTypes);
+
waitForReferenceProcessingMethod.setAccessible(true);
+ return new
PendingCleanersRunner(waitForReferenceProcessingMethod,
waitForReferenceProcessingArgs);
+ } catch (ClassNotFoundException | NoSuchMethodException
e) {
Review comment:
Why did you opt to stop using `ReflectionUtils`, rather than catching
the `FlinkRuntimeException` and checking the type of its cause, or introducing
some sort of optional failure handler to `findClass()`/`findMethod()` (like
`findClass(String clazz, FunctionWithException<Class<?>,
FlinkRuntimeException>)`)?
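The first alternative could look roughly like the sketch below. It is an assumption-laden illustration: `findClassOrThrow` is a hypothetical stand-in for a helper that wraps reflection failures, and `IllegalStateException` stands in for `FlinkRuntimeException` so that the example compiles with the JDK alone.

```java
final class CauseCheckSketch {

    // Hypothetical stand-in for a helper that wraps reflection failures
    // in an unchecked exception and keeps the original as the cause.
    static Class<?> findClassOrThrow(String name) {
        try {
            return Class.forName(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Could not find class " + name, e);
        }
    }

    // Call site: a missing class means "feature not available" and yields null;
    // any other failure is rethrown unchanged.
    static Class<?> findClassOrNull(String name) {
        try {
            return findClassOrThrow(name);
        } catch (IllegalStateException e) {
            if (e.getCause() instanceof ClassNotFoundException) {
                return null;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        System.out.println(findClassOrNull("java.lang.ref.Reference") != null);
        System.out.println(findClassOrNull("no.such.Clazz") == null);
    }
}
```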
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -134,6 +143,12 @@ public TaskSlotTableImpl(
slotActions = null;
state = State.CREATED;
closingFuture = new CompletableFuture<>();
+
+ asyncExecutor = new ThreadPoolExecutor(
+ 0,
+ numberSlots,
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<>());
Review comment:
Whenever you use an executor that is not provided via some factory
method in `java.util.concurrent.Executors` I would recommend writing down some
tests for it to ensure it behaves exactly the way you expect it to.
This right here is probably fine since we know the upper limit of possible
tasks to be queued (== 1 task per slot), but a test would also be good for
documentation purposes.
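Such a test might pin down the behavior along these lines. This is a sketch with plain JDK classes and no test framework; `rejectsTaskBeyondLimit` is a hypothetical name, and the pool is built with the same arguments as in the diff.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SlotExecutorBehaviourSketch {

    // Returns true if the pool accepts numberSlots concurrently running tasks
    // and rejects one more while all of them are still running.
    static boolean rejectsTaskBeyondLimit(int numberSlots) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                0,
                numberSlots,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        // Each task blocks until the latch is released, so it occupies its thread.
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < numberSlots; i++) {
                executor.execute(blocking);
            }
            try {
                // No idle thread and no queue capacity: the default policy rejects.
                executor.execute(blocking);
                return false;
            } catch (RejectedExecutionException expected) {
                return true;
            }
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsTaskBeyondLimit(2));
    }
}
```

With a `SynchronousQueue` nothing is ever buffered, so the "1 task per slot" upper limit is exactly what keeps submissions from being rejected.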