Borgstrom created AMQ-7211:
------------------------------
Summary: Pending ACKs errantly being cleared on STOMP transaction
COMMIT/ABORT
Key: AMQ-7211
URL: https://issues.apache.org/jira/browse/AMQ-7211
Project: ActiveMQ
Issue Type: Bug
Components: stomp
Affects Versions: 5.15.6
Reporter: Borgstrom
Given a queue with many messages in it and a client that creates a
transaction for each message it processes, the broker eventually returns the following error:
org.apache.activemq.transport.stomp.ProtocolException: Unexpected ACK received for message-id [null]
    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompAck(ProtocolConverter.java:476)
    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:251)
    at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:748)
The client has a prefetch value of 1.
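For reference, the per-message exchange can be sketched as three STOMP frames: BEGIN, then ACK carrying the transaction, then COMMIT. This is only an illustration, not the client's actual code: the helper names (`escape`, `stomp_frame`, `ack_in_transaction`) and the example ids are made up here, while the `id` and `transaction` header names are the ones visible in the logged frames. The `\c` escaping of colons follows the STOMP 1.2 header encoding, which is presumably why the logged ids contain `\c`.

```python
def escape(value):
    """Escape a header value using the STOMP 1.2 rules (\\\\, \\r, \\n, \\c)."""
    return (
        value.replace("\\", "\\\\")
        .replace("\r", "\\r")
        .replace("\n", "\\n")
        .replace(":", "\\c")
    )


def stomp_frame(command, headers, body=""):
    """Serialize one frame: command line, headers, blank line, body, NUL byte."""
    lines = [command] + [f"{key}:{escape(val)}" for key, val in headers.items()]
    return "\n".join(lines) + "\n\n" + body + "\x00"


def ack_in_transaction(ack_id, tx_id):
    """Return the BEGIN, ACK and COMMIT frames sent for a single message."""
    return [
        stomp_frame("BEGIN", {"transaction": tx_id}),
        stomp_frame("ACK", {"id": ack_id, "transaction": tx_id}),
        stomp_frame("COMMIT", {"transaction": tx_id}),
    ]


# Hypothetical ids, shaped like the ones in the logs.
frames = ack_in_transaction("ID:host-1:12", "tx-1")
```

Each received MESSAGE triggers one such triple, with a fresh transaction id every time.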
Here are the debug logs from my client showing the frames sent and received. We end
up processing a number of messages, but eventually this happens:
{noformat}
DEBU[0119] Read frame body="[123 34 101 110
118 101 108 111 112 101 34 58 123 34 101 120 99 104 97 110 103 101 73 100 34 58
34 89 65 75 95 72 69 65 82 84 66 69 65 84 34 44 34 117 110 116 114 117 115 116
101 100 95 115 111 117 114 99 101 34 58 102 97 108 115 101 44 34 105 100 34 58
34 56 50 49 53 52 100 98 55 45 102 102 53 99 45 52 98 57 49 45 98 52 98 52 45
101 57 49 52 54 54 99 52 100 50 54 102 34 44 34 109 101 115 115 97 103 101 95
116 105 109 101 115 116 97 109 112 34 58 49 53 53 56 49 50 52 57 57 54 57 49 51
125 44 34 112 97 121 108 111 97 100 34 58 123 34 120 34 58 51 55 44 34 98 121
116 101 115 34 58 34 120 108 52 61 34 125 44 34 97 99 116 105 111 110 34 58 34
119 111 114 107 34 125]" command=MESSAGE
header="map[ack:ID\\c6e56db7da800-41539-1558117094401-130\\c12
content-type:application/json destination:/queue/YAK_HEARTBEAT-YAK_HEARTBEAT
expires:0
message-id:ID\\c6e56db7da800-41539-1558117094401-5\\c68\\c-1\\c1\\c538
persistent:true priority:4 subscription:1E81E7A4F3D6C40314373C0F20B9F1AF
timestamp:1558124997013]"
DEBU[0119] Writing frame body="[]"
command=BEGIN header="map[transaction:55CE4B41F0CF0C86C0D95EA9C7182A26]"
DEBU[0119] Writing frame body="[]" command=ACK
header="map[id:ID\\c6e56db7da800-41539-1558117094401-130\\c12
transaction:55CE4B41F0CF0C86C0D95EA9C7182A26]"
DEBU[0119] Writing frame body="[]"
command=COMMIT header="map[transaction:55CE4B41F0CF0C86C0D95EA9C7182A26]"
DEBU[0119] Read frame body="[123 34 101 110
118 101 108 111 112 101 34 58 123 34 101 120 99 104 97 110 103 101 73 100 34 58
34 89 65 75 95 72 69 65 82 84 66 69 65 84 34 44 34 117 110 116 114 117 115 116
101 100 95 115 111 117 114 99 101 34 58 102 97 108 115 101 44 34 105 100 34 58
34 101 100 49 51 56 51 53 101 45 56 98 50 98 45 52 98 100 101 45 97 52 99 50 45
102 51 50 49 98 49 54 99 50 51 57 57 34 44 34 109 101 115 115 97 103 101 95 116
105 109 101 115 116 97 109 112 34 58 49 53 53 56 49 50 52 57 57 54 57 49 51 125
44 34 112 97 121 108 111 97 100 34 58 123 34 120 34 58 51 56 44 34 98 121 116
101 115 34 58 34 52 103 61 61 34 125 44 34 97 99 116 105 111 110 34 58 34 119
111 114 107 34 125]" command=MESSAGE
header="map[ack:ID\\c6e56db7da800-41539-1558117094401-130\\c13
content-type:application/json destination:/queue/YAK_HEARTBEAT-YAK_HEARTBEAT
expires:0
message-id:ID\\c6e56db7da800-41539-1558117094401-5\\c68\\c-1\\c1\\c539
persistent:true priority:4 subscription:1E81E7A4F3D6C40314373C0F20B9F1AF
timestamp:1558124997017]"
DEBU[0119] Writing frame body="[]"
command=BEGIN header="map[transaction:0082C668C3243EDBA05FE48D25728979]"
DEBU[0119] Writing frame body="[]" command=ACK
header="map[id:ID\\c6e56db7da800-41539-1558117094401-130\\c13
transaction:0082C668C3243EDBA05FE48D25728979]"
DEBU[0119] Writing frame body="[]"
command=COMMIT header="map[transaction:0082C668C3243EDBA05FE48D25728979]"
DEBU[0119] Read frame body="[123 34 101 110
118 101 108 111 112 101 34 58 123 34 101 120 99 104 97 110 103 101 73 100 34 58
34 89 65 75 95 72 69 65 82 84 66 69 65 84 34 44 34 117 110 116 114 117 115 116
101 100 95 115 111 117 114 99 101 34 58 102 97 108 115 101 44 34 105 100 34 58
34 49 54 102 98 99 50 97 101 45 53 52 99 54 45 52 53 49 101 45 97 98 97 50 45
50 49 98 55 56 56 99 49 50 48 57 53 34 44 34 109 101 115 115 97 103 101 95 116
105 109 101 115 116 97 109 112 34 58 49 53 53 56 49 50 52 57 57 54 57 50 52 125
44 34 112 97 121 108 111 97 100 34 58 123 34 120 34 58 52 48 44 34 98 121 116
101 115 34 58 34 79 89 81 61 34 125 44 34 97 99 116 105 111 110 34 58 34 119
111 114 107 34 125]" command=MESSAGE
header="map[ack:ID\\c6e56db7da800-41539-1558117094401-130\\c14
content-type:application/json destination:/queue/YAK_HEARTBEAT-YAK_HEARTBEAT
expires:0
message-id:ID\\c6e56db7da800-41539-1558117094401-5\\c68\\c-1\\c1\\c540
persistent:true priority:4 subscription:1E81E7A4F3D6C40314373C0F20B9F1AF
timestamp:1558124997020]"
DEBU[0119] Writing frame body="[]"
command=BEGIN header="map[transaction:9F8D662467C2343B3E1958F5FD78D7C4]"
DEBU[0119] Writing frame body="[]" command=ACK
header="map[id:ID\\c6e56db7da800-41539-1558117094401-130\\c14
transaction:9F8D662467C2343B3E1958F5FD78D7C4]"
DEBU[0119] Writing frame body="[]"
command=COMMIT header="map[transaction:9F8D662467C2343B3E1958F5FD78D7C4]"
DEBU[0119] Read frame body="[123 34 101 110
118 101 108 111 112 101 34 58 123 34 101 120 99 104 97 110 103 101 73 100 34 58
34 89 65 75 95 72 69 65 82 84 66 69 65 84 34 44 34 117 110 116 114 117 115 116
101 100 95 115 111 117 114 99 101 34 58 102 97 108 115 101 44 34 105 100 34 58
34 56 53 98 53 57 98 56 54 45 56 55 97 49 45 52 99 97 100 45 98 101 101 99 45
57 100 101 98 99 101 50 54 54 98 101 55 34 44 34 109 101 115 115 97 103 101 95
116 105 109 101 115 116 97 109 112 34 58 49 53 53 56 49 50 52 57 57 54 57 51 53
125 44 34 112 97 121 108 111 97 100 34 58 123 34 120 34 58 51 57 44 34 98 121
116 101 115 34 58 34 106 113 73 61 34 125 44 34 97 99 116 105 111 110 34 58 34
119 111 114 107 34 125]" command=MESSAGE
header="map[ack:ID\\c6e56db7da800-41539-1558117094401-130\\c15
content-type:application/json destination:/queue/YAK_HEARTBEAT-YAK_HEARTBEAT
expires:0
message-id:ID\\c6e56db7da800-41539-1558117094401-5\\c68\\c-1\\c1\\c541
persistent:true priority:4 subscription:1E81E7A4F3D6C40314373C0F20B9F1AF
timestamp:1558124997023]"
DEBU[0119] Writing frame body="[]"
command=BEGIN header="map[transaction:513A5D087CF9ADDF9B0225A35EBD3015]"
DEBU[0119] Writing frame body="[]" command=ACK
header="map[id:ID\\c6e56db7da800-41539-1558117094401-130\\c15
transaction:513A5D087CF9ADDF9B0225A35EBD3015]"
DEBU[0119] Writing frame body="[]"
command=COMMIT header="map[transaction:513A5D087CF9ADDF9B0225A35EBD3015]"
DEBU[0119] Read frame body="[111 114 103 46
97 112 97 99 104 101 46 97 99 116 105 118 101 109 113 46 116 114 97 110 115 112
111 114 116 46 115 116 111 109 112 46 80 114 111 116 111 99 111 108 69 120 99
101 112 116 105 111 110 58 32 85 110 101 120 112 101 99 116 101 100 32 65 67 75
32 114 101 99 101 105 118 101 100 32 102 111 114 32 109 101 115 115 97 103 101
45 105 100 32 91 110 117 108 108 93 10 9 97 116 32 111 114 103 46 97 112 97 99
104 101 46 97 99 116 105 118 101 109 113 46 116 114 97 110 115 112 111 114 116
46 115 116 111 109 112 46 80 114 111 116 111 99 111 108 67 111 110 118 101 114
116 101 114 46 111 110 83 116 111 109 112 65 99 107 40 80 114 111 116 111 99
111 108 67 111 110 118 101 114 116 101 114 46 106 97 118 97 58 52 55 54 41 10 9
97 116 32 111 114 103 46 97 112 97 99 104 101 46 97 99 116 105 118 101 109 113
46 116 114 97 110 115 112 111 114 116 46 115 116 111 109 112 46 80 114 111 116
111 99 111 108 67 111 110 118 101 114 116 101 114 46 111 110 83 116 111 109 112
67 111 109 109 97 110 100 40 80 114 111 116 111 99 111 108 67 111 110 118 101
114 116 101 114 46 106 97 118 97 58 50 53 49 41 10 9 97 116 32 111 114 103 46
97 112 97 99 104 101 46 97 99 116 105 118 101 109 113 46 116 114 97 110 115 112
111 114 116 46 115 116 111 109 112 46 83 116 111 109 112 84 114 97 110 115 112
111 114 116 70 105 108 116 101 114 46 111 110 67 111 109 109 97 110 100 40 83
116 111 109 112 84 114 97 110 115 112 111 114 116 70 105 108 116 101 114 46 106
97 118 97 58 56 53 41 10 9 97 116 32 111 114 103 46 97 112 97 99 104 101 46 97
99 116 105 118 101 109 113 46 116 114 97 110 115 112 111 114 116 46 84 114 97
110 115 112 111 114 116 83 117 112 112 111 114 116 46 100 111 67 111 110 115
117 109 101 40 84 114 97 110 115 112 111 114 116 83 117 112 112 111 114 116 46
106 97 118 97 58 56 51 41 10 9 97 116 32 111 114 103 46 97 112 97 99 104 101 46
97 99 116 105 118 101 109 113 46 116 114 97 110 115 112 111 114 116 46 116 99
112 46 84 99 112 84 114 97 110 115 112 111 114 116 46 100 111 82 117 110 40 84
99 112 84 114 97 110 115 112 111 114 116 46 106 97 118 97 58 50 51 51 41 10 9
97 116 32 111 114 103 46 97 112 97 99 104 101 46 97 99 116 105 118 101 109 113
46 116 114 97 110 115 112 111 114 116 46 116 99 112 46 84 99 112 84 114 97 110
115 112 111 114 116 46 114 117 110 40 84 99 112 84 114 97 110 115 112 111 114
116 46 106 97 118 97 58 50 49 53 41 10 9 97 116 32 106 97 118 97 46 108 97 110
103 46 84 104 114 101 97 100 46 114 117 110 40 84 104 114 101 97 100 46 106 97
118 97 58 55 52 56 41 10]" command=ERROR header="map[content-type:text/plain
message:Unexpected ACK received for message-id [null]]"
ERRO[0119] Error while processing frames to/from broker, marking as failed
broker=BLACK error="recevied ERROR from server:
org.apache.activemq.transport.stomp.ProtocolException: Unexpected ACK received
for message-id [null]\n\tat
org.apache.activemq.transport.stomp.ProtocolConverter.onStompAck(ProtocolConverter.java:476)\n\tat
org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:251)\n\tat
org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)\n\tat
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)\n\tat
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)\n\tat
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)\n\tat
java.lang.Thread.run(Thread.java:748)\n"
name=YAK_HEARTBEAT-YAK_HEARTBEAT
{noformat}
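The frame bodies in the log above are printed as lists of decimal byte values, which makes them hard to read. A small helper along these lines can turn them back into text; `decode_body` is a hypothetical name, and the sketch assumes the bodies are UTF-8.

```python
def decode_body(logged):
    """Turn a logged body such as "[123 34 120 34 58 51 55 125]" into text.

    The logged list may be wrapped over several lines; split() treats any
    whitespace, including newlines, as a separator.
    """
    numbers = logged.strip().strip("[]").split()
    return bytes(int(n) for n in numbers).decode("utf-8")


# First bytes of the ERROR frame body from the log.
prefix = decode_body("[111 114 103 46]")
```

Applied to the last frame, this yields the same ProtocolException stack trace quoted at the top of the report.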
I believe this happens because of a race: under some conditions ActiveMQ
errantly clears "c15" from the pending ACKs when the COMMIT
for the ACK of "c14" is processed:
[https://github.com/apache/activemq/blob/activemq-5.15.6/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java#L525]
The same thing could happen for ABORT too:
[https://github.com/apache/activemq/blob/activemq-5.15.6/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java#L556]
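To make the suspected ordering concrete, here is a toy model of the bookkeeping. It is not ActiveMQ's code: `PendingAckModel`, `replay` and the `clear_all_on_commit` switch are invented for this sketch, and the assumption that "c15" is dispatched before the COMMIT for "c14" is processed is exactly the hypothesis above, not something confirmed.

```python
class PendingAckModel:
    """Toy model of broker-side pending-ACK tracking (hypothetical)."""

    def __init__(self, clear_all_on_commit):
        self.clear_all_on_commit = clear_all_on_commit
        # ack id -> transaction id, or None while the message is un-ACKed
        self.pending = {}

    def dispatch(self, ack_id):
        """Record a message sent to the client and awaiting its ACK."""
        self.pending[ack_id] = None

    def ack(self, ack_id, tx_id):
        """Attach an ACK to a transaction; reject ACKs for unknown ids."""
        if ack_id not in self.pending:
            raise LookupError(f"Unexpected ACK received for {ack_id}")
        self.pending[ack_id] = tx_id

    def commit(self, tx_id):
        """Finish a transaction, dropping either every entry or only its own."""
        if self.clear_all_on_commit:
            self.pending.clear()
        else:
            self.pending = {
                ack_id: tx for ack_id, tx in self.pending.items() if tx != tx_id
            }


def replay(model):
    """Replay the suspected interleaving and report what the ACK of c15 sees."""
    model.dispatch("c14")
    model.ack("c14", "tx-14")
    model.dispatch("c15")  # assumed to happen before the COMMIT is processed
    model.commit("tx-14")
    try:
        model.ack("c15", "tx-15")
        return "ok"
    except LookupError as exc:
        return str(exc)
```

With `clear_all_on_commit=True` the replay ends in an unexpected-ACK error for "c15"; with `False`, where COMMIT only removes entries belonging to its own transaction, the ACK is accepted. The same reasoning would apply to ABORT.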