davsclaus commented on a change in pull request #5872:
URL: https://github.com/apache/camel/pull/5872#discussion_r677968641
##########
File path:
components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConsumer.java
##########
@@ -76,4 +90,31 @@ protected void doResume() throws Exception {
return strategy.create(endpoint);
}
+ private Collection<ExecutorService>
subscribeWithThreadPool(Collection<Consumer<byte[]>> consumers, PulsarEndpoint
endpoint)
+ throws Exception {
+ int numThreads =
endpoint.getPulsarConfiguration().getNumberOfConsumerThreads();
+ return consumers.stream().map(consumer -> {
+ ExecutorService executor =
Executors.newFixedThreadPool(numThreads);
+ for (int i = 0; i < numThreads; i++) {
+ executor.submit(() -> {
Review comment:
Also it may be worth to move this into its own private inner class so it
has a class name that makes stacktraces and logging/debugging easier.
##########
File path:
components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/PulsarUtils.java
##########
@@ -31,6 +33,20 @@
private PulsarUtils() {
}
+ public static Queue<ExecutorService> stopExecutors(final
Queue<ExecutorService> executors) {
+ for (ExecutorService executor : executors) {
Review comment:
Use Camel's ExecutorServiceManager which has API for stopping thread
pools
##########
File path:
components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConsumer.java
##########
@@ -76,4 +90,31 @@ protected void doResume() throws Exception {
return strategy.create(endpoint);
}
+ private Collection<ExecutorService>
subscribeWithThreadPool(Collection<Consumer<byte[]>> consumers, PulsarEndpoint
endpoint)
+ throws Exception {
+ int numThreads =
endpoint.getPulsarConfiguration().getNumberOfConsumerThreads();
+ return consumers.stream().map(consumer -> {
+ ExecutorService executor =
Executors.newFixedThreadPool(numThreads);
Review comment:
Use Camel's executor service API for this, see getExecutorServiceManager
on CamelContext
##########
File path:
components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConsumer.java
##########
@@ -76,4 +90,31 @@ protected void doResume() throws Exception {
return strategy.create(endpoint);
}
+ private Collection<ExecutorService>
subscribeWithThreadPool(Collection<Consumer<byte[]>> consumers, PulsarEndpoint
endpoint)
+ throws Exception {
+ int numThreads =
endpoint.getPulsarConfiguration().getNumberOfConsumerThreads();
+ return consumers.stream().map(consumer -> {
+ ExecutorService executor =
Executors.newFixedThreadPool(numThreads);
+ for (int i = 0; i < numThreads; i++) {
+ executor.submit(() -> {
+ PulsarMessageListener listener = new
PulsarMessageListener(endpoint, this);
+ while (true) {
+ try {
+ Message<byte[]> msg = consumer.receive();
+ listener.received(consumer, msg);
+ } catch (PulsarClientException e) {
+ if (e.getCause() instanceof InterruptedException) {
+ // propagate interrupt
+ LOGGER.info("Received shutdown signal,
exiting");
+ break;
+ }
+ LOGGER.error("Encountered exception while
receiving message", e);
Review comment:
Use getExceptionHandler to handle this exception.
Also you would need to catch Exception to have a general catch and handle to
keep the thread up and running.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]