nandorsoma commented on code in PR #6225:
URL: https://github.com/apache/nifi/pull/6225#discussion_r957817027
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java:
##########
@@ -649,38 +625,44 @@ private void closeWriter(final RecordSetWriter writer) {
}
private String getTransitUri(String... appends) {
- StringBuilder stringBuilder = new StringBuilder(brokerUri);
- for(String append : appends) {
+ String broker = clientProperties.getBrokerUri().toString();
+ StringBuilder stringBuilder = new StringBuilder(broker.endsWith("/") ?
broker : broker + "/");
+ for (String append : appends) {
stringBuilder.append(append);
}
return stringBuilder.toString();
}
@Override
public void connectionLost(Throwable cause) {
- logger.error("Connection to {} lost due to: {}", new Object[]{broker,
cause.getMessage()}, cause);
+ logger.error("Connection to {} lost due to: {}", new
Object[]{clientProperties.getBroker(), cause.getMessage()}, cause);
}
@Override
- public void messageArrived(String topic, MqttMessage message) throws
Exception {
+ public void messageArrived(ReceivedMqttMessage message) {
if (logger.isDebugEnabled()) {
byte[] payload = message.getPayload();
- String text = new String(payload, "UTF-8");
+ String text = new String(payload, StandardCharsets.UTF_8);
if (StringUtils.isAsciiPrintable(text)) {
- logger.debug("Message arrived from topic {}. Payload: {}", new
Object[] {topic, text});
+ logger.debug("Message arrived from topic {}. Payload: {}",
message.getTopic(), text);
} else {
- logger.debug("Message arrived from topic {}. Binary value of
size {}", new Object[] {topic, payload.length});
+ logger.debug("Message arrived from topic {}. Binary value of
size {}", message.getTopic(), payload.length);
}
}
- if(!mqttQueue.offer(new MQTTQueueMessage(topic, message), 1,
TimeUnit.SECONDS)) {
- throw new IllegalStateException("The subscriber queue is full,
cannot receive another message until the processor is scheduled to run.");
+ try {
+ if (!mqttQueue.offer(message, 1, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("The subscriber queue is full,
cannot receive another message until the processor is scheduled to run.");
+ }
+ } catch (InterruptedException e) {
+ throw new MqttException("Failed to process message arrived from
topic " + message.getTopic());
}
}
@Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- logger.warn("Received MQTT 'delivery complete' message to subscriber:
" + token);
+ public void deliveryComplete(String token) {
+ // Unlikely situation. Api uses the same callback for publisher and
consumer as well.
+ // That's why we have this log message here to indicate something
really messy thing happened.
+ logger.error("Received MQTT 'delivery complete' message to subscriber:
" + token);
Review Comment:
Changed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]