jbonofre commented on code in PR #1543:
URL: https://github.com/apache/activemq/pull/1543#discussion_r2914233503
##########
activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java:
##########
@@ -713,6 +714,144 @@ public Message receiveNoWait() throws JMSException {
return createActiveMQMessage(md);
}
+ /**
+ * Receives the next message produced for this message consumer and returns
+ * its body as an object of the specified type. This call blocks
+ * indefinitely until a message is produced or until this message consumer
+ * is closed.
+ * <p>
+ * If the message is not of a type for which the body can be assigned to
+ * the specified type, a {@code MessageFormatException} is thrown and the
+ * message is not acknowledged. It may be delivered again when a subsequent
+ * {@code receive} or {@code receiveBody} call is made.
+ *
+ * @param c the type to which the body of the next message should be
+ * assigned
+ * @return the body of the next message, or null if this message consumer
+ * is concurrently closed
+ * @throws MessageFormatException if the message body cannot be assigned to
+ * the specified type
+ * @throws JMSException if the JMS provider fails to receive the next
+ * message due to some internal error
+ */
+ public <T> T receiveBody(Class<T> c) throws JMSException {
+ checkClosed();
+ checkMessageListener();
+
+ sendPullCommand(0);
+ MessageDispatch md = dequeue(-1);
+ if (md == null) {
+ return null;
+ }
+
+ return doReceiveBody(md, c);
+ }
+
+ /**
+ * Receives the next message produced for this message consumer and returns
+ * its body as an object of the specified type, blocking up to the
+ * specified timeout. A {@code timeout} of zero never expires and the call
+ * blocks indefinitely.
+ * <p>
+ * If the message is not of a type for which the body can be assigned to
+ * the specified type, a {@code MessageFormatException} is thrown and the
+ * message is not acknowledged. It may be delivered again when a subsequent
+ * {@code receive} or {@code receiveBody} call is made.
+ *
+ * @param c the type to which the body of the next message should be
+ * assigned
+ * @param timeout the timeout value (in milliseconds), a timeout of zero
+ * never expires
+ * @return the body of the next message, or null if the timeout expires or
+ * this message consumer is concurrently closed
+ * @throws MessageFormatException if the message body cannot be assigned to
+ * the specified type
+ * @throws JMSException if the JMS provider fails to receive the next
+ * message due to some internal error
+ */
+ public <T> T receiveBody(Class<T> c, long timeout) throws JMSException {
+ checkClosed();
+ checkMessageListener();
+ if (timeout == 0) {
+ return this.receiveBody(c);
+ }
+
+ sendPullCommand(timeout);
+ while (timeout > 0) {
+ MessageDispatch md;
+ if (info.getPrefetchSize() == 0) {
+ md = dequeue(-1);
+ } else {
+ md = dequeue(timeout);
+ }
+
+ if (md == null) {
+ return null;
+ }
+
+ return doReceiveBody(md, c);
+ }
+ return null;
+ }
+
+ /**
+ * Receives the next message produced for this message consumer and returns
+ * its body as an object of the specified type if one is immediately
+ * available.
+ * <p>
+ * If the message is not of a type for which the body can be assigned to
+ * the specified type, a {@code MessageFormatException} is thrown and the
+ * message is not acknowledged. It may be delivered again when a subsequent
+ * {@code receive} or {@code receiveBody} call is made.
+ *
+ * @param c the type to which the body of the next message should be
+ * assigned
+ * @return the body of the next message, or null if one is not immediately
+ * available
+ * @throws MessageFormatException if the message body cannot be assigned to
+ * the specified type
+ * @throws JMSException if the JMS provider fails to receive the next
+ * message due to some internal error
+ */
+ public <T> T receiveBodyNoWait(Class<T> c) throws JMSException {
+ checkClosed();
+ checkMessageListener();
+ sendPullCommand(-1);
+
+ MessageDispatch md;
+ if (info.getPrefetchSize() == 0) {
+ md = dequeue(-1);
+ } else {
+ md = dequeue(0);
+ }
+
+ if (md == null) {
+ return null;
+ }
+
+ return doReceiveBody(md, c);
+ }
+
+ /**
+ * Checks that the message body can be assigned to the requested type,
+ * acknowledges the message, and returns its body. If the body cannot be
+ * assigned, the message is re-enqueued without acknowledgement so that it
+ * remains available for a subsequent {@code receive} or
+ * {@code receiveBody} call.
+ */
+ private <T> T doReceiveBody(MessageDispatch md, Class<T> c) throws JMSException {
+ ActiveMQMessage message = createActiveMQMessage(md);
+ if (!message.isBodyAssignableTo(c)) {
Review Comment:
@tabish121 @cshannon I re-read the spec. Here is my understanding of what
should happen when the body cannot be assigned to the requested type.
1. If the ack mode is AUTO or DUPS_OK, the spec says "behave as if the
unsuccessful call has not occurred. The message will be delivered again before
any subsequent messages. This is not considered to be redelivery and does not
cause JMSRedelivered to be set or JMSXDeliveryCount to be incremented." My
understanding is that we should use `unconsumedMessages.enqueueFirst(md)`,
meaning that we put the message back at the front of the local prefetch, with
no broker interaction and no header changes.
2. If the ack mode is CLIENT, the spec says "behave as if the call has been
successful and will not deliver the message again. The message will not be
acknowledged until acknowledge is called. If an application wishes redelivery,
it must call recover." My understanding is that we should use
`beforeMessageIsConsumed(md)` and `afterMessageIsConsumed(md, false)`, meaning
the message is tracked as delivered, with credit window expansion.
3. If the session is transacted, the spec says "behave as if the call had been
successful and will not deliver the message again. The transaction will remain
uncommitted. If an application wishes redelivery, it must roll back the
transaction." Here, my understanding is that we should use the same approach as
for CLIENT ack mode (the message is delivered within the transaction).
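To make the three cases concrete, here is a minimal, self-contained sketch of the per-mode handling. It is only a model under my reading of the spec: `ReceiveBodyFailureSketch`, `AckMode`, `prefetch`, `delivered` and `failReceiveBody` are hypothetical names, not ActiveMQ API, and plain strings stand in for `MessageDispatch`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the consumer state; these names are NOT ActiveMQ API.
class ReceiveBodyFailureSketch {
    enum AckMode { AUTO, DUPS_OK, CLIENT, TRANSACTED }

    // Stands in for the local prefetch buffer (unconsumedMessages).
    final Deque<String> prefetch = new ArrayDeque<>();
    // Stands in for "delivered but not yet acknowledged/committed" tracking.
    final List<String> delivered = new ArrayList<>();

    // Takes the head message, assumes its body was NOT assignable to the
    // requested type, and applies the per-mode rule. Returns the failed
    // message, or null if nothing was buffered. A real caller would then
    // throw MessageFormatException.
    String failReceiveBody(AckMode mode) {
        String msg = prefetch.pollFirst();
        if (msg == null) {
            return null;
        }
        switch (mode) {
            case AUTO:
            case DUPS_OK:
                // As if the call never happened: back to the front of the buffer.
                prefetch.addFirst(msg);
                break;
            default:
                // CLIENT or TRANSACTED: treated as delivered, not handed out
                // again until recover() or rollback().
                delivered.add(msg);
                break;
        }
        return msg;
    }

    public static void main(String[] args) {
        ReceiveBodyFailureSketch consumer = new ReceiveBodyFailureSketch();
        consumer.prefetch.add("m1");
        consumer.prefetch.add("m2");
        consumer.failReceiveBody(AckMode.AUTO);
        System.out.println("prefetch=" + consumer.prefetch + " delivered=" + consumer.delivered);
    }
}
```

In the real consumer, the first branch would correspond to `unconsumedMessages.enqueueFirst(md)` and the second to the `beforeMessageIsConsumed(md)` / `afterMessageIsConsumed(md, false)` pair, if my reading above is correct.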
Also, I see in the spec that the message type has an impact.
If the message has no body, we throw the exception.
The spec says "may be used to receive any type of message except for
StreamMessage", meaning that `ActiveMQStreamMessage.isBodyAssignableTo()`
should always return `false`.
The spec says "may be used to receive any type of message except for...
Message", meaning we need an explicit check for the `ACTIVEMQ_MESSAGE` data
structure type.
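The type rules above could be sketched as a single predicate. This is only an illustration of my reading: `ReceiveBodyTypeCheckSketch`, `Kind` and `canReceiveBody` are hypothetical names, and the real check would live in `isBodyAssignableTo()` and `doReceiveBody()`.

```java
// Hypothetical sketch of the message-type rules; NOT the ActiveMQ implementation.
class ReceiveBodyTypeCheckSketch {
    // Simplified stand-in for the message data structure types.
    enum Kind { MESSAGE, STREAM, TEXT, BYTES, MAP, OBJECT }

    // Returns true only when receiveBody could hand the body back as type c.
    static boolean canReceiveBody(Kind kind, Object body, Class<?> c) {
        // StreamMessage and plain Message are excluded by the spec wording.
        if (kind == Kind.STREAM || kind == Kind.MESSAGE) {
            return false;
        }
        // Assumption from the comment above: no body means the call fails.
        if (body == null) {
            return false;
        }
        // Otherwise the body must be assignable to the requested type.
        return c.isInstance(body);
    }

    public static void main(String[] args) {
        System.out.println(canReceiveBody(Kind.TEXT, "hello", String.class));
        System.out.println(canReceiveBody(Kind.STREAM, "hello", String.class));
    }
}
```

Whenever this predicate is false, the caller would throw `MessageFormatException` and then apply the ack-mode handling from points 1 to 3.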
Am I right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]