This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new 655c78f  [FLINK-13586] Make ClosureCleaner.clean() backwards 
compatible with 1.8.0
655c78f is described below

commit 655c78f31b45ec3a96e53e269fca64b4682025ff
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Mon Sep 2 10:39:31 2019 +0200

    [FLINK-13586] Make ClosureCleaner.clean() backwards compatible with 1.8.0
---
 .../connectors/cassandra/CassandraCommitter.java   |  3 +--
 .../cassandra/CassandraRowWriteAheadSink.java      |  3 +--
 .../connectors/cassandra/CassandraSinkBase.java    |  3 +--
 .../cassandra/CassandraTupleWriteAheadSink.java    |  3 +--
 .../connectors/kafka/FlinkKafkaProducer011.java    |  3 +--
 .../connectors/kafka/FlinkKafkaConsumerBase.java   |  5 ++---
 .../connectors/kafka/FlinkKafkaProducerBase.java   |  3 +--
 .../connectors/kafka/FlinkKafkaProducer.java       |  3 +--
 .../connectors/kinesis/FlinkKinesisConsumer.java   |  7 +++----
 .../org/apache/flink/api/java/ClosureCleaner.java  | 20 ++++++++++++++++++
 .../api/java/functions/ClosureCleanerTest.java     | 24 +++++++++++-----------
 .../java/org/apache/flink/cep/pattern/Pattern.java |  7 +++----
 .../rpc/RpcGlobalAggregateManager.java             |  7 +++----
 .../flink/runtime/jobmaster/JobMasterTest.java     |  2 +-
 .../tasks/OneInputStreamTaskTestHarness.java       |  3 +--
 .../KeyedOneInputStreamOperatorTestHarness.java    |  5 ++---
 .../KeyedTwoInputStreamOperatorTestHarness.java    |  5 ++---
 17 files changed, 56 insertions(+), 50 deletions(-)

diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
index 47c55a1..b3948b2 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
 
@@ -55,7 +54,7 @@ public class CassandraCommitter extends CheckpointCommitter {
 
        public CassandraCommitter(ClusterBuilder builder) {
                this.builder = builder;
-               ClosureCleaner.clean(builder, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(builder, true);
        }
 
        public CassandraCommitter(ClusterBuilder builder, String keySpace) {
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
index 4374deb..6b3d418 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
@@ -63,7 +62,7 @@ public class CassandraRowWriteAheadSink extends 
GenericWriteAheadSink<Row> {
                super(committer, serializer, 
UUID.randomUUID().toString().replace("-", "_"));
                this.insertQuery = insertQuery;
                this.builder = builder;
-               ClosureCleaner.clean(builder, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(builder, true);
        }
 
        public void open() throws Exception {
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 0e7eb6f..5d758be 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.connectors.cassandra;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -65,7 +64,7 @@ public abstract class CassandraSinkBase<IN, V> extends 
RichSinkFunction<IN> impl
                this.builder = builder;
                this.config = config;
                this.failureHandler = 
Preconditions.checkNotNull(failureHandler);
-               ClosureCleaner.clean(builder, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(builder, true);
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
index c8992a7..b028055 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -63,7 +62,7 @@ public class CassandraTupleWriteAheadSink<IN extends Tuple> 
extends GenericWrite
                super(committer, serializer, 
UUID.randomUUID().toString().replace("-", "_"));
                this.insertQuery = insertQuery;
                this.builder = builder;
-               ClosureCleaner.clean(builder, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(builder, true);
        }
 
        public void open() throws Exception {
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 3377ec1..5d22cc5 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.state.ListState;
@@ -491,7 +490,7 @@ public class FlinkKafkaProducer011<IN>
                this.kafkaProducersPoolSize = kafkaProducersPoolSize;
                checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize 
must be non empty");
 
-               ClosureCleaner.clean(this.flinkKafkaPartitioner, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(this.flinkKafkaPartitioner, true);
                ClosureCleaner.ensureSerializable(serializationSchema);
 
                // set the producer configuration properties for kafka record 
key value serializers.
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 034a50b..6cf1839 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -297,7 +296,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                        throw new IllegalStateException("A periodic watermark 
emitter has already been set.");
                }
                try {
-                       ClosureCleaner.clean(assigner, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+                       ClosureCleaner.clean(assigner, true);
                        this.punctuatedWatermarkAssigner = new 
SerializedValue<>(assigner);
                        return this;
                } catch (Exception e) {
@@ -332,7 +331,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                        throw new IllegalStateException("A punctuated watermark 
emitter has already been set.");
                }
                try {
-                       ClosureCleaner.clean(assigner, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+                       ClosureCleaner.clean(assigner, true);
                        this.periodicWatermarkAssigner = new 
SerializedValue<>(assigner);
                        return this;
                } catch (Exception e) {
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index b23058c..3a1d1b6 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
@@ -144,7 +143,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
                requireNonNull(defaultTopicId, "TopicID not set");
                requireNonNull(serializationSchema, "serializationSchema not 
set");
                requireNonNull(producerConfig, "producerConfig not set");
-               ClosureCleaner.clean(customPartitioner, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(customPartitioner, true);
                ClosureCleaner.ensureSerializable(serializationSchema);
 
                this.defaultTopicId = defaultTopicId;
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 230c138..d7c8129 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.state.ListState;
@@ -493,7 +492,7 @@ public class FlinkKafkaProducer<IN>
                this.kafkaProducersPoolSize = kafkaProducersPoolSize;
                checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize 
must be non empty");
 
-               ClosureCleaner.clean(this.flinkKafkaPartitioner, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(this.flinkKafkaPartitioner, true);
                ClosureCleaner.ensureSerializable(serializationSchema);
 
                // set the producer configuration properties for kafka record 
key value serializers.
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 8916c34..5b24ded 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kinesis;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.state.ListState;
@@ -239,7 +238,7 @@ public class FlinkKinesisConsumer<T> extends 
RichParallelSourceFunction<T> imple
         */
        public void setShardAssigner(KinesisShardAssigner shardAssigner) {
                this.shardAssigner = checkNotNull(shardAssigner, "function can 
not be null");
-               ClosureCleaner.clean(shardAssigner, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(shardAssigner, true);
        }
 
        public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner() 
{
@@ -254,7 +253,7 @@ public class FlinkKinesisConsumer<T> extends 
RichParallelSourceFunction<T> imple
        public void setPeriodicWatermarkAssigner(
                AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) {
                this.periodicWatermarkAssigner = periodicWatermarkAssigner;
-               ClosureCleaner.clean(this.periodicWatermarkAssigner, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(this.periodicWatermarkAssigner, true);
        }
 
        public WatermarkTracker getWatermarkTracker() {
@@ -268,7 +267,7 @@ public class FlinkKinesisConsumer<T> extends 
RichParallelSourceFunction<T> imple
         */
        public void setWatermarkTracker(WatermarkTracker watermarkTracker) {
                this.watermarkTracker = watermarkTracker;
-               ClosureCleaner.clean(this.watermarkTracker, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(this.watermarkTracker, true);
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index bd91b2d..abb9c5b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -53,6 +53,26 @@ public class ClosureCleaner {
        private static final Logger LOG = 
LoggerFactory.getLogger(ClosureCleaner.class);
 
        /**
+        * Tries to clean the closure of the given object, if the object is a 
non-static inner class.
+        * This will use the default closure cleaner level of {@link 
ExecutionConfig.ClosureCleanerLevel#RECURSIVE}.
+        *
+        * @param func The object whose closure should be cleaned.
+        * @param checkSerializable Flag to indicate whether serializability 
should be checked after
+        *              the closure cleaning attempt.
+        * @throws InvalidProgramException Thrown, if 'checkSerializable' is 
true, and the object was
+        *              not serializable after the closure cleaning.
+        * @throws RuntimeException A RuntimeException may be thrown, if the 
code of the class could
+        *              not be loaded, in order to process during the closure 
cleaning.
+        */
+       public static void clean(Object func, boolean checkSerializable) {
+               clean(
+                               func,
+                               ExecutionConfig.ClosureCleanerLevel.RECURSIVE,
+                               checkSerializable,
+                               Collections.newSetFromMap(new 
IdentityHashMap<>()));
+       }
+
+       /**
         * Tries to clean the closure of the given object, if the object is a 
non-static inner
         * class.
         *
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
index 7b93e8a..ec7d42e 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
@@ -54,7 +54,7 @@ public class ClosureCleanerTest {
                MapCreator creator = new NonSerializableMapCreator();
                MapFunction<Integer, Integer> map = creator.getMap();
 
-               ClosureCleaner.clean(map, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(map, true);
 
                int result = map.map(3);
                Assert.assertEquals(result, 4);
@@ -65,7 +65,7 @@ public class ClosureCleanerTest {
                MapCreator creator = new SerializableMapCreator(1);
                MapFunction<Integer, Integer> map = creator.getMap();
 
-               ClosureCleaner.clean(map, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(map, true);
 
                int result = map.map(3);
                Assert.assertEquals(result, 4);
@@ -76,7 +76,7 @@ public class ClosureCleanerTest {
                MapCreator creator = new NestedSerializableMapCreator(1);
                MapFunction<Integer, Integer> map = creator.getMap();
 
-               ClosureCleaner.clean(map, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(map, true);
 
                ClosureCleaner.ensureSerializable(map);
 
@@ -89,7 +89,7 @@ public class ClosureCleanerTest {
                MapCreator creator = new NestedNonSerializableMapCreator(1);
                MapFunction<Integer, Integer> map = creator.getMap();
 
-               ClosureCleaner.clean(map, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(map, true);
 
                ClosureCleaner.ensureSerializable(map);
 
@@ -104,7 +104,7 @@ public class ClosureCleanerTest {
 
                WrapperMapFunction wrapped = new 
WrapperMapFunction(notCleanedMap);
 
-               ClosureCleaner.clean(wrapped, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(wrapped, true);
 
                ClosureCleaner.ensureSerializable(wrapped);
 
@@ -116,7 +116,7 @@ public class ClosureCleanerTest {
        public void testComplexTopLevelClassClean() throws Exception {
                MapFunction<Integer, Integer> complexMap = new 
ComplexMap((MapFunction<Integer, Integer>) value -> value + 1);
 
-               ClosureCleaner.clean(complexMap, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(complexMap, true);
 
                int result = complexMap.map(3);
 
@@ -127,7 +127,7 @@ public class ClosureCleanerTest {
        public void testComplexInnerClassClean() throws Exception {
                MapFunction<Integer, Integer> complexMap = new 
InnerComplexMap((MapFunction<Integer, Integer>) value -> value + 1);
 
-               ClosureCleaner.clean(complexMap, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(complexMap, true);
 
                int result = complexMap.map(3);
 
@@ -137,7 +137,7 @@ public class ClosureCleanerTest {
        @Test
        public void testSelfReferencingClean() {
                final NestedSelfReferencing selfReferencing = new 
NestedSelfReferencing();
-               ClosureCleaner.clean(selfReferencing, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(selfReferencing, true);
        }
 
        class InnerCustomMap implements MapFunction<Integer, Integer> {
@@ -191,7 +191,7 @@ public class ClosureCleanerTest {
 
                MapFunction<Integer, Integer> wrappedMap = new 
WrapperMapFunction(nestedMap);
 
-               ClosureCleaner.clean(wrappedMap, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(wrappedMap, true);
 
                ClosureCleaner.ensureSerializable(wrappedMap);
        }
@@ -204,7 +204,7 @@ public class ClosureCleanerTest {
 
                Tuple1<MapFunction<Integer, Integer>> tuple = new 
Tuple1<>(wrappedMap);
 
-               ClosureCleaner.clean(tuple, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(tuple, true);
 
                ClosureCleaner.ensureSerializable(tuple);
        }
@@ -213,7 +213,7 @@ public class ClosureCleanerTest {
        public void testRecursiveClass() {
                RecursiveClass recursiveClass = new RecursiveClass(new 
RecursiveClass());
 
-               ClosureCleaner.clean(recursiveClass, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(recursiveClass, true);
 
                ClosureCleaner.ensureSerializable(recursiveClass);
        }
@@ -230,7 +230,7 @@ public class ClosureCleanerTest {
        public void testWriteReplaceRecursive() {
                WithWriteReplace writeReplace = new WithWriteReplace(new 
WithWriteReplace.Payload("text"));
                Assert.assertEquals("text", writeReplace.getPayload().getRaw());
-               ClosureCleaner.clean(writeReplace, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(writeReplace, true);
        }
 }
 
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 49aaf6d..6423676 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.pattern;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
@@ -156,7 +155,7 @@ public class Pattern<T, F extends T> {
        public Pattern<T, F> where(IterativeCondition<F> condition) {
                Preconditions.checkNotNull(condition, "The condition cannot be 
null.");
 
-               ClosureCleaner.clean(condition, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(condition, true);
                if (this.condition == null) {
                        this.condition = condition;
                } else {
@@ -178,7 +177,7 @@ public class Pattern<T, F extends T> {
        public Pattern<T, F> or(IterativeCondition<F> condition) {
                Preconditions.checkNotNull(condition, "The condition cannot be 
null.");
 
-               ClosureCleaner.clean(condition, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(condition, true);
 
                if (this.condition == null) {
                        this.condition = condition;
@@ -228,7 +227,7 @@ public class Pattern<T, F extends T> {
                        throw new MalformedPatternException("The until 
condition is only applicable to looping states.");
                }
 
-               ClosureCleaner.clean(untilCondition, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+               ClosureCleaner.clean(untilCondition, true);
                this.untilCondition = untilCondition;
 
                return this;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcGlobalAggregateManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcGlobalAggregateManager.java
index 709a582..c566743 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcGlobalAggregateManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcGlobalAggregateManager.java
@@ -18,15 +18,14 @@
 
 package org.apache.flink.runtime.taskexecutor.rpc;
 
-import java.io.IOException;
-
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
 import org.apache.flink.util.InstantiationUtil;
 
+import java.io.IOException;
+
 public class RpcGlobalAggregateManager implements GlobalAggregateManager {
 
        private final JobMasterGateway jobMasterGateway;
@@ -38,7 +37,7 @@ public class RpcGlobalAggregateManager implements 
GlobalAggregateManager {
        @Override
        public <IN, ACC, OUT> OUT updateGlobalAggregate(String aggregateName, 
Object aggregand, AggregateFunction<IN, ACC, OUT> aggregateFunction)
                throws IOException {
-               ClosureCleaner.clean(aggregateFunction, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE,true);
+               ClosureCleaner.clean(aggregateFunction, true);
                byte[] serializedAggregateFunction = 
InstantiationUtil.serializeObject(aggregateFunction);
                Object result = null;
                try {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index f42030e..836bea3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -1636,7 +1636,7 @@ public class JobMasterTest extends TestLogger {
 
                        AggregateFunction<Integer, Integer, Integer> 
aggregateFunction = createAggregateFunction();
 
-                       ClosureCleaner.clean(aggregateFunction, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+                       ClosureCleaner.clean(aggregateFunction, true);
                        byte[] serializedAggregateFunction = 
InstantiationUtil.serializeObject(aggregateFunction);
 
                        updateAggregateFuture = 
jobMasterGateway.updateGlobalAggregate("agg1", 1, serializedAggregateFunction);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index d07b892..7ac0cf3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
@@ -139,7 +138,7 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends 
StreamTaskTestHarnes
        public <K> void configureForKeyedStream(
                        KeySelector<IN, K> keySelector,
                        TypeInformation<K> keyType) {
-               ClosureCleaner.clean(keySelector, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
+               ClosureCleaner.clean(keySelector, false);
                streamConfig.setStatePartitioner(0, keySelector);
                
streamConfig.setStateKeySerializer(keyType.createSerializer(executionConfig));
        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 31b10f5..caf846f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -44,7 +43,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
                        int subtaskIndex) throws Exception {
                super(operator, maxParallelism, numSubtasks, subtaskIndex);
 
-               ClosureCleaner.clean(keySelector, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
+               ClosureCleaner.clean(keySelector, false);
                config.setStatePartitioner(0, keySelector);
                
config.setStateKeySerializer(keyType.createSerializer(executionConfig));
        }
@@ -64,7 +63,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
 
                super(operator, environment);
 
-               ClosureCleaner.clean(keySelector, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
+               ClosureCleaner.clean(keySelector, false);
                config.setStatePartitioner(0, keySelector);
                
config.setStateKeySerializer(keyType.createSerializer(executionConfig));
        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index eb6222f..c00e59a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -44,8 +43,8 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, 
IN2, OUT>
                        int subtaskIndex) throws Exception {
                super(operator, maxParallelism, numSubtasks, subtaskIndex);
 
-               ClosureCleaner.clean(keySelector1, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
-               ClosureCleaner.clean(keySelector2, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
+               ClosureCleaner.clean(keySelector1, false);
+               ClosureCleaner.clean(keySelector2, false);
                config.setStatePartitioner(0, keySelector1);
                config.setStatePartitioner(1, keySelector2);
                
config.setStateKeySerializer(keyType.createSerializer(executionConfig));

Reply via email to