Github user apiri commented on a diff in the pull request:
https://github.com/apache/incubator-nifi/pull/61#discussion_r31829018
--- Diff:
nifi/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java
---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.storm;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * <p>
+ * The <code>NiFiSpout</code> provides a way to pull data from Apache NiFi
so
+ * that it can be processed by Apache Storm. The NiFi Spout connects to a
NiFi
+ * instance provided in the config and requests data from the OutputPort
that
+ * is named. In NiFi, when an OutputPort is added to the root process
group,
+ * it acts as a queue of data for remote clients. This spout is then able
to
+ * pull that data from NiFi reliably.
+ * </p>
+ *
+ * <p>
+ * It is important to note that if pulling data from a NiFi cluster, the
URL
+ * that should be used is that of the NiFi Cluster Manager. The Receiver
will
+ * automatically handle determining the nodes in that cluster and pull from
+ * those nodes as appropriate.
+ * </p>
+ *
+ * <p>
+ * In order to use the NiFiSpout, you will need to first build a
+ * {@link SiteToSiteClientConfig} to provide to the constructor. This can
be
+ * achieved by using the {@link SiteToSiteClient.Builder}. Below is an
example
+ * snippet of driver code to pull data from NiFi that is running on
+ * localhost:8080. This example assumes that NiFi exposes and OutputPort
on the
+ * root group named "Data For Storm". Additionally, it assumes that the
data
+ * that it will receive from this OutputPort is text data, as it will map
the
+ * byte array received from NiFi to a UTF-8 Encoded string.
+ * </p>
+ *
+ * <code>
+ * <pre>
+ * {@code
+ *
+ * // Build a Site-To-Site client config
+ * SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
+ * .url("http://localhost:8080/nifi")
+ * .portName("Data for Storm")
+ * .buildConfig();
+ *
+ * // Build a topology starting with a NiFiSpout
+ * TopologyBuilder builder = new TopologyBuilder();
+ * builder.setSpout("nifi", new NiFiSpout(clientConfig));
+ *
+ * // Add a bolt that prints the attributes and content
+ * builder.setBolt("print", new BaseBasicBolt() {
+ * @Override
+ * public void execute(Tuple tuple, BasicOutputCollector collector) {
+ * NiFiDataPacket dp = (NiFiDataPacket)
tuple.getValueByField("nifiDataPacket");
+ * System.out.println("Attributes: " + dp.getAttributes());
+ * System.out.println("Content: " + new String(dp.getContent()));
+ * }
+ *
+ * @Override
+ * public void declareOutputFields(OutputFieldsDeclarer declarer) {}
+ *
+ * }).shuffleGrouping("nifi");
+ *
+ * // Submit the topology running in local mode
+ * Config conf = new Config();
+ * LocalCluster cluster = new LocalCluster();
+ * cluster.submitTopology("test", conf, builder.createTopology());
+ *
+ * Utils.sleep(90000);
+ * cluster.shutdown();
+ * }
+ * </pre>
+ * </code>
+ */
+public class NiFiSpout extends BaseRichSpout {
+
+ private static final long serialVersionUID = 3067274587595578836L;
+
+ public static final Logger LOGGER =
LoggerFactory.getLogger(NiFiSpout.class);
+
+ private NiFiSpoutReceiver spoutReceiver;
+ private LinkedBlockingQueue<NiFiDataPacket> queue;
+ private SpoutOutputCollector spoutOutputCollector;
+
+ private final SiteToSiteClientConfig clientConfig;
+
+ public NiFiSpout(SiteToSiteClientConfig clientConfig) {
+ this.clientConfig = clientConfig;
+ }
+
+ @Override
+ public void open(Map map, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
+ this.spoutOutputCollector = spoutOutputCollector;
+ this.queue = new LinkedBlockingQueue<>(1000);
+
+ this.spoutReceiver = new NiFiSpoutReceiver();
+ this.spoutReceiver.setDaemon(true);
+ this.spoutReceiver.setName("NiFi Spout Receiver");
+ this.spoutReceiver.start();
+ }
+
+ @Override
+ public void nextTuple() {
+ NiFiDataPacket data = queue.poll();
+ if (data == null) {
+ Utils.sleep(50);
+ } else {
+ spoutOutputCollector.emit(new Values(data));
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer
outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("nifiDataPacket"));
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ spoutReceiver.shutdown();
+ }
+
+ class NiFiSpoutReceiver extends Thread {
+
+ private boolean shutdown = false;
+
+ public synchronized void shutdown() {
+ this.shutdown = true;
+ }
+
+ @Override
+ public void run() {
+ try {
+ final SiteToSiteClient client = new
SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+ try {
+ while (!shutdown) {
+ final Transaction transaction =
client.createTransaction(TransferDirection.RECEIVE);
+ DataPacket dataPacket = transaction.receive();
+ if (dataPacket == null) {
+ transaction.confirm();
+ transaction.complete();
+
+ // no data available. Wait a bit and try again
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ }
+
+ continue;
+ }
+
+ final List<NiFiDataPacket> dataPackets = new
ArrayList<>();
+ do {
+ // Read the data into a byte array and wrap it
along with the attributes
+ // into a NiFiDataPacket.
+ final InputStream inStream =
dataPacket.getData();
+ final byte[] data = new byte[(int)
dataPacket.getSize()];
+ StreamUtils.fillBuffer(inStream, data);
+
+ final Map<String, String> attributes =
dataPacket.getAttributes();
+ final NiFiDataPacket NiFiDataPacket = new
NiFiDataPacket() {
--- End diff --
Minor: For the sake of consistency, your variable should follow camel
casing.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---