[ https://issues.apache.org/jira/browse/DISPATCH-825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16165118#comment-16165118 ]
ASF GitHub Bot commented on DISPATCH-825:
-----------------------------------------
Github user ChugR commented on a diff in the pull request:
https://github.com/apache/qpid-dispatch/pull/194#discussion_r138710888
--- Diff: src/message.c ---
@@ -1109,95 +1146,97 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
     }
     //
-    // The discard flag indicates if we should continue receiving the message.
-    // This is pertinent in the case of large messages. When large messages are being received, we try to send out part of the
-    // message that has been received so far. If we are not able to send it anywhere, there is no need to keep creating buffers
-    //
-    bool discard = qd_message_is_discard((qd_message_t*)msg);
-
-    //
-    // Get a reference to the tail buffer on the message. This is the buffer into which
-    // we will store incoming message data. If there is no buffer in the message, this is the
-    // first time we are here and we need to allocate an empty one and add it to the message.
+    // The discard flag indicates we should keep reading the input stream
+    // but not process the message for delivery.
     //
-    if (!discard) {
-        buf = DEQ_TAIL(msg->content->buffers);
-        if (!buf) {
-            buf = qd_buffer();
-            DEQ_INSERT_TAIL(msg->content->buffers, buf);
-        }
+    if (qd_message_is_discard((qd_message_t*)msg)) {
+        return discard_receive(delivery, link, (qd_message_t *)msg);
     }
+
+    // Loop until msg is complete, error seen, or incoming bytes are consumed
+    bool recv_error = false;
     while (1) {
-        if (discard) {
-            char dummy[BUFFER_SIZE];
-            rc = pn_link_recv(link, dummy, BUFFER_SIZE);
-        }
-        else {
-            //
-            // Try to receive enough data to fill the remaining space in the tail buffer.
-            //
+        //
+        // handle EOS and clean up after pn receive errors
+        //
+        bool at_eos = (pn_delivery_partial(delivery) == false) &&
+                      (pn_delivery_pending(delivery) == 0);
+
+        if (at_eos || recv_error) {
+            // Message is complete
+            sys_mutex_lock(msg->content->lock);
+            {
+                // Append last buffer if any with data
+                if (msg->content->pending) {
+                    if (qd_buffer_size(msg->content->pending) > 0) {
+                        // pending buffer has bytes that are part of message
+                        DEQ_INSERT_TAIL(msg->content->buffers,
+                                        msg->content->pending);
+                    } else {
+                        // pending buffer is empty
+                        qd_buffer_free(msg->content->pending);
+                    }
+                    msg->content->pending = 0;
+                } else {
+                    // pending buffer is absent
+                }
+
+                msg->content->receive_complete = true;
-            rc = pn_link_recv(link, (char*) qd_buffer_cursor(buf), qd_buffer_capacity(buf));
+                // unlink message and delivery
+                pn_record_set(record, PN_DELIVERY_CTX, 0);
+            }
+            sys_mutex_unlock(msg->content->lock);
+            return (qd_message_t*) msg;
         }
         //
-        // If we receive PN_EOS, we have come to the end of the message.
+        // Handle a missing or full pending buffer
         //
-        if (rc == PN_EOS) {
-            //
-            // We have received the entire message since rc == PN_EOS, set the receive_complete flag to true
-            //
-            msg->content->receive_complete = true;
-
-            //
-            // Clear the value in the record with key PN_DELIVERY_CTX
-            //
-            pn_record_set(record, PN_DELIVERY_CTX, 0);
-
-            //
-            // If the last buffer in the list is empty, remove it and free it. This
-            // will only happen if the size of the message content is an exact multiple
-            // of the buffer size.
-            //
-            if (buf && qd_buffer_size(buf) == 0) {
+        if (!msg->content->pending) {
+            // Pending buffer is absent: get a new one
+            msg->content->pending = qd_buffer();
+        } else {
+            // Pending buffer exists
+            if (qd_buffer_capacity(msg->content->pending) == 0) {
+                // Pending buffer is full
                 sys_mutex_lock(msg->content->lock);
-                DEQ_REMOVE_TAIL(msg->content->buffers);
+                DEQ_INSERT_TAIL(msg->content->buffers, msg->content->pending);
                 sys_mutex_unlock(msg->content->lock);
-                qd_buffer_free(buf);
+                msg->content->pending = qd_buffer();
+            } else {
+                // Pending buffer still has capacity
             }
-
-            return (qd_message_t*) msg;
        }
-        if (rc > 0) {
-            if (discard)
-                continue;
+        //
+        // Try to fill the remaining space in the pending buffer.
+        //
+        rc = pn_link_recv(link,
+                          (char*) qd_buffer_cursor(msg->content->pending),
+                          qd_buffer_capacity(msg->content->pending));
+
+        assert (rc != PN_EOS);   // Just checked for this moments ago
--- End diff --
This assert is a mistake. Between sensing end-of-stream a moment ago (the at_eos check) and calling pn_link_recv() now, the condition may have become true, so pn_link_recv() can legitimately return PN_EOS here and the assert can fire.
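For illustration only, here is a minimal sketch of how the tail of the loop could tolerate PN_EOS instead of asserting. It assumes the surrounding context of the hunk above (msg, link, rc, recv_error) and uses qd_buffer_insert() for size accounting as the pre-patch code did; it is a sketch of the idea, not necessarily the change that was merged.
{noformat}
// Sketch: replace the assert with explicit handling of the return code.
rc = pn_link_recv(link,
                  (char*) qd_buffer_cursor(msg->content->pending),
                  qd_buffer_capacity(msg->content->pending));

if (rc < 0) {
    // rc == PN_EOS means the end of the stream arrived after the at_eos
    // check above; any other negative value is a genuine receive error.
    // Either way, stop reading and let the completion/cleanup branch at
    // the top of the loop finalize the message on the next pass.
    recv_error = true;
    continue;
}

if (rc == 0) {
    // No bytes available right now; more may arrive later, so hand the
    // partially received message back to the caller.
    return (qd_message_t*) msg;
}

// rc > 0: account for the bytes just written into the pending buffer.
qd_buffer_insert(msg->content->pending, rc);
{noformat}
Routing the EOS case through the existing completion branch keeps the receive_complete and pending-buffer handling in one place instead of duplicating it after the recv call.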
> Corrupted data on larger (>100Kb) messages
> ------------------------------------------
>
> Key: DISPATCH-825
> URL: https://issues.apache.org/jira/browse/DISPATCH-825
> Project: Qpid Dispatch
> Issue Type: Bug
> Environment: Fedora 25. Master branch qpid-dispatch and qpid-cpp
> tools: qpid-send, qpid-receive
> Reporter: Chuck Rolke
>
> h1. Setup
> h3. Start a dispatch router with this conf file:
> {noformat}
> # Router to run qpid-interop-test
> router {
> mode: interior
> id: Router.A
> workerThreads: 4
> allowUnsettledMulticast: yes
> }
> listener {
> host: 0.0.0.0
> port: 5672
> authenticatePeer: no
> saslMechanisms: ANONYMOUS
> }
> listener {
> host: localhost
> port: 5672
> authenticatePeer: no
> saslMechanisms: ANONYMOUS
> }
> address {
> prefix: jms.queue.qpid-interop.#
> distribution: balanced
> }
> log {
> module: DEFAULT
> enable: debug+
> }
> {noformat}
> h3. Start a receiver to receive 1000 messages:
> {noformat}
> qpid-receive -a jms.queue.qpid-interop.test --connection-options "{protocol:amqp1.0}" -m 1 -f --print-content no --print-headers yes --ack-frequency 1
> {noformat}
> h3. Start 1000 senders each with a different length message
> {noformat}
> #!/bin/bash
> for i in `seq 100512 101511`;
> do
> qpid-send -a jms.queue.qpid-interop.test --connection-options "{protocol:amqp1.0}" -m 1 --content-size $i
> done
> {noformat}
> h1. Result
> Eventually the receive program will exit with an error:
> {noformat}
> qpid-receive: Out of Bounds: requested advance of 100552 at 42 but only 100444 available (/home/chug/git/qpid-cpp/src/qpid/amqp/Decoder.cpp:307)
> {noformat}
> h2. Observations
> * Putting qd_log statements in the qd_message_send path, one at each
> pn_link_send() invocation, allows the setup to run the 1000 messages
> repeatedly. It would probably still fail eventually, but in this condition
> it is harder to debug.
> * I suspect an interlock issue between sending and receiving a single
> message, but adding a dozen or so asserts has not revealed anything yet.