sijie commented on a change in pull request #9481:
URL: https://github.com/apache/pulsar/pull/9481#discussion_r570723440



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
##########
@@ -385,28 +388,43 @@ public void close() throws Exception {
         }
     }
 
+    @AllArgsConstructor
+    static class InitSchemaResult<T> {
+        final Schema<T> schema;
+        final boolean requireSink;
+    }
+
     @SuppressWarnings("unchecked")
     @VisibleForTesting
-    Schema<T> initializeSchema() throws ClassNotFoundException {
+    InitSchemaResult<T> initializeSchema() throws ClassNotFoundException {
         if (StringUtils.isEmpty(this.pulsarSinkConfig.getTypeClassName())) {
-            return (Schema<T>) Schema.BYTES;
+            return new InitSchemaResult((Schema<T>) Schema.BYTES, true);
         }
 
         Class<?> typeArg = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(), functionClassLoader);
         if (Void.class.equals(typeArg)) {
             // return type is 'void', so there's no schema to check
-            return null;
+            log.info("typeClassName is {}, no need to force a schema", this.pulsarSinkConfig.getTypeClassName());
+            return new InitSchemaResult(null, false);
+        }
+        if (GenericRecord.class.equals(typeArg)) {

Review comment:
   I am not sure the implementation here is correct. The schema should be 
inferred by `TopicSchema.getSchema`: you should be able to obtain the right 
schema instance from the `typeArg` and the `sinkSchemaType` 
(`sinkSchemaType` is the same as `sourceSchemaType`).
   
   I think the right direction is to fix 
https://github.com/apache/pulsar/blob/0469dfe2c7804bd9ca9ea34e95d83b2196216cf9/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java#L146
 so that we construct the right schema when the type is `GenericRecord`.
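
   To illustrate the direction, here is a minimal sketch of keeping the 
type-based decision in one inference routine instead of adding a special case 
in the sink. Everything in it is a hypothetical stand-in: `GenericRecord`, 
`SchemaKind` and `inferSchemaKind` are not the Pulsar classes or the actual 
`TopicSchema` code, and the configured schema type is left out for brevity.

```java
public class SchemaInferenceSketch {

    // Hypothetical stand-in for the real GenericRecord interface.
    interface GenericRecord {}

    // Hypothetical labels for the kind of schema the caller should build.
    enum SchemaKind { BYTES, NONE, GENERIC, TYPED }

    // One place that maps the function's type argument to a schema kind,
    // so callers such as a sink do not need their own GenericRecord branch.
    static SchemaKind inferSchemaKind(Class<?> typeArg) {
        if (typeArg == null) {
            return SchemaKind.BYTES;   // no type configured: fall back to bytes
        }
        if (Void.class.equals(typeArg)) {
            return SchemaKind.NONE;    // return type is 'void': nothing to check
        }
        if (GenericRecord.class.isAssignableFrom(typeArg)) {
            return SchemaKind.GENERIC; // generic records are handled here, once
        }
        return SchemaKind.TYPED;       // any other class: schema derived from it
    }

    public static void main(String[] args) {
        System.out.println(inferSchemaKind(null));
        System.out.println(inferSchemaKind(Void.class));
        System.out.println(inferSchemaKind(GenericRecord.class));
        System.out.println(inferSchemaKind(String.class));
    }
}
```

   With this shape, the sink would only ask the inference routine for a schema 
and would not need to know about `GenericRecord` itself.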
   





