sijie commented on a change in pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590#discussion_r577022682
##########
File path:
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
##########
@@ -400,7 +415,16 @@ public void close() throws Exception {
ConsumerConfig consumerConfig = new ConsumerConfig();
consumerConfig.setSchemaProperties(pulsarSinkConfig.getSchemaProperties());
if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
- consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());
+ if (GenericRecord.class.isAssignableFrom(typeArg)) {
Review comment:
@eolivelli Yes and no regarding my original comment.
My original comment was meant to make sure we return the write schema information
via TopicSchema, because we use `AUTO_CONSUME` in PulsarSink to
indicate that `GenericRecord`s are published to the Pulsar topic. `AUTO_CONSUME` can
be used by both sources and sinks. To avoid impacting sources, I didn't add
the logic to `TopicSchema`. Instead, I added it to PulsarSink to make it more
explicit, which results in a one-line change similar to your initial one.
But that doesn't mean your original and current implementations are heading in the right
direction.
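To illustrate the design choice of keeping the `GenericRecord` check in the sink rather than in the shared schema lookup, here is a minimal, self-contained sketch. It is not the PR's code: `GenericRecord` here is a hypothetical stand-in for Pulsar's interface, `sharedSchemaType` stands in for the `TopicSchema` path used by both sources and sinks, and `sinkSchemaType`, `DemoRecord`, and the string schema types are hypothetical names for illustration only.

```java
// Hypothetical stand-in for org.apache.pulsar.client.api.schema.GenericRecord.
interface GenericRecord {}

// Hypothetical record type used only for the demo below.
class DemoRecord implements GenericRecord {}

public class SinkSchemaSketch {

    static final String AUTO_CONSUME = "AUTO_CONSUME";

    // Shared lookup used by both sources and sinks (stands in for TopicSchema).
    // It is deliberately left unchanged so sources keep their current behavior.
    static String sharedSchemaType(String configuredType) {
        return configuredType;
    }

    // Sink-only decision: when the sink's output type is GenericRecord (or a
    // subtype), use AUTO_CONSUME so each record keeps its own write schema.
    // Any other output type falls through to the shared lookup.
    static String sinkSchemaType(Class<?> typeArg, String configuredType) {
        if (GenericRecord.class.isAssignableFrom(typeArg)) {
            return AUTO_CONSUME;
        }
        return sharedSchemaType(configuredType);
    }

    public static void main(String[] args) {
        System.out.println(sinkSchemaType(DemoRecord.class, "AVRO")); // AUTO_CONSUME
        System.out.println(sinkSchemaType(String.class, "STRING"));   // STRING
        System.out.println(sharedSchemaType("AVRO"));                 // AVRO
    }
}
```

Because the override lives only in the sink-side method, a source that calls the shared lookup with the same configuration sees no change in behavior.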
The main problem with your previous and current implementations in #9481 is that you
are trying to hijack the existing AVRO implementation to add support for
lazy schema initialization. Lazy schema initialization is already
implemented as part of multi-schema write support, so you don't need
such a hack.
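As a rough illustration of what "lazy schema initialization" means here, the following self-contained sketch registers nothing when the producer is created and registers each schema only the first time a message using it is sent. It does not use Pulsar's real producer API or schema registry: `LazySchemaProducer`, `send`, `registrationCount`, and the string schema definitions are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LazySchemaProducer {

    // Schema definitions "registered" so far, in registration order
    // (a hypothetical stand-in for calls to a schema registry).
    private final List<String> registered = new ArrayList<>();

    // Cache: schema definition -> version assigned at registration time.
    private final Map<String, Integer> versions = new HashMap<>();

    // Nothing is registered up front; a schema is registered the first time
    // a message using it is sent, and later sends reuse the cached version.
    public int send(String schemaDefinition, byte[] payload) {
        int version = versions.computeIfAbsent(schemaDefinition, def -> {
            registered.add(def);
            return registered.size() - 1;
        });
        // A real producer would now publish the payload tagged with this version.
        return version;
    }

    public int registrationCount() {
        return registered.size();
    }

    public static void main(String[] args) {
        LazySchemaProducer producer = new LazySchemaProducer();
        System.out.println(producer.registrationCount());               // 0
        System.out.println(producer.send("avro:User-v1", new byte[0])); // 0
        System.out.println(producer.send("avro:User-v2", new byte[0])); // 1
        System.out.println(producer.send("avro:User-v1", new byte[0])); // 0
        System.out.println(producer.registrationCount());               // 2
    }
}
```

The point of the sketch is that laziness comes from a per-schema cache consulted at send time, independent of any particular schema type such as AVRO.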
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.