[
https://issues.apache.org/jira/browse/BEAM-6516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16844004#comment-16844004
]
Nicolas Delsaux edited comment on BEAM-6516 at 5/20/19 2:24 PM:
----------------------------------------------------------------
If I understand correctly, when I create a RabbitMqIO.Read object, the start
method is invoked. This start method creates a channel object, which is a
communication channel between the Dataflow executor and my RabbitMQ messaging
system.
When the RabbitMqIO.Read.advance() method is invoked, we wait for the next
available message on RabbitMQ, and the message's delivery tag (if a
message was actually read) is added to the Beam checkpointMark.
When the checkpointMark is finalized, all the delivery tags are acknowledged,
which marks the messages as consumed. This is, I guess, the normal process (at
least, what I describe is consistent with both the source code *and* the stack
trace shown here).
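To make that cycle concrete, here is a minimal sketch in plain Java. It is not the RabbitMqIO source and does not use the RabbitMQ client: FakeChannel, FakeCheckpointMark and FakeReader are hypothetical stand-ins that only model the start / advance / finalize sequence as I understand it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the read / checkpoint / ack cycle. */
public class CheckpointAckSketch {

    /** Stand-in for a broker channel; records which delivery tags were acked. */
    static class FakeChannel {
        private long nextTag = 1;
        final List<Long> acked = new ArrayList<>();

        /** Simulates receiving one message and returns its delivery tag. */
        long deliver() {
            return nextTag++;
        }

        void basicAck(long deliveryTag) {
            acked.add(deliveryTag);
        }
    }

    /** Stand-in for the checkpoint mark: collects tags until it is finalized. */
    static class FakeCheckpointMark {
        private final FakeChannel channel;
        private final List<Long> pendingTags = new ArrayList<>();

        FakeCheckpointMark(FakeChannel channel) {
            this.channel = channel;
        }

        void add(long deliveryTag) {
            pendingTags.add(deliveryTag);
        }

        /** Acks every pending tag on the channel captured at construction time. */
        void finalizeCheckpoint() {
            for (long tag : pendingTags) {
                channel.basicAck(tag);
            }
            pendingTags.clear();
        }
    }

    /** Stand-in for the reader: start() opens a channel, advance() reads one message. */
    static class FakeReader {
        FakeChannel channel;
        FakeCheckpointMark mark;

        void start() {
            channel = new FakeChannel();
            mark = new FakeCheckpointMark(channel);
        }

        boolean advance() {
            mark.add(channel.deliver());
            return true;
        }
    }

    public static void main(String[] args) {
        FakeReader reader = new FakeReader();
        reader.start();
        reader.advance();
        reader.advance();
        reader.mark.finalizeCheckpoint();
        System.out.println("acked tags: " + reader.channel.acked); // acked tags: [1, 2]
    }
}
```

In this model the acks always go to the channel the messages came from, so nothing fails. The sketch is only meant to show where the delivery tags travel between advance() and finalizeCheckpoint().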
The problem here is in the last part of the exception:
{{ Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
protocol method: #method<channel.close>(reply-code=406,
reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60,
method-id=80)}}
{{ com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:515)}}
{{ com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:340)}}
{{ com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)}}
{{ com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109)}}
{{ com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:676)}}
{{ com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)}}
{{ com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:603)}}
{{ java.lang.Thread.run(Thread.java:745)}}
This is the real cause of the exception.
According to the RabbitMQ mailing list (see, for example,
[this message|http://rabbitmq.1065348.n5.nabble.com/reply-code-406-reply-text-PRECONDITION-FAILED-unknown-delivery-tag-td21310.html]):
{quote}Apart from the unlikely scenario of the client application randomly
inventing delivery tags and submitting acks, the probable causes are:
- ack'ing the same message twice
- ack'ing on a channel other than the one the messages was received on (*)
- ack'ing a message when it was consumed in no-ack mode
- all the above but using basic.reject, basic.nack instead of basic.ack,
or a combination of the three (e.g. rejecting a message and then ack'ing it)
(*) this is a common occurrence in applications with naive error
recovery logic that re-establishes connections&channels transparently
behind the scenes.
{quote}
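The first two causes in that list can be illustrated with a small plain-Java model. This is a sketch under assumptions, not RabbitMQ client code: FakeChannel and tryAck are hypothetical names, and the only behaviour modelled is that a delivery tag is valid solely on the channel that issued it, and only until it has been acked once. A real broker would also close the channel after such an error, which the sketch does not reproduce.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of channel-scoped delivery tags. */
public class DeliveryTagScopeSketch {

    /** Stand-in for a channel: it only knows the tags it handed out itself. */
    static class FakeChannel {
        private long nextTag = 1;
        private final Set<Long> unacked = new HashSet<>();

        /** Simulates receiving one message and returns its delivery tag. */
        long deliver() {
            long tag = nextTag++;
            unacked.add(tag);
            return tag;
        }

        /** Rejects tags that this channel never issued or that were already acked. */
        void basicAck(long deliveryTag) {
            if (!unacked.remove(deliveryTag)) {
                throw new IllegalStateException(
                        "PRECONDITION_FAILED - unknown delivery tag " + deliveryTag);
            }
        }
    }

    /** Returns "ok" if the ack succeeds, otherwise the error text. */
    static String tryAck(FakeChannel channel, long deliveryTag) {
        try {
            channel.basicAck(deliveryTag);
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        FakeChannel original = new FakeChannel();
        long tag = original.deliver();

        // A channel re-established behind the scenes has no record of this tag.
        FakeChannel reopened = new FakeChannel();
        System.out.println(tryAck(reopened, tag)); // PRECONDITION_FAILED - unknown delivery tag 1

        // Acking on the original channel works once...
        System.out.println(tryAck(original, tag)); // ok

        // ...and fails the second time.
        System.out.println(tryAck(original, tag)); // PRECONDITION_FAILED - unknown delivery tag 1
    }
}
```

If the checkpoint mark ends up acking on a channel other than the one the reader consumed from, or is finalized twice, this is the kind of failure I would expect to see.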
I do not know enough about the Apache Beam threading model, but I
suspect there is something wrong (and related to multithreading) in
RabbitMQCheckpointMark#finalizeCheckpoint()
> Failed to advance source: org.apache.beam.sdk.io.rabbitmq.RabbitMqIO
> --------------------------------------------------------------------
>
> Key: BEAM-6516
> URL: https://issues.apache.org/jira/browse/BEAM-6516
> Project: Beam
> Issue Type: Bug
> Components: io-java-rabbitmq
> Reporter: Edin
> Priority: Major
> Attachments: Screenshot 2019-01-26 at 20.11.41.png
>
>
> I'm using the RabbitMqIO connector to get messages from RabbitMQ to BigQuery
> in my current pipeline. When I submit my pipeline to Dataflow, the messages
> are propagated correctly to BigQuery, however I'm getting the following log
> error entry in Dataflow which is repeating itself:
> {code:java}
> java.io.IOException: Failed to advance source:
> org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$RabbitMQSource@1cdd077d
>
> org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:806)
>
> org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: com.rabbitmq.client.ShutdownSignalException:
> channel error; protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60,
> method-id=80)
>
> org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$UnboundedRabbitMqReader.advance(RabbitMqIO.java:474)
>
> org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:801)
>
> org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60,
> method-id=80)
> com.rabbitmq.client.QueueingConsumer.handle(QueueingConsumer.java:206)
>
> com.rabbitmq.client.QueueingConsumer.nextDelivery(QueueingConsumer.java:237)
>
> org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$UnboundedRabbitMqReader.advance(RabbitMqIO.java:451)
>
> org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:801)
>
> org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:776)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60,
> method-id=80)
> com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:515)
> com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:340)
>
> com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
> com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109)
>
> com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:676)
>
> com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
>
> com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:603)
> java.lang.Thread.run(Thread.java:745)
> {code}
> It seems to me the issue is ACKing messages:
> {code:java}
> reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1,
> class-id=60, method-id=80
> {code}
> I also see a lot of open channels (~90) in the RabbitMQ management interface
> (see attached screenshot).