JunRuiLee commented on code in PR #24182:
URL: https://github.com/apache/flink/pull/24182#discussion_r1464545618
##########
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##########
@@ -408,4 +431,104 @@ private <T extends Class> T loadClass(
throw new IllegalArgumentException(errorMessage, e);
}
}
+
+ private void parseSerializationConfigWithExceptionHandling(
+ ClassLoader classLoader, Map<String, String> serializationConfigs)
{
+ try {
+ parseSerializationConfig(classLoader, serializationConfigs);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format("Could not configure serializers from %s.",
serializationConfigs),
+ e);
+ }
+ }
+
+ private void parseSerializationConfig(
+ ClassLoader classLoader, Map<String, String> serializationConfigs)
{
+ final Map<Class<?>, Map<String, String>> serializationConfigByClass =
+ serializationConfigs.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ e ->
+ loadClass(
+ e.getKey(),
+ classLoader,
+ "Could not load class
for serialization config"),
+ e ->
ConfigurationUtils.parseStringToMap(e.getValue())));
+ for (Map.Entry<Class<?>, Map<String, String>> entry :
+ serializationConfigByClass.entrySet()) {
+ Class<?> type = entry.getKey();
+ Map<String, String> config = entry.getValue();
+ String configType = config.get("type");
+ switch (configType) {
+ case "pojo":
+ registerPojoType(type);
+ break;
+ case "kryo":
+ parseAndRegisterKryoType(classLoader, type, config);
+ break;
+ case "typeinfo":
+ parseAndRegisterTypeFactory(classLoader, type, config);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported type: " +
configType);
+ }
+ }
+ }
+
+ private void parseAndRegisterKryoType(
+ ClassLoader classLoader, Class<?> t, Map<String, String> m) {
+ String kryoType = m.get("kryo-type");
+ if (kryoType == null) {
+ registerKryoType(t);
+ } else {
+ switch (kryoType) {
+ case "default":
+ addDefaultKryoSerializer(
+ t,
+ loadClass(
+ m.get("class"),
+ classLoader,
+ "Could not load serializer's class"));
+ break;
+ case "registered":
+ registerTypeWithKryoSerializer(
+ t,
+ loadClass(
+ m.get("class"),
+ classLoader,
+ "Could not load serializer's class"));
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ private void parseAndRegisterTypeFactory(
+ ClassLoader classLoader, Class<?> t, Map<String, String> m) {
+ Class<? extends TypeInfoFactory<?>> factoryClass =
+ loadClass(m.get("class"), classLoader, "Could not load
TypeInfoFactory's class");
+ // Register in the global static factory map of TypeExtractor for now
so that it can be
+ // accessed from
+ // the static methods of TypeExtractor where SerializerConfig is
currently not accessible
Review Comment:
ditto
##########
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##########
@@ -69,8 +75,11 @@ public final class SerializerConfig implements Serializable {
private LinkedHashSet<Class<?>> registeredPojoTypes = new
LinkedHashSet<>();
- private LinkedHashMap<Class<?>, Class<TypeInfoFactory<?>>>
registeredTypeFactories =
- new LinkedHashMap<>();
+ // Order is not required as we will traverse the type hierarchy up to find
the closest type
+ // information factory
+ // when extracting the type information.
Review Comment:
These comment lines can be merged into a single line.
##########
flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java:
##########
@@ -352,6 +361,20 @@ public void configure(ReadableConfig configuration,
ClassLoader classLoader) {
.getOptional(PipelineOptions.KRYO_REGISTERED_CLASSES)
.map(c -> loadClasses(c, classLoader, "Could not load kryo
type to be registered."))
.ifPresent(c -> this.registeredKryoTypes = c);
+
+ try {
+ configuration
+ .getOptional(PipelineOptions.SERIALIZATION_CONFIG)
+ .ifPresent(c ->
parseSerializationConfigWithExceptionHandling(classLoader, c));
+ } catch (Exception e) {
+ if (!GlobalConfiguration.isStandardYaml()) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s is only supported with the standard YAML
config parser, please use \"config.yaml\" as the config file.",
+ PipelineOptions.SERIALIZATION_CONFIG.key()));
+ }
Review Comment:
Shall we pass the Exception e to the UnsupportedOperationException as its
root cause?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]