dlg99 commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r629744063
##########
File path:
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
##########
@@ -69,7 +72,20 @@ public void open(Map<String, Object> config, SourceContext
sourceContext) throws
cb.messageListener(this);
Consumer<T> consumer = cb.subscribeAsync().join();
- inputConsumers.add(consumer);
+ inputConsumers.put(TopicName.get(topic), consumer);
+ }
+ if (sourceContext instanceof ExtendedSourceContext) {
Review comment:
One way or another the PulsarSource will need to set data to the
context.
In this case we either make pulsar source depend on concrete ContextImpl, or
extend SourceContext with a setter that is not needed in actual Source, or add
the interface that extends SourceContext to not make that method a part of the
public API.
Whether it sets map of consumers or a func that resolves it is a matter of
choice. I preferred the func to keep topic comparison logic in the PulsarSource
plus case PulsarSource ever decides to create consumers dynamically/after open()
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]