Repository: nifi
Updated Branches:
  refs/heads/0.x 9997a6a3c -> 92d648ab4


Added mqtt

Fixed attribute name in PutMQTT


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/dfd86ef1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/dfd86ef1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/dfd86ef1

Branch: refs/heads/0.x
Commit: dfd86ef1f7ea2e225eace1bf55a33488c9392cf0
Parents: 9997a6a
Author: [email protected] <[email protected]>
Authored: Tue Apr 26 11:35:54 2016 -0400
Committer: Oleg Zhurakousky <[email protected]>
Committed: Wed May 25 09:11:51 2016 -0400

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |  10 +
 .../nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml      |  35 ++++
 .../nifi-mqtt-processors/pom.xml                |  86 +++++++++
 .../apache/nifi/processors/mqtt/GetMQTT.java    | 191 ++++++++++++++++++
 .../nifi/processors/mqtt/MQTTQueueMessage.java  |  29 +++
 .../apache/nifi/processors/mqtt/PutMQTT.java    | 193 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |  16 ++
 .../nifi/processors/mqtt/TestGetMQTT.java       |  41 ++++
 .../nifi/processors/mqtt/TestPutMQTT.java       |  41 ++++
 nifi-nar-bundles/nifi-mqtt-bundle/pom.xml       |  37 ++++
 nifi-nar-bundles/pom.xml                        |   5 +-
 pom.xml                                         |   6 +
 12 files changed, 689 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index a95220d..485a2b7 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -322,6 +322,16 @@ language governing permissions and limitations under the 
License. -->
             <artifactId>nifi-site-to-site-reporting-nar</artifactId>
             <type>nar</type>
         </dependency>
+           <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-site-to-site-reporting-nar</artifactId>
+            <type>nar</type>
+       </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mqtt-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
 
     <properties>

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml
new file mode 100644
index 0000000..158e80f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml
@@ -0,0 +1,35 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-mqtt-bundle</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-mqtt-nar</artifactId>
+    <packaging>nar</packaging>
+    <description>NiFi NAR for interacting with MQTT brokers</description>
+    <properties>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mqtt-processors</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
new file mode 100644
index 0000000..9f4e956
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
@@ -0,0 +1,86 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-mqtt-bundle</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>nifi-mqtt-processors</artifactId>
+    <packaging>jar</packaging>
+    <repositories>
+        <repository>
+            <id>Eclipse Paho Repo</id>
+            
<url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
+        </repository>
+    </repositories>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>1.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.bytedeco</groupId>
+            <artifactId>javacv</artifactId>
+            <version>1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.0.2</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.websocket</groupId>
+            <artifactId>javax.websocket-api</artifactId>
+            <version>1.1</version>
+        </dependency>       
+        <dependency>
+            <groupId>org.glassfish.tyrus.bundles</groupId>
+            <artifactId>tyrus-standalone-client-jdk</artifactId>
+            <version>1.12</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.tyrus</groupId>
+            <artifactId>tyrus-container-grizzly-client</artifactId>
+            <version>1.12</version>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java
new file mode 100644
index 0000000..1391317
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.*;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.io.OutputStream;
+import java.io.IOException;
+
+@Tags({"GetMQTT"})
+@CapabilityDescription("Gets messages from an MQTT broker")
+@SeeAlso({})
+@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
+@WritesAttributes({@WritesAttribute(attribute="broker", description="MQTT 
broker that was the message source"),
+    @WritesAttribute(attribute="topic", description="MQTT topic on which 
message was received")})
+public class GetMQTT extends AbstractProcessor implements MqttCallback {
+
+    String topic;
+    String broker;
+    String clientID;
+    double lastTime;
+    boolean firstTime = true;
+    
+    MemoryPersistence persistence = new MemoryPersistence();
+    MqttClient mqttClient;
+    
+    LinkedBlockingQueue<MQTTQueueMessage> mqttQueue = new 
LinkedBlockingQueue<>();
+
+    public static final PropertyDescriptor PROPERTY_BROKER_ADDRESS = new 
PropertyDescriptor
+            .Builder().name("Broker address")
+            .description("MQTT broker address (e.g. tcp://localhost:1883)")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROPERTY_MQTT_TOPIC = new 
PropertyDescriptor
+            .Builder().name("MQTT topic")
+            .description("MQTT topic to subscribe to")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROPERTY_MQTT_CLIENTID = new 
PropertyDescriptor
+            .Builder().name("MQTT client ID")
+            .description("MQTT client ID to use")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final Relationship RELATIONSHIP_MQTTMESSAGE = new 
Relationship.Builder()
+            .name("MQTTMessage")
+            .description("MQTT message output")
+            .build();
+
+     private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+    
+    @Override
+    public void connectionLost(Throwable t) {
+       getLogger().info("Connection to " + broker + " lost");
+    }
+
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+    }
+
+    @Override
+    public void messageArrived(String topic, MqttMessage message) throws 
Exception {
+         mqttQueue.add(new MQTTQueueMessage(topic, message.getPayload()));
+    }
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new 
ArrayList<PropertyDescriptor>();
+        descriptors.add(PROPERTY_BROKER_ADDRESS);
+        descriptors.add(PROPERTY_MQTT_TOPIC);
+        descriptors.add(PROPERTY_MQTT_CLIENTID);
+      
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<Relationship>();
+        relationships.add(RELATIONSHIP_MQTTMESSAGE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        try {
+            broker = context.getProperty(PROPERTY_BROKER_ADDRESS).getValue();
+            topic = context.getProperty(PROPERTY_MQTT_TOPIC).getValue();
+            clientID = context.getProperty(PROPERTY_MQTT_CLIENTID).getValue();
+            mqttClient = new MqttClient(broker, clientID, persistence);
+            MqttConnectOptions connOpts = new MqttConnectOptions();
+            mqttClient.setCallback(this);
+            connOpts.setCleanSession(true);
+            getLogger().info("Connecting to broker: " + broker);
+            mqttClient.connect(connOpts);
+            mqttClient.subscribe(topic, 0);
+        } catch(MqttException me) {
+            getLogger().error("msg "+me.getMessage());
+        }
+    }
+ 
+    @OnUnscheduled
+    public void onUnscheduled(final ProcessContext context) {
+        try {
+            mqttClient.disconnect();
+        } catch(MqttException me) {
+            
+        }
+        getLogger().error("Disconnected");
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final List messageList = new LinkedList();
+        
+        mqttQueue.drainTo(messageList);
+        if (messageList.isEmpty())
+            return;
+        
+        Iterator iterator = messageList.iterator();
+        while (iterator.hasNext()) {
+            FlowFile messageFlowfile = session.create();
+            final MQTTQueueMessage m = (MQTTQueueMessage)iterator.next();
+    
+            messageFlowfile = session.putAttribute(messageFlowfile, "broker", 
broker);
+            messageFlowfile = session.putAttribute(messageFlowfile, "topic", 
topic);
+            messageFlowfile = session.write(messageFlowfile, new 
OutputStreamCallback() {
+
+                @Override
+                public void process(final OutputStream out) throws IOException 
{
+                     out.write(m.message);
+                }
+            });
+            session.transfer(messageFlowfile, RELATIONSHIP_MQTTMESSAGE);
+            session.commit();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java
new file mode 100644
index 0000000..0874b10
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+public class MQTTQueueMessage
+{
+    public String topic;
+    public byte[] message;
+    
+    public MQTTQueueMessage(String topic, byte[] message) {
+        this.topic = topic;
+        this.message = message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java
new file mode 100644
index 0000000..29aeb10
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.*;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.commons.io.IOUtils;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.io.InputStream;
+import java.io.IOException;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"PutMQTT"})
+@CapabilityDescription("Publishes message to an MQTT topic")
+@SeeAlso({})
+@ReadsAttributes({@ReadsAttribute(attribute="topic", description="Topic to 
publish message to")})
+@WritesAttributes({@WritesAttribute(attribute="", description="")})
+public class PutMQTT extends AbstractProcessor implements MqttCallback {
+
+    String broker;
+    String clientID;
+    
+    MemoryPersistence persistence = new MemoryPersistence();
+    MqttClient mqttClient;
+    
+    public static final PropertyDescriptor PROPERTY_BROKER_ADDRESS = new 
PropertyDescriptor
+            .Builder().name("Broker address")
+            .description("MQTT broker address (e.g. tcp://localhost:1883)")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    
+    public static final PropertyDescriptor PROPERTY_MQTT_CLIENTID = new 
PropertyDescriptor
+            .Builder().name("MQTT client ID")
+            .description("MQTT client ID to use")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+    
+    @Override
+    public void connectionLost(Throwable t) {
+       getLogger().info("Connection to " + broker + " lost");
+    }
+
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+    }
+
+    @Override
+    public void messageArrived(String topic, MqttMessage message) throws 
Exception {
+    }
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new 
ArrayList<PropertyDescriptor>();
+        descriptors.add(PROPERTY_BROKER_ADDRESS);
+        descriptors.add(PROPERTY_MQTT_CLIENTID);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<Relationship>();
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        try {
+            broker = context.getProperty(PROPERTY_BROKER_ADDRESS).getValue();
+            clientID = context.getProperty(PROPERTY_MQTT_CLIENTID).getValue();
+            mqttClient = new MqttClient(broker, clientID, persistence);
+            MqttConnectOptions connOpts = new MqttConnectOptions();
+            mqttClient.setCallback(this);
+            connOpts.setCleanSession(true);
+            getLogger().info("Connecting to broker: " + broker);
+            mqttClient.connect(connOpts);
+        } catch(MqttException me) {
+            getLogger().error("msg "+me.getMessage());
+        }
+    }
+ 
+    @OnUnscheduled
+    public void onUnscheduled(final ProcessContext context) {
+        try {
+            mqttClient.disconnect();
+        } catch(MqttException me) {
+            
+        }
+        getLogger().error("Disconnected");
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final AtomicReference<String> message = new AtomicReference<>();
+
+        FlowFile flowfile = session.get();
+        message.set("");
+
+        // get the MQTT topic
+        
+        String topic = flowfile.getAttribute("topic");
+
+        if (topic == null) {
+            getLogger().error("No topic attribute on flowfile");
+            session.remove(flowfile);
+            return;
+        }
+        
+        // do the read
+        
+        session.read(flowfile, new InputStreamCallback() {
+            @Override
+            public void process(InputStream in) throws IOException {
+                try{
+                    message.set(IOUtils.toString(in));
+                }catch(Exception e){
+                    getLogger().error("Failed to read flowfile " + 
e.getMessage());
+                }
+            }
+        });
+        try {
+            session.remove(flowfile);
+        } catch (Exception e) {
+             getLogger().error("Failed to remove flowfile " + e.getMessage());
+             return;
+        }       
+       
+        String output = message.get();
+        
+        if ((output == null) || output.isEmpty()) {
+            return;
+        }
+        
+        try {
+            mqttClient.publish(topic, output.getBytes(), 0, false);
+        } catch(MqttException me) {
+            getLogger().error("msg "+me.getMessage());          
+        }       
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..b5a30e9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.mqtt.GetMQTT
+org.apache.nifi.processors.mqtt.PutMQTT

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java
new file mode 100644
index 0000000..cd22735
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.processors.mqtt.GetMQTT;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestGetMQTT {
+
+    private TestRunner testRunner;
+
+    @Before
+    public void init() {
+        testRunner = TestRunners.newTestRunner(GetMQTT.class);
+    }
+
+    @Test
+    public void testProcessor() {
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java
new file mode 100644
index 0000000..64a0548
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.processors.mqtt.PutMQTT;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestPutMQTT {
+
+    private TestRunner testRunner;
+
+    @Before
+    public void init() {
+        testRunner = TestRunners.newTestRunner(PutMQTT.class);
+    }
+
+    @Test
+    public void testProcessor() {
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml 
b/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml
new file mode 100644
index 0000000..456fae3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml
@@ -0,0 +1,37 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-mqtt-bundle</artifactId>
+    <packaging>pom</packaging>
+    <modules>
+        <module>nifi-mqtt-processors</module>
+        <module>nifi-mqtt-nar</module>
+    </modules>
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-mqtt-processors</artifactId>
+                <version>1.0.0-SNAPSHOT</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement> 
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 1982d73..7f3ca2c 100644
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -61,7 +61,10 @@
         <module>nifi-spring-bundle</module>
         <module>nifi-hive-bundle</module>
         <module>nifi-site-to-site-reporting-bundle</module>
-  </modules>
+           <module>nifi-site-to-site-reporting-bundle</module>
+        <module>nifi-mqtt-bundle</module>
+    </modules>
+
     <dependencyManagement>
         <dependencies>
             <dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/dfd86ef1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 998bcb5..e24ccf6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1061,6 +1061,12 @@ language governing permissions and limitations under the 
License. -->
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-mqtt-nar</artifactId>
+                <version>1.0.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId>
                 <version>0.7.0-SNAPSHOT</version>
                 <type>nar</type>

Reply via email to