This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6633719c728526a08344645e0a844f27d25629f4
Author: Weijie Guo <[email protected]>
AuthorDate: Tue Sep 24 09:54:32 2024 +0800

    [FLINK-36366][core] Remove deprecated serializer related config option and 
method
    
    This closes #25396
---
 docs/content.zh/docs/dev/table/data_stream_api.md  |   2 -
 docs/content/docs/dev/table/data_stream_api.md     |   3 -
 .../apache/flink/api/common/ExecutionConfig.java   | 354 ---------------------
 .../flink/api/common/SerializableSerializer.java   |  47 +++
 .../api/common/serialization/SerializerConfig.java |  21 --
 .../common/serialization/SerializerConfigImpl.java |  35 +-
 .../java/typeutils/runtime/KryoRegistration.java   |  11 +-
 .../typeutils/runtime/kryo/KryoSerializer.java     |  34 +-
 .../runtime/kryo/KryoSerializerSnapshot.java       |   2 +-
 .../runtime/kryo/KryoSerializerSnapshotData.java   |   2 +-
 .../java/typeutils/runtime/kryo/Serializers.java   |   3 +-
 .../flink/configuration/PipelineOptions.java       |  69 ----
 .../flink/api/common/ExecutionConfigTest.java      |  88 ++---
 .../serialization/SerializerConfigImplTest.java    |  98 +-----
 .../api/common/state/StateDescriptorTest.java      |   2 +-
 .../java/typeutils/runtime/PojoSerializerTest.java |   5 +-
 .../PojoSerializerUpgradeTestSpecifications.java   |  18 +-
 .../tests/scala/JavaJobWithKryoSerializer.java     |  12 +-
 .../avro/utils/AvroKryoSerializerUtils.java        |  15 +-
 .../avro/typeutils/AvroTypeExtractionTest.java     |   2 +-
 .../flink/api/java/ExecutionEnvironment.java       |  82 -----
 .../apache/flink/api/java/utils/PlanGenerator.java |  12 +-
 flink-python/pyflink/common/execution_config.py    |  14 -
 .../pyflink/common/tests/test_execution_config.py  |  30 --
 .../itcases/AbstractQueryableStateTestBase.java    |  10 +-
 .../environment/StreamExecutionEnvironment.java    | 127 --------
 .../runtime/state/OperatorStateBackendTest.java    |   2 +-
 .../flink/runtime/state/StateBackendTestBase.java  |  51 ++-
 .../runtime/util/bash/FlinkConfigLoaderTest.java   |  20 --
 .../api/operators/StateDescriptorPassingTest.java  |  51 ++-
 .../api/operators/StreamingRuntimeContextTest.java |  22 +-
 .../changelog/ChangelogStateBackendTestUtils.java  |   4 +-
 ...ecutionEnvironmentComplexConfigurationTest.java |   7 +-
 .../utils/DummyStreamExecutionEnvironment.java     |  37 ---
 .../flink/test/operators/GroupReduceITCase.java    |   5 +-
 .../RegisterTypeWithKryoSerializerITCase.java      | 139 --------
 36 files changed, 233 insertions(+), 1203 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/data_stream_api.md 
b/docs/content.zh/docs/dev/table/data_stream_api.md
index c3492568a1f..d96c075f5e5 100644
--- a/docs/content.zh/docs/dev/table/data_stream_api.md
+++ b/docs/content.zh/docs/dev/table/data_stream_api.md
@@ -594,8 +594,6 @@ env = StreamExecutionEnvironment.get_execution_environment()
 # set various configuration early
 env.set_max_parallelism(256)
 
-env.get_config().add_default_kryo_serializer("type_class_name", 
"serializer_class_name")
-
 
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
 
 # then switch to Python Table API
diff --git a/docs/content/docs/dev/table/data_stream_api.md 
b/docs/content/docs/dev/table/data_stream_api.md
index a7217053b08..9f77e715fb1 100644
--- a/docs/content/docs/dev/table/data_stream_api.md
+++ b/docs/content/docs/dev/table/data_stream_api.md
@@ -592,9 +592,6 @@ env = StreamExecutionEnvironment.get_execution_environment()
 # set various configuration early
 env.set_max_parallelism(256)
 
-env.get_config().add_default_kryo_serializer("type_class_name", 
"serializer_class_name")
-env.get_config().add_default_kryo_serializer("type_class_name", 
"serializer_class_name")
-
 
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
 
 # then switch to Python Table API
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 1b0d66dd1ef..b2adee5ec22 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -40,14 +40,10 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.description.InlineElement;
 import org.apache.flink.util.Preconditions;
 
-import com.esotericsoftware.kryo.Serializer;
-
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -452,104 +448,6 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
         return configuration.get(EXECUTION_MODE);
     }
 
-    /**
-     * Force TypeExtractor to use Kryo serializer for POJOS even though we 
could analyze as POJO. In
-     * some cases this might be preferable. For example, when using interfaces 
with subclasses that
-     * cannot be analyzed as POJO.
-     *
-     * @deprecated Configure serialization behavior through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. You should 
configure this by config
-     *     option {@link PipelineOptions#FORCE_KRYO}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    public void enableForceKryo() {
-        serializerConfig.setForceKryo(true);
-    }
-
-    /**
-     * Disable use of Kryo serializer for all POJOs.
-     *
-     * @deprecated Configure serialization behavior through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. You should 
configure this by config
-     *     option {@link PipelineOptions#FORCE_KRYO}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    public void disableForceKryo() {
-        serializerConfig.setForceKryo(false);
-    }
-
-    /** @deprecated Use {@link SerializerConfig#isForceKryoEnabled}. */
-    @Deprecated
-    public boolean isForceKryoEnabled() {
-        return serializerConfig.isForceKryoEnabled();
-    }
-
-    /**
-     * Enables the use generic types which are serialized via Kryo.
-     *
-     * <p>Generic types are enabled by default.
-     *
-     * @deprecated Configure serialization behavior through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. You should 
configure this by config
-     *     option {@link PipelineOptions#GENERIC_TYPES}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     * @see #disableGenericTypes()
-     */
-    @Deprecated
-    public void enableGenericTypes() {
-        serializerConfig.setGenericTypes(true);
-    }
-
-    /**
-     * Disables the use of generic types (types that would be serialized via 
Kryo). If this option
-     * is used, Flink will throw an {@code UnsupportedOperationException} 
whenever it encounters a
-     * data type that would go through Kryo for serialization.
-     *
-     * <p>Disabling generic types can be helpful to eagerly find and eliminate 
the use of types that
-     * would go through Kryo serialization during runtime. Rather than 
checking types individually,
-     * using this option will throw exceptions eagerly in the places where 
generic types are used.
-     *
-     * <p><b>Important:</b> We recommend to use this option only during 
development and
-     * pre-production phases, not during actual production use. The 
application program and/or the
-     * input data may be such that new, previously unseen, types occur at some 
point. In that case,
-     * setting this option would cause the program to fail.
-     *
-     * @deprecated Configure serialization behavior through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. You should 
configure this by config
-     *     option {@link PipelineOptions#GENERIC_TYPES}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     * @see #enableGenericTypes()
-     */
-    @Deprecated
-    public void disableGenericTypes() {
-        serializerConfig.setGenericTypes(false);
-    }
-
-    /**
-     * Checks whether generic types are supported. Generic types are types 
that go through Kryo
-     * during serialization.
-     *
-     * <p>Generic types are enabled by default.
-     *
-     * @deprecated Use {@link SerializerConfig#hasGenericTypesDisabled}.
-     * @see #enableGenericTypes()
-     * @see #disableGenericTypes()
-     */
-    @Deprecated
-    public boolean hasGenericTypesDisabled() {
-        return serializerConfig.hasGenericTypesDisabled();
-    }
-
     /**
      * Enables the Flink runtime to auto-generate UID's for operators.
      *
@@ -588,48 +486,6 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
         return configuration.get(PipelineOptions.AUTO_GENERATE_UIDS);
     }
 
-    /**
-     * Forces Flink to use the Apache Avro serializer for POJOs.
-     *
-     * <p><b>Important:</b> Make sure to include the <i>flink-avro</i> module.
-     *
-     * @deprecated Configure serialization behavior through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. You should 
configure this by config
-     *     option {@link PipelineOptions#FORCE_AVRO}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    public void enableForceAvro() {
-        serializerConfig.setForceAvro(true);
-    }
-
-    /**
-     * Disables the Apache Avro serializer as the forced serializer for POJOs.
-     *
-     * @deprecated Configure serialization behavior through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. You should 
configure this by config
-     *     option {@link PipelineOptions#FORCE_AVRO}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    public void disableForceAvro() {
-        serializerConfig.setForceAvro(false);
-    }
-
-    /**
-     * Returns whether the Apache Avro is the default serializer for POJOs.
-     *
-     * @deprecated Use {@link SerializerConfig#isForceAvroEnabled}.
-     */
-    @Deprecated
-    public boolean isForceAvroEnabled() {
-        return serializerConfig.isForceAvroEnabled();
-    }
-
     /**
      * Enables reusing objects that Flink internally uses for deserialization 
and passing data to
      * user-code functions. Keep in mind that this can lead to bugs when the 
user-code function of
@@ -682,189 +538,6 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
     //  Registry for types and serializers
     // 
--------------------------------------------------------------------------------------------
 
-    /**
-     * Adds a new Kryo default serializer to the Runtime.
-     *
-     * <p>Note that the serializer instance must be serializable (as defined by
-     * java.io.Serializable), because it may be distributed to the worker 
nodes by java
-     * serialization.
-     *
-     * @param type The class of the types serialized with the given serializer.
-     * @param serializer The serializer to use.
-     * @deprecated Register data types and serializers through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. You should 
configure this by config
-     *     option {@link PipelineOptions#SERIALIZATION_CONFIG}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    public <T extends Serializer<?> & Serializable> void 
addDefaultKryoSerializer(
-            Class<?> type, T serializer) {
-        serializerConfig.addDefaultKryoSerializer(type, serializer);
-    }
-
-    /**
-     * Adds a new Kryo default serializer to the Runtime.
-     *
-     * @param type The class of the types serialized with the given serializer.
-     * @param serializerClass The class of the serializer to use.
-     * @deprecated Register data types and serializers through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. You should 
configure this by config
-     *     option {@link PipelineOptions#SERIALIZATION_CONFIG}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    public void addDefaultKryoSerializer(
-            Class<?> type, Class<? extends Serializer<?>> serializerClass) {
-        serializerConfig.addDefaultKryoSerializer(type, serializerClass);
-    }
-
-    /**
-     * Registers the given type with a Kryo Serializer.
-     *
-     * <p>Note that the serializer instance must be serializable (as defined by
-     * java.io.Serializable), because it may be distributed to the worker 
nodes by java
-     * serialization.
-     *
-     * @param type The class of the types serialized with the given serializer.
-     * @param serializer The serializer to use.
-     * @deprecated Register data types and serializers through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. You should 
configure this by config
-     *     option {@link PipelineOptions#SERIALIZATION_CONFIG}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    public <T extends Serializer<?> & Serializable> void 
registerTypeWithKryoSerializer(
-            Class<?> type, T serializer) {
-        serializerConfig.registerTypeWithKryoSerializer(type, serializer);
-    }
-
-    /**
-     * Registers the given Serializer via its class as a serializer for the 
given type at the
-     * KryoSerializer.
-     *
-     * @param type The class of the types serialized with the given serializer.
-     * @param serializerClass The class of the serializer to use.
-     * @deprecated Register data types and serializers through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. You should 
configure this by config
-     *     option {@link PipelineOptions#SERIALIZATION_CONFIG}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    @SuppressWarnings("rawtypes")
-    public void registerTypeWithKryoSerializer(
-            Class<?> type, Class<? extends Serializer> serializerClass) {
-        serializerConfig.registerTypeWithKryoSerializer(type, serializerClass);
-    }
-
-    /**
-     * Registers the given type with the serialization stack. If the type is 
eventually serialized
-     * as a POJO, then the type is registered with the POJO serializer. If the 
type ends up being
-     * serialized with Kryo, then it will be registered at Kryo to make sure 
that only tags are
-     * written.
-     *
-     * @param type The class of the type to register.
-     * @deprecated Register data types and serializers through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. You should 
configure this by config
-     *     option {@link PipelineOptions#SERIALIZATION_CONFIG}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    public void registerPojoType(Class<?> type) {
-        serializerConfig.registerPojoType(type);
-    }
-
-    /**
-     * Registers the given type with the serialization stack. If the type is 
eventually serialized
-     * as a POJO, then the type is registered with the POJO serializer. If the 
type ends up being
-     * serialized with Kryo, then it will be registered at Kryo to make sure 
that only tags are
-     * written.
-     *
-     * @param type The class of the type to register.
-     * @deprecated Register data types and serializers through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. You should 
configure this by config
-     *     option {@link PipelineOptions#SERIALIZATION_CONFIG}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    public void registerKryoType(Class<?> type) {
-        serializerConfig.registerKryoType(type);
-    }
-
-    /**
-     * Returns the registered types with Kryo Serializers.
-     *
-     * @deprecated Use {@link 
SerializerConfig#getRegisteredTypesWithKryoSerializers}.
-     */
-    @Deprecated
-    public LinkedHashMap<Class<?>, SerializableSerializer<?>>
-            getRegisteredTypesWithKryoSerializers() {
-        return serializerConfig.getRegisteredTypesWithKryoSerializers();
-    }
-
-    /**
-     * Returns the registered types with their Kryo Serializer classes.
-     *
-     * @deprecated Use {@link 
SerializerConfig#getRegisteredTypesWithKryoSerializerClasses}.
-     */
-    @Deprecated
-    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
-            getRegisteredTypesWithKryoSerializerClasses() {
-        return serializerConfig.getRegisteredTypesWithKryoSerializerClasses();
-    }
-
-    /**
-     * Returns the registered default Kryo Serializers.
-     *
-     * @deprecated Use {@link SerializerConfig#getDefaultKryoSerializers}.
-     */
-    @Deprecated
-    public LinkedHashMap<Class<?>, SerializableSerializer<?>> 
getDefaultKryoSerializers() {
-        return serializerConfig.getDefaultKryoSerializers();
-    }
-
-    /**
-     * Returns the registered default Kryo Serializer classes.
-     *
-     * @deprecated Use {@link 
SerializerConfig#getDefaultKryoSerializerClasses}.
-     */
-    @Deprecated
-    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
-            getDefaultKryoSerializerClasses() {
-        return serializerConfig.getDefaultKryoSerializerClasses();
-    }
-
-    /**
-     * Returns the registered Kryo types.
-     *
-     * @deprecated Use {@link SerializerConfig#getRegisteredKryoTypes}.
-     */
-    @Deprecated
-    public LinkedHashSet<Class<?>> getRegisteredKryoTypes() {
-        return serializerConfig.getRegisteredKryoTypes();
-    }
-
-    /**
-     * Returns the registered POJO types.
-     *
-     * @deprecated Use {@link SerializerConfig#getRegisteredPojoTypes}.
-     */
-    @Deprecated
-    public LinkedHashSet<Class<?>> getRegisteredPojoTypes() {
-        return serializerConfig.getRegisteredPojoTypes();
-    }
-
     /**
      * Get if the auto type registration is disabled.
      *
@@ -966,33 +639,6 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
 
     // ------------------------------ Utilities  
----------------------------------
 
-    /**
-     * @deprecated The class is deprecated because instance-type serializer 
definition where
-     *     serializers are serialized and written into the snapshot and 
deserialized for use is
-     *     deprecated. Use class-type serializer definition instead, where 
only the class name is
-     *     written into the snapshot and new instance of the serializer is 
created for use. This is
-     *     a breaking change, and it will be removed in Flink 2.0.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    @Public
-    public static class SerializableSerializer<T extends Serializer<?> & 
Serializable>
-            implements Serializable {
-        private static final long serialVersionUID = 4687893502781067189L;
-
-        private T serializer;
-
-        public SerializableSerializer(T serializer) {
-            this.serializer = serializer;
-        }
-
-        public T getSerializer() {
-            return serializer;
-        }
-    }
-
     /**
      * Abstract class for a custom user configuration object registered at the 
execution config.
      *
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/SerializableSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/SerializableSerializer.java
new file mode 100644
index 00000000000..cb2dbf056f4
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/SerializableSerializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+
+import com.esotericsoftware.kryo.Serializer;
+
+import java.io.Serializable;
+
+/**
+ * The wrapper to make serializer serializable.
+ *
+ * <p>This can be removed after {@link KryoSerializer} only allow serializer 
class.
+ */
+@Internal
+public class SerializableSerializer<T extends Serializer<?> & Serializable>
+        implements Serializable {
+    private static final long serialVersionUID = 4687893502781067189L;
+
+    private T serializer;
+
+    public SerializableSerializer(T serializer) {
+        this.serializer = serializer;
+    }
+
+    public T getSerializer() {
+        return serializer;
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
index 43c5e927e13..556390b564c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.common.serialization;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
@@ -128,30 +127,10 @@ public interface SerializerConfig extends Serializable {
     @Internal
     void registerKryoType(Class<?> type);
 
-    /**
-     * Returns the registered types with Kryo Serializers.
-     *
-     * @deprecated The method is deprecated because instance-type Kryo 
serializer definition based
-     *     on {@link ExecutionConfig.SerializableSerializer} is deprecated. 
Use class-type Kryo
-     *     serializers instead.
-     */
-    @Deprecated
-    LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
-            getRegisteredTypesWithKryoSerializers();
-
     /** Returns the registered types with their Kryo Serializer classes. */
     LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
             getRegisteredTypesWithKryoSerializerClasses();
 
-    /**
-     * Returns the registered default Kryo Serializers.
-     *
-     * @deprecated The method is deprecated because {@link 
ExecutionConfig.SerializableSerializer}
-     *     is deprecated.
-     */
-    @Deprecated
-    LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> 
getDefaultKryoSerializers();
-
     /** Returns the registered default Kryo Serializer classes. */
     LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> 
getDefaultKryoSerializerClasses();
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
index b4ad103c94f..d42695e0e6a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.common.serialization;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.SerializableSerializer;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -55,14 +55,14 @@ public final class SerializerConfigImpl implements 
SerializerConfig {
     // we store them in linked maps/sets to ensure they are registered in 
order in all kryo
     // instances.
 
-    private LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
-            registeredTypesWithKryoSerializers = new LinkedHashMap<>();
+    private LinkedHashMap<Class<?>, SerializableSerializer<?>> 
registeredTypesWithKryoSerializers =
+            new LinkedHashMap<>();
 
     private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
             registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
 
-    private LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
-            defaultKryoSerializers = new LinkedHashMap<>();
+    private LinkedHashMap<Class<?>, SerializableSerializer<?>> 
defaultKryoSerializers =
+            new LinkedHashMap<>();
 
     private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> 
defaultKryoSerializerClasses =
             new LinkedHashMap<>();
@@ -104,7 +104,7 @@ public final class SerializerConfigImpl implements 
SerializerConfig {
             throw new NullPointerException("Cannot register null class or 
serializer.");
         }
 
-        defaultKryoSerializers.put(type, new 
ExecutionConfig.SerializableSerializer<>(serializer));
+        defaultKryoSerializers.put(type, new 
SerializableSerializer<>(serializer));
     }
 
     /**
@@ -137,8 +137,7 @@ public final class SerializerConfigImpl implements 
SerializerConfig {
             throw new NullPointerException("Cannot register null class or 
serializer.");
         }
 
-        registeredTypesWithKryoSerializers.put(
-                type, new 
ExecutionConfig.SerializableSerializer<>(serializer));
+        registeredTypesWithKryoSerializers.put(type, new 
SerializableSerializer<>(serializer));
     }
 
     /**
@@ -192,7 +191,7 @@ public final class SerializerConfigImpl implements 
SerializerConfig {
     }
 
     /** Returns the registered types with Kryo Serializers. */
-    public LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
+    public LinkedHashMap<Class<?>, SerializableSerializer<?>>
             getRegisteredTypesWithKryoSerializers() {
         return registeredTypesWithKryoSerializers;
     }
@@ -204,8 +203,7 @@ public final class SerializerConfigImpl implements 
SerializerConfig {
     }
 
     /** Returns the registered default Kryo Serializers. */
-    public LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
-            getDefaultKryoSerializers() {
+    public LinkedHashMap<Class<?>, SerializableSerializer<?>> 
getDefaultKryoSerializers() {
         return defaultKryoSerializers;
     }
 
@@ -359,21 +357,6 @@ public final class SerializerConfigImpl implements 
SerializerConfig {
                 .getOptional(PipelineOptions.FORCE_KRYO_AVRO)
                 .ifPresent(this::setForceKryoAvro);
 
-        configuration
-                .getOptional(PipelineOptions.KRYO_DEFAULT_SERIALIZERS)
-                .map(s -> 
parseKryoSerializersWithExceptionHandling(classLoader, s))
-                .ifPresent(s -> this.defaultKryoSerializerClasses = s);
-
-        configuration
-                .getOptional(PipelineOptions.POJO_REGISTERED_CLASSES)
-                .map(c -> loadClasses(c, classLoader, "Could not load pojo 
type to be registered."))
-                .ifPresent(c -> this.registeredPojoTypes = c);
-
-        configuration
-                .getOptional(PipelineOptions.KRYO_REGISTERED_CLASSES)
-                .map(c -> loadClasses(c, classLoader, "Could not load kryo 
type to be registered."))
-                .ifPresent(c -> this.registeredKryoTypes = c);
-
         try {
             configuration
                     .getOptional(PipelineOptions.SERIALIZATION_CONFIG)
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java
index afd15c0ed89..d45146e91be 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.SerializableSerializer;
 import org.apache.flink.util.Preconditions;
 
 import com.esotericsoftware.kryo.Kryo;
@@ -60,8 +60,7 @@ public class KryoRegistration implements Serializable {
      * serializer definition type is {@link SerializerDefinitionType#INSTANCE}.
      */
     @Nullable
-    private final ExecutionConfig.SerializableSerializer<? extends 
Serializer<?>>
-            serializableSerializerInstance;
+    private final SerializableSerializer<? extends Serializer<?>> 
serializableSerializerInstance;
 
     private final SerializerDefinitionType serializerDefinitionType;
 
@@ -86,8 +85,7 @@ public class KryoRegistration implements Serializable {
 
     public KryoRegistration(
             Class<?> registeredClass,
-            ExecutionConfig.SerializableSerializer<? extends Serializer<?>>
-                    serializableSerializerInstance) {
+            SerializableSerializer<? extends Serializer<?>> 
serializableSerializerInstance) {
         this.registeredClass = Preconditions.checkNotNull(registeredClass);
 
         this.serializerClass = null;
@@ -111,8 +109,7 @@ public class KryoRegistration implements Serializable {
     }
 
     @Nullable
-    public ExecutionConfig.SerializableSerializer<? extends Serializer<?>>
-            getSerializableSerializerInstance() {
+    public SerializableSerializer<? extends Serializer<?>> 
getSerializableSerializerInstance() {
         return serializableSerializerInstance;
     }
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index d585e01aff3..57745de38f2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -19,9 +19,9 @@
 package org.apache.flink.api.java.typeutils.runtime.kryo;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.ExecutionConfig.SerializableSerializer;
+import org.apache.flink.api.common.SerializableSerializer;
 import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.java.typeutils.AvroUtils;
@@ -129,8 +129,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
     // ------------------------------------------------------------------------
 
-    private final LinkedHashMap<Class<?>, 
ExecutionConfig.SerializableSerializer<?>>
-            defaultSerializers;
+    private final LinkedHashMap<Class<?>, SerializableSerializer<?>> 
defaultSerializers;
     private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> 
defaultSerializerClasses;
 
     /**
@@ -158,8 +157,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
     // ------------------------------------------------------------------------
     // legacy fields; these fields cannot yet be removed to retain backwards 
compatibility
 
-    private LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
-            registeredTypesWithSerializers;
+    private LinkedHashMap<Class<?>, SerializableSerializer<?>> 
registeredTypesWithSerializers;
     private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
             registeredTypesWithSerializerClasses;
     private LinkedHashSet<Class<?>> registeredTypes;
@@ -172,7 +170,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
     public KryoSerializer(Class<T> type, SerializerConfig serializerConfig) {
         this.type = checkNotNull(type);
 
-        this.defaultSerializers = serializerConfig.getDefaultKryoSerializers();
+        this.defaultSerializers =
+                ((SerializerConfigImpl) 
serializerConfig).getDefaultKryoSerializers();
         this.defaultSerializerClasses = 
serializerConfig.getDefaultKryoSerializerClasses();
 
         this.kryoRegistrations =
@@ -180,7 +179,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                         this.type,
                         serializerConfig.getRegisteredKryoTypes(),
                         
serializerConfig.getRegisteredTypesWithKryoSerializerClasses(),
-                        
serializerConfig.getRegisteredTypesWithKryoSerializers(),
+                        ((SerializerConfigImpl) serializerConfig)
+                                .getRegisteredTypesWithKryoSerializers(),
                         serializerConfig.isForceKryoAvroEnabled());
     }
 
@@ -197,7 +197,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                 
CollectionUtil.newLinkedHashMapWithExpectedSize(toCopy.kryoRegistrations.size());
 
         // deep copy the serializer instances in defaultSerializers
-        for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> 
entry :
+        for (Map.Entry<Class<?>, SerializableSerializer<?>> entry :
                 toCopy.defaultSerializers.entrySet()) {
 
             this.defaultSerializers.put(entry.getKey(), 
deepCopySerializer(entry.getValue()));
@@ -211,7 +211,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
             if (kryoRegistration.getSerializerDefinitionType()
                     == KryoRegistration.SerializerDefinitionType.INSTANCE) {
 
-                ExecutionConfig.SerializableSerializer<? extends 
Serializer<?>> serializerInstance =
+                SerializableSerializer<? extends Serializer<?>> 
serializerInstance =
                         kryoRegistration.getSerializableSerializerInstance();
 
                 if (serializerInstance != null) {
@@ -552,7 +552,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
             // Add default serializers first, so that the type registrations 
without a serializer
             // are registered with a default serializer
-            for (Map.Entry<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> entry :
+            for (Map.Entry<Class<?>, SerializableSerializer<?>> entry :
                     defaultSerializers.entrySet()) {
                 kryo.addDefaultSerializer(entry.getKey(), 
entry.getValue().getSerializer());
             }
@@ -597,8 +597,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
             LinkedHashSet<Class<?>> registeredTypes,
             LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
                     registeredTypesWithSerializerClasses,
-            LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
-                    registeredTypesWithSerializers,
+            LinkedHashMap<Class<?>, SerializableSerializer<?>> 
registeredTypesWithSerializers,
             TernaryBoolean isForceAvroKryoEnabledOpt) {
 
         final LinkedHashMap<String, KryoRegistration> kryoRegistrations = new 
LinkedHashMap<>();
@@ -620,9 +619,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                             
registeredTypeWithSerializerClassEntry.getValue()));
         }
 
-        for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>>
-                registeredTypeWithSerializerEntry :
-                        
checkNotNull(registeredTypesWithSerializers).entrySet()) {
+        for (Map.Entry<Class<?>, SerializableSerializer<?>> 
registeredTypeWithSerializerEntry :
+                checkNotNull(registeredTypesWithSerializers).entrySet()) {
 
             kryoRegistrations.put(
                     registeredTypeWithSerializerEntry.getKey().getName(),
@@ -675,8 +673,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
         }
     }
 
-    private ExecutionConfig.SerializableSerializer<? extends Serializer<?>> 
deepCopySerializer(
-            ExecutionConfig.SerializableSerializer<? extends Serializer<?>> 
original) {
+    private SerializableSerializer<? extends Serializer<?>> deepCopySerializer(
+            SerializableSerializer<? extends Serializer<?>> original) {
         try {
             return InstantiationUtil.clone(
                     original, Thread.currentThread().getContextClassLoader());
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
index 67de997c033..33b29d8629b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.typeutils.runtime.kryo;
 
-import org.apache.flink.api.common.ExecutionConfig.SerializableSerializer;
+import org.apache.flink.api.common.SerializableSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java
index 45a93c1d470..b527d8abd27 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.typeutils.runtime.kryo;
 
-import org.apache.flink.api.common.ExecutionConfig.SerializableSerializer;
+import org.apache.flink.api.common.SerializableSerializer;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
index 7c07fc0e4ca..fd5bf7fa643 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime.kryo;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.typeutils.AvroUtils;
@@ -89,7 +90,7 @@ public class Serializers {
         if (type.isArray()) {
             recursivelyRegisterType(type.getComponentType(), config, 
alreadySeen);
         } else {
-            config.registerKryoType(type);
+            ((SerializerConfigImpl) config).registerKryoType(type);
             // add serializers for Avro type if necessary
             AvroUtils.getAvroUtils().addAvroSerializersIfRequired(config, 
type);
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
index 0c2a9f2aaa5..984505dbafb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
@@ -226,75 +226,6 @@ public class PipelineOptions {
                                     + " data to user-code functions will be 
reused. Keep in mind that this can lead to bugs when the"
                                     + " user-code function of an operation is 
not aware of this behaviour.");
 
-    /**
-     * @deprecated The config is subsumed by {@link #SERIALIZATION_CONFIG}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    public static final ConfigOption<List<String>> KRYO_DEFAULT_SERIALIZERS =
-            key("pipeline.default-kryo-serializers")
-                    .stringType()
-                    .asList()
-                    .noDefaultValue()
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "Semicolon separated list of pairs 
of class names and Kryo serializers class names to be used"
-                                                    + " as Kryo default 
serializers")
-                                    .linebreak()
-                                    .linebreak()
-                                    .text("Example:")
-                                    .linebreak()
-                                    .add(
-                                            TextElement.code(
-                                                    
"class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1;"
-                                                            + " 
class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2"))
-                                    .build());
-
-    /**
-     * @deprecated The config is subsumed by {@link #SERIALIZATION_CONFIG}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    public static final ConfigOption<List<String>> KRYO_REGISTERED_CLASSES =
-            key("pipeline.registered-kryo-types")
-                    .stringType()
-                    .asList()
-                    .noDefaultValue()
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "Semicolon separated list of types 
to be registered with the serialization stack. If the type"
-                                                    + " is eventually 
serialized as a POJO, then the type is registered with the POJO serializer. If 
the"
-                                                    + " type ends up being 
serialized with Kryo, then it will be registered at Kryo to make"
-                                                    + " sure that only tags 
are written.")
-                                    .build());
-
-    /**
-     * @deprecated The config is subsumed by {@link #SERIALIZATION_CONFIG}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    public static final ConfigOption<List<String>> POJO_REGISTERED_CLASSES =
-            key("pipeline.registered-pojo-types")
-                    .stringType()
-                    .asList()
-                    .noDefaultValue()
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "Semicolon separated list of types 
to be registered with the serialization stack. If the type"
-                                                    + " is eventually 
serialized as a POJO, then the type is registered with the POJO serializer. If 
the"
-                                                    + " type ends up being 
serialized with Kryo, then it will be registered at Kryo to make"
-                                                    + " sure that only tags 
are written.")
-                                    .build());
-
     public static final ConfigOption<List<String>> SERIALIZATION_CONFIG =
             key("pipeline.serialization-config")
                     .stringType()
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java 
b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index a361777a912..2fdc3ec08de 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -54,12 +54,12 @@ public class ExecutionConfigTest {
         List<Class<?>> expectedTypes = Arrays.asList(Double.class, 
Integer.class);
 
         for (Class<?> tpe : types) {
-            config.getSerializerConfig().registerKryoType(tpe);
+            ((SerializerConfigImpl) 
config.getSerializerConfig()).registerKryoType(tpe);
         }
 
         int counter = 0;
 
-        for (Class<?> tpe : config.getRegisteredKryoTypes()) {
+        for (Class<?> tpe : 
config.getSerializerConfig().getRegisteredKryoTypes()) {
             assertThat(tpe).isEqualTo(expectedTypes.get(counter++));
         }
 
@@ -176,74 +176,38 @@ public class ExecutionConfigTest {
 
     @Test
     void testLoadingRegisteredKryoTypesFromConfiguration() {
-        ExecutionConfig configFromSetters = new ExecutionConfig();
-        
configFromSetters.getSerializerConfig().registerKryoType(ExecutionConfigTest.class);
-        
configFromSetters.getSerializerConfig().registerKryoType(TestSerializer1.class);
-
         ExecutionConfig configFromConfiguration = new ExecutionConfig();
 
         Configuration configuration = new Configuration();
-        configuration.setString(
-                "pipeline.registered-kryo-types",
-                "org.apache.flink.api.common.ExecutionConfigTest;"
-                        + 
"org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1");
+        String serializationConfigStr =
+                "{org.apache.flink.api.common.ExecutionConfigTest: {type: 
kryo}, "
+                        + 
"org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1: {type: 
kryo}}";
+        configuration.setString("pipeline.serialization-config", 
serializationConfigStr);
 
         // mutate config according to configuration
         configFromConfiguration.configure(
                 configuration, Thread.currentThread().getContextClassLoader());
 
-        assertThat(configFromConfiguration.getRegisteredKryoTypes())
-                .isEqualTo(configFromSetters.getRegisteredKryoTypes());
+        
assertThat(configFromConfiguration.getSerializerConfig().getRegisteredKryoTypes())
+                .containsExactlyInAnyOrder(ExecutionConfigTest.class, 
TestSerializer1.class);
     }
 
     @Test
     void testLoadingRegisteredPojoTypesFromConfiguration() {
-        ExecutionConfig configFromSetters = new ExecutionConfig();
-        configFromSetters.registerPojoType(ExecutionConfigTest.class);
-        configFromSetters.registerPojoType(TestSerializer1.class);
-
         ExecutionConfig configFromConfiguration = new ExecutionConfig();
 
         Configuration configuration = new Configuration();
-        configuration.setString(
-                "pipeline.registered-pojo-types",
-                "org.apache.flink.api.common.ExecutionConfigTest;"
-                        + 
"org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1");
+        String serializationConfigStr =
+                "{org.apache.flink.api.common.ExecutionConfigTest: {type: 
pojo}, "
+                        + 
"org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1: {type: 
pojo}}";
+        configuration.setString("pipeline.serialization-config", 
serializationConfigStr);
 
         // mutate config according to configuration
         configFromConfiguration.configure(
                 configuration, Thread.currentThread().getContextClassLoader());
 
-        assertThat(configFromConfiguration.getRegisteredPojoTypes())
-                .isEqualTo(configFromSetters.getRegisteredPojoTypes());
-    }
-
-    @Test
-    void testLoadingDefaultKryoSerializersFromConfiguration() {
-        ExecutionConfig configFromSetters = new ExecutionConfig();
-        configFromSetters
-                .getSerializerConfig()
-                .addDefaultKryoSerializer(ExecutionConfigTest.class, 
TestSerializer1.class);
-        configFromSetters
-                .getSerializerConfig()
-                .addDefaultKryoSerializer(TestSerializer1.class, 
TestSerializer2.class);
-
-        ExecutionConfig configFromConfiguration = new ExecutionConfig();
-
-        Configuration configuration = new Configuration();
-        configuration.setString(
-                "pipeline.default-kryo-serializers",
-                "class:org.apache.flink.api.common.ExecutionConfigTest,"
-                        + 
"serializer:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1;"
-                        + 
"class:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1,"
-                        + 
"serializer:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer2");
-
-        // mutate config according to configuration
-        configFromConfiguration.configure(
-                configuration, Thread.currentThread().getContextClassLoader());
-
-        
assertThat(configFromConfiguration.getSerializerConfig().getDefaultKryoSerializers())
-                
.isEqualTo(configFromSetters.getSerializerConfig().getDefaultKryoSerializers());
+        
assertThat(configFromConfiguration.getSerializerConfig().getRegisteredPojoTypes())
+                .containsExactlyInAnyOrder(ExecutionConfigTest.class, 
TestSerializer1.class);
     }
 
     @Test
@@ -268,8 +232,10 @@ public class ExecutionConfigTest {
     @Test
     void testNotOverridingRegisteredKryoTypesWithDefaultsFromConfiguration() {
         ExecutionConfig config = new ExecutionConfig();
-        
config.getSerializerConfig().registerKryoType(ExecutionConfigTest.class);
-        config.getSerializerConfig().registerKryoType(TestSerializer1.class);
+        ((SerializerConfigImpl) config.getSerializerConfig())
+                .registerKryoType(ExecutionConfigTest.class);
+        ((SerializerConfigImpl) config.getSerializerConfig())
+                .registerKryoType(TestSerializer1.class);
 
         Configuration configuration = new Configuration();
 
@@ -279,16 +245,17 @@ public class ExecutionConfigTest {
         LinkedHashSet<Object> set = new LinkedHashSet<>();
         set.add(ExecutionConfigTest.class);
         set.add(TestSerializer1.class);
-        assertThat(config.getRegisteredKryoTypes()).isEqualTo(set);
+        
assertThat(config.getSerializerConfig().getRegisteredKryoTypes()).isEqualTo(set);
     }
 
     @Test
     void testNotOverridingRegisteredPojoTypesWithDefaultsFromConfiguration() {
         ExecutionConfig config = new ExecutionConfig();
-        config.registerPojoType(ExecutionConfigTest.class);
-        config.registerPojoType(TestSerializer1.class);
-
         Configuration configuration = new Configuration();
+        String serializationConfigStr =
+                "{org.apache.flink.api.common.ExecutionConfigTest: {type: 
pojo}, "
+                        + 
"org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1: {type: 
pojo}}";
+        configuration.setString("pipeline.serialization-config", 
serializationConfigStr);
 
         // mutate config according to configuration
         config.configure(configuration, 
Thread.currentThread().getContextClassLoader());
@@ -296,7 +263,7 @@ public class ExecutionConfigTest {
         LinkedHashSet<Object> set = new LinkedHashSet<>();
         set.add(ExecutionConfigTest.class);
         set.add(TestSerializer1.class);
-        assertThat(config.getRegisteredPojoTypes()).isEqualTo(set);
+        
assertThat(config.getSerializerConfig().getRegisteredPojoTypes()).isEqualTo(set);
     }
 
     @Test
@@ -317,10 +284,9 @@ public class ExecutionConfigTest {
     @Test
     void testNotOverridingDefaultKryoSerializersFromConfiguration() {
         ExecutionConfig config = new ExecutionConfig();
-        config.getSerializerConfig()
-                .addDefaultKryoSerializer(ExecutionConfigTest.class, 
TestSerializer1.class);
-        config.getSerializerConfig()
-                .addDefaultKryoSerializer(TestSerializer1.class, 
TestSerializer2.class);
+        SerializerConfigImpl serializerConfig = (SerializerConfigImpl) 
config.getSerializerConfig();
+        serializerConfig.addDefaultKryoSerializer(ExecutionConfigTest.class, 
TestSerializer1.class);
+        serializerConfig.addDefaultKryoSerializer(TestSerializer1.class, 
TestSerializer2.class);
 
         Configuration configuration = new Configuration();
 
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java
index 96799e42f51..842ddea9655 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.common.serialization;
 
 import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 
 import com.esotericsoftware.kryo.Kryo;
@@ -33,39 +32,16 @@ import java.io.Serializable;
 import java.lang.reflect.Type;
 import java.util.AbstractMap;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.flink.configuration.PipelineOptions.KRYO_DEFAULT_SERIALIZERS;
-import static 
org.apache.flink.configuration.PipelineOptions.KRYO_REGISTERED_CLASSES;
-import static 
org.apache.flink.configuration.PipelineOptions.POJO_REGISTERED_CLASSES;
 import static 
org.apache.flink.configuration.PipelineOptions.SERIALIZATION_CONFIG;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 class SerializerConfigImplTest {
-    private static final Map<ConfigOption<List<String>>, String> configs = new 
HashMap<>();
-
-    static {
-        configs.put(
-                KRYO_DEFAULT_SERIALIZERS,
-                
"class:org.apache.flink.api.common.serialization.SerializerConfigImplTest,"
-                        + 
"serializer:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1;"
-                        + 
"class:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1,"
-                        + 
"serializer:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer2");
-        configs.put(
-                KRYO_REGISTERED_CLASSES,
-                
"org.apache.flink.api.common.serialization.SerializerConfigImplTest;"
-                        + 
"org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1");
-        configs.put(
-                POJO_REGISTERED_CLASSES,
-                
"org.apache.flink.api.common.serialization.SerializerConfigImplTest;"
-                        + 
"org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1");
-    }
-
     @Test
     void testReadingDefaultConfig() {
         SerializerConfig config = new SerializerConfigImpl();
@@ -79,7 +55,7 @@ class SerializerConfigImplTest {
 
     @Test
     void testDoubleTypeRegistration() {
-        SerializerConfig config = new SerializerConfigImpl();
+        SerializerConfigImpl config = new SerializerConfigImpl();
         List<Class<?>> types = Arrays.asList(Double.class, Integer.class, 
Double.class);
         List<Class<?>> expectedTypes = Arrays.asList(Double.class, 
Integer.class);
 
@@ -96,72 +72,9 @@ class SerializerConfigImplTest {
         assertThat(expectedTypes).hasSize(counter);
     }
 
-    @Test
-    void testLoadingRegisteredKryoTypesFromConfiguration() {
-        SerializerConfig configFromSetters = new SerializerConfigImpl();
-        configFromSetters.registerKryoType(SerializerConfigImplTest.class);
-        
configFromSetters.registerKryoType(SerializerConfigImplTest.TestSerializer1.class);
-
-        SerializerConfig configFromConfiguration = new SerializerConfigImpl();
-
-        Configuration configuration = new Configuration();
-        configuration.setString(
-                KRYO_REGISTERED_CLASSES.key(), 
configs.get(KRYO_REGISTERED_CLASSES));
-
-        // mutate config according to configuration
-        configFromConfiguration.configure(
-                configuration, Thread.currentThread().getContextClassLoader());
-
-        assertThat(configFromConfiguration.getRegisteredKryoTypes())
-                .isEqualTo(configFromSetters.getRegisteredKryoTypes());
-    }
-
-    @Test
-    void testLoadingRegisteredPojoTypesFromConfiguration() {
-        SerializerConfig configFromSetters = new SerializerConfigImpl();
-        configFromSetters.registerPojoType(SerializerConfigImplTest.class);
-        
configFromSetters.registerPojoType(SerializerConfigImplTest.TestSerializer1.class);
-
-        SerializerConfig configFromConfiguration = new SerializerConfigImpl();
-
-        Configuration configuration = new Configuration();
-        configuration.setString(
-                POJO_REGISTERED_CLASSES.key(), 
configs.get(POJO_REGISTERED_CLASSES));
-
-        // mutate config according to configuration
-        configFromConfiguration.configure(
-                configuration, Thread.currentThread().getContextClassLoader());
-
-        assertThat(configFromConfiguration.getRegisteredPojoTypes())
-                .isEqualTo(configFromSetters.getRegisteredPojoTypes());
-    }
-
-    @Test
-    void testLoadingDefaultKryoSerializersFromConfiguration() {
-        SerializerConfig configFromSetters = new SerializerConfigImpl();
-        configFromSetters.addDefaultKryoSerializer(
-                SerializerConfigImplTest.class, 
SerializerConfigImplTest.TestSerializer1.class);
-        configFromSetters.addDefaultKryoSerializer(
-                SerializerConfigImplTest.TestSerializer1.class,
-                SerializerConfigImplTest.TestSerializer2.class);
-
-        SerializerConfig configFromConfiguration = new SerializerConfigImpl();
-
-        Configuration configuration = new Configuration();
-        configuration.setString(
-                KRYO_DEFAULT_SERIALIZERS.key(), 
configs.get(KRYO_DEFAULT_SERIALIZERS));
-
-        // mutate config according to configuration
-        configFromConfiguration.configure(
-                configuration, Thread.currentThread().getContextClassLoader());
-
-        assertThat(configFromConfiguration.getDefaultKryoSerializers())
-                .isEqualTo(configFromSetters.getDefaultKryoSerializers());
-    }
-
     @Test
     void testNotOverridingRegisteredKryoTypesWithDefaultsFromConfiguration() {
-        SerializerConfig config = new SerializerConfigImpl();
+        SerializerConfigImpl config = new SerializerConfigImpl();
         config.registerKryoType(SerializerConfigImplTest.class);
         
config.registerKryoType(SerializerConfigImplTest.TestSerializer1.class);
 
@@ -178,7 +91,7 @@ class SerializerConfigImplTest {
 
     @Test
     void testNotOverridingRegisteredPojoTypesWithDefaultsFromConfiguration() {
-        SerializerConfig config = new SerializerConfigImpl();
+        SerializerConfigImpl config = new SerializerConfigImpl();
         config.registerPojoType(SerializerConfigImplTest.class);
         
config.registerPojoType(SerializerConfigImplTest.TestSerializer1.class);
 
@@ -195,7 +108,7 @@ class SerializerConfigImplTest {
 
     @Test
     void testNotOverridingDefaultKryoSerializersFromConfiguration() {
-        SerializerConfig config = new SerializerConfigImpl();
+        SerializerConfigImpl config = new SerializerConfigImpl();
         config.addDefaultKryoSerializer(
                 SerializerConfigImplTest.class, 
SerializerConfigImplTest.TestSerializer1.class);
         config.addDefaultKryoSerializer(
@@ -311,9 +224,8 @@ class SerializerConfigImplTest {
 
     @Test
     void testCopySerializerConfig() {
-        SerializerConfig serializerConfig = new SerializerConfigImpl();
+        SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
         Configuration configuration = new Configuration();
-        configs.forEach((k, v) -> configuration.setString(k.key(), v));
 
         serializerConfig.configure(configuration, 
SerializerConfigImplTest.class.getClassLoader());
         serializerConfig
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
index 6752d0a0dde..c9ca4700408 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
@@ -127,7 +127,7 @@ class StateDescriptorTest {
                 .isEqualTo(-1);
 
         final ExecutionConfig config = new ExecutionConfig();
-        config.getSerializerConfig().registerKryoType(File.class);
+        ((SerializerConfigImpl) 
config.getSerializerConfig()).registerKryoType(File.class);
 
         final TestStateDescriptor<Path> original = new 
TestStateDescriptor<>("test", Path.class);
         TestStateDescriptor<Path> clone = 
CommonTestUtils.createCopySerializable(original);
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index f0197003c02..709be364fd1 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.java.typeutils.runtime;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
-import org.apache.flink.api.common.serialization.SerializerConfig;
 import org.apache.flink.api.common.serialization.SerializerConfigImpl;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
@@ -370,7 +369,7 @@ class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.TestUserC
      */
     @Test
     void testReconfigureDifferentSubclassRegistrationOrder() throws Exception {
-        SerializerConfig serializerConfig = new SerializerConfigImpl();
+        SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
         serializerConfig.registerPojoType(SubTestUserClassA.class);
         serializerConfig.registerPojoType(SubTestUserClassB.class);
 
@@ -531,7 +530,7 @@ class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.TestUserC
 
         // instantiate new PojoSerializer, with new execution config that has 
the subclass
         // registrations
-        SerializerConfig serializerConfig = new SerializerConfigImpl();
+        SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
         serializerConfig.registerPojoType(SubTestUserClassA.class);
         serializerConfig.registerPojoType(SubTestUserClassB.class);
         pojoSerializer = (PojoSerializer<TestUserClass>) 
type.createSerializer(serializerConfig);
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java
index 98861d32371..6f2966c1f7d 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTestSpecifications.java
@@ -609,7 +609,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 
         @Override
         public TypeSerializer<StaticSchemaPojo> createPriorSerializer() {
-            SerializerConfig serializerConfig = new SerializerConfigImpl();
+            SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
             serializerConfig.registerPojoType(SubclassPojoWithIntField.class);
 
             TypeSerializer<StaticSchemaPojo> serializer =
@@ -646,7 +646,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 
         @Override
         public TypeSerializer<StaticSchemaPojo> createUpgradedSerializer() {
-            SerializerConfig serializerConfig = new SerializerConfigImpl();
+            SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
             
serializerConfig.registerPojoType(SubclassPojoWithStringField.class);
 
             TypeSerializer<StaticSchemaPojo> serializer =
@@ -734,7 +734,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 
         @Override
         public TypeSerializer<StaticSchemaPojo> createPriorSerializer() {
-            SerializerConfig serializerConfig = new SerializerConfigImpl();
+            SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
             serializerConfig.registerPojoType(StaticSchemaPojoSubclassA.class);
             serializerConfig.registerPojoType(StaticSchemaPojoSubclassB.class);
 
@@ -756,7 +756,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 
         @Override
         public TypeSerializer<StaticSchemaPojo> createUpgradedSerializer() {
-            SerializerConfig serializerConfig = new SerializerConfigImpl();
+            SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
             // different registration order than setup
             serializerConfig.registerPojoType(StaticSchemaPojoSubclassB.class);
             serializerConfig.registerPojoType(StaticSchemaPojoSubclassA.class);
@@ -795,7 +795,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 
         @Override
         public TypeSerializer<StaticSchemaPojo> createPriorSerializer() {
-            SerializerConfig serializerConfig = new SerializerConfigImpl();
+            SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
             serializerConfig.registerPojoType(StaticSchemaPojoSubclassA.class);
             serializerConfig.registerPojoType(StaticSchemaPojoSubclassB.class);
 
@@ -817,7 +817,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 
         @Override
         public TypeSerializer<StaticSchemaPojo> createUpgradedSerializer() {
-            SerializerConfig serializerConfig = new SerializerConfigImpl();
+            SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
             // missing registration for subclass A compared to setup
             serializerConfig.registerPojoType(StaticSchemaPojoSubclassB.class);
 
@@ -875,7 +875,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 
         @Override
         public TypeSerializer<StaticSchemaPojo> createUpgradedSerializer() {
-            SerializerConfig serializerConfig = new SerializerConfigImpl();
+            SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
             // new registration for subclass A compared to setup
             serializerConfig.registerPojoType(StaticSchemaPojoSubclassA.class);
 
@@ -912,7 +912,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 
         @Override
         public TypeSerializer<StaticSchemaPojo> createPriorSerializer() {
-            SerializerConfig serializerConfig = new SerializerConfigImpl();
+            SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
             serializerConfig.registerPojoType(StaticSchemaPojoSubclassA.class);
             serializerConfig.registerPojoType(StaticSchemaPojoSubclassB.class);
 
@@ -934,7 +934,7 @@ public class PojoSerializerUpgradeTestSpecifications {
 
         @Override
         public TypeSerializer<StaticSchemaPojo> createUpgradedSerializer() {
-            SerializerConfig serializerConfig = new SerializerConfigImpl();
+            SerializerConfigImpl serializerConfig = new SerializerConfigImpl();
             serializerConfig.registerPojoType(StaticSchemaPojoSubclassB.class);
             serializerConfig.registerPojoType(StaticSchemaPojoSubclassC.class);
 
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/JavaJobWithKryoSerializer.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/JavaJobWithKryoSerializer.java
index 411b373b282..6df63073a6c 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/JavaJobWithKryoSerializer.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-scala/src/main/java/org/apache/flink/tests/scala/JavaJobWithKryoSerializer.java
@@ -17,18 +17,24 @@
 
 package org.apache.flink.tests.scala;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 /** Simple batch job in pure Java that uses a custom Kryo serializer. */
 public class JavaJobWithKryoSerializer {
     public static void main(String[] args) throws Exception {
-        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration configuration = new Configuration();
+        configuration.setString(
+                PipelineOptions.SERIALIZATION_CONFIG.key(),
+                "{org.apache.flink.tests.scala.NonPojo: "
+                        + "{type: kryo, kryo-type: default, class: 
org.apache.flink.tests.scala.NonPojoSerializer}}");
+        final StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
 
         // we want to go through serialization to check for kryo issues
         env.disableOperatorChaining();
 
-        env.addDefaultKryoSerializer(NonPojo.class, NonPojoSerializer.class);
-
         env.fromData(new NonPojo()).map(x -> x);
 
         env.execute();
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
index c54fe3731cd..55df2f78f4b 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.formats.avro.utils;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.SerializableSerializer;
 import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.AvroUtils;
@@ -48,9 +49,10 @@ public class AvroKryoSerializerUtils extends AvroUtils {
 
             // Avro POJOs contain java.util.List which have GenericData.Array 
as their runtime type
             // because Kryo is not able to serialize them properly, we use 
this serializer for them
-            reg.registerTypeWithKryoSerializer(
-                    GenericData.Array.class,
-                    
Serializers.SpecificInstanceCollectionSerializerForArrayList.class);
+            ((SerializerConfigImpl) reg)
+                    .registerTypeWithKryoSerializer(
+                            GenericData.Array.class,
+                            
Serializers.SpecificInstanceCollectionSerializerForArrayList.class);
 
             // We register this serializer for users who want to use untyped 
Avro records
             // (GenericData.Record).
@@ -59,7 +61,8 @@ public class AvroKryoSerializerUtils extends AvroUtils {
             // a bad idea.
             // we add the serializer as a default serializer because Avro is 
using a private
             // sub-type at runtime.
-            reg.addDefaultKryoSerializer(Schema.class, 
AvroSchemaSerializer.class);
+            ((SerializerConfigImpl) reg)
+                    .addDefaultKryoSerializer(Schema.class, 
AvroSchemaSerializer.class);
         }
     }
 
@@ -70,7 +73,7 @@ public class AvroKryoSerializerUtils extends AvroUtils {
                 GenericData.Array.class.getName(),
                 new KryoRegistration(
                         GenericData.Array.class,
-                        new ExecutionConfig.SerializableSerializer<>(
+                        new SerializableSerializer<>(
                                 new Serializers
                                         
.SpecificInstanceCollectionSerializerForArrayList())));
     }
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
index 7bc51fd140e..1a02ae21224 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
@@ -308,7 +308,7 @@ class AvroTypeExtractionTest {
 
         // test if automatic registration of the Types worked
         ExecutionConfig ec = env.getConfig();
-        assertThat(ec.getRegisteredKryoTypes()).contains(Fixed16.class);
+        
assertThat(ec.getSerializerConfig().getRegisteredKryoTypes()).contains(Fixed16.class);
 
         switch (fieldName) {
             case "name":
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index ee2ce212f4c..eb0d7ec4143 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -42,7 +42,6 @@ import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.Operator;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
@@ -69,11 +68,9 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SplittableIterator;
 import org.apache.flink.util.WrappingRuntimeException;
 
-import com.esotericsoftware.kryo.Serializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
@@ -282,85 +279,6 @@ public class ExecutionEnvironment {
         return this.lastJobExecutionResult;
     }
 
-    // 
--------------------------------------------------------------------------------------------
-    //  Registry for types and serializers
-    // 
--------------------------------------------------------------------------------------------
-
-    /**
-     * Adds a new Kryo default serializer to the Runtime.
-     *
-     * <p>Note that the serializer instance must be serializable (as defined by
-     * java.io.Serializable), because it may be distributed to the worker 
nodes by java
-     * serialization.
-     *
-     * @param type The class of the types serialized with the given serializer.
-     * @param serializer The serializer to use.
-     */
-    public <T extends Serializer<?> & Serializable> void 
addDefaultKryoSerializer(
-            Class<?> type, T serializer) {
-        config.getSerializerConfig().addDefaultKryoSerializer(type, 
serializer);
-    }
-
-    /**
-     * Adds a new Kryo default serializer to the Runtime.
-     *
-     * @param type The class of the types serialized with the given serializer.
-     * @param serializerClass The class of the serializer to use.
-     */
-    public void addDefaultKryoSerializer(
-            Class<?> type, Class<? extends Serializer<?>> serializerClass) {
-        config.getSerializerConfig().addDefaultKryoSerializer(type, 
serializerClass);
-    }
-
-    /**
-     * Registers the given type with a Kryo Serializer.
-     *
-     * <p>Note that the serializer instance must be serializable (as defined by
-     * java.io.Serializable), because it may be distributed to the worker 
nodes by java
-     * serialization.
-     *
-     * @param type The class of the types serialized with the given serializer.
-     * @param serializer The serializer to use.
-     */
-    public <T extends Serializer<?> & Serializable> void 
registerTypeWithKryoSerializer(
-            Class<?> type, T serializer) {
-        config.getSerializerConfig().registerTypeWithKryoSerializer(type, 
serializer);
-    }
-
-    /**
-     * Registers the given Serializer via its class as a serializer for the 
given type at the
-     * KryoSerializer.
-     *
-     * @param type The class of the types serialized with the given serializer.
-     * @param serializerClass The class of the serializer to use.
-     */
-    public void registerTypeWithKryoSerializer(
-            Class<?> type, Class<? extends Serializer<?>> serializerClass) {
-        config.getSerializerConfig().registerTypeWithKryoSerializer(type, 
serializerClass);
-    }
-
-    /**
-     * Registers the given type with the serialization stack. If the type is 
eventually serialized
-     * as a POJO, then the type is registered with the POJO serializer. If the 
type ends up being
-     * serialized with Kryo, then it will be registered at Kryo to make sure 
that only tags are
-     * written.
-     *
-     * @param type The class of the type to register.
-     */
-    public void registerType(Class<?> type) {
-        if (type == null) {
-            throw new NullPointerException("Cannot register null type class.");
-        }
-
-        TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(type);
-
-        if (typeInfo instanceof PojoTypeInfo) {
-            config.getSerializerConfig().registerPojoType(type);
-        } else {
-            config.getSerializerConfig().registerKryoType(type);
-        }
-    }
-
     /**
      * Sets all relevant options contained in the {@link ReadableConfig} such 
as e.g. {@link
      * PipelineOptions#CACHED_FILES}. It will reconfigure {@link 
ExecutionEnvironment} and {@link
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/utils/PlanGenerator.java 
b/flink-java/src/main/java/org/apache/flink/api/java/utils/PlanGenerator.java
index 97f9a07b20f..64d328675f9 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/utils/PlanGenerator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/utils/PlanGenerator.java
@@ -180,30 +180,22 @@ public class PlanGenerator {
     private int getNumberOfRegisteredTypes() {
         return config.getSerializerConfig().getRegisteredKryoTypes().size()
                 + config.getSerializerConfig().getRegisteredPojoTypes().size()
-                + 
config.getSerializerConfig().getRegisteredTypesWithKryoSerializerClasses().size()
-                + 
config.getSerializerConfig().getRegisteredTypesWithKryoSerializers().size();
+                + 
config.getSerializerConfig().getRegisteredTypesWithKryoSerializerClasses().size();
     }
 
     private int getNumberOfDefaultKryoSerializers() {
-        return config.getSerializerConfig().getDefaultKryoSerializers().size()
-                + 
config.getSerializerConfig().getDefaultKryoSerializerClasses().size();
+        return 
config.getSerializerConfig().getDefaultKryoSerializerClasses().size();
     }
 
     private void logDebuggingTypeDetails() {
         LOG.debug(
                 "Registered Kryo types: {}",
                 
config.getSerializerConfig().getRegisteredKryoTypes().toString());
-        LOG.debug(
-                "Registered Kryo with Serializers types: {}",
-                
config.getSerializerConfig().getRegisteredTypesWithKryoSerializers().entrySet());
         LOG.debug(
                 "Registered Kryo with Serializer Classes types: {}",
                 config.getSerializerConfig()
                         .getRegisteredTypesWithKryoSerializerClasses()
                         .entrySet());
-        LOG.debug(
-                "Registered Kryo default Serializers: {}",
-                
config.getSerializerConfig().getDefaultKryoSerializers().entrySet());
         LOG.debug(
                 "Registered Kryo default Serializers Classes {}",
                 
config.getSerializerConfig().getDefaultKryoSerializerClasses().entrySet());
diff --git a/flink-python/pyflink/common/execution_config.py 
b/flink-python/pyflink/common/execution_config.py
index b83369f60e5..caad612659e 100644
--- a/flink-python/pyflink/common/execution_config.py
+++ b/flink-python/pyflink/common/execution_config.py
@@ -477,20 +477,6 @@ class ExecutionConfig(object):
         
self._j_execution_config.setGlobalJobParameters(j_global_job_parameters)
         return self
 
-    def get_registered_types_with_kryo_serializer_classes(self) -> Dict[str, 
str]:
-        """
-        Returns the registered types with their Kryo Serializer classes.
-
-        :return: The dict which the keys are full-qualified java class names 
of the registered
-                 types and the values are full-qualified java class names of 
the Kryo Serializer
-                 classes.
-        """
-        j_clz_map = 
self._j_execution_config.getRegisteredTypesWithKryoSerializerClasses()
-        registered_serializers = {}
-        for key in j_clz_map:
-            registered_serializers[key.getName()] = j_clz_map[key].getName()
-        return registered_serializers
-
     def get_default_kryo_serializer_classes(self) -> Dict[str, str]:
         """
         Returns the registered default Kryo Serializer classes.
diff --git a/flink-python/pyflink/common/tests/test_execution_config.py 
b/flink-python/pyflink/common/tests/test_execution_config.py
index bec8c968525..e08e5a23e83 100644
--- a/flink-python/pyflink/common/tests/test_execution_config.py
+++ b/flink-python/pyflink/common/tests/test_execution_config.py
@@ -109,26 +109,6 @@ class ExecutionConfigTests(PyFlinkTestCase):
 
         self.assertEqual(self.execution_config.get_execution_mode(), 
ExecutionMode.PIPELINED_FORCED)
 
-    def test_disable_enable_force_kryo(self):
-
-        self.execution_config.disable_force_kryo()
-
-        self.assertFalse(self.execution_config.is_force_kryo_enabled())
-
-        self.execution_config.enable_force_kryo()
-
-        self.assertTrue(self.execution_config.is_force_kryo_enabled())
-
-    def test_disable_enable_generic_types(self):
-
-        self.execution_config.disable_generic_types()
-
-        self.assertTrue(self.execution_config.has_generic_types_disabled())
-
-        self.execution_config.enable_generic_types()
-
-        self.assertFalse(self.execution_config.has_generic_types_disabled())
-
     def test_disable_enable_auto_generated_uids(self):
 
         self.execution_config.disable_auto_generated_uids()
@@ -139,16 +119,6 @@ class ExecutionConfigTests(PyFlinkTestCase):
 
         
self.assertTrue(self.execution_config.has_auto_generated_uids_enabled())
 
-    def test_disable_enable_force_avro(self):
-
-        self.execution_config.disable_force_avro()
-
-        self.assertFalse(self.execution_config.is_force_avro_enabled())
-
-        self.execution_config.enable_force_avro()
-
-        self.assertTrue(self.execution_config.is_force_avro_enabled())
-
     def test_disable_enable_object_reuse(self):
 
         self.execution_config.disable_object_reuse()
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 2f135919608..8ecbcda471e 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -377,11 +378,10 @@ public abstract class AbstractQueryableStateTestBase {
         RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 
Integer.MAX_VALUE, 1000L);
 
         // Custom serializer is not needed, it's used just to check if 
serialization works.
-        env.getConfig()
-                .getSerializerConfig()
-                .addDefaultKryoSerializer(
-                        Byte.class,
-                        (Serializer<?> & Serializable) 
createSerializer(userClassLoader));
+        Class<Serializer<?>> customSerializerClass =
+                (Class<Serializer<?>>) userClassLoader.loadClass("CustomKryo");
+        ((SerializerConfigImpl) env.getConfig().getSerializerConfig())
+                .addDefaultKryoSerializer(Byte.class, customSerializerClass);
 
         // Here we *force* using Kryo, to check if custom serializers are 
handled correctly WRT
         // classloading
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b740a62e0c5..9993444bed0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -47,7 +47,6 @@ import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.CheckpointingOptions;
@@ -110,11 +109,8 @@ import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.TernaryBoolean;
 import org.apache.flink.util.WrappingRuntimeException;
 
-import com.esotericsoftware.kryo.Serializer;
-
 import javax.annotation.Nullable;
 
-import java.io.Serializable;
 import java.net.URI;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -712,129 +708,6 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
         return path == null ? null : new Path(path);
     }
 
-    // 
--------------------------------------------------------------------------------------------
-    // Registry for types and serializers
-    // 
--------------------------------------------------------------------------------------------
-
-    /**
-     * Adds a new Kryo default serializer to the Runtime.
-     *
-     * <p>Note that the serializer instance must be serializable (as defined by
-     * java.io.Serializable), because it may be distributed to the worker 
nodes by java
-     * serialization.
-     *
-     * @param type The class of the types serialized with the given serializer.
-     * @param serializer The serializer to use.
-     * @deprecated Register data types and serializers through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. Instance-type 
serializer definition
-     *     where serializers are serialized and written into the snapshot and 
deserialized for use
-     *     is deprecated as well. Use class-type serializer definition by 
{@link
-     *     PipelineOptions#SERIALIZATION_CONFIG} instead, where only the class 
name is written into
-     *     the snapshot and new instance of the serializer is created for use. 
This is a breaking
-     *     change, and it will be removed in Flink 2.0.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    public <T extends Serializer<?> & Serializable> void 
addDefaultKryoSerializer(
-            Class<?> type, T serializer) {
-        config.getSerializerConfig().addDefaultKryoSerializer(type, 
serializer);
-    }
-
-    /**
-     * Adds a new Kryo default serializer to the Runtime.
-     *
-     * @param type The class of the types serialized with the given serializer.
-     * @param serializerClass The class of the serializer to use.
-     * @deprecated Register data types and serializers through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. You should 
configure this by config
-     *     option {@link PipelineOptions#SERIALIZATION_CONFIG}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    public void addDefaultKryoSerializer(
-            Class<?> type, Class<? extends Serializer<?>> serializerClass) {
-        config.getSerializerConfig().addDefaultKryoSerializer(type, 
serializerClass);
-    }
-
-    /**
-     * Registers the given type with a Kryo Serializer.
-     *
-     * <p>Note that the serializer instance must be serializable (as defined by
-     * java.io.Serializable), because it may be distributed to the worker 
nodes by java
-     * serialization.
-     *
-     * @param type The class of the types serialized with the given serializer.
-     * @param serializer The serializer to use.
-     * @deprecated Register data types and serializers through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. Instance-type 
serializer definition
-     *     where serializers are serialized and written into the snapshot and 
deserialized for use
-     *     is deprecated as well. Use class-type serializer definition by 
{@link
-     *     PipelineOptions#SERIALIZATION_CONFIG} instead, where only the class 
name is written into
-     *     the snapshot and new instance of the serializer is created for use. 
This is a breaking
-     *     change, and it will be removed in Flink 2.0.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    public <T extends Serializer<?> & Serializable> void 
registerTypeWithKryoSerializer(
-            Class<?> type, T serializer) {
-        config.getSerializerConfig().registerTypeWithKryoSerializer(type, 
serializer);
-    }
-
-    /**
-     * Registers the given Serializer via its class as a serializer for the 
given type at the
-     * KryoSerializer.
-     *
-     * @param type The class of the types serialized with the given serializer.
-     * @param serializerClass The class of the serializer to use.
-     * @deprecated Register data types and serializers through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. You should 
configure this by config
-     *     option {@link PipelineOptions#SERIALIZATION_CONFIG}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    @SuppressWarnings("rawtypes")
-    public void registerTypeWithKryoSerializer(
-            Class<?> type, Class<? extends Serializer> serializerClass) {
-        config.getSerializerConfig().registerTypeWithKryoSerializer(type, 
serializerClass);
-    }
-
-    /**
-     * Registers the given type with the serialization stack. If the type is 
eventually serialized
-     * as a POJO, then the type is registered with the POJO serializer. If the 
type ends up being
-     * serialized with Kryo, then it will be registered at Kryo to make sure 
that only tags are
-     * written.
-     *
-     * @param type The class of the type to register.
-     * @deprecated Register data types and serializers through hard codes is 
deprecated, because you
-     *     need to modify the codes when upgrading job version. You should 
configure this by config
-     *     option {@link PipelineOptions#SERIALIZATION_CONFIG}.
-     * @see <a
-     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-398:+Improve+Serialization+Configuration+And+Usage+In+Flink";>
-     *     FLIP-398: Improve Serialization Configuration And Usage In Flink</a>
-     */
-    @Deprecated
-    public void registerType(Class<?> type) {
-        if (type == null) {
-            throw new NullPointerException("Cannot register null type class.");
-        }
-
-        TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(type);
-
-        if (typeInfo instanceof PojoTypeInfo) {
-            config.getSerializerConfig().registerPojoType(type);
-        } else {
-            config.getSerializerConfig().registerKryoType(type);
-        }
-    }
-
     // 
--------------------------------------------------------------------------------------------
     //  Time characteristic
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 3c7a2f4f3f0..20518e9a63c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -124,7 +124,7 @@ class OperatorStateBackendTest {
                 .isFalse();
 
         final ExecutionConfig cfg = new ExecutionConfig();
-        cfg.getSerializerConfig()
+        ((SerializerConfigImpl) cfg.getSerializerConfig())
                 .registerTypeWithKryoSerializer(
                         registeredType, 
com.esotericsoftware.kryo.serializers.JavaSerializer.class);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 7b8febbb7f8..1bfc9d4237e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
@@ -559,8 +560,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
         try {
             // cast because our test serializer is not typed to TestPojo
-            env.getExecutionConfig()
-                    .getSerializerConfig()
+            ((SerializerConfigImpl) 
env.getExecutionConfig().getSerializerConfig())
                     .addDefaultKryoSerializer(
                             TestPojo.class, (Class) 
ExceptionThrowingTestSerializer.class);
 
@@ -633,8 +633,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
         try {
             // cast because our test serializer is not typed to TestPojo
-            env.getExecutionConfig()
-                    .getSerializerConfig()
+            ((SerializerConfigImpl) 
env.getExecutionConfig().getSerializerConfig())
                     .addDefaultKryoSerializer(
                             TestPojo.class, (Class) 
ExceptionThrowingTestSerializer.class);
 
@@ -704,8 +703,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
     void testBackendUsesRegisteredKryoSerializer() throws Exception {
         CheckpointStreamFactory streamFactory = createStreamFactory();
         SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistryImpl();
-        env.getExecutionConfig()
-                .getSerializerConfig()
+        ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig())
                 .registerTypeWithKryoSerializer(
                         TestPojo.class, ExceptionThrowingTestSerializer.class);
 
@@ -777,8 +775,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
         CheckpointStreamFactory streamFactory = createStreamFactory();
         SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistryImpl();
 
-        env.getExecutionConfig()
-                .getSerializerConfig()
+        ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig())
                 .registerTypeWithKryoSerializer(
                         TestPojo.class, ExceptionThrowingTestSerializer.class);
 
@@ -899,7 +896,8 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
             // ====================================== restore snapshot
             // ======================================
 
-            
env.getExecutionConfig().getSerializerConfig().registerKryoType(TestPojo.class);
+            ((SerializerConfigImpl) 
env.getExecutionConfig().getSerializerConfig())
+                    .registerKryoType(TestPojo.class);
 
             backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, 
env);
 
@@ -976,8 +974,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
             // ==========
 
             // cast because our test serializer is not typed to TestPojo
-            env.getExecutionConfig()
-                    .getSerializerConfig()
+            ((SerializerConfigImpl) 
env.getExecutionConfig().getSerializerConfig())
                     .addDefaultKryoSerializer(
                             TestPojo.class, (Class) 
CustomKryoTestSerializer.class);
 
@@ -1012,8 +1009,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
             // =========
 
             // cast because our test serializer is not typed to TestPojo
-            env.getExecutionConfig()
-                    .getSerializerConfig()
+            ((SerializerConfigImpl) 
env.getExecutionConfig().getSerializerConfig())
                     .addDefaultKryoSerializer(
                             TestPojo.class, (Class) 
CustomKryoTestSerializer.class);
 
@@ -1084,8 +1080,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
             // ========== restore snapshot - should use specific serializer 
(ONLY SERIALIZATION)
             // ==========
 
-            env.getExecutionConfig()
-                    .getSerializerConfig()
+            ((SerializerConfigImpl) 
env.getExecutionConfig().getSerializerConfig())
                     .registerTypeWithKryoSerializer(TestPojo.class, 
CustomKryoTestSerializer.class);
 
             backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, 
env);
@@ -1118,8 +1113,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
             // ========= restore snapshot - should use specific serializer 
(FAIL ON DESERIALIZATION)
             // =========
 
-            env.getExecutionConfig()
-                    .getSerializerConfig()
+            ((SerializerConfigImpl) 
env.getExecutionConfig().getSerializerConfig())
                     .registerTypeWithKryoSerializer(TestPojo.class, 
CustomKryoTestSerializer.class);
 
             assertRestoreKeyedBackendFail(snapshot2, kvId);
@@ -1138,8 +1132,10 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
         SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistryImpl();
 
         // register A first then B
-        
env.getExecutionConfig().getSerializerConfig().registerKryoType(TestNestedPojoClassA.class);
-        
env.getExecutionConfig().getSerializerConfig().registerKryoType(TestNestedPojoClassB.class);
+        ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig())
+                .registerKryoType(TestNestedPojoClassA.class);
+        ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig())
+                .registerKryoType(TestNestedPojoClassB.class);
 
         CheckpointableKeyedStateBackend<Integer> backend =
                 createKeyedBackend(IntSerializer.INSTANCE, env);
@@ -1208,11 +1204,9 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
             env.close();
             env = buildMockEnv();
 
-            env.getExecutionConfig()
-                    .getSerializerConfig()
+            ((SerializerConfigImpl) 
env.getExecutionConfig().getSerializerConfig())
                     .registerKryoType(TestNestedPojoClassB.class); // this 
time register B first
-            env.getExecutionConfig()
-                    .getSerializerConfig()
+            ((SerializerConfigImpl) 
env.getExecutionConfig().getSerializerConfig())
                     .registerKryoType(TestNestedPojoClassA.class);
 
             backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, 
env);
@@ -1267,8 +1261,10 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
         SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistryImpl();
 
         // register A first then B
-        env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
-        env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class);
+        ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig())
+                .registerPojoType(TestNestedPojoClassA.class);
+        ((SerializerConfigImpl) env.getExecutionConfig().getSerializerConfig())
+                .registerPojoType(TestNestedPojoClassB.class);
 
         CheckpointableKeyedStateBackend<Integer> backend =
                 createKeyedBackend(IntSerializer.INSTANCE, env);
@@ -1322,9 +1318,10 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
             env.close();
             env = buildMockEnv();
 
-            env.getExecutionConfig()
+            ((SerializerConfigImpl) 
env.getExecutionConfig().getSerializerConfig())
                     .registerPojoType(TestNestedPojoClassB.class); // this 
time register B first
-            
env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
+            ((SerializerConfigImpl) 
env.getExecutionConfig().getSerializerConfig())
+                    .registerPojoType(TestNestedPojoClassA.class);
 
             backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, 
env);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java
index 5312db4ba9b..ac3946b327f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java
@@ -36,7 +36,6 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -261,12 +260,6 @@ public class FlinkConfigLoaderTest {
                             + ": "
                             + 
"name:file1,path:'file:///tmp/file1';name:file2,path:'hdfs:///tmp/file2'"
                             + "\n");
-            fw.write(
-                    "pipeline.default-kryo-serializers"
-                            + ": "
-                            + 
"class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1;"
-                            + " 
class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2"
-                            + "\n");
         }
         Map<String, String> configuration =
                 ConfigurationFileMigrationUtils.loadLegacyYAMLResource(file);
@@ -288,19 +281,6 @@ public class FlinkConfigLoaderTest {
         assertThat(configuration.getOrDefault(TEST_CONFIG_KEY, null))
                 .isEqualTo(standardYamlConfig.getString(TEST_CONFIG_KEY, 
null));
 
-        List<String> serializers =
-                ConfigurationUtils.convertToList(
-                        
configuration.get(PipelineOptions.KRYO_DEFAULT_SERIALIZERS.key()),
-                        String.class);
-        assertThat(
-                        serializers.stream()
-                                .map(ConfigurationUtils::parseStringToMap)
-                                .collect(Collectors.toList()))
-                .isEqualTo(
-                        
standardYamlConfig.get(PipelineOptions.KRYO_DEFAULT_SERIALIZERS).stream()
-                                .map(ConfigurationUtils::parseStringToMap)
-                                .collect(Collectors.toList()));
-
         List<String> cachedFiles =
                 ConfigurationUtils.convertToList(
                         configuration.get(PipelineOptions.CACHED_FILES.key()), 
String.class);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
index c6bb0b6e5db..8207143a8a9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -54,13 +56,16 @@ import static org.assertj.core.api.Assertions.assertThat;
  *
  * <p>The tests use an arbitrary generic type to validate the behavior.
  */
-@SuppressWarnings("serial")
 class StateDescriptorPassingTest {
 
     @Test
     void testReduceWindowState() {
-        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+        Configuration configuration = new Configuration();
+        String serializerConfigStr =
+                "{java.io.File: {type: kryo, kryo-type: registered, class: 
com.esotericsoftware.kryo.serializers.JavaSerializer}}";
+        configuration.setString(PipelineOptions.SERIALIZATION_CONFIG.key(), 
serializerConfigStr);
+        final StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
 
         DataStream<File> src =
                 env.fromData(new File("/"))
@@ -92,8 +97,12 @@ class StateDescriptorPassingTest {
 
     @Test
     void testApplyWindowState() {
-        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+        Configuration configuration = new Configuration();
+        String serializerConfigStr =
+                "{java.io.File: {type: kryo, kryo-type: registered, class: 
com.esotericsoftware.kryo.serializers.JavaSerializer}}";
+        configuration.setString(PipelineOptions.SERIALIZATION_CONFIG.key(), 
serializerConfigStr);
+        final StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
 
         DataStream<File> src =
                 env.fromData(new File("/"))
@@ -126,8 +135,12 @@ class StateDescriptorPassingTest {
 
     @Test
     void testProcessWindowState() {
-        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+        Configuration configuration = new Configuration();
+        String serializerConfigStr =
+                "{java.io.File: {type: kryo, kryo-type: registered, class: 
com.esotericsoftware.kryo.serializers.JavaSerializer}}";
+        configuration.setString(PipelineOptions.SERIALIZATION_CONFIG.key(), 
serializerConfigStr);
+        final StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
 
         DataStream<File> src =
                 env.fromData(new File("/"))
@@ -160,8 +173,12 @@ class StateDescriptorPassingTest {
 
     @Test
     void testProcessAllWindowState() {
-        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+        Configuration configuration = new Configuration();
+        String serializerConfigStr =
+                "{java.io.File: {type: kryo, kryo-type: registered, class: 
com.esotericsoftware.kryo.serializers.JavaSerializer}}";
+        configuration.setString(PipelineOptions.SERIALIZATION_CONFIG.key(), 
serializerConfigStr);
+        final StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
 
         // simulate ingestion time
         DataStream<File> src =
@@ -187,8 +204,12 @@ class StateDescriptorPassingTest {
 
     @Test
     void testReduceWindowAllState() {
-        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+        Configuration configuration = new Configuration();
+        String serializerConfigStr =
+                "{java.io.File: {type: kryo, kryo-type: registered, class: 
com.esotericsoftware.kryo.serializers.JavaSerializer}}";
+        configuration.setString(PipelineOptions.SERIALIZATION_CONFIG.key(), 
serializerConfigStr);
+        final StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
 
         // simulate ingestion time
         DataStream<File> src =
@@ -214,8 +235,12 @@ class StateDescriptorPassingTest {
 
     @Test
     void testApplyWindowAllState() {
-        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+        Configuration configuration = new Configuration();
+        String serializerConfigStr =
+                "{java.io.File: {type: kryo, kryo-type: registered, class: 
com.esotericsoftware.kryo.serializers.JavaSerializer}}";
+        configuration.setString(PipelineOptions.SERIALIZATION_CONFIG.key(), 
serializerConfigStr);
+        final StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
 
         // simulate ingestion time
         DataStream<File> src =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index d0eb27d0659..db05b4e4b6c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.SerializerFactory;
-import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -92,7 +92,7 @@ class StreamingRuntimeContextTest {
     void testValueStateInstantiation() throws Exception {
 
         final ExecutionConfig config = new ExecutionConfig();
-        config.getSerializerConfig().registerKryoType(Path.class);
+        ((SerializerConfigImpl) 
config.getSerializerConfig()).registerKryoType(Path.class);
 
         final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
 
@@ -113,7 +113,7 @@ class StreamingRuntimeContextTest {
     void testReducingStateInstantiation() throws Exception {
 
         final ExecutionConfig config = new ExecutionConfig();
-        config.getSerializerConfig().registerKryoType(Path.class);
+        ((SerializerConfigImpl) 
config.getSerializerConfig()).registerKryoType(Path.class);
 
         final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
 
@@ -139,7 +139,7 @@ class StreamingRuntimeContextTest {
     @Test
     void testAggregatingStateInstantiation() throws Exception {
         final ExecutionConfig config = new ExecutionConfig();
-        config.getSerializerConfig().registerKryoType(Path.class);
+        ((SerializerConfigImpl) 
config.getSerializerConfig()).registerKryoType(Path.class);
 
         final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
 
@@ -168,7 +168,7 @@ class StreamingRuntimeContextTest {
     void testListStateInstantiation() throws Exception {
 
         final ExecutionConfig config = new ExecutionConfig();
-        config.getSerializerConfig().registerKryoType(Path.class);
+        ((SerializerConfigImpl) 
config.getSerializerConfig()).registerKryoType(Path.class);
 
         final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
 
@@ -209,7 +209,7 @@ class StreamingRuntimeContextTest {
     void testMapStateInstantiation() throws Exception {
 
         final ExecutionConfig config = new ExecutionConfig();
-        config.getSerializerConfig().registerKryoType(Path.class);
+        ((SerializerConfigImpl) 
config.getSerializerConfig()).registerKryoType(Path.class);
 
         final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
 
@@ -252,7 +252,7 @@ class StreamingRuntimeContextTest {
     void testV2ValueStateInstantiation() throws Exception {
 
         final ExecutionConfig config = new ExecutionConfig();
-        SerializerConfig serializerConfig = config.getSerializerConfig();
+        SerializerConfigImpl serializerConfig = (SerializerConfigImpl) 
config.getSerializerConfig();
         serializerConfig.registerKryoType(Path.class);
 
         final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
@@ -276,7 +276,7 @@ class StreamingRuntimeContextTest {
     @Test
     void testV2ListStateInstantiation() throws Exception {
         final ExecutionConfig config = new ExecutionConfig();
-        SerializerConfig serializerConfig = config.getSerializerConfig();
+        SerializerConfigImpl serializerConfig = (SerializerConfigImpl) 
config.getSerializerConfig();
         serializerConfig.registerKryoType(Path.class);
 
         final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
@@ -300,7 +300,7 @@ class StreamingRuntimeContextTest {
     @Test
     void testV2MapStateInstantiation() throws Exception {
         final ExecutionConfig config = new ExecutionConfig();
-        SerializerConfig serializerConfig = config.getSerializerConfig();
+        SerializerConfigImpl serializerConfig = (SerializerConfigImpl) 
config.getSerializerConfig();
         serializerConfig.registerKryoType(Path.class);
 
         final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
@@ -328,7 +328,7 @@ class StreamingRuntimeContextTest {
     @Test
     void testV2ReducingStateInstantiation() throws Exception {
         final ExecutionConfig config = new ExecutionConfig();
-        SerializerConfig serializerConfig = config.getSerializerConfig();
+        SerializerConfigImpl serializerConfig = (SerializerConfigImpl) 
config.getSerializerConfig();
         serializerConfig.registerKryoType(Path.class);
 
         final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
@@ -358,7 +358,7 @@ class StreamingRuntimeContextTest {
     @Test
     void testV2AggregatingStateInstantiation() throws Exception {
         final ExecutionConfig config = new ExecutionConfig();
-        SerializerConfig serializerConfig = config.getSerializerConfig();
+        SerializerConfigImpl serializerConfig = (SerializerConfigImpl) 
config.getSerializerConfig();
         serializerConfig.registerKryoType(Path.class);
 
         final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
index ca248ba9345..15b53ef3d9c 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.state.changelog;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
@@ -275,8 +276,7 @@ public class ChangelogStateBackendTestUtils {
 
             // ============================ restore snapshot 
===============================
 
-            env.getExecutionConfig()
-                    .getSerializerConfig()
+            ((SerializerConfigImpl) 
env.getExecutionConfig().getSerializerConfig())
                     .registerKryoType(StateBackendTestBase.TestPojo.class);
 
             keyedBackend =
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
index a441fc754d0..70cc78f2baf 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.java
@@ -131,9 +131,10 @@ class StreamExecutionEnvironmentComplexConfigurationTest {
     void testLoadingKryoSerializersFromConfiguration() {
         Configuration configuration = new Configuration();
         configuration.setString(
-                "pipeline.default-kryo-serializers",
-                
"class:'org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojo'"
-                        + 
",serializer:'org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojoSerializer'");
+                "pipeline.serialization-config",
+                
"{org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojo:"
+                        + " {type: kryo, kryo-type: default, class:"
+                        + " 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojoSerializer}}");
 
         // mutate config according to configuration
         StreamExecutionEnvironment envFromConfiguration =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
index 76041cb216f..422323fcf8a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
@@ -34,9 +34,6 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.delegation.Planner;
 
-import com.esotericsoftware.kryo.Serializer;
-
-import java.io.Serializable;
 import java.util.List;
 
 /**
@@ -177,40 +174,6 @@ public class DummyStreamExecutionEnvironment extends 
StreamExecutionEnvironment
         return realExecEnv.getCheckpointingConsistencyMode();
     }
 
-    @Override
-    public <T extends Serializer<?> & Serializable> void 
addDefaultKryoSerializer(
-            Class<?> type, T serializer) {
-        throw new UnsupportedOperationException(
-                "This is a dummy StreamExecutionEnvironment, 
addDefaultKryoSerializer method is unsupported.");
-    }
-
-    @Override
-    public void addDefaultKryoSerializer(
-            Class<?> type, Class<? extends Serializer<?>> serializerClass) {
-        throw new UnsupportedOperationException(
-                "This is a dummy StreamExecutionEnvironment, 
addDefaultKryoSerializer method is unsupported.");
-    }
-
-    @Override
-    public <T extends Serializer<?> & Serializable> void 
registerTypeWithKryoSerializer(
-            Class<?> type, T serializer) {
-        throw new UnsupportedOperationException(
-                "This is a dummy StreamExecutionEnvironment, 
registerTypeWithKryoSerializer method is unsupported.");
-    }
-
-    @Override
-    public void registerTypeWithKryoSerializer(
-            Class<?> type, Class<? extends Serializer> serializerClass) {
-        throw new UnsupportedOperationException(
-                "This is a dummy StreamExecutionEnvironment, 
registerTypeWithKryoSerializer method is unsupported.");
-    }
-
-    @Override
-    public void registerType(Class<?> type) {
-        throw new UnsupportedOperationException(
-                "This is a dummy StreamExecutionEnvironment, registerType 
method is unsupported.");
-    }
-
     @Override
     public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) 
{
         throw new UnsupportedOperationException(
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/GroupReduceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/operators/GroupReduceITCase.java
index cc80c8c4a4e..042fa97e610 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/operators/GroupReduceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/GroupReduceITCase.java
@@ -1088,8 +1088,9 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBaseJUnit4 {
         ExecutionConfig ec = env.getConfig();
 
         // check if automatic type registration with Kryo worked
-        Assert.assertTrue(ec.getRegisteredKryoTypes().contains(BigInt.class));
-        
Assert.assertFalse(ec.getRegisteredKryoTypes().contains(java.sql.Date.class));
+        
Assert.assertTrue(ec.getSerializerConfig().getRegisteredKryoTypes().contains(BigInt.class));
+        Assert.assertFalse(
+                
ec.getSerializerConfig().getRegisteredKryoTypes().contains(java.sql.Date.class));
 
         String expected = null;
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/RegisterTypeWithKryoSerializerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/RegisterTypeWithKryoSerializerITCase.java
deleted file mode 100644
index 7102842b3fe..00000000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/RegisterTypeWithKryoSerializerITCase.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.runtime;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.test.util.MultipleProgramsTestBaseJUnit4;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-
-import static 
org.apache.flink.test.util.TestBaseUtils.compareResultCollections;
-
-/** Test registering types with Kryo. */
-@RunWith(Parameterized.class)
-public class RegisterTypeWithKryoSerializerITCase extends 
MultipleProgramsTestBaseJUnit4 {
-
-    public RegisterTypeWithKryoSerializerITCase(TestExecutionMode mode) {
-        super(mode);
-    }
-
-    /**
-     * Tests whether the kryo serializer is forwarded via the ExecutionConfig.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testRegisterTypeWithKryoSerializer() throws Exception {
-        int numElements = 10;
-        ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-        env.registerTypeWithKryoSerializer(TestClass.class, new 
TestClassSerializer());
-
-        DataSet<Long> input = env.generateSequence(0, numElements - 1);
-
-        DataSet<TestClass> mapped =
-                input.map(
-                        new MapFunction<Long, TestClass>() {
-                            private static final long serialVersionUID = 
-529116076312998262L;
-
-                            @Override
-                            public TestClass map(Long value) throws Exception {
-                                return new TestClass(value);
-                            }
-                        });
-
-        List<TestClass> expected = new ArrayList<>(numElements);
-
-        for (int i = 0; i < numElements; i++) {
-            expected.add(new TestClass(42));
-        }
-
-        compareResultCollections(
-                expected,
-                mapped.collect(),
-                new Comparator<TestClass>() {
-                    @Override
-                    public int compare(TestClass o1, TestClass o2) {
-                        return (int) (o1.getValue() - o2.getValue());
-                    }
-                });
-    }
-
-    static class TestClass {
-        private final long value;
-        private Object obj = new Object();
-
-        public TestClass(long value) {
-            this.value = value;
-        }
-
-        public long getValue() {
-            return value;
-        }
-
-        @Override
-        public String toString() {
-            return "TestClass(" + value + ")";
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (obj instanceof TestClass) {
-                TestClass other = (TestClass) obj;
-
-                return value == other.value;
-            } else {
-                return false;
-            }
-        }
-
-        @Override
-        public int hashCode() {
-            return (int) value;
-        }
-    }
-
-    static class TestClassSerializer extends Serializer<TestClass> implements 
Serializable {
-
-        private static final long serialVersionUID = -3585880741695717533L;
-
-        @Override
-        public void write(Kryo kryo, Output output, TestClass testClass) {
-            output.writeLong(42);
-        }
-
-        @Override
-        public TestClass read(Kryo kryo, Input input, Class<TestClass> aClass) 
{
-            return new TestClass(input.readLong());
-        }
-    }
-}

Reply via email to