nandorsoma commented on code in PR #6987:
URL: https://github.com/apache/nifi/pull/6987#discussion_r1150579598
##########
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java:
##########
@@ -82,79 +87,62 @@ private MessageConsumer createMessageConsumer(final Session
session, final Strin
}
}
-
/**
* Receives a message from the broker. It is the consumerCallback's
responsibility to acknowledge the received message.
*/
- public void consume(final String destinationName, String errorQueueName,
final boolean durable, final boolean shared, final String subscriptionName,
final String messageSelector,
- final String charset, final ConsumerCallback
consumerCallback) {
- this.jmsTemplate.execute(new SessionCallback<Void>() {
- @Override
- public Void doInJms(final Session session) throws JMSException {
-
- final MessageConsumer msgConsumer =
createMessageConsumer(session, destinationName, durable, shared,
subscriptionName, messageSelector);
- try {
- final Message message =
msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
-
- // If there is no message, there's nothing for us to do.
We can simply close the consumer and return.
- if (message == null) {
- JmsUtils.closeMessageConsumer(msgConsumer);
- return null;
- }
-
- String messageType;
- byte[] messageBody;
-
- try {
- if (message instanceof TextMessage) {
- messageType = TextMessage.class.getSimpleName();
- messageBody =
MessageBodyToBytesConverter.toBytes((TextMessage) message,
Charset.forName(charset));
- } else if (message instanceof BytesMessage) {
- messageType = BytesMessage.class.getSimpleName();
- messageBody =
MessageBodyToBytesConverter.toBytes((BytesMessage) message);
- } else if (message instanceof ObjectMessage) {
- messageType = ObjectMessage.class.getSimpleName();
- messageBody =
MessageBodyToBytesConverter.toBytes((ObjectMessage) message);
- } else if (message instanceof StreamMessage) {
- messageType = StreamMessage.class.getSimpleName();
- messageBody =
MessageBodyToBytesConverter.toBytes((StreamMessage) message);
- } else if (message instanceof MapMessage) {
- messageType = MapMessage.class.getSimpleName();
- messageBody =
MessageBodyToBytesConverter.toBytes((MapMessage) message);
- } else {
- acknowledge(message, session);
-
- if (errorQueueName != null) {
- processLog.error("Received unsupported JMS
Message type [{}]; rerouting message to error queue [{}].", new Object[]
{message, errorQueueName});
- jmsTemplate.send(errorQueueName, __ ->
message);
- } else {
- processLog.error("Received unsupported JMS
Message type [{}]; will skip this message.", new Object[] {message});
- }
-
- return null;
- }
- } catch (final MessageConversionException mce) {
- processLog.error("Received a JMS Message [{}] but
failed to obtain the content of the message; will acknowledge this message
without creating a FlowFile for it.",
- new Object[] {message}, mce);
- acknowledge(message, session);
-
- if (errorQueueName != null) {
- jmsTemplate.send(errorQueueName, __ -> message);
- }
-
- return null;
- }
+ public void consumeSingleMessage(final String destinationName, String
errorQueueName, final boolean durable, final boolean shared, final String
subscriptionName, final String messageSelector,
+ final String charset, final
Consumer<JMSResponse> singleMessageConsumer) {
+ doWithJmsTemplate(destinationName, durable, shared, subscriptionName,
messageSelector, (session, messageConsumer) -> {
+ final JMSResponse response = receiveMessage(session,
messageConsumer, charset, errorQueueName);
+ if (response != null) {
+ // Provide the JMSResponse to the processor to handle. It is
the responsibility of the
+ // processor to handle acknowledgment of the message (if
Client Acknowledge), and it is
+ // the responsibility of the processor to handle closing the
Message Consumer.
+ // Both of these actions can be handled by calling the
acknowledge() or reject() methods of
+ // the JMSResponse.
+ singleMessageConsumer.accept(response);
+ }
+ });
+ }
- final Map<String, String> messageHeaders =
extractMessageHeaders(message);
- final Map<String, String> messageProperties =
extractMessageProperties(message);
- final JMSResponse response = new JMSResponse(message,
session.getAcknowledgeMode(), messageType, messageBody, messageHeaders,
messageProperties, msgConsumer);
+ /**
+ * Receives a list of messages from the broker. It is the
consumerCallback's responsibility to acknowledge the received message.
+ */
+ public void consumeMessageSet(final String destinationName, String
errorQueueName, final boolean durable, final boolean shared, final String
subscriptionName, final String messageSelector,
+ final String charset, final
Consumer<List<JMSResponse>> messageSetConsumer) {
+ doWithJmsTemplate(destinationName, durable, shared, subscriptionName,
messageSelector, new MessageReceiver() {
+ @Override
+ public void consume(Session session, MessageConsumer
messageConsumer) throws JMSException {
+ final List<JMSResponse> jmsResponses = new ArrayList<>();
+ int batchCounter = 0;
+
+ JMSResponse response;
+ while ((response = receiveMessage(session, messageConsumer,
charset, errorQueueName)) != null && batchCounter < MAX_MESSAGES_PER_FLOW_FILE)
{
+ response.setBatchOrder(batchCounter);
+ jmsResponses.add(response);
+ batchCounter++;
+ }
+ if (!jmsResponses.isEmpty()) {
// Provide the JMSResponse to the processor to handle. It
is the responsibility of the
// processor to handle acknowledgment of the message (if
Client Acknowledge), and it is
// the responsibility of the processor to handle closing
the Message Consumer.
// Both of these actions can be handled by calling the
acknowledge() or reject() methods of
// the JMSResponse.
- consumerCallback.accept(response);
+ messageSetConsumer.accept(jmsResponses);
+ }
+ }
+ });
+ }
+
+ private void doWithJmsTemplate(String destinationName, boolean durable,
boolean shared, String subscriptionName, String messageSelector,
MessageReceiver messageReceiver) {
Review Comment:
I'm not too fond of the naming either, but I can't come up with a better
one. Do you have an idea?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]