sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix pulsar can't load the customized SerDe URL: https://github.com/apache/pulsar/pull/5357#discussion_r339349691
########## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java ########## @@ -163,6 +168,43 @@ private static boolean isProtobufClass(Class<?> pojoClazz) { } } + @SuppressWarnings("unchecked") + private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input, ClassLoader classLoader) { + // The schemaTypeOrClassName can represent multiple thing, either a schema type, a schema class name or a ser-de + // class name. + + if (StringUtils.isEmpty(schemaTypeOrClassName) || DEFAULT_SERDE.equals(schemaTypeOrClassName)) { + // No preferred schema was provided, auto-discover schema or fallback to defaults + return newSchemaInstance(clazz, getSchemaTypeOrDefault(topic, clazz)); + } + + SchemaType schemaType = null; + try { + schemaType = SchemaType.valueOf(schemaTypeOrClassName.toUpperCase()); + } catch (IllegalArgumentException e) { + // schemaType is not referring to builtin type + } + + if (schemaType != null) { + // The parameter passed was indeed a valid builtin schema type + return newSchemaInstance(clazz, schemaType); + } + + // At this point, the string can represent either a schema or serde class name. Create an instance and + // check if it complies with either interface + + // First try with Schema + try { + return (Schema<T>) InstanceUtils.initializeCustomSchema(schemaTypeOrClassName, + classLoader, clazz, input); + } catch (Throwable t) { + // Now try with Serde or just fail + SerDe<T> serDe = (SerDe<T>) InstanceUtils.initializeSerDe(schemaTypeOrClassName, + classLoader, clazz, input); + return new SerDeSchema<>(serDe); + } + } + @SuppressWarnings("unchecked") private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input) { Review comment: Can you rewrite this method by calling the method in line 172? 
```
private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input) {
    return newSchemaInstance(topic, clazz, schemaTypeOrClassName, input, Thread.currentThread().getContextClassLoader());
}
```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services