dlg99 commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r630431079
##########
File path:
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
##########
@@ -69,7 +72,20 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
cb.messageListener(this);
Consumer<T> consumer = cb.subscribeAsync().join();
- inputConsumers.add(consumer);
+ inputConsumers.put(TopicName.get(topic), consumer);
+ }
+ if (sourceContext instanceof ExtendedSourceContext) {
+ ((ExtendedSourceContext) sourceContext).setConsumerGetter(topicName -> {
+ try {
+ TopicName req = TopicName.get(topicName);
+ if (inputConsumers.containsKey(req)) {
+ return inputConsumers.get(req);
+ }
+ } catch (Exception e) {
+ return null;
Review comment:
`java.util.function.Function` only allows unchecked exceptions there, and I
don't think an extra interface that declares `Exception` is justified, or that
propagating the exception provides more value. The exception can come from
`TopicName.get()`, which throws RuntimeException and IllegalArgumentException
AFAICT. TBH I don't really see the benefit of figuring out which one happened.
I'll add logging.
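For illustration only, a minimal sketch of the pattern described above: a `java.util.function.Function` getter that catches the unchecked exception, logs it, and returns null. This is not the PR's code. `ConsumerGetterSketch`, `consumerGetter`, and `parseTopic` are hypothetical names, `parseTopic` is only a stand-in for `TopicName.get()`, and plain strings stand in for the consumers so the sketch runs without Pulsar on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ConsumerGetterSketch {
    private static final Logger LOG =
            Logger.getLogger(ConsumerGetterSketch.class.getName());

    // Hypothetical stand-in for TopicName.get(): normalizes short names and
    // rejects malformed ones with an unchecked exception.
    static String parseTopic(String name) {
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("Invalid topic name: " + name);
        }
        return name.startsWith("persistent://")
                ? name
                : "persistent://public/default/" + name;
    }

    // Function.apply() declares no checked exceptions, so the lambda catches
    // the unchecked failure, logs it, and maps it to null (same as "not found").
    static <C> Function<String, C> consumerGetter(Map<String, C> consumers) {
        return topicName -> {
            try {
                return consumers.get(parseTopic(topicName));
            } catch (RuntimeException e) {
                LOG.log(Level.WARNING,
                        "Failed to resolve consumer for topic " + topicName, e);
                return null;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> consumers = new ConcurrentHashMap<>();
        consumers.put(parseTopic("orders"), "orders-consumer");

        Function<String, String> getter = consumerGetter(consumers);
        System.out.println(getter.apply("orders")); // short name, normalized
        System.out.println(getter.apply("persistent://public/default/orders"));
        System.out.println(getter.apply(""));       // malformed: logged, null
    }
}
```

Callers see null both for an unknown topic and for a malformed name; only the log line distinguishes the two, which matches the trade-off argued for in the comment.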
##########
File path:
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
##########
@@ -71,6 +73,19 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
ConsumerBuilder<T> cb = createConsumeBuilder(topic, pulsarSourceConsumerConfig);
consumer = cb.subscribeAsync().join();
+ if (sourceContext instanceof ExtendedSourceContext) {
+ ((ExtendedSourceContext) sourceContext).setConsumerGetter(topicName -> {
+ try {
+ TopicName src = TopicName.get(topic);
+ if (src.equals(TopicName.get(topicName))) {
+ return consumer;
+ }
+ } catch (Exception e) {
Review comment:
same as above