This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 2fad26ad10f3 CAMEL-22898: Fix memory leak in Google PubSub consumer
(#21209)
2fad26ad10f3 is described below
commit 2fad26ad10f33c5eb4d7116ba89b84395894bb4a
Author: Guillaume Nodet <[email protected]>
AuthorDate: Mon Feb 2 14:10:15 2026 +0100
CAMEL-22898: Fix memory leak in Google PubSub consumer (#21209)
- Fix memory leak by removing subscribers before stopping client
- Add proper cleanup in error paths
- Use ApiException.isRetryable() to distinguish error types
- Replace Thread.sleep() with Camel's Task API for retry delays
- Apply fixes to both PubSub and PubSub Lite consumers
---
.../pubsublite/GooglePubsubLiteConsumer.java | 50 +++++++++++++++++++++-
.../google/pubsub/GooglePubsubConsumer.java | 48 ++++++++++++++++++++-
2 files changed, 94 insertions(+), 4 deletions(-)
diff --git
a/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
b/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
index f4aa9de2a0a3..40ca62c54ebb 100644
---
a/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
+++
b/components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteConsumer.java
@@ -16,11 +16,13 @@
*/
package org.apache.camel.component.google.pubsublite;
+import java.time.Duration;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.common.base.Strings;
@@ -28,6 +30,8 @@ import com.google.pubsub.v1.ProjectSubscriptionName;
import org.apache.camel.Processor;
import
org.apache.camel.component.google.pubsublite.consumer.CamelMessageReceiver;
import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,20 +111,62 @@ public class GooglePubsubLiteConsumer extends
DefaultConsumer {
= new
CamelMessageReceiver(GooglePubsubLiteConsumer.this, endpoint, processor);
Subscriber subscriber =
endpoint.getComponent().getSubscriber(messageReceiver, endpoint);
+ boolean subscriberAdded = false;
try {
- subscribers.add(subscriber);
subscriber.startAsync().awaitRunning();
+ // Only add to list after successful startup
+ subscribers.add(subscriber);
+ subscriberAdded = true;
subscriber.awaitTerminated();
} catch (Exception e) {
- localLog.error("Failure getting messages from PubSub
Lite", e);
+ // Remove from list if it was added
+ if (subscriberAdded) {
+ subscribers.remove(subscriber);
+ }
+
+ // Check if error is recoverable
+ boolean isRecoverable = false;
+ if (e instanceof ApiException) {
+ isRecoverable = ((ApiException) e).isRetryable();
+ } else if (e.getCause() instanceof ApiException) {
+ isRecoverable = ((ApiException)
e.getCause()).isRetryable();
+ }
+
+ if (isRecoverable) {
+ localLog.error("Retryable error getting messages
from PubSub Lite", e);
+ } else {
+ localLog.error("Non-recoverable error getting
messages from PubSub Lite, stopping subscriber loop",
+ e);
+ }
// allow camel error handler to be aware
if (endpoint.isBridgeErrorHandler()) {
getExceptionHandler().handleException(e);
}
+
+ // For non-recoverable errors, exit the loop
+ if (!isRecoverable) {
+ break;
+ }
+
+ // Add backoff delay for recoverable errors to prevent
tight loop
+ // We use initialDelay for the actual delay, and
maxIterations(1) to run once
+ Tasks.foregroundTask()
+ .withBudget(Budgets.iterationBudget()
+ .withMaxIterations(1)
+
.withInitialDelay(Duration.ofSeconds(5))
+ .withInterval(Duration.ZERO)
+ .build())
+ .withName("PubSubLiteRetryDelay")
+ .build()
+ .run(getEndpoint().getCamelContext(), () ->
true);
} finally {
localLog.debug("Stopping async subscriber {}",
subscriptionName);
subscriber.stopAsync();
+ // Ensure cleanup from list
+ if (subscriberAdded) {
+ subscribers.remove(subscriber);
+ }
}
}
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index 1f13e556cf50..26d60949ac26 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.google.pubsub;
import java.io.IOException;
+import java.time.Duration;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -48,6 +49,8 @@ import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.UnitOfWorkHelper;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -165,20 +168,61 @@ public class GooglePubsubConsumer extends DefaultConsumer
{
MessageReceiver messageReceiver = new
CamelMessageReceiver(GooglePubsubConsumer.this, endpoint, processor);
Subscriber subscriber =
endpoint.getComponent().getSubscriber(subscriptionName, messageReceiver,
endpoint);
+ boolean subscriberAdded = false;
try {
- subscribers.add(subscriber);
subscriber.startAsync().awaitRunning();
+ // Only add to list after successful startup
+ subscribers.add(subscriber);
+ subscriberAdded = true;
subscriber.awaitTerminated();
} catch (Exception e) {
- localLog.error("Failure getting messages from PubSub", e);
+ // Remove from list if it was added
+ if (subscriberAdded) {
+ subscribers.remove(subscriber);
+ }
+
+ // Check if error is recoverable
+ boolean isRecoverable = false;
+ if (e instanceof ApiException) {
+ isRecoverable = ((ApiException) e).isRetryable();
+ } else if (e.getCause() instanceof ApiException) {
+ isRecoverable = ((ApiException)
e.getCause()).isRetryable();
+ }
+
+ if (isRecoverable) {
+ localLog.error("Retryable error getting messages from
PubSub", e);
+ } else {
+ localLog.error("Non-recoverable error getting messages
from PubSub, stopping subscriber loop", e);
+ }
// allow camel error handler to be aware
if (endpoint.isBridgeErrorHandler()) {
getExceptionHandler().handleException(e);
}
+
+ // For non-recoverable errors, exit the loop
+ if (!isRecoverable) {
+ break;
+ }
+
+ // Add backoff delay for recoverable errors to prevent
tight loop
+ // We use initialDelay for the actual delay, and
maxIterations(1) to run once
+ Tasks.foregroundTask()
+ .withBudget(Budgets.iterationBudget()
+ .withMaxIterations(1)
+ .withInitialDelay(Duration.ofSeconds(5))
+ .withInterval(Duration.ZERO)
+ .build())
+ .withName("PubSubRetryDelay")
+ .build()
+ .run(getEndpoint().getCamelContext(), () -> true);
} finally {
localLog.debug("Stopping async subscriber {}",
subscriptionName);
subscriber.stopAsync();
+ // Ensure cleanup from list
+ if (subscriberAdded) {
+ subscribers.remove(subscriber);
+ }
}
}
}