This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6633719c728526a08344645e0a844f27d25629f4 Author: Weijie Guo <[email protected]> AuthorDate: Tue Sep 24 09:54:32 2024 +0800 [FLINK-36366][core] Remove deprecated serializer related config option and method This closes #25396 --- docs/content.zh/docs/dev/table/data_stream_api.md | 2 - docs/content/docs/dev/table/data_stream_api.md | 3 - .../apache/flink/api/common/ExecutionConfig.java | 354 --------------------- .../flink/api/common/SerializableSerializer.java | 47 +++ .../api/common/serialization/SerializerConfig.java | 21 -- .../common/serialization/SerializerConfigImpl.java | 35 +- .../java/typeutils/runtime/KryoRegistration.java | 11 +- .../typeutils/runtime/kryo/KryoSerializer.java | 34 +- .../runtime/kryo/KryoSerializerSnapshot.java | 2 +- .../runtime/kryo/KryoSerializerSnapshotData.java | 2 +- .../java/typeutils/runtime/kryo/Serializers.java | 3 +- .../flink/configuration/PipelineOptions.java | 69 ---- .../flink/api/common/ExecutionConfigTest.java | 88 ++--- .../serialization/SerializerConfigImplTest.java | 98 +----- .../api/common/state/StateDescriptorTest.java | 2 +- .../java/typeutils/runtime/PojoSerializerTest.java | 5 +- .../PojoSerializerUpgradeTestSpecifications.java | 18 +- .../tests/scala/JavaJobWithKryoSerializer.java | 12 +- .../avro/utils/AvroKryoSerializerUtils.java | 15 +- .../avro/typeutils/AvroTypeExtractionTest.java | 2 +- .../flink/api/java/ExecutionEnvironment.java | 82 ----- .../apache/flink/api/java/utils/PlanGenerator.java | 12 +- flink-python/pyflink/common/execution_config.py | 14 - .../pyflink/common/tests/test_execution_config.py | 30 -- .../itcases/AbstractQueryableStateTestBase.java | 10 +- .../environment/StreamExecutionEnvironment.java | 127 -------- .../runtime/state/OperatorStateBackendTest.java | 2 +- .../flink/runtime/state/StateBackendTestBase.java | 51 ++- .../runtime/util/bash/FlinkConfigLoaderTest.java | 20 -- .../api/operators/StateDescriptorPassingTest.java | 51 ++- .../api/operators/StreamingRuntimeContextTest.java | 22 +- .../changelog/ChangelogStateBackendTestUtils.java | 4 +- ...ecutionEnvironmentComplexConfigurationTest.java | 7 +- .../utils/DummyStreamExecutionEnvironment.java | 37 --- .../flink/test/operators/GroupReduceITCase.java | 5 +- .../RegisterTypeWithKryoSerializerITCase.java | 139 -------- 36 files changed, 233 insertions(+), 1203 deletions(-) diff --git a/docs/content.zh/docs/dev/table/data_stream_api.md b/docs/content.zh/docs/dev/table/data_stream_api.md index c3492568a1f..d96c075f5e5 100644 --- a/docs/content.zh/docs/dev/table/data_stream_api.md +++ b/docs/content.zh/docs/dev/table/data_stream_api.md @@ -594,8 +594,6 @@ env = StreamExecutionEnvironment.get_execution_environment() # set various configuration early env.set_max_parallelism(256) -env.get_config().add_default_kryo_serializer("type_class_name", "serializer_class_name") - env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE) # then switch to Python Table API diff --git a/docs/content/docs/dev/table/data_stream_api.md b/docs/content/docs/dev/table/data_stream_api.md index a7217053b08..9f77e715fb1 100644 --- a/docs/content/docs/dev/table/data_stream_api.md +++ b/docs/content/docs/dev/table/data_stream_api.md @@ -592,9 +592,6 @@ env = StreamExecutionEnvironment.get_execution_environment() # set various configuration early env.set_max_parallelism(256) -env.get_config().add_default_kryo_serializer("type_class_name", "serializer_class_name") -env.get_config().add_default_kryo_serializer("type_class_name", "serializer_class_name") - env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE) # then switch to Python Table API diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 1b0d66dd1ef..b2adee5ec22 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -40,14 +40,10 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.description.InlineElement; import org.apache.flink.util.Preconditions; -import com.esotericsoftware.kryo.Serializer; - import java.io.Serializable; import java.time.Duration; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -452,104 +448,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut return configuration.get(EXECUTION_MODE); } - /** - * Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO. In - * some cases this might be preferable. For example, when using interfaces with subclasses that - * cannot be analyzed as POJO. - * - * @deprecated Configure serialization behavior through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. You should configure this by config - * option {@link PipelineOptions#FORCE_KRYO}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - public void enableForceKryo() { - serializerConfig.setForceKryo(true); - } - - /** - * Disable use of Kryo serializer for all POJOs. - * - * @deprecated Configure serialization behavior through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. You should configure this by config - * option {@link PipelineOptions#FORCE_KRYO}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - public void disableForceKryo() { - serializerConfig.setForceKryo(false); - } - - /** @deprecated Use {@link SerializerConfig#isForceKryoEnabled}. */ - @Deprecated - public boolean isForceKryoEnabled() { - return serializerConfig.isForceKryoEnabled(); - } - - /** - * Enables the use generic types which are serialized via Kryo. - * - * <p>Generic types are enabled by default. - * - * @deprecated Configure serialization behavior through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. You should configure this by config - * option {@link PipelineOptions#GENERIC_TYPES}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - * @see #disableGenericTypes() - */ - @Deprecated - public void enableGenericTypes() { - serializerConfig.setGenericTypes(true); - } - - /** - * Disables the use of generic types (types that would be serialized via Kryo). If this option - * is used, Flink will throw an {@code UnsupportedOperationException} whenever it encounters a - * data type that would go through Kryo for serialization. - * - * <p>Disabling generic types can be helpful to eagerly find and eliminate the use of types that - * would go through Kryo serialization during runtime. Rather than checking types individually, - * using this option will throw exceptions eagerly in the places where generic types are used. - * - * <p><b>Important:</b> We recommend to use this option only during development and - * pre-production phases, not during actual production use. The application program and/or the - * input data may be such that new, previously unseen, types occur at some point. In that case, - * setting this option would cause the program to fail. - * - * @deprecated Configure serialization behavior through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. You should configure this by config - * option {@link PipelineOptions#GENERIC_TYPES}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - * @see #enableGenericTypes() - */ - @Deprecated - public void disableGenericTypes() { - serializerConfig.setGenericTypes(false); - } - - /** - * Checks whether generic types are supported. Generic types are types that go through Kryo - * during serialization. - * - * <p>Generic types are enabled by default. - * - * @deprecated Use {@link SerializerConfig#hasGenericTypesDisabled}. - * @see #enableGenericTypes() - * @see #disableGenericTypes() - */ - @Deprecated - public boolean hasGenericTypesDisabled() { - return serializerConfig.hasGenericTypesDisabled(); - } - /** * Enables the Flink runtime to auto-generate UID's for operators. * @@ -588,48 +486,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut return configuration.get(PipelineOptions.AUTO_GENERATE_UIDS); } - /** - * Forces Flink to use the Apache Avro serializer for POJOs. - * - * <p><b>Important:</b> Make sure to include the <i>flink-avro</i> module. - * - * @deprecated Configure serialization behavior through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. You should configure this by config - * option {@link PipelineOptions#FORCE_AVRO}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - public void enableForceAvro() { - serializerConfig.setForceAvro(true); - } - - /** - * Disables the Apache Avro serializer as the forced serializer for POJOs. - * - * @deprecated Configure serialization behavior through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. You should configure this by config - * option {@link PipelineOptions#FORCE_AVRO}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - public void disableForceAvro() { - serializerConfig.setForceAvro(false); - } - - /** - * Returns whether the Apache Avro is the default serializer for POJOs. - * - * @deprecated Use {@link SerializerConfig#isForceAvroEnabled}. - */ - @Deprecated - public boolean isForceAvroEnabled() { - return serializerConfig.isForceAvroEnabled(); - } - /** * Enables reusing objects that Flink internally uses for deserialization and passing data to * user-code functions. Keep in mind that this can lead to bugs when the user-code function of @@ -682,189 +538,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut // Registry for types and serializers // -------------------------------------------------------------------------------------------- - /** - * Adds a new Kryo default serializer to the Runtime. - * - * <p>Note that the serializer instance must be serializable (as defined by - * java.io.Serializable), because it may be distributed to the worker nodes by java - * serialization. - * - * @param type The class of the types serialized with the given serializer. - * @param serializer The serializer to use. - * @deprecated Register data types and serializers through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. You should configure this by config - * option {@link PipelineOptions#SERIALIZATION_CONFIG}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - public <T extends Serializer<?> & Serializable> void addDefaultKryoSerializer( - Class<?> type, T serializer) { - serializerConfig.addDefaultKryoSerializer(type, serializer); - } - - /** - * Adds a new Kryo default serializer to the Runtime. - * - * @param type The class of the types serialized with the given serializer. - * @param serializerClass The class of the serializer to use. - * @deprecated Register data types and serializers through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. You should configure this by config - * option {@link PipelineOptions#SERIALIZATION_CONFIG}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - public void addDefaultKryoSerializer( - Class<?> type, Class<? extends Serializer<?>> serializerClass) { - serializerConfig.addDefaultKryoSerializer(type, serializerClass); - } - - /** - * Registers the given type with a Kryo Serializer. - * - * <p>Note that the serializer instance must be serializable (as defined by - * java.io.Serializable), because it may be distributed to the worker nodes by java - * serialization. - * - * @param type The class of the types serialized with the given serializer. - * @param serializer The serializer to use. - * @deprecated Register data types and serializers through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. You should configure this by config - * option {@link PipelineOptions#SERIALIZATION_CONFIG}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - public <T extends Serializer<?> & Serializable> void registerTypeWithKryoSerializer( - Class<?> type, T serializer) { - serializerConfig.registerTypeWithKryoSerializer(type, serializer); - } - - /** - * Registers the given Serializer via its class as a serializer for the given type at the - * KryoSerializer. - * - * @param type The class of the types serialized with the given serializer. - * @param serializerClass The class of the serializer to use. - * @deprecated Register data types and serializers through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. You should configure this by config - * option {@link PipelineOptions#SERIALIZATION_CONFIG}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - @SuppressWarnings("rawtypes") - public void registerTypeWithKryoSerializer( - Class<?> type, Class<? extends Serializer> serializerClass) { - serializerConfig.registerTypeWithKryoSerializer(type, serializerClass); - } - - /** - * Registers the given type with the serialization stack. If the type is eventually serialized - * as a POJO, then the type is registered with the POJO serializer. If the type ends up being - * serialized with Kryo, then it will be registered at Kryo to make sure that only tags are - * written. - * - * @param type The class of the type to register. - * @deprecated Register data types and serializers through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. You should configure this by config - * option {@link PipelineOptions#SERIALIZATION_CONFIG}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - public void registerPojoType(Class<?> type) { - serializerConfig.registerPojoType(type); - } - - /** - * Registers the given type with the serialization stack. If the type is eventually serialized - * as a POJO, then the type is registered with the POJO serializer. If the type ends up being - * serialized with Kryo, then it will be registered at Kryo to make sure that only tags are - * written. - * - * @param type The class of the type to register. - * @deprecated Register data types and serializers through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. You should configure this by config - * option {@link PipelineOptions#SERIALIZATION_CONFIG}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - public void registerKryoType(Class<?> type) { - serializerConfig.registerKryoType(type); - } - - /** - * Returns the registered types with Kryo Serializers. - * - * @deprecated Use {@link SerializerConfig#getRegisteredTypesWithKryoSerializers}. - */ - @Deprecated - public LinkedHashMap<Class<?>, SerializableSerializer<?>> - getRegisteredTypesWithKryoSerializers() { - return serializerConfig.getRegisteredTypesWithKryoSerializers(); - } - - /** - * Returns the registered types with their Kryo Serializer classes. - * - * @deprecated Use {@link SerializerConfig#getRegisteredTypesWithKryoSerializerClasses}. - */ - @Deprecated - public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> - getRegisteredTypesWithKryoSerializerClasses() { - return serializerConfig.getRegisteredTypesWithKryoSerializerClasses(); - } - - /** - * Returns the registered default Kryo Serializers. - * - * @deprecated Use {@link SerializerConfig#getDefaultKryoSerializers}. - */ - @Deprecated - public LinkedHashMap<Class<?>, SerializableSerializer<?>> getDefaultKryoSerializers() { - return serializerConfig.getDefaultKryoSerializers(); - } - - /** - * Returns the registered default Kryo Serializer classes. - * - * @deprecated Use {@link SerializerConfig#getDefaultKryoSerializerClasses}. - */ - @Deprecated - public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> - getDefaultKryoSerializerClasses() { - return serializerConfig.getDefaultKryoSerializerClasses(); - } - - /** - * Returns the registered Kryo types. - * - * @deprecated Use {@link SerializerConfig#getRegisteredKryoTypes}. - */ - @Deprecated - public LinkedHashSet<Class<?>> getRegisteredKryoTypes() { - return serializerConfig.getRegisteredKryoTypes(); - } - - /** - * Returns the registered POJO types. - * - * @deprecated Use {@link SerializerConfig#getRegisteredPojoTypes}. - */ - @Deprecated - public LinkedHashSet<Class<?>> getRegisteredPojoTypes() { - return serializerConfig.getRegisteredPojoTypes(); - } - /** * Get if the auto type registration is disabled. * @@ -966,33 +639,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut // ------------------------------ Utilities ---------------------------------- - /** - * @deprecated The class is deprecated because instance-type serializer definition where - * serializers are serialized and written into the snapshot and deserialized for use is - * deprecated. Use class-type serializer definition instead, where only the class name is - * written into the snapshot and new instance of the serializer is created for use. This is - * a breaking change, and it will be removed in Flink 2.0. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - @Public - public static class SerializableSerializer<T extends Serializer<?> & Serializable> - implements Serializable { - private static final long serialVersionUID = 4687893502781067189L; - - private T serializer; - - public SerializableSerializer(T serializer) { - this.serializer = serializer; - } - - public T getSerializer() { - return serializer; - } - } - /** * Abstract class for a custom user configuration object registered at the execution config. * diff --git a/flink-core/src/main/java/org/apache/flink/api/common/SerializableSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/SerializableSerializer.java new file mode 100644 index 00000000000..cb2dbf056f4 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/SerializableSerializer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; + +import com.esotericsoftware.kryo.Serializer; + +import java.io.Serializable; + +/** + * The wrapper to make serializer serializable. + * + * <p>This can be removed after {@link KryoSerializer} only allow serializer class. + */ +@Internal +public class SerializableSerializer<T extends Serializer<?> & Serializable> + implements Serializable { + private static final long serialVersionUID = 4687893502781067189L; + + private T serializer; + + public SerializableSerializer(T serializer) { + this.serializer = serializer; + } + + public T getSerializer() { + return serializer; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java index 43c5e927e13..556390b564c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java @@ -20,7 +20,6 @@ package org.apache.flink.api.common.serialization; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInfoFactory; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.ReadableConfig; @@ -128,30 +127,10 @@ public interface SerializerConfig extends Serializable { @Internal void registerKryoType(Class<?> type); - /** - * Returns the registered types with Kryo Serializers. - * - * @deprecated The method is deprecated because instance-type Kryo serializer definition based - * on {@link ExecutionConfig.SerializableSerializer} is deprecated. Use class-type Kryo - * serializers instead. - */ - @Deprecated - LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> - getRegisteredTypesWithKryoSerializers(); - /** Returns the registered types with their Kryo Serializer classes. */ LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> getRegisteredTypesWithKryoSerializerClasses(); - /** - * Returns the registered default Kryo Serializers. - * - * @deprecated The method is deprecated because {@link ExecutionConfig.SerializableSerializer} - * is deprecated. - */ - @Deprecated - LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> getDefaultKryoSerializers(); - /** Returns the registered default Kryo Serializer classes. */ LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> getDefaultKryoSerializerClasses(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java index b4ad103c94f..d42695e0e6a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java @@ -19,7 +19,7 @@ package org.apache.flink.api.common.serialization; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.SerializableSerializer; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.TypeInfoFactory; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -55,14 +55,14 @@ public final class SerializerConfigImpl implements SerializerConfig { // we store them in linked maps/sets to ensure they are registered in order in all kryo // instances. - private LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> - registeredTypesWithKryoSerializers = new LinkedHashMap<>(); + private LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithKryoSerializers = + new LinkedHashMap<>(); private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>(); - private LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> - defaultKryoSerializers = new LinkedHashMap<>(); + private LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultKryoSerializers = + new LinkedHashMap<>(); private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultKryoSerializerClasses = new LinkedHashMap<>(); @@ -104,7 +104,7 @@ public final class SerializerConfigImpl implements SerializerConfig { throw new NullPointerException("Cannot register null class or serializer."); } - defaultKryoSerializers.put(type, new ExecutionConfig.SerializableSerializer<>(serializer)); + defaultKryoSerializers.put(type, new SerializableSerializer<>(serializer)); } /** @@ -137,8 +137,7 @@ public final class SerializerConfigImpl implements SerializerConfig { throw new NullPointerException("Cannot register null class or serializer."); } - registeredTypesWithKryoSerializers.put( - type, new ExecutionConfig.SerializableSerializer<>(serializer)); + registeredTypesWithKryoSerializers.put(type, new SerializableSerializer<>(serializer)); } /** @@ -192,7 +191,7 @@ public final class SerializerConfigImpl implements SerializerConfig { } /** Returns the registered types with Kryo Serializers. */ - public LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> + public LinkedHashMap<Class<?>, SerializableSerializer<?>> getRegisteredTypesWithKryoSerializers() { return registeredTypesWithKryoSerializers; } @@ -204,8 +203,7 @@ public final class SerializerConfigImpl implements SerializerConfig { } /** Returns the registered default Kryo Serializers. */ - public LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> - getDefaultKryoSerializers() { + public LinkedHashMap<Class<?>, SerializableSerializer<?>> getDefaultKryoSerializers() { return defaultKryoSerializers; } @@ -359,21 +357,6 @@ public final class SerializerConfigImpl implements SerializerConfig { .getOptional(PipelineOptions.FORCE_KRYO_AVRO) .ifPresent(this::setForceKryoAvro); - configuration - .getOptional(PipelineOptions.KRYO_DEFAULT_SERIALIZERS) - .map(s -> parseKryoSerializersWithExceptionHandling(classLoader, s)) - .ifPresent(s -> this.defaultKryoSerializerClasses = s); - - configuration - .getOptional(PipelineOptions.POJO_REGISTERED_CLASSES) - .map(c -> loadClasses(c, classLoader, "Could not load pojo type to be registered.")) - .ifPresent(c -> this.registeredPojoTypes = c); - - configuration - .getOptional(PipelineOptions.KRYO_REGISTERED_CLASSES) - .map(c -> loadClasses(c, classLoader, "Could not load kryo type to be registered.")) - .ifPresent(c -> this.registeredKryoTypes = c); - try { configuration .getOptional(PipelineOptions.SERIALIZATION_CONFIG) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java index afd15c0ed89..d45146e91be 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java @@ -19,7 +19,7 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.SerializableSerializer; import org.apache.flink.util.Preconditions; import com.esotericsoftware.kryo.Kryo; @@ -60,8 +60,7 @@ public class KryoRegistration implements Serializable { * serializer definition type is {@link SerializerDefinitionType#INSTANCE}. */ @Nullable - private final ExecutionConfig.SerializableSerializer<? extends Serializer<?>> - serializableSerializerInstance; + private final SerializableSerializer<? extends Serializer<?>> serializableSerializerInstance; private final SerializerDefinitionType serializerDefinitionType; @@ -86,8 +85,7 @@ public class KryoRegistration implements Serializable { public KryoRegistration( Class<?> registeredClass, - ExecutionConfig.SerializableSerializer<? extends Serializer<?>> - serializableSerializerInstance) { + SerializableSerializer<? extends Serializer<?>> serializableSerializerInstance) { this.registeredClass = Preconditions.checkNotNull(registeredClass); this.serializerClass = null; @@ -111,8 +109,7 @@ public class KryoRegistration implements Serializable { } @Nullable - public ExecutionConfig.SerializableSerializer<? extends Serializer<?>> - getSerializableSerializerInstance() { + public SerializableSerializer<? extends Serializer<?>> getSerializableSerializerInstance() { return serializableSerializerInstance; } diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java index d585e01aff3..57745de38f2 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java @@ -19,9 +19,9 @@ package org.apache.flink.api.java.typeutils.runtime.kryo; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.ExecutionConfig.SerializableSerializer; +import org.apache.flink.api.common.SerializableSerializer; import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.java.typeutils.AvroUtils; @@ -129,8 +129,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> { // ------------------------------------------------------------------------ - private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> - defaultSerializers; + private final LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultSerializers; private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses; /** @@ -158,8 +157,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> { // ------------------------------------------------------------------------ // legacy fields; these fields cannot yet be removed to retain backwards compatibility - private LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> - registeredTypesWithSerializers; + private LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithSerializers; private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses; private LinkedHashSet<Class<?>> registeredTypes; @@ -172,7 +170,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> { public KryoSerializer(Class<T> type, SerializerConfig serializerConfig) { this.type = checkNotNull(type); - this.defaultSerializers = serializerConfig.getDefaultKryoSerializers(); + this.defaultSerializers = + ((SerializerConfigImpl) serializerConfig).getDefaultKryoSerializers(); this.defaultSerializerClasses = serializerConfig.getDefaultKryoSerializerClasses(); this.kryoRegistrations = @@ -180,7 +179,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> { this.type, serializerConfig.getRegisteredKryoTypes(), serializerConfig.getRegisteredTypesWithKryoSerializerClasses(), - serializerConfig.getRegisteredTypesWithKryoSerializers(), + ((SerializerConfigImpl) serializerConfig) + .getRegisteredTypesWithKryoSerializers(), serializerConfig.isForceKryoAvroEnabled()); } @@ -197,7 +197,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> { CollectionUtil.newLinkedHashMapWithExpectedSize(toCopy.kryoRegistrations.size()); // deep copy the serializer instances in defaultSerializers - for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry : + for (Map.Entry<Class<?>, SerializableSerializer<?>> entry : toCopy.defaultSerializers.entrySet()) { this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue())); @@ -211,7 +211,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> { if (kryoRegistration.getSerializerDefinitionType() == KryoRegistration.SerializerDefinitionType.INSTANCE) { - ExecutionConfig.SerializableSerializer<? extends Serializer<?>> serializerInstance = + SerializableSerializer<? extends Serializer<?>> serializerInstance = kryoRegistration.getSerializableSerializerInstance(); if (serializerInstance != null) { @@ -552,7 +552,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> { // Add default serializers first, so that the type registrations without a serializer // are registered with a default serializer - for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry : + for (Map.Entry<Class<?>, SerializableSerializer<?>> entry : defaultSerializers.entrySet()) { kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer()); } @@ -597,8 +597,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> { LinkedHashSet<Class<?>> registeredTypes, LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses, - LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> - registeredTypesWithSerializers, + LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithSerializers, TernaryBoolean isForceAvroKryoEnabledOpt) { final LinkedHashMap<String, KryoRegistration> kryoRegistrations = new LinkedHashMap<>(); @@ -620,9 +619,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> { registeredTypeWithSerializerClassEntry.getValue())); } - for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> - registeredTypeWithSerializerEntry : - checkNotNull(registeredTypesWithSerializers).entrySet()) { + for (Map.Entry<Class<?>, SerializableSerializer<?>> registeredTypeWithSerializerEntry : + checkNotNull(registeredTypesWithSerializers).entrySet()) { kryoRegistrations.put( registeredTypeWithSerializerEntry.getKey().getName(), @@ -675,8 +673,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> { } } - private ExecutionConfig.SerializableSerializer<? extends Serializer<?>> deepCopySerializer( - ExecutionConfig.SerializableSerializer<? extends Serializer<?>> original) { + private SerializableSerializer<? extends Serializer<?>> deepCopySerializer( + SerializableSerializer<? extends Serializer<?>> original) { try { return InstantiationUtil.clone( original, Thread.currentThread().getContextClassLoader()); diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java index 67de997c033..33b29d8629b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.typeutils.runtime.kryo; -import org.apache.flink.api.common.ExecutionConfig.SerializableSerializer; +import org.apache.flink.api.common.SerializableSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java index 45a93c1d470..b527d8abd27 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.typeutils.runtime.kryo; -import org.apache.flink.api.common.ExecutionConfig.SerializableSerializer; +import org.apache.flink.api.common.SerializableSerializer; import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; import org.apache.flink.api.java.typeutils.runtime.KryoRegistration; diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java index 7c07fc0e4ca..fd5bf7fa643 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime.kryo; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.typeutils.AvroUtils; @@ -89,7 +90,7 @@ public class Serializers { if (type.isArray()) { recursivelyRegisterType(type.getComponentType(), config, alreadySeen); } else { - config.registerKryoType(type); + ((SerializerConfigImpl) config).registerKryoType(type); // add serializers for Avro type if necessary AvroUtils.getAvroUtils().addAvroSerializersIfRequired(config, type); diff --git a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java index 0c2a9f2aaa5..984505dbafb 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java @@ -226,75 +226,6 @@ public class PipelineOptions { + " data to user-code functions will be reused. Keep in mind that this can lead to bugs when the" + " user-code function of an operation is not aware of this behaviour."); - /** - * @deprecated The config is subsumed by {@link #SERIALIZATION_CONFIG}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - public static final ConfigOption<List<String>> KRYO_DEFAULT_SERIALIZERS = - key("pipeline.default-kryo-serializers") - .stringType() - .asList() - .noDefaultValue() - .withDescription( - Description.builder() - .text( - "Semicolon separated list of pairs of class names and Kryo serializers class names to be used" - + " as Kryo default serializers") - .linebreak() - .linebreak() - .text("Example:") - .linebreak() - .add( - TextElement.code( - "class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1;" - + " class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2")) - .build()); - - /** - * @deprecated The config is subsumed by {@link #SERIALIZATION_CONFIG}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - public static final ConfigOption<List<String>> KRYO_REGISTERED_CLASSES = - key("pipeline.registered-kryo-types") - .stringType() - .asList() - .noDefaultValue() - .withDescription( - Description.builder() - .text( - "Semicolon separated list of types to be registered with the serialization stack. If the type" - + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the" - + " type ends up being serialized with Kryo, then it will be registered at Kryo to make" - + " sure that only tags are written.") - .build()); - - /** - * @deprecated The config is subsumed by {@link #SERIALIZATION_CONFIG}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - public static final ConfigOption<List<String>> POJO_REGISTERED_CLASSES = - key("pipeline.registered-pojo-types") - .stringType() - .asList() - .noDefaultValue() - .withDescription( - Description.builder() - .text( - "Semicolon separated list of types to be registered with the serialization stack. If the type" - + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the" - + " type ends up being serialized with Kryo, then it will be registered at Kryo to make" - + " sure that only tags are written.") - .build()); - public static final ConfigOption<List<String>> SERIALIZATION_CONFIG = key("pipeline.serialization-config") .stringType() diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java index a361777a912..2fdc3ec08de 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java @@ -54,12 +54,12 @@ public class ExecutionConfigTest { List<Class<?>> expectedTypes = Arrays.asList(Double.class, Integer.class); for (Class<?> tpe : types) { - config.getSerializerConfig().registerKryoType(tpe); + ((SerializerConfigImpl) config.getSerializerConfig()).registerKryoType(tpe); } int counter = 0; - for (Class<?> tpe : config.getRegisteredKryoTypes()) { + for (Class<?> tpe : config.getSerializerConfig().getRegisteredKryoTypes()) { assertThat(tpe).isEqualTo(expectedTypes.get(counter++)); } @@ -176,74 +176,38 @@ public class ExecutionConfigTest { @Test void testLoadingRegisteredKryoTypesFromConfiguration() { - ExecutionConfig configFromSetters = new ExecutionConfig(); - configFromSetters.getSerializerConfig().registerKryoType(ExecutionConfigTest.class); - configFromSetters.getSerializerConfig().registerKryoType(TestSerializer1.class); - ExecutionConfig configFromConfiguration = new ExecutionConfig(); Configuration configuration = new Configuration(); - configuration.setString( - "pipeline.registered-kryo-types", - "org.apache.flink.api.common.ExecutionConfigTest;" - + "org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1"); + String serializationConfigStr = + "{org.apache.flink.api.common.ExecutionConfigTest: {type: kryo}, " + + "org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1: {type: kryo}}"; + configuration.setString("pipeline.serialization-config", serializationConfigStr); // mutate config according to configuration configFromConfiguration.configure( configuration, Thread.currentThread().getContextClassLoader()); - assertThat(configFromConfiguration.getRegisteredKryoTypes()) - .isEqualTo(configFromSetters.getRegisteredKryoTypes()); + assertThat(configFromConfiguration.getSerializerConfig().getRegisteredKryoTypes()) + .containsExactlyInAnyOrder(ExecutionConfigTest.class, TestSerializer1.class); } @Test void testLoadingRegisteredPojoTypesFromConfiguration() { - ExecutionConfig configFromSetters = new ExecutionConfig(); - configFromSetters.registerPojoType(ExecutionConfigTest.class); - configFromSetters.registerPojoType(TestSerializer1.class); - ExecutionConfig configFromConfiguration = new ExecutionConfig(); Configuration configuration = new Configuration(); - configuration.setString( - "pipeline.registered-pojo-types", - "org.apache.flink.api.common.ExecutionConfigTest;" - + "org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1"); + String serializationConfigStr = + "{org.apache.flink.api.common.ExecutionConfigTest: {type: pojo}, " + + "org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1: {type: pojo}}"; + configuration.setString("pipeline.serialization-config", serializationConfigStr); // mutate config according to configuration configFromConfiguration.configure( configuration, Thread.currentThread().getContextClassLoader()); - assertThat(configFromConfiguration.getRegisteredPojoTypes()) - .isEqualTo(configFromSetters.getRegisteredPojoTypes()); - } - - @Test - void testLoadingDefaultKryoSerializersFromConfiguration() { - ExecutionConfig configFromSetters = new ExecutionConfig(); - configFromSetters - .getSerializerConfig() - .addDefaultKryoSerializer(ExecutionConfigTest.class, TestSerializer1.class); - configFromSetters - .getSerializerConfig() - .addDefaultKryoSerializer(TestSerializer1.class, TestSerializer2.class); - - ExecutionConfig configFromConfiguration = new ExecutionConfig(); - - Configuration configuration = new Configuration(); - configuration.setString( - "pipeline.default-kryo-serializers", - "class:org.apache.flink.api.common.ExecutionConfigTest," - + "serializer:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1;" - + "class:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1," - + "serializer:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer2"); - - // mutate config according to configuration - configFromConfiguration.configure( - configuration, Thread.currentThread().getContextClassLoader()); - - assertThat(configFromConfiguration.getSerializerConfig().getDefaultKryoSerializers()) - .isEqualTo(configFromSetters.getSerializerConfig().getDefaultKryoSerializers()); + assertThat(configFromConfiguration.getSerializerConfig().getRegisteredPojoTypes()) + .containsExactlyInAnyOrder(ExecutionConfigTest.class, TestSerializer1.class); } @Test @@ -268,8 +232,10 @@ public class ExecutionConfigTest { @Test void testNotOverridingRegisteredKryoTypesWithDefaultsFromConfiguration() { ExecutionConfig config = new ExecutionConfig(); - config.getSerializerConfig().registerKryoType(ExecutionConfigTest.class); - config.getSerializerConfig().registerKryoType(TestSerializer1.class); + ((SerializerConfigImpl) config.getSerializerConfig()) + .registerKryoType(ExecutionConfigTest.class); + ((SerializerConfigImpl) config.getSerializerConfig()) + .registerKryoType(TestSerializer1.class); Configuration configuration = new Configuration(); @@ -279,16 +245,17 @@ public class ExecutionConfigTest { LinkedHashSet<Object> set = new LinkedHashSet<>(); set.add(ExecutionConfigTest.class); set.add(TestSerializer1.class); - assertThat(config.getRegisteredKryoTypes()).isEqualTo(set); + assertThat(config.getSerializerConfig().getRegisteredKryoTypes()).isEqualTo(set); } @Test void testNotOverridingRegisteredPojoTypesWithDefaultsFromConfiguration() { ExecutionConfig config = new ExecutionConfig(); - config.registerPojoType(ExecutionConfigTest.class); - config.registerPojoType(TestSerializer1.class); - Configuration configuration = new Configuration(); + String serializationConfigStr = + "{org.apache.flink.api.common.ExecutionConfigTest: {type: pojo}, " + + "org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1: {type: pojo}}"; + configuration.setString("pipeline.serialization-config", serializationConfigStr); // mutate config according to configuration config.configure(configuration, Thread.currentThread().getContextClassLoader()); @@ -296,7 +263,7 @@ public class ExecutionConfigTest { LinkedHashSet<Object> set = new LinkedHashSet<>(); set.add(ExecutionConfigTest.class); set.add(TestSerializer1.class); - assertThat(config.getRegisteredPojoTypes()).isEqualTo(set); + assertThat(config.getSerializerConfig().getRegisteredPojoTypes()).isEqualTo(set); } @Test @@ -317,10 +284,9 @@ public class ExecutionConfigTest { @Test void testNotOverridingDefaultKryoSerializersFromConfiguration() { ExecutionConfig config = new ExecutionConfig(); - config.getSerializerConfig() - .addDefaultKryoSerializer(ExecutionConfigTest.class, TestSerializer1.class); - config.getSerializerConfig() - .addDefaultKryoSerializer(TestSerializer1.class, TestSerializer2.class); + SerializerConfigImpl serializerConfig = (SerializerConfigImpl) config.getSerializerConfig(); + serializerConfig.addDefaultKryoSerializer(ExecutionConfigTest.class, TestSerializer1.class); + serializerConfig.addDefaultKryoSerializer(TestSerializer1.class, TestSerializer2.class); Configuration configuration = new Configuration(); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java index 96799e42f51..842ddea9655 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java @@ -20,7 +20,6 @@ package org.apache.flink.api.common.serialization; import org.apache.flink.api.common.typeinfo.TypeInfoFactory; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import com.esotericsoftware.kryo.Kryo; @@ -33,39 +32,16 @@ import java.io.Serializable; import java.lang.reflect.Type; import java.util.AbstractMap; import java.util.Arrays; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import static org.apache.flink.configuration.PipelineOptions.KRYO_DEFAULT_SERIALIZERS; -import static org.apache.flink.configuration.PipelineOptions.KRYO_REGISTERED_CLASSES; -import static org.apache.flink.configuration.PipelineOptions.POJO_REGISTERED_CLASSES; import static org.apache.flink.configuration.PipelineOptions.SERIALIZATION_CONFIG; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; class SerializerConfigImplTest { - private static final Map<ConfigOption<List<String>>, String> configs = new HashMap<>(); - - static { - configs.put( - KRYO_DEFAULT_SERIALIZERS, - "class:org.apache.flink.api.common.serialization.SerializerConfigImplTest," - + "serializer:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1;" - + "class:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1," - + "serializer:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer2"); - configs.put( - KRYO_REGISTERED_CLASSES, - "org.apache.flink.api.common.serialization.SerializerConfigImplTest;" - + "org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1"); - configs.put( - POJO_REGISTERED_CLASSES, - "org.apache.flink.api.common.serialization.SerializerConfigImplTest;" - + "org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1"); - } - @Test void testReadingDefaultConfig() { SerializerConfig config = new SerializerConfigImpl(); @@ -79,7 +55,7 @@ class SerializerConfigImplTest { @Test void testDoubleTypeRegistration() { - SerializerConfig config = new SerializerConfigImpl(); + SerializerConfigImpl config = new SerializerConfigImpl(); List<Class<?>> types = Arrays.asList(Double.class, Integer.class, Double.class); List<Class<?>> expectedTypes = Arrays.asList(Double.class, Integer.class); @@ -96,72 +72,9 @@ class SerializerConfigImplTest { assertThat(expectedTypes).hasSize(counter); } - @Test - void testLoadingRegisteredKryoTypesFromConfiguration() { - SerializerConfig configFromSetters = new SerializerConfigImpl(); - configFromSetters.registerKryoType(SerializerConfigImplTest.class); - configFromSetters.registerKryoType(SerializerConfigImplTest.TestSerializer1.class); - - SerializerConfig configFromConfiguration = new SerializerConfigImpl(); - - Configuration configuration = new Configuration(); - configuration.setString( - KRYO_REGISTERED_CLASSES.key(), configs.get(KRYO_REGISTERED_CLASSES)); - - // mutate config according to configuration - configFromConfiguration.configure( - configuration, Thread.currentThread().getContextClassLoader()); - - assertThat(configFromConfiguration.getRegisteredKryoTypes()) - .isEqualTo(configFromSetters.getRegisteredKryoTypes()); - } - - @Test - void testLoadingRegisteredPojoTypesFromConfiguration() { - SerializerConfig configFromSetters = new SerializerConfigImpl(); - configFromSetters.registerPojoType(SerializerConfigImplTest.class); - configFromSetters.registerPojoType(SerializerConfigImplTest.TestSerializer1.class); - - SerializerConfig configFromConfiguration = new SerializerConfigImpl(); - - Configuration configuration = new Configuration(); - configuration.setString( - POJO_REGISTERED_CLASSES.key(), configs.get(POJO_REGISTERED_CLASSES)); - - // mutate config according to configuration - configFromConfiguration.configure( - configuration, Thread.currentThread().getContextClassLoader()); - - assertThat(configFromConfiguration.getRegisteredPojoTypes()) - .isEqualTo(configFromSetters.getRegisteredPojoTypes()); - } - - @Test - void testLoadingDefaultKryoSerializersFromConfiguration() { - SerializerConfig configFromSetters = new SerializerConfigImpl(); - configFromSetters.addDefaultKryoSerializer( - SerializerConfigImplTest.class, SerializerConfigImplTest.TestSerializer1.class); - configFromSetters.addDefaultKryoSerializer( - SerializerConfigImplTest.TestSerializer1.class, - SerializerConfigImplTest.TestSerializer2.class); - - SerializerConfig configFromConfiguration = new SerializerConfigImpl(); - - Configuration configuration = new Configuration(); - configuration.setString( - KRYO_DEFAULT_SERIALIZERS.key(), configs.get(KRYO_DEFAULT_SERIALIZERS)); - - // mutate config according to configuration - configFromConfiguration.configure( - configuration, Thread.currentThread().getContextClassLoader()); - - assertThat(configFromConfiguration.getDefaultKryoSerializers()) - .isEqualTo(configFromSetters.getDefaultKryoSerializers()); - } - @Test void testNotOverridingRegisteredKryoTypesWithDefaultsFromConfiguration() { - SerializerConfig config = new SerializerConfigImpl(); + SerializerConfigImpl config = new SerializerConfigImpl(); config.registerKryoType(SerializerConfigImplTest.class); config.registerKryoType(SerializerConfigImplTest.TestSerializer1.class); @@ -178,7 +91,7 @@ class SerializerConfigImplTest { @Test void testNotOverridingRegisteredPojoTypesWithDefaultsFromConfiguration() { - SerializerConfig config = new SerializerConfigImpl(); + SerializerConfigImpl config = new SerializerConfigImpl(); config.registerPojoType(SerializerConfigImplTest.class); config.registerPojoType(SerializerConfigImplTest.TestSerializer1.class); @@ -195,7 +108,7 @@ class SerializerConfigImplTest { @Test void testNotOverridingDefaultKryoSerializersFromConfiguration() { - SerializerConfig config = new SerializerConfigImpl(); + SerializerConfigImpl config = new SerializerConfigImpl(); config.addDefaultKryoSerializer( SerializerConfigImplTest.class, SerializerConfigImplTest.TestSerializer1.class); config.addDefaultKryoSerializer( @@ -311,9 +224,8 @@ class SerializerConfigImplTest { @Test void testCopySerializerConfig() { - SerializerConfig serializerConfig = new SerializerConfigImpl(); + SerializerConfigImpl serializerConfig = new SerializerConfigImpl(); Configuration configuration = new Configuration(); - configs.forEach((k, v) -> configuration.setString(k.key(), v)); serializerConfig.configure(configuration, SerializerConfigImplTest.class.getClassLoader()); serializerConfig diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java index 6752d0a0dde..c9ca4700408 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java @@ -127,7 +127,7 @@ class StateDescriptorTest { .isEqualTo(-1); final ExecutionConfig config = new ExecutionConfig(); - config.getSerializerConfig().registerKryoType(File.class); + ((SerializerConfigImpl) config.getSerializerConfig()).registerKryoType(File.class); final TestStateDescriptor<Path> original = new TestStateDescriptor<>("test", Path.class); TestStateDescriptor<Path> clone = CommonTestUtils.createCopySerializable(original); diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java index f0197003c02..709be364fd1 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java @@ -21,7 +21,6 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.operators.Keys.ExpressionKeys; import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException; -import org.apache.flink.api.common.serialization.SerializerConfig; import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; @@ -370,7 +369,7 @@ class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.TestUserC */ @Test void testReconfigureDifferentSubclassRegistrationOrder() throws Exception { - SerializerConfig serializerConfig = new SerializerConfigImpl(); + SerializerConfigImpl serializerConfig = new SerializerConfigImpl(); serializerConfig.registerPojoType(SubTestUserClassA.class); serializerConfig.registerPojoType(SubTestUserClassB.class); @@ -531,7 +530,7 @@ class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.TestUserC // instantiate new PojoSerializer, with new execution config that has the subclass // registrations - SerializerConfig serializerConfig = new SerializerConfigImpl(); + SerializerConfigImpl serializerConfig = new SerializerConfigImpl(); serializerConfig.registerPojoType(SubTestUserClassA.class); serializerConfig.registerPojoType(SubTestUserClassB.class); pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(serializerConfig); diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java index 98861d32371..6f2966c1f7d 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java @@ -609,7 +609,7 @@ public class PojoSerializerUpgradeTestSpecifications { @Override public TypeSerializer<StaticSchemaPojo> createPriorSerializer() { - SerializerConfig serializerConfig = new SerializerConfigImpl(); + SerializerConfigImpl serializerConfig = new SerializerConfigImpl(); serializerConfig.registerPojoType(SubclassPojoWithIntField.class); TypeSerializer<StaticSchemaPojo> serializer = @@ -646,7 +646,7 @@ public class PojoSerializerUpgradeTestSpecifications { @Override public TypeSerializer<StaticSchemaPojo> createUpgradedSerializer() { - SerializerConfig serializerConfig = new SerializerConfigImpl(); + SerializerConfigImpl serializerConfig = new SerializerConfigImpl(); serializerConfig.registerPojoType(SubclassPojoWithStringField.class); TypeSerializer<StaticSchemaPojo> serializer = @@ -734,7 +734,7 @@ public class PojoSerializerUpgradeTestSpecifications { @Override public TypeSerializer<StaticSchemaPojo> createPriorSerializer() { - SerializerConfig serializerConfig = new SerializerConfigImpl(); + SerializerConfigImpl serializerConfig = new SerializerConfigImpl(); serializerConfig.registerPojoType(StaticSchemaPojoSubclassA.class); serializerConfig.registerPojoType(StaticSchemaPojoSubclassB.class); @@ -756,7 +756,7 @@ public class PojoSerializerUpgradeTestSpecifications { @Override public TypeSerializer<StaticSchemaPojo> createUpgradedSerializer() { - SerializerConfig serializerConfig = new SerializerConfigImpl(); + SerializerConfigImpl serializerConfig = new SerializerConfigImpl(); // different registration order than setup serializerConfig.registerPojoType(StaticSchemaPojoSubclassB.class); serializerConfig.registerPojoType(StaticSchemaPojoSubclassA.class); @@ -795,7 +795,7 @@ public class PojoSerializerUpgradeTestSpecifications { @Override public TypeSerializer<StaticSchemaPojo> createPriorSerializer() { - SerializerConfig serializerConfig = new SerializerConfigImpl(); + SerializerConfigImpl serializerConfig = new SerializerConfigImpl(); serializerConfig.registerPojoType(StaticSchemaPojoSubclassA.class); serializerConfig.registerPojoType(StaticSchemaPojoSubclassB.class); @@ -817,7 +817,7 @@ public class PojoSerializerUpgradeTestSpecifications { @Override public TypeSerializer<StaticSchemaPojo> createUpgradedSerializer() { - SerializerConfig serializerConfig = new SerializerConfigImpl(); + SerializerConfigImpl serializerConfig = new SerializerConfigImpl(); // missing registration for subclass A compared to setup serializerConfig.registerPojoType(StaticSchemaPojoSubclassB.class); @@ -875,7 +875,7 @@ public class PojoSerializerUpgradeTestSpecifications { @Override public TypeSerializer<StaticSchemaPojo> createUpgradedSerializer() { - SerializerConfig serializerConfig = new SerializerConfigImpl(); + SerializerConfigImpl serializerConfig = new SerializerConfigImpl(); // new registration for subclass A compared to setup serializerConfig.registerPojoType(StaticSchemaPojoSubclassA.class); @@ -912,7 +912,7 @@ public class PojoSerializerUpgradeTestSpecifications { @Override public TypeSerializer<StaticSchemaPojo> createPriorSerializer() { - SerializerConfig serializerConfig = new SerializerConfigImpl(); + SerializerConfigImpl serializerConfig = new SerializerConfigImpl(); serializerConfig.registerPojoType(StaticSchemaPojoSubclassA.class); serializerConfig.registerPojoType(StaticSchemaPojoSubclassB.class); @@ -934,7 +934,7 @@ public class PojoSerializerUpgradeTestSpecifications { @Override public TypeSerializer<StaticSchemaPojo> createUpgradedSerializer() { - SerializerConfig serializerConfig = new SerializerConfigImpl(); + SerializerConfigImpl serializerConfig = new SerializerConfigImpl(); serializerConfig.registerPojoType(StaticSchemaPojoSubclassB.class); serializerConfig.registerPojoType(StaticSchemaPojoSubclassC.class); diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/JavaJobWithKryoSerializer.java b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/JavaJobWithKryoSerializer.java index 411b373b282..6df63073a6c 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/JavaJobWithKryoSerializer.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/JavaJobWithKryoSerializer.java @@ -17,18 +17,24 @@ package org.apache.flink.tests.scala; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** Simple batch job in pure Java that uses a custom Kryo serializer. */ public class JavaJobWithKryoSerializer { public static void main(String[] args) throws Exception { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + Configuration configuration = new Configuration(); + configuration.setString( + PipelineOptions.SERIALIZATION_CONFIG.key(), + "{org.apache.flink.tests.scala.NonPojo: " + + "{type: kryo, kryo-type: default, class: org.apache.flink.tests.scala.NonPojoSerializer}}"); + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); // we want to go through serialization to check for kryo issues env.disableOperatorChaining(); - env.addDefaultKryoSerializer(NonPojo.class, NonPojoSerializer.class); - env.fromData(new NonPojo()).map(x -> x); env.execute(); diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java index c54fe3731cd..55df2f78f4b 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java @@ -18,8 +18,9 @@ package org.apache.flink.formats.avro.utils; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.SerializableSerializer; import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.AvroUtils; @@ -48,9 +49,10 @@ public class AvroKryoSerializerUtils extends AvroUtils { // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type // because Kryo is not able to serialize them properly, we use this serializer for them - reg.registerTypeWithKryoSerializer( - GenericData.Array.class, - Serializers.SpecificInstanceCollectionSerializerForArrayList.class); + ((SerializerConfigImpl) reg) + .registerTypeWithKryoSerializer( + GenericData.Array.class, + Serializers.SpecificInstanceCollectionSerializerForArrayList.class); // We register this serializer for users who want to use untyped Avro records // (GenericData.Record). @@ -59,7 +61,8 @@ public class AvroKryoSerializerUtils extends AvroUtils { // a bad idea. // we add the serializer as a default serializer because Avro is using a private // sub-type at runtime. - reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class); + ((SerializerConfigImpl) reg) + .addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class); } } @@ -70,7 +73,7 @@ public class AvroKryoSerializerUtils extends AvroUtils { GenericData.Array.class.getName(), new KryoRegistration( GenericData.Array.class, - new ExecutionConfig.SerializableSerializer<>( + new SerializableSerializer<>( new Serializers .SpecificInstanceCollectionSerializerForArrayList()))); } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java index 7bc51fd140e..1a02ae21224 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java @@ -308,7 +308,7 @@ class AvroTypeExtractionTest { // test if automatic registration of the Types worked ExecutionConfig ec = env.getConfig(); - assertThat(ec.getRegisteredKryoTypes()).contains(Fixed16.class); + assertThat(ec.getSerializerConfig().getRegisteredKryoTypes()).contains(Fixed16.class); switch (fieldName) { case "name": diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index ee2ce212f4c..eb0d7ec4143 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -42,7 +42,6 @@ import org.apache.flink.api.java.operators.DataSink; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.Operator; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.ValueTypeInfo; @@ -69,11 +68,9 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.SplittableIterator; import org.apache.flink.util.WrappingRuntimeException; -import com.esotericsoftware.kryo.Serializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; @@ -282,85 +279,6 @@ public class ExecutionEnvironment { return this.lastJobExecutionResult; } - // -------------------------------------------------------------------------------------------- - // Registry for types and serializers - // -------------------------------------------------------------------------------------------- - - /** - * Adds a new Kryo default serializer to the Runtime. - * - * <p>Note that the serializer instance must be serializable (as defined by - * java.io.Serializable), because it may be distributed to the worker nodes by java - * serialization. - * - * @param type The class of the types serialized with the given serializer. - * @param serializer The serializer to use. - */ - public <T extends Serializer<?> & Serializable> void addDefaultKryoSerializer( - Class<?> type, T serializer) { - config.getSerializerConfig().addDefaultKryoSerializer(type, serializer); - } - - /** - * Adds a new Kryo default serializer to the Runtime. - * - * @param type The class of the types serialized with the given serializer. - * @param serializerClass The class of the serializer to use. - */ - public void addDefaultKryoSerializer( - Class<?> type, Class<? extends Serializer<?>> serializerClass) { - config.getSerializerConfig().addDefaultKryoSerializer(type, serializerClass); - } - - /** - * Registers the given type with a Kryo Serializer. - * - * <p>Note that the serializer instance must be serializable (as defined by - * java.io.Serializable), because it may be distributed to the worker nodes by java - * serialization. - * - * @param type The class of the types serialized with the given serializer. - * @param serializer The serializer to use. - */ - public <T extends Serializer<?> & Serializable> void registerTypeWithKryoSerializer( - Class<?> type, T serializer) { - config.getSerializerConfig().registerTypeWithKryoSerializer(type, serializer); - } - - /** - * Registers the given Serializer via its class as a serializer for the given type at the - * KryoSerializer. - * - * @param type The class of the types serialized with the given serializer. - * @param serializerClass The class of the serializer to use. - */ - public void registerTypeWithKryoSerializer( - Class<?> type, Class<? extends Serializer<?>> serializerClass) { - config.getSerializerConfig().registerTypeWithKryoSerializer(type, serializerClass); - } - - /** - * Registers the given type with the serialization stack. If the type is eventually serialized - * as a POJO, then the type is registered with the POJO serializer. If the type ends up being - * serialized with Kryo, then it will be registered at Kryo to make sure that only tags are - * written. - * - * @param type The class of the type to register. - */ - public void registerType(Class<?> type) { - if (type == null) { - throw new NullPointerException("Cannot register null type class."); - } - - TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(type); - - if (typeInfo instanceof PojoTypeInfo) { - config.getSerializerConfig().registerPojoType(type); - } else { - config.getSerializerConfig().registerKryoType(type); - } - } - /** * Sets all relevant options contained in the {@link ReadableConfig} such as e.g. {@link * PipelineOptions#CACHED_FILES}. It will reconfigure {@link ExecutionEnvironment} and {@link diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/PlanGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/PlanGenerator.java index 97f9a07b20f..64d328675f9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/utils/PlanGenerator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/PlanGenerator.java @@ -180,30 +180,22 @@ public class PlanGenerator { private int getNumberOfRegisteredTypes() { return config.getSerializerConfig().getRegisteredKryoTypes().size() + config.getSerializerConfig().getRegisteredPojoTypes().size() - + config.getSerializerConfig().getRegisteredTypesWithKryoSerializerClasses().size() - + config.getSerializerConfig().getRegisteredTypesWithKryoSerializers().size(); + + config.getSerializerConfig().getRegisteredTypesWithKryoSerializerClasses().size(); } private int getNumberOfDefaultKryoSerializers() { - return config.getSerializerConfig().getDefaultKryoSerializers().size() - + config.getSerializerConfig().getDefaultKryoSerializerClasses().size(); + return config.getSerializerConfig().getDefaultKryoSerializerClasses().size(); } private void logDebuggingTypeDetails() { LOG.debug( "Registered Kryo types: {}", config.getSerializerConfig().getRegisteredKryoTypes().toString()); - LOG.debug( - "Registered Kryo with Serializers types: {}", - config.getSerializerConfig().getRegisteredTypesWithKryoSerializers().entrySet()); LOG.debug( "Registered Kryo with Serializer Classes types: {}", config.getSerializerConfig() .getRegisteredTypesWithKryoSerializerClasses() .entrySet()); - LOG.debug( - "Registered Kryo default Serializers: {}", - config.getSerializerConfig().getDefaultKryoSerializers().entrySet()); LOG.debug( "Registered Kryo default Serializers Classes {}", config.getSerializerConfig().getDefaultKryoSerializerClasses().entrySet()); diff --git a/flink-python/pyflink/common/execution_config.py b/flink-python/pyflink/common/execution_config.py index b83369f60e5..caad612659e 100644 --- a/flink-python/pyflink/common/execution_config.py +++ b/flink-python/pyflink/common/execution_config.py @@ -477,20 +477,6 @@ class ExecutionConfig(object): self._j_execution_config.setGlobalJobParameters(j_global_job_parameters) return self - def get_registered_types_with_kryo_serializer_classes(self) -> Dict[str, str]: - """ - Returns the registered types with their Kryo Serializer classes. - - :return: The dict which the keys are full-qualified java class names of the registered - types and the values are full-qualified java class names of the Kryo Serializer - classes. - """ - j_clz_map = self._j_execution_config.getRegisteredTypesWithKryoSerializerClasses() - registered_serializers = {} - for key in j_clz_map: - registered_serializers[key.getName()] = j_clz_map[key].getName() - return registered_serializers - def get_default_kryo_serializer_classes(self) -> Dict[str, str]: """ Returns the registered default Kryo Serializer classes. diff --git a/flink-python/pyflink/common/tests/test_execution_config.py b/flink-python/pyflink/common/tests/test_execution_config.py index bec8c968525..e08e5a23e83 100644 --- a/flink-python/pyflink/common/tests/test_execution_config.py +++ b/flink-python/pyflink/common/tests/test_execution_config.py @@ -109,26 +109,6 @@ class ExecutionConfigTests(PyFlinkTestCase): self.assertEqual(self.execution_config.get_execution_mode(), ExecutionMode.PIPELINED_FORCED) - def test_disable_enable_force_kryo(self): - - self.execution_config.disable_force_kryo() - - self.assertFalse(self.execution_config.is_force_kryo_enabled()) - - self.execution_config.enable_force_kryo() - - self.assertTrue(self.execution_config.is_force_kryo_enabled()) - - def test_disable_enable_generic_types(self): - - self.execution_config.disable_generic_types() - - self.assertTrue(self.execution_config.has_generic_types_disabled()) - - self.execution_config.enable_generic_types() - - self.assertFalse(self.execution_config.has_generic_types_disabled()) - def test_disable_enable_auto_generated_uids(self): self.execution_config.disable_auto_generated_uids() @@ -139,16 +119,6 @@ class ExecutionConfigTests(PyFlinkTestCase): self.assertTrue(self.execution_config.has_auto_generated_uids_enabled()) - def test_disable_enable_force_avro(self): - - self.execution_config.disable_force_avro() - - self.assertFalse(self.execution_config.is_force_avro_enabled()) - - self.execution_config.enable_force_avro() - - self.assertTrue(self.execution_config.is_force_avro_enabled()) - def test_disable_enable_object_reuse(self): self.execution_config.disable_object_reuse() diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java index 2f135919608..8ecbcda471e 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.CheckpointListener; @@ -377,11 +378,10 @@ public abstract class AbstractQueryableStateTestBase { RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 1000L); // Custom serializer is not needed, it's used just to check if serialization works. - env.getConfig() - .getSerializerConfig() - .addDefaultKryoSerializer( - Byte.class, - (Serializer<?> & Serializable) createSerializer(userClassLoader)); + Class<Serializer<?>> customSerializerClass = + (Class<Serializer<?>>) userClassLoader.loadClass("CustomKryo"); + ((SerializerConfigImpl) env.getConfig().getSerializerConfig()) + .addDefaultKryoSerializer(Byte.class, customSerializerClass); // Here we *force* using Kryo, to check if custom serializers are handled correctly WRT // classloading diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index b740a62e0c5..9993444bed0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -47,7 +47,6 @@ import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.MissingTypeInfo; -import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.CheckpointingOptions; @@ -110,11 +109,8 @@ import org.apache.flink.util.StringUtils; import org.apache.flink.util.TernaryBoolean; import org.apache.flink.util.WrappingRuntimeException; -import com.esotericsoftware.kryo.Serializer; - import javax.annotation.Nullable; -import java.io.Serializable; import java.net.URI; import java.time.Duration; import java.util.ArrayList; @@ -712,129 +708,6 @@ public class StreamExecutionEnvironment implements AutoCloseable { return path == null ? null : new Path(path); } - // -------------------------------------------------------------------------------------------- - // Registry for types and serializers - // -------------------------------------------------------------------------------------------- - - /** - * Adds a new Kryo default serializer to the Runtime. - * - * <p>Note that the serializer instance must be serializable (as defined by - * java.io.Serializable), because it may be distributed to the worker nodes by java - * serialization. - * - * @param type The class of the types serialized with the given serializer. - * @param serializer The serializer to use. - * @deprecated Register data types and serializers through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. Instance-type serializer definition - * where serializers are serialized and written into the snapshot and deserialized for use - * is deprecated as well. Use class-type serializer definition by {@link - * PipelineOptions#SERIALIZATION_CONFIG} instead, where only the class name is written into - * the snapshot and new instance of the serializer is created for use. This is a breaking - * change, and it will be removed in Flink 2.0. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - public <T extends Serializer<?> & Serializable> void addDefaultKryoSerializer( - Class<?> type, T serializer) { - config.getSerializerConfig().addDefaultKryoSerializer(type, serializer); - } - - /** - * Adds a new Kryo default serializer to the Runtime. - * - * @param type The class of the types serialized with the given serializer. - * @param serializerClass The class of the serializer to use. - * @deprecated Register data types and serializers through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. You should configure this by config - * option {@link PipelineOptions#SERIALIZATION_CONFIG}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - public void addDefaultKryoSerializer( - Class<?> type, Class<? extends Serializer<?>> serializerClass) { - config.getSerializerConfig().addDefaultKryoSerializer(type, serializerClass); - } - - /** - * Registers the given type with a Kryo Serializer. - * - * <p>Note that the serializer instance must be serializable (as defined by - * java.io.Serializable), because it may be distributed to the worker nodes by java - * serialization. - * - * @param type The class of the types serialized with the given serializer. - * @param serializer The serializer to use. - * @deprecated Register data types and serializers through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. Instance-type serializer definition - * where serializers are serialized and written into the snapshot and deserialized for use - * is deprecated as well. Use class-type serializer definition by {@link - * PipelineOptions#SERIALIZATION_CONFIG} instead, where only the class name is written into - * the snapshot and new instance of the serializer is created for use. This is a breaking - * change, and it will be removed in Flink 2.0. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - public <T extends Serializer<?> & Serializable> void registerTypeWithKryoSerializer( - Class<?> type, T serializer) { - config.getSerializerConfig().registerTypeWithKryoSerializer(type, serializer); - } - - /** - * Registers the given Serializer via its class as a serializer for the given type at the - * KryoSerializer. - * - * @param type The class of the types serialized with the given serializer. - * @param serializerClass The class of the serializer to use. - * @deprecated Register data types and serializers through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. You should configure this by config - * option {@link PipelineOptions#SERIALIZATION_CONFIG}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - @SuppressWarnings("rawtypes") - public void registerTypeWithKryoSerializer( - Class<?> type, Class<? extends Serializer> serializerClass) { - config.getSerializerConfig().registerTypeWithKryoSerializer(type, serializerClass); - } - - /** - * Registers the given type with the serialization stack. If the type is eventually serialized - * as a POJO, then the type is registered with the POJO serializer. If the type ends up being - * serialized with Kryo, then it will be registered at Kryo to make sure that only tags are - * written. - * - * @param type The class of the type to register. - * @deprecated Register data types and serializers through hard codes is deprecated, because you - * need to modify the codes when upgrading job version. You should configure this by config - * option {@link PipelineOptions#SERIALIZATION_CONFIG}. - * @see <a - * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink"> - * FLIP-398: Improve Serialization Configuration And Usage In Flink</a> - */ - @Deprecated - public void registerType(Class<?> type) { - if (type == null) { - throw new NullPointerException("Cannot register null type class."); - } - - TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(type); - - if (typeInfo instanceof PojoTypeInfo) { - config.getSerializerConfig().registerPojoType(type); - } else { - config.getSerializerConfig().registerKryoType(type); - } - } - // -------------------------------------------------------------------------------------------- // Time characteristic // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 3c7a2f4f3f0..20518e9a63c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -124,7 +124,7 @@ class OperatorStateBackendTest { .isFalse(); final ExecutionConfig cfg = new ExecutionConfig(); - cfg.getSerializerConfig() + ((SerializerConfigImpl) cfg.getSerializerConfig()) .registerTypeWithKryoSerializer( registeredType, com.esotericsoftware.kryo.serializers.JavaSerializer.class); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 7b8febbb7f8..1bfc9d4237e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.ListState; @@ -559,8 +560,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { try { // cast because our test serializer is not typed to TestPojo - env.getExecutionConfig() - .getSerializerConfig() + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) .addDefaultKryoSerializer( TestPojo.class, (Class) ExceptionThrowingTestSerializer.class); @@ -633,8 +633,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { try { // cast because our test serializer is not typed to TestPojo - env.getExecutionConfig() - .getSerializerConfig() + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) .addDefaultKryoSerializer( TestPojo.class, (Class) ExceptionThrowingTestSerializer.class); @@ -704,8 +703,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { void testBackendUsesRegisteredKryoSerializer() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl(); - env.getExecutionConfig() - .getSerializerConfig() + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) .registerTypeWithKryoSerializer( TestPojo.class, ExceptionThrowingTestSerializer.class); @@ -777,8 +775,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { CheckpointStreamFactory streamFactory = createStreamFactory(); SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl(); - env.getExecutionConfig() - .getSerializerConfig() + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) .registerTypeWithKryoSerializer( TestPojo.class, ExceptionThrowingTestSerializer.class); @@ -899,7 +896,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { // ====================================== restore snapshot // ====================================== - env.getExecutionConfig().getSerializerConfig().registerKryoType(TestPojo.class); + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) + .registerKryoType(TestPojo.class); backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); @@ -976,8 +974,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { // ========== // cast because our test serializer is not typed to TestPojo - env.getExecutionConfig() - .getSerializerConfig() + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) .addDefaultKryoSerializer( TestPojo.class, (Class) CustomKryoTestSerializer.class); @@ -1012,8 +1009,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { // ========= // cast because our test serializer is not typed to TestPojo - env.getExecutionConfig() - .getSerializerConfig() + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) .addDefaultKryoSerializer( TestPojo.class, (Class) CustomKryoTestSerializer.class); @@ -1084,8 +1080,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { // ========== restore snapshot - should use specific serializer (ONLY SERIALIZATION) // ========== - env.getExecutionConfig() - .getSerializerConfig() + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) .registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class); backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); @@ -1118,8 +1113,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { // ========= restore snapshot - should use specific serializer (FAIL ON DESERIALIZATION) // ========= - env.getExecutionConfig() - .getSerializerConfig() + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) .registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class); assertRestoreKeyedBackendFail(snapshot2, kvId); @@ -1138,8 +1132,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl(); // register A first then B - env.getExecutionConfig().getSerializerConfig().registerKryoType(TestNestedPojoClassA.class); - env.getExecutionConfig().getSerializerConfig().registerKryoType(TestNestedPojoClassB.class); + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) + .registerKryoType(TestNestedPojoClassA.class); + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) + .registerKryoType(TestNestedPojoClassB.class); CheckpointableKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); @@ -1208,11 +1204,9 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { env.close(); env = buildMockEnv(); - env.getExecutionConfig() - .getSerializerConfig() + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) .registerKryoType(TestNestedPojoClassB.class); // this time register B first - env.getExecutionConfig() - .getSerializerConfig() + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) .registerKryoType(TestNestedPojoClassA.class); backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); @@ -1267,8 +1261,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl(); // register A first then B - env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class); - env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class); + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) + .registerPojoType(TestNestedPojoClassA.class); + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) + .registerPojoType(TestNestedPojoClassB.class); CheckpointableKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); @@ -1322,9 +1318,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { env.close(); env = buildMockEnv(); - env.getExecutionConfig() + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) .registerPojoType(TestNestedPojoClassB.class); // this time register B first - env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class); + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) + .registerPojoType(TestNestedPojoClassA.class); backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java index 5312db4ba9b..ac3946b327f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java @@ -36,7 +36,6 @@ import java.io.FileWriter; import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static org.apache.flink.configuration.ConfigOptions.key; import static org.assertj.core.api.Assertions.assertThat; @@ -261,12 +260,6 @@ public class FlinkConfigLoaderTest { + ": " + "name:file1,path:'file:///tmp/file1';name:file2,path:'hdfs:///tmp/file2'" + "\n"); - fw.write( - "pipeline.default-kryo-serializers" - + ": " - + "class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1;" - + " class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2" - + "\n"); } Map<String, String> configuration = ConfigurationFileMigrationUtils.loadLegacyYAMLResource(file); @@ -288,19 +281,6 @@ public class FlinkConfigLoaderTest { assertThat(configuration.getOrDefault(TEST_CONFIG_KEY, null)) .isEqualTo(standardYamlConfig.getString(TEST_CONFIG_KEY, null)); - List<String> serializers = - ConfigurationUtils.convertToList( - configuration.get(PipelineOptions.KRYO_DEFAULT_SERIALIZERS.key()), - String.class); - assertThat( - serializers.stream() - .map(ConfigurationUtils::parseStringToMap) - .collect(Collectors.toList())) - .isEqualTo( - standardYamlConfig.get(PipelineOptions.KRYO_DEFAULT_SERIALIZERS).stream() - .map(ConfigurationUtils::parseStringToMap) - .collect(Collectors.toList())); - List<String> cachedFiles = ConfigurationUtils.convertToList( configuration.get(PipelineOptions.CACHED_FILES.key()), String.class); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java index c6bb0b6e5db..8207143a8a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java @@ -26,6 +26,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -54,13 +56,16 @@ import static org.assertj.core.api.Assertions.assertThat; * * <p>The tests use an arbitrary generic type to validate the behavior. */ -@SuppressWarnings("serial") class StateDescriptorPassingTest { @Test void testReduceWindowState() { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class); + Configuration configuration = new Configuration(); + String serializerConfigStr = + "{java.io.File: {type: kryo, kryo-type: registered, class: com.esotericsoftware.kryo.serializers.JavaSerializer}}"; + configuration.setString(PipelineOptions.SERIALIZATION_CONFIG.key(), serializerConfigStr); + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); DataStream<File> src = env.fromData(new File("/")) @@ -92,8 +97,12 @@ class StateDescriptorPassingTest { @Test void testApplyWindowState() { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class); + Configuration configuration = new Configuration(); + String serializerConfigStr = + "{java.io.File: {type: kryo, kryo-type: registered, class: com.esotericsoftware.kryo.serializers.JavaSerializer}}"; + configuration.setString(PipelineOptions.SERIALIZATION_CONFIG.key(), serializerConfigStr); + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); DataStream<File> src = env.fromData(new File("/")) @@ -126,8 +135,12 @@ class StateDescriptorPassingTest { @Test void testProcessWindowState() { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class); + Configuration configuration = new Configuration(); + String serializerConfigStr = + "{java.io.File: {type: kryo, kryo-type: registered, class: com.esotericsoftware.kryo.serializers.JavaSerializer}}"; + configuration.setString(PipelineOptions.SERIALIZATION_CONFIG.key(), serializerConfigStr); + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); DataStream<File> src = env.fromData(new File("/")) @@ -160,8 +173,12 @@ class StateDescriptorPassingTest { @Test void testProcessAllWindowState() { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class); + Configuration configuration = new Configuration(); + String serializerConfigStr = + "{java.io.File: {type: kryo, kryo-type: registered, class: com.esotericsoftware.kryo.serializers.JavaSerializer}}"; + configuration.setString(PipelineOptions.SERIALIZATION_CONFIG.key(), serializerConfigStr); + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); // simulate ingestion time DataStream<File> src = @@ -187,8 +204,12 @@ class StateDescriptorPassingTest { @Test void testReduceWindowAllState() { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class); + Configuration configuration = new Configuration(); + String serializerConfigStr = + "{java.io.File: {type: kryo, kryo-type: registered, class: com.esotericsoftware.kryo.serializers.JavaSerializer}}"; + configuration.setString(PipelineOptions.SERIALIZATION_CONFIG.key(), serializerConfigStr); + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); // simulate ingestion time DataStream<File> src = @@ -214,8 +235,12 @@ class StateDescriptorPassingTest { @Test void testApplyWindowAllState() { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class); + Configuration configuration = new Configuration(); + String serializerConfigStr = + "{java.io.File: {type: kryo, kryo-type: registered, class: com.esotericsoftware.kryo.serializers.JavaSerializer}}"; + configuration.setString(PipelineOptions.SERIALIZATION_CONFIG.key(), serializerConfigStr); + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); // simulate ingestion time DataStream<File> src = diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index d0eb27d0659..db05b4e4b6c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.SerializerFactory; -import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -92,7 +92,7 @@ class StreamingRuntimeContextTest { void testValueStateInstantiation() throws Exception { final ExecutionConfig config = new ExecutionConfig(); - config.getSerializerConfig().registerKryoType(Path.class); + ((SerializerConfigImpl) config.getSerializerConfig()).registerKryoType(Path.class); final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); @@ -113,7 +113,7 @@ class StreamingRuntimeContextTest { void testReducingStateInstantiation() throws Exception { final ExecutionConfig config = new ExecutionConfig(); - config.getSerializerConfig().registerKryoType(Path.class); + ((SerializerConfigImpl) config.getSerializerConfig()).registerKryoType(Path.class); final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); @@ -139,7 +139,7 @@ class StreamingRuntimeContextTest { @Test void testAggregatingStateInstantiation() throws Exception { final ExecutionConfig config = new ExecutionConfig(); - config.getSerializerConfig().registerKryoType(Path.class); + ((SerializerConfigImpl) config.getSerializerConfig()).registerKryoType(Path.class); final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); @@ -168,7 +168,7 @@ class StreamingRuntimeContextTest { void testListStateInstantiation() throws Exception { final ExecutionConfig config = new ExecutionConfig(); - config.getSerializerConfig().registerKryoType(Path.class); + ((SerializerConfigImpl) config.getSerializerConfig()).registerKryoType(Path.class); final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); @@ -209,7 +209,7 @@ class StreamingRuntimeContextTest { void testMapStateInstantiation() throws Exception { final ExecutionConfig config = new ExecutionConfig(); - config.getSerializerConfig().registerKryoType(Path.class); + ((SerializerConfigImpl) config.getSerializerConfig()).registerKryoType(Path.class); final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); @@ -252,7 +252,7 @@ class StreamingRuntimeContextTest { void testV2ValueStateInstantiation() throws Exception { final ExecutionConfig config = new ExecutionConfig(); - SerializerConfig serializerConfig = config.getSerializerConfig(); + SerializerConfigImpl serializerConfig = (SerializerConfigImpl) config.getSerializerConfig(); serializerConfig.registerKryoType(Path.class); final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); @@ -276,7 +276,7 @@ class StreamingRuntimeContextTest { @Test void testV2ListStateInstantiation() throws Exception { final ExecutionConfig config = new ExecutionConfig(); - SerializerConfig serializerConfig = config.getSerializerConfig(); + SerializerConfigImpl serializerConfig = (SerializerConfigImpl) config.getSerializerConfig(); serializerConfig.registerKryoType(Path.class); final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); @@ -300,7 +300,7 @@ class StreamingRuntimeContextTest { @Test void testV2MapStateInstantiation() throws Exception { final ExecutionConfig config = new ExecutionConfig(); - SerializerConfig serializerConfig = config.getSerializerConfig(); + SerializerConfigImpl serializerConfig = (SerializerConfigImpl) config.getSerializerConfig(); serializerConfig.registerKryoType(Path.class); final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); @@ -328,7 +328,7 @@ class StreamingRuntimeContextTest { @Test void testV2ReducingStateInstantiation() throws Exception { final ExecutionConfig config = new ExecutionConfig(); - SerializerConfig serializerConfig = config.getSerializerConfig(); + SerializerConfigImpl serializerConfig = (SerializerConfigImpl) config.getSerializerConfig(); serializerConfig.registerKryoType(Path.class); final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); @@ -358,7 +358,7 @@ class StreamingRuntimeContextTest { @Test void testV2AggregatingStateInstantiation() throws Exception { final ExecutionConfig config = new ExecutionConfig(); - SerializerConfig serializerConfig = config.getSerializerConfig(); + SerializerConfigImpl serializerConfig = (SerializerConfigImpl) config.getSerializerConfig(); serializerConfig.registerKryoType(Path.class); final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java index ca248ba9345..15b53ef3d9c 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.state.changelog; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; @@ -275,8 +276,7 @@ public class ChangelogStateBackendTestUtils { // ============================ restore snapshot =============================== - env.getExecutionConfig() - .getSerializerConfig() + ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig()) .registerKryoType(StateBackendTestBase.TestPojo.class); keyedBackend = diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java index a441fc754d0..70cc78f2baf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java @@ -131,9 +131,10 @@ class StreamExecutionEnvironmentComplexConfigurationTest { void testLoadingKryoSerializersFromConfiguration() { Configuration configuration = new Configuration(); configuration.setString( - "pipeline.default-kryo-serializers", - "class:'org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojo'" - + ",serializer:'org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojoSerializer'"); + "pipeline.serialization-config", + "{org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojo:" + + " {type: kryo, kryo-type: default, class:" + + " org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojoSerializer}}"); // mutate config according to configuration StreamExecutionEnvironment envFromConfiguration = diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java index 76041cb216f..422323fcf8a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java @@ -34,9 +34,6 @@ import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.delegation.Planner; -import com.esotericsoftware.kryo.Serializer; - -import java.io.Serializable; import java.util.List; /** @@ -177,40 +174,6 @@ public class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment return realExecEnv.getCheckpointingConsistencyMode(); } - @Override - public <T extends Serializer<?> & Serializable> void addDefaultKryoSerializer( - Class<?> type, T serializer) { - throw new UnsupportedOperationException( - "This is a dummy StreamExecutionEnvironment, addDefaultKryoSerializer method is unsupported."); - } - - @Override - public void addDefaultKryoSerializer( - Class<?> type, Class<? extends Serializer<?>> serializerClass) { - throw new UnsupportedOperationException( - "This is a dummy StreamExecutionEnvironment, addDefaultKryoSerializer method is unsupported."); - } - - @Override - public <T extends Serializer<?> & Serializable> void registerTypeWithKryoSerializer( - Class<?> type, T serializer) { - throw new UnsupportedOperationException( - "This is a dummy StreamExecutionEnvironment, registerTypeWithKryoSerializer method is unsupported."); - } - - @Override - public void registerTypeWithKryoSerializer( - Class<?> type, Class<? extends Serializer> serializerClass) { - throw new UnsupportedOperationException( - "This is a dummy StreamExecutionEnvironment, registerTypeWithKryoSerializer method is unsupported."); - } - - @Override - public void registerType(Class<?> type) { - throw new UnsupportedOperationException( - "This is a dummy StreamExecutionEnvironment, registerType method is unsupported."); - } - @Override public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) { throw new UnsupportedOperationException( diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/GroupReduceITCase.java index cc80c8c4a4e..042fa97e610 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/GroupReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/GroupReduceITCase.java @@ -1088,8 +1088,9 @@ public class GroupReduceITCase extends MultipleProgramsTestBaseJUnit4 { ExecutionConfig ec = env.getConfig(); // check if automatic type registration with Kryo worked - Assert.assertTrue(ec.getRegisteredKryoTypes().contains(BigInt.class)); - Assert.assertFalse(ec.getRegisteredKryoTypes().contains(java.sql.Date.class)); + Assert.assertTrue(ec.getSerializerConfig().getRegisteredKryoTypes().contains(BigInt.class)); + Assert.assertFalse( + ec.getSerializerConfig().getRegisteredKryoTypes().contains(java.sql.Date.class)); String expected = null; diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/RegisterTypeWithKryoSerializerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/RegisterTypeWithKryoSerializerITCase.java deleted file mode 100644 index 7102842b3fe..00000000000 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/RegisterTypeWithKryoSerializerITCase.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.runtime; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.test.util.MultipleProgramsTestBaseJUnit4; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; - -import static org.apache.flink.test.util.TestBaseUtils.compareResultCollections; - -/** Test registering types with Kryo. */ -@RunWith(Parameterized.class) -public class RegisterTypeWithKryoSerializerITCase extends MultipleProgramsTestBaseJUnit4 { - - public RegisterTypeWithKryoSerializerITCase(TestExecutionMode mode) { - super(mode); - } - - /** - * Tests whether the kryo serializer is forwarded via the ExecutionConfig. - * - * @throws Exception - */ - @Test - public void testRegisterTypeWithKryoSerializer() throws Exception { - int numElements = 10; - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - env.registerTypeWithKryoSerializer(TestClass.class, new TestClassSerializer()); - - DataSet<Long> input = env.generateSequence(0, numElements - 1); - - DataSet<TestClass> mapped = - input.map( - new MapFunction<Long, TestClass>() { - private static final long serialVersionUID = -529116076312998262L; - - @Override - public TestClass map(Long value) throws Exception { - return new TestClass(value); - } - }); - - List<TestClass> expected = new ArrayList<>(numElements); - - for (int i = 0; i < numElements; i++) { - expected.add(new TestClass(42)); - } - - compareResultCollections( - expected, - mapped.collect(), - new Comparator<TestClass>() { - @Override - public int compare(TestClass o1, TestClass o2) { - return (int) (o1.getValue() - o2.getValue()); - } - }); - } - - static class TestClass { - private final long value; - private Object obj = new Object(); - - public TestClass(long value) { - this.value = value; - } - - public long getValue() { - return value; - } - - @Override - public String toString() { - return "TestClass(" + value + ")"; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof TestClass) { - TestClass other = (TestClass) obj; - - return value == other.value; - } else { - return false; - } - } - - @Override - public int hashCode() { - return (int) value; - } - } - - static class TestClassSerializer extends Serializer<TestClass> implements Serializable { - - private static final long serialVersionUID = -3585880741695717533L; - - @Override - public void write(Kryo kryo, Output output, TestClass testClass) { - output.writeLong(42); - } - - @Override - public TestClass read(Kryo kryo, Input input, Class<TestClass> aClass) { - return new TestClass(input.readLong()); - } - } -}
