Repository: nifi Updated Branches: refs/heads/master e230e50f9 -> 4e13fef72
NIFI-2783 - Site-to-site command line client - s2s.sh shell script warnings -> stderr, usage improvement with examples - Increasing heap settings for s2s cli This closes #1056. Signed-off-by: Bryan Bende <bbe...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4e13fef7 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4e13fef7 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4e13fef7 Branch: refs/heads/master Commit: 4e13fef7243935962848ae83b3e31a5283c71c66 Parents: e230e50 Author: Bryan Rosander <bryanrosan...@gmail.com> Authored: Fri Sep 23 11:32:05 2016 -0400 Committer: Bryan Bende <bbe...@apache.org> Committed: Fri Sep 30 09:12:16 2016 -0400 ---------------------------------------------------------------------- nifi-toolkit/nifi-toolkit-assembly/pom.xml | 4 + .../src/main/resources/bin/s2s.bat | 40 +++ .../src/main/resources/bin/s2s.sh | 120 +++++++++ nifi-toolkit/nifi-toolkit-s2s/pom.xml | 43 +++ .../apache/nifi/toolkit/s2s/DataPacketDto.java | 136 ++++++++++ .../apache/nifi/toolkit/s2s/DataPacketImpl.java | 97 +++++++ .../nifi/toolkit/s2s/SiteToSiteCliMain.java | 260 +++++++++++++++++++ .../nifi/toolkit/s2s/SiteToSiteReceiver.java | 73 ++++++ .../nifi/toolkit/s2s/SiteToSiteSender.java | 60 +++++ .../nifi/toolkit/s2s/DataPacketDtoTest.java | 96 +++++++ .../nifi/toolkit/s2s/DataPacketImplTest.java | 85 ++++++ .../nifi/toolkit/s2s/SiteToSiteCliMainTest.java | 245 +++++++++++++++++ .../toolkit/s2s/SiteToSiteReceiverTest.java | 98 +++++++ .../nifi/toolkit/s2s/SiteToSiteSenderTest.java | 130 ++++++++++ nifi-toolkit/pom.xml | 1 + pom.xml | 5 + 16 files changed, 1493 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-assembly/pom.xml 
b/nifi-toolkit/nifi-toolkit-assembly/pom.xml index 9df9b0c..8d189e4 100644 --- a/nifi-toolkit/nifi-toolkit-assembly/pom.xml +++ b/nifi-toolkit/nifi-toolkit-assembly/pom.xml @@ -69,6 +69,10 @@ language governing permissions and limitations under the License. --> <artifactId>nifi-toolkit-encrypt-config</artifactId> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-toolkit-s2s</artifactId> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <scope>compile</scope> http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.bat ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.bat b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.bat new file mode 100644 index 0000000..6155715 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.bat @@ -0,0 +1,40 @@ +@echo off +rem +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. 
+rem + +rem Use JAVA_HOME if it's set; otherwise, just use java + +if "%JAVA_HOME%" == "" goto noJavaHome +if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome +set JAVA_EXE=%JAVA_HOME%\bin\java.exe +goto startConfig + +:noJavaHome +echo The JAVA_HOME environment variable is not defined correctly. +echo Instead the PATH will be used to find the java executable. +echo. +set JAVA_EXE=java +goto startConfig + +:startConfig +set LIB_DIR=%~dp0..\classpath;%~dp0..\lib + +SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms128m -Xmx256m %JAVA_ARGS% org.apache.nifi.toolkit.s2s.SiteToSiteCliMain + +cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %* + +popd http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.sh ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.sh b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.sh new file mode 100644 index 0000000..e113ca0 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/s2s.sh @@ -0,0 +1,120 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# + +# Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches + +SCRIPT_DIR=$(dirname "$0") +SCRIPT_NAME=$(basename "$0") +NIFI_TOOLKIT_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd) +PROGNAME=$(basename "$0") + + +warn() { + (>&2 echo "${PROGNAME}: $*") +} + +die() { + warn "$*" + exit 1 +} + +detectOS() { + # OS specific support (must be 'true' or 'false'). + cygwin=false; + aix=false; + os400=false; + darwin=false; + case "$(uname)" in + CYGWIN*) + cygwin=true + ;; + AIX*) + aix=true + ;; + OS400*) + os400=true + ;; + Darwin) + darwin=true + ;; + esac + # For AIX, set an environment variable + if ${aix}; then + export LDR_CNTRL=MAXDATA=0xB0000000@DSA + echo ${LDR_CNTRL} + fi +} + +locateJava() { + # Setup the Java Virtual Machine + if $cygwin ; then + [ -n "${JAVA}" ] && JAVA=$(cygpath --unix "${JAVA}") + [ -n "${JAVA_HOME}" ] && JAVA_HOME=$(cygpath --unix "${JAVA_HOME}") + fi + + if [ "x${JAVA}" = "x" ] && [ -r /etc/gentoo-release ] ; then + JAVA_HOME=$(java-config --jre-home) + fi + if [ "x${JAVA}" = "x" ]; then + if [ "x${JAVA_HOME}" != "x" ]; then + if [ ! 
-d "${JAVA_HOME}" ]; then + die "JAVA_HOME is not valid: ${JAVA_HOME}" + fi + JAVA="${JAVA_HOME}/bin/java" + else + warn "JAVA_HOME not set; results may vary" + JAVA=$(type java) + JAVA=$(expr "${JAVA}" : '.* \(/.*\)$') + if [ "x${JAVA}" = "x" ]; then + die "java command not found" + fi + fi + fi +} + +init() { + # Determine if there is special OS handling we must perform + detectOS + + # Locate the Java VM to execute + locateJava "$1" +} + +run() { + LIBS="${NIFI_TOOLKIT_HOME}/lib/*" + + sudo_cmd_prefix="" + if $cygwin; then + NIFI_TOOLKIT_HOME=$(cygpath --path --windows "${NIFI_TOOLKIT_HOME}") + CLASSPATH="$NIFI_TOOLKIT_HOME/classpath";$(cygpath --path --windows "${LIBS}") + else + CLASSPATH="$NIFI_TOOLKIT_HOME/classpath:${LIBS}" + fi + + export JAVA_HOME="$JAVA_HOME" + export NIFI_TOOLKIT_HOME="$NIFI_TOOLKIT_HOME" + + umask 0077 + "${JAVA}" -cp "${CLASSPATH}" -Xms128m -Xmx256m org.apache.nifi.toolkit.s2s.SiteToSiteCliMain "$@" + return $? +} + + +init "$1" +run "$@" http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-s2s/pom.xml b/nifi-toolkit/nifi-toolkit-s2s/pom.xml new file mode 100644 index 0000000..fac6bf1 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-s2s/pom.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-toolkit</artifactId> + <version>1.1.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-toolkit-s2s</artifactId> + <description>Site-to-site cli</description> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-site-to-site-client</artifactId> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketDto.java ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketDto.java b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketDto.java new file mode 100644 index 0000000..8d7e7b3 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketDto.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.toolkit.s2s; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.remote.protocol.DataPacket; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * DTO object for serializing and deserializing DataPackets via JSON + */ +public class DataPacketDto { + public static final TypeReference<DataPacketDto> DATA_PACKET_DTO_TYPE_REFERENCE = new TypeReference<DataPacketDto>() { + }; + private Map<String, String> attributes; + private byte[] data; + private String dataFile; + + public DataPacketDto() { + this(null); + } + + public DataPacketDto(byte[] data) { + this(new HashMap<>(), data); + } + + public DataPacketDto(Map<String, String> attributes, byte[] data) { + this(attributes, data, null); + } + + public DataPacketDto(Map<String, String> 
attributes, String dataFile) { + this(attributes, null, dataFile); + } + + public DataPacketDto(Map<String, String> attributes, byte[] data, String dataFile) { + this.attributes = attributes; + this.data = data; + this.dataFile = dataFile; + } + + public static Stream<DataPacket> getDataPacketStream(InputStream inputStream) throws IOException { + JsonParser jsonParser = new JsonFactory().createParser(inputStream); + if (jsonParser.nextToken() != JsonToken.START_ARRAY) { + throw new IOException("Expecting start array token to begin object array."); + } + jsonParser.setCodec(new ObjectMapper()); + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<DataPacket>() { + DataPacket next = getNext(); + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public DataPacket next() { + DataPacket next = this.next; + this.next = getNext(); + return next; + } + + DataPacket getNext() throws RuntimeException { + try { + if (jsonParser.nextToken() == JsonToken.END_ARRAY) { + return null; + } + DataPacketDto dataPacketDto = jsonParser.readValueAs(DATA_PACKET_DTO_TYPE_REFERENCE); + return dataPacketDto.toDataPacket(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }, Spliterator.ORDERED), false); + } + + public Map<String, String> getAttributes() { + return attributes; + } + + public void setAttributes(Map<String, String> attributes) { + this.attributes = attributes; + } + + public byte[] getData() { + return data; + } + + public void setData(byte[] data) { + this.data = data; + } + + public String getDataFile() { + return dataFile; + } + + public void setDataFile(String dataFile) { + this.dataFile = dataFile; + } + + public DataPacket toDataPacket() { + return new DataPacketImpl(attributes, data, dataFile); + } + + public DataPacketDto putAttribute(String key, String value) { + attributes.put(key, value); + return this; + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketImpl.java ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketImpl.java b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketImpl.java new file mode 100644 index 0000000..8675e33 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/DataPacketImpl.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.toolkit.s2s; + +import org.apache.nifi.remote.protocol.DataPacket; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Implements DataPacket either taking the data passed in or reading from a file on disk + */ +public class DataPacketImpl implements DataPacket { + private final Map<String, String> attributes; + private final byte[] data; + private final String dataFile; + + public DataPacketImpl(Map<String, String> attributes, byte[] data, String dataFile) { + this.attributes = attributes == null ? Collections.emptyMap() : Collections.unmodifiableMap(new HashMap<>(attributes)); + this.data = data; + this.dataFile = dataFile; + } + + @Override + public Map<String, String> getAttributes() { + return attributes; + } + + @Override + public InputStream getData() { + if (data == null) { + if (dataFile != null && dataFile.length() > 0) { + try { + return new FileInputStream(dataFile); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + return new ByteArrayInputStream(new byte[0]); + } + } + return new ByteArrayInputStream(data); + } + + @Override + public long getSize() { + if (data == null) { + if (dataFile != null && dataFile.length() > 0) { + return new File(dataFile).length(); + } else { + return 0; + } + } + return data.length; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + DataPacketImpl that = (DataPacketImpl) o; + + if (attributes != null ? !attributes.equals(that.attributes) : that.attributes != null) return false; + return Arrays.equals(data, that.data); + + } + + @Override + public int hashCode() { + int result = attributes != null ? 
attributes.hashCode() : 0; + result = 31 * result + Arrays.hashCode(data); + return result; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java new file mode 100644 index 0000000..f03a0bf --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMain.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.toolkit.s2s; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.KeystoreType; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import org.apache.nifi.remote.protocol.http.HttpProxy; +import org.apache.nifi.util.FormatUtils; + +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class SiteToSiteCliMain { + public static final String URL_OPTION = "url"; + public static final String URL_OPTION_DEFAULT = "http://localhost:8080/nifi"; + public static final String DIRECTION_OPTION = "direction"; + public static final String DIRECTION_OPTION_DEFAULT = TransferDirection.SEND.toString(); + public static final String PORT_NAME_OPTION = "portName"; + public static final String PORT_IDENTIFIER_OPTION = "portIdentifier"; + public static final String TIMEOUT_OPTION = "timeout"; + public static final String PENALIZATION_OPTION = "penalization"; + public static final String KEYSTORE_OPTION = "keyStore"; + public static final String KEY_STORE_TYPE_OPTION = "keyStoreType"; + public static final String KEY_STORE_PASSWORD_OPTION = "keyStorePassword"; + public static final String TRUST_STORE_OPTION = "trustStore"; + public static final String TRUST_STORE_TYPE_OPTION = "trustStoreType"; + public static final String 
TRUST_STORE_PASSWORD_OPTION = "trustStorePassword"; + public static final String PEER_PERSISTENCE_FILE_OPTION = "peerPersistenceFile"; + public static final String COMPRESSION_OPTION = "compression"; + public static final String TRANSPORT_PROTOCOL_OPTION = "transportProtocol"; + public static final String TRANSPORT_PROTOCOL_OPTION_DEFAULT = SiteToSiteTransportProtocol.RAW.toString(); + public static final String BATCH_COUNT_OPTION = "batchCount"; + public static final String BATCH_SIZE_OPTION = "batchSize"; + public static final String BATCH_DURATION_OPTION = "batchDuration"; + public static final String HELP_OPTION = "help"; + public static final String PROXY_HOST_OPTION = "proxyHost"; + public static final String PROXY_PORT_OPTION = "proxyPort"; + public static final String PROXY_USERNAME_OPTION = "proxyUsername"; + public static final String PROXY_PASSWORD_OPTION = "proxyPassword"; + public static final String PROXY_PORT_OPTION_DEFAULT = "80"; + public static final String KEYSTORE_TYPE_OPTION_DEFAULT = KeystoreType.JKS.toString(); + public static final String NEED_CLIENT_AUTH_OPTION = "needClientAuth"; + + /** + * Prints the usage to System.out + * + * @param errorMessage optional error message + * @param options the options object to print usage for + */ + public static void printUsage(String errorMessage, Options options) { + if (errorMessage != null) { + System.out.println(errorMessage); + System.out.println(); + System.out.println(); + } + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); + objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + System.out.println("s2s is a command line tool that can either read a list of DataPackets from stdin to send over site-to-site or write the received DataPackets to stdout"); + System.out.println(); + System.out.println("The s2s cli input/output format is a JSON list of DataPackets. 
They can have the following formats:"); + try { + System.out.println(); + objectMapper.writeValue(System.out, Arrays.asList(new DataPacketDto("hello nifi".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value"))); + System.out.println(); + System.out.println("Where data is the base64 encoded value of the FlowFile content (always used for received data) or"); + System.out.println(); + objectMapper.writeValue(System.out, Arrays.asList(new DataPacketDto(new HashMap<>(), new File("EXAMPLE").getAbsolutePath()).putAttribute("key", "value"))); + System.out.println(); + System.out.println("Where dataFile is a file to read the FlowFile content from"); + System.out.println(); + System.out.println(); + System.out.println("Example usage to send a FlowFile with the contents of \"hey nifi\" to a local unsecured NiFi over http with an input port named input:"); + System.out.print("echo '"); + DataPacketDto dataPacketDto = new DataPacketDto("hey nifi".getBytes(StandardCharsets.UTF_8)); + dataPacketDto.setAttributes(null); + objectMapper.writeValue(System.out, Arrays.asList(dataPacketDto)); + System.out.println("' | bin/s2s.sh -n input -p http"); + System.out.println(); + } catch (IOException e) { + e.printStackTrace(); + } + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.setWidth(160); + helpFormatter.printHelp("s2s", options); + System.out.flush(); + } + + /** + * Parses command line options into a CliParse object + * + * @param options an empty options object (so callers can print usage if the parse fails + * @param args the string array of arguments + * @return a CliParse object containing the constructed SiteToSiteClient.Builder and a TransferDirection + * @throws ParseException if there is an error parsing the command line + */ + public static CliParse parseCli(Options options, String[] args) throws ParseException { + options.addOption("u", URL_OPTION, true, "NiFI URL to connect to (default: " + URL_OPTION_DEFAULT + ")"); + options.addOption("d", 
DIRECTION_OPTION, true, "Direction (valid directions: " + + Arrays.stream(TransferDirection.values()).map(Object::toString).collect(Collectors.joining(", ")) + ") (default: " + DIRECTION_OPTION_DEFAULT + ")"); + options.addOption("n", PORT_NAME_OPTION, true, "Port name"); + options.addOption("i", PORT_IDENTIFIER_OPTION, true, "Port id"); + options.addOption(null, TIMEOUT_OPTION, true, "Timeout"); + options.addOption(null, PENALIZATION_OPTION, true, "Penalization period"); + options.addOption(null, KEYSTORE_OPTION, true, "Keystore"); + options.addOption(null, KEY_STORE_TYPE_OPTION, true, "Keystore type (default: " + KEYSTORE_TYPE_OPTION_DEFAULT + ")"); + options.addOption(null, KEY_STORE_PASSWORD_OPTION, true, "Keystore password"); + options.addOption(null, TRUST_STORE_OPTION, true, "Truststore"); + options.addOption(null, TRUST_STORE_TYPE_OPTION, true, "Truststore type (default: " + KEYSTORE_TYPE_OPTION_DEFAULT + ")"); + options.addOption(null, TRUST_STORE_PASSWORD_OPTION, true, "Truststore password"); + options.addOption(null, NEED_CLIENT_AUTH_OPTION, false, "Need client auth"); + options.addOption("c", COMPRESSION_OPTION, false, "Use compression"); + options.addOption(null, PEER_PERSISTENCE_FILE_OPTION, true, "File to write peer information to so it can be recovered on restart"); + options.addOption("p", TRANSPORT_PROTOCOL_OPTION, true, "Site to site transport protocol (default: " + TRANSPORT_PROTOCOL_OPTION_DEFAULT + ")"); + options.addOption(null, BATCH_COUNT_OPTION, true, "Number of flow files in a batch"); + options.addOption(null, BATCH_SIZE_OPTION, true, "Size of flow files in a batch"); + options.addOption(null, BATCH_DURATION_OPTION, true, "Duration of a batch"); + options.addOption(null, PROXY_HOST_OPTION, true, "Proxy hostname"); + options.addOption(null, PROXY_PORT_OPTION, true, "Proxy port"); + options.addOption(null, PROXY_USERNAME_OPTION, true, "Proxy username"); + options.addOption(null, PROXY_PASSWORD_OPTION, true, "Proxy password"); + 
options.addOption("h", HELP_OPTION, false, "Show help message and exit"); + CommandLineParser parser = new DefaultParser(); + CommandLine commandLine; + commandLine = parser.parse(options, args); + if (commandLine.hasOption(HELP_OPTION)) { + printUsage(null, options); + System.exit(1); + } + SiteToSiteClient.Builder builder = new SiteToSiteClient.Builder(); + builder.url(commandLine.getOptionValue(URL_OPTION, URL_OPTION_DEFAULT)); + if (commandLine.hasOption(PORT_NAME_OPTION)) { + builder.portName(commandLine.getOptionValue(PORT_NAME_OPTION)); + } + if (commandLine.hasOption(PORT_IDENTIFIER_OPTION)) { + builder.portIdentifier(commandLine.getOptionValue(PORT_IDENTIFIER_OPTION)); + } + if (commandLine.hasOption(TIMEOUT_OPTION)) { + builder.timeout(FormatUtils.getTimeDuration(commandLine.getOptionValue(TIMEOUT_OPTION), TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + } + if (commandLine.hasOption(PENALIZATION_OPTION)) { + builder.nodePenalizationPeriod(FormatUtils.getTimeDuration(commandLine.getOptionValue(PENALIZATION_OPTION), TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + } + if (commandLine.hasOption(KEYSTORE_OPTION)) { + builder.keystoreFilename(commandLine.getOptionValue(KEYSTORE_OPTION)); + builder.keystoreType(KeystoreType.valueOf(commandLine.getOptionValue(KEY_STORE_TYPE_OPTION, KEYSTORE_TYPE_OPTION_DEFAULT).toUpperCase())); + + if (commandLine.hasOption(KEY_STORE_PASSWORD_OPTION)) { + builder.keystorePass(commandLine.getOptionValue(KEY_STORE_PASSWORD_OPTION)); + } else { + throw new ParseException("Must specify keystore password"); + } + } + if (commandLine.hasOption(TRUST_STORE_OPTION)) { + builder.truststoreFilename(commandLine.getOptionValue(TRUST_STORE_OPTION)); + builder.truststoreType(KeystoreType.valueOf(commandLine.getOptionValue(TRUST_STORE_TYPE_OPTION, KEYSTORE_TYPE_OPTION_DEFAULT).toUpperCase())); + + if (commandLine.hasOption(TRUST_STORE_PASSWORD_OPTION)) { + builder.truststorePass(commandLine.getOptionValue(TRUST_STORE_PASSWORD_OPTION)); + } else 
{ + throw new ParseException("Must specify truststore password"); + } + } + if (commandLine.hasOption(COMPRESSION_OPTION)) { + builder.useCompression(true); + } else { + builder.useCompression(false); + } + if (commandLine.hasOption(PEER_PERSISTENCE_FILE_OPTION)) { + builder.peerPersistenceFile(new File(commandLine.getOptionValue(PEER_PERSISTENCE_FILE_OPTION))); + } + if (commandLine.hasOption(BATCH_COUNT_OPTION)) { + builder.requestBatchCount(Integer.parseInt(commandLine.getOptionValue(BATCH_COUNT_OPTION))); + } + if (commandLine.hasOption(BATCH_SIZE_OPTION)) { + builder.requestBatchSize(Long.parseLong(commandLine.getOptionValue(BATCH_SIZE_OPTION))); + } + if (commandLine.hasOption(BATCH_DURATION_OPTION)) { + builder.requestBatchDuration(FormatUtils.getTimeDuration(commandLine.getOptionValue(BATCH_DURATION_OPTION), TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + } + if (commandLine.hasOption(PROXY_HOST_OPTION)) { + builder.httpProxy(new HttpProxy(commandLine.getOptionValue(PROXY_HOST_OPTION), Integer.parseInt(commandLine.getOptionValue(PROXY_PORT_OPTION, PROXY_PORT_OPTION_DEFAULT)), + commandLine.getOptionValue(PROXY_USERNAME_OPTION), commandLine.getOptionValue(PROXY_PASSWORD_OPTION))); + } + builder.transportProtocol(SiteToSiteTransportProtocol.valueOf(commandLine.getOptionValue(TRANSPORT_PROTOCOL_OPTION, TRANSPORT_PROTOCOL_OPTION_DEFAULT).toUpperCase())); + TransferDirection transferDirection = TransferDirection.valueOf(commandLine.getOptionValue(DIRECTION_OPTION, DIRECTION_OPTION_DEFAULT)); + return new CliParse() { + @Override + public SiteToSiteClient.Builder getBuilder() { + return builder; + } + + @Override + public TransferDirection getTransferDirection() { + return transferDirection; + } + }; + } + + public static void main(String[] args) { + // Make IO redirection useful + PrintStream output = System.out; + System.setOut(System.err); + Options options = new Options(); + try { + CliParse cliParse = parseCli(options, args); + try (SiteToSiteClient 
siteToSiteClient = cliParse.getBuilder().build()) { + if (cliParse.getTransferDirection() == TransferDirection.SEND) { + new SiteToSiteSender(siteToSiteClient, System.in).sendFiles(); + } else { + new SiteToSiteReceiver(siteToSiteClient, output).receiveFiles(); + } + } + } catch (Exception e) { + printUsage(e.getMessage(), options); + e.printStackTrace(); + } + } + + /** + * Combines a SiteToSiteClient.Builder and TransferDirection into a return value for parseCli + */ + public interface CliParse { + SiteToSiteClient.Builder getBuilder(); + + TransferDirection getTransferDirection(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java new file mode 100644 index 0000000..b68185f --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiver.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.toolkit.s2s; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransactionCompletion; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.protocol.DataPacket; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + +/** + * Class that will print received DataPackets to output + */ +public class SiteToSiteReceiver { + private final SiteToSiteClient siteToSiteClient; + private final OutputStream output; + + public SiteToSiteReceiver(SiteToSiteClient siteToSiteClient, OutputStream output) { + this.siteToSiteClient = siteToSiteClient; + this.output = output; + } + + public TransactionCompletion receiveFiles() throws IOException { + Transaction transaction = siteToSiteClient.createTransaction(TransferDirection.RECEIVE); + JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(output); + jsonGenerator.writeStartArray(); + DataPacket dataPacket; + while ((dataPacket = transaction.receive()) != null) { + jsonGenerator.writeStartObject(); + jsonGenerator.writeFieldName("attributes"); + jsonGenerator.writeStartObject(); + Map<String, String> attributes = dataPacket.getAttributes(); + if (attributes != null) { + for (Map.Entry<String, String> stringStringEntry : attributes.entrySet()) { + jsonGenerator.writeStringField(stringStringEntry.getKey(), stringStringEntry.getValue()); + } + } + jsonGenerator.writeEndObject(); + InputStream data = dataPacket.getData(); + if (data != null) { + jsonGenerator.writeBinaryField("data", IOUtils.toByteArray(data)); + } + jsonGenerator.writeEndObject(); + } + jsonGenerator.writeEndArray(); + 
jsonGenerator.close(); + transaction.confirm(); + return transaction.complete(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteSender.java ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteSender.java b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteSender.java new file mode 100644 index 0000000..54c135a --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-s2s/src/main/java/org/apache/nifi/toolkit/s2s/SiteToSiteSender.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.toolkit.s2s; + +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransactionCompletion; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Class that will send DataPackets read from input + */ +public class SiteToSiteSender { + private final SiteToSiteClient siteToSiteClient; + private final InputStream input; + + public SiteToSiteSender(SiteToSiteClient siteToSiteClient, InputStream input) { + this.siteToSiteClient = siteToSiteClient; + this.input = input; + } + + public TransactionCompletion sendFiles() throws IOException { + Transaction transaction = siteToSiteClient.createTransaction(TransferDirection.SEND); + try { + DataPacketDto.getDataPacketStream(input).forEachOrdered(d -> { + try { + transaction.send(d); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } + throw new IOException(e.getMessage(), e); + } + transaction.confirm(); + return transaction.complete(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketDtoTest.java ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketDtoTest.java b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketDtoTest.java new file mode 100644 index 0000000..6c27ce4 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketDtoTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.toolkit.s2s; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.remote.protocol.DataPacket; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class DataPacketDtoTest { + public static DataPacketDto create(byte[] data) { + return new DataPacketDto(new HashMap<>(), data); + } + + @Test + public void testNoArgConstructor() { + DataPacketDto dataPacketDto = new DataPacketDto(); + assertEquals(0, dataPacketDto.getAttributes().size()); + assertNull(dataPacketDto.getData()); + } + + @Test + public void testGetSetAttributes() { + DataPacketDto dataPacketDto = create(null); + Map<String, String> attributes = new HashMap<>(); + attributes.put("key", "value"); + dataPacketDto.setAttributes(attributes); + assertEquals(attributes, Collections.unmodifiableMap(dataPacketDto.getAttributes())); + } + + @Test + public void testDataFileConstructor() { + String dataFile = "dataFile"; + assertEquals(dataFile, new 
DataPacketDto(null, dataFile).getDataFile()); + } + + @Test + public void testParserNone() throws IOException { + List<DataPacket> dataPackets = DataPacketDto.getDataPacketStream(new ByteArrayInputStream(("[]").getBytes(StandardCharsets.UTF_8))).collect(Collectors.toList()); + assertEquals(0, dataPackets.size()); + } + + @Test + public void testParserSingle() throws IOException { + ObjectMapper objectMapper = new ObjectMapper(); + StringBuilder stringBuilder = new StringBuilder("["); + DataPacketDto dataPacketDto = new DataPacketDto("test data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value"); + stringBuilder.append(objectMapper.writeValueAsString(dataPacketDto)); + stringBuilder.append("]"); + List<DataPacket> dataPackets = DataPacketDto.getDataPacketStream(new ByteArrayInputStream(stringBuilder.toString().getBytes(StandardCharsets.UTF_8))).collect(Collectors.toList()); + assertEquals(1, dataPackets.size()); + assertEquals(dataPacketDto.toDataPacket(), dataPackets.get(0)); + } + + @Test + public void testParserMultiple() throws IOException { + ObjectMapper objectMapper = new ObjectMapper(); + StringBuilder stringBuilder = new StringBuilder("["); + DataPacketDto dataPacketDto = new DataPacketDto("test data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value"); + stringBuilder.append(objectMapper.writeValueAsString(dataPacketDto)); + DataPacketDto dataPacketDto2 = new DataPacketDto("test data 2".getBytes(StandardCharsets.UTF_8)).putAttribute("key2", "value2"); + stringBuilder.append(","); + stringBuilder.append(objectMapper.writeValueAsString(dataPacketDto2)); + stringBuilder.append("]"); + List<DataPacket> dataPackets = DataPacketDto.getDataPacketStream(new ByteArrayInputStream(stringBuilder.toString().getBytes(StandardCharsets.UTF_8))).collect(Collectors.toList()); + assertEquals(2, dataPackets.size()); + assertEquals(dataPacketDto.toDataPacket(), dataPackets.get(0)); + assertEquals(dataPacketDto2.toDataPacket(), dataPackets.get(1)); + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketImplTest.java ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketImplTest.java b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketImplTest.java new file mode 100644 index 0000000..6dfcd38 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/DataPacketImplTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.toolkit.s2s; + +import org.apache.commons.io.IOUtils; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class DataPacketImplTest { + private Map<String, String> testAttributes; + + @Before + public void setup() { + testAttributes = new HashMap<>(); + testAttributes.put("testKey", "testVal"); + } + + @Test + public void testPacketNulls() throws IOException { + DataPacketImpl dataPacket = new DataPacketImpl(null, null, null); + assertEquals(0, dataPacket.getAttributes().size()); + assertEquals(-1, dataPacket.getData().read(new byte[1])); + assertEquals(0, dataPacket.getSize()); + } + + @Test + public void testPacketAttributes() { + assertEquals(Collections.unmodifiableMap(testAttributes), new DataPacketImpl(testAttributes, null, null).getAttributes()); + } + + @Test + public void testPacketData() throws IOException { + byte[] testData = "test data".getBytes(StandardCharsets.UTF_8); + DataPacketImpl dataPacket = new DataPacketImpl(null, testData, null); + assertEquals(testData.length, dataPacket.getSize()); + assertArrayEquals(testData, IOUtils.toByteArray(dataPacket.getData())); + } + + @Test + public void testDataFile() throws IOException { + byte[] testData = "test data".getBytes(StandardCharsets.UTF_8); + File tempFile = File.createTempFile("abc", "def"); + try { + try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) { + fileOutputStream.write(testData); + } + DataPacketImpl dataPacket = new DataPacketImpl(null, null, tempFile.getAbsolutePath()); + assertEquals(testData.length, dataPacket.getSize()); + try (InputStream input = dataPacket.getData()) { + 
assertArrayEquals(testData, IOUtils.toByteArray(input)); + } + } finally { + if (!tempFile.delete()) { + tempFile.deleteOnExit(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMainTest.java ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMainTest.java b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMainTest.java new file mode 100644 index 0000000..6f9e771 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteCliMainTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.toolkit.s2s; + +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.KeystoreType; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import org.apache.nifi.remote.protocol.http.HttpProxy; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +public class SiteToSiteCliMainTest { + private String expectedUrl; + private TransferDirection expectedTransferDirection; + private SiteToSiteTransportProtocol expectedSiteToSiteTransportProtocol; + private String expectedPortName; + private String expectedPortIdentifier; + private long expectedTimeoutNs; + private long expectedPenalizationNs; + private String expectedKeystoreFilename; + private String expectedKeystorePass; + private KeystoreType expectedKeystoreType; + private String expectedTruststoreFilename; + private String expectedTruststorePass; + private KeystoreType expectedTruststoreType; + private boolean expectedCompression; + private File expectedPeerPersistenceFile; + private int expectedBatchCount; + private long expectedBatchDuration; + private long expectedBatchSize; + private HttpProxy expectedHttpProxy; + + @Before + public void setup() { + SiteToSiteClient.Builder builder = new SiteToSiteClient.Builder(); + expectedUrl = SiteToSiteCliMain.URL_OPTION_DEFAULT; + expectedTransferDirection = TransferDirection.valueOf(SiteToSiteCliMain.DIRECTION_OPTION_DEFAULT); + expectedSiteToSiteTransportProtocol = SiteToSiteTransportProtocol.valueOf(SiteToSiteCliMain.TRANSPORT_PROTOCOL_OPTION_DEFAULT); + expectedPortName = 
builder.getPortName(); + expectedPortIdentifier = builder.getPortIdentifier(); + expectedTimeoutNs = builder.getTimeout(TimeUnit.NANOSECONDS); + expectedPenalizationNs = builder.getPenalizationPeriod(TimeUnit.NANOSECONDS); + expectedKeystoreFilename = builder.getKeystoreFilename(); + expectedKeystorePass = builder.getKeystorePass(); + expectedKeystoreType = builder.getKeystoreType(); + expectedTruststoreFilename = builder.getTruststoreFilename(); + expectedTruststorePass = builder.getTruststorePass(); + expectedTruststoreType = builder.getTruststoreType(); + expectedCompression = false; + expectedPeerPersistenceFile = builder.getPeerPersistenceFile(); + SiteToSiteClientConfig siteToSiteClientConfig = builder.buildConfig(); + expectedBatchCount = siteToSiteClientConfig.getPreferredBatchCount(); + expectedBatchDuration = siteToSiteClientConfig.getPreferredBatchDuration(TimeUnit.NANOSECONDS); + expectedBatchSize = siteToSiteClientConfig.getPreferredBatchSize(); + expectedHttpProxy = siteToSiteClientConfig.getHttpProxy(); + } + + @Test + public void testParseNoArgs() throws ParseException { + parseAndCheckExpected(new String[0]); + } + + @Test + public void testParseUrl() throws ParseException { + expectedUrl = "http://fake.url:8080/nifi"; + parseAndCheckExpected("u", SiteToSiteCliMain.URL_OPTION, expectedUrl); + } + + @Test + public void testParsePortName() throws ParseException { + expectedPortName = "testPortName"; + parseAndCheckExpected("n", SiteToSiteCliMain.PORT_NAME_OPTION, expectedPortName); + } + + @Test + public void testParsePortIdentifier() throws ParseException { + expectedPortIdentifier = "testPortId"; + parseAndCheckExpected("i", SiteToSiteCliMain.PORT_IDENTIFIER_OPTION, expectedPortIdentifier); + } + + @Test + public void testParseTimeout() throws ParseException { + expectedTimeoutNs = TimeUnit.DAYS.toNanos(3); + parseAndCheckExpected(null, SiteToSiteCliMain.TIMEOUT_OPTION, "3 days"); + } + + @Test + public void testParsePenalization() throws 
ParseException { + expectedPenalizationNs = TimeUnit.HOURS.toNanos(4); + parseAndCheckExpected(null, SiteToSiteCliMain.PENALIZATION_OPTION, "4 hours"); + } + + @Test + public void testParseKeystore() throws ParseException { + expectedKeystoreFilename = "keystore.pkcs12"; + expectedKeystorePass = "badPassword"; + expectedKeystoreType = KeystoreType.PKCS12; + parseAndCheckExpected(new String[]{ + "--" + SiteToSiteCliMain.KEYSTORE_OPTION, expectedKeystoreFilename, + "--" + SiteToSiteCliMain.KEY_STORE_PASSWORD_OPTION, expectedKeystorePass, + "--" + SiteToSiteCliMain.KEY_STORE_TYPE_OPTION, expectedKeystoreType.toString() + }); + } + + @Test + public void testParseTruststore() throws ParseException { + expectedTruststoreFilename = "truststore.pkcs12"; + expectedTruststorePass = "badPassword"; + expectedTruststoreType = KeystoreType.PKCS12; + parseAndCheckExpected(new String[]{ + "--" + SiteToSiteCliMain.TRUST_STORE_OPTION, expectedTruststoreFilename, + "--" + SiteToSiteCliMain.TRUST_STORE_PASSWORD_OPTION, expectedTruststorePass, + "--" + SiteToSiteCliMain.TRUST_STORE_TYPE_OPTION, expectedTruststoreType.toString() + }); + } + + @Test + public void testParseCompression() throws ParseException { + expectedCompression = true; + parseAndCheckExpected("c", SiteToSiteCliMain.COMPRESSION_OPTION, null); + } + + @Test + public void testParsePeerPersistenceFile() throws ParseException { + String pathname = "test"; + expectedPeerPersistenceFile = new File(pathname); + parseAndCheckExpected(null, SiteToSiteCliMain.PEER_PERSISTENCE_FILE_OPTION, pathname); + } + + @Test + public void testParseBatchCount() throws ParseException { + expectedBatchCount = 55; + parseAndCheckExpected(null, SiteToSiteCliMain.BATCH_COUNT_OPTION, Integer.toString(expectedBatchCount)); + } + + @Test + public void testParseBatchDuration() throws ParseException { + expectedBatchDuration = TimeUnit.MINUTES.toNanos(5); + parseAndCheckExpected(null, SiteToSiteCliMain.BATCH_DURATION_OPTION, "5 min"); + } + + @Test + 
public void testParseBatchSize() throws ParseException { + expectedBatchSize = 1026; + parseAndCheckExpected(null, SiteToSiteCliMain.BATCH_SIZE_OPTION, Long.toString(expectedBatchSize)); + } + + @Test + public void testParseProxy() throws ParseException { + String expectedHost = "testHost"; + int expectedPort = 292; + String expectedUser = "testUser"; + String expectedPassword = "badPassword"; + expectedHttpProxy = new HttpProxy(expectedHost, expectedPort, expectedUser, expectedPassword); + parseAndCheckExpected(new String[]{ + "--" + SiteToSiteCliMain.PROXY_HOST_OPTION, expectedHost, + "--" + SiteToSiteCliMain.PROXY_PORT_OPTION, Integer.toString(expectedPort), + "--" + SiteToSiteCliMain.PROXY_USERNAME_OPTION, expectedUser, + "--" + SiteToSiteCliMain.PROXY_PASSWORD_OPTION, expectedPassword}); + } + + @Test + public void testParseTransferDirection() throws ParseException { + expectedTransferDirection = TransferDirection.RECEIVE; + parseAndCheckExpected("d", SiteToSiteCliMain.DIRECTION_OPTION, expectedTransferDirection.toString()); + } + + private void parseAndCheckExpected(String shortOption, String longOption, String value) throws ParseException { + if (shortOption != null) { + String[] args; + if (value == null) { + args = new String[]{"-" + shortOption}; + } else { + args = new String[]{"-" + shortOption, value}; + } + parseAndCheckExpected(args); + } + String[] args; + if (value == null) { + args = new String[]{"--" + longOption}; + } else { + args = new String[]{"--" + longOption, value}; + } + parseAndCheckExpected(args); + } + + private void parseAndCheckExpected(String[] args) throws ParseException { + SiteToSiteCliMain.CliParse cliParse = SiteToSiteCliMain.parseCli(new Options(), args); + SiteToSiteClient.Builder builder = cliParse.getBuilder(); + assertEquals(expectedUrl, builder.getUrl()); + assertEquals(expectedSiteToSiteTransportProtocol, builder.getTransportProtocol()); + assertEquals(expectedPortName, builder.getPortName()); + 
assertEquals(expectedPortIdentifier, builder.getPortIdentifier()); + assertEquals(expectedTimeoutNs, builder.getTimeout(TimeUnit.NANOSECONDS)); + assertEquals(expectedPenalizationNs, builder.getPenalizationPeriod(TimeUnit.NANOSECONDS)); + assertEquals(expectedKeystoreFilename, builder.getKeystoreFilename()); + assertEquals(expectedKeystorePass, builder.getKeystorePass()); + assertEquals(expectedKeystoreType, builder.getKeystoreType()); + assertEquals(expectedTruststoreFilename, builder.getTruststoreFilename()); + assertEquals(expectedTruststorePass, builder.getTruststorePass()); + assertEquals(expectedTruststoreType, builder.getTruststoreType()); + assertEquals(expectedCompression, builder.isUseCompression()); + assertEquals(expectedPeerPersistenceFile, builder.getPeerPersistenceFile()); + if (expectedHttpProxy == null) { + assertNull(builder.getHttpProxy()); + } else { + HttpProxy httpProxy = builder.getHttpProxy(); + assertNotNull(httpProxy); + assertEquals(expectedHttpProxy.getHttpHost(), httpProxy.getHttpHost()); + assertEquals(expectedHttpProxy.getUsername(), httpProxy.getUsername()); + assertEquals(expectedHttpProxy.getPassword(), httpProxy.getPassword()); + } + SiteToSiteClientConfig siteToSiteClientConfig = builder.buildConfig(); + assertEquals(expectedBatchCount, siteToSiteClientConfig.getPreferredBatchCount()); + assertEquals(expectedBatchDuration, siteToSiteClientConfig.getPreferredBatchDuration(TimeUnit.NANOSECONDS)); + assertEquals(expectedBatchSize, siteToSiteClientConfig.getPreferredBatchSize()); + assertEquals(expectedTransferDirection, cliParse.getTransferDirection()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiverTest.java ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiverTest.java 
b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiverTest.java new file mode 100644 index 0000000..d5ea6be --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteReceiverTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.toolkit.s2s; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransactionCompletion; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.function.Supplier; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class SiteToSiteReceiverTest { + private final ObjectMapper objectMapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL); + @Mock + SiteToSiteClient siteToSiteClient; + @Mock + Transaction transaction; + @Mock + TransactionCompletion transactionCompletion; + ByteArrayOutputStream data; + private final Supplier<SiteToSiteReceiver> receiverSupplier = () -> new SiteToSiteReceiver(siteToSiteClient, data); + ByteArrayOutputStream expectedData; + + @Before + public void setup() throws IOException { + data = new ByteArrayOutputStream(); + expectedData = new ByteArrayOutputStream(); + when(siteToSiteClient.createTransaction(TransferDirection.RECEIVE)).thenReturn(transaction); + when(transaction.complete()).thenAnswer(invocation -> { + verify(siteToSiteClient).createTransaction(TransferDirection.RECEIVE); + verify(transaction).confirm(); + return transactionCompletion; + }); + } + + @Test + public void testEmpty() throws IOException { + assertEquals(transactionCompletion, receiverSupplier.get().receiveFiles()); + + objectMapper.writeValue(expectedData, 
Collections.emptyList()); + assertEquals(expectedData.toString(), data.toString()); + } + + @Test + public void testSingle() throws IOException { + DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value"); + when(transaction.receive()).thenReturn(dataPacketDto.toDataPacket()).thenReturn(null); + + assertEquals(transactionCompletion, receiverSupplier.get().receiveFiles()); + + objectMapper.writeValue(expectedData, Arrays.asList(dataPacketDto)); + assertEquals(expectedData.toString(), data.toString()); + } + + @Test + public void testMulti() throws IOException { + DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value"); + DataPacketDto dataPacketDto2 = new DataPacketDto("test-data2".getBytes(StandardCharsets.UTF_8)).putAttribute("key2", "value2"); + when(transaction.receive()).thenReturn(dataPacketDto.toDataPacket()).thenReturn(dataPacketDto2.toDataPacket()).thenReturn(null); + + assertEquals(transactionCompletion, receiverSupplier.get().receiveFiles()); + + objectMapper.writeValue(expectedData, Arrays.asList(dataPacketDto, dataPacketDto2)); + assertEquals(expectedData.toString(), data.toString()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteSenderTest.java ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteSenderTest.java b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteSenderTest.java new file mode 100644 index 0000000..5a82b7c --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-s2s/src/test/java/org/apache/nifi/toolkit/s2s/SiteToSiteSenderTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.toolkit.s2s; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransactionCompletion; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.protocol.DataPacket; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + /** Unit tests for SiteToSiteSender.sendFiles() that run against Mockito mocks of the client, transaction and completion; each test serializes its input with the ObjectMapper into the data buffer, which the sender under test then reads. */ +@RunWith(MockitoJUnitRunner.class) +public class SiteToSiteSenderTest { + private final ObjectMapper objectMapper = new ObjectMapper(); + @Mock + SiteToSiteClient siteToSiteClient; + 
@Mock + Transaction transaction; + @Mock + TransactionCompletion transactionCompletion; + ByteArrayOutputStream data; /* input buffer for the sender under test; recreated in setup() and filled by each test */ + private final Supplier<SiteToSiteSender> senderSupplier = () -> new SiteToSiteSender(siteToSiteClient, new ByteArrayInputStream(data.toByteArray())); /* built on demand so the sender's input stream is a snapshot of data taken when get() is called, i.e. after the test has written to it */ + /** Resets the input buffer and stubs the mocks: createTransaction(SEND) returns the mock transaction, and complete() returns the mock completion only after verifying, at call time, that the transaction was created and confirm() was invoked. */ + @Before + public void setup() throws IOException { + data = new ByteArrayOutputStream(); + when(siteToSiteClient.createTransaction(TransferDirection.SEND)).thenReturn(transaction); + when(transaction.complete()).thenAnswer(invocation -> { + verify(siteToSiteClient).createTransaction(TransferDirection.SEND); + verify(transaction).confirm(); + return transactionCompletion; + }); + } + /** An empty list must produce no send() calls while the transaction is still completed and its completion returned. */ + @Test + public void testEmptyList() throws IOException { + objectMapper.writeValue(data, Collections.emptyList()); + assertEquals(transactionCompletion, senderSupplier.get().sendFiles()); + verify(transaction, never()).send(any(DataPacket.class)); + verify(transaction).complete(); + verifyNoMoreInteractions(siteToSiteClient, transaction, transactionCompletion); + } + /** A single serialized packet must be sent once before the transaction completes; matching the send() argument presumably relies on value equality of the DataPacket returned by toDataPacket() (TODO confirm against DataPacketImpl). */ + @Test + public void testSingleElement() throws IOException { + DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value"); + objectMapper.writeValue(data, Arrays.stream(new DataPacketDto[]{dataPacketDto}).collect(Collectors.toList())); + assertEquals(transactionCompletion, senderSupplier.get().sendFiles()); + verify(transaction).send(dataPacketDto.toDataPacket()); + verify(transaction).complete(); + verifyNoMoreInteractions(siteToSiteClient, transaction, transactionCompletion); + } + /** Two serialized packets must each be sent once before the transaction completes, with no other interactions on the mocks. */ + @Test + public void testMultipleElements() throws IOException { + DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value"); + DataPacketDto dataPacketDto2 = new DataPacketDto("test-data2".getBytes(StandardCharsets.UTF_8)).putAttribute("key2", "value2"); + objectMapper.writeValue(data, Arrays.stream(new DataPacketDto[]{dataPacketDto, 
dataPacketDto2}).collect(Collectors.toList())); + assertEquals(transactionCompletion, senderSupplier.get().sendFiles()); + verify(transaction).send(dataPacketDto.toDataPacket()); + verify(transaction).send(dataPacketDto2.toDataPacket()); + verify(transaction).complete(); + verifyNoMoreInteractions(siteToSiteClient, transaction, transactionCompletion); + } + /** An IOException thrown by send() must surface from sendFiles() as an exception equal to the one thrown; the catch block asserts this and rethrows to satisfy expected = IOException.class. */ + @Test(expected = IOException.class) + public void testIOException() throws IOException { + IOException test = new IOException("test"); + DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value"); + objectMapper.writeValue(data, Arrays.stream(new DataPacketDto[]{dataPacketDto}).collect(Collectors.toList())); + doThrow(test).when(transaction).send(any(DataPacket.class)); + try { + senderSupplier.get().sendFiles(); + } catch (IOException e) { + assertEquals(test, e); + throw e; + } + } + /** A RuntimeException thrown by send() must surface from sendFiles() as an IOException whose cause is the original exception; the catch block asserts the cause and rethrows to satisfy expected = IOException.class. */ + @Test(expected = IOException.class) + public void testRuntimeException() throws IOException { + RuntimeException test = new RuntimeException("test"); + DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value"); + objectMapper.writeValue(data, Arrays.stream(new DataPacketDto[]{dataPacketDto}).collect(Collectors.toList())); + doThrow(test).when(transaction).send(any(DataPacket.class)); + try { + senderSupplier.get().sendFiles(); + } catch (IOException e) { + assertEquals(test, e.getCause()); + throw e; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/nifi-toolkit/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-toolkit/pom.xml b/nifi-toolkit/pom.xml index debc51d..ca0f352 100644 --- a/nifi-toolkit/pom.xml +++ b/nifi-toolkit/pom.xml @@ -25,6 +25,7 @@ <modules> <module>nifi-toolkit-tls</module> <module>nifi-toolkit-encrypt-config</module> + <module>nifi-toolkit-s2s</module> <module>nifi-toolkit-assembly</module> </modules> 
<dependencyManagement> http://git-wip-us.apache.org/repos/asf/nifi/blob/4e13fef7/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 22a35cd..cad5a24 100644 --- a/pom.xml +++ b/pom.xml @@ -894,6 +894,11 @@ language governing permissions and limitations under the License. --> </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-toolkit-s2s</artifactId> + <version>1.1.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-resources</artifactId> <version>1.1.0-SNAPSHOT</version> <classifier>resources</classifier>