[
https://issues.apache.org/jira/browse/NIFI-606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14574799#comment-14574799
]
ASF GitHub Bot commented on NIFI-606:
-------------------------------------
Github user apiri commented on a diff in the pull request:
https://github.com/apache/incubator-nifi/pull/61#discussion_r31829018
--- Diff:
nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java
---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.storm;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * <p>
+ * The <code>NiFiSpout</code> provides a way to pull data from Apache NiFi
so
+ * that it can be processed by Apache Storm. The NiFi Spout connects to a
NiFi
+ * instance provided in the config and requests data from the OutputPort
that
+ * is named. In NiFi, when an OutputPort is added to the root process
group,
+ * it acts as a queue of data for remote clients. This spout is then able
to
+ * pull that data from NiFi reliably.
+ * </p>
+ *
+ * <p>
+ * It is important to note that if pulling data from a NiFi cluster, the
URL
+ * that should be used is that of the NiFi Cluster Manager. The Receiver
will
+ * automatically handle determining the nodes in that cluster and pull from
+ * those nodes as appropriate.
+ * </p>
+ *
+ * <p>
+ * In order to use the NiFiSpout, you will need to first build a
+ * {@link SiteToSiteClientConfig} to provide to the constructor. This can
be
+ * achieved by using the {@link SiteToSiteClient.Builder}. Below is an
example
+ * snippet of driver code to pull data from NiFi that is running on
+ * localhost:8080. This example assumes that NiFi exposes an OutputPort
on the
+ * root group named "Data for Storm". Additionally, it assumes that the
data
+ * that it will receive from this OutputPort is text data, as it will map
the
+ * byte array received from NiFi to a UTF-8 Encoded string.
+ * </p>
+ *
+ * <code>
+ * <pre>
+ * {@code
+ *
+ * // Build a Site-To-Site client config
+ * SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+ * .url("http://localhost:8080/nifi")
+ * .portName("Data for Storm")
+ * .buildConfig();
+ *
+ * // Build a topology starting with a NiFiSpout
+ * TopologyBuilder builder = new TopologyBuilder();
+ * builder.setSpout("nifi", new NiFiSpout(clientConfig));
+ *
+ * // Add a bolt that prints the attributes and content
+ * builder.setBolt("print", new BaseBasicBolt() {
+ * @Override
+ * public void execute(Tuple tuple, BasicOutputCollector collector) {
+ * NiFiDataPacket dp = (NiFiDataPacket)
tuple.getValueByField("nifiDataPacket");
+ * System.out.println("Attributes: " + dp.getAttributes());
+ * System.out.println("Content: " + new String(dp.getContent()));
+ * }
+ *
+ * @Override
+ * public void declareOutputFields(OutputFieldsDeclarer declarer) {}
+ *
+ * }).shuffleGrouping("nifi");
+ *
+ * // Submit the topology running in local mode
+ * Config conf = new Config();
+ * LocalCluster cluster = new LocalCluster();
+ * cluster.submitTopology("test", conf, builder.createTopology());
+ *
+ * Utils.sleep(90000);
+ * cluster.shutdown();
+ * }
+ * </pre>
+ * </code>
+ */
+public class NiFiSpout extends BaseRichSpout {
+
+ private static final long serialVersionUID = 3067274587595578836L;
+
+ public static final Logger LOGGER =
LoggerFactory.getLogger(NiFiSpout.class);
+
+ private NiFiSpoutReceiver spoutReceiver;
+ private LinkedBlockingQueue<NiFiDataPacket> queue;
+ private SpoutOutputCollector spoutOutputCollector;
+
+ private final SiteToSiteClientConfig clientConfig;
+
+ public NiFiSpout(SiteToSiteClientConfig clientConfig) {
+ this.clientConfig = clientConfig;
+ }
+
+ @Override
+ public void open(Map map, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
+ this.spoutOutputCollector = spoutOutputCollector;
+ this.queue = new LinkedBlockingQueue<>(1000);
+
+ this.spoutReceiver = new NiFiSpoutReceiver();
+ this.spoutReceiver.setDaemon(true);
+ this.spoutReceiver.setName("NiFi Spout Receiver");
+ this.spoutReceiver.start();
+ }
+
+ @Override
+ public void nextTuple() {
+ NiFiDataPacket data = queue.poll();
+ if (data == null) {
+ Utils.sleep(50);
+ } else {
+ spoutOutputCollector.emit(new Values(data));
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer
outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("nifiDataPacket"));
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ spoutReceiver.shutdown();
+ }
+
+ class NiFiSpoutReceiver extends Thread {
+
+ private boolean shutdown = false;
+
+ public synchronized void shutdown() {
+ this.shutdown = true;
+ }
+
+ @Override
+ public void run() {
+ try {
+ final SiteToSiteClient client = new
SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+ try {
+ while (!shutdown) {
+ final Transaction transaction =
client.createTransaction(TransferDirection.RECEIVE);
+ DataPacket dataPacket = transaction.receive();
+ if (dataPacket == null) {
+ transaction.confirm();
+ transaction.complete();
+
+ // no data available. Wait a bit and try again
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ }
+
+ continue;
+ }
+
+ final List<NiFiDataPacket> dataPackets = new
ArrayList<>();
+ do {
+ // Read the data into a byte array and wrap it
along with the attributes
+ // into a NiFiDataPacket.
+ final InputStream inStream =
dataPacket.getData();
+ final byte[] data = new byte[(int)
dataPacket.getSize()];
+ StreamUtils.fillBuffer(inStream, data);
+
+ final Map<String, String> attributes =
dataPacket.getAttributes();
+ final NiFiDataPacket NiFiDataPacket = new
NiFiDataPacket() {
--- End diff --
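Minor: For the sake of consistency, the local variable `NiFiDataPacket`
should follow lower camel casing (e.g. `nifiDataPacket`); as written it
also shadows the name of the NiFiDataPacket type.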
> Add a NiFi Storm Spout
> ----------------------
>
> Key: NIFI-606
> URL: https://issues.apache.org/jira/browse/NIFI-606
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Bryan Bende
> Assignee: Bryan Bende
> Priority: Minor
> Fix For: 0.2.0
>
>
> A nice integration would be a Storm spout that could pull data from NiFi,
> much like the Spark receiver does.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)