This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push: new 655c78f [FLINK-13586] Make ClosureCleaner.clean() backwards compatible with 1.8.0 655c78f is described below commit 655c78f31b45ec3a96e53e269fca64b4682025ff Author: Aljoscha Krettek <aljos...@apache.org> AuthorDate: Mon Sep 2 10:39:31 2019 +0200 [FLINK-13586] Make ClosureCleaner.clean() backwards compatible with 1.8.0 --- .../connectors/cassandra/CassandraCommitter.java | 3 +-- .../cassandra/CassandraRowWriteAheadSink.java | 3 +-- .../connectors/cassandra/CassandraSinkBase.java | 3 +-- .../cassandra/CassandraTupleWriteAheadSink.java | 3 +-- .../connectors/kafka/FlinkKafkaProducer011.java | 3 +-- .../connectors/kafka/FlinkKafkaConsumerBase.java | 5 ++--- .../connectors/kafka/FlinkKafkaProducerBase.java | 3 +-- .../connectors/kafka/FlinkKafkaProducer.java | 3 +-- .../connectors/kinesis/FlinkKinesisConsumer.java | 7 +++---- .../org/apache/flink/api/java/ClosureCleaner.java | 20 ++++++++++++++++++ .../api/java/functions/ClosureCleanerTest.java | 24 +++++++++++----------- .../java/org/apache/flink/cep/pattern/Pattern.java | 7 +++---- .../rpc/RpcGlobalAggregateManager.java | 7 +++---- .../flink/runtime/jobmaster/JobMasterTest.java | 2 +- .../tasks/OneInputStreamTaskTestHarness.java | 3 +-- .../KeyedOneInputStreamOperatorTestHarness.java | 5 ++--- .../KeyedTwoInputStreamOperatorTestHarness.java | 5 ++--- 17 files changed, 56 insertions(+), 50 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java index 47c55a1..b3948b2 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.connectors.cassandra; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; @@ -55,7 +54,7 @@ public class CassandraCommitter extends CheckpointCommitter { public CassandraCommitter(ClusterBuilder builder) { this.builder = builder; - ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(builder, true); } public CassandraCommitter(ClusterBuilder builder, String keySpace) { diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java index 4374deb..6b3d418 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.connectors.cassandra; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.typeutils.runtime.RowSerializer; @@ -63,7 +62,7 @@ 
public class CassandraRowWriteAheadSink extends GenericWriteAheadSink<Row> { super(committer, serializer, UUID.randomUUID().toString().replace("-", "_")); this.insertQuery = insertQuery; this.builder = builder; - ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(builder, true); } public void open() throws Exception { diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java index 0e7eb6f..5d758be 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.connectors.cassandra; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -65,7 +64,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl this.builder = builder; this.config = config; this.failureHandler = Preconditions.checkNotNull(failureHandler); - ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(builder, true); } @Override diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java index c8992a7..b028055 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.connectors.cassandra; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.tuple.Tuple; @@ -63,7 +62,7 @@ public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWrite super(committer, serializer, UUID.randomUUID().toString().replace("-", "_")); this.insertQuery = insertQuery; this.builder = builder; - ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(builder, true); } public void open() throws Exception { diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java index 3377ec1..5d22cc5 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -20,7 +20,6 @@ package 
org.apache.flink.streaming.connectors.kafka; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.state.ListState; @@ -491,7 +490,7 @@ public class FlinkKafkaProducer011<IN> this.kafkaProducersPoolSize = kafkaProducersPoolSize; checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be non empty"); - ClosureCleaner.clean(this.flinkKafkaPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(this.flinkKafkaPartitioner, true); ClosureCleaner.ensureSerializable(serializationSchema); // set the producer configuration properties for kafka record key value serializers. diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index 034a50b..6cf1839 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; @@ -297,7 +296,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti throw new IllegalStateException("A periodic watermark emitter has already been set."); } try { - ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(assigner, true); this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner); return this; } catch (Exception e) { @@ -332,7 +331,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti throw new IllegalStateException("A punctuated watermark emitter has already been set."); } try { - ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(assigner, true); this.periodicWatermarkAssigner = new SerializedValue<>(assigner); return this; } catch (Exception e) { diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java index b23058c..3a1d1b6 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; -import 
org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.configuration.Configuration; @@ -144,7 +143,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im requireNonNull(defaultTopicId, "TopicID not set"); requireNonNull(serializationSchema, "serializationSchema not set"); requireNonNull(producerConfig, "producerConfig not set"); - ClosureCleaner.clean(customPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(customPartitioner, true); ClosureCleaner.ensureSerializable(serializationSchema); this.defaultTopicId = defaultTopicId; diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java index 230c138..d7c8129 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java @@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.state.ListState; @@ -493,7 +492,7 @@ public class FlinkKafkaProducer<IN> this.kafkaProducersPoolSize = kafkaProducersPoolSize; checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be non empty"); - ClosureCleaner.clean(this.flinkKafkaPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(this.flinkKafkaPartitioner, true); ClosureCleaner.ensureSerializable(serializationSchema); // set the producer configuration properties for kafka record key value serializers. 
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index 8916c34..5b24ded 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kinesis; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.state.ListState; @@ -239,7 +238,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple */ public void setShardAssigner(KinesisShardAssigner shardAssigner) { this.shardAssigner = checkNotNull(shardAssigner, "function can not be null"); - ClosureCleaner.clean(shardAssigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(shardAssigner, true); } public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner() { @@ -254,7 +253,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple public void setPeriodicWatermarkAssigner( AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner) { this.periodicWatermarkAssigner = periodicWatermarkAssigner; - ClosureCleaner.clean(this.periodicWatermarkAssigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(this.periodicWatermarkAssigner, true); } public WatermarkTracker getWatermarkTracker() { @@ -268,7 +267,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple */ public void setWatermarkTracker(WatermarkTracker watermarkTracker) { this.watermarkTracker = watermarkTracker; - ClosureCleaner.clean(this.watermarkTracker, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(this.watermarkTracker, true); } // ------------------------------------------------------------------------ diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java index bd91b2d..abb9c5b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java @@ -53,6 +53,26 @@ public class ClosureCleaner { private static final Logger LOG = LoggerFactory.getLogger(ClosureCleaner.class); /** + * Tries to clean the closure of the given object, if the object is a non-static inner class. + * This will use the default closure cleaner level of {@link ExecutionConfig.ClosureCleanerLevel#RECURSIVE}. + * + * @param func The object whose closure should be cleaned. + * @param checkSerializable Flag to indicate whether serializability should be checked after + * the closure cleaning attempt. + * @throws InvalidProgramException Thrown, if 'checkSerializable' is true, and the object was + * not serializable after the closure cleaning. + * @throws RuntimeException A RuntimeException may be thrown, if the code of the class could + * not be loaded, in order to process during the closure cleaning. 
+ */ + public static void clean(Object func, boolean checkSerializable) { + clean( + func, + ExecutionConfig.ClosureCleanerLevel.RECURSIVE, + checkSerializable, + Collections.newSetFromMap(new IdentityHashMap<>())); + } + + /** * Tries to clean the closure of the given object, if the object is a non-static inner * class. * diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java index 7b93e8a..ec7d42e 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java @@ -54,7 +54,7 @@ public class ClosureCleanerTest { MapCreator creator = new NonSerializableMapCreator(); MapFunction<Integer, Integer> map = creator.getMap(); - ClosureCleaner.clean(map, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(map, true); int result = map.map(3); Assert.assertEquals(result, 4); @@ -65,7 +65,7 @@ public class ClosureCleanerTest { MapCreator creator = new SerializableMapCreator(1); MapFunction<Integer, Integer> map = creator.getMap(); - ClosureCleaner.clean(map, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(map, true); int result = map.map(3); Assert.assertEquals(result, 4); @@ -76,7 +76,7 @@ public class ClosureCleanerTest { MapCreator creator = new NestedSerializableMapCreator(1); MapFunction<Integer, Integer> map = creator.getMap(); - ClosureCleaner.clean(map, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(map, true); ClosureCleaner.ensureSerializable(map); @@ -89,7 +89,7 @@ public class ClosureCleanerTest { MapCreator creator = new NestedNonSerializableMapCreator(1); MapFunction<Integer, Integer> map = creator.getMap(); - ClosureCleaner.clean(map, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(map, true); ClosureCleaner.ensureSerializable(map); @@ -104,7 +104,7 @@ public class ClosureCleanerTest { WrapperMapFunction wrapped = new WrapperMapFunction(notCleanedMap); - ClosureCleaner.clean(wrapped, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(wrapped, true); ClosureCleaner.ensureSerializable(wrapped); @@ -116,7 +116,7 @@ public class ClosureCleanerTest { public void testComplexTopLevelClassClean() throws Exception { MapFunction<Integer, Integer> complexMap = new ComplexMap((MapFunction<Integer, Integer>) value -> value + 1); - ClosureCleaner.clean(complexMap, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(complexMap, true); int result = complexMap.map(3); @@ -127,7 +127,7 @@ public class ClosureCleanerTest { public void testComplexInnerClassClean() throws Exception { MapFunction<Integer, Integer> complexMap = new InnerComplexMap((MapFunction<Integer, Integer>) value -> value + 1); - ClosureCleaner.clean(complexMap, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(complexMap, true); int result = complexMap.map(3); @@ -137,7 +137,7 @@ public class ClosureCleanerTest { @Test public void testSelfReferencingClean() { final NestedSelfReferencing selfReferencing = new NestedSelfReferencing(); - ClosureCleaner.clean(selfReferencing, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(selfReferencing, true); } class InnerCustomMap implements MapFunction<Integer, Integer> { @@ -191,7 +191,7 @@ public class ClosureCleanerTest { 
MapFunction<Integer, Integer> wrappedMap = new WrapperMapFunction(nestedMap); - ClosureCleaner.clean(wrappedMap, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(wrappedMap, true); ClosureCleaner.ensureSerializable(wrappedMap); } @@ -204,7 +204,7 @@ public class ClosureCleanerTest { Tuple1<MapFunction<Integer, Integer>> tuple = new Tuple1<>(wrappedMap); - ClosureCleaner.clean(tuple, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(tuple, true); ClosureCleaner.ensureSerializable(tuple); } @@ -213,7 +213,7 @@ public class ClosureCleanerTest { public void testRecursiveClass() { RecursiveClass recursiveClass = new RecursiveClass(new RecursiveClass()); - ClosureCleaner.clean(recursiveClass, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(recursiveClass, true); ClosureCleaner.ensureSerializable(recursiveClass); } @@ -230,7 +230,7 @@ public class ClosureCleanerTest { public void testWriteReplaceRecursive() { WithWriteReplace writeReplace = new WithWriteReplace(new WithWriteReplace.Payload("text")); Assert.assertEquals("text", writeReplace.getPayload().getRaw()); - ClosureCleaner.clean(writeReplace, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(writeReplace, true); } } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index 49aaf6d..6423676 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -18,7 +18,6 @@ package org.apache.flink.cep.pattern; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy; @@ -156,7 +155,7 @@ public class Pattern<T, F extends T> { public Pattern<T, F> where(IterativeCondition<F> condition) { Preconditions.checkNotNull(condition, "The condition cannot be null."); - ClosureCleaner.clean(condition, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(condition, true); if (this.condition == null) { this.condition = condition; } else { @@ -178,7 +177,7 @@ public class Pattern<T, F extends T> { public Pattern<T, F> or(IterativeCondition<F> condition) { Preconditions.checkNotNull(condition, "The condition cannot be null."); - ClosureCleaner.clean(condition, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(condition, true); if (this.condition == null) { this.condition = condition; @@ -228,7 +227,7 @@ public class Pattern<T, F extends T> { throw new MalformedPatternException("The until condition is only applicable to looping states."); } - ClosureCleaner.clean(untilCondition, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(untilCondition, true); this.untilCondition = untilCondition; return this; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcGlobalAggregateManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcGlobalAggregateManager.java index 709a582..c566743 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcGlobalAggregateManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcGlobalAggregateManager.java @@ -18,15 +18,14 @@ package 
org.apache.flink.runtime.taskexecutor.rpc; -import java.io.IOException; - -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; import org.apache.flink.util.InstantiationUtil; +import java.io.IOException; + public class RpcGlobalAggregateManager implements GlobalAggregateManager { private final JobMasterGateway jobMasterGateway; @@ -38,7 +37,7 @@ public class RpcGlobalAggregateManager implements GlobalAggregateManager { @Override public <IN, ACC, OUT> OUT updateGlobalAggregate(String aggregateName, Object aggregand, AggregateFunction<IN, ACC, OUT> aggregateFunction) throws IOException { - ClosureCleaner.clean(aggregateFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE,true); + ClosureCleaner.clean(aggregateFunction, true); byte[] serializedAggregateFunction = InstantiationUtil.serializeObject(aggregateFunction); Object result = null; try { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index f42030e..836bea3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -1636,7 +1636,7 @@ public class JobMasterTest extends TestLogger { AggregateFunction<Integer, Integer, Integer> aggregateFunction = createAggregateFunction(); - ClosureCleaner.clean(aggregateFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean(aggregateFunction, true); byte[] serializedAggregateFunction = InstantiationUtil.serializeObject(aggregateFunction); updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", 1, serializedAggregateFunction); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java index d07b892..7ac0cf3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.runtime.tasks; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.ClosureCleaner; @@ -139,7 +138,7 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes public <K> void configureForKeyedStream( KeySelector<IN, K> keySelector, TypeInformation<K> keyType) { - ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false); + ClosureCleaner.clean(keySelector, false); streamConfig.setStatePartitioner(0, keySelector); streamConfig.setStateKeySerializer(keyType.createSerializer(executionConfig)); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index 31b10f5..caf846f 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.util; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; @@ -44,7 +43,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> int subtaskIndex) throws Exception { super(operator, maxParallelism, numSubtasks, subtaskIndex); - ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false); + ClosureCleaner.clean(keySelector, false); config.setStatePartitioner(0, keySelector); config.setStateKeySerializer(keyType.createSerializer(executionConfig)); } @@ -64,7 +63,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> super(operator, environment); - ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false); + ClosureCleaner.clean(keySelector, false); config.setStatePartitioner(0, keySelector); config.setStateKeySerializer(keyType.createSerializer(executionConfig)); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java index eb6222f..c00e59a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.util; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; @@ -44,8 +43,8 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> int subtaskIndex) throws Exception { super(operator, maxParallelism, numSubtasks, subtaskIndex); - ClosureCleaner.clean(keySelector1, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false); - ClosureCleaner.clean(keySelector2, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false); + ClosureCleaner.clean(keySelector1, false); + ClosureCleaner.clean(keySelector2, false); config.setStatePartitioner(0, keySelector1); config.setStatePartitioner(1, keySelector2); config.setStateKeySerializer(keyType.createSerializer(executionConfig));
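For reference, the two-argument overload restored by this commit lets code written against the 1.8.0 API keep calling ClosureCleaner without naming a cleaner level; internally it delegates to the newer variant using ExecutionConfig.ClosureCleanerLevel.RECURSIVE. A minimal usage sketch follows (the Example class and the lambda are illustrative, not part of the patch):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.ClosureCleaner;

    public class Example {
        public static void main(String[] args) throws Exception {
            // Illustrative user function; in the connector code touched above this is
            // typically a partitioner, watermark assigner, or ClusterBuilder supplied by the user.
            MapFunction<Integer, Integer> map = value -> value + 1;

            // 1.8.0-style call restored by this commit: cleans the closure with the
            // default RECURSIVE level and then checks that the object is serializable.
            ClosureCleaner.clean(map, true);

            // Equivalent explicit call using the newer signature:
            ClosureCleaner.clean(map, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);

            System.out.println(map.map(3)); // prints 4
        }
    }

The connector constructors changed above are reverted to this two-argument call so that connector jars built from this branch keep running against a 1.8.0 flink-java, where only that signature exists.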