Repository: nifi Updated Branches: refs/heads/master 5561c29ed -> ebaaf5797
NIFI-5721 Fixing connection handling in MQTT processors This closes #3096. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ebaaf579 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ebaaf579 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ebaaf579 Branch: refs/heads/master Commit: ebaaf5797e94d7177633cf4e85f0fbeada0765ba Parents: 5561c29 Author: Bryan Bende <[email protected]> Authored: Thu Oct 18 17:02:42 2018 -0400 Committer: Aldrin Piri <[email protected]> Committed: Mon Oct 22 15:13:14 2018 -0400 ---------------------------------------------------------------------- .../nifi/processors/mqtt/ConsumeMQTT.java | 142 ++++++++----------- .../nifi/processors/mqtt/PublishMQTT.java | 126 +++++++--------- .../mqtt/common/AbstractMQTTProcessor.java | 91 ++++++------ .../nifi/processors/mqtt/TestConsumeMQTT.java | 2 +- .../nifi/processors/mqtt/TestPublishMQTT.java | 2 +- .../mqtt/common/TestConsumeMqttCommon.java | 25 ++-- .../mqtt/integration/TestConsumeMQTT.java | 2 +- .../mqtt/integration/TestConsumeMqttSSL.java | 2 +- .../TestPublishAndSubscribeMqttIntegration.java | 2 +- 9 files changed, 181 insertions(+), 213 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java index dbfbcbb..f0cba72 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java @@ -20,35 +20,36 @@ package org.apache.nifi.processors.mqtt; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.io.OutputStreamCallback; - import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor; import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import java.io.IOException; +import java.io.OutputStream; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -57,8 +58,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; -import java.io.OutputStream; -import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY; @@ -73,7 +72,7 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL @Tags({"subscribe", "MQTT", "IOT", "consume", "listen"}) @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) -@TriggerSerially // we want to have a consistent mapping between clientID and MQTT connection +@TriggerSerially @CapabilityDescription("Subscribes to a topic and receives messages from an MQTT broker") @SeeAlso({PublishMQTT.class}) @WritesAttributes({ @@ -83,7 +82,7 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL @WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, description="Whether or not this message might be a duplicate of one which has already been received."), @WritesAttribute(attribute=IS_RETAINED_ATTRIBUTE_KEY, description="Whether or not this message was from a current publisher, or was \"retained\" by the server as the last message published " + "on the topic.")}) -public class ConsumeMQTT extends AbstractMQTTProcessor { +public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker"; public final static String TOPIC_ATTRIBUTE_KEY = "mqtt.topic"; @@ -119,7 +118,6 @@ public class ConsumeMQTT extends AbstractMQTTProcessor { .build(); - private static int DISCONNECT_TIMEOUT = 5000; private volatile long maxQueueSize; private volatile int qos; @@ -205,29 +203,19 @@ public class ConsumeMQTT extends AbstractMQTTProcessor { } @OnScheduled - public void onScheduled(final ProcessContext context) throws IOException, ClassNotFoundException { + public void onScheduled(final ProcessContext context) { + super.onScheduled(context); qos = context.getProperty(PROP_QOS).asInteger(); maxQueueSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asLong(); topicFilter = context.getProperty(PROP_TOPIC_FILTER).getValue(); - - buildClient(context); scheduled.set(true); } @OnUnscheduled public void onUnscheduled(final ProcessContext context) { scheduled.set(false); - - mqttClientConnectLock.writeLock().lock(); - try { - if(isConnected()) { - mqttClient.disconnect(DISCONNECT_TIMEOUT); - logger.info("Disconnected the MQTT client."); - } - } catch(MqttException me) { - logger.error("Failed when disconnecting the MQTT client.", me); - } finally { - mqttClientConnectLock.writeLock().unlock(); + synchronized (this) { + super.onStopped(); } } @@ -249,14 +237,12 @@ public class ConsumeMQTT extends AbstractMQTTProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - if (mqttQueue.isEmpty() && !isConnected() && scheduled.get()){ - logger.info("Queue is empty and client is not connected. Attempting to reconnect."); - - try { - reconnect(); - } catch (MqttException e) { - logger.error("Connection to " + broker + " lost (or was never connected) and ontrigger connect failed. Yielding processor", e); - context.yield(); + final boolean isScheduled = scheduled.get(); + if (!isConnected() && isScheduled){ + synchronized (this) { + if (!isConnected()) { + initializeClient(context); + } } } @@ -267,6 +253,27 @@ public class ConsumeMQTT extends AbstractMQTTProcessor { transferQueue(session); } + private void initializeClient(ProcessContext context) { + // NOTE: This method is called when isConnected returns false which can happen when the client is null, or when it is + // non-null but not connected, so we need to handle each case and only create a new client when it is null + try { + if (mqttClient == null) { + logger.debug("Creating client"); + mqttClient = createMqttClient(broker, clientID, persistence); + mqttClient.setCallback(this); + } + + if (!mqttClient.isConnected()) { + logger.debug("Connecting client"); + mqttClient.connect(connOpts); + mqttClient.subscribe(topicFilter, qos); + } + } catch (MqttException e) { + logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{broker}, e); + context.yield(); + } + } + private void transferQueue(ProcessSession session){ while (!mqttQueue.isEmpty()) { FlowFile messageFlowfile = session.create(); @@ -303,56 +310,33 @@ public class ConsumeMQTT extends AbstractMQTTProcessor { } } - private class ConsumeMQTTCallback implements MqttCallback { - - @Override - public void connectionLost(Throwable cause) { - logger.warn("Connection to " + broker + " lost", cause); - try { - reconnect(); - } catch (MqttException e) { - logger.error("Connection to " + broker + " lost and callback re-connect failed."); - } - } - - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - if (logger.isDebugEnabled()) { - byte[] payload = message.getPayload(); - String text = new String(payload, "UTF-8"); - if (StringUtils.isAsciiPrintable(text)) { - logger.debug("Message arrived from topic {}. Payload: {}", new Object[] {topic, text}); - } else { - logger.debug("Message arrived from topic {}. Binary value of size {}", new Object[] {topic, payload.length}); - } - } + @Override + public void connectionLost(Throwable cause) { + logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause); + } - if (mqttQueue.size() >= maxQueueSize){ - throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run."); + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + if (logger.isDebugEnabled()) { + byte[] payload = message.getPayload(); + String text = new String(payload, "UTF-8"); + if (StringUtils.isAsciiPrintable(text)) { + logger.debug("Message arrived from topic {}. Payload: {}", new Object[] {topic, text}); } else { - mqttQueue.add(new MQTTQueueMessage(topic, message)); + logger.debug("Message arrived from topic {}. Binary value of size {}", new Object[] {topic, payload.length}); } } - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - logger.warn("Received MQTT 'delivery complete' message to subscriber:"+ token); + if (mqttQueue.size() >= maxQueueSize){ + throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run."); + } else { + mqttQueue.add(new MQTTQueueMessage(topic, message)); } } - private void reconnect() throws MqttException { - mqttClientConnectLock.writeLock().lock(); - try { - if (!mqttClient.isConnected()) { - setAndConnectClient(new ConsumeMQTTCallback()); - mqttClient.subscribe(topicFilter, qos); - } - } finally { - mqttClientConnectLock.writeLock().unlock(); - } + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + logger.warn("Received MQTT 'delivery complete' message to subscriber: " + token); } - private boolean isConnected(){ - return (mqttClient != null && mqttClient.isConnected()); - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java index c1f1a68..0db7615 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java @@ -17,43 +17,42 @@ package org.apache.nifi.processors.mqtt; -import org.apache.nifi.annotation.behavior.SystemResourceConsideration; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; - import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.io.InputStream; -import java.io.IOException; import java.util.concurrent.TimeUnit; @SupportsBatching @@ -62,7 +61,7 @@ import java.util.concurrent.TimeUnit; @CapabilityDescription("Publishes a message to an MQTT topic") @SeeAlso({ConsumeMQTT.class}) @SystemResourceConsideration(resource = SystemResource.MEMORY) -public class PublishMQTT extends AbstractMQTTProcessor { +public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback { public static final PropertyDescriptor PROP_TOPIC = new PropertyDescriptor.Builder() .name("Topic") @@ -132,21 +131,13 @@ public class PublishMQTT extends AbstractMQTTProcessor { @OnScheduled public void onScheduled(final ProcessContext context) { - buildClient(context); + super.onScheduled(context); } @OnStopped - public void onStop(final ProcessContext context) { - mqttClientConnectLock.writeLock().lock(); - try { - if (mqttClient != null && mqttClient.isConnected()) { - mqttClient.disconnect(); - logger.info("Disconnected the MQTT client."); - } - } catch(MqttException me) { - logger.error("Failed when disconnecting the MQTT client.", me); - } finally { - mqttClientConnectLock.writeLock().unlock(); + public void onStopped(final ProcessContext context) { + synchronized (this) { + super.onStopped(); } } @@ -157,15 +148,11 @@ public class PublishMQTT extends AbstractMQTTProcessor { return; } - if(mqttClient == null || !mqttClient.isConnected()){ - logger.info("Was disconnected from client or was never connected, attempting to connect."); - try { - reconnect(); - } catch (MqttException e) { - context.yield(); - session.transfer(flowfile, REL_FAILURE); - logger.error("MQTT client is disconnected and re-connecting failed. Transferring FlowFile to fail and yielding", e); - return; + if (!isConnected()){ + synchronized (this) { + if (!isConnected()) { + initializeClient(context); + } } } @@ -194,17 +181,12 @@ public class PublishMQTT extends AbstractMQTTProcessor { mqttMessage.setRetained(context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean()); try { - mqttClientConnectLock.readLock().lock(); final StopWatch stopWatch = new StopWatch(true); - try { - /* - * Underlying method waits for the message to publish (according to set QoS), so it executes synchronously: - * MqttClient.java:361 aClient.publish(topic, message, null, null).waitForCompletion(getTimeToWait()); - */ - mqttClient.publish(topic, mqttMessage); - } finally { - mqttClientConnectLock.readLock().unlock(); - } + /* + * Underlying method waits for the message to publish (according to set QoS), so it executes synchronously: + * MqttClient.java:361 aClient.publish(topic, message, null, null).waitForCompletion(getTimeToWait()); + */ + mqttClient.publish(topic, mqttMessage); session.getProvenanceReporter().send(flowfile, broker, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowfile, REL_SUCCESS); @@ -214,40 +196,40 @@ public class PublishMQTT extends AbstractMQTTProcessor { } } - private class PublishMQTTCallback implements MqttCallback { - - @Override - public void connectionLost(Throwable cause) { - logger.warn("Connection to " + broker + " lost", cause); - try { - reconnect(); - } catch (MqttException e) { - logger.error("Connection to " + broker + " lost and re-connect failed"); + private void initializeClient(ProcessContext context) { + // NOTE: This method is called when isConnected returns false which can happen when the client is null, or when it is + // non-null but not connected, so we need to handle each case and only create a new client when it is null + try { + if (mqttClient == null) { + logger.debug("Creating client"); + mqttClient = createMqttClient(broker, clientID, persistence); + mqttClient.setCallback(this); } - } - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - logger.error("Message arrived to a PublishMQTT processor { topic:'" + topic +"; payload:"+ Arrays.toString(message.getPayload())+"}"); + if (!mqttClient.isConnected()) { + logger.debug("Connecting client"); + mqttClient.connect(connOpts); + } + } catch (MqttException e) { + logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{broker}, e); + context.yield(); } + } - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - // Client.publish waits for message to be delivered so this token will always have a null message and is useless in this application. - logger.trace("Received 'delivery complete' message from broker for:" + token.toString()); - } + @Override + public void connectionLost(Throwable cause) { + logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause); } + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + logger.error("Message arrived to a PublishMQTT processor { topic:'" + topic +"; payload:"+ Arrays.toString(message.getPayload())+"}"); + } - private void reconnect() throws MqttException { - mqttClientConnectLock.writeLock().lock(); - try { - if (!mqttClient.isConnected()) { - setAndConnectClient(new PublishMQTTCallback()); - getLogger().info("Connecting to broker: " + broker); - } - } finally { - mqttClientConnectLock.writeLock().unlock(); - } + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + // Client.publish waits for message to be delivered so this token will always have a null message and is useless in this application. + logger.trace("Received 'delivery complete' message from broker for:" + token.toString()); } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java index 733c240..ceb206e 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java @@ -32,7 +32,6 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; @@ -44,8 +43,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Properties; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE; @@ -58,9 +55,10 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProcessor { + public static int DISCONNECT_TIMEOUT = 5000; + protected ComponentLog logger; protected IMqttClient mqttClient; - protected final ReadWriteLock mqttClientConnectLock = new ReentrantReadWriteLock(true); protected volatile String broker; protected volatile String clientID; protected MqttConnectOptions connOpts; @@ -296,51 +294,56 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces return properties; } - protected void buildClient(ProcessContext context){ - try { - broker = context.getProperty(PROP_BROKER_URI).getValue(); - clientID = context.getProperty(PROP_CLIENTID).getValue(); - - connOpts = new MqttConnectOptions(); - connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean()); - connOpts.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger()); - connOpts.setMqttVersion(context.getProperty(PROP_MQTT_VERSION).asInteger()); - connOpts.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger()); - - PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE); - if (sslProp.isSet()) { - Properties sslProps = transformSSLContextService((SSLContextService) sslProp.asControllerService()); - connOpts.setSSLProperties(sslProps); - } + protected void onScheduled(final ProcessContext context){ + broker = context.getProperty(PROP_BROKER_URI).getValue(); + clientID = context.getProperty(PROP_CLIENTID).getValue(); - PropertyValue lastWillTopicProp = context.getProperty(PROP_LAST_WILL_TOPIC); - if (lastWillTopicProp.isSet()){ - String lastWillMessage = context.getProperty(PROP_LAST_WILL_MESSAGE).getValue(); - PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_RETAIN); - Integer lastWillQOS = context.getProperty(PROP_LAST_WILL_QOS).asInteger(); - connOpts.setWill(lastWillTopicProp.getValue(), lastWillMessage.getBytes(), lastWillQOS, lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false); - } + connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean()); + connOpts.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger()); + connOpts.setMqttVersion(context.getProperty(PROP_MQTT_VERSION).asInteger()); + connOpts.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger()); + PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE); + if (sslProp.isSet()) { + Properties sslProps = transformSSLContextService((SSLContextService) sslProp.asControllerService()); + connOpts.setSSLProperties(sslProps); + } - PropertyValue usernameProp = context.getProperty(PROP_USERNAME); - if(usernameProp.isSet()) { - connOpts.setUserName(usernameProp.getValue()); - connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray()); - } + PropertyValue lastWillTopicProp = context.getProperty(PROP_LAST_WILL_TOPIC); + if (lastWillTopicProp.isSet()){ + String lastWillMessage = context.getProperty(PROP_LAST_WILL_MESSAGE).getValue(); + PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_RETAIN); + Integer lastWillQOS = context.getProperty(PROP_LAST_WILL_QOS).asInteger(); + connOpts.setWill(lastWillTopicProp.getValue(), lastWillMessage.getBytes(), lastWillQOS, lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false); + } - mqttClientConnectLock.writeLock().lock(); - try{ - mqttClient = getMqttClient(broker, clientID, persistence); - } finally { - mqttClientConnectLock.writeLock().unlock(); - } + PropertyValue usernameProp = context.getProperty(PROP_USERNAME); + if(usernameProp.isSet()) { + connOpts.setUserName(usernameProp.getValue()); + connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray()); + } + } + + protected void onStopped() { + try { + logger.info("Disconnecting client"); + mqttClient.disconnect(DISCONNECT_TIMEOUT); } catch(MqttException me) { - logger.error("Failed to initialize the connection to the " + me.getMessage()); + logger.error("Error disconnecting MQTT client due to {}", new Object[]{me.getMessage()}, me); + } + + try { + logger.info("Closing client"); + mqttClient.close(); + mqttClient = null; + } catch (MqttException me) { + logger.error("Error closing MQTT client due to {}", new Object[]{me.getMessage()}, me); } } - protected IMqttClient getMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException { + protected IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException { return new MqttClient(broker, clientID, persistence); } @@ -363,10 +366,8 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException; - // Caller should obtain the necessary lock - protected void setAndConnectClient(MqttCallback mqttCallback) throws MqttException { - mqttClient = getMqttClient(broker, clientID, persistence); - mqttClient.setCallback(mqttCallback); - mqttClient.connect(connOpts); + protected boolean isConnected(){ + return (mqttClient != null && mqttClient.isConnected()); } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java index 144cd63..7d02804 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java @@ -54,7 +54,7 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon { } @Override - public IMqttClient getMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException { + public IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException { mqttTestClient = new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Subscriber); return mqttTestClient; } http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java index cdbc67f..9916408 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java @@ -56,7 +56,7 @@ public class TestPublishMQTT extends TestPublishMqttCommon { } @Override - public IMqttClient getMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException { + public IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException { mqttTestClient = new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Publisher); return mqttTestClient; } http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java index a9159ad..71b6814 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java @@ -20,6 +20,7 @@ package org.apache.nifi.processors.mqtt.common; import io.moquette.proto.messages.AbstractMessage; import io.moquette.proto.messages.PublishMessage; import io.moquette.server.Server; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processors.mqtt.ConsumeMQTT; import org.apache.nifi.provenance.ProvenanceEventRecord; @@ -79,7 +80,7 @@ public abstract class TestConsumeMqttCommon { ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); consumeMQTT.onScheduled(testRunner.getProcessContext()); - reconnect(consumeMQTT); + reconnect(consumeMQTT, testRunner.getProcessContext()); Thread.sleep(PUBLISH_WAIT_MS); @@ -121,7 +122,7 @@ public abstract class TestConsumeMqttCommon { ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); consumeMQTT.onScheduled(testRunner.getProcessContext()); - reconnect(consumeMQTT); + reconnect(consumeMQTT, testRunner.getProcessContext()); Thread.sleep(PUBLISH_WAIT_MS); @@ -139,7 +140,7 @@ public abstract class TestConsumeMqttCommon { internalPublish(testMessage); consumeMQTT.onScheduled(testRunner.getProcessContext()); - reconnect(consumeMQTT); + reconnect(consumeMQTT, testRunner.getProcessContext()); Thread.sleep(PUBLISH_WAIT_MS); @@ -170,7 +171,7 @@ public abstract class TestConsumeMqttCommon { ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); consumeMQTT.onScheduled(testRunner.getProcessContext()); - reconnect(consumeMQTT); + reconnect(consumeMQTT, testRunner.getProcessContext()); Thread.sleep(PUBLISH_WAIT_MS); @@ -211,7 +212,7 @@ public abstract class TestConsumeMqttCommon { ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); consumeMQTT.onScheduled(testRunner.getProcessContext()); - reconnect(consumeMQTT); + reconnect(consumeMQTT, testRunner.getProcessContext()); Thread.sleep(PUBLISH_WAIT_MS); @@ -229,7 +230,7 @@ public abstract class TestConsumeMqttCommon { internalPublish(testMessage); consumeMQTT.onScheduled(testRunner.getProcessContext()); - reconnect(consumeMQTT); + reconnect(consumeMQTT, testRunner.getProcessContext()); Thread.sleep(PUBLISH_WAIT_MS); @@ -260,7 +261,7 @@ public abstract class TestConsumeMqttCommon { ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); consumeMQTT.onScheduled(testRunner.getProcessContext()); - reconnect(consumeMQTT); + reconnect(consumeMQTT, testRunner.getProcessContext()); Thread.sleep(PUBLISH_WAIT_MS); @@ -308,7 +309,7 @@ public abstract class TestConsumeMqttCommon { ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); consumeMQTT.onScheduled(testRunner.getProcessContext()); - reconnect(consumeMQTT); + reconnect(consumeMQTT, testRunner.getProcessContext()); Thread.sleep(PUBLISH_WAIT_MS); @@ -354,7 +355,7 @@ public abstract class TestConsumeMqttCommon { ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); consumeMQTT.onScheduled(testRunner.getProcessContext()); - reconnect(consumeMQTT); + reconnect(consumeMQTT, testRunner.getProcessContext()); Thread.sleep(PUBLISH_WAIT_MS); @@ -396,10 +397,10 @@ public abstract class TestConsumeMqttCommon { } - public static void reconnect(ConsumeMQTT processor) throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException { - Method method = ConsumeMQTT.class.getDeclaredMethod("reconnect"); + public static void reconnect(ConsumeMQTT processor, ProcessContext context) throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException { + Method method = ConsumeMQTT.class.getDeclaredMethod("initializeClient", ProcessContext.class); method.setAccessible(true); - method.invoke(processor); + method.invoke(processor, context); } public static BlockingQueue<MQTTQueueMessage> getMqttQueue(ConsumeMQTT consumeMQTT) throws IllegalAccessException, NoSuchFieldException { http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java index 759bf96..d7ed0e0 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java @@ -107,7 +107,7 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon { ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); consumeMQTT.onScheduled(testRunner.getProcessContext()); - reconnect(consumeMQTT); + reconnect(consumeMQTT, testRunner.getProcessContext()); Thread.sleep(PUBLISH_WAIT_MS); http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java index ccb0eb7..65319d8 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java @@ -122,7 +122,7 @@ public class TestConsumeMqttSSL extends TestConsumeMqttCommon { ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); consumeMQTT.onScheduled(testRunner.getProcessContext()); - reconnect(consumeMQTT); + reconnect(consumeMQTT, testRunner.getProcessContext()); Thread.sleep(PUBLISH_WAIT_MS); http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java index a97ac98..dc09ce1 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java @@ -128,7 +128,7 @@ public class TestPublishAndSubscribeMqttIntegration { ConsumeMQTT consumeMQTT = (ConsumeMQTT) testSubscribeRunner.getProcessor(); consumeMQTT.onScheduled(testSubscribeRunner.getProcessContext()); - reconnect(consumeMQTT); + reconnect(consumeMQTT, testSubscribeRunner.getProcessContext()); } private void subscribeVerify(){
