This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ce484a0418 NIFI-11307 Removed S2S Toolkit (#7060)
ce484a0418 is described below
commit ce484a04185f2863e24269eafd03bf71f837d0c2
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Mar 29 09:32:01 2023 -0500
NIFI-11307 Removed S2S Toolkit (#7060)
---
nifi-docs/src/main/asciidoc/toolkit-guide.adoc | 49 ----
nifi-toolkit/nifi-toolkit-assembly/pom.xml | 5 -
.../src/main/resources/bin/s2s.bat | 41 ----
.../src/main/resources/bin/s2s.sh | 119 ----------
nifi-toolkit/nifi-toolkit-s2s/pom.xml | 48 ----
.../org/apache/nifi/toolkit/s2s/DataPacketDto.java | 136 -----------
.../apache/nifi/toolkit/s2s/DataPacketImpl.java | 97 --------
.../apache/nifi/toolkit/s2s/SiteToSiteCliMain.java | 259 ---------------------
.../nifi/toolkit/s2s/SiteToSiteReceiver.java | 73 ------
.../apache/nifi/toolkit/s2s/SiteToSiteSender.java | 60 -----
.../apache/nifi/toolkit/s2s/DataPacketDtoTest.java | 96 --------
.../nifi/toolkit/s2s/DataPacketImplTest.java | 85 -------
.../nifi/toolkit/s2s/SiteToSiteCliMainTest.java | 245 -------------------
.../nifi/toolkit/s2s/SiteToSiteReceiverTest.java | 99 --------
.../nifi/toolkit/s2s/SiteToSiteSenderTest.java | 132 -----------
nifi-toolkit/pom.xml | 1 -
16 files changed, 1545 deletions(-)
diff --git a/nifi-docs/src/main/asciidoc/toolkit-guide.adoc b/nifi-docs/src/main/asciidoc/toolkit-guide.adoc
index 6e24472adf..ff75608f19 100644
--- a/nifi-docs/src/main/asciidoc/toolkit-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/toolkit-guide.adoc
@@ -28,7 +28,6 @@ The NiFi Toolkit contains several command line utilities to setup and support Ni
* Flow Analyzer -- The `flow-analyzer` tool produces a report that helps administrators understand the max amount of data which can be stored in backpressure for a given flow.
* Node Manager -- The `node-manager` tool enables administrators to perform status checks on nodes as well as the ability to connect, disconnect, or remove nodes from the cluster.
* Notify -- The `notify` tool enables administrators to send bulletins to the NiFi UI.
-* S2S -- The `s2s` tool enables administrators to send data into or out of NiFi flows over site-to-site.
* TLS Toolkit -- The `tls-toolkit` utility generates the required keystores, truststore, and relevant configuration files to facilitate the setup of a secure NiFi instance.
* ZooKeeper Migrator -- The `zk-migrator` tool enables administrators to:
** move ZooKeeper information from one ZooKeeper cluster to another
@@ -965,54 +964,6 @@ Executing the above command line should result in a bulletin appearing in NiFi:
image::nifi-notifications.png["NiFi Notifications"]
-== S2S
-S2S is a command line tool (invoked as `./bin/s2s.sh` or `bin\s2s.bat`) that can either read a list of DataPackets from stdin to send over site-to-site or write the received DataPackets to stdout.
-
-=== Usage
-To show help:
-
- ./bin/s2s.sh -h
-
-The following are available options:
-
-* `--batchCount <arg>` Number of flow files in a batch
-* `--batchDuration <arg>` Duration of a batch
-* `--batchSize <arg>` Size of flow files in a batch
-* `-c`,`--compression` Use compression
-* `-d`,`--direction` Direction (valid directions: `SEND`, `RECEIVE`) (default: `SEND`)
-* `-h`,`--help` Help Text (optional)
-* `-i`,`--portIdentifier <arg>` Port id
-* `--keystore <arg>` Keystore
-* `--keyStorePassword <arg>` Keystore password
-* `--keyStoreType <arg>` Keystore type (default: `JKS`)
-* `-n`,`--portName` Port name
-* `-p`,`--transportProtocol` Site to site transport protocol (default: `RAW`)
-* `--peerPersistenceFile <arg>` File to write peer information to so it can be recovered on restart
-* `--penalization <arg>` Penalization period
-* `--proxyHost <arg>` Proxy hostname
-* `--proxyPassword <arg>` Proxy password
-* `--proxyPort <arg>` Proxy port
-* `--proxyUsername <arg>` Proxy username
-* `--timeout <arg>` Timeout
-* `--trustStore <arg>` Truststore
-* `--trustStorePassword <arg>` Truststore password
-* `--trustStoreType <arg>` Truststore type (default: `JKS`)
-* `-u,--url <arg>` NiFI URL to connect to (default: `http://localhost:8080/nifi`)
-
-The s2s cli input/output format is a JSON list of DataPackets. They can have the following formats:
-
- [{"attributes":{"key":"value"},"data":"aGVsbG8gbmlmaQ=="}]
-
-where data is the base64 encoded value of the FlowFile content (always used for received data) or:
-
- [{"attributes":{"key":"value"},"dataFile":"/Users/pvillard/Documents/GitHub/nifi/nifi-toolkit/nifi-toolkit-assembly/target/nifi-toolkit-1.9.0-SNAPSHOT-bin/nifi-toolkit-1.9.0-SNAPSHOT/bin/EXAMPLE"}]
-
-where dataFile is a file to read the FlowFile content from.
-
-Example usage to send a FlowFile with the contents of "hey nifi" to a local unsecured NiFi over http with an input port named "input":
-
- echo '[{"data":"aGV5IG5pZmk="}]' | bin/s2s.sh -n input -p http
-
[[tls_toolkit]]
== TLS Toolkit
In order to facilitate the secure setup of NiFi, you can use the `tls-toolkit` command line utility to automatically generate the required keystores, truststore, and relevant configuration files. This is especially useful for securing multiple NiFi nodes, which can be a tedious and error-prone process.
diff --git a/nifi-toolkit/nifi-toolkit-assembly/pom.xml b/nifi-toolkit/nifi-toolkit-assembly/pom.xml
index 44e407907c..22561173bd 100644
--- a/nifi-toolkit/nifi-toolkit-assembly/pom.xml
+++ b/nifi-toolkit/nifi-toolkit-assembly/pom.xml
@@ -73,11 +73,6 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-toolkit-encrypt-config</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-toolkit-s2s</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-toolkit-admin</artifactId>
diff --git a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.bat b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.bat
deleted file mode 100644
index ce27b8515c..0000000000
--- a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.bat
+++ /dev/null
@@ -1,41 +0,0 @@
-@echo off
-rem
-rem Licensed to the Apache Software Foundation (ASF) under one or more
-rem contributor license agreements. See the NOTICE file distributed with
-rem this work for additional information regarding copyright ownership.
-rem The ASF licenses this file to You under the Apache License, Version 2.0
-rem (the "License"); you may not use this file except in compliance with
-rem the License. You may obtain a copy of the License at
-rem
-rem http://www.apache.org/licenses/LICENSE-2.0
-rem
-rem Unless required by applicable law or agreed to in writing, software
-rem distributed under the License is distributed on an "AS IS" BASIS,
-rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-rem See the License for the specific language governing permissions and
-rem limitations under the License.
-rem
-
-rem Use JAVA_HOME if it's set; otherwise, just use java
-
-if "%JAVA_HOME%" == "" goto noJavaHome
-if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome
-set JAVA_EXE=%JAVA_HOME%\bin\java.exe
-goto startConfig
-
-:noJavaHome
-echo The JAVA_HOME environment variable is not defined correctly.
-echo Instead the PATH will be used to find the java executable.
-echo.
-set JAVA_EXE=java
-goto startConfig
-
-:startConfig
-set LIB_DIR=%~dp0..\classpath;%~dp0..\lib
-
-if "%JAVA_OPTS%" == "" set JAVA_OPTS=-Xms128m -Xmx256m
-
-SET JAVA_PARAMS=-cp %LIB_DIR%\* %JAVA_OPTS% org.apache.nifi.toolkit.s2s.SiteToSiteCliMain
-
-cmd.exe /C ""%JAVA_EXE%" %JAVA_PARAMS% %* ""
-
diff --git a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.sh b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.sh
deleted file mode 100644
index 077fb77894..0000000000
--- a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.sh
+++ /dev/null
@@ -1,119 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-
-# Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches
-
-SCRIPT_DIR=$(dirname "$0")
-SCRIPT_NAME=$(basename "$0")
-NIFI_TOOLKIT_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd)
-PROGNAME=$(basename "$0")
-
-
-warn() {
- (>&2 echo "${PROGNAME}: $*")
-}
-
-die() {
- warn "$*"
- exit 1
-}
-
-detectOS() {
- # OS specific support (must be 'true' or 'false').
- cygwin=false;
- aix=false;
- os400=false;
- darwin=false;
- case "$(uname)" in
- CYGWIN*)
- cygwin=true
- ;;
- AIX*)
- aix=true
- ;;
- OS400*)
- os400=true
- ;;
- Darwin)
- darwin=true
- ;;
- esac
- # For AIX, set an environment variable
- if ${aix}; then
- export LDR_CNTRL=MAXDATA=0xB0000000@DSA
- echo ${LDR_CNTRL}
- fi
-}
-
-locateJava() {
- # Setup the Java Virtual Machine
- if $cygwin ; then
- [ -n "${JAVA}" ] && JAVA=$(cygpath --unix "${JAVA}")
- [ -n "${JAVA_HOME}" ] && JAVA_HOME=$(cygpath --unix "${JAVA_HOME}")
- fi
-
- if [ "x${JAVA}" = "x" ] && [ -r /etc/gentoo-release ] ; then
- JAVA_HOME=$(java-config --jre-home)
- fi
- if [ "x${JAVA}" = "x" ]; then
- if [ "x${JAVA_HOME}" != "x" ]; then
- if [ ! -d "${JAVA_HOME}" ]; then
- die "JAVA_HOME is not valid: ${JAVA_HOME}"
- fi
- JAVA="${JAVA_HOME}/bin/java"
- else
- warn "JAVA_HOME not set; results may vary"
- JAVA=$(type java)
- JAVA=$(expr "${JAVA}" : '.* \(/.*\)$')
- if [ "x${JAVA}" = "x" ]; then
- die "java command not found"
- fi
- fi
- fi
-}
-
-init() {
- # Determine if there is special OS handling we must perform
- detectOS
-
- # Locate the Java VM to execute
- locateJava "$1"
-}
-
-run() {
- LIBS="${NIFI_TOOLKIT_HOME}/lib/*"
-
- sudo_cmd_prefix=""
- if $cygwin; then
- NIFI_TOOLKIT_HOME=$(cygpath --path --windows "${NIFI_TOOLKIT_HOME}")
- CLASSPATH="$NIFI_TOOLKIT_HOME/classpath;$(cygpath --path --windows "${LIBS}")"
- else
- CLASSPATH="$NIFI_TOOLKIT_HOME/classpath:${LIBS}"
- fi
-
- export JAVA_HOME="$JAVA_HOME"
- export NIFI_TOOLKIT_HOME="$NIFI_TOOLKIT_HOME"
-
- umask 0077
- exec "${JAVA}" -cp "${CLASSPATH}" ${JAVA_OPTS:--Xms128m -Xmx256m} org.apache.nifi.toolkit.s2s.SiteToSiteCliMain "$@"
-}
-
-
-init "$1"
-run "$@"
diff --git a/nifi-toolkit/nifi-toolkit-s2s/pom.xml b/nifi-toolkit/nifi-toolkit-s2s/pom.xml
deleted file mode 100644
index d3a34df351..0000000000
--- a/nifi-toolkit/nifi-toolkit-s2s/pom.xml
+++ /dev/null
@@ -1,48 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-toolkit</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </parent>
- <artifactId>nifi-toolkit-s2s</artifactId>
- <description>Site-to-site cli</description>
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-site-to-site-client</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
- </dependencies>
-</project>
diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketDto.java b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketDto.java
deleted file mode 100644
index 8d7e7b3e50..0000000000
--- a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketDto.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.toolkit.s2s;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.nifi.remote.protocol.DataPacket;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Spliterator;
-import java.util.Spliterators;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-/**
- * DTO object for serializing and deserializing DataPackets via JSON
- */
-public class DataPacketDto {
- public static final TypeReference<DataPacketDto> DATA_PACKET_DTO_TYPE_REFERENCE = new TypeReference<DataPacketDto>() {
- };
- private Map<String, String> attributes;
- private byte[] data;
- private String dataFile;
-
- public DataPacketDto() {
- this(null);
- }
-
- public DataPacketDto(byte[] data) {
- this(new HashMap<>(), data);
- }
-
- public DataPacketDto(Map<String, String> attributes, byte[] data) {
- this(attributes, data, null);
- }
-
- public DataPacketDto(Map<String, String> attributes, String dataFile) {
- this(attributes, null, dataFile);
- }
-
- public DataPacketDto(Map<String, String> attributes, byte[] data, String dataFile) {
- this.attributes = attributes;
- this.data = data;
- this.dataFile = dataFile;
- }
-
- public static Stream<DataPacket> getDataPacketStream(InputStream inputStream) throws IOException {
- JsonParser jsonParser = new JsonFactory().createParser(inputStream);
- if (jsonParser.nextToken() != JsonToken.START_ARRAY) {
- throw new IOException("Expecting start array token to begin object array.");
- }
- jsonParser.setCodec(new ObjectMapper());
- return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<DataPacket>() {
- DataPacket next = getNext();
-
- @Override
- public boolean hasNext() {
- return next != null;
- }
-
- @Override
- public DataPacket next() {
- DataPacket next = this.next;
- this.next = getNext();
- return next;
- }
-
- DataPacket getNext() throws RuntimeException {
- try {
- if (jsonParser.nextToken() == JsonToken.END_ARRAY) {
- return null;
- }
- DataPacketDto dataPacketDto = jsonParser.readValueAs(DATA_PACKET_DTO_TYPE_REFERENCE);
- return dataPacketDto.toDataPacket();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }, Spliterator.ORDERED), false);
- }
-
- public Map<String, String> getAttributes() {
- return attributes;
- }
-
- public void setAttributes(Map<String, String> attributes) {
- this.attributes = attributes;
- }
-
- public byte[] getData() {
- return data;
- }
-
- public void setData(byte[] data) {
- this.data = data;
- }
-
- public String getDataFile() {
- return dataFile;
- }
-
- public void setDataFile(String dataFile) {
- this.dataFile = dataFile;
- }
-
- public DataPacket toDataPacket() {
- return new DataPacketImpl(attributes, data, dataFile);
- }
-
- public DataPacketDto putAttribute(String key, String value) {
- attributes.put(key, value);
- return this;
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketImpl.java b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketImpl.java
deleted file mode 100644
index 8675e33708..0000000000
--- a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketImpl.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.toolkit.s2s;
-
-import org.apache.nifi.remote.protocol.DataPacket;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Implements DataPacket either taking the data passed in or reading from a file on disk
- */
-public class DataPacketImpl implements DataPacket {
- private final Map<String, String> attributes;
- private final byte[] data;
- private final String dataFile;
-
- public DataPacketImpl(Map<String, String> attributes, byte[] data, String dataFile) {
- this.attributes = attributes == null ? Collections.emptyMap() : Collections.unmodifiableMap(new HashMap<>(attributes));
- this.data = data;
- this.dataFile = dataFile;
- }
-
- @Override
- public Map<String, String> getAttributes() {
- return attributes;
- }
-
- @Override
- public InputStream getData() {
- if (data == null) {
- if (dataFile != null && dataFile.length() > 0) {
- try {
- return new FileInputStream(dataFile);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- } else {
- return new ByteArrayInputStream(new byte[0]);
- }
- }
- return new ByteArrayInputStream(data);
- }
-
- @Override
- public long getSize() {
- if (data == null) {
- if (dataFile != null && dataFile.length() > 0) {
- return new File(dataFile).length();
- } else {
- return 0;
- }
- }
- return data.length;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- DataPacketImpl that = (DataPacketImpl) o;
-
- if (attributes != null ? !attributes.equals(that.attributes) : that.attributes != null) return false;
- return Arrays.equals(data, that.data);
-
- }
-
- @Override
- public int hashCode() {
- int result = attributes != null ? attributes.hashCode() : 0;
- result = 31 * result + Arrays.hashCode(data);
- return result;
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java
deleted file mode 100644
index e57dbbd544..0000000000
--- a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.toolkit.s2s;
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Value;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.KeystoreType;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
-import org.apache.nifi.remote.protocol.http.HttpProxy;
-import org.apache.nifi.util.FormatUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-public class SiteToSiteCliMain {
- public static final String URL_OPTION = "url";
- public static final String URL_OPTION_DEFAULT = "http://localhost:8080/nifi";
- public static final String DIRECTION_OPTION = "direction";
- public static final String DIRECTION_OPTION_DEFAULT = TransferDirection.SEND.toString();
- public static final String PORT_NAME_OPTION = "portName";
- public static final String PORT_IDENTIFIER_OPTION = "portIdentifier";
- public static final String TIMEOUT_OPTION = "timeout";
- public static final String PENALIZATION_OPTION = "penalization";
- public static final String KEYSTORE_OPTION = "keyStore";
- public static final String KEY_STORE_TYPE_OPTION = "keyStoreType";
- public static final String KEY_STORE_PASSWORD_OPTION = "keyStorePassword";
- public static final String TRUST_STORE_OPTION = "trustStore";
- public static final String TRUST_STORE_TYPE_OPTION = "trustStoreType";
- public static final String TRUST_STORE_PASSWORD_OPTION = "trustStorePassword";
- public static final String PEER_PERSISTENCE_FILE_OPTION = "peerPersistenceFile";
- public static final String COMPRESSION_OPTION = "compression";
- public static final String TRANSPORT_PROTOCOL_OPTION = "transportProtocol";
- public static final String TRANSPORT_PROTOCOL_OPTION_DEFAULT = SiteToSiteTransportProtocol.RAW.toString();
- public static final String BATCH_COUNT_OPTION = "batchCount";
- public static final String BATCH_SIZE_OPTION = "batchSize";
- public static final String BATCH_DURATION_OPTION = "batchDuration";
- public static final String HELP_OPTION = "help";
- public static final String PROXY_HOST_OPTION = "proxyHost";
- public static final String PROXY_PORT_OPTION = "proxyPort";
- public static final String PROXY_USERNAME_OPTION = "proxyUsername";
- public static final String PROXY_PASSWORD_OPTION = "proxyPassword";
- public static final String PROXY_PORT_OPTION_DEFAULT = "80";
- public static final String KEYSTORE_TYPE_OPTION_DEFAULT = KeystoreType.JKS.toString();
-
- /**
- * Prints the usage to System.out
- *
- * @param errorMessage optional error message
- * @param options the options object to print usage for
- */
- public static void printUsage(String errorMessage, Options options) {
- if (errorMessage != null) {
- System.out.println(errorMessage);
- System.out.println();
- System.out.println();
- }
- ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
- objectMapper.setDefaultPropertyInclusion(Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.ALWAYS));
- System.out.println("s2s is a command line tool that can either read a list of DataPackets from stdin to send over site-to-site or write the received DataPackets to stdout");
- System.out.println();
- System.out.println("The s2s cli input/output format is a JSON list of DataPackets. They can have the following formats:");
- try {
- System.out.println();
- objectMapper.writeValue(System.out, Arrays.asList(new DataPacketDto("hello nifi".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value")));
- System.out.println();
- System.out.println("Where data is the base64 encoded value of the FlowFile content (always used for received data) or");
- System.out.println();
- objectMapper.writeValue(System.out, Arrays.asList(new DataPacketDto(new HashMap<>(), new File("EXAMPLE").getAbsolutePath()).putAttribute("key", "value")));
- System.out.println();
- System.out.println("Where dataFile is a file to read the FlowFile content from");
- System.out.println();
- System.out.println();
- System.out.println("Example usage to send a FlowFile with the contents of \"hey nifi\" to a local unsecured NiFi over http with an input port named input:");
- System.out.print("echo '");
- DataPacketDto dataPacketDto = new DataPacketDto("hey nifi".getBytes(StandardCharsets.UTF_8));
- dataPacketDto.setAttributes(null);
- objectMapper.writeValue(System.out, Arrays.asList(dataPacketDto));
- System.out.println("' | bin/s2s.sh -n input -p http");
- System.out.println();
- } catch (IOException e) {
- e.printStackTrace();
- }
- HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.setWidth(160);
- helpFormatter.printHelp("s2s", options);
- System.out.flush();
- }
-
- /**
- * Parses command line options into a CliParse object
- *
- * @param options an empty options object (so callers can print usage if the parse fails
- * @param args the string array of arguments
- * @return a CliParse object containing the constructed SiteToSiteClient.Builder and a TransferDirection
- * @throws ParseException if there is an error parsing the command line
- */
- public static CliParse parseCli(Options options, String[] args) throws ParseException {
- options.addOption("u", URL_OPTION, true, "NiFI URL to connect to (default: " + URL_OPTION_DEFAULT + ")");
- options.addOption("d", DIRECTION_OPTION, true, "Direction (valid directions: "
- + Arrays.stream(TransferDirection.values()).map(Object::toString).collect(Collectors.joining(", ")) + ") (default: " + DIRECTION_OPTION_DEFAULT + ")");
- options.addOption("n", PORT_NAME_OPTION, true, "Port name");
- options.addOption("i", PORT_IDENTIFIER_OPTION, true, "Port id");
- options.addOption(null, TIMEOUT_OPTION, true, "Timeout");
- options.addOption(null, PENALIZATION_OPTION, true, "Penalization period");
- options.addOption(null, KEYSTORE_OPTION, true, "Keystore");
- options.addOption(null, KEY_STORE_TYPE_OPTION, true, "Keystore type (default: " + KEYSTORE_TYPE_OPTION_DEFAULT + ")");
- options.addOption(null, KEY_STORE_PASSWORD_OPTION, true, "Keystore password");
- options.addOption(null, TRUST_STORE_OPTION, true, "Truststore");
- options.addOption(null, TRUST_STORE_TYPE_OPTION, true, "Truststore type (default: " + KEYSTORE_TYPE_OPTION_DEFAULT + ")");
- options.addOption(null, TRUST_STORE_PASSWORD_OPTION, true, "Truststore password");
- options.addOption("c", COMPRESSION_OPTION, false, "Use compression");
- options.addOption(null, PEER_PERSISTENCE_FILE_OPTION, true, "File to write peer information to so it can be recovered on restart");
- options.addOption("p", TRANSPORT_PROTOCOL_OPTION, true, "Site to site transport protocol (default: " + TRANSPORT_PROTOCOL_OPTION_DEFAULT + ")");
- options.addOption(null, BATCH_COUNT_OPTION, true, "Number of flow files in a batch");
- options.addOption(null, BATCH_SIZE_OPTION, true, "Size of flow files in a batch");
- options.addOption(null, BATCH_DURATION_OPTION, true, "Duration of a batch");
- options.addOption(null, PROXY_HOST_OPTION, true, "Proxy hostname");
- options.addOption(null, PROXY_PORT_OPTION, true, "Proxy port");
- options.addOption(null, PROXY_USERNAME_OPTION, true, "Proxy username");
- options.addOption(null, PROXY_PASSWORD_OPTION, true, "Proxy password");
- options.addOption("h", HELP_OPTION, false, "Show help message and exit");
- CommandLineParser parser = new DefaultParser();
- CommandLine commandLine;
- commandLine = parser.parse(options, args);
- if (commandLine.hasOption(HELP_OPTION)) {
- printUsage(null, options);
- System.exit(1);
- }
- SiteToSiteClient.Builder builder = new SiteToSiteClient.Builder();
- builder.url(commandLine.getOptionValue(URL_OPTION, URL_OPTION_DEFAULT));
- if (commandLine.hasOption(PORT_NAME_OPTION)) {
- builder.portName(commandLine.getOptionValue(PORT_NAME_OPTION));
- }
- if (commandLine.hasOption(PORT_IDENTIFIER_OPTION)) {
- builder.portIdentifier(commandLine.getOptionValue(PORT_IDENTIFIER_OPTION));
- }
- if (commandLine.hasOption(TIMEOUT_OPTION)) {
- builder.timeout(FormatUtils.getTimeDuration(commandLine.getOptionValue(TIMEOUT_OPTION), TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
- }
- if (commandLine.hasOption(PENALIZATION_OPTION)) {
- builder.nodePenalizationPeriod(FormatUtils.getTimeDuration(commandLine.getOptionValue(PENALIZATION_OPTION), TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
- }
- if (commandLine.hasOption(KEYSTORE_OPTION)) {
- builder.keystoreFilename(commandLine.getOptionValue(KEYSTORE_OPTION));
- builder.keystoreType(KeystoreType.valueOf(commandLine.getOptionValue(KEY_STORE_TYPE_OPTION, KEYSTORE_TYPE_OPTION_DEFAULT).toUpperCase()));
-
- if (commandLine.hasOption(KEY_STORE_PASSWORD_OPTION)) {
- builder.keystorePass(commandLine.getOptionValue(KEY_STORE_PASSWORD_OPTION));
- } else {
- throw new ParseException("Must specify keystore password");
- }
- }
- if (commandLine.hasOption(TRUST_STORE_OPTION)) {
- builder.truststoreFilename(commandLine.getOptionValue(TRUST_STORE_OPTION));
- builder.truststoreType(KeystoreType.valueOf(commandLine.getOptionValue(TRUST_STORE_TYPE_OPTION, KEYSTORE_TYPE_OPTION_DEFAULT).toUpperCase()));
-
- if (commandLine.hasOption(TRUST_STORE_PASSWORD_OPTION)) {
- builder.truststorePass(commandLine.getOptionValue(TRUST_STORE_PASSWORD_OPTION));
- } else {
- throw new ParseException("Must specify truststore password");
- }
- }
- if (commandLine.hasOption(COMPRESSION_OPTION)) {
- builder.useCompression(true);
- } else {
- builder.useCompression(false);
- }
- if (commandLine.hasOption(PEER_PERSISTENCE_FILE_OPTION)) {
- builder.peerPersistenceFile(new File(commandLine.getOptionValue(PEER_PERSISTENCE_FILE_OPTION)));
- }
- if (commandLine.hasOption(BATCH_COUNT_OPTION)) {
- builder.requestBatchCount(Integer.parseInt(commandLine.getOptionValue(BATCH_COUNT_OPTION)));
- }
- if (commandLine.hasOption(BATCH_SIZE_OPTION)) {
- builder.requestBatchSize(Long.parseLong(commandLine.getOptionValue(BATCH_SIZE_OPTION)));
- }
- if (commandLine.hasOption(BATCH_DURATION_OPTION)) {
- builder.requestBatchDuration(FormatUtils.getTimeDuration(commandLine.getOptionValue(BATCH_DURATION_OPTION), TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
- }
- if (commandLine.hasOption(PROXY_HOST_OPTION)) {
- builder.httpProxy(new HttpProxy(commandLine.getOptionValue(PROXY_HOST_OPTION), Integer.parseInt(commandLine.getOptionValue(PROXY_PORT_OPTION, PROXY_PORT_OPTION_DEFAULT)),
- commandLine.getOptionValue(PROXY_USERNAME_OPTION), commandLine.getOptionValue(PROXY_PASSWORD_OPTION)));
- }
- builder.transportProtocol(SiteToSiteTransportProtocol.valueOf(commandLine.getOptionValue(TRANSPORT_PROTOCOL_OPTION, TRANSPORT_PROTOCOL_OPTION_DEFAULT).toUpperCase()));
- TransferDirection transferDirection = TransferDirection.valueOf(commandLine.getOptionValue(DIRECTION_OPTION, DIRECTION_OPTION_DEFAULT));
- return new CliParse() {
- @Override
- public SiteToSiteClient.Builder getBuilder() {
- return builder;
- }
-
- @Override
- public TransferDirection getTransferDirection() {
- return transferDirection;
- }
- };
- }
-
- public static void main(String[] args) {
- // Make IO redirection useful
- PrintStream output = System.out;
- System.setOut(System.err);
- Options options = new Options();
- try {
- CliParse cliParse = parseCli(options, args);
- try (SiteToSiteClient siteToSiteClient = cliParse.getBuilder().build()) {
- if (cliParse.getTransferDirection() == TransferDirection.SEND) {
- new SiteToSiteSender(siteToSiteClient, System.in).sendFiles();
- } else {
- new SiteToSiteReceiver(siteToSiteClient, output).receiveFiles();
- }
- }
- } catch (Exception e) {
- printUsage(e.getMessage(), options);
- e.printStackTrace();
- }
- }
-
- /**
- * Combines a SiteToSiteClient.Builder and TransferDirection into a return value for parseCli
- */
- public interface CliParse {
- SiteToSiteClient.Builder getBuilder();
-
- TransferDirection getTransferDirection();
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java
deleted file mode 100644
index 88ad8f3781..0000000000
--- a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.toolkit.s2s;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.TransactionCompletion;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.protocol.DataPacket;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-
-/**
- * Class that will print received DataPackets to output
- */
-public class SiteToSiteReceiver {
- private final SiteToSiteClient siteToSiteClient;
- private final OutputStream output;
-
- public SiteToSiteReceiver(SiteToSiteClient siteToSiteClient, OutputStream output) {
- this.siteToSiteClient = siteToSiteClient;
- this.output = output;
- }
-
- public TransactionCompletion receiveFiles() throws IOException {
- Transaction transaction = siteToSiteClient.createTransaction(TransferDirection.RECEIVE);
- JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(output);
- jsonGenerator.writeStartArray();
- DataPacket dataPacket;
- while ((dataPacket = transaction.receive()) != null) {
- jsonGenerator.writeStartObject();
- jsonGenerator.writeFieldName("attributes");
- jsonGenerator.writeStartObject();
- Map<String, String> attributes = dataPacket.getAttributes();
- if (attributes != null) {
- for (Map.Entry<String, String> stringStringEntry : attributes.entrySet()) {
- jsonGenerator.writeStringField(stringStringEntry.getKey(), stringStringEntry.getValue());
- }
- }
- jsonGenerator.writeEndObject();
- InputStream data = dataPacket.getData();
- if (data != null) {
- jsonGenerator.writeBinaryField("data", IOUtils.toByteArray(data));
- }
- jsonGenerator.writeEndObject();
- }
- jsonGenerator.writeEndArray();
- jsonGenerator.close();
- transaction.confirm();
- return transaction.complete();
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteSender.java b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteSender.java
deleted file mode 100644
index 54c135a23a..0000000000
--- a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteSender.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.toolkit.s2s;
-
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.TransactionCompletion;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Class that will send DataPackets read from input
- */
-public class SiteToSiteSender {
- private final SiteToSiteClient siteToSiteClient;
- private final InputStream input;
-
- public SiteToSiteSender(SiteToSiteClient siteToSiteClient, InputStream input) {
- this.siteToSiteClient = siteToSiteClient;
- this.input = input;
- }
-
- public TransactionCompletion sendFiles() throws IOException {
- Transaction transaction = siteToSiteClient.createTransaction(TransferDirection.SEND);
- try {
- DataPacketDto.getDataPacketStream(input).forEachOrdered(d -> {
- try {
- transaction.send(d);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- } catch (RuntimeException e) {
- Throwable cause = e.getCause();
- if (cause instanceof IOException) {
- throw (IOException) cause;
- }
- throw new IOException(e.getMessage(), e);
- }
- transaction.confirm();
- return transaction.complete();
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketDtoTest.java b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketDtoTest.java
deleted file mode 100644
index 6ca6354a33..0000000000
--- a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketDtoTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.toolkit.s2s;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.nifi.remote.protocol.DataPacket;
-import org.junit.jupiter.api.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-
-public class DataPacketDtoTest {
- public static DataPacketDto create(byte[] data) {
- return new DataPacketDto(new HashMap<>(), data);
- }
-
- @Test
- public void testNoArgConstructor() {
- DataPacketDto dataPacketDto = new DataPacketDto();
- assertEquals(0, dataPacketDto.getAttributes().size());
- assertNull(dataPacketDto.getData());
- }
-
- @Test
- public void testGetSetAttributes() {
- DataPacketDto dataPacketDto = create(null);
- Map<String, String> attributes = new HashMap<>();
- attributes.put("key", "value");
- dataPacketDto.setAttributes(attributes);
- assertEquals(attributes, Collections.unmodifiableMap(dataPacketDto.getAttributes()));
- }
-
- @Test
- public void testDataFileConstructor() {
- String dataFile = "dataFile";
- assertEquals(dataFile, new DataPacketDto(null, dataFile).getDataFile());
- }
-
- @Test
- public void testParserNone() throws IOException {
- List<DataPacket> dataPackets = DataPacketDto.getDataPacketStream(new ByteArrayInputStream(("[]").getBytes(StandardCharsets.UTF_8))).collect(Collectors.toList());
- assertEquals(0, dataPackets.size());
- }
-
- @Test
- public void testParserSingle() throws IOException {
- ObjectMapper objectMapper = new ObjectMapper();
- StringBuilder stringBuilder = new StringBuilder("[");
- DataPacketDto dataPacketDto = new DataPacketDto("test data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
- stringBuilder.append(objectMapper.writeValueAsString(dataPacketDto));
- stringBuilder.append("]");
- List<DataPacket> dataPackets = DataPacketDto.getDataPacketStream(new ByteArrayInputStream(stringBuilder.toString().getBytes(StandardCharsets.UTF_8))).collect(Collectors.toList());
- assertEquals(1, dataPackets.size());
- assertEquals(dataPacketDto.toDataPacket(), dataPackets.get(0));
- }
-
- @Test
- public void testParserMultiple() throws IOException {
- ObjectMapper objectMapper = new ObjectMapper();
- StringBuilder stringBuilder = new StringBuilder("[");
- DataPacketDto dataPacketDto = new DataPacketDto("test data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
- stringBuilder.append(objectMapper.writeValueAsString(dataPacketDto));
- DataPacketDto dataPacketDto2 = new DataPacketDto("test data 2".getBytes(StandardCharsets.UTF_8)).putAttribute("key2", "value2");
- stringBuilder.append(",");
- stringBuilder.append(objectMapper.writeValueAsString(dataPacketDto2));
- stringBuilder.append("]");
- List<DataPacket> dataPackets = DataPacketDto.getDataPacketStream(new ByteArrayInputStream(stringBuilder.toString().getBytes(StandardCharsets.UTF_8))).collect(Collectors.toList());
- assertEquals(2, dataPackets.size());
- assertEquals(dataPacketDto.toDataPacket(), dataPackets.get(0));
- assertEquals(dataPacketDto2.toDataPacket(), dataPackets.get(1));
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketImplTest.java b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketImplTest.java
deleted file mode 100644
index 628a2ee4df..0000000000
--- a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketImplTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.toolkit.s2s;
-
-import org.apache.commons.io.IOUtils;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class DataPacketImplTest {
- private Map<String, String> testAttributes;
-
- @BeforeEach
- public void setup() {
- testAttributes = new HashMap<>();
- testAttributes.put("testKey", "testVal");
- }
-
- @Test
- public void testPacketNulls() throws IOException {
- DataPacketImpl dataPacket = new DataPacketImpl(null, null, null);
- assertEquals(0, dataPacket.getAttributes().size());
- assertEquals(-1, dataPacket.getData().read(new byte[1]));
- assertEquals(0, dataPacket.getSize());
- }
-
- @Test
- public void testPacketAttributes() {
- assertEquals(Collections.unmodifiableMap(testAttributes), new DataPacketImpl(testAttributes, null, null).getAttributes());
- }
-
- @Test
- public void testPacketData() throws IOException {
- byte[] testData = "test data".getBytes(StandardCharsets.UTF_8);
- DataPacketImpl dataPacket = new DataPacketImpl(null, testData, null);
- assertEquals(testData.length, dataPacket.getSize());
- assertArrayEquals(testData, IOUtils.toByteArray(dataPacket.getData()));
- }
-
- @Test
- public void testDataFile() throws IOException {
- byte[] testData = "test data".getBytes(StandardCharsets.UTF_8);
- File tempFile = File.createTempFile("abc", "def");
- try {
- try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
- fileOutputStream.write(testData);
- }
- DataPacketImpl dataPacket = new DataPacketImpl(null, null, tempFile.getAbsolutePath());
- assertEquals(testData.length, dataPacket.getSize());
- try (InputStream input = dataPacket.getData()) {
- assertArrayEquals(testData, IOUtils.toByteArray(input));
- }
- } finally {
- if (!tempFile.delete()) {
- tempFile.deleteOnExit();
- }
- }
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMainTest.java b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMainTest.java
deleted file mode 100644
index ee05460a62..0000000000
--- a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMainTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.toolkit.s2s;
-
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.KeystoreType;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
-import org.apache.nifi.remote.protocol.http.HttpProxy;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-
-public class SiteToSiteCliMainTest {
- private String expectedUrl;
- private TransferDirection expectedTransferDirection;
- private SiteToSiteTransportProtocol expectedSiteToSiteTransportProtocol;
- private String expectedPortName;
- private String expectedPortIdentifier;
- private long expectedTimeoutNs;
- private long expectedPenalizationNs;
- private String expectedKeystoreFilename;
- private String expectedKeystorePass;
- private KeystoreType expectedKeystoreType;
- private String expectedTruststoreFilename;
- private String expectedTruststorePass;
- private KeystoreType expectedTruststoreType;
- private boolean expectedCompression;
- private File expectedPeerPersistenceFile;
- private int expectedBatchCount;
- private long expectedBatchDuration;
- private long expectedBatchSize;
- private HttpProxy expectedHttpProxy;
-
- @BeforeEach
- public void setup() {
- SiteToSiteClient.Builder builder = new SiteToSiteClient.Builder();
- expectedUrl = SiteToSiteCliMain.URL_OPTION_DEFAULT;
- expectedTransferDirection = TransferDirection.valueOf(SiteToSiteCliMain.DIRECTION_OPTION_DEFAULT);
- expectedSiteToSiteTransportProtocol = SiteToSiteTransportProtocol.valueOf(SiteToSiteCliMain.TRANSPORT_PROTOCOL_OPTION_DEFAULT);
- expectedPortName = builder.getPortName();
- expectedPortIdentifier = builder.getPortIdentifier();
- expectedTimeoutNs = builder.getTimeout(TimeUnit.NANOSECONDS);
- expectedPenalizationNs = builder.getPenalizationPeriod(TimeUnit.NANOSECONDS);
- expectedKeystoreFilename = builder.getKeystoreFilename();
- expectedKeystorePass = builder.getKeystorePass();
- expectedKeystoreType = builder.getKeystoreType();
- expectedTruststoreFilename = builder.getTruststoreFilename();
- expectedTruststorePass = builder.getTruststorePass();
- expectedTruststoreType = builder.getTruststoreType();
- expectedCompression = false;
- expectedPeerPersistenceFile = builder.getPeerPersistenceFile();
- SiteToSiteClientConfig siteToSiteClientConfig = builder.buildConfig();
- expectedBatchCount = siteToSiteClientConfig.getPreferredBatchCount();
- expectedBatchDuration = siteToSiteClientConfig.getPreferredBatchDuration(TimeUnit.NANOSECONDS);
- expectedBatchSize = siteToSiteClientConfig.getPreferredBatchSize();
- expectedHttpProxy = siteToSiteClientConfig.getHttpProxy();
- }
-
- @Test
- public void testParseNoArgs() throws ParseException {
- parseAndCheckExpected(new String[0]);
- }
-
- @Test
- public void testParseUrl() throws ParseException {
- expectedUrl = "http://fake.url:8080/nifi";
- parseAndCheckExpected("u", SiteToSiteCliMain.URL_OPTION, expectedUrl);
- }
-
- @Test
- public void testParsePortName() throws ParseException {
- expectedPortName = "testPortName";
- parseAndCheckExpected("n", SiteToSiteCliMain.PORT_NAME_OPTION, expectedPortName);
- }
-
- @Test
- public void testParsePortIdentifier() throws ParseException {
- expectedPortIdentifier = "testPortId";
- parseAndCheckExpected("i", SiteToSiteCliMain.PORT_IDENTIFIER_OPTION, expectedPortIdentifier);
- }
-
- @Test
- public void testParseTimeout() throws ParseException {
- expectedTimeoutNs = TimeUnit.DAYS.toNanos(3);
- parseAndCheckExpected(null, SiteToSiteCliMain.TIMEOUT_OPTION, "3 days");
- }
-
- @Test
- public void testParsePenalization() throws ParseException {
- expectedPenalizationNs = TimeUnit.HOURS.toNanos(4);
- parseAndCheckExpected(null, SiteToSiteCliMain.PENALIZATION_OPTION, "4 hours");
- }
-
- @Test
- public void testParseKeystore() throws ParseException {
- expectedKeystoreFilename = "keystore.pkcs12";
- expectedKeystorePass = "badPassword";
- expectedKeystoreType = KeystoreType.PKCS12;
- parseAndCheckExpected(new String[]{
- "--" + SiteToSiteCliMain.KEYSTORE_OPTION, expectedKeystoreFilename,
- "--" + SiteToSiteCliMain.KEY_STORE_PASSWORD_OPTION, expectedKeystorePass,
- "--" + SiteToSiteCliMain.KEY_STORE_TYPE_OPTION, expectedKeystoreType.toString()
- });
- }
-
- @Test
- public void testParseTruststore() throws ParseException {
- expectedTruststoreFilename = "truststore.pkcs12";
- expectedTruststorePass = "badPassword";
- expectedTruststoreType = KeystoreType.PKCS12;
- parseAndCheckExpected(new String[]{
- "--" + SiteToSiteCliMain.TRUST_STORE_OPTION, expectedTruststoreFilename,
- "--" + SiteToSiteCliMain.TRUST_STORE_PASSWORD_OPTION, expectedTruststorePass,
- "--" + SiteToSiteCliMain.TRUST_STORE_TYPE_OPTION, expectedTruststoreType.toString()
- });
- }
-
- @Test
- public void testParseCompression() throws ParseException {
- expectedCompression = true;
- parseAndCheckExpected("c", SiteToSiteCliMain.COMPRESSION_OPTION, null);
- }
-
- @Test
- public void testParsePeerPersistenceFile() throws ParseException {
- String pathname = "test";
- expectedPeerPersistenceFile = new File(pathname);
- parseAndCheckExpected(null, SiteToSiteCliMain.PEER_PERSISTENCE_FILE_OPTION, pathname);
- }
-
- @Test
- public void testParseBatchCount() throws ParseException {
- expectedBatchCount = 55;
- parseAndCheckExpected(null, SiteToSiteCliMain.BATCH_COUNT_OPTION, Integer.toString(expectedBatchCount));
- }
-
- @Test
- public void testParseBatchDuration() throws ParseException {
- expectedBatchDuration = TimeUnit.MINUTES.toNanos(5);
- parseAndCheckExpected(null, SiteToSiteCliMain.BATCH_DURATION_OPTION, "5 min");
- }
-
- @Test
- public void testParseBatchSize() throws ParseException {
- expectedBatchSize = 1026;
- parseAndCheckExpected(null, SiteToSiteCliMain.BATCH_SIZE_OPTION, Long.toString(expectedBatchSize));
- }
-
- @Test
- public void testParseProxy() throws ParseException {
- String expectedHost = "testHost";
- int expectedPort = 292;
- String expectedUser = "testUser";
- String expectedPassword = "badPassword";
- expectedHttpProxy = new HttpProxy(expectedHost, expectedPort, expectedUser, expectedPassword);
- parseAndCheckExpected(new String[]{
- "--" + SiteToSiteCliMain.PROXY_HOST_OPTION, expectedHost,
- "--" + SiteToSiteCliMain.PROXY_PORT_OPTION, Integer.toString(expectedPort),
- "--" + SiteToSiteCliMain.PROXY_USERNAME_OPTION, expectedUser,
- "--" + SiteToSiteCliMain.PROXY_PASSWORD_OPTION, expectedPassword});
- }
-
- @Test
- public void testParseTransferDirection() throws ParseException {
- expectedTransferDirection = TransferDirection.RECEIVE;
- parseAndCheckExpected("d", SiteToSiteCliMain.DIRECTION_OPTION, expectedTransferDirection.toString());
- }
-
- private void parseAndCheckExpected(String shortOption, String longOption, String value) throws ParseException {
- if (shortOption != null) {
- String[] args;
- if (value == null) {
- args = new String[]{"-" + shortOption};
- } else {
- args = new String[]{"-" + shortOption, value};
- }
- parseAndCheckExpected(args);
- }
- String[] args;
- if (value == null) {
- args = new String[]{"--" + longOption};
- } else {
- args = new String[]{"--" + longOption, value};
- }
- parseAndCheckExpected(args);
- }
-
- private void parseAndCheckExpected(String[] args) throws ParseException {
- SiteToSiteCliMain.CliParse cliParse = SiteToSiteCliMain.parseCli(new Options(), args);
- SiteToSiteClient.Builder builder = cliParse.getBuilder();
- assertEquals(expectedUrl, builder.getUrl());
- assertEquals(expectedSiteToSiteTransportProtocol, builder.getTransportProtocol());
- assertEquals(expectedPortName, builder.getPortName());
- assertEquals(expectedPortIdentifier, builder.getPortIdentifier());
- assertEquals(expectedTimeoutNs, builder.getTimeout(TimeUnit.NANOSECONDS));
- assertEquals(expectedPenalizationNs, builder.getPenalizationPeriod(TimeUnit.NANOSECONDS));
- assertEquals(expectedKeystoreFilename, builder.getKeystoreFilename());
- assertEquals(expectedKeystorePass, builder.getKeystorePass());
- assertEquals(expectedKeystoreType, builder.getKeystoreType());
- assertEquals(expectedTruststoreFilename, builder.getTruststoreFilename());
- assertEquals(expectedTruststorePass, builder.getTruststorePass());
- assertEquals(expectedTruststoreType, builder.getTruststoreType());
- assertEquals(expectedCompression, builder.isUseCompression());
- assertEquals(expectedPeerPersistenceFile, builder.getPeerPersistenceFile());
- if (expectedHttpProxy == null) {
- assertNull(builder.getHttpProxy());
- } else {
- HttpProxy httpProxy = builder.getHttpProxy();
- assertNotNull(httpProxy);
- assertEquals(expectedHttpProxy.getHttpHost(), httpProxy.getHttpHost());
- assertEquals(expectedHttpProxy.getUsername(), httpProxy.getUsername());
- assertEquals(expectedHttpProxy.getPassword(), httpProxy.getPassword());
- }
- SiteToSiteClientConfig siteToSiteClientConfig = builder.buildConfig();
- assertEquals(expectedBatchCount, siteToSiteClientConfig.getPreferredBatchCount());
- assertEquals(expectedBatchDuration, siteToSiteClientConfig.getPreferredBatchDuration(TimeUnit.NANOSECONDS));
- assertEquals(expectedBatchSize, siteToSiteClientConfig.getPreferredBatchSize());
- assertEquals(expectedTransferDirection, cliParse.getTransferDirection());
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiverTest.java b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiverTest.java
deleted file mode 100644
index e3119d3e8f..0000000000
--- a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiverTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.toolkit.s2s;
-
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonInclude.Value;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.TransactionCompletion;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.function.Supplier;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-public class SiteToSiteReceiverTest {
- private final ObjectMapper objectMapper = new ObjectMapper().setDefaultPropertyInclusion(Value.construct(Include.NON_NULL, Include.ALWAYS));
- @Mock
- SiteToSiteClient siteToSiteClient;
- @Mock
- Transaction transaction;
- @Mock
- TransactionCompletion transactionCompletion;
- ByteArrayOutputStream data;
- private final Supplier<SiteToSiteReceiver> receiverSupplier = () -> new SiteToSiteReceiver(siteToSiteClient, data);
- ByteArrayOutputStream expectedData;
-
- @BeforeEach
- public void setup() throws IOException {
- data = new ByteArrayOutputStream();
- expectedData = new ByteArrayOutputStream();
- when(siteToSiteClient.createTransaction(TransferDirection.RECEIVE)).thenReturn(transaction);
- when(transaction.complete()).thenAnswer(invocation -> {
- verify(siteToSiteClient).createTransaction(TransferDirection.RECEIVE);
- verify(transaction).confirm();
- return transactionCompletion;
- });
- }
-
- @Test
- public void testEmpty() throws IOException {
- assertEquals(transactionCompletion, receiverSupplier.get().receiveFiles());
-
- objectMapper.writeValue(expectedData, Collections.emptyList());
- assertEquals(expectedData.toString(), data.toString());
- }
-
- @Test
- public void testSingle() throws IOException {
- DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
- when(transaction.receive()).thenReturn(dataPacketDto.toDataPacket()).thenReturn(null);
-
- assertEquals(transactionCompletion, receiverSupplier.get().receiveFiles());
-
- objectMapper.writeValue(expectedData, Collections.singletonList(dataPacketDto));
- assertEquals(expectedData.toString(), data.toString());
- }
-
- @Test
- public void testMulti() throws IOException {
- DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
- DataPacketDto dataPacketDto2 = new DataPacketDto("test-data2".getBytes(StandardCharsets.UTF_8)).putAttribute("key2", "value2");
- when(transaction.receive()).thenReturn(dataPacketDto.toDataPacket()).thenReturn(dataPacketDto2.toDataPacket()).thenReturn(null);
-
- assertEquals(transactionCompletion, receiverSupplier.get().receiveFiles());
-
- objectMapper.writeValue(expectedData, Arrays.asList(dataPacketDto, dataPacketDto2));
- assertEquals(expectedData.toString(), data.toString());
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteSenderTest.java b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteSenderTest.java
deleted file mode 100644
index 9ae7743447..0000000000
--- a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteSenderTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.toolkit.s2s;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.TransactionCompletion;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.protocol.DataPacket;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-public class SiteToSiteSenderTest {
- private final ObjectMapper objectMapper = new ObjectMapper();
- @Mock
- SiteToSiteClient siteToSiteClient;
- @Mock
- Transaction transaction;
- @Mock
- TransactionCompletion transactionCompletion;
- ByteArrayOutputStream data;
- private final Supplier<SiteToSiteSender> senderSupplier = () -> new SiteToSiteSender(siteToSiteClient, new ByteArrayInputStream(data.toByteArray()));
-
- @BeforeEach
- public void setup() throws IOException {
- data = new ByteArrayOutputStream();
- when(siteToSiteClient.createTransaction(TransferDirection.SEND)).thenReturn(transaction);
- }
-
- @Test
- public void testEmptyList() throws IOException {
- setTransactionCompletion();
-
- objectMapper.writeValue(data, Collections.emptyList());
- assertEquals(transactionCompletion, senderSupplier.get().sendFiles());
- verify(transaction, never()).send(any(DataPacket.class));
- verify(transaction).complete();
- verifyNoMoreInteractions(siteToSiteClient, transaction, transactionCompletion);
- }
-
- @Test
- public void testSingleElement() throws IOException {
- setTransactionCompletion();
-
- DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
- objectMapper.writeValue(data, Arrays.stream(new DataPacketDto[]{dataPacketDto}).collect(Collectors.toList()));
- assertEquals(transactionCompletion, senderSupplier.get().sendFiles());
- verify(transaction).send(dataPacketDto.toDataPacket());
- verify(transaction).complete();
- verifyNoMoreInteractions(siteToSiteClient, transaction, transactionCompletion);
- }
-
- @Test
- public void testMultipleElements() throws IOException {
- setTransactionCompletion();
-
- DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
- DataPacketDto dataPacketDto2 = new DataPacketDto("test-data2".getBytes(StandardCharsets.UTF_8)).putAttribute("key2", "value2");
- objectMapper.writeValue(data, Arrays.stream(new DataPacketDto[]{dataPacketDto, dataPacketDto2}).collect(Collectors.toList()));
- assertEquals(transactionCompletion, senderSupplier.get().sendFiles());
- verify(transaction).send(dataPacketDto.toDataPacket());
- verify(transaction).send(dataPacketDto2.toDataPacket());
- verify(transaction).complete();
- verifyNoMoreInteractions(siteToSiteClient, transaction, transactionCompletion);
- }
-
- @Test
- public void testIOException() throws IOException {
- IOException test = new IOException("test");
- DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
- objectMapper.writeValue(data, Arrays.stream(new DataPacketDto[]{dataPacketDto}).collect(Collectors.toList()));
- doThrow(test).when(transaction).send(any(DataPacket.class));
-
- assertThrows(IOException.class, () -> senderSupplier.get().sendFiles());
- }
-
- @Test
- public void testRuntimeException() throws IOException {
- RuntimeException test = new RuntimeException("test");
- DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
- objectMapper.writeValue(data, Arrays.stream(new DataPacketDto[]{dataPacketDto}).collect(Collectors.toList()));
- doThrow(test).when(transaction).send(any(DataPacket.class));
-
- assertThrows(IOException.class, () -> senderSupplier.get().sendFiles());
- }
-
- private void setTransactionCompletion() throws IOException {
- when(transaction.complete()).thenAnswer(invocation -> {
- verify(siteToSiteClient).createTransaction(TransferDirection.SEND);
- verify(transaction).confirm();
- return transactionCompletion;
- });
- }
-}
diff --git a/nifi-toolkit/pom.xml b/nifi-toolkit/pom.xml
index 71d2103d26..28c160adcd 100644
--- a/nifi-toolkit/pom.xml
+++ b/nifi-toolkit/pom.xml
@@ -25,7 +25,6 @@
<modules>
<module>nifi-toolkit-tls</module>
<module>nifi-toolkit-encrypt-config</module>
- <module>nifi-toolkit-s2s</module>
<module>nifi-toolkit-admin</module>
<module>nifi-toolkit-zookeeper-migrator</module>
<module>nifi-toolkit-flowfile-repo</module>