nandorsoma commented on code in PR #6987:
URL: https://github.com/apache/nifi/pull/6987#discussion_r1150579598


##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java:
##########
@@ -82,79 +87,62 @@ private MessageConsumer createMessageConsumer(final Session session, final Strin
         }
     }
 
-
     /**
      * Receives a message from the broker. It is the consumerCallback's 
responsibility to acknowledge the received message.
      */
-    public void consume(final String destinationName, String errorQueueName, 
final boolean durable, final boolean shared, final String subscriptionName, 
final String messageSelector,
-                        final String charset, final ConsumerCallback 
consumerCallback) {
-        this.jmsTemplate.execute(new SessionCallback<Void>() {
-            @Override
-            public Void doInJms(final Session session) throws JMSException {
-
-                final MessageConsumer msgConsumer = 
createMessageConsumer(session, destinationName, durable, shared, 
subscriptionName, messageSelector);
-                try {
-                    final Message message = 
msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
-
-                    // If there is no message, there's nothing for us to do. 
We can simply close the consumer and return.
-                    if (message == null) {
-                        JmsUtils.closeMessageConsumer(msgConsumer);
-                        return null;
-                    }
-
-                    String messageType;
-                    byte[] messageBody;
-
-                    try {
-                        if (message instanceof TextMessage) {
-                            messageType = TextMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((TextMessage) message, 
Charset.forName(charset));
-                        } else if (message instanceof BytesMessage) {
-                            messageType = BytesMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((BytesMessage) message);
-                        } else if (message instanceof ObjectMessage) {
-                            messageType = ObjectMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((ObjectMessage) message);
-                        } else if (message instanceof StreamMessage) {
-                            messageType = StreamMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((StreamMessage) message);
-                        } else if (message instanceof MapMessage) {
-                            messageType = MapMessage.class.getSimpleName();
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((MapMessage) message);
-                        } else {
-                            acknowledge(message, session);
-
-                            if (errorQueueName != null) {
-                                processLog.error("Received unsupported JMS 
Message type [{}]; rerouting message to error queue [{}].", new Object[] 
{message, errorQueueName});
-                                jmsTemplate.send(errorQueueName, __ -> 
message);
-                            } else {
-                                processLog.error("Received unsupported JMS 
Message type [{}]; will skip this message.", new Object[] {message});
-                            }
-
-                            return null;
-                        }
-                    } catch (final MessageConversionException mce) {
-                        processLog.error("Received a JMS Message [{}] but 
failed to obtain the content of the message; will acknowledge this message 
without creating a FlowFile for it.",
-                            new Object[] {message}, mce);
-                        acknowledge(message, session);
-
-                        if (errorQueueName != null) {
-                            jmsTemplate.send(errorQueueName, __ -> message);
-                        }
-
-                        return null;
-                    }
+    public void consumeSingleMessage(final String destinationName, String 
errorQueueName, final boolean durable, final boolean shared, final String 
subscriptionName, final String messageSelector,
+                                     final String charset, final 
Consumer<JMSResponse> singleMessageConsumer) {
+        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, 
messageSelector, (session, messageConsumer) -> {
+            final JMSResponse response = receiveMessage(session, 
messageConsumer, charset, errorQueueName);
+            if (response != null) {
+                // Provide the JMSResponse to the processor to handle. It is 
the responsibility of the
+                // processor to handle acknowledgment of the message (if 
Client Acknowledge), and it is
+                // the responsibility of the processor to handle closing the 
Message Consumer.
+                // Both of these actions can be handled by calling the 
acknowledge() or reject() methods of
+                // the JMSResponse.
+                singleMessageConsumer.accept(response);
+            }
+        });
+    }
 
-                    final Map<String, String> messageHeaders = 
extractMessageHeaders(message);
-                    final Map<String, String> messageProperties = 
extractMessageProperties(message);
-                    final JMSResponse response = new JMSResponse(message, 
session.getAcknowledgeMode(), messageType, messageBody, messageHeaders, 
messageProperties, msgConsumer);
+    /**
+     * Receives a list of messages from the broker. It is the 
consumerCallback's responsibility to acknowledge the received message.
+     */
+    public void consumeMessageSet(final String destinationName, String 
errorQueueName, final boolean durable, final boolean shared, final String 
subscriptionName, final String messageSelector,
+                        final String charset, final 
Consumer<List<JMSResponse>> messageSetConsumer) {
+        doWithJmsTemplate(destinationName, durable, shared, subscriptionName, 
messageSelector, new MessageReceiver() {
+            @Override
+            public void consume(Session session, MessageConsumer 
messageConsumer) throws JMSException {
+                final List<JMSResponse> jmsResponses = new ArrayList<>();
+                int batchCounter = 0;
+
+                JMSResponse response;
+                while ((response = receiveMessage(session, messageConsumer, 
charset, errorQueueName)) != null && batchCounter < MAX_MESSAGES_PER_FLOW_FILE) 
{
+                    response.setBatchOrder(batchCounter);
+                    jmsResponses.add(response);
+                    batchCounter++;
+                }
 
+                if (!jmsResponses.isEmpty()) {
                     // Provide the JMSResponse to the processor to handle. It 
is the responsibility of the
                     // processor to handle acknowledgment of the message (if 
Client Acknowledge), and it is
                     // the responsibility of the processor to handle closing 
the Message Consumer.
                     // Both of these actions can be handled by calling the 
acknowledge() or reject() methods of
                     // the JMSResponse.
-                    consumerCallback.accept(response);
+                    messageSetConsumer.accept(jmsResponses);
+                }
+            }
+        });
+    }
+
+    private void doWithJmsTemplate(String destinationName, boolean durable, boolean shared, String subscriptionName, String messageSelector, MessageReceiver messageReceiver) {

Review Comment:
   I'm not too fond of the naming either, but I can't come up with a better 
one. Do you have an idea?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to