This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 2fad26ad10f3 CAMEL-22898: Fix memory leak in Google PubSub consumer 
(#21209)
2fad26ad10f3 is described below

commit 2fad26ad10f33c5eb4d7116ba89b84395894bb4a
Author: Guillaume Nodet <[email protected]>
AuthorDate: Mon Feb 2 14:10:15 2026 +0100

    CAMEL-22898: Fix memory leak in Google PubSub consumer (#21209)
    
     - Fix memory leak by removing subscribers before stopping client
     - Add proper cleanup in error paths
     - Use ApiException.isRetryable() to distinguish error types
     - Replace Thread.sleep() with Camel's Task API for retry delays
     - Apply fixes to both PubSub and PubSub Lite consumers
---
 .../pubsublite/GooglePubsubLiteConsumer.java       | 50 +++++++++++++++++++++-
 .../google/pubsub/GooglePubsubConsumer.java        | 48 ++++++++++++++++++++-
 2 files changed, 94 insertions(+), 4 deletions(-)

diff --git 
a/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
 
b/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
index f4aa9de2a0a3..40ca62c54ebb 100644
--- 
a/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
+++ 
b/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
@@ -16,11 +16,13 @@
  */
 package org.apache.camel.component.google.pubsublite;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsub.v1.MessageReceiver;
 import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
 import com.google.common.base.Strings;
@@ -28,6 +30,8 @@ import com.google.pubsub.v1.ProjectSubscriptionName;
 import org.apache.camel.Processor;
 import 
org.apache.camel.component.google.pubsublite.consumer.CamelMessageReceiver;
 import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -107,20 +111,62 @@ public class GooglePubsubLiteConsumer extends 
DefaultConsumer {
                             = new 
CamelMessageReceiver(GooglePubsubLiteConsumer.this, endpoint, processor);
 
                     Subscriber subscriber = 
endpoint.getComponent().getSubscriber(messageReceiver, endpoint);
+                    boolean subscriberAdded = false;
                     try {
-                        subscribers.add(subscriber);
                         subscriber.startAsync().awaitRunning();
+                        // Only add to list after successful startup
+                        subscribers.add(subscriber);
+                        subscriberAdded = true;
                         subscriber.awaitTerminated();
                     } catch (Exception e) {
-                        localLog.error("Failure getting messages from PubSub 
Lite", e);
+                        // Remove from list if it was added
+                        if (subscriberAdded) {
+                            subscribers.remove(subscriber);
+                        }
+
+                        // Check if error is recoverable
+                        boolean isRecoverable = false;
+                        if (e instanceof ApiException) {
+                            isRecoverable = ((ApiException) e).isRetryable();
+                        } else if (e.getCause() instanceof ApiException) {
+                            isRecoverable = ((ApiException) 
e.getCause()).isRetryable();
+                        }
+
+                        if (isRecoverable) {
+                            localLog.error("Retryable error getting messages 
from PubSub Lite", e);
+                        } else {
+                            localLog.error("Non-recoverable error getting 
messages from PubSub Lite, stopping subscriber loop",
+                                    e);
+                        }
 
                         // allow camel error handler to be aware
                         if (endpoint.isBridgeErrorHandler()) {
                             getExceptionHandler().handleException(e);
                         }
+
+                        // For non-recoverable errors, exit the loop
+                        if (!isRecoverable) {
+                            break;
+                        }
+
+                        // Add backoff delay for recoverable errors to prevent 
tight loop
+                        // We use initialDelay for the actual delay, and 
maxIterations(1) to run once
+                        Tasks.foregroundTask()
+                                .withBudget(Budgets.iterationBudget()
+                                        .withMaxIterations(1)
+                                        
.withInitialDelay(Duration.ofSeconds(5))
+                                        .withInterval(Duration.ZERO)
+                                        .build())
+                                .withName("PubSubLiteRetryDelay")
+                                .build()
+                                .run(getEndpoint().getCamelContext(), () -> 
true);
                     } finally {
                         localLog.debug("Stopping async subscriber {}", 
subscriptionName);
                         subscriber.stopAsync();
+                        // Ensure cleanup from list
+                        if (subscriberAdded) {
+                            subscribers.remove(subscriber);
+                        }
                     }
                 }
 
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index 1f13e556cf50..26d60949ac26 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++ 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.google.pubsub;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -48,6 +49,8 @@ import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.UnitOfWorkHelper;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -165,20 +168,61 @@ public class GooglePubsubConsumer extends DefaultConsumer 
{
                 MessageReceiver messageReceiver = new 
CamelMessageReceiver(GooglePubsubConsumer.this, endpoint, processor);
 
                 Subscriber subscriber = 
endpoint.getComponent().getSubscriber(subscriptionName, messageReceiver, 
endpoint);
+                boolean subscriberAdded = false;
                 try {
-                    subscribers.add(subscriber);
                     subscriber.startAsync().awaitRunning();
+                    // Only add to list after successful startup
+                    subscribers.add(subscriber);
+                    subscriberAdded = true;
                     subscriber.awaitTerminated();
                 } catch (Exception e) {
-                    localLog.error("Failure getting messages from PubSub", e);
+                    // Remove from list if it was added
+                    if (subscriberAdded) {
+                        subscribers.remove(subscriber);
+                    }
+
+                    // Check if error is recoverable
+                    boolean isRecoverable = false;
+                    if (e instanceof ApiException) {
+                        isRecoverable = ((ApiException) e).isRetryable();
+                    } else if (e.getCause() instanceof ApiException) {
+                        isRecoverable = ((ApiException) 
e.getCause()).isRetryable();
+                    }
+
+                    if (isRecoverable) {
+                        localLog.error("Retryable error getting messages from 
PubSub", e);
+                    } else {
+                        localLog.error("Non-recoverable error getting messages 
from PubSub, stopping subscriber loop", e);
+                    }
 
                     // allow camel error handler to be aware
                     if (endpoint.isBridgeErrorHandler()) {
                         getExceptionHandler().handleException(e);
                     }
+
+                    // For non-recoverable errors, exit the loop
+                    if (!isRecoverable) {
+                        break;
+                    }
+
+                    // Add backoff delay for recoverable errors to prevent 
tight loop
+                    // We use initialDelay for the actual delay, and 
maxIterations(1) to run once
+                    Tasks.foregroundTask()
+                            .withBudget(Budgets.iterationBudget()
+                                    .withMaxIterations(1)
+                                    .withInitialDelay(Duration.ofSeconds(5))
+                                    .withInterval(Duration.ZERO)
+                                    .build())
+                            .withName("PubSubRetryDelay")
+                            .build()
+                            .run(getEndpoint().getCamelContext(), () -> true);
                 } finally {
                     localLog.debug("Stopping async subscriber {}", 
subscriptionName);
                     subscriber.stopAsync();
+                    // Ensure cleanup from list
+                    if (subscriberAdded) {
+                        subscribers.remove(subscriber);
+                    }
                 }
             }
         }

Reply via email to