Repository: incubator-nifi
Updated Branches:
  refs/heads/develop dca93a507 -> d68f71b12


NIFI-271


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d68f71b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d68f71b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d68f71b1

Branch: refs/heads/develop
Commit: d68f71b126c58112070d81f14e804497f9649e5b
Parents: dca93a5
Author: joewitt <[email protected]>
Authored: Fri Apr 24 16:44:04 2015 -0400
Committer: joewitt <[email protected]>
Committed: Fri Apr 24 16:44:04 2015 -0400

----------------------------------------------------------------------
 nifi/nifi-external/nifi-spark-receiver/pom.xml  |  22 +-
 .../org/apache/nifi/spark/NiFiDataPacket.java   |  23 +-
 .../org/apache/nifi/spark/NiFiReceiver.java     | 236 ++++++++++---------
 3 files changed, 145 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d68f71b1/nifi/nifi-external/nifi-spark-receiver/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-external/nifi-spark-receiver/pom.xml b/nifi/nifi-external/nifi-spark-receiver/pom.xml
index 5c93f6b..a6d9378 100644
--- a/nifi/nifi-external/nifi-spark-receiver/pom.xml
+++ b/nifi/nifi-external/nifi-spark-receiver/pom.xml
@@ -23,15 +23,15 @@
     <groupId>org.apache.nifi</groupId>
     <artifactId>nifi-spark-receiver</artifactId>
 
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.spark</groupId>
-                       <artifactId>spark-streaming_2.10</artifactId>
-                       <version>1.2.0</version>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.nifi</groupId>
-                       <artifactId>nifi-site-to-site-client</artifactId>
-               </dependency>
-       </dependencies>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_2.10</artifactId>
+            <version>1.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-site-to-site-client</artifactId>
+        </dependency>
+    </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d68f71b1/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
index 2f08dc5..484c2a9 100644
--- a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
+++ b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
@@ -20,21 +20,20 @@ import java.util.Map;
 
 /**
  * <p>
- * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps 
both a FlowFile's
- * content and its attributes so that they can be processed by Spark
+ * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps 
both
+ * a FlowFile's content and its attributes so that they can be processed by
+ * Spark
  * </p>
  */
 public interface NiFiDataPacket {
 
-       /**
-        * Returns the contents of a NiFi FlowFile
-        * @return
-        */
-       byte[] getContent();
+    /**
+     * @return the contents of a NiFi FlowFile
+     */
+    byte[] getContent();
 
-       /**
-        * Returns a Map of attributes that are associated with the NiFi 
FlowFile
-        * @return
-        */
-       Map<String, String> getAttributes();
+    /**
+     * @return a Map of attributes that are associated with the NiFi FlowFile
+     */
+    Map<String, String> getAttributes();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d68f71b1/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
index 9f31062..8cbf60c 100644
--- a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
+++ b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
@@ -31,64 +31,67 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.receiver.Receiver;
 
-
 /**
  * <p>
- * The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to 
pull data 
- * from Apache NiFi so that it can be processed by Spark Streaming. The NiFi 
Receiver connects 
- * to NiFi instance provided in the config and requests data from
- * the OutputPort that is named. In NiFi, when an OutputPort is added to the 
root process group,
- * it acts as a queue of data for remote clients. This receiver is then able 
to pull that data
- * from NiFi reliably.
+ * The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to
+ * pull data from Apache NiFi so that it can be processed by Spark Streaming.
+ * The NiFi Receiver connects to NiFi instance provided in the config and
+ * requests data from the OutputPort that is named. In NiFi, when an OutputPort
+ * is added to the root process group, it acts as a queue of data for remote
+ * clients. This receiver is then able to pull that data from NiFi reliably.
  * </p>
- * 
+ *
  * <p>
- * It is important to note that if pulling data from a NiFi cluster, the URL 
that should be used
- * is that of the NiFi Cluster Manager. The Receiver will automatically handle 
determining the nodes
- * in that cluster and pull from those nodes as appropriate.
+ * It is important to note that if pulling data from a NiFi cluster, the URL
+ * that should be used is that of the NiFi Cluster Manager. The Receiver will
+ * automatically handle determining the nodes in that cluster and pull from
+ * those nodes as appropriate.
  * </p>
- * 
+ *
  * <p>
- * In order to use the NiFiReceiver, you will need to first build a {@link 
SiteToSiteClientConfig} to provide 
- * to the constructor. This can be achieved by using the {@link 
SiteToSiteClient.Builder}.
- * Below is an example snippet of driver code to pull data from NiFi that is 
running on localhost:8080. This
- * example assumes that NiFi exposes and OutputPort on the root group named 
"Data For Spark".
- * Additionally, it assumes that the data that it will receive from this 
OutputPort is text
- * data, as it will map the byte array received from NiFi to a UTF-8 Encoded 
string.
+ * In order to use the NiFiReceiver, you will need to first build a
+ * {@link SiteToSiteClientConfig} to provide to the constructor. This can be
+ * achieved by using the {@link SiteToSiteClient.Builder}. Below is an example
+ * snippet of driver code to pull data from NiFi that is running on
+ * localhost:8080. This example assumes that NiFi exposes and OutputPort on the
+ * root group named "Data For Spark". Additionally, it assumes that the data
+ * that it will receive from this OutputPort is text data, as it will map the
+ * byte array received from NiFi to a UTF-8 Encoded string.
  * </p>
- * 
+ *
  * <code>
  * <pre>
+ * {@code
  * Pattern SPACE = Pattern.compile(" ");
- * 
+ *
  * // Build a Site-to-site client config
  * SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
  *   .setUrl("http://localhost:8080/nifi")
  *   .setPortName("Data For Spark")
  *   .buildConfig();
- * 
+ *
  * SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming 
example");
  * JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new 
Duration(1000L));
- * 
- * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can 
pull data from 
+ *
+ * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can 
pull data from
  * // specified Port
- * JavaReceiverInputDStream<NiFiDataPacket> packetStream = 
+ * JavaReceiverInputDStream<NiFiDataPacket> packetStream =
  *     ssc.receiverStream(new NiFiReceiver(clientConfig, 
StorageLevel.MEMORY_ONLY()));
- * 
+ *
  * // Map the data from NiFi to text, ignoring the attributes
  * JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, 
String>() {
  *   public String call(final NiFiDataPacket dataPacket) throws Exception {
  *     return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
  *   }
  * });
- * 
+ *
  * // Split the words by spaces
  * JavaDStream<String> words = text.flatMap(new FlatMapFunction<String, 
String>() {
  *   public Iterable<String> call(final String text) throws Exception {
  *     return Arrays.asList(SPACE.split(text));
  *   }
  * });
- *         
+ *
  * // Map each word to the number 1, then aggregate by key
  * JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
  *   new PairFunction<String, String, Integer>() {
@@ -101,98 +104,105 @@ import org.apache.spark.streaming.receiver.Receiver;
  *     }
  *    }
  *  );
- * 
+ *
  * // print the results
  * wordCounts.print();
  * ssc.start();
  * ssc.awaitTermination();
+ * }
  * </pre>
  * </code>
  */
 public class NiFiReceiver extends Receiver<NiFiDataPacket> {
-       private static final long serialVersionUID = 3067274587595578836L;
-       private final SiteToSiteClientConfig clientConfig;
-       
-       public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final 
StorageLevel storageLevel) {
-               super(storageLevel);
-               this.clientConfig = clientConfig;
-       }
-       
-       @Override
-       public void onStart() {
-               final Thread thread = new Thread(new ReceiveRunnable());
-               thread.setDaemon(true);
-               thread.setName("NiFi Receiver");
-               thread.start();
-       }
-
-       @Override
-       public void onStop() {
-       }
-
-       class ReceiveRunnable implements Runnable {
-               public ReceiveRunnable() {
-               }
-               
-               public void run() {
-                       try {
-                               final SiteToSiteClient client = new 
SiteToSiteClient.Builder().fromConfig(clientConfig).build();
-                               try {
-                                       while ( !isStopped() ) {
-                                               final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
-                                               DataPacket dataPacket = 
transaction.receive();
-                                               if ( dataPacket == null ) {
-                                                       transaction.confirm();
-                                                       transaction.complete();
-                                                       
-                                                       // no data available. 
Wait a bit and try again
-                                                       try {
-                                                               
Thread.sleep(1000L);
-                                                       } catch 
(InterruptedException e) {}
-                                                       
-                                                       continue;
-                                               }
-       
-                                               final List<NiFiDataPacket> 
dataPackets = new ArrayList<NiFiDataPacket>();
-                                               do {
-                                                       // Read the data into a 
byte array and wrap it along with the attributes
-                                                       // into a 
NiFiDataPacket.
-                                                       final InputStream 
inStream = dataPacket.getData();
-                                                       final byte[] data = new 
byte[(int) dataPacket.getSize()];
-                                                       
StreamUtils.fillBuffer(inStream, data);
-                                                       
-                                                       final Map<String, 
String> attributes = dataPacket.getAttributes();
-                                                       final NiFiDataPacket 
NiFiDataPacket = new NiFiDataPacket() {
-                                                               public byte[] 
getContent() {
-                                                                       return 
data;
-                                                               }
-
-                                                               public 
Map<String, String> getAttributes() {
-                                                                       return 
attributes;
-                                                               }
-                                                       };
-                                                       
-                                                       
dataPackets.add(NiFiDataPacket);
-                                                       dataPacket = 
transaction.receive();
-                                               } while ( dataPacket != null );
-
-                                               // Confirm transaction to 
verify the data
-                                               transaction.confirm();
-                                               
-                                               store(dataPackets.iterator());
-                                               
-                                               transaction.complete();
-                                       }
-                               } finally {
-                                       try {
-                                               client.close();
-                                       } catch (final IOException ioe) {
-                                               reportError("Failed to close 
client", ioe);
-                                       }
-                               }
-                       } catch (final IOException ioe) {
-                               restart("Failed to receive data from NiFi", 
ioe);
-                       }
-               }
-       }
+
+    private static final long serialVersionUID = 3067274587595578836L;
+    private final SiteToSiteClientConfig clientConfig;
+
+    public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final 
StorageLevel storageLevel) {
+        super(storageLevel);
+        this.clientConfig = clientConfig;
+    }
+
+    @Override
+    public void onStart() {
+        final Thread thread = new Thread(new ReceiveRunnable());
+        thread.setDaemon(true);
+        thread.setName("NiFi Receiver");
+        thread.start();
+    }
+
+    @Override
+    public void onStop() {
+    }
+
+    class ReceiveRunnable implements Runnable {
+
+        public ReceiveRunnable() {
+        }
+
+        @Override
+        public void run() {
+            try {
+                final SiteToSiteClient client = new 
SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+                try {
+                    while (!isStopped()) {
+                        final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
+                        DataPacket dataPacket = transaction.receive();
+                        if (dataPacket == null) {
+                            transaction.confirm();
+                            transaction.complete();
+
+                            // no data available. Wait a bit and try again
+                            try {
+                                Thread.sleep(1000L);
+                            } catch (InterruptedException e) {
+                            }
+
+                            continue;
+                        }
+
+                        final List<NiFiDataPacket> dataPackets = new 
ArrayList<>();
+                        do {
+                            // Read the data into a byte array and wrap it 
along with the attributes
+                            // into a NiFiDataPacket.
+                            final InputStream inStream = dataPacket.getData();
+                            final byte[] data = new byte[(int) 
dataPacket.getSize()];
+                            StreamUtils.fillBuffer(inStream, data);
+
+                            final Map<String, String> attributes = 
dataPacket.getAttributes();
+                            final NiFiDataPacket NiFiDataPacket = new 
NiFiDataPacket() {
+                                @Override
+                                public byte[] getContent() {
+                                    return data;
+                                }
+
+                                @Override
+                                public Map<String, String> getAttributes() {
+                                    return attributes;
+                                }
+                            };
+
+                            dataPackets.add(NiFiDataPacket);
+                            dataPacket = transaction.receive();
+                        } while (dataPacket != null);
+
+                        // Confirm transaction to verify the data
+                        transaction.confirm();
+
+                        store(dataPackets.iterator());
+
+                        transaction.complete();
+                    }
+                } finally {
+                    try {
+                        client.close();
+                    } catch (final IOException ioe) {
+                        reportError("Failed to close client", ioe);
+                    }
+                }
+            } catch (final IOException ioe) {
+                restart("Failed to receive data from NiFi", ioe);
+            }
+        }
+    }
 }

Reply via email to