[ 
https://issues.apache.org/jira/browse/CAMEL-16263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrea Cosentino reassigned CAMEL-16263:
----------------------------------------

    Assignee: Andrea Cosentino

> camel-google-pubsub - Consumer does not recover from 500 series error from 
> Google
> ---------------------------------------------------------------------------------
>
>                 Key: CAMEL-16263
>                 URL: https://issues.apache.org/jira/browse/CAMEL-16263
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-google-pubsub
>    Affects Versions: 3.8.0
>            Reporter: Donal Anglin
>            Assignee: Andrea Cosentino
>            Priority: Major
>             Fix For: 3.10.0
>
>
> When the {{GooglePubsubConsumer}} receives a 500 series error from Google I 
> expect it to catch that exception, log it and continue trying to pull 
> messages from PubSub.
> What is actually happening is when an error like that occurs, the 
> {{SubscriberWrapper}} thread started by the {{GooglePubsubConsumer}} exits 
> and never restarts. The application will pull no more messages from PubSub 
> for the lifetime of the application.
> The error is intermittent but can be reproduced using a simple route and 
> waiting:
> {code:java}
> public class PubSubRoutes extends RouteBuilder {
>     @Override
>     public void configure() throws Exception {
>         
> from("google-pubsub://{{gcp.project.id}}:{{gcp.subscription.id}}?synchronousPull=true")
>                 .log(LoggingLevel.INFO, "MessageReceived! ${body}");
>     }
> }
> {code}
> Sample project: [https://github.com/anglind/camel-pubsub-bug]
> The error log:
> {code:java}
> 16:58:36.394 [com.github.anglind.App.main()] INFO  
> o.a.c.c.g.p.GooglePubsubConsumer - Starting Google PubSub consumer for 
> myproject/camel-test-topic-sub
> 16:58:36.450 [com.github.anglind.App.main()] INFO  
> o.a.c.i.engine.AbstractCamelContext - Routes startup summary (total:1 
> started:1)
> 16:58:36.451 [com.github.anglind.App.main()] INFO  
> o.a.c.i.engine.AbstractCamelContext -      Started route1 
> (google-pubsub://myproject:camel-test-topic-sub)
> 16:58:36.452 [com.github.anglind.App.main()] INFO  
> o.a.c.i.engine.AbstractCamelContext - Apache Camel 3.8.0 (camel-1) started in 
> 308ms (build:42ms init:204ms start:62ms)
> 16:59:04.044 [Camel (camel-1) thread #0 - 
> GooglePubsubConsumer[camel-test-topic-sub]] INFO  route1 - MessageReceived! 
> This is a message I published
> 18:23:12.719 [Camel (camel-1) thread #0 - 
> GooglePubsubConsumer[camel-test-topic-sub]] ERROR 
> o.a.c.c.g.p.GooglePubsubConsumer - Failure getting messages from PubSub
> com.google.api.gax.rpc.DeadlineExceededException: 
> io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 
> 23.965861200s. [buffered_nanos=7490862100, buffered_nanos=65424900, 
> remote_addr=pubsub.googleapis.com/74.125.193.95:443]
>       at 
> com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:51)
>       at 
> com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
>       at 
> com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
>       at 
> com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
>       at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
>       at 
> com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1041)
>       at 
> com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
>       at 
> com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1215)
>       at 
> com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:983)
>       at 
> com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:771)
>       at 
> io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:563)
>       at 
> io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
>       at 
> io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:464)
>       at 
> io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:428)
>       at 
> io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:461)
>       at 
> io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:617)
>       at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
>       at 
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:803)
>       at 
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:782)
>       at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>       at 
> io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
>       Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous 
> task failed
>               at 
> com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
>               at 
> com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
>               at 
> org.apache.camel.component.google.pubsub.GooglePubsubConsumer$SubscriberWrapper.synchronousPull(GooglePubsubConsumer.java:152)
>               at 
> org.apache.camel.component.google.pubsub.GooglePubsubConsumer$SubscriberWrapper.run(GooglePubsubConsumer.java:113)
>               at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>               at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>               ... 3 common frames omitted
> Caused by: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline 
> exceeded after 23.965861200s. [buffered_nanos=7490862100, 
> buffered_nanos=65424900, remote_addr=pubsub.googleapis.com/74.125.193.95:443]
>       at io.grpc.Status.asRuntimeException(Status.java:533)
>       ... 17 common frames omitted
> {code}
> I think the component should catch and continue pulling for all the 500 
> series error codes listed on this page: 
> https://cloud.google.com/pubsub/docs/reference/error-codes



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to