This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 3fb8cf8a4ed Fix PubSub concurrent access error on shutdown (#7347)
3fb8cf8a4ed is described below
commit 3fb8cf8a4edf67f668b8ac92c49124ea85a3b75d
Author: vpaturet <[email protected]>
AuthorDate: Mon Apr 4 18:07:58 2022 +0200
Fix PubSub concurrent access error on shutdown (#7347)
---
.../camel/component/google/pubsub/GooglePubsubConsumer.java | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index 06a9942de4a..e639e400e22 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -61,7 +61,7 @@ public class GooglePubsubConsumer extends DefaultConsumer {
super(endpoint, processor);
this.endpoint = endpoint;
this.processor = processor;
- this.subscribers = new LinkedList<>();
+ this.subscribers = Collections.synchronizedList(new LinkedList<>());
this.pendingSynchronousPullResponses = Collections.synchronizedSet(new
HashSet<>());
String loggerId = endpoint.getLoggerId();
@@ -87,9 +87,11 @@ public class GooglePubsubConsumer extends DefaultConsumer {
super.doStop();
localLog.info("Stopping Google PubSub consumer for {}/{}",
endpoint.getProjectId(), endpoint.getDestinationName());
- if (subscribers != null && !subscribers.isEmpty()) {
- localLog.info("Stopping subscribers for {}/{}",
endpoint.getProjectId(), endpoint.getDestinationName());
- subscribers.forEach(AbstractApiService::stopAsync);
+ synchronized (subscribers) {
+ if (!subscribers.isEmpty()) {
+ localLog.info("Stopping subscribers for {}/{}",
endpoint.getProjectId(), endpoint.getDestinationName());
+ subscribers.forEach(AbstractApiService::stopAsync);
+ }
}
safeCancelSynchronousPullResponses();