davsclaus commented on a change in pull request #5872: URL: https://github.com/apache/camel/pull/5872#discussion_r677968641
########## File path: components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConsumer.java ########## @@ -76,4 +90,31 @@ protected void doResume() throws Exception { return strategy.create(endpoint); } + private Collection<ExecutorService> subscribeWithThreadPool(Collection<Consumer<byte[]>> consumers, PulsarEndpoint endpoint) + throws Exception { + int numThreads = endpoint.getPulsarConfiguration().getNumberOfConsumerThreads(); + return consumers.stream().map(consumer -> { + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + for (int i = 0; i < numThreads; i++) { + executor.submit(() -> { Review comment: Also it may be worth to move this into its own private inner class so it has a class name that makes stacktraces and logging/debugging easier. ########## File path: components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarUtils.java ########## @@ -31,6 +33,20 @@ private PulsarUtils() { } + public static Queue<ExecutorService> stopExecutors(final Queue<ExecutorService> executors) { + for (ExecutorService executor : executors) { Review comment: Use Camel's ExecutorServiceManager which has API for stopping thread pools ########## File path: components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConsumer.java ########## @@ -76,4 +90,31 @@ protected void doResume() throws Exception { return strategy.create(endpoint); } + private Collection<ExecutorService> subscribeWithThreadPool(Collection<Consumer<byte[]>> consumers, PulsarEndpoint endpoint) + throws Exception { + int numThreads = endpoint.getPulsarConfiguration().getNumberOfConsumerThreads(); + return consumers.stream().map(consumer -> { + ExecutorService executor = Executors.newFixedThreadPool(numThreads); Review comment: Use Camel's executor service API for this, see getExecutorServiceManager on CamelContext ########## File path: components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConsumer.java ########## @@ -76,4 +90,31 @@ protected void doResume() throws Exception { return strategy.create(endpoint); } + private Collection<ExecutorService> subscribeWithThreadPool(Collection<Consumer<byte[]>> consumers, PulsarEndpoint endpoint) + throws Exception { + int numThreads = endpoint.getPulsarConfiguration().getNumberOfConsumerThreads(); + return consumers.stream().map(consumer -> { + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + for (int i = 0; i < numThreads; i++) { + executor.submit(() -> { + PulsarMessageListener listener = new PulsarMessageListener(endpoint, this); + while (true) { + try { + Message<byte[]> msg = consumer.receive(); + listener.received(consumer, msg); + } catch (PulsarClientException e) { + if (e.getCause() instanceof InterruptedException) { + // propagate interrupt + LOGGER.info("Received shutdown signal, exiting"); + break; + } + LOGGER.error("Encountered exception while receiving message", e); Review comment: Use getExceptionHandler to handle this exception. Also you would need to catch Exception to have a general catch and handle to keep the thread up and running. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org