Repository: nifi
Updated Branches:
  refs/heads/master e230e50f9 -> 4e13fef72


NIFI-2783 - Site-to-site command line client - s2s.sh shell script warnings -> 
stderr, usage improvement with examples - Increasing heap settings for s2s cli

This closes #1056.

Signed-off-by: Bryan Bende <bbe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4e13fef7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4e13fef7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4e13fef7

Branch: refs/heads/master
Commit: 4e13fef7243935962848ae83b3e31a5283c71c66
Parents: e230e50
Author: Bryan Rosander <bryanrosan...@gmail.com>
Authored: Fri Sep 23 11:32:05 2016 -0400
Committer: Bryan Bende <bbe...@apache.org>
Committed: Fri Sep 30 09:12:16 2016 -0400

----------------------------------------------------------------------
 nifi-toolkit/nifi-toolkit-assembly/pom.xml      |   4 +
 .../src/main/resources/bin/s2s.bat              |  40 +++
 .../src/main/resources/bin/s2s.sh               | 120 +++++++++
 nifi-toolkit/nifi-toolkit-s2s/pom.xml           |  43 +++
 .../apache/nifi/toolkit/s2s/DataPacketDto.java  | 136 ++++++++++
 .../apache/nifi/toolkit/s2s/DataPacketImpl.java |  97 +++++++
 .../nifi/toolkit/s2s/SiteToSiteCliMain.java     | 260 +++++++++++++++++++
 .../nifi/toolkit/s2s/SiteToSiteReceiver.java    |  73 ++++++
 .../nifi/toolkit/s2s/SiteToSiteSender.java      |  60 +++++
 .../nifi/toolkit/s2s/DataPacketDtoTest.java     |  96 +++++++
 .../nifi/toolkit/s2s/DataPacketImplTest.java    |  85 ++++++
 .../nifi/toolkit/s2s/SiteToSiteCliMainTest.java | 245 +++++++++++++++++
 .../toolkit/s2s/SiteToSiteReceiverTest.java     |  98 +++++++
 .../nifi/toolkit/s2s/SiteToSiteSenderTest.java  | 130 ++++++++++
 nifi-toolkit/pom.xml                            |   1 +
 pom.xml                                         |   5 +
 16 files changed, 1493 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-toolkit/nifi-toolkit-assembly/pom.xml 
b/nifi-toolkit/nifi-toolkit-assembly/pom.xml
index 9df9b0c..8d189e4 100644
--- a/nifi-toolkit/nifi-toolkit-assembly/pom.xml
+++ b/nifi-toolkit/nifi-toolkit-assembly/pom.xml
@@ -69,6 +69,10 @@ language governing permissions and limitations under the License. -->
             <artifactId>nifi-toolkit-encrypt-config</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-toolkit-s2s</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
             <scope>compile</scope>

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.bat
----------------------------------------------------------------------
diff --git a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.bat 
b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.bat
new file mode 100644
index 0000000..6155715
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.bat
@@ -0,0 +1,40 @@
+@echo off
+rem
+rem    Licensed to the Apache Software Foundation (ASF) under one or more
+rem    contributor license agreements.  See the NOTICE file distributed with
+rem    this work for additional information regarding copyright ownership.
+rem    The ASF licenses this file to You under the Apache License, Version 2.0
+rem    (the "License"); you may not use this file except in compliance with
+rem    the License.  You may obtain a copy of the License at
+rem
+rem       http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem    Unless required by applicable law or agreed to in writing, software
+rem    distributed under the License is distributed on an "AS IS" BASIS,
+rem    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem    See the License for the specific language governing permissions and
+rem    limitations under the License.
+rem
+
+rem Use JAVA_HOME if it's set; otherwise, just use java
+
+if "%JAVA_HOME%" == "" goto noJavaHome
+if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome
+set JAVA_EXE=%JAVA_HOME%\bin\java.exe
+goto startConfig
+
+:noJavaHome
+echo The JAVA_HOME environment variable is not defined correctly.
+echo Instead the PATH will be used to find the java executable.
+echo.
+set JAVA_EXE=java
+goto startConfig
+
+:startConfig
+set LIB_DIR=%~dp0..\classpath;%~dp0..\lib
+
+SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms128m -Xmx256m %JAVA_ARGS% org.apache.nifi.toolkit.s2s.SiteToSiteCliMain
+
+cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %*
+
+popd

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.sh
----------------------------------------------------------------------
diff --git a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.sh 
b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.sh
new file mode 100644
index 0000000..e113ca0
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.sh
@@ -0,0 +1,120 @@
+#!/bin/sh
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+#
+
+# Script structure inspired from Apache Karaf and other Apache projects with 
similar startup approaches
+
+SCRIPT_DIR=$(dirname "$0")
+SCRIPT_NAME=$(basename "$0")
+NIFI_TOOLKIT_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd)
+PROGNAME=$(basename "$0")
+
+
+warn() {
+    (>&2 echo "${PROGNAME}: $*")
+}
+
+die() {
+    warn "$*"
+    exit 1
+}
+
+detectOS() {
+    # OS specific support (must be 'true' or 'false').
+    cygwin=false;
+    aix=false;
+    os400=false;
+    darwin=false;
+    case "$(uname)" in
+        CYGWIN*)
+            cygwin=true
+            ;;
+        AIX*)
+            aix=true
+            ;;
+        OS400*)
+            os400=true
+            ;;
+        Darwin)
+            darwin=true
+            ;;
+    esac
+    # For AIX, set an environment variable
+    if ${aix}; then
+         export LDR_CNTRL=MAXDATA=0xB0000000@DSA
+         echo ${LDR_CNTRL}
+    fi
+}
+
+locateJava() {
+    # Setup the Java Virtual Machine
+    if $cygwin ; then
+        [ -n "${JAVA}" ] && JAVA=$(cygpath --unix "${JAVA}")
+        [ -n "${JAVA_HOME}" ] && JAVA_HOME=$(cygpath --unix "${JAVA_HOME}")
+    fi
+
+    if [ "x${JAVA}" = "x" ] && [ -r /etc/gentoo-release ] ; then
+        JAVA_HOME=$(java-config --jre-home)
+    fi
+    if [ "x${JAVA}" = "x" ]; then
+        if [ "x${JAVA_HOME}" != "x" ]; then
+            if [ ! -d "${JAVA_HOME}" ]; then
+                die "JAVA_HOME is not valid: ${JAVA_HOME}"
+            fi
+            JAVA="${JAVA_HOME}/bin/java"
+        else
+            warn "JAVA_HOME not set; results may vary"
+            JAVA=$(type java)
+            JAVA=$(expr "${JAVA}" : '.* \(/.*\)$')
+            if [ "x${JAVA}" = "x" ]; then
+                die "java command not found"
+            fi
+        fi
+    fi
+}
+
+init() {
+    # Determine if there is special OS handling we must perform
+    detectOS
+
+    # Locate the Java VM to execute
+    locateJava "$1"
+}
+
+run() {
+    LIBS="${NIFI_TOOLKIT_HOME}/lib/*"
+
+    sudo_cmd_prefix=""
+    if $cygwin; then
+        NIFI_TOOLKIT_HOME=$(cygpath --path --windows "${NIFI_TOOLKIT_HOME}")
+        CLASSPATH="$NIFI_TOOLKIT_HOME/classpath";$(cygpath --path --windows "${LIBS}")
+    else
+        CLASSPATH="$NIFI_TOOLKIT_HOME/classpath:${LIBS}"
+    fi
+
+   export JAVA_HOME="$JAVA_HOME"
+   export NIFI_TOOLKIT_HOME="$NIFI_TOOLKIT_HOME"
+
+   umask 0077
+   "${JAVA}" -cp "${CLASSPATH}" -Xms128m -Xmx256m org.apache.nifi.toolkit.s2s.SiteToSiteCliMain "$@"
+   return $?
+}
+
+
+init "$1"
+run "$@"

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-toolkit/nifi-toolkit-s2s/pom.xml 
b/nifi-toolkit/nifi-toolkit-s2s/pom.xml
new file mode 100644
index 0000000..fac6bf1
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-s2s/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-toolkit</artifactId>
+        <version>1.1.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-toolkit-s2s</artifactId>
+    <description>Site-to-site cli</description>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-site-to-site-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketDto.java
----------------------------------------------------------------------
diff --git 
a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketDto.java
 
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketDto.java
new file mode 100644
index 0000000..8d7e7b3
--- /dev/null
+++ 
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketDto.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.toolkit.s2s;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.remote.protocol.DataPacket;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * DTO object for serializing and deserializing DataPackets via JSON
+ */
+public class DataPacketDto {
+    public static final TypeReference<DataPacketDto> 
DATA_PACKET_DTO_TYPE_REFERENCE = new TypeReference<DataPacketDto>() {
+    };
+    private Map<String, String> attributes;
+    private byte[] data;
+    private String dataFile;
+
+    public DataPacketDto() {
+        this(null);
+    }
+
+    public DataPacketDto(byte[] data) {
+        this(new HashMap<>(), data);
+    }
+
+    public DataPacketDto(Map<String, String> attributes, byte[] data) {
+        this(attributes, data, null);
+    }
+
+    public DataPacketDto(Map<String, String> attributes, String dataFile) {
+        this(attributes, null, dataFile);
+    }
+
+    public DataPacketDto(Map<String, String> attributes, byte[] data, String 
dataFile) {
+        this.attributes = attributes;
+        this.data = data;
+        this.dataFile = dataFile;
+    }
+
+    public static Stream<DataPacket> getDataPacketStream(InputStream 
inputStream) throws IOException {
+        JsonParser jsonParser = new JsonFactory().createParser(inputStream);
+        if (jsonParser.nextToken() != JsonToken.START_ARRAY) {
+            throw new IOException("Expecting start array token to begin object 
array.");
+        }
+        jsonParser.setCodec(new ObjectMapper());
+        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new 
Iterator<DataPacket>() {
+            DataPacket next = getNext();
+
+            @Override
+            public boolean hasNext() {
+                return next != null;
+            }
+
+            @Override
+            public DataPacket next() {
+                DataPacket next = this.next;
+                this.next = getNext();
+                return next;
+            }
+
+            DataPacket getNext() throws RuntimeException {
+                try {
+                    if (jsonParser.nextToken() == JsonToken.END_ARRAY) {
+                        return null;
+                    }
+                    DataPacketDto dataPacketDto = 
jsonParser.readValueAs(DATA_PACKET_DTO_TYPE_REFERENCE);
+                    return dataPacketDto.toDataPacket();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }, Spliterator.ORDERED), false);
+    }
+
+    public Map<String, String> getAttributes() {
+        return attributes;
+    }
+
+    public void setAttributes(Map<String, String> attributes) {
+        this.attributes = attributes;
+    }
+
+    public byte[] getData() {
+        return data;
+    }
+
+    public void setData(byte[] data) {
+        this.data = data;
+    }
+
+    public String getDataFile() {
+        return dataFile;
+    }
+
+    public void setDataFile(String dataFile) {
+        this.dataFile = dataFile;
+    }
+
+    public DataPacket toDataPacket() {
+        return new DataPacketImpl(attributes, data, dataFile);
+    }
+
+    public DataPacketDto putAttribute(String key, String value) {
+        attributes.put(key, value);
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketImpl.java
----------------------------------------------------------------------
diff --git 
a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketImpl.java
 
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketImpl.java
new file mode 100644
index 0000000..8675e33
--- /dev/null
+++ 
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketImpl.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.toolkit.s2s;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implements DataPacket either taking the data passed in or reading from a 
file on disk
+ */
+public class DataPacketImpl implements DataPacket {
+    private final Map<String, String> attributes;
+    private final byte[] data;
+    private final String dataFile;
+
+    public DataPacketImpl(Map<String, String> attributes, byte[] data, String 
dataFile) {
+        this.attributes = attributes == null ? Collections.emptyMap() : 
Collections.unmodifiableMap(new HashMap<>(attributes));
+        this.data = data;
+        this.dataFile = dataFile;
+    }
+
+    @Override
+    public Map<String, String> getAttributes() {
+        return attributes;
+    }
+
+    @Override
+    public InputStream getData() {
+        if (data == null) {
+            if (dataFile != null && dataFile.length() > 0) {
+                try {
+                    return new FileInputStream(dataFile);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            } else {
+                return new ByteArrayInputStream(new byte[0]);
+            }
+        }
+        return new ByteArrayInputStream(data);
+    }
+
+    @Override
+    public long getSize() {
+        if (data == null) {
+            if (dataFile != null && dataFile.length() > 0) {
+                return new File(dataFile).length();
+            } else {
+                return 0;
+            }
+        }
+        return data.length;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        DataPacketImpl that = (DataPacketImpl) o;
+
+        if (attributes != null ? !attributes.equals(that.attributes) : 
that.attributes != null) return false;
+        return Arrays.equals(data, that.data);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = attributes != null ? attributes.hashCode() : 0;
+        result = 31 * result + Arrays.hashCode(data);
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java
----------------------------------------------------------------------
diff --git 
a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java
 
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java
new file mode 100644
index 0000000..f03a0bf
--- /dev/null
+++ 
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.toolkit.s2s;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.KeystoreType;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.remote.protocol.http.HttpProxy;
+import org.apache.nifi.util.FormatUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class SiteToSiteCliMain {
+    public static final String URL_OPTION = "url";
+    public static final String URL_OPTION_DEFAULT = "http://localhost:8080/nifi";
+    public static final String DIRECTION_OPTION = "direction";
+    public static final String DIRECTION_OPTION_DEFAULT = 
TransferDirection.SEND.toString();
+    public static final String PORT_NAME_OPTION = "portName";
+    public static final String PORT_IDENTIFIER_OPTION = "portIdentifier";
+    public static final String TIMEOUT_OPTION = "timeout";
+    public static final String PENALIZATION_OPTION = "penalization";
+    public static final String KEYSTORE_OPTION = "keyStore";
+    public static final String KEY_STORE_TYPE_OPTION = "keyStoreType";
+    public static final String KEY_STORE_PASSWORD_OPTION = "keyStorePassword";
+    public static final String TRUST_STORE_OPTION = "trustStore";
+    public static final String TRUST_STORE_TYPE_OPTION = "trustStoreType";
+    public static final String TRUST_STORE_PASSWORD_OPTION = 
"trustStorePassword";
+    public static final String PEER_PERSISTENCE_FILE_OPTION = 
"peerPersistenceFile";
+    public static final String COMPRESSION_OPTION = "compression";
+    public static final String TRANSPORT_PROTOCOL_OPTION = "transportProtocol";
+    public static final String TRANSPORT_PROTOCOL_OPTION_DEFAULT = 
SiteToSiteTransportProtocol.RAW.toString();
+    public static final String BATCH_COUNT_OPTION = "batchCount";
+    public static final String BATCH_SIZE_OPTION = "batchSize";
+    public static final String BATCH_DURATION_OPTION = "batchDuration";
+    public static final String HELP_OPTION = "help";
+    public static final String PROXY_HOST_OPTION = "proxyHost";
+    public static final String PROXY_PORT_OPTION = "proxyPort";
+    public static final String PROXY_USERNAME_OPTION = "proxyUsername";
+    public static final String PROXY_PASSWORD_OPTION = "proxyPassword";
+    public static final String PROXY_PORT_OPTION_DEFAULT = "80";
+    public static final String KEYSTORE_TYPE_OPTION_DEFAULT = 
KeystoreType.JKS.toString();
+    public static final String NEED_CLIENT_AUTH_OPTION = "needClientAuth";
+
+    /**
+     * Prints the usage to System.out
+     *
+     * @param errorMessage optional error message
+     * @param options      the options object to print usage for
+     */
+    public static void printUsage(String errorMessage, Options options) {
+        if (errorMessage != null) {
+            System.out.println(errorMessage);
+            System.out.println();
+            System.out.println();
+        }
+        ObjectMapper objectMapper = new ObjectMapper();
+        objectMapper.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+        System.out.println("s2s is a command line tool that can either read a 
list of DataPackets from stdin to send over site-to-site or write the received 
DataPackets to stdout");
+        System.out.println();
+        System.out.println("The s2s cli input/output format is a JSON list of 
DataPackets.  They can have the following formats:");
+        try {
+            System.out.println();
+            objectMapper.writeValue(System.out, Arrays.asList(new DataPacketDto("hello nifi".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value")));
+            System.out.println();
+            System.out.println("Where data is the base64 encoded value of the 
FlowFile content (always used for received data) or");
+            System.out.println();
+            objectMapper.writeValue(System.out, Arrays.asList(new 
DataPacketDto(new HashMap<>(), new 
File("EXAMPLE").getAbsolutePath()).putAttribute("key", "value")));
+            System.out.println();
+            System.out.println("Where dataFile is a file to read the FlowFile 
content from");
+            System.out.println();
+            System.out.println();
+            System.out.println("Example usage to send a FlowFile with the 
contents of \"hey nifi\" to a local unsecured NiFi over http with an input port 
named input:");
+            System.out.print("echo '");
+            DataPacketDto dataPacketDto = new DataPacketDto("hey nifi".getBytes(StandardCharsets.UTF_8));
+            dataPacketDto.setAttributes(null);
+            objectMapper.writeValue(System.out, Arrays.asList(dataPacketDto));
+            System.out.println("' | bin/s2s.sh -n input -p http");
+            System.out.println();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        HelpFormatter helpFormatter = new HelpFormatter();
+        helpFormatter.setWidth(160);
+        helpFormatter.printHelp("s2s", options);
+        System.out.flush();
+    }
+
+    /**
+     * Parses command line options into a CliParse object
+     *
+     * @param options an empty options object (so callers can print usage if 
the parse fails
+     * @param args    the string array of arguments
+     * @return a CliParse object containing the constructed 
SiteToSiteClient.Builder and a TransferDirection
+     * @throws ParseException if there is an error parsing the command line
+     */
+    public static CliParse parseCli(Options options, String[] args) throws 
ParseException {
+        options.addOption("u", URL_OPTION, true, "NiFI URL to connect to 
(default: " + URL_OPTION_DEFAULT + ")");
+        options.addOption("d", DIRECTION_OPTION, true, "Direction (valid directions: "
+                + Arrays.stream(TransferDirection.values()).map(Object::toString).collect(Collectors.joining(", ")) + ") (default: " + DIRECTION_OPTION_DEFAULT + ")");
+        options.addOption("n", PORT_NAME_OPTION, true, "Port name");
+        options.addOption("i", PORT_IDENTIFIER_OPTION, true, "Port id");
+        options.addOption(null, TIMEOUT_OPTION, true, "Timeout");
+        options.addOption(null, PENALIZATION_OPTION, true, "Penalization 
period");
+        options.addOption(null, KEYSTORE_OPTION, true, "Keystore");
+        options.addOption(null, KEY_STORE_TYPE_OPTION, true, "Keystore type 
(default: " + KEYSTORE_TYPE_OPTION_DEFAULT + ")");
+        options.addOption(null, KEY_STORE_PASSWORD_OPTION, true, "Keystore 
password");
+        options.addOption(null, TRUST_STORE_OPTION, true, "Truststore");
+        options.addOption(null, TRUST_STORE_TYPE_OPTION, true, "Truststore 
type (default: " + KEYSTORE_TYPE_OPTION_DEFAULT + ")");
+        options.addOption(null, TRUST_STORE_PASSWORD_OPTION, true, "Truststore 
password");
+        options.addOption(null, NEED_CLIENT_AUTH_OPTION, false, "Need client 
auth");
+        options.addOption("c", COMPRESSION_OPTION, false, "Use compression");
+        options.addOption(null, PEER_PERSISTENCE_FILE_OPTION, true, "File to 
write peer information to so it can be recovered on restart");
+        options.addOption("p", TRANSPORT_PROTOCOL_OPTION, true, "Site to site 
transport protocol (default: " + TRANSPORT_PROTOCOL_OPTION_DEFAULT + ")");
+        options.addOption(null, BATCH_COUNT_OPTION, true, "Number of flow 
files in a batch");
+        options.addOption(null, BATCH_SIZE_OPTION, true, "Size of flow files 
in a batch");
+        options.addOption(null, BATCH_DURATION_OPTION, true, "Duration of a 
batch");
+        options.addOption(null, PROXY_HOST_OPTION, true, "Proxy hostname");
+        options.addOption(null, PROXY_PORT_OPTION, true, "Proxy port");
+        options.addOption(null, PROXY_USERNAME_OPTION, true, "Proxy username");
+        options.addOption(null, PROXY_PASSWORD_OPTION, true, "Proxy password");
+        options.addOption("h", HELP_OPTION, false, "Show help message and 
exit");
+        CommandLineParser parser = new DefaultParser();
+        CommandLine commandLine;
+        commandLine = parser.parse(options, args);
+        if (commandLine.hasOption(HELP_OPTION)) {
+            printUsage(null, options);
+            System.exit(1);
+        }
+        SiteToSiteClient.Builder builder = new SiteToSiteClient.Builder();
+        builder.url(commandLine.getOptionValue(URL_OPTION, 
URL_OPTION_DEFAULT));
+        if (commandLine.hasOption(PORT_NAME_OPTION)) {
+            builder.portName(commandLine.getOptionValue(PORT_NAME_OPTION));
+        }
+        if (commandLine.hasOption(PORT_IDENTIFIER_OPTION)) {
+            
builder.portIdentifier(commandLine.getOptionValue(PORT_IDENTIFIER_OPTION));
+        }
+        if (commandLine.hasOption(TIMEOUT_OPTION)) {
+            
builder.timeout(FormatUtils.getTimeDuration(commandLine.getOptionValue(TIMEOUT_OPTION),
 TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+        }
+        if (commandLine.hasOption(PENALIZATION_OPTION)) {
+            
builder.nodePenalizationPeriod(FormatUtils.getTimeDuration(commandLine.getOptionValue(PENALIZATION_OPTION),
 TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+        }
+        if (commandLine.hasOption(KEYSTORE_OPTION)) {
+            
builder.keystoreFilename(commandLine.getOptionValue(KEYSTORE_OPTION));
+            
builder.keystoreType(KeystoreType.valueOf(commandLine.getOptionValue(KEY_STORE_TYPE_OPTION,
 KEYSTORE_TYPE_OPTION_DEFAULT).toUpperCase()));
+
+            if (commandLine.hasOption(KEY_STORE_PASSWORD_OPTION)) {
+                
builder.keystorePass(commandLine.getOptionValue(KEY_STORE_PASSWORD_OPTION));
+            } else {
+                throw new ParseException("Must specify keystore password");
+            }
+        }
+        if (commandLine.hasOption(TRUST_STORE_OPTION)) {
+            
builder.truststoreFilename(commandLine.getOptionValue(TRUST_STORE_OPTION));
+            
builder.truststoreType(KeystoreType.valueOf(commandLine.getOptionValue(TRUST_STORE_TYPE_OPTION,
 KEYSTORE_TYPE_OPTION_DEFAULT).toUpperCase()));
+
+            if (commandLine.hasOption(TRUST_STORE_PASSWORD_OPTION)) {
+                
builder.truststorePass(commandLine.getOptionValue(TRUST_STORE_PASSWORD_OPTION));
+            } else {
+                throw new ParseException("Must specify truststore password");
+            }
+        }
+        if (commandLine.hasOption(COMPRESSION_OPTION)) {
+            builder.useCompression(true);
+        } else {
+            builder.useCompression(false);
+        }
+        if (commandLine.hasOption(PEER_PERSISTENCE_FILE_OPTION)) {
+            builder.peerPersistenceFile(new 
File(commandLine.getOptionValue(PEER_PERSISTENCE_FILE_OPTION)));
+        }
+        if (commandLine.hasOption(BATCH_COUNT_OPTION)) {
+            
builder.requestBatchCount(Integer.parseInt(commandLine.getOptionValue(BATCH_COUNT_OPTION)));
+        }
+        if (commandLine.hasOption(BATCH_SIZE_OPTION)) {
+            
builder.requestBatchSize(Long.parseLong(commandLine.getOptionValue(BATCH_SIZE_OPTION)));
+        }
+        if (commandLine.hasOption(BATCH_DURATION_OPTION)) {
+            
builder.requestBatchDuration(FormatUtils.getTimeDuration(commandLine.getOptionValue(BATCH_DURATION_OPTION),
 TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+        }
+        if (commandLine.hasOption(PROXY_HOST_OPTION)) {
+            builder.httpProxy(new 
HttpProxy(commandLine.getOptionValue(PROXY_HOST_OPTION), 
Integer.parseInt(commandLine.getOptionValue(PROXY_PORT_OPTION, 
PROXY_PORT_OPTION_DEFAULT)),
+                    commandLine.getOptionValue(PROXY_USERNAME_OPTION), 
commandLine.getOptionValue(PROXY_PASSWORD_OPTION)));
+        }
+        
builder.transportProtocol(SiteToSiteTransportProtocol.valueOf(commandLine.getOptionValue(TRANSPORT_PROTOCOL_OPTION,
 TRANSPORT_PROTOCOL_OPTION_DEFAULT).toUpperCase()));
+        TransferDirection transferDirection = 
TransferDirection.valueOf(commandLine.getOptionValue(DIRECTION_OPTION, 
DIRECTION_OPTION_DEFAULT));
+        return new CliParse() {
+            @Override
+            public SiteToSiteClient.Builder getBuilder() {
+                return builder;
+            }
+
+            @Override
+            public TransferDirection getTransferDirection() {
+                return transferDirection;
+            }
+        };
+    }
+
+    /**
+     * Command line entry point: parses the arguments, builds a site-to-site client and then either
+     * sends the packets read from standard input or writes received packets to standard output.
+     * Any failure results in the usage text and the stack trace being printed.
+     *
+     * @param args the command line arguments
+     */
+    public static void main(String[] args) {
+        // Remember the real stdout for received data, then route System.out to stderr so that
+        // everything else printed through System.out stays out of the data stream
+        final PrintStream dataOut = System.out;
+        System.setOut(System.err);
+        final Options cliOptions = new Options();
+        try {
+            final CliParse parsed = parseCli(cliOptions, args);
+            try (SiteToSiteClient client = parsed.getBuilder().build()) {
+                final boolean sending = parsed.getTransferDirection() == TransferDirection.SEND;
+                if (!sending) {
+                    new SiteToSiteReceiver(client, dataOut).receiveFiles();
+                } else {
+                    new SiteToSiteSender(client, System.in).sendFiles();
+                }
+            }
+        } catch (Exception failure) {
+            printUsage(failure.getMessage(), cliOptions);
+            failure.printStackTrace();
+        }
+    }
+
+    /**
+     * Combines a SiteToSiteClient.Builder and a TransferDirection into a single return value for parseCli
+     */
+    public interface CliParse {
+        /**
+         * @return the client builder configured from the command line
+         */
+        SiteToSiteClient.Builder getBuilder();
+
+        /**
+         * @return the transfer direction selected on the command line
+         */
+        TransferDirection getTransferDirection();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java
----------------------------------------------------------------------
diff --git 
a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java
 
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java
new file mode 100644
index 0000000..b68185f
--- /dev/null
+++ 
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.toolkit.s2s;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransactionCompletion;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * Class that will print received DataPackets to output.
+ * <p>
+ * The packets of one transaction are written as a single JSON array. Each element is an object
+ * with an "attributes" object (one string field per attribute) and, when the packet has content,
+ * a "data" field holding that content as a JSON binary (base64) value.
+ */
+public class SiteToSiteReceiver {
+    private final SiteToSiteClient siteToSiteClient;
+    private final OutputStream output;
+
+    /**
+     * @param siteToSiteClient the client used to create the receive transaction
+     * @param output           the stream the JSON representation of the received packets is written to
+     */
+    public SiteToSiteReceiver(SiteToSiteClient siteToSiteClient, OutputStream output) {
+        this.siteToSiteClient = siteToSiteClient;
+        this.output = output;
+    }
+
+    /**
+     * Receives every packet offered by one transaction, writes them to the output as JSON, then
+     * confirms and completes the transaction.
+     *
+     * @return the completion information of the transaction
+     * @throws IOException if the client does not provide a transaction, or receiving or writing fails
+     */
+    public TransactionCompletion receiveFiles() throws IOException {
+        Transaction transaction = siteToSiteClient.createTransaction(TransferDirection.RECEIVE);
+        if (transaction == null) {
+            // The client may hand back no transaction at all; fail with a clear message before
+            // anything is written instead of hitting a NullPointerException below
+            throw new IOException("Site-to-site client did not return a transaction for direction " + TransferDirection.RECEIVE);
+        }
+        JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(output);
+        jsonGenerator.writeStartArray();
+        DataPacket dataPacket;
+        while ((dataPacket = transaction.receive()) != null) {
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeFieldName("attributes");
+            jsonGenerator.writeStartObject();
+            Map<String, String> attributes = dataPacket.getAttributes();
+            if (attributes != null) {
+                for (Map.Entry<String, String> attribute : attributes.entrySet()) {
+                    jsonGenerator.writeStringField(attribute.getKey(), attribute.getValue());
+                }
+            }
+            jsonGenerator.writeEndObject();
+            InputStream data = dataPacket.getData();
+            if (data != null) {
+                // The whole content is buffered in memory before it is written out
+                jsonGenerator.writeBinaryField("data", IOUtils.toByteArray(data));
+            }
+            jsonGenerator.writeEndObject();
+        }
+        jsonGenerator.writeEndArray();
+        // Finish the JSON document before telling the remote side the data was received
+        jsonGenerator.close();
+        transaction.confirm();
+        return transaction.complete();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteSender.java
 
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteSender.java
new file mode 100644
index 0000000..54c135a
--- /dev/null
+++ 
b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteSender.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.toolkit.s2s;
+
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransactionCompletion;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Class that will send DataPackets read from input
+ */
+public class SiteToSiteSender {
+    private final SiteToSiteClient siteToSiteClient;
+    private final InputStream input;
+
+    /**
+     * @param siteToSiteClient the client used to create the send transaction
+     * @param input            the stream the packets to send are parsed from
+     */
+    public SiteToSiteSender(SiteToSiteClient siteToSiteClient, InputStream input) {
+        this.siteToSiteClient = siteToSiteClient;
+        this.input = input;
+    }
+
+    /**
+     * Sends every packet parsed from the input within one transaction, then confirms and completes
+     * the transaction.
+     *
+     * @return the completion information of the transaction
+     * @throws IOException if the client does not provide a transaction, or parsing or sending fails
+     */
+    public TransactionCompletion sendFiles() throws IOException {
+        Transaction transaction = siteToSiteClient.createTransaction(TransferDirection.SEND);
+        if (transaction == null) {
+            // The client may hand back no transaction at all; fail with a clear message before
+            // any input is consumed instead of hitting a NullPointerException below
+            throw new IOException("Site-to-site client did not return a transaction for direction " + TransferDirection.SEND);
+        }
+        try {
+            DataPacketDto.getDataPacketStream(input).forEachOrdered(d -> {
+                try {
+                    transaction.send(d);
+                } catch (IOException e) {
+                    // The lambda cannot throw a checked exception; it is unwrapped again below
+                    throw new RuntimeException(e);
+                }
+            });
+        } catch (RuntimeException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof IOException) {
+                throw (IOException) cause;
+            }
+            throw new IOException(e.getMessage(), e);
+        }
+        transaction.confirm();
+        return transaction.complete();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketDtoTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketDtoTest.java
 
b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketDtoTest.java
new file mode 100644
index 0000000..6c27ce4
--- /dev/null
+++ 
b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketDtoTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.toolkit.s2s;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests for the accessors of DataPacketDto and for parsing a JSON array of packets into a stream.
+ */
+public class DataPacketDtoTest {
+    /**
+     * Creates a DTO with an empty attribute map and the given content.
+     */
+    public static DataPacketDto create(byte[] data) {
+        return new DataPacketDto(new HashMap<>(), data);
+    }
+
+    /**
+     * Serializes the given DTOs and joins them into one JSON array string.
+     */
+    private static String toJsonArray(DataPacketDto... dtos) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        StringBuilder json = new StringBuilder("[");
+        for (int i = 0; i < dtos.length; i++) {
+            if (i > 0) {
+                json.append(",");
+            }
+            json.append(mapper.writeValueAsString(dtos[i]));
+        }
+        return json.append("]").toString();
+    }
+
+    /**
+     * Runs the parser over the UTF-8 bytes of the given JSON and collects the resulting packets.
+     */
+    private static List<DataPacket> parsePackets(String json) throws IOException {
+        ByteArrayInputStream jsonBytes = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
+        return DataPacketDto.getDataPacketStream(jsonBytes).collect(Collectors.toList());
+    }
+
+    @Test
+    public void testNoArgConstructor() {
+        DataPacketDto emptyDto = new DataPacketDto();
+        assertEquals(0, emptyDto.getAttributes().size());
+        assertNull(emptyDto.getData());
+    }
+
+    @Test
+    public void testGetSetAttributes() {
+        Map<String, String> expected = new HashMap<>();
+        expected.put("key", "value");
+        DataPacketDto dto = create(null);
+        dto.setAttributes(expected);
+        Map<String, String> actual = Collections.unmodifiableMap(dto.getAttributes());
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testDataFileConstructor() {
+        String dataFile = "dataFile";
+        DataPacketDto dto = new DataPacketDto(null, dataFile);
+        assertEquals(dataFile, dto.getDataFile());
+    }
+
+    @Test
+    public void testParserNone() throws IOException {
+        List<DataPacket> parsed = parsePackets("[]");
+        assertEquals(0, parsed.size());
+    }
+
+    @Test
+    public void testParserSingle() throws IOException {
+        DataPacketDto only = new DataPacketDto("test data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
+        List<DataPacket> parsed = parsePackets(toJsonArray(only));
+        assertEquals(1, parsed.size());
+        assertEquals(only.toDataPacket(), parsed.get(0));
+    }
+
+    @Test
+    public void testParserMultiple() throws IOException {
+        DataPacketDto first = new DataPacketDto("test data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
+        DataPacketDto second = new DataPacketDto("test data 2".getBytes(StandardCharsets.UTF_8)).putAttribute("key2", "value2");
+        List<DataPacket> parsed = parsePackets(toJsonArray(first, second));
+        assertEquals(2, parsed.size());
+        assertEquals(first.toDataPacket(), parsed.get(0));
+        assertEquals(second.toDataPacket(), parsed.get(1));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketImplTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketImplTest.java
 
b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketImplTest.java
new file mode 100644
index 0000000..6dfcd38
--- /dev/null
+++ 
b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketImplTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.toolkit.s2s;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for DataPacketImpl covering absent inputs, attributes, in-memory content and content
+ * that is read from a file.
+ */
+public class DataPacketImplTest {
+    private Map<String, String> testAttributes;
+
+    @Before
+    public void setup() {
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("testKey", "testVal");
+        testAttributes = attributes;
+    }
+
+    /**
+     * Writes the given bytes to the given file.
+     */
+    private static void writeBytes(File target, byte[] content) throws IOException {
+        try (FileOutputStream out = new FileOutputStream(target)) {
+            out.write(content);
+        }
+    }
+
+    @Test
+    public void testPacketNulls() throws IOException {
+        DataPacketImpl emptyPacket = new DataPacketImpl(null, null, null);
+        assertEquals(0, emptyPacket.getAttributes().size());
+        // With nothing to read, the first read already reports end of stream
+        byte[] buffer = new byte[1];
+        assertEquals(-1, emptyPacket.getData().read(buffer));
+        assertEquals(0, emptyPacket.getSize());
+    }
+
+    @Test
+    public void testPacketAttributes() {
+        DataPacketImpl packet = new DataPacketImpl(testAttributes, null, null);
+        assertEquals(Collections.unmodifiableMap(testAttributes), packet.getAttributes());
+    }
+
+    @Test
+    public void testPacketData() throws IOException {
+        byte[] expected = "test data".getBytes(StandardCharsets.UTF_8);
+        DataPacketImpl packet = new DataPacketImpl(null, expected, null);
+        assertEquals(expected.length, packet.getSize());
+        assertArrayEquals(expected, IOUtils.toByteArray(packet.getData()));
+    }
+
+    @Test
+    public void testDataFile() throws IOException {
+        byte[] expected = "test data".getBytes(StandardCharsets.UTF_8);
+        File tempFile = File.createTempFile("abc", "def");
+        try {
+            writeBytes(tempFile, expected);
+            DataPacketImpl packet = new DataPacketImpl(null, null, tempFile.getAbsolutePath());
+            assertEquals(expected.length, packet.getSize());
+            try (InputStream content = packet.getData()) {
+                assertArrayEquals(expected, IOUtils.toByteArray(content));
+            }
+        } finally {
+            // Fall back to deletion at JVM exit if the file cannot be removed right away
+            boolean deleted = tempFile.delete();
+            if (!deleted) {
+                tempFile.deleteOnExit();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMainTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMainTest.java
 
b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMainTest.java
new file mode 100644
index 0000000..6f9e771
--- /dev/null
+++ 
b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMainTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.toolkit.s2s;
+
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.KeystoreType;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.remote.protocol.http.HttpProxy;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class SiteToSiteCliMainTest {
+    private String expectedUrl;
+    private TransferDirection expectedTransferDirection;
+    private SiteToSiteTransportProtocol expectedSiteToSiteTransportProtocol;
+    private String expectedPortName;
+    private String expectedPortIdentifier;
+    private long expectedTimeoutNs;
+    private long expectedPenalizationNs;
+    private String expectedKeystoreFilename;
+    private String expectedKeystorePass;
+    private KeystoreType expectedKeystoreType;
+    private String expectedTruststoreFilename;
+    private String expectedTruststorePass;
+    private KeystoreType expectedTruststoreType;
+    private boolean expectedCompression;
+    private File expectedPeerPersistenceFile;
+    private int expectedBatchCount;
+    private long expectedBatchDuration;
+    private long expectedBatchSize;
+    private HttpProxy expectedHttpProxy;
+
+    @Before
+    public void setup() {
+        SiteToSiteClient.Builder builder = new SiteToSiteClient.Builder();
+        expectedUrl = SiteToSiteCliMain.URL_OPTION_DEFAULT;
+        expectedTransferDirection = 
TransferDirection.valueOf(SiteToSiteCliMain.DIRECTION_OPTION_DEFAULT);
+        expectedSiteToSiteTransportProtocol = 
SiteToSiteTransportProtocol.valueOf(SiteToSiteCliMain.TRANSPORT_PROTOCOL_OPTION_DEFAULT);
+        expectedPortName = builder.getPortName();
+        expectedPortIdentifier = builder.getPortIdentifier();
+        expectedTimeoutNs = builder.getTimeout(TimeUnit.NANOSECONDS);
+        expectedPenalizationNs = 
builder.getPenalizationPeriod(TimeUnit.NANOSECONDS);
+        expectedKeystoreFilename = builder.getKeystoreFilename();
+        expectedKeystorePass = builder.getKeystorePass();
+        expectedKeystoreType = builder.getKeystoreType();
+        expectedTruststoreFilename = builder.getTruststoreFilename();
+        expectedTruststorePass = builder.getTruststorePass();
+        expectedTruststoreType = builder.getTruststoreType();
+        expectedCompression = false;
+        expectedPeerPersistenceFile = builder.getPeerPersistenceFile();
+        SiteToSiteClientConfig siteToSiteClientConfig = builder.buildConfig();
+        expectedBatchCount = siteToSiteClientConfig.getPreferredBatchCount();
+        expectedBatchDuration = 
siteToSiteClientConfig.getPreferredBatchDuration(TimeUnit.NANOSECONDS);
+        expectedBatchSize = siteToSiteClientConfig.getPreferredBatchSize();
+        expectedHttpProxy = siteToSiteClientConfig.getHttpProxy();
+    }
+
+    @Test
+    public void testParseNoArgs() throws ParseException {
+        parseAndCheckExpected(new String[0]);
+    }
+
+    @Test
+    public void testParseUrl() throws ParseException {
+        expectedUrl = "http://fake.url:8080/nifi";;
+        parseAndCheckExpected("u", SiteToSiteCliMain.URL_OPTION, expectedUrl);
+    }
+
+    @Test
+    public void testParsePortName() throws ParseException {
+        expectedPortName = "testPortName";
+        parseAndCheckExpected("n", SiteToSiteCliMain.PORT_NAME_OPTION, 
expectedPortName);
+    }
+
+    @Test
+    public void testParsePortIdentifier() throws ParseException {
+        expectedPortIdentifier = "testPortId";
+        parseAndCheckExpected("i", SiteToSiteCliMain.PORT_IDENTIFIER_OPTION, 
expectedPortIdentifier);
+    }
+
+    @Test
+    public void testParseTimeout() throws ParseException {
+        expectedTimeoutNs = TimeUnit.DAYS.toNanos(3);
+        parseAndCheckExpected(null, SiteToSiteCliMain.TIMEOUT_OPTION, "3 
days");
+    }
+
+    @Test
+    public void testParsePenalization() throws ParseException {
+        expectedPenalizationNs = TimeUnit.HOURS.toNanos(4);
+        parseAndCheckExpected(null, SiteToSiteCliMain.PENALIZATION_OPTION, "4 
hours");
+    }
+
+    @Test
+    public void testParseKeystore() throws ParseException {
+        expectedKeystoreFilename = "keystore.pkcs12";
+        expectedKeystorePass = "badPassword";
+        expectedKeystoreType = KeystoreType.PKCS12;
+        parseAndCheckExpected(new String[]{
+                "--" + SiteToSiteCliMain.KEYSTORE_OPTION, 
expectedKeystoreFilename,
+                "--" + SiteToSiteCliMain.KEY_STORE_PASSWORD_OPTION, 
expectedKeystorePass,
+                "--" + SiteToSiteCliMain.KEY_STORE_TYPE_OPTION, 
expectedKeystoreType.toString()
+        });
+    }
+
+    @Test
+    public void testParseTruststore() throws ParseException {
+        expectedTruststoreFilename = "truststore.pkcs12";
+        expectedTruststorePass = "badPassword";
+        expectedTruststoreType = KeystoreType.PKCS12;
+        parseAndCheckExpected(new String[]{
+                "--" + SiteToSiteCliMain.TRUST_STORE_OPTION, 
expectedTruststoreFilename,
+                "--" + SiteToSiteCliMain.TRUST_STORE_PASSWORD_OPTION, 
expectedTruststorePass,
+                "--" + SiteToSiteCliMain.TRUST_STORE_TYPE_OPTION, 
expectedTruststoreType.toString()
+        });
+    }
+
+    @Test
+    public void testParseCompression() throws ParseException {
+        expectedCompression = true;
+        parseAndCheckExpected("c", SiteToSiteCliMain.COMPRESSION_OPTION, null);
+    }
+
+    @Test
+    public void testParsePeerPersistenceFile() throws ParseException {
+        String pathname = "test";
+        expectedPeerPersistenceFile = new File(pathname);
+        parseAndCheckExpected(null, 
SiteToSiteCliMain.PEER_PERSISTENCE_FILE_OPTION, pathname);
+    }
+
+    @Test
+    public void testParseBatchCount() throws ParseException {
+        expectedBatchCount = 55;
+        parseAndCheckExpected(null, SiteToSiteCliMain.BATCH_COUNT_OPTION, 
Integer.toString(expectedBatchCount));
+    }
+
+    @Test
+    public void testParseBatchDuration() throws ParseException {
+        expectedBatchDuration = TimeUnit.MINUTES.toNanos(5);
+        parseAndCheckExpected(null, SiteToSiteCliMain.BATCH_DURATION_OPTION, 
"5 min");
+    }
+
+    @Test
+    public void testParseBatchSize() throws ParseException {
+        expectedBatchSize = 1026;
+        parseAndCheckExpected(null, SiteToSiteCliMain.BATCH_SIZE_OPTION, 
Long.toString(expectedBatchSize));
+    }
+
+    @Test
+    public void testParseProxy() throws ParseException {
+        String expectedHost = "testHost";
+        int expectedPort = 292;
+        String expectedUser = "testUser";
+        String expectedPassword = "badPassword";
+        expectedHttpProxy = new HttpProxy(expectedHost, expectedPort, 
expectedUser, expectedPassword);
+        parseAndCheckExpected(new String[]{
+                "--" + SiteToSiteCliMain.PROXY_HOST_OPTION, expectedHost,
+                "--" + SiteToSiteCliMain.PROXY_PORT_OPTION, 
Integer.toString(expectedPort),
+                "--" + SiteToSiteCliMain.PROXY_USERNAME_OPTION, expectedUser,
+                "--" + SiteToSiteCliMain.PROXY_PASSWORD_OPTION, 
expectedPassword});
+    }
+
+    @Test
+    public void testParseTransferDirection() throws ParseException {
+        expectedTransferDirection = TransferDirection.RECEIVE;
+        parseAndCheckExpected("d", SiteToSiteCliMain.DIRECTION_OPTION, 
expectedTransferDirection.toString());
+    }
+
+    private void parseAndCheckExpected(String shortOption, String longOption, 
String value) throws ParseException {
+        if (shortOption != null) {
+            String[] args;
+            if (value == null) {
+                args = new String[]{"-" + shortOption};
+            } else {
+                args = new String[]{"-" + shortOption, value};
+            }
+            parseAndCheckExpected(args);
+        }
+        String[] args;
+        if (value == null) {
+            args = new String[]{"--" + longOption};
+        } else {
+            args = new String[]{"--" + longOption, value};
+        }
+        parseAndCheckExpected(args);
+    }
+
+    private void parseAndCheckExpected(String[] args) throws ParseException {
+        SiteToSiteCliMain.CliParse cliParse = SiteToSiteCliMain.parseCli(new 
Options(), args);
+        SiteToSiteClient.Builder builder = cliParse.getBuilder();
+        assertEquals(expectedUrl, builder.getUrl());
+        assertEquals(expectedSiteToSiteTransportProtocol, 
builder.getTransportProtocol());
+        assertEquals(expectedPortName, builder.getPortName());
+        assertEquals(expectedPortIdentifier, builder.getPortIdentifier());
+        assertEquals(expectedTimeoutNs, 
builder.getTimeout(TimeUnit.NANOSECONDS));
+        assertEquals(expectedPenalizationNs, 
builder.getPenalizationPeriod(TimeUnit.NANOSECONDS));
+        assertEquals(expectedKeystoreFilename, builder.getKeystoreFilename());
+        assertEquals(expectedKeystorePass, builder.getKeystorePass());
+        assertEquals(expectedKeystoreType, builder.getKeystoreType());
+        assertEquals(expectedTruststoreFilename, 
builder.getTruststoreFilename());
+        assertEquals(expectedTruststorePass, builder.getTruststorePass());
+        assertEquals(expectedTruststoreType, builder.getTruststoreType());
+        assertEquals(expectedCompression, builder.isUseCompression());
+        assertEquals(expectedPeerPersistenceFile, 
builder.getPeerPersistenceFile());
+        if (expectedHttpProxy == null) {
+            assertNull(builder.getHttpProxy());
+        } else {
+            HttpProxy httpProxy = builder.getHttpProxy();
+            assertNotNull(httpProxy);
+            assertEquals(expectedHttpProxy.getHttpHost(), 
httpProxy.getHttpHost());
+            assertEquals(expectedHttpProxy.getUsername(), 
httpProxy.getUsername());
+            assertEquals(expectedHttpProxy.getPassword(), 
httpProxy.getPassword());
+        }
+        SiteToSiteClientConfig siteToSiteClientConfig = builder.buildConfig();
+        assertEquals(expectedBatchCount, 
siteToSiteClientConfig.getPreferredBatchCount());
+        assertEquals(expectedBatchDuration, 
siteToSiteClientConfig.getPreferredBatchDuration(TimeUnit.NANOSECONDS));
+        assertEquals(expectedBatchSize, 
siteToSiteClientConfig.getPreferredBatchSize());
+        assertEquals(expectedTransferDirection, 
cliParse.getTransferDirection());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiverTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiverTest.java
 
b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiverTest.java
new file mode 100644
index 0000000..d5ea6be
--- /dev/null
+++ 
b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiverTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.toolkit.s2s;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransactionCompletion;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.function.Supplier;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Exercises {@link SiteToSiteReceiver} against a mocked client and compares its output with Jackson-serialized DTOs. */
+@RunWith(MockitoJUnitRunner.class)
+public class SiteToSiteReceiverTest {
+    private final ObjectMapper objectMapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
+    @Mock SiteToSiteClient siteToSiteClient;
+    @Mock Transaction transaction;
+    @Mock TransactionCompletion transactionCompletion;
+    ByteArrayOutputStream data;
+    // Resolved on each get(), so the receiver is handed the stream assigned in setup().
+    private final Supplier<SiteToSiteReceiver> receiverSupplier = () -> new SiteToSiteReceiver(siteToSiteClient, data);
+    ByteArrayOutputStream expectedData;
+
+    /** Creates fresh streams and stubs the mocks so that a RECEIVE transaction can be created and completed. */
+    @Before
+    public void setup() throws IOException {
+        expectedData = new ByteArrayOutputStream();
+        data = new ByteArrayOutputStream();
+        when(siteToSiteClient.createTransaction(TransferDirection.RECEIVE)).thenReturn(transaction);
+        // By the time complete() is invoked, the transaction must already have been created and confirmed.
+        when(transaction.complete()).thenAnswer(invocation -> {
+            verify(siteToSiteClient).createTransaction(TransferDirection.RECEIVE);
+            verify(transaction).confirm();
+            return transactionCompletion;
+        });
+    }
+
+    /** With no packets stubbed, the output must equal a serialized empty list. */
+    @Test
+    public void testEmpty() throws IOException {
+        // transaction.receive() is left unstubbed here, so the mock hands back null on the first call.
+        assertEquals(transactionCompletion, receiverSupplier.get().receiveFiles());
+        objectMapper.writeValue(expectedData, Collections.emptyList());
+        assertEquals(expectedData.toString(), data.toString());
+    }
+
+    /** A single received packet must be written as a one-element list. */
+    @Test
+    public void testSingle() throws IOException {
+        DataPacketDto packet = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
+        when(transaction.receive()).thenReturn(packet.toDataPacket()).thenReturn(null);
+        assertEquals(transactionCompletion, receiverSupplier.get().receiveFiles());
+        objectMapper.writeValue(expectedData, Arrays.asList(packet));
+        assertEquals(expectedData.toString(), data.toString());
+    }
+
+    /** Two received packets must be written in the order in which the transaction returned them. */
+    @Test
+    public void testMulti() throws IOException {
+        DataPacketDto first = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
+        DataPacketDto second = new DataPacketDto("test-data2".getBytes(StandardCharsets.UTF_8)).putAttribute("key2", "value2");
+        when(transaction.receive()).thenReturn(first.toDataPacket()).thenReturn(second.toDataPacket()).thenReturn(null);
+        assertEquals(transactionCompletion, receiverSupplier.get().receiveFiles());
+        objectMapper.writeValue(expectedData, Arrays.asList(first, second));
+        assertEquals(expectedData.toString(), data.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteSenderTest.java
----------------------------------------------------------------------
diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteSenderTest.java b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteSenderTest.java
new file mode 100644
index 0000000..5a82b7c
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteSenderTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.toolkit.s2s;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransactionCompletion;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+/** Exercises {@link SiteToSiteSender} against a mocked client, feeding it JSON input produced by Jackson. */
+@RunWith(MockitoJUnitRunner.class)
+public class SiteToSiteSenderTest {
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    @Mock SiteToSiteClient siteToSiteClient;
+    @Mock Transaction transaction;
+    @Mock TransactionCompletion transactionCompletion;
+    ByteArrayOutputStream data;
+    // Copies the current content of data on each get(), so a test has to write its input before requesting a sender.
+    private final Supplier<SiteToSiteSender> senderSupplier = () -> new SiteToSiteSender(siteToSiteClient, new ByteArrayInputStream(data.toByteArray()));
+
+    /** Resets the input buffer and stubs a SEND transaction; the complete() answer verifies creation and confirmation happened first. */
+    @Before
+    public void setup() throws IOException {
+        data = new ByteArrayOutputStream();
+        when(siteToSiteClient.createTransaction(TransferDirection.SEND)).thenReturn(transaction);
+        when(transaction.complete()).thenAnswer(invocation -> {
+            verify(siteToSiteClient).createTransaction(TransferDirection.SEND);
+            verify(transaction).confirm();
+            return transactionCompletion;
+        });
+    }
+
+    @Test
+    public void testEmptyList() throws IOException {
+        objectMapper.writeValue(data, Collections.emptyList());
+        assertEquals(transactionCompletion, senderSupplier.get().sendFiles());
+        verify(transaction, never()).send(any(DataPacket.class)); // an empty input list must not send anything
+        verify(transaction).complete();
+        verifyNoMoreInteractions(siteToSiteClient, transaction, transactionCompletion);
+    }
+
+    @Test
+    public void testSingleElement() throws IOException {
+        DataPacketDto packet = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
+        objectMapper.writeValue(data, Arrays.stream(new DataPacketDto[]{packet}).collect(Collectors.toList()));
+        assertEquals(transactionCompletion, senderSupplier.get().sendFiles());
+        verify(transaction).send(packet.toDataPacket());
+        verify(transaction).complete();
+        verifyNoMoreInteractions(siteToSiteClient, transaction, transactionCompletion);
+    }
+
+    @Test
+    public void testMultipleElements() throws IOException {
+        DataPacketDto first = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
+        DataPacketDto second = new DataPacketDto("test-data2".getBytes(StandardCharsets.UTF_8)).putAttribute("key2", "value2");
+        objectMapper.writeValue(data, Arrays.stream(new DataPacketDto[]{first, second}).collect(Collectors.toList()));
+        assertEquals(transactionCompletion, senderSupplier.get().sendFiles());
+        verify(transaction).send(first.toDataPacket());
+        verify(transaction).send(second.toDataPacket());
+        verify(transaction).complete();
+        verifyNoMoreInteractions(siteToSiteClient, transaction, transactionCompletion);
+    }
+
+    @Test(expected = IOException.class)
+    public void testIOException() throws IOException {
+        IOException failure = new IOException("test");
+        DataPacketDto packet = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
+        objectMapper.writeValue(data, Arrays.stream(new DataPacketDto[]{packet}).collect(Collectors.toList()));
+        doThrow(failure).when(transaction).send(any(DataPacket.class));
+        try {
+            senderSupplier.get().sendFiles();
+        } catch (IOException thrown) {
+            assertEquals(failure, thrown); // the stubbed IOException itself must surface
+            throw thrown;
+        }
+    }
+
+    @Test(expected = IOException.class)
+    public void testRuntimeException() throws IOException {
+        RuntimeException failure = new RuntimeException("test");
+        DataPacketDto packet = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
+        objectMapper.writeValue(data, Arrays.stream(new DataPacketDto[]{packet}).collect(Collectors.toList()));
+        doThrow(failure).when(transaction).send(any(DataPacket.class));
+        try {
+            senderSupplier.get().sendFiles();
+        } catch (IOException thrown) {
+            assertEquals(failure, thrown.getCause()); // the RuntimeException must arrive as the cause of an IOException
+            throw thrown;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-toolkit/pom.xml b/nifi-toolkit/pom.xml
index debc51d..ca0f352 100644
--- a/nifi-toolkit/pom.xml
+++ b/nifi-toolkit/pom.xml
@@ -25,6 +25,7 @@
     <modules>
         <module>nifi-toolkit-tls</module>
         <module>nifi-toolkit-encrypt-config</module>
+        <module>nifi-toolkit-s2s</module>
         <module>nifi-toolkit-assembly</module>
     </modules>
     <dependencyManagement>

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 22a35cd..cad5a24 100644
--- a/pom.xml
+++ b/pom.xml
@@ -894,6 +894,11 @@ language governing permissions and limitations under the License. -->
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-toolkit-s2s</artifactId>
+                <version>1.1.0-SNAPSHOT</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-resources</artifactId>
                 <version>1.1.0-SNAPSHOT</version>
                 <classifier>resources</classifier>

Reply via email to