sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix
pulsar can't load the customized SerDe
URL: https://github.com/apache/pulsar/pull/5357#discussion_r339349691
##########
File path:
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
##########
@@ -163,6 +168,43 @@ private static boolean isProtobufClass(Class<?> pojoClazz) {
         }
     }
+    @SuppressWarnings("unchecked")
+    private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input, ClassLoader classLoader) {
+        // The schemaTypeOrClassName can represent multiple thing, either a schema type, a schema class name or a ser-de
+        // class name.
+
+        if (StringUtils.isEmpty(schemaTypeOrClassName) || DEFAULT_SERDE.equals(schemaTypeOrClassName)) {
+            // No preferred schema was provided, auto-discover schema or fallback to defaults
+            return newSchemaInstance(clazz, getSchemaTypeOrDefault(topic, clazz));
+        }
+
+        SchemaType schemaType = null;
+        try {
+            schemaType = SchemaType.valueOf(schemaTypeOrClassName.toUpperCase());
+        } catch (IllegalArgumentException e) {
+            // schemaType is not referring to builtin type
+        }
+
+        if (schemaType != null) {
+            // The parameter passed was indeed a valid builtin schema type
+            return newSchemaInstance(clazz, schemaType);
+        }
+
+        // At this point, the string can represent either a schema or serde class name. Create an instance and
+        // check if it complies with either interface
+
+        // First try with Schema
+        try {
+            return (Schema<T>) InstanceUtils.initializeCustomSchema(schemaTypeOrClassName,
+                    classLoader, clazz, input);
+        } catch (Throwable t) {
+            // Now try with Serde or just fail
+            SerDe<T> serDe = (SerDe<T>) InstanceUtils.initializeSerDe(schemaTypeOrClassName,
+                    classLoader, clazz, input);
+            return new SerDeSchema<>(serDe);
+        }
+    }
+
     @SuppressWarnings("unchecked")
     private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input) {
Review comment:
Can you rewrite this method so that it delegates to the new overload added at line 172, passing the thread context class loader?
```
private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input) {
    return newSchemaInstance(topic, clazz, schemaTypeOrClassName, input,
            Thread.currentThread().getContextClassLoader());
}
```
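For illustration only, here is a minimal, self-contained sketch of the same delegation pattern outside of Pulsar. The class `OverloadDelegationSketch` and its `newInstance` methods are hypothetical names, not part of `TopicSchema` or `InstanceUtils`. The sketch assumes the class being loaded has a public no-argument constructor.

```java
public class OverloadDelegationSketch {

    // Full overload (hypothetical): the caller picks the class loader explicitly.
    public static Object newInstance(String className, ClassLoader classLoader) {
        try {
            Class<?> clazz = Class.forName(className, true, classLoader);
            // Assumes a public no-argument constructor exists.
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }

    // Convenience overload (hypothetical): no duplicated logic, it only
    // supplies the thread context class loader and delegates.
    public static Object newInstance(String className) {
        return newInstance(className, Thread.currentThread().getContextClassLoader());
    }

    public static void main(String[] args) {
        Object viaDefault = newInstance("java.util.ArrayList");
        Object viaExplicit = newInstance("java.util.ArrayList", ClassLoader.getSystemClassLoader());
        // Both paths resolve the same class.
        System.out.println(viaDefault.getClass() == viaExplicit.getClass());
    }
}
```

Keeping the resolution logic in a single overload means a later fix to the class-loading path only has to be made in one place.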