This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 3fb8cf8a4ed Fix PubSub concurrent access error on shutdown (#7347)
3fb8cf8a4ed is described below

commit 3fb8cf8a4edf67f668b8ac92c49124ea85a3b75d
Author: vpaturet <[email protected]>
AuthorDate: Mon Apr 4 18:07:58 2022 +0200

    Fix PubSub concurrent access error on shutdown (#7347)
---
 .../camel/component/google/pubsub/GooglePubsubConsumer.java    | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index 06a9942de4a..e639e400e22 100644
--- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -61,7 +61,7 @@ public class GooglePubsubConsumer extends DefaultConsumer {
         super(endpoint, processor);
         this.endpoint = endpoint;
         this.processor = processor;
-        this.subscribers = new LinkedList<>();
+        this.subscribers = Collections.synchronizedList(new LinkedList<>());
         this.pendingSynchronousPullResponses = Collections.synchronizedSet(new HashSet<>());
         String loggerId = endpoint.getLoggerId();
 
@@ -87,9 +87,11 @@ public class GooglePubsubConsumer extends DefaultConsumer {
         super.doStop();
         localLog.info("Stopping Google PubSub consumer for {}/{}", endpoint.getProjectId(), endpoint.getDestinationName());
 
-        if (subscribers != null && !subscribers.isEmpty()) {
-            localLog.info("Stopping subscribers for {}/{}", endpoint.getProjectId(), endpoint.getDestinationName());
-            subscribers.forEach(AbstractApiService::stopAsync);
+        synchronized (subscribers) {
+            if (!subscribers.isEmpty()) {
+                localLog.info("Stopping subscribers for {}/{}", endpoint.getProjectId(), endpoint.getDestinationName());
+                subscribers.forEach(AbstractApiService::stopAsync);
+            }
         }
 
         safeCancelSynchronousPullResponses();

Reply via email to