[ https://issues.apache.org/jira/browse/BEAM-6516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anonymous updated BEAM-6516:
----------------------------
    Status: Triage Needed  (was: Resolved)

> Failed to advance source: org.apache.beam.sdk.io.rabbitmq.RabbitMqIO
> --------------------------------------------------------------------
>
>                 Key: BEAM-6516
>                 URL: https://issues.apache.org/jira/browse/BEAM-6516
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-rabbitmq
>            Reporter: Edin
>            Priority: P3
>             Fix For: 2.33.0
>
>         Attachments: Screenshot 2019-01-26 at 20.11.41.png
>
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> I'm using the RabbitMqIO connector to get messages from RabbitMQ to BigQuery
> in my current pipeline. When I submit my pipeline to Dataflow, the messages
> are propagated correctly to BigQuery, however I'm getting the following log
> error entry in Dataflow which is repeating itself:
> {code:java}
> java.io.IOException: Failed to advance source: org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$RabbitMQSource@1cdd077d
>     org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:806)
>     org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
>     org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$UnboundedRabbitMqReader.advance(RabbitMqIO.java:474)
>     org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:801)
>     org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     java.lang.Thread.run(Thread.java:745)
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
>     com.rabbitmq.client.QueueingConsumer.handle(QueueingConsumer.java:206)
>     com.rabbitmq.client.QueueingConsumer.nextDelivery(QueueingConsumer.java:237)
>     org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$UnboundedRabbitMqReader.advance(RabbitMqIO.java:451)
>     org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:801)
>     org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     java.lang.Thread.run(Thread.java:745)
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
>     com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:515)
>     com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:340)
>     com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
>     com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109)
>     com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:676)
>     com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
>     com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:603)
>     java.lang.Thread.run(Thread.java:745)
> {code}
> It seems to me the issue is ACKing messages:
> {code:java}
> reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80
> {code}
> I also see a lot of open channels (~90) in the RabbitMQ management interface
> (see attached screenshot).
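
For context on the 406 reply: class-id=60, method-id=80 identifies basic.ack, and AMQP 0-9-1 scopes delivery tags to the channel that delivered the message. An ack for tag 1 sent on a channel that never delivered tag 1 (or that has already acked it) is rejected with PRECONDITION_FAILED, and the broker closes that channel. Whether this is what happens inside RabbitMqIO here is not established by the report. The sketch below is a toy in-memory model of that rule, not the RabbitMQ client API; ToyChannel and DeliveryTagDemo are hypothetical names introduced only for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: this is NOT the RabbitMQ client API. It mimics one AMQP
// rule: a delivery tag is valid only on the channel that delivered it.
final class ToyChannel {
    private long nextTag = 1;                          // tags start at 1 on every channel
    private final Set<Long> unacked = new HashSet<>(); // tags delivered but not yet acked
    private boolean open = true;

    // Simulates the broker delivering one message on this channel.
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    // Simulates basic.ack (class-id=60, method-id=80).
    void ack(long tag) {
        if (!open) {
            throw new IllegalStateException("channel is closed");
        }
        if (!unacked.remove(tag)) {
            open = false; // a channel-level error closes the channel
            throw new IllegalStateException(
                "PRECONDITION_FAILED - unknown delivery tag " + tag);
        }
    }

    boolean isOpen() {
        return open;
    }
}

final class DeliveryTagDemo {
    public static void main(String[] args) {
        ToyChannel first = new ToyChannel();
        ToyChannel second = new ToyChannel();

        long tag = first.deliver(); // tag 1, known only to 'first'
        try {
            second.ack(tag);        // acked on the wrong channel
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        first.ack(tag);             // acked on the delivering channel: accepted
    }
}
```

In this model the wrong-channel ack prints "PRECONDITION_FAILED - unknown delivery tag 1" and leaves that channel closed, while the ack on the delivering channel succeeds.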