Repository: incubator-nifi Updated Branches: refs/heads/develop ba7855646 -> 960560723
NIFI-606 Add a NiFi Storm Spout Fixing JavaDoc to pass check-styles Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/96056072 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/96056072 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/96056072 Branch: refs/heads/develop Commit: 960560723dc4ed8968c1a4d072a5b4902c371298 Parents: ba78556 Author: bbende <[email protected]> Authored: Sat May 30 10:00:54 2015 -0400 Committer: Aldrin Piri <[email protected]> Committed: Fri Jun 5 13:39:36 2015 -0400 ---------------------------------------------------------------------- nifi/nifi-external/nifi-storm-spout/pom.xml | 38 +++ .../org/apache/nifi/storm/NiFiDataPacket.java | 39 ++++ .../java/org/apache/nifi/storm/NiFiSpout.java | 232 +++++++++++++++++++ nifi/nifi-external/pom.xml | 1 + 4 files changed, 310 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/96056072/nifi/nifi-external/nifi-storm-spout/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-external/nifi-storm-spout/pom.xml b/nifi/nifi-external/nifi-storm-spout/pom.xml new file mode 100644 index 0000000..c55c698 --- /dev/null +++ b/nifi/nifi-external/nifi-storm-spout/pom.xml @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-external</artifactId> + <version>0.2.0-incubating-SNAPSHOT</version> + </parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-storm-spout</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>0.9.4</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-site-to-site-client</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/96056072/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiDataPacket.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiDataPacket.java b/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiDataPacket.java new file mode 100644 index 0000000..4c399e0 --- /dev/null +++ b/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiDataPacket.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.storm; + +import java.util.Map; + +/** + * <p> + * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both + * a FlowFile's content and its attributes so that they can be processed by + * Storm + * </p> + */ +public interface NiFiDataPacket { + + /** + * @return the contents of a NiFi FlowFile + */ + byte[] getContent(); + + /** + * @return a Map of attributes that are associated with the NiFi FlowFile + */ + Map<String, String> getAttributes(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/96056072/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java b/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java new file mode 100644 index 0000000..6fb9754 --- /dev/null +++ b/nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.storm; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.stream.io.StreamUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * <p> + * The <code>NiFiSpout</code> provides a way to pull data from Apache NiFi so + * that it can be processed by Apache Storm. The NiFi Spout connects to a NiFi + * instance provided in the config and requests data from the OutputPort that + * is named. In NiFi, when an OutputPort is added to the root process group, + * it acts as a queue of data for remote clients. 
This spout is then able to + * pull that data from NiFi reliably. + * </p> + * + * <p> + * It is important to note that if pulling data from a NiFi cluster, the URL + * that should be used is that of the NiFi Cluster Manager. The Receiver will + * automatically handle determining the nodes in that cluster and pull from + * those nodes as appropriate. + * </p> + * + * <p> + * In order to use the NiFiSpout, you will need to first build a + * {@link SiteToSiteClientConfig} to provide to the constructor. This can be + * achieved by using the {@link SiteToSiteClient.Builder}. Below is an example + * snippet of driver code to pull data from NiFi that is running on + * localhost:8080. This example assumes that NiFi exposes an OutputPort on the + * root group named "Data for Storm". Additionally, it assumes that the data + * that it will receive from this OutputPort is text data, as it will map the + * byte array received from NiFi to a UTF-8 Encoded string. + * </p> + * + * <code> + * <pre> + * {@code + * + * // Build a Site-To-Site client config + * SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() + * .url("http://localhost:8080/nifi") + * .portName("Data for Storm") + * .buildConfig(); + * + * // Build a topology starting with a NiFiSpout + * TopologyBuilder builder = new TopologyBuilder(); + * builder.setSpout("nifi", new NiFiSpout(clientConfig)); + * + * // Add a bolt that prints the attributes and content + * builder.setBolt("print", new BaseBasicBolt() { + * @Override + * public void execute(Tuple tuple, BasicOutputCollector collector) { + * NiFiDataPacket dp = (NiFiDataPacket) tuple.getValueByField("nifiDataPacket"); + * System.out.println("Attributes: " + dp.getAttributes()); + * System.out.println("Content: " + new String(dp.getContent())); + * } + * + * @Override + * public void declareOutputFields(OutputFieldsDeclarer declarer) {} + * + * }).shuffleGrouping("nifi"); + * + * // Submit the topology running in local mode + * Config conf = new
Config(); + * LocalCluster cluster = new LocalCluster(); + * cluster.submitTopology("test", conf, builder.createTopology()); + * + * Utils.sleep(90000); + * cluster.shutdown(); + * } + * </pre> + * </code> + */ +public class NiFiSpout extends BaseRichSpout { + + private static final long serialVersionUID = 3067274587595578836L; + + public static final Logger LOGGER = LoggerFactory.getLogger(NiFiSpout.class); + + private NiFiSpoutReceiver spoutReceiver; + private LinkedBlockingQueue<NiFiDataPacket> queue; + private SpoutOutputCollector spoutOutputCollector; + + private final SiteToSiteClientConfig clientConfig; + + public NiFiSpout(SiteToSiteClientConfig clientConfig) { + this.clientConfig = clientConfig; + } + + @Override + public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { + this.spoutOutputCollector = spoutOutputCollector; + this.queue = new LinkedBlockingQueue<>(1000); + + this.spoutReceiver = new NiFiSpoutReceiver(); + this.spoutReceiver.setDaemon(true); + this.spoutReceiver.setName("NiFi Spout Receiver"); + this.spoutReceiver.start(); + } + + @Override + public void nextTuple() { + NiFiDataPacket data = queue.poll(); + if (data == null) { + Utils.sleep(50); + } else { + spoutOutputCollector.emit(new Values(data)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("nifiDataPacket")); + } + + @Override + public void close() { + super.close(); + spoutReceiver.shutdown(); + } + + class NiFiSpoutReceiver extends Thread { + + private boolean shutdown = false; + + public synchronized void shutdown() { + this.shutdown = true; + } + + @Override + public void run() { + try { + final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); + try { + while (!shutdown) { + final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); + DataPacket dataPacket = 
transaction.receive(); + if (dataPacket == null) { + transaction.confirm(); + transaction.complete(); + + // no data available. Wait a bit and try again + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + } + + continue; + } + + final List<NiFiDataPacket> dataPackets = new ArrayList<>(); + do { + // Read the data into a byte array and wrap it along with the attributes + // into a NiFiDataPacket. + final InputStream inStream = dataPacket.getData(); + final byte[] data = new byte[(int) dataPacket.getSize()]; + StreamUtils.fillBuffer(inStream, data); + + final Map<String, String> attributes = dataPacket.getAttributes(); + final NiFiDataPacket niFiDataPacket = new NiFiDataPacket() { + @Override + public byte[] getContent() { + return data; + } + + @Override + public Map<String, String> getAttributes() { + return attributes; + } + }; + + dataPackets.add(niFiDataPacket); + dataPacket = transaction.receive(); + } while (dataPacket != null); + + // Confirm transaction to verify the data + transaction.confirm(); + + for (NiFiDataPacket dp : dataPackets) { + queue.offer(dp); + } + + transaction.complete(); + } + } finally { + try { + client.close(); + } catch (final IOException ioe) { + LOGGER.error("Failed to close client", ioe); + } + } + } catch (final IOException ioe) { + LOGGER.error("Failed to receive data from NiFi", ioe); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/96056072/nifi/nifi-external/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-external/pom.xml b/nifi/nifi-external/pom.xml index 9397176..0c70c4a 100644 --- a/nifi/nifi-external/pom.xml +++ b/nifi/nifi-external/pom.xml @@ -25,5 +25,6 @@ <packaging>pom</packaging> <modules> <module>nifi-spark-receiver</module> + <module>nifi-storm-spout</module> </modules> </project>
