This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 56a2742c5d0b482170b21236419c5da1bb3a8565 Author: David Moravek <[email protected]> AuthorDate: Tue Jul 23 14:56:26 2019 +0200 [FLINK-13369] Track references of already visited object in ClosureCleaner --- .../org/apache/flink/api/java/ClosureCleaner.java | 13 ++++++++++- .../api/java/functions/ClosureCleanerTest.java | 25 ++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java index f9d7ab0..4c54291 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java @@ -37,6 +37,9 @@ import java.io.IOException; import java.io.ObjectOutputStream; import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Set; /** * The closure cleaner is a utility that tries to truncate the closure (enclosing instance) @@ -65,10 +68,18 @@ public class ClosureCleaner { * be loaded, in order to process during the closure cleaning. */ public static void clean(Object func, ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable) { + clean(func, level, checkSerializable, Collections.newSetFromMap(new IdentityHashMap<>())); + } + + private static void clean(Object func, ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable, Set<Object> visited) { if (func == null) { return; } + if (!visited.add(func)) { + return; + } + final Class<?> cls = func.getClass(); if (ClassUtils.isPrimitiveOrWrapper(cls)) { @@ -112,7 +123,7 @@ public class ClosureCleaner { LOG.debug("Dig to clean the {}", fieldObject.getClass().getName()); } - clean(fieldObject, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + clean(fieldObject, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true, visited); } } } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java index 82a08b1..3c9775a 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.functions; +import java.util.function.Supplier; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.MapFunction; @@ -133,6 +134,12 @@ public class ClosureCleanerTest { Assert.assertEquals(result, 4); } + @Test + public void testSelfReferencingClean() { + final NestedSelfReferencing selfReferencing = new NestedSelfReferencing(); + ClosureCleaner.clean(selfReferencing, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + } + class InnerCustomMap implements MapFunction<Integer, Integer> { @Override @@ -421,3 +428,21 @@ class OuterMapCreator implements MapCreator { } } +@FunctionalInterface +interface SerializableSupplier<T> extends Supplier<T>, Serializable { + +} + +class NestedSelfReferencing implements Serializable { + + private final SerializableSupplier<NestedSelfReferencing> cycle; + + NestedSelfReferencing() { + this.cycle = () -> this; + } + + public SerializableSupplier<NestedSelfReferencing> getCycle() { + return cycle; + } +} +
