This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e568639  [FLINK-13369] Track references of already visited object in 
ClosureCleaner
e568639 is described below

commit e568639f97cb2c8be3ed85edf9c7dad3a36c4b73
Author: David Moravek <[email protected]>
AuthorDate: Tue Jul 23 14:56:26 2019 +0200

    [FLINK-13369] Track references of already visited object in ClosureCleaner
---
 .../org/apache/flink/api/java/ClosureCleaner.java  | 13 ++++++++++-
 .../api/java/functions/ClosureCleanerTest.java     | 25 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index f9d7ab0..4c54291 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -37,6 +37,9 @@ import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Set;
 
 /**
  * The closure cleaner is a utility that tries to truncate the closure 
(enclosing instance)
@@ -65,10 +68,18 @@ public class ClosureCleaner {
         *                          be loaded, in order to process during the 
closure cleaning.
         */
        public static void clean(Object func, 
ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable) {
+               clean(func, level, checkSerializable, 
Collections.newSetFromMap(new IdentityHashMap<>()));
+       }
+
+       private static void clean(Object func, 
ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable, 
Set<Object> visited) {
                if (func == null) {
                        return;
                }
 
+               if (!visited.add(func)) {
+                       return;
+               }
+
                final Class<?> cls = func.getClass();
 
                if (ClassUtils.isPrimitiveOrWrapper(cls)) {
@@ -112,7 +123,7 @@ public class ClosureCleaner {
                                                LOG.debug("Dig to clean the 
{}", fieldObject.getClass().getName());
                                        }
 
-                                       clean(fieldObject, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+                                       clean(fieldObject, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true, visited);
                                }
                        }
                }
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
index 82a08b1..3c9775a 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.functions;
 
+import java.util.function.Supplier;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -133,6 +134,12 @@ public class ClosureCleanerTest {
                Assert.assertEquals(result, 4);
        }
 
+       @Test
+       public void testSelfReferencingClean() {
+               final NestedSelfReferencing selfReferencing = new 
NestedSelfReferencing();
+               ClosureCleaner.clean(selfReferencing, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+       }
+
        class InnerCustomMap implements MapFunction<Integer, Integer> {
 
                @Override
@@ -421,3 +428,21 @@ class OuterMapCreator implements MapCreator {
        }
 }
 
+@FunctionalInterface
+interface SerializableSupplier<T> extends Supplier<T>, Serializable {
+
+}
+
+class NestedSelfReferencing implements Serializable {
+
+       private final SerializableSupplier<NestedSelfReferencing> cycle;
+
+       NestedSelfReferencing() {
+               this.cycle = () -> this;
+       }
+
+       public SerializableSupplier<NestedSelfReferencing> getCycle() {
+               return cycle;
+       }
+}
+

Reply via email to