Repository: nifi Updated Branches: refs/heads/0.x 9997a6a3c -> 92d648ab4
Added mqtt Fixed attribute name in PutMQTT Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/dfd86ef1 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/dfd86ef1 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/dfd86ef1 Branch: refs/heads/0.x Commit: dfd86ef1f7ea2e225eace1bf55a33488c9392cf0 Parents: 9997a6a Author: [email protected] <[email protected]> Authored: Tue Apr 26 11:35:54 2016 -0400 Committer: Oleg Zhurakousky <[email protected]> Committed: Wed May 25 09:11:51 2016 -0400 ---------------------------------------------------------------------- nifi-assembly/pom.xml | 10 + .../nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml | 35 ++++ .../nifi-mqtt-processors/pom.xml | 86 +++++++++ .../apache/nifi/processors/mqtt/GetMQTT.java | 191 ++++++++++++++++++ .../nifi/processors/mqtt/MQTTQueueMessage.java | 29 +++ .../apache/nifi/processors/mqtt/PutMQTT.java | 193 +++++++++++++++++++ .../org.apache.nifi.processor.Processor | 16 ++ .../nifi/processors/mqtt/TestGetMQTT.java | 41 ++++ .../nifi/processors/mqtt/TestPutMQTT.java | 41 ++++ nifi-nar-bundles/nifi-mqtt-bundle/pom.xml | 37 ++++ nifi-nar-bundles/pom.xml | 5 +- pom.xml | 6 + 12 files changed, 689 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index a95220d..485a2b7 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -322,6 +322,16 @@ language governing permissions and limitations under the License. 
-->
             <artifactId>nifi-site-to-site-reporting-nar</artifactId>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mqtt-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
     <properties>

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml
new file mode 100644
index 0000000..158e80f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml
@@ -0,0 +1,35 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements. See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-mqtt-bundle</artifactId>
+        <version>0.7.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-mqtt-nar</artifactId>
+    <packaging>nar</packaging>
+    <description>NiFi NAR for interacting with MQTT brokers</description>
+    <properties>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mqtt-processors</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
new file mode 100644
index 0000000..9f4e956
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
@@ -0,0 +1,81 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements. See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-mqtt-bundle</artifactId>
+        <version>0.7.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>nifi-mqtt-processors</artifactId>
+    <packaging>jar</packaging>
+    <repositories>
+        <repository>
+            <id>Eclipse Paho Repo</id>
+            <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
+        </repository>
+    </repositories>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>1.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.0.2</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.websocket</groupId>
+            <artifactId>javax.websocket-api</artifactId>
+            <version>1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.tyrus.bundles</groupId>
+            <artifactId>tyrus-standalone-client-jdk</artifactId>
+            <version>1.12</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.tyrus</groupId>
+            <artifactId>tyrus-container-grizzly-client</artifactId>
+            <version>1.12</version>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java
new file mode 100644
index 0000000..1391317
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.*;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.io.OutputStream;
+import java.io.IOException;
+
+/**
+ * Subscribes to one MQTT topic filter and emits every received message as a FlowFile.
+ *
+ * Messages arrive on the Paho callback thread via {@link #messageArrived} and are
+ * buffered in {@link #mqttQueue}; {@link #onTrigger} drains that buffer, writes each
+ * payload as FlowFile content and routes it to {@link #RELATIONSHIP_MQTTMESSAGE}.
+ */
+@Tags({"GetMQTT"})
+@CapabilityDescription("Gets messages from an MQTT broker")
+@SeeAlso({})
+@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
+@WritesAttributes({@WritesAttribute(attribute="broker", description="MQTT broker that was the message source"),
+    @WritesAttribute(attribute="topic", description="MQTT topic on which message was received")})
+public class GetMQTT extends AbstractProcessor implements MqttCallback {
+
+    // Set in onScheduled() and read from onTrigger() and the Paho callback thread,
+    // hence volatile.
+    volatile String topic;
+    volatile String broker;
+    volatile String clientID;
+
+    MemoryPersistence persistence = new MemoryPersistence();
+    volatile MqttClient mqttClient;
+
+    // True between onScheduled() and onUnscheduled(); stops an in-flight onTrigger()
+    // from re-opening a connection that onUnscheduled() has just closed.
+    private volatile boolean scheduled;
+
+    // Hand-off buffer between the Paho callback thread and onTrigger().
+    LinkedBlockingQueue<MQTTQueueMessage> mqttQueue = new LinkedBlockingQueue<>();
+
+    public static final PropertyDescriptor PROPERTY_BROKER_ADDRESS = new PropertyDescriptor
+            .Builder().name("Broker address")
+            .description("MQTT broker address (e.g. tcp://localhost:1883)")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROPERTY_MQTT_TOPIC = new PropertyDescriptor
+            .Builder().name("MQTT topic")
+            .description("MQTT topic to subscribe to")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROPERTY_MQTT_CLIENTID = new PropertyDescriptor
+            .Builder().name("MQTT client ID")
+            .description("MQTT client ID to use")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final Relationship RELATIONSHIP_MQTTMESSAGE = new Relationship.Builder()
+            .name("MQTTMessage")
+            .description("MQTT message output")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    /** Paho callback: records the loss; onTrigger() performs the reconnect. */
+    @Override
+    public void connectionLost(Throwable t) {
+        getLogger().warn("Connection to " + broker + " lost", t);
+    }
+
+    /** Paho callback: unused, this processor never publishes. */
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+    }
+
+    /** Paho callback: buffers the message together with the topic it arrived on. */
+    @Override
+    public void messageArrived(String topic, MqttMessage message) throws Exception {
+        mqttQueue.add(new MQTTQueueMessage(topic, message.getPayload()));
+    }
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
+        descriptors.add(PROPERTY_BROKER_ADDRESS);
+        descriptors.add(PROPERTY_MQTT_TOPIC);
+        descriptors.add(PROPERTY_MQTT_CLIENTID);
+
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<Relationship>();
+        relationships.add(RELATIONSHIP_MQTTMESSAGE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    /**
+     * Reads the configured properties, creates the client, connects and subscribes.
+     *
+     * @throws ProcessException if the client cannot be created, connected or subscribed;
+     *         the failure is propagated instead of leaving an unconnected processor running
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        broker = context.getProperty(PROPERTY_BROKER_ADDRESS).getValue();
+        topic = context.getProperty(PROPERTY_MQTT_TOPIC).getValue();
+        clientID = context.getProperty(PROPERTY_MQTT_CLIENTID).getValue();
+        try {
+            mqttClient = new MqttClient(broker, clientID, persistence);
+            mqttClient.setCallback(this);
+            scheduled = true;
+            connectAndSubscribe();
+        } catch (MqttException me) {
+            scheduled = false;
+            throw new ProcessException("Failed to connect to MQTT broker " + broker, me);
+        }
+    }
+
+    /**
+     * Connects with a clean session and subscribes to the configured topic at QoS 0.
+     * Does nothing when the processor is not scheduled or the client is already connected.
+     * Synchronized so concurrent onTrigger() calls cannot race each other or onUnscheduled().
+     */
+    private synchronized void connectAndSubscribe() throws MqttException {
+        if (!scheduled || mqttClient.isConnected()) {
+            return;
+        }
+        final MqttConnectOptions connOpts = new MqttConnectOptions();
+        connOpts.setCleanSession(true);
+        getLogger().info("Connecting to broker: " + broker);
+        mqttClient.connect(connOpts);
+        // A clean session carries no subscriptions, so subscribe after every connect.
+        mqttClient.subscribe(topic, 0);
+    }
+
+    /** Disconnects and closes the client, if one was created. */
+    @OnUnscheduled
+    public synchronized void onUnscheduled(final ProcessContext context) {
+        scheduled = false;
+        if (mqttClient == null) {
+            // onScheduled() failed before the client was created.
+            return;
+        }
+        try {
+            if (mqttClient.isConnected()) {
+                mqttClient.disconnect();
+            }
+            mqttClient.close();
+            getLogger().info("Disconnected");
+        } catch (MqttException me) {
+            getLogger().warn("Failed to cleanly disconnect from " + broker, me);
+        }
+    }
+
+    /**
+     * Reconnects if the connection was lost, then turns every buffered message into a
+     * FlowFile carrying the "broker" and "topic" attributes. The session is committed
+     * by AbstractProcessor after this method returns.
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        if (!mqttClient.isConnected()) {
+            try {
+                connectAndSubscribe();
+            } catch (MqttException me) {
+                getLogger().error("Failed to reconnect to broker " + broker, me);
+                context.yield();
+            }
+        }
+
+        final List<MQTTQueueMessage> messageList = new LinkedList<>();
+        mqttQueue.drainTo(messageList);
+
+        for (final MQTTQueueMessage m : messageList) {
+            FlowFile messageFlowfile = session.create();
+
+            messageFlowfile = session.putAttribute(messageFlowfile, "broker", broker);
+            // Use the topic the message arrived on; the subscription may be a wildcard filter.
+            messageFlowfile = session.putAttribute(messageFlowfile, "topic", m.topic);
+            messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
+
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    out.write(m.message);
+                }
+            });
+            session.transfer(messageFlowfile, RELATIONSHIP_MQTTMESSAGE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java
new file mode 100644
index 0000000..0874b10
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+/**
+ * Simple holder pairing a message payload with the topic it belongs to.
+ */
+public class MQTTQueueMessage {
+
+    /** Topic associated with the message. */
+    public String topic;
+
+    /** Raw message payload; the array is stored as given, not copied. */
+    public byte[] message;
+
+    public MQTTQueueMessage(String sourceTopic, byte[] payload) {
+        topic = sourceTopic;
+        message = payload;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java
new file mode 100644
index 0000000..29aeb10
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.*;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.commons.io.IOUtils;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.io.InputStream;
+import java.io.IOException;
+
+/**
+ * Publishes the content of each incoming FlowFile to the MQTT topic named by the
+ * FlowFile's "topic" attribute, at QoS 0 and without the retained flag.
+ *
+ * The processor defines no relationships: a FlowFile is removed once it has been
+ * published (or when it has no topic or no content), and is rolled back onto the
+ * incoming queue when publishing fails so that it can be retried.
+ */
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"PutMQTT"})
+@CapabilityDescription("Publishes message to an MQTT topic")
+@SeeAlso({})
+@ReadsAttributes({@ReadsAttribute(attribute="topic", description="Topic to publish message to")})
+@WritesAttributes({@WritesAttribute(attribute="", description="")})
+public class PutMQTT extends AbstractProcessor implements MqttCallback {
+
+    // Set in onScheduled() and read from onTrigger() and the Paho callback thread,
+    // hence volatile.
+    volatile String broker;
+    volatile String clientID;
+
+    MemoryPersistence persistence = new MemoryPersistence();
+    volatile MqttClient mqttClient;
+
+    // True between onScheduled() and onUnscheduled(); stops an in-flight onTrigger()
+    // from re-opening a connection that onUnscheduled() has just closed.
+    private volatile boolean scheduled;
+
+    public static final PropertyDescriptor PROPERTY_BROKER_ADDRESS = new PropertyDescriptor
+            .Builder().name("Broker address")
+            .description("MQTT broker address (e.g. tcp://localhost:1883)")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROPERTY_MQTT_CLIENTID = new PropertyDescriptor
+            .Builder().name("MQTT client ID")
+            .description("MQTT client ID to use")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    /** Paho callback: records the loss; onTrigger() reconnects before the next publish. */
+    @Override
+    public void connectionLost(Throwable t) {
+        getLogger().warn("Connection to " + broker + " lost", t);
+    }
+
+    /** Paho callback: unused. */
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+    }
+
+    /** Paho callback: unused, this processor never subscribes. */
+    @Override
+    public void messageArrived(String topic, MqttMessage message) throws Exception {
+    }
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
+        descriptors.add(PROPERTY_BROKER_ADDRESS);
+        descriptors.add(PROPERTY_MQTT_CLIENTID);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<Relationship>();
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    /**
+     * Reads the configured properties, creates the client and connects to the broker.
+     *
+     * @throws ProcessException if the client cannot be created or connected; the failure
+     *         is propagated instead of leaving an unconnected processor running
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        broker = context.getProperty(PROPERTY_BROKER_ADDRESS).getValue();
+        clientID = context.getProperty(PROPERTY_MQTT_CLIENTID).getValue();
+        try {
+            mqttClient = new MqttClient(broker, clientID, persistence);
+            mqttClient.setCallback(this);
+            scheduled = true;
+            connect();
+        } catch (MqttException me) {
+            scheduled = false;
+            throw new ProcessException("Failed to connect to MQTT broker " + broker, me);
+        }
+    }
+
+    /**
+     * Connects with a clean session. Does nothing when the processor is not scheduled
+     * or the client is already connected. Synchronized so concurrent onTrigger() calls
+     * cannot race each other or onUnscheduled().
+     */
+    private synchronized void connect() throws MqttException {
+        if (!scheduled || mqttClient.isConnected()) {
+            return;
+        }
+        final MqttConnectOptions connOpts = new MqttConnectOptions();
+        connOpts.setCleanSession(true);
+        getLogger().info("Connecting to broker: " + broker);
+        mqttClient.connect(connOpts);
+    }
+
+    /** Disconnects and closes the client, if one was created. */
+    @OnUnscheduled
+    public synchronized void onUnscheduled(final ProcessContext context) {
+        scheduled = false;
+        if (mqttClient == null) {
+            // onScheduled() failed before the client was created.
+            return;
+        }
+        try {
+            if (mqttClient.isConnected()) {
+                mqttClient.disconnect();
+            }
+            mqttClient.close();
+            getLogger().info("Disconnected");
+        } catch (MqttException me) {
+            getLogger().warn("Failed to cleanly disconnect from " + broker, me);
+        }
+    }
+
+    /**
+     * Publishes one FlowFile's content to the topic named by its "topic" attribute.
+     * A FlowFile without that attribute is logged and removed; one with empty content
+     * is removed without publishing. If publishing fails the session is rolled back
+     * (penalizing the FlowFile) and the processor yields.
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowfile = session.get();
+        if (flowfile == null) {
+            // Nothing queued for this invocation.
+            return;
+        }
+
+        // get the MQTT topic
+        final String topic = flowfile.getAttribute("topic");
+        if (topic == null) {
+            getLogger().error("No topic attribute on flowfile");
+            session.remove(flowfile);
+            return;
+        }
+
+        // Read the content as raw bytes: MQTT payloads are binary, and a round trip
+        // through a platform-charset String can corrupt them. An IOException raised
+        // here propagates, so the framework rolls the session back.
+        final AtomicReference<byte[]> payload = new AtomicReference<>();
+        session.read(flowfile, new InputStreamCallback() {
+            @Override
+            public void process(InputStream in) throws IOException {
+                payload.set(IOUtils.toByteArray(in));
+            }
+        });
+
+        final byte[] output = payload.get();
+        if ((output == null) || output.length == 0) {
+            // Nothing to publish.
+            session.remove(flowfile);
+            return;
+        }
+
+        try {
+            connect();
+            mqttClient.publish(topic, output, 0, false);
+        } catch (MqttException me) {
+            getLogger().error("Failed to publish to topic " + topic + " on " + broker, me);
+            // Keep the FlowFile on the incoming queue so it is retried later.
+            session.rollback(true);
+            context.yield();
+            return;
+        }
+
+        // Remove only after the broker call succeeded.
+        session.remove(flowfile);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff
--git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..b5a30e9 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+org.apache.nifi.processors.mqtt.GetMQTT +org.apache.nifi.processors.mqtt.PutMQTT http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java new file mode 100644 index 0000000..cd22735 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.processors.mqtt.GetMQTT;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Checks GetMQTT's property validation. No broker is contacted: the processor is
+ * never run, only validated.
+ */
+public class TestGetMQTT {
+
+    private TestRunner testRunner;
+
+    @Before
+    public void init() {
+        testRunner = TestRunners.newTestRunner(GetMQTT.class);
+    }
+
+    @Test
+    public void testProcessor() {
+        // All three properties are declared required, so an unconfigured processor is invalid.
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(GetMQTT.PROPERTY_BROKER_ADDRESS, "tcp://localhost:1883");
+        testRunner.setProperty(GetMQTT.PROPERTY_MQTT_TOPIC, "test/topic");
+        testRunner.setProperty(GetMQTT.PROPERTY_MQTT_CLIENTID, "nifi-test-client");
+        testRunner.assertValid();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java
new file mode 100644
index 0000000..64a0548
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.processors.mqtt.PutMQTT;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Checks PutMQTT's property validation. No broker is contacted: the processor is
+ * never run, only validated.
+ */
+public class TestPutMQTT {
+
+    private TestRunner testRunner;
+
+    @Before
+    public void init() {
+        testRunner = TestRunners.newTestRunner(PutMQTT.class);
+    }
+
+    @Test
+    public void testProcessor() {
+        // Both properties are declared required, so an unconfigured processor is invalid.
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(PutMQTT.PROPERTY_BROKER_ADDRESS, "tcp://localhost:1883");
+        testRunner.setProperty(PutMQTT.PROPERTY_MQTT_CLIENTID, "nifi-test-client");
+        testRunner.assertValid();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml
new file mode 100644
index 0000000..456fae3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml
@@ -0,0 +1,37 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements. See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>0.7.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-mqtt-bundle</artifactId>
+    <packaging>pom</packaging>
+    <modules>
+        <module>nifi-mqtt-processors</module>
+        <module>nifi-mqtt-nar</module>
+    </modules>
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-mqtt-processors</artifactId>
+                <version>0.7.0-SNAPSHOT</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 1982d73..7f3ca2c 100644
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -61,7 +61,8 @@
         <module>nifi-spring-bundle</module>
         <module>nifi-hive-bundle</module>
         <module>nifi-site-to-site-reporting-bundle</module>
+        <module>nifi-mqtt-bundle</module>
     </modules>
     <dependencyManagement>
         <dependencies>
             <dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 998bcb5..e24ccf6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1061,6 +1061,12 @@ language governing permissions and limitations under the License. -->
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-mqtt-nar</artifactId>
+                <version>0.7.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId>
                 <version>0.7.0-SNAPSHOT</version>
                 <type>nar</type>
