eolivelli commented on a change in pull request #9481:
URL: https://github.com/apache/pulsar/pull/9481#discussion_r570977387



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
##########
@@ -385,28 +388,43 @@ public void close() throws Exception {
         }
     }
 
+    @AllArgsConstructor
+    static class InitSchemaResult<T> {
+        final Schema<T> schema;
+        final boolean requireSink;
+    }
+
     @SuppressWarnings("unchecked")
     @VisibleForTesting
-    Schema<T> initializeSchema() throws ClassNotFoundException {
+    InitSchemaResult<T> initializeSchema() throws ClassNotFoundException {
         if (StringUtils.isEmpty(this.pulsarSinkConfig.getTypeClassName())) {
-            return (Schema<T>) Schema.BYTES;
+            return new InitSchemaResult((Schema<T>) Schema.BYTES, true);
         }
 
         Class<?> typeArg = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(), functionClassLoader);
         if (Void.class.equals(typeArg)) {
             // return type is 'void', so there's no schema to check
-            return null;
+            log.info("typeClassName is {}, no need to force a schema", this.pulsarSinkConfig.getTypeClassName());
+            return new InitSchemaResult(null, false);
+        }
+        if (GenericRecord.class.equals(typeArg)) {

Review comment:
       @sijie thank you for your suggestion.
   
   I have updated the patch according to this comment.
   I have also tested it manually and it works like a charm.
   
   The integration test asserts that everything is working properly.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

