tabish121 commented on code in PR #1543:
URL: https://github.com/apache/activemq/pull/1543#discussion_r2912301851
##########
activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java:
##########
@@ -713,6 +714,144 @@ public Message receiveNoWait() throws JMSException {
return createActiveMQMessage(md);
}
+ /**
+ * Receives the next message produced for this message consumer and returns
+ * its body as an object of the specified type. This call blocks
+ * indefinitely until a message is produced or until this message consumer
+ * is closed.
+ * <p>
+ * If the message is not of a type for which the body can be assigned to
+ * the specified type, a {@code MessageFormatException} is thrown and the
+ * message is not acknowledged. It may be delivered again when a subsequent
+ * {@code receive} or {@code receiveBody} call is made.
+ *
+ * @param c the type to which the body of the next message should be
+ * assigned
+ * @return the body of the next message, or null if this message consumer
+ * is concurrently closed
+ * @throws MessageFormatException if the message body cannot be assigned to
+ * the specified type
+ * @throws JMSException if the JMS provider fails to receive the next
+ * message due to some internal error
+ */
+ public <T> T receiveBody(Class<T> c) throws JMSException {
+ checkClosed();
+ checkMessageListener();
+
+ sendPullCommand(0);
+ MessageDispatch md = dequeue(-1);
+ if (md == null) {
+ return null;
+ }
+
+ return doReceiveBody(md, c);
+ }
+
+ /**
+ * Receives the next message produced for this message consumer and returns
+ * its body as an object of the specified type, blocking up to the
+ * specified timeout. A {@code timeout} of zero never expires and the call
+ * blocks indefinitely.
+ * <p>
+ * If the message is not of a type for which the body can be assigned to
+ * the specified type, a {@code MessageFormatException} is thrown and the
+ * message is not acknowledged. It may be delivered again when a subsequent
+ * {@code receive} or {@code receiveBody} call is made.
+ *
+ * @param c the type to which the body of the next message should be
+ * assigned
+ * @param timeout the timeout value (in milliseconds), a timeout of zero
+ * never expires
+ * @return the body of the next message, or null if the timeout expires or
+ * this message consumer is concurrently closed
+ * @throws MessageFormatException if the message body cannot be assigned to
+ * the specified type
+ * @throws JMSException if the JMS provider fails to receive the next
+ * message due to some internal error
+ */
+ public <T> T receiveBody(Class<T> c, long timeout) throws JMSException {
+ checkClosed();
+ checkMessageListener();
+ if (timeout == 0) {
+ return this.receiveBody(c);
+ }
+
+ sendPullCommand(timeout);
+ while (timeout > 0) {
+ MessageDispatch md;
+ if (info.getPrefetchSize() == 0) {
+ md = dequeue(-1);
+ } else {
+ md = dequeue(timeout);
+ }
+
+ if (md == null) {
+ return null;
+ }
+
+ return doReceiveBody(md, c);
+ }
+ return null;
+ }
+
+ /**
+ * Receives the next message produced for this message consumer and returns
+ * its body as an object of the specified type if one is immediately
+ * available.
+ * <p>
+ * If the message is not of a type for which the body can be assigned to
+ * the specified type, a {@code MessageFormatException} is thrown and the
+ * message is not acknowledged. It may be delivered again when a subsequent
+ * {@code receive} or {@code receiveBody} call is made.
+ *
+ * @param c the type to which the body of the next message should be
+ * assigned
+ * @return the body of the next message, or null if one is not immediately
+ * available
+ * @throws MessageFormatException if the message body cannot be assigned to
+ * the specified type
+ * @throws JMSException if the JMS provider fails to receive the next
+ * message due to some internal error
+ */
+ public <T> T receiveBodyNoWait(Class<T> c) throws JMSException {
+ checkClosed();
+ checkMessageListener();
+ sendPullCommand(-1);
+
+ MessageDispatch md;
+ if (info.getPrefetchSize() == 0) {
+ md = dequeue(-1);
+ } else {
+ md = dequeue(0);
+ }
+
+ if (md == null) {
+ return null;
+ }
+
+ return doReceiveBody(md, c);
+ }
+
+ /**
+ * Checks that the message body can be assigned to the requested type,
+ * acknowledges the message, and returns its body. If the body cannot be
+ * assigned, the message is re-enqueued without acknowledgement so that it
+ * remains available for a subsequent {@code receive} or
+ * {@code receiveBody} call.
+ */
+ private <T> T doReceiveBody(MessageDispatch md, Class<T> c) throws JMSException {
+ ActiveMQMessage message = createActiveMQMessage(md);
+ if (!message.isBodyAssignableTo(c)) {
Review Comment:
I think I've pointed this out a couple of times already, but this still
violates the specification. Please refer to section 8.6 on pages 84 and 85 of the
[Jakarta Messaging 3.1 specification](https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1.pdf)
for how the client should handle this depending on the acknowledgement mode
of the consumer.
The basic concept is as follows:
1. If the client acknowledgement mode is AUTO or DUPS_OK then you are meant
to put the message back on the prefetch queue and allow the caller to read it
again as they have no recourse to recover those messages once you throw here.
2. If in CLIENT acknowledgement mode the message **should not** immediately
go back to the prefetch buffer as the application can either call
`session.recover()` to redeliver all unacknowledged messages or can read the
next message and call `message.acknowledge()` to acknowledge that message and
all previous messages.
3. If in TRANSACTED mode the message **should not** go back into the
prefetch buffer because the application can either call `session.rollback()` to
recover messages that failed or can simply continue consuming messages and call
`session.commit()` to retire this and any other failed or consumed messages.
Testing here should account for these cases and also ensure that, in CLIENT
and TRANSACTED modes, the consumer credit window is expanded when messages
fail here, to avoid a stuck consumer receive caused by the current credit
window having been exhausted.
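
To make the three cases concrete, here is a minimal, self-contained sketch of the decision. It is not the ActiveMQ implementation: `AckMode`, `ConsumerState`, and `onBodyNotAssignable` are hypothetical names invented for illustration, messages are represented by plain string IDs, and the "credit" counter only stands in for whatever mechanism the client uses to expand the consumer's window.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ReceiveBodyFailureSketch {

    /** Hypothetical stand-in for the session acknowledgement modes. */
    enum AckMode { AUTO, DUPS_OK, CLIENT, TRANSACTED }

    /** Hypothetical stand-in for the consumer's internal bookkeeping. */
    static final class ConsumerState {
        // Messages waiting to be handed to the application.
        final Deque<String> prefetch = new ArrayDeque<>();
        // Messages delivered but not yet acknowledged, committed, or rolled back.
        final Deque<String> delivered = new ArrayDeque<>();
        // Extra credit the consumer should request so it does not stall.
        int creditToExtend = 0;
    }

    /**
     * Decides what happens to a message whose body could not be assigned to
     * the requested type, just before MessageFormatException is thrown.
     */
    static void onBodyNotAssignable(AckMode mode, String messageId, ConsumerState state) {
        switch (mode) {
            case AUTO:
            case DUPS_OK:
                // The caller has no other way to recover the message, so it
                // goes back to the head of the prefetch buffer to be read again.
                state.prefetch.addFirst(messageId);
                break;
            case CLIENT:
            case TRANSACTED:
                // The caller can recover()/acknowledge() or rollback()/commit(),
                // so the message stays out of the prefetch buffer and is only
                // tracked as delivered. The window is expanded so that failed
                // messages do not exhaust the consumer's credit.
                state.delivered.addLast(messageId);
                state.creditToExtend++;
                break;
        }
    }

    public static void main(String[] args) {
        for (AckMode mode : AckMode.values()) {
            ConsumerState state = new ConsumerState();
            onBodyNotAssignable(mode, "msg-1", state);
            System.out.println(mode + ": prefetch=" + state.prefetch
                    + " delivered=" + state.delivered
                    + " creditToExtend=" + state.creditToExtend);
        }
    }
}
```

A test for each mode could follow the same shape: force a body type mismatch, then check both where the message ended up and that a subsequent receive is not blocked by an exhausted window.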
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]