[
https://issues.apache.org/jira/browse/QPID-5912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Keith Wall updated QPID-5912:
-----------------------------
Description:
If a message takes the straight through delivery path through the Broker (that
is, where the IO thread that accepts the message from the publisher is the same
thread that delivers the message to the consumer), and the send to the consumer
fails with an exception (for instance, a sender timeout exception), the
connection to the consumer is rightfully closed. However, when the consumer
reconnects, the Broker sends the message to the consumer successfully, but then
suffers a metadata not found exception when the consumer tries to ack the
message or commit.
The issue is that mishandling of the sender timeout exception means that the
Broker fails to perform its post delivery tasks, so the message is never
committed to disk.
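The failure mode described above can be sketched in isolation. This is a toy model only, not the Broker's code: the Store and Consumer types, the method names, and the try/finally remedy are all hypothetical and chosen purely for illustration. It shows how an exception thrown by the send can skip a post delivery step, so that a later ack finds nothing in the store.

```java
import java.util.HashSet;
import java.util.Set;

public class StraightThroughDeliverySketch {

    /** Hypothetical stand-in for the durable message store. */
    static final class Store {
        private final Set<Long> persisted = new HashSet<>();

        void persist(long messageId) {
            persisted.add(messageId);
        }

        void dequeue(long messageId) {
            if (!persisted.remove(messageId)) {
                throw new IllegalStateException("Unable to find message with id " + messageId);
            }
        }
    }

    /** Hypothetical consumer; send may throw, as a timed-out write would. */
    interface Consumer {
        void send(long messageId);
    }

    /** Problem shape: an exception from send skips the post delivery task. */
    static void deliverSkippingPostTasksOnFailure(Store store, long messageId, Consumer consumer) {
        consumer.send(messageId);
        store.persist(messageId); // never reached if send throws
    }

    /** One possible remedy (an assumption): run the post delivery task regardless. */
    static void deliverAlwaysRunningPostTasks(Store store, long messageId, Consumer consumer) {
        try {
            consumer.send(messageId);
        } finally {
            store.persist(messageId);
        }
    }

    /** Plays out the scenario and reports whether the ack after reconnect succeeds. */
    static boolean ackSucceedsAfterReconnect(boolean alwaysRunPostTasks) {
        Store store = new Store();
        long messageId = 3L;
        Consumer timingOut = id -> {
            throw new RuntimeException("write timed out");
        };
        try {
            if (alwaysRunPostTasks) {
                deliverAlwaysRunningPostTasks(store, messageId, timingOut);
            } else {
                deliverSkippingPostTasksOnFailure(store, messageId, timingOut);
            }
        } catch (RuntimeException e) {
            // The consumer connection is closed; the message is still held in memory.
        }
        // The reconnected consumer receives the in-memory message and acks it.
        try {
            store.dequeue(messageId);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("skip post tasks on failure: ack ok = " + ackSucceedsAfterReconnect(false));
        System.out.println("always run post tasks:      ack ok = " + ackSucceedsAfterReconnect(true));
    }
}
```

In this model the ack fails only when the post delivery step is skipped, which mirrors the symptom in the description. How the real Broker should order or guard these steps is a separate question.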
Reproduction
# Prepare a JMS publisher application that sends one message; make the message
sufficiently large to overwhelm the TCP buffers (I use 100MB).
# Prepare a JMS consumer application to receive the message.
# Start the JMS consumer application. Once it has begun to await the message
(MessageConsumer.receive()), either stop the process or put a breakpoint in
IoReceiver#run (line 160).
# Start the JMS publisher; it will send the message to the Broker.
# Broker will choose the straight through path.
# Broker's send to consumer will time out (stack 1 below).
# JMS consumer will be disconnected.
# Restart the JMS consumer.
# Expected behaviour: message delivered to consumer successfully. Actual
behaviour: Broker reports an exception when the client tries to ack the message
(stack 2 below).
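The reproduction relies on a reader that has stopped reading while the sender pushes more data than the TCP buffers can hold. The sketch below illustrates only that precondition, using plain java.nio on the loopback interface. It is not JMS or Broker code, and the class name, chunk size, and idle threshold are arbitrary assumptions. A non-blocking writer sends towards a peer that never reads, and the method reports how many bytes were accepted before writes stopped making progress.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class StalledReaderSketch {

    /** Payload size taken from the reproduction steps (100MB). */
    static final long PAYLOAD_BYTES = 100L * 1024 * 1024;

    /** Writes towards a peer that never reads; returns the bytes accepted before stalling. */
    static long bytesAcceptedBeforeStall() throws IOException, InterruptedException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel writer = SocketChannel.open(server.getLocalAddress());
                 SocketChannel stalledReader = server.accept()) {
                // stalledReader is held open but never read, like the paused consumer.
                writer.configureBlocking(false);
                ByteBuffer chunk = ByteBuffer.allocate(64 * 1024);
                long written = 0;
                int idleRounds = 0;
                // Give up after roughly half a second without any progress.
                while (written < PAYLOAD_BYTES && idleRounds < 50) {
                    chunk.clear();
                    int n = writer.write(chunk);
                    if (n > 0) {
                        written += n;
                        idleRounds = 0;
                    } else {
                        idleRounds++;
                        Thread.sleep(10);
                    }
                }
                return written;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        long accepted = bytesAcceptedBeforeStall();
        System.out.println("accepted " + accepted + " of " + PAYLOAD_BYTES + " bytes before stalling");
    }
}
```

On typical systems the writer stalls well short of 100MB, which is why a payload of that size is enough to leave the Broker's send waiting on a consumer that is not reading. The exact figure depends on the operating system's socket buffer settings.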
stack 1 - Broker reports timeout on send to consumer
{noformat}
2014-07-22 11:32:56,296 ERROR [IoReceiver - /127.0.0.1:57756] (io.IoSender) - write timed out for socket /127.0.0.1:57748: head -2146434251, tail -2146958539
2014-07-22 11:32:56,297 INFO [IoReceiver - /127.0.0.1:57748] (subscription.close) - [Broker] [sub:7(vh(/default)/qu(myqueue)] SUB-1002 : Close
2014-07-22 11:32:56,297 ERROR [IoReceiver - /127.0.0.1:57756] (v0_10.ServerSessionDelegate) - Exception processing command
org.apache.qpid.transport.SenderException: write timed out for socket /127.0.0.1:57748: head -2146434251, tail -2146958539
at org.apache.qpid.transport.SenderException.rethrow(SenderException.java:49)
at org.apache.qpid.transport.Session.invoke(Session.java:784)
at org.apache.qpid.server.protocol.v0_10.ServerSession.sendMessage(ServerSession.java:263)
at org.apache.qpid.server.protocol.v0_10.ConsumerTarget_0_10.send(ConsumerTarget_0_10.java:304)
at org.apache.qpid.server.queue.QueueConsumerImpl.send(QueueConsumerImpl.java:476)
at org.apache.qpid.server.queue.AbstractQueue.deliverMessage(AbstractQueue.java:1117)
at org.apache.qpid.server.queue.AbstractQueue.deliverToConsumer(AbstractQueue.java:1041)
at org.apache.qpid.server.queue.AbstractQueue.access$200(AbstractQueue.java:91)
at org.apache.qpid.server.queue.AbstractQueue$4.run(AbstractQueue.java:985)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:356)
at org.apache.qpid.server.queue.AbstractQueue.doEnqueue(AbstractQueue.java:941)
at org.apache.qpid.server.queue.AbstractQueue.enqueue(AbstractQueue.java:888)
at org.apache.qpid.server.exchange.AbstractExchange$2.postCommit(AbstractExchange.java:535)
at org.apache.qpid.server.protocol.v0_10.ServerSession$AsyncCommand.complete(ServerSession.java:971)
at org.apache.qpid.server.protocol.v0_10.ServerSession.completeAsyncCommands(ServerSession.java:916)
at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:103)
at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:77)
at org.apache.qpid.transport.Method.delegate(Method.java:159)
at org.apache.qpid.transport.Session.received(Session.java:596)
at org.apache.qpid.transport.Connection.dispatch(Connection.java:439)
at org.apache.qpid.transport.ConnectionDelegate.handle(ConnectionDelegate.java:64)
at org.apache.qpid.transport.ConnectionDelegate.handle(ConnectionDelegate.java:40)
at org.apache.qpid.transport.MethodDelegate.messageTransfer(MethodDelegate.java:113)
{noformat}
stack 2 - Broker fails to ack message after client reconnects
{noformat}
2014-07-22 11:36:29,044 ERROR [IoReceiver - /127.0.0.1:57806] (util.ServerScopedRuntimeException) - Unable to find message with id 3 on queue myqueue with id 63798825-4070-47df-8187-b58328a3d942
2014-07-22 11:36:29,044 ERROR [IoReceiver - /127.0.0.1:57806] (v0_10.ServerSessionDelegate) - Exception processing command
org.apache.qpid.server.store.StoreException: Unable to find message with id 3 on queue myqueue with id 63798825-4070-47df-8187-b58328a3d942
at org.apache.qpid.server.store.AbstractJDBCMessageStore.dequeueMessage(AbstractJDBCMessageStore.java:649)
at org.apache.qpid.server.store.AbstractJDBCMessageStore.access$300(AbstractJDBCMessageStore.java:49)
at org.apache.qpid.server.store.AbstractJDBCMessageStore$JDBCTransaction.dequeueMessage(AbstractJDBCMessageStore.java:1166)
at org.apache.qpid.server.txn.AsyncAutoCommitTransaction.dequeue(AsyncAutoCommitTransaction.java:104)
at org.apache.qpid.server.protocol.v0_10.ServerSession.acknowledge(ServerSession.java:457)
at org.apache.qpid.server.protocol.v0_10.ExplicitAcceptDispositionChangeListener.onAccept(ExplicitAcceptDispositionChangeListener.java:46)
at org.apache.qpid.server.protocol.v0_10.ServerSession$3.performAction(ServerSession.java:283)
at org.apache.qpid.server.protocol.v0_10.ServerSession.dispositionChange(ServerSession.java:372)
at org.apache.qpid.server.protocol.v0_10.ServerSession.accept(ServerSession.java:279)
at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.messageAccept(ServerSessionDelegate.java:128)
at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.messageAccept(ServerSessionDelegate.java:77)
at org.apache.qpid.transport.MessageAccept.dispatch(MessageAccept.java:87)
at org.apache.qpid.transport.SessionDelegate.command(SessionDelegate.java:55)
at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:94)
at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:77)
{noformat}
> Metadata not found exception will result if message is delivered straight
> through and transport results during send to consumer
> -------------------------------------------------------------------------------------------------------------------------------
>
> Key: QPID-5912
> URL: https://issues.apache.org/jira/browse/QPID-5912
> Project: Qpid
> Issue Type: Bug
> Components: Java Broker
> Reporter: Keith Wall
> Priority: Critical
> Fix For: 0.29
>