JunRuiLee commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1464545618


##########
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##########
@@ -408,4 +431,104 @@ private <T extends Class> T loadClass(
             throw new IllegalArgumentException(errorMessage, e);
         }
     }
+
+    private void parseSerializationConfigWithExceptionHandling(
+            ClassLoader classLoader, Map<String, String> serializationConfigs) 
{
+        try {
+            parseSerializationConfig(classLoader, serializationConfigs);
+        } catch (Exception e) {
+            throw new IllegalArgumentException(
+                    String.format("Could not configure serializers from %s.", 
serializationConfigs),
+                    e);
+        }
+    }
+
+    private void parseSerializationConfig(
+            ClassLoader classLoader, Map<String, String> serializationConfigs) 
{
+        final Map<Class<?>, Map<String, String>> serializationConfigByClass =
+                serializationConfigs.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        e ->
+                                                loadClass(
+                                                        e.getKey(),
+                                                        classLoader,
+                                                        "Could not load class 
for serialization config"),
+                                        e -> 
ConfigurationUtils.parseStringToMap(e.getValue())));
+        for (Map.Entry<Class<?>, Map<String, String>> entry :
+                serializationConfigByClass.entrySet()) {
+            Class<?> type = entry.getKey();
+            Map<String, String> config = entry.getValue();
+            String configType = config.get("type");
+            switch (configType) {
+                case "pojo":
+                    registerPojoType(type);
+                    break;
+                case "kryo":
+                    parseAndRegisterKryoType(classLoader, type, config);
+                    break;
+                case "typeinfo":
+                    parseAndRegisterTypeFactory(classLoader, type, config);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unsupported type: " + 
configType);
+            }
+        }
+    }
+
+    private void parseAndRegisterKryoType(
+            ClassLoader classLoader, Class<?> t, Map<String, String> m) {
+        String kryoType = m.get("kryo-type");
+        if (kryoType == null) {
+            registerKryoType(t);
+        } else {
+            switch (kryoType) {
+                case "default":
+                    addDefaultKryoSerializer(
+                            t,
+                            loadClass(
+                                    m.get("class"),
+                                    classLoader,
+                                    "Could not load serializer's class"));
+                    break;
+                case "registered":
+                    registerTypeWithKryoSerializer(
+                            t,
+                            loadClass(
+                                    m.get("class"),
+                                    classLoader,
+                                    "Could not load serializer's class"));
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    private void parseAndRegisterTypeFactory(
+            ClassLoader classLoader, Class<?> t, Map<String, String> m) {
+        Class<? extends TypeInfoFactory<?>> factoryClass =
+                loadClass(m.get("class"), classLoader, "Could not load 
TypeInfoFactory's class");
+        // Register in the global static factory map of TypeExtractor for now 
so that it can be
+        // accessed from
+        // the static methods of TypeExtractor where SerializerConfig is 
currently not accessible

Review Comment:
   ditto



##########
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##########
@@ -69,8 +75,11 @@ public final class SerializerConfig implements Serializable {
 
     private LinkedHashSet<Class<?>> registeredPojoTypes = new 
LinkedHashSet<>();
 
-    private LinkedHashMap<Class<?>, Class<TypeInfoFactory<?>>> 
registeredTypeFactories =
-            new LinkedHashMap<>();
+    // Order is not required as we will traverse the type hierarchy up to find 
the closest type
+    // information factory
+    // when extracting the type information.

Review Comment:
   These short comment lines can be merged into a single line.



##########
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##########
@@ -352,6 +361,20 @@ public void configure(ReadableConfig configuration, 
ClassLoader classLoader) {
                 .getOptional(PipelineOptions.KRYO_REGISTERED_CLASSES)
                 .map(c -> loadClasses(c, classLoader, "Could not load kryo 
type to be registered."))
                 .ifPresent(c -> this.registeredKryoTypes = c);
+
+        try {
+            configuration
+                    .getOptional(PipelineOptions.SERIALIZATION_CONFIG)
+                    .ifPresent(c -> 
parseSerializationConfigWithExceptionHandling(classLoader, c));
+        } catch (Exception e) {
+            if (!GlobalConfiguration.isStandardYaml()) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "%s is only supported with the standard YAML 
config parser, please use \"config.yaml\" as the config file.",
+                                PipelineOptions.SERIALIZATION_CONFIG.key()));
+            }

Review Comment:
   Shall we pass the caught Exception `e` to the UnsupportedOperationException 
as its root cause?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to