Repository: incubator-nifi
Updated Branches:
  refs/heads/nifi-site-to-site-client e16fc7972 -> 2f60ddc03


NIFI-282: Added send(byte[], Map<String, String>) method to avoid having to 
create a DataPacket object


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/2f60ddc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/2f60ddc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/2f60ddc0

Branch: refs/heads/nifi-site-to-site-client
Commit: 2f60ddc03a3e867ea3b0826621aa63439a10bee7
Parents: e16fc79
Author: Mark Payne <[email protected]>
Authored: Mon Feb 16 15:18:57 2015 -0500
Committer: Mark Payne <[email protected]>
Committed: Mon Feb 16 15:18:57 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/nifi/remote/Transaction.java  | 11 +++++++++++
 .../nifi/remote/client/socket/SocketClient.java   |  6 ++++++
 .../apache/nifi/remote/protocol/DataPacket.java   | 18 +++++++++++++++++-
 .../protocol/socket/SocketClientTransaction.java  |  8 ++++++++
 4 files changed, 42 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f60ddc0/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
index 51bf244..eb7312d 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.remote;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.nifi.remote.protocol.DataPacket;
 
@@ -81,6 +82,16 @@ public interface Transaction {
     void send(DataPacket dataPacket) throws IOException;
     
     /**
+     * Sends the given byte array as the content of a {@link DataPacket} along 
with the
+     * provided attributes
+     * 
+     * @param content
+     * @param attributes
+     * @throws IOException
+     */
+    void send(byte[] content, Map<String, String> attributes) throws 
IOException;
+    
+    /**
      * Retrieves information from the remote NiFi instance, if any is 
available. If no data is available, will return
      * {@code null}. It is important to consume all data from the remote NiFi 
instance before attempting to 
      * call {@link #confirm()}. This is because the sender is always 
responsible for determining when the Transaction

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f60ddc0/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index c11c2ab..bd9319f 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.remote.client.socket;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.remote.Communicant;
@@ -186,6 +187,11 @@ public class SocketClient implements SiteToSiteClient {
                        }
 
                        @Override
+                       public void send(final byte[] content, final 
Map<String, String> attributes) throws IOException {
+                           transaction.send(content, attributes);
+                       }
+                       
+                       @Override
                        public DataPacket receive() throws IOException {
                                return transaction.receive();
                        }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f60ddc0/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
index f4fa4d0..3f0ec4f 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
@@ -19,11 +19,27 @@ package org.apache.nifi.remote.protocol;
 import java.io.InputStream;
 import java.util.Map;
 
+
+/**
+ * Represents a piece of data that is to be sent to or that was received from 
a NiFi instance.
+ */
 public interface DataPacket {
 
+    /**
+     * The key-value attributes that are to be associated with the data
+     * @return
+     */
        Map<String, String> getAttributes();
        
+       /**
+        * An InputStream from which the content can be read
+        * @return
+        */
        InputStream getData();
-       
+
+       /**
+        * The length of the InputStream.
+        * @return
+        */
        long getSize();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f60ddc0/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index b2fffed..2fbcfc4 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -16,11 +16,13 @@
  */
 package org.apache.nifi.remote.protocol.socket;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
 import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.CheckedOutputStream;
@@ -36,6 +38,7 @@ import org.apache.nifi.remote.io.CompressionInputStream;
 import org.apache.nifi.remote.io.CompressionOutputStream;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.remote.util.StandardDataPacket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -172,6 +175,11 @@ public class SocketClientTransaction implements 
Transaction {
        
        
        @Override
+       public void send(final byte[] content, final Map<String, String> 
attributes) throws IOException {
+           send(new StandardDataPacket(attributes, new 
ByteArrayInputStream(content), content.length));
+       }
+       
+       @Override
        public void send(final DataPacket dataPacket) throws IOException {
            try {
                try {

Reply via email to