sijie commented on a change in pull request #5357: [functions][Issue:5350]Fix 
pulsar can't load the customized SerDe
URL: https://github.com/apache/pulsar/pull/5357#discussion_r339349691
 
 

 ##########
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
 ##########
 @@ -163,6 +168,43 @@ private static boolean isProtobufClass(Class<?> 
pojoClazz) {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, 
String schemaTypeOrClassName, boolean input, ClassLoader classLoader) {
+        // The schemaTypeOrClassName can represent multiple thing, either a 
schema type, a schema class name or a ser-de
+        // class name.
+
+        if (StringUtils.isEmpty(schemaTypeOrClassName) || 
DEFAULT_SERDE.equals(schemaTypeOrClassName)) {
+            // No preferred schema was provided, auto-discover schema or 
fallback to defaults
+            return newSchemaInstance(clazz, getSchemaTypeOrDefault(topic, 
clazz));
+        }
+
+        SchemaType schemaType = null;
+        try {
+            schemaType = 
SchemaType.valueOf(schemaTypeOrClassName.toUpperCase());
+        } catch (IllegalArgumentException e) {
+            // schemaType is not referring to builtin type
+        }
+
+        if (schemaType != null) {
+            // The parameter passed was indeed a valid builtin schema type
+            return newSchemaInstance(clazz, schemaType);
+        }
+
+        // At this point, the string can represent either a schema or serde 
class name. Create an instance and
+        // check if it complies with either interface
+
+        // First try with Schema
+        try {
+            return (Schema<T>) 
InstanceUtils.initializeCustomSchema(schemaTypeOrClassName,
+                    classLoader, clazz, input);
+        } catch (Throwable t) {
+            // Now try with Serde or just fail
+            SerDe<T> serDe = (SerDe<T>) 
InstanceUtils.initializeSerDe(schemaTypeOrClassName,
+                    classLoader, clazz, input);
+            return new SerDeSchema<>(serDe);
+        }
+    }
+
     @SuppressWarnings("unchecked")
     private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, 
String schemaTypeOrClassName, boolean input) {
 
 Review comment:
   Can you rewrite this method by calling the method in line 172?
   
   ```
   private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String 
schemaTypeOrClassName, boolean input) {
       return newSchemaInstance(topic, clazz, schemaTypeOrClassName, input, 
Thread.currentThread().getContextClassLoader());
   }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to