This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e568639 [FLINK-13369] Track references of already visited object in
ClosureCleaner
e568639 is described below
commit e568639f97cb2c8be3ed85edf9c7dad3a36c4b73
Author: David Moravek <[email protected]>
AuthorDate: Tue Jul 23 14:56:26 2019 +0200
[FLINK-13369] Track references of already visited object in ClosureCleaner
---
.../org/apache/flink/api/java/ClosureCleaner.java | 13 ++++++++++-
.../api/java/functions/ClosureCleanerTest.java | 25 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 1 deletion(-)
diff --git
a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index f9d7ab0..4c54291 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -37,6 +37,9 @@ import java.io.IOException;
import java.io.ObjectOutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Set;
/**
* The closure cleaner is a utility that tries to truncate the closure
(enclosing instance)
@@ -65,10 +68,18 @@ public class ClosureCleaner {
* be loaded, in order to process during the
closure cleaning.
*/
public static void clean(Object func,
ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable) {
+ clean(func, level, checkSerializable,
Collections.newSetFromMap(new IdentityHashMap<>()));
+ }
+
+ private static void clean(Object func,
ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable,
Set<Object> visited) {
if (func == null) {
return;
}
+ if (!visited.add(func)) {
+ return;
+ }
+
final Class<?> cls = func.getClass();
if (ClassUtils.isPrimitiveOrWrapper(cls)) {
@@ -112,7 +123,7 @@ public class ClosureCleaner {
LOG.debug("Dig to clean the
{}", fieldObject.getClass().getName());
}
- clean(fieldObject,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ clean(fieldObject,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true, visited);
}
}
}
diff --git
a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
index 82a08b1..3c9775a 100644
---
a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
+++
b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.functions;
+import java.util.function.Supplier;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.MapFunction;
@@ -133,6 +134,12 @@ public class ClosureCleanerTest {
Assert.assertEquals(result, 4);
}
+ @Test
+ public void testSelfReferencingClean() {
+ final NestedSelfReferencing selfReferencing = new
NestedSelfReferencing();
+ ClosureCleaner.clean(selfReferencing,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ }
+
class InnerCustomMap implements MapFunction<Integer, Integer> {
@Override
@@ -421,3 +428,21 @@ class OuterMapCreator implements MapCreator {
}
}
+@FunctionalInterface
+interface SerializableSupplier<T> extends Supplier<T>, Serializable {
+
+}
+
+class NestedSelfReferencing implements Serializable {
+
+ private final SerializableSupplier<NestedSelfReferencing> cycle;
+
+ NestedSelfReferencing() {
+ this.cycle = () -> this;
+ }
+
+ public SerializableSupplier<NestedSelfReferencing> getCycle() {
+ return cycle;
+ }
+}
+