Repository: incubator-nifi Updated Branches: refs/heads/develop dca93a507 -> d68f71b12
NIFI-271 Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d68f71b1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d68f71b1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d68f71b1 Branch: refs/heads/develop Commit: d68f71b126c58112070d81f14e804497f9649e5b Parents: dca93a5 Author: joewitt <[email protected]> Authored: Fri Apr 24 16:44:04 2015 -0400 Committer: joewitt <[email protected]> Committed: Fri Apr 24 16:44:04 2015 -0400 ---------------------------------------------------------------------- nifi/nifi-external/nifi-spark-receiver/pom.xml | 22 +- .../org/apache/nifi/spark/NiFiDataPacket.java | 23 +- .../org/apache/nifi/spark/NiFiReceiver.java | 236 ++++++++++--------- 3 files changed, 145 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d68f71b1/nifi/nifi-external/nifi-spark-receiver/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-external/nifi-spark-receiver/pom.xml b/nifi/nifi-external/nifi-spark-receiver/pom.xml index 5c93f6b..a6d9378 100644 --- a/nifi/nifi-external/nifi-spark-receiver/pom.xml +++ b/nifi/nifi-external/nifi-spark-receiver/pom.xml @@ -23,15 +23,15 @@ <groupId>org.apache.nifi</groupId> <artifactId>nifi-spark-receiver</artifactId> - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_2.10</artifactId> - <version>1.2.0</version> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-site-to-site-client</artifactId> - </dependency> - </dependencies> + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_2.10</artifactId> + <version>1.2.0</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-site-to-site-client</artifactId> + </dependency> + </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d68f71b1/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java index 2f08dc5..484c2a9 100644 --- a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java +++ b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java @@ -20,21 +20,20 @@ import java.util.Map; /** * <p> - * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both a FlowFile's - * content and its attributes so that they can be processed by Spark + * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both + * a FlowFile's content and its attributes so that they can be processed by + * Spark * </p> */ public interface NiFiDataPacket { - /** - * Returns the contents of a NiFi FlowFile - * @return - */ - byte[] getContent(); + /** + * @return the contents of a NiFi FlowFile + */ + byte[] getContent(); - /** - * Returns a Map of attributes that are associated with the NiFi FlowFile - * @return - */ - Map<String, String> getAttributes(); + /** + * @return a Map of attributes that are associated with the NiFi FlowFile + */ + Map<String, String> getAttributes(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d68f71b1/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java index 9f31062..8cbf60c 100644 --- a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java +++ b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java @@ -31,64 +31,67 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.receiver.Receiver; - /** * <p> - * The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to pull data - * from Apache NiFi so that it can be processed by Spark Streaming. The NiFi Receiver connects - * to NiFi instance provided in the config and requests data from - * the OutputPort that is named. In NiFi, when an OutputPort is added to the root process group, - * it acts as a queue of data for remote clients. This receiver is then able to pull that data - * from NiFi reliably. + * The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to + * pull data from Apache NiFi so that it can be processed by Spark Streaming. + * The NiFi Receiver connects to NiFi instance provided in the config and + * requests data from the OutputPort that is named. In NiFi, when an OutputPort + * is added to the root process group, it acts as a queue of data for remote + * clients. This receiver is then able to pull that data from NiFi reliably. * </p> - * + * * <p> - * It is important to note that if pulling data from a NiFi cluster, the URL that should be used - * is that of the NiFi Cluster Manager. The Receiver will automatically handle determining the nodes - * in that cluster and pull from those nodes as appropriate. + * It is important to note that if pulling data from a NiFi cluster, the URL + * that should be used is that of the NiFi Cluster Manager. The Receiver will + * automatically handle determining the nodes in that cluster and pull from + * those nodes as appropriate. * </p> - * + * * <p> - * In order to use the NiFiReceiver, you will need to first build a {@link SiteToSiteClientConfig} to provide - * to the constructor. This can be achieved by using the {@link SiteToSiteClient.Builder}. - * Below is an example snippet of driver code to pull data from NiFi that is running on localhost:8080. This - * example assumes that NiFi exposes and OutputPort on the root group named "Data For Spark". - * Additionally, it assumes that the data that it will receive from this OutputPort is text - * data, as it will map the byte array received from NiFi to a UTF-8 Encoded string. + * In order to use the NiFiReceiver, you will need to first build a + * {@link SiteToSiteClientConfig} to provide to the constructor. This can be + * achieved by using the {@link SiteToSiteClient.Builder}. Below is an example + * snippet of driver code to pull data from NiFi that is running on + * localhost:8080. This example assumes that NiFi exposes and OutputPort on the + * root group named "Data For Spark". Additionally, it assumes that the data + * that it will receive from this OutputPort is text data, as it will map the + * byte array received from NiFi to a UTF-8 Encoded string. * </p> - * + * * <code> * <pre> + * {@code * Pattern SPACE = Pattern.compile(" "); - * + * * // Build a Site-to-site client config * SiteToSiteClientConfig config = new SiteToSiteClient.Builder() * .setUrl("http://localhost:8080/nifi") * .setPortName("Data For Spark") * .buildConfig(); - * + * * SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example"); * JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L)); - * - * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from + * + * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from * // specified Port - * JavaReceiverInputDStream<NiFiDataPacket> packetStream = + * JavaReceiverInputDStream<NiFiDataPacket> packetStream = * ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY())); - * + * * // Map the data from NiFi to text, ignoring the attributes * JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() { * public String call(final NiFiDataPacket dataPacket) throws Exception { * return new String(dataPacket.getContent(), StandardCharsets.UTF_8); * } * }); - * + * * // Split the words by spaces * JavaDStream<String> words = text.flatMap(new FlatMapFunction<String, String>() { * public Iterable<String> call(final String text) throws Exception { * return Arrays.asList(SPACE.split(text)); * } * }); - * + * * // Map each word to the number 1, then aggregate by key * JavaPairDStream<String, Integer> wordCounts = words.mapToPair( * new PairFunction<String, String, Integer>() { @@ -101,98 +104,105 @@ import org.apache.spark.streaming.receiver.Receiver; * } * } * ); - * + * * // print the results * wordCounts.print(); * ssc.start(); * ssc.awaitTermination(); + * } * </pre> * </code> */ public class NiFiReceiver extends Receiver<NiFiDataPacket> { - private static final long serialVersionUID = 3067274587595578836L; - private final SiteToSiteClientConfig clientConfig; - - public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final StorageLevel storageLevel) { - super(storageLevel); - this.clientConfig = clientConfig; - } - - @Override - public void onStart() { - final Thread thread = new Thread(new ReceiveRunnable()); - thread.setDaemon(true); - thread.setName("NiFi Receiver"); - thread.start(); - } - - @Override - public void onStop() { - } - - class ReceiveRunnable implements Runnable { - public ReceiveRunnable() { - } - - public void run() { - try { - final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); - try { - while ( !isStopped() ) { - final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); - DataPacket dataPacket = transaction.receive(); - if ( dataPacket == null ) { - transaction.confirm(); - transaction.complete(); - - // no data available. Wait a bit and try again - try { - Thread.sleep(1000L); - } catch (InterruptedException e) {} - - continue; - } - - final List<NiFiDataPacket> dataPackets = new ArrayList<NiFiDataPacket>(); - do { - // Read the data into a byte array and wrap it along with the attributes - // into a NiFiDataPacket. - final InputStream inStream = dataPacket.getData(); - final byte[] data = new byte[(int) dataPacket.getSize()]; - StreamUtils.fillBuffer(inStream, data); - - final Map<String, String> attributes = dataPacket.getAttributes(); - final NiFiDataPacket NiFiDataPacket = new NiFiDataPacket() { - public byte[] getContent() { - return data; - } - - public Map<String, String> getAttributes() { - return attributes; - } - }; - - dataPackets.add(NiFiDataPacket); - dataPacket = transaction.receive(); - } while ( dataPacket != null ); - - // Confirm transaction to verify the data - transaction.confirm(); - - store(dataPackets.iterator()); - - transaction.complete(); - } - } finally { - try { - client.close(); - } catch (final IOException ioe) { - reportError("Failed to close client", ioe); - } - } - } catch (final IOException ioe) { - restart("Failed to receive data from NiFi", ioe); - } - } - } + + private static final long serialVersionUID = 3067274587595578836L; + private final SiteToSiteClientConfig clientConfig; + + public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final StorageLevel storageLevel) { + super(storageLevel); + this.clientConfig = clientConfig; + } + + @Override + public void onStart() { + final Thread thread = new Thread(new ReceiveRunnable()); + thread.setDaemon(true); + thread.setName("NiFi Receiver"); + thread.start(); + } + + @Override + public void onStop() { + } + + class ReceiveRunnable implements Runnable { + + public ReceiveRunnable() { + } + + @Override + public void run() { + try { + final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); + try { + while (!isStopped()) { + final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); + DataPacket dataPacket = transaction.receive(); + if (dataPacket == null) { + transaction.confirm(); + transaction.complete(); + + // no data available. Wait a bit and try again + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + } + + continue; + } + + final List<NiFiDataPacket> dataPackets = new ArrayList<>(); + do { + // Read the data into a byte array and wrap it along with the attributes + // into a NiFiDataPacket. + final InputStream inStream = dataPacket.getData(); + final byte[] data = new byte[(int) dataPacket.getSize()]; + StreamUtils.fillBuffer(inStream, data); + + final Map<String, String> attributes = dataPacket.getAttributes(); + final NiFiDataPacket NiFiDataPacket = new NiFiDataPacket() { + @Override + public byte[] getContent() { + return data; + } + + @Override + public Map<String, String> getAttributes() { + return attributes; + } + }; + + dataPackets.add(NiFiDataPacket); + dataPacket = transaction.receive(); + } while (dataPacket != null); + + // Confirm transaction to verify the data + transaction.confirm(); + + store(dataPackets.iterator()); + + transaction.complete(); + } + } finally { + try { + client.close(); + } catch (final IOException ioe) { + reportError("Failed to close client", ioe); + } + } + } catch (final IOException ioe) { + restart("Failed to receive data from NiFi", ioe); + } + } + } }
