Repository: incubator-nifi Updated Branches: refs/heads/nifi-site-to-site-client e16fc7972 -> 2f60ddc03
NIFI-282: Added send(byte[], Map<String, String>) method to avoid having to create a DataPacket object Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/2f60ddc0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/2f60ddc0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/2f60ddc0 Branch: refs/heads/nifi-site-to-site-client Commit: 2f60ddc03a3e867ea3b0826621aa63439a10bee7 Parents: e16fc79 Author: Mark Payne <[email protected]> Authored: Mon Feb 16 15:18:57 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Mon Feb 16 15:18:57 2015 -0500 ---------------------------------------------------------------------- .../java/org/apache/nifi/remote/Transaction.java | 11 +++++++++++ .../nifi/remote/client/socket/SocketClient.java | 6 ++++++ .../apache/nifi/remote/protocol/DataPacket.java | 18 +++++++++++++++++- .../protocol/socket/SocketClientTransaction.java | 8 ++++++++ 4 files changed, 42 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f60ddc0/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java index 51bf244..eb7312d 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java @@ -17,6 +17,7 @@ package org.apache.nifi.remote; import java.io.IOException; +import java.util.Map; import org.apache.nifi.remote.protocol.DataPacket; @@ -81,6 +82,16 @@ public interface Transaction { void send(DataPacket dataPacket) throws IOException; /** + * Sends the given byte array as the content of a {@link DataPacket} along with the + * provided attributes + * + * @param content + * @param attributes + * @throws IOException + */ + void send(byte[] content, Map<String, String> attributes) throws IOException; + + /** * Retrieves information from the remote NiFi instance, if any is available. If no data is available, will return * {@code null}. It is important to consume all data from the remote NiFi instance before attempting to * call {@link #confirm()}. This is because the sender is always responsible for determining when the Transaction http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f60ddc0/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java index c11c2ab..bd9319f 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -17,6 +17,7 @@ package org.apache.nifi.remote.client.socket; import java.io.IOException; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.nifi.remote.Communicant; @@ -186,6 +187,11 @@ public class SocketClient implements SiteToSiteClient { } @Override + public void send(final byte[] content, final Map<String, String> attributes) throws IOException { + transaction.send(content, attributes); + } + + @Override public DataPacket receive() throws IOException { return transaction.receive(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f60ddc0/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java index f4fa4d0..3f0ec4f 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java @@ -19,11 +19,27 @@ package org.apache.nifi.remote.protocol; import java.io.InputStream; import java.util.Map; + +/** + * Represents a piece of data that is to be sent to or that was received from a NiFi instance. + */ public interface DataPacket { + /** + * The key-value attributes that are to be associated with the data + * @return + */ Map<String, String> getAttributes(); + /** + * An InputStream from which the content can be read + * @return + */ InputStream getData(); - + + /** + * The length of the InputStream. + * @return + */ long getSize(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f60ddc0/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java index b2fffed..2fbcfc4 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java @@ -16,11 +16,13 @@ */ package org.apache.nifi.remote.protocol.socket; +import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Map; import java.util.zip.CRC32; import java.util.zip.CheckedInputStream; import java.util.zip.CheckedOutputStream; @@ -36,6 +38,7 @@ import org.apache.nifi.remote.io.CompressionInputStream; import org.apache.nifi.remote.io.CompressionOutputStream; import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.RequestType; +import org.apache.nifi.remote.util.StandardDataPacket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -172,6 +175,11 @@ public class SocketClientTransaction implements Transaction { @Override + public void send(final byte[] content, final Map<String, String> attributes) throws IOException { + send(new StandardDataPacket(attributes, new ByteArrayInputStream(content), content.length)); + } + + @Override public void send(final DataPacket dataPacket) throws IOException { try { try {
