davsclaus commented on a change in pull request #5872:
URL: https://github.com/apache/camel/pull/5872#discussion_r677968641



##########
File path: 
components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConsumer.java
##########
@@ -76,4 +90,31 @@ protected void doResume() throws Exception {
         return strategy.create(endpoint);
     }
 
+    private Collection<ExecutorService> 
subscribeWithThreadPool(Collection<Consumer<byte[]>> consumers, PulsarEndpoint 
endpoint)
+            throws Exception {
+        int numThreads = 
endpoint.getPulsarConfiguration().getNumberOfConsumerThreads();
+        return consumers.stream().map(consumer -> {
+            ExecutorService executor = 
Executors.newFixedThreadPool(numThreads);
+            for (int i = 0; i < numThreads; i++) {
+                executor.submit(() -> {

Review comment:
       Also it may be worth to move this into its own private inner class so it 
has a class name that makes stacktraces and logging/debugging easier.

##########
File path: 
components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarUtils.java
##########
@@ -31,6 +33,20 @@
     private PulsarUtils() {
     }
 
+    public static Queue<ExecutorService> stopExecutors(final 
Queue<ExecutorService> executors) {
+        for (ExecutorService executor : executors) {

Review comment:
       Use Camel's ExecutorServiceManager which has API for stopping thread 
pools

##########
File path: 
components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConsumer.java
##########
@@ -76,4 +90,31 @@ protected void doResume() throws Exception {
         return strategy.create(endpoint);
     }
 
+    private Collection<ExecutorService> 
subscribeWithThreadPool(Collection<Consumer<byte[]>> consumers, PulsarEndpoint 
endpoint)
+            throws Exception {
+        int numThreads = 
endpoint.getPulsarConfiguration().getNumberOfConsumerThreads();
+        return consumers.stream().map(consumer -> {
+            ExecutorService executor = 
Executors.newFixedThreadPool(numThreads);

Review comment:
       Use Camel's executor service API for this, see getExecutorServiceManager 
on CamelContext

##########
File path: 
components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConsumer.java
##########
@@ -76,4 +90,31 @@ protected void doResume() throws Exception {
         return strategy.create(endpoint);
     }
 
+    private Collection<ExecutorService> 
subscribeWithThreadPool(Collection<Consumer<byte[]>> consumers, PulsarEndpoint 
endpoint)
+            throws Exception {
+        int numThreads = 
endpoint.getPulsarConfiguration().getNumberOfConsumerThreads();
+        return consumers.stream().map(consumer -> {
+            ExecutorService executor = 
Executors.newFixedThreadPool(numThreads);
+            for (int i = 0; i < numThreads; i++) {
+                executor.submit(() -> {
+                    PulsarMessageListener listener = new 
PulsarMessageListener(endpoint, this);
+                    while (true) {
+                        try {
+                            Message<byte[]> msg = consumer.receive();
+                            listener.received(consumer, msg);
+                        } catch (PulsarClientException e) {
+                            if (e.getCause() instanceof InterruptedException) {
+                                // propagate interrupt
+                                LOGGER.info("Received shutdown signal, 
exiting");
+                                break;
+                            }
+                            LOGGER.error("Encountered exception while 
receiving message", e);

Review comment:
       Use getExceptionHandler to handle this exception.
   
   Also you would need to catch Exception to have a general catch and handle to 
keep the thread up and running.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to