This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 00985edd80 NIFI-11044 Script/commands to migrate Kafka processors
00985edd80 is described below
commit 00985edd803b06ace9a12b9fd19f29585e191330
Author: Timea Barna <[email protected]>
AuthorDate: Thu Jan 12 09:31:38 2023 +0100
NIFI-11044 Script/commands to migrate Kafka processors
This closes #6838.
Reviewed-by: Robert Kalmar <[email protected]>
Reviewed-by: Zoltan Kornel Torok <[email protected]>
Signed-off-by: Peter Turcsanyi <[email protected]>
---
nifi-docs/src/main/asciidoc/toolkit-guide.adoc | 69 ++++-
nifi-toolkit/nifi-toolkit-assembly/pom.xml | 5 +
.../src/main/resources/bin/kafka-migrator.bat | 41 +++
.../src/main/resources/bin/kafka-migrator.sh | 119 +++++++++
nifi-toolkit/nifi-toolkit-kafka-migrator/pom.xml | 51 ++++
.../toolkit/kafkamigrator/KafkaMigratorMain.java | 130 ++++++++++
.../kafkamigrator/MigratorConfiguration.java | 95 +++++++
.../descriptor/FlowPropertyXpathDescriptor.java | 69 +++++
.../descriptor/KafkaProcessorDescriptor.java | 128 ++++++++++
.../descriptor/KafkaProcessorType.java | 33 +++
.../descriptor/ProcessorDescriptor.java | 26 ++
.../descriptor/PropertyXpathDescriptor.java | 25 ++
.../TemplatePropertyXpathDescriptor.java | 69 +++++
.../migrator/AbstractKafkaMigrator.java | 193 ++++++++++++++
.../migrator/ConsumeKafkaFlowMigrator.java | 38 +++
.../migrator/ConsumeKafkaTemplateMigrator.java | 52 ++++
.../toolkit/kafkamigrator/migrator/Migrator.java | 29 +++
.../migrator/PublishKafkaFlowMigrator.java | 48 ++++
.../migrator/PublishKafkaTemplateMigrator.java | 57 +++++
.../service/KafkaFlowMigrationService.java | 76 ++++++
.../service/KafkaMigrationService.java | 72 ++++++
.../service/KafkaTemplateMigrationService.java | 75 ++++++
.../kafkamigrator/KafkaMigrationServiceTest.java | 155 ++++++++++++
.../toolkit/kafkamigrator/KafkaMigrationUtil.java | 32 +++
.../toolkit/kafkamigrator/KafkaMigratorTest.java | 278 +++++++++++++++++++++
.../src/test/resources/flow.xml | 136 ++++++++++
nifi-toolkit/pom.xml | 1 +
27 files changed, 2101 insertions(+), 1 deletion(-)
diff --git a/nifi-docs/src/main/asciidoc/toolkit-guide.adoc
b/nifi-docs/src/main/asciidoc/toolkit-guide.adoc
index 6e24472adf..4c05dd6980 100644
--- a/nifi-docs/src/main/asciidoc/toolkit-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/toolkit-guide.adoc
@@ -1583,4 +1583,71 @@ NOTE: As of NiFi 1.10.x, because of an upgrade to
ZooKeeper 3.5.x, the migrator
* For a ZooKeeper using Kerberos for authentication:
** `zk-migrator.sh -s -z
destinationHostname:destinationClientPort/destinationRootPath/components -k
/path/to/jaasconfig/jaas-config.conf -f /path/to/export/zk-source-data.json`
-6. Once the migration has completed successfully, start the processors in the
NiFi flow. Processing should continue from the point at which it was stopped
when the NiFi flow was stopped.
\ No newline at end of file
+6. Once the migration has completed successfully, start the processors in the
NiFi flow. Processing should continue from the point at which it was stopped
when the NiFi flow was stopped.
+
+[[kafka_migrator]]
+== Kafka Processor Migrator
+With NiFi version 1.15.3, Kafka processor versions 0.8, 0.9, 0.10 and 0.11
were removed.
+In large flows with many components, replacing these processors manually is
challenging.
+This tool can be used to update a flow in an automated way.
+
+=== Usage
+The script takes three mandatory parameters and one optional parameter:
+
+* Input file, the full path of the flow.xml.gz in which the replacement is
required.
+* Output file, the full path of the file where the results should be saved.
+* Transaction, whether the new processors should be configured with or without
transaction usage.
+* Optional: Kafka Brokers, a comma-separated list of Kafka Brokers in
<host>:<port> format.
+
+Different input and output files must be used.
+The Kafka Brokers argument can be omitted if the flow does not contain GetKafka or
PutKafka processors.
+
+1. Run the script, for example:
+
+ ./bin/kafka-migrator.sh -i "/tmp/flow/flow.xml.gz" -o
"/tmp/flow/flow_result.xml.gz" -t false -k
"mykafkaserver1:1234,mykafkaserver2:1235"
+
+2. Rename the flow_result.xml.gz file to flow.xml.gz. Do not overwrite your input
file.
+3. Copy the flow.xml.gz file to the conf directory of every NiFi node.
+4. Start NiFi.
+5. Verify the results.
+
+=== Expected Behaviour
+* Flow replacement:
+* For all replaced processors:
+** changing class and artifact
+** configure transaction as true
+*** 'Delivery Guarantee' property will be set to 'Replicated'
+*** if the 'Honor-Transactions' and 'Use-Transactions' properties are present in
the file, they will be set to true
+*** if 'Honor-Transactions' and 'Use-Transactions' are not present, NiFi will
treat them as true
+** configure transaction as false
+*** 'Delivery Guarantee' property will keep its original setting.
+*** 'Honor-Transactions' and 'Use-Transactions' will be set to false
+* For version 0.8 processors (when the Kafka broker list argument is provided)
+** remove all version 0.8 properties
+** add version 2.0 properties with default values, except for 'Topic Name',
'Group ID',
+'Partition', 'Kafka Key', 'Delivery Guarantee' (if transaction is false) and
+'Compression Codec', whose values will be copied over
+
+* Template replacement:
+* For all replaced processors:
+** changing type and artifact
+** configure transaction as true
+*** 'Delivery Guarantee' property will be set to 'Replicated'
+*** if the 'Honor-Transactions' and 'Use-Transactions' properties are present in
the file, they will be set to true
+*** if 'Honor-Transactions' and 'Use-Transactions' are not present, NiFi will
treat them as true
+** configure transaction as false
+*** 'Delivery Guarantee' property will keep its original setting.
+*** 'Honor-Transactions' and 'Use-Transactions' will be set to false
+* For version 0.8 processors (when the Kafka broker list argument is provided)
+** remove all version 0.8 properties and descriptors
+** add version 2.0 properties with default values, except for 'Topic Name',
'Group ID',
+'Partition', 'Kafka Key', 'Delivery Guarantee' (if transaction is false) and
+'Compression Codec', whose values will be copied over
+*** add version 2.0 descriptors
+
+=== Limitations
+* All deprecated Kafka processors will be replaced with version 2.0 processors.
+* The script will not rename the processors; only their type will be changed.
+* The transaction setting will be applied to all the replaced processors.
+* The flow.xml.gz file must fit in memory, and processing time depends on
the file size.
+* Processors in templates will be replaced as well; download the
original templates beforehand if you want to keep them.
\ No newline at end of file
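
The usage section above describes the optional Kafka Brokers argument as a comma-separated list in <host>:<port> format. As a minimal sketch only, and not the check shipped in this commit, the following shows one way such a list could be validated entry by entry. The class name `BrokerListCheck`, the method `isValidBrokerList` and the pattern itself are hypothetical; the pattern assumes host names contain no whitespace, commas or colons, so IPv6 literals are deliberately out of scope.

```java
import java.util.regex.Pattern;

public class BrokerListCheck {

    // Hypothetical pattern: one or more host:port entries separated by commas,
    // with no whitespace. The port is limited to 1-5 digits; no range check is done.
    private static final Pattern BROKER_LIST =
            Pattern.compile("[^\\s,:]+:\\d{1,5}(,[^\\s,:]+:\\d{1,5})*");

    // Returns true only if every comma-separated entry has the host:port shape.
    public static boolean isValidBrokerList(final String value) {
        return value != null && BROKER_LIST.matcher(value).matches();
    }

    public static void main(final String[] args) {
        // Broker list taken from the usage example in the guide.
        System.out.println(isValidBrokerList("mykafkaserver1:1234,mykafkaserver2:1235"));
        // Second entry has no port.
        System.out.println(isValidBrokerList("mykafkaserver1:1234,broken"));
    }
}
```

Checking each entry separately, rather than only the end of the whole argument, rejects lists in which an earlier entry is missing its port.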
diff --git a/nifi-toolkit/nifi-toolkit-assembly/pom.xml
b/nifi-toolkit/nifi-toolkit-assembly/pom.xml
index 8b858102dc..d3fde95ace 100644
--- a/nifi-toolkit/nifi-toolkit-assembly/pom.xml
+++ b/nifi-toolkit/nifi-toolkit-assembly/pom.xml
@@ -93,6 +93,11 @@ language governing permissions and limitations under the
License. -->
<artifactId>nifi-toolkit-flowanalyzer</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-toolkit-kafka-migrator</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-toolkit-cli</artifactId>
diff --git
a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.bat
b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.bat
new file mode 100644
index 0000000000..36fa74c722
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.bat
@@ -0,0 +1,41 @@
+@echo off
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements. See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License. You may obtain a copy of the License at
+rem
+rem http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+rem Use JAVA_HOME if it's set; otherwise, just use java
+
+if "%JAVA_HOME%" == "" goto noJavaHome
+if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome
+set JAVA_EXE=%JAVA_HOME%\bin\java.exe
+goto startConfig
+
+:noJavaHome
+echo The JAVA_HOME environment variable is not defined correctly.
+echo Instead the PATH will be used to find the java executable.
+echo.
+set JAVA_EXE=java
+goto startConfig
+
+:startConfig
+set LIB_DIR=%~dp0..\classpath;%~dp0..\lib
+
+if "%JAVA_OPTS%" == "" set JAVA_OPTS=-Xms12m -Xmx24m
+
+SET JAVA_PARAMS=-cp %LIB_DIR%\* %JAVA_OPTS%
org.apache.nifi.toolkit.kafkamigrator.KafkaMigratorMain
+
+cmd.exe /C ""%JAVA_EXE%" %JAVA_PARAMS% %* ""
+
diff --git
a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.sh
b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.sh
new file mode 100755
index 0000000000..c4925ff87b
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.sh
@@ -0,0 +1,119 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+# Script structure inspired from Apache Karaf and other Apache projects with
similar startup approaches
+
+SCRIPT_DIR=$(dirname "$0")
+SCRIPT_NAME=$(basename "$0")
+NIFI_TOOLKIT_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd)
+PROGNAME=$(basename "$0")
+
+
+warn() {
+ (>&2 echo "${PROGNAME}: $*")
+}
+
+die() {
+ warn "$*"
+ exit 1
+}
+
+detectOS() {
+ # OS specific support (must be 'true' or 'false').
+ cygwin=false;
+ aix=false;
+ os400=false;
+ darwin=false;
+ case "$(uname)" in
+ CYGWIN*)
+ cygwin=true
+ ;;
+ AIX*)
+ aix=true
+ ;;
+ OS400*)
+ os400=true
+ ;;
+ Darwin)
+ darwin=true
+ ;;
+ esac
+ # For AIX, set an environment variable
+ if ${aix}; then
+ export LDR_CNTRL=MAXDATA=0xB0000000@DSA
+ echo ${LDR_CNTRL}
+ fi
+}
+
+locateJava() {
+ # Setup the Java Virtual Machine
+ if $cygwin ; then
+ [ -n "${JAVA}" ] && JAVA=$(cygpath --unix "${JAVA}")
+ [ -n "${JAVA_HOME}" ] && JAVA_HOME=$(cygpath --unix "${JAVA_HOME}")
+ fi
+
+ if [ "x${JAVA}" = "x" ] && [ -r /etc/gentoo-release ] ; then
+ JAVA_HOME=$(java-config --jre-home)
+ fi
+ if [ "x${JAVA}" = "x" ]; then
+ if [ "x${JAVA_HOME}" != "x" ]; then
+ if [ ! -d "${JAVA_HOME}" ]; then
+ die "JAVA_HOME is not valid: ${JAVA_HOME}"
+ fi
+ JAVA="${JAVA_HOME}/bin/java"
+ else
+ warn "JAVA_HOME not set; results may vary"
+ JAVA=$(type java)
+ JAVA=$(expr "${JAVA}" : '.* \(/.*\)$')
+ if [ "x${JAVA}" = "x" ]; then
+ die "java command not found"
+ fi
+ fi
+ fi
+}
+
+init() {
+ # Determine if there is special OS handling we must perform
+ detectOS
+
+ # Locate the Java VM to execute
+ locateJava "$1"
+}
+
+run() {
+ LIBS="${NIFI_TOOLKIT_HOME}/lib/*"
+
+ sudo_cmd_prefix=""
+ if $cygwin; then
+ NIFI_TOOLKIT_HOME=$(cygpath --path --windows "${NIFI_TOOLKIT_HOME}")
+ CLASSPATH="$NIFI_TOOLKIT_HOME/classpath;$(cygpath --path --windows
"${LIBS}")"
+ else
+ CLASSPATH="$NIFI_TOOLKIT_HOME/classpath:${LIBS}"
+ fi
+
+ export JAVA_HOME="$JAVA_HOME"
+ export NIFI_TOOLKIT_HOME="$NIFI_TOOLKIT_HOME"
+
+ umask 0077
+ exec "${JAVA}" -cp "${CLASSPATH}" ${JAVA_OPTS:--Xms12m -Xmx24m}
org.apache.nifi.toolkit.kafkamigrator.KafkaMigratorMain "$@"
+}
+
+
+init "$1"
+run "$@"
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/pom.xml
b/nifi-toolkit/nifi-toolkit-kafka-migrator/pom.xml
new file mode 100644
index 0000000000..8fd37c4ea3
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-toolkit</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-toolkit-kafka-migrator</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-xml-processing</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ </dependencies>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes combine.children="append">
+ <exclude>src/test/resources/flow.xml</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git
a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java
new file mode 100644
index 0000000000..29b26c94f8
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.service.KafkaFlowMigrationService;
+import
org.apache.nifi.toolkit.kafkamigrator.service.KafkaTemplateMigrationService;
+import org.apache.nifi.xml.processing.parsers.DocumentProvider;
+import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
+import org.apache.nifi.xml.processing.transform.StandardTransformProvider;
+import org.w3c.dom.Document;
+
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import
org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
+
+public class KafkaMigratorMain {
+
+ private static void printUsage() {
+ System.out.println("This application replaces Kafka processors from
version 0.8, 0.9, 0.10 and 0.11 to version 2.0 processors" +
+ " in a flow.xml.gz file.");
+ System.out.println("\n");
+ System.out.println("Usage: kafka-migrator.sh -i <path to input
flow.xml.gz> -o <path to output flow.xml.gz>" +
+ " -t <use transaction true or false>\noptional: -k <comma
separated kafka brokers in <host>:<port> format. " +
+ "Required for version 0.8 processors only>");
+ }
+
+ public static void main(final String[] args) throws Exception {
+ if (showingUsageNeeded(args)) {
+ printUsage();
+ return;
+ }
+
+ String input = "";
+ if (args[0].equalsIgnoreCase("-i")) {
+ input = args[1];
+ }
+
+ String output = "";
+ if (args[2].equalsIgnoreCase("-o")) {
+ output = args[3];
+ }
+ if (input.equalsIgnoreCase(output)) {
+ System.out.println("Input and output files should be different.");
+ return;
+ }
+
+ String transaction = "";
+ if (args[4].equalsIgnoreCase("-t")) {
+ transaction = args[5];
+ }
+
+ if (!(transaction.equalsIgnoreCase("true") ||
transaction.equalsIgnoreCase("false"))) {
+ System.out.println("Transaction argument should be either true or
false.");
+ return;
+ }
+
+ String kafkaBrokers = "";
+ if (args.length == 8) {
+ if (args[6].equalsIgnoreCase("-k") && args[7].matches(".+:\\d+")) {
+ kafkaBrokers = args[7];
+ } else {
+ System.out.println("Kafka Brokers must be in a <host>:<port>
format, can be separated by comma. " +
+ "For example: hostname:1234, host:5678");
+ return;
+ }
+ }
+
+ final MigratorConfigurationBuilder configurationBuilder = new
MigratorConfigurationBuilder();
+ configurationBuilder.setKafkaBrokers(kafkaBrokers)
+ .setTransaction(Boolean.parseBoolean(transaction));
+
+ final InputStream fileStream = Files.newInputStream(Paths.get(input));
+ final OutputStream outputStream =
Files.newOutputStream(Paths.get(output));
+ final InputStream gzipStream = new GZIPInputStream(fileStream);
+ final OutputStream gzipOutStream = new GZIPOutputStream(outputStream);
+
+ System.out.println("Using flow=" + input);
+
+ try {
+ final DocumentProvider documentProvider = new
StandardDocumentProvider();
+ final Document document = documentProvider.parse(gzipStream);
+
+ final KafkaFlowMigrationService flowMigrationService = new
KafkaFlowMigrationService();
+ final KafkaTemplateMigrationService templateMigrationService = new
KafkaTemplateMigrationService();
+
+ System.out.println("Replacing processors.");
+ flowMigrationService.replaceKafkaProcessors(document,
configurationBuilder);
+ templateMigrationService.replaceKafkaProcessors(document,
configurationBuilder);
+
+ final StreamResult streamResult = new StreamResult(gzipOutStream);
+ final StandardTransformProvider transformProvider = new
StandardTransformProvider();
+ transformProvider.setIndent(true);
+ transformProvider.transform(new DOMSource(document), streamResult);
+ System.out.println("Replacing completed.");
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.out.println("Exception occurred while attempting to parse
flow.xml.gz. Cause: " + e.getCause());
+ } finally {
+ gzipOutStream.close();
+ outputStream.close();
+ gzipStream.close();
+ fileStream.close();
+ }
+ }
+
+ private static boolean showingUsageNeeded(String[] args) {
+ return args.length < 6 || args[0].equalsIgnoreCase("-h") ||
args[0].equalsIgnoreCase("--help");
+ }
+}
diff --git
a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/MigratorConfiguration.java
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/MigratorConfiguration.java
new file mode 100644
index 0000000000..3ffc33d0db
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/MigratorConfiguration.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.ProcessorDescriptor;
+import
org.apache.nifi.toolkit.kafkamigrator.descriptor.PropertyXpathDescriptor;
+
+public class MigratorConfiguration {
+ final private String kafkaBrokers;
+ final private boolean transaction;
+ final private boolean isVersion8Processor;
+ final private ProcessorDescriptor processorDescriptor;
+ final private PropertyXpathDescriptor propertyXpathDescriptor;
+
+ public MigratorConfiguration(final String kafkaBrokers, final boolean
transaction, final boolean isVersion8Processor,
+ final ProcessorDescriptor
processorDescriptor, final PropertyXpathDescriptor propertyXpathDescriptor) {
+ this.kafkaBrokers = kafkaBrokers;
+ this.transaction = transaction;
+ this.isVersion8Processor = isVersion8Processor;
+ this.processorDescriptor = processorDescriptor;
+ this.propertyXpathDescriptor = propertyXpathDescriptor;
+ }
+
+ public String getKafkaBrokers() {
+ return kafkaBrokers;
+ }
+
+ public boolean isTransaction() {
+ return transaction;
+ }
+
+ public boolean isVersion8Processor() {
+ return isVersion8Processor;
+ }
+
+ public ProcessorDescriptor getProcessorDescriptor() {
+ return processorDescriptor;
+ }
+
+ public PropertyXpathDescriptor getPropertyXpathDescriptor() {
+ return propertyXpathDescriptor;
+ }
+
+ public static class MigratorConfigurationBuilder {
+ private String kafkaBrokers;
+ private boolean transaction;
+ private boolean isVersion8Processor;
+ private ProcessorDescriptor processorDescriptor;
+ private PropertyXpathDescriptor propertyXpathDescriptor;
+
+ public MigratorConfigurationBuilder setKafkaBrokers(final String
kafkaBrokers) {
+ this.kafkaBrokers = kafkaBrokers;
+ return this;
+ }
+
+ public MigratorConfigurationBuilder setTransaction(final boolean
transaction) {
+ this.transaction = transaction;
+ return this;
+ }
+
+ public MigratorConfigurationBuilder setIsVersion8Processor(final
boolean isVersion8Processor) {
+ this.isVersion8Processor = isVersion8Processor;
+ return this;
+ }
+
+ public MigratorConfigurationBuilder setProcessorDescriptor(final
ProcessorDescriptor processorDescriptor) {
+ this.processorDescriptor = processorDescriptor;
+ return this;
+ }
+
+ public MigratorConfigurationBuilder setPropertyXpathDescriptor(final
PropertyXpathDescriptor propertyXpathDescriptor) {
+ this.propertyXpathDescriptor = propertyXpathDescriptor;
+ return this;
+ }
+
+ public MigratorConfiguration build() {
+ return new MigratorConfiguration(kafkaBrokers, transaction,
isVersion8Processor,
+ processorDescriptor,
propertyXpathDescriptor);
+ }
+ }
+}
diff --git
a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/FlowPropertyXpathDescriptor.java
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/FlowPropertyXpathDescriptor.java
new file mode 100644
index 0000000000..ad0aa453b0
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/FlowPropertyXpathDescriptor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.descriptor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class FlowPropertyXpathDescriptor implements PropertyXpathDescriptor {
+
+ private static final Map<String, String> CONSUME_TRANSACTION_PROPERTIES;
+ private static final Map<String, String> PUBLISH_TRANSACTION_PROPERTIES;
+ private static final Map<KafkaProcessorType, Map<String, String>>
TRANSACTION_PROPERTIES;
+ static {
+ CONSUME_TRANSACTION_PROPERTIES = new HashMap<>();
+ CONSUME_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty",
"property[name=\"honor-transactions\"]/value");
+ CONSUME_TRANSACTION_PROPERTIES.put("transactionTagName",
"honor-transactions");
+ PUBLISH_TRANSACTION_PROPERTIES = new HashMap<>();
+ PUBLISH_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty",
"property[name=\"use-transactions\"]/value");
+ PUBLISH_TRANSACTION_PROPERTIES.put("transactionTagName",
"use-transactions");
+ TRANSACTION_PROPERTIES = new HashMap<>();
+ TRANSACTION_PROPERTIES.put(KafkaProcessorType.CONSUME,
CONSUME_TRANSACTION_PROPERTIES);
+ TRANSACTION_PROPERTIES.put(KafkaProcessorType.PUBLISH,
PUBLISH_TRANSACTION_PROPERTIES);
+ }
+
+ private final KafkaProcessorType processorType;
+
+ public FlowPropertyXpathDescriptor(final KafkaProcessorType processorType)
{
+ this.processorType = processorType;
+ }
+
+ @Override
+ public String getXpathForProperties() {
+ return "property";
+ }
+
+ @Override
+ public String getPropertyKeyTagName() {
+ return "name";
+ }
+
+ @Override
+ public String getPropertyTagName() {
+ return "property";
+ }
+
+ @Override
+ public String getXpathForTransactionProperty() {
+ return
TRANSACTION_PROPERTIES.get(processorType).get("xpathForTransactionProperty");
+ }
+
+ @Override
+ public String getTransactionTagName() {
+ return
TRANSACTION_PROPERTIES.get(processorType).get("transactionTagName");
+ }
+}
diff --git
a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorDescriptor.java
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorDescriptor.java
new file mode 100644
index 0000000000..11f507aaec
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorDescriptor.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.descriptor;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class KafkaProcessorDescriptor implements ProcessorDescriptor {
+ private static final Map<String, String>
CONSUME_KAFKA_PROCESSOR_PROPERTIES;
+ private static final Map<String, String> CONSUME_PROPERTIES_TO_BE_SAVED;
+ private static final Map<String, String>
PUBLISH_KAFKA_PROCESSOR_PROPERTIES;
+ private static final Map<String, String> PUBLISH_PROPERTIES_TO_BE_SAVED;
+ private static final Map<String, String> CONTROLLER_SERVICES;
+ private static final Map<KafkaProcessorType, Map<String, String>>
PROPERTIES;
+ private static final Map<KafkaProcessorType, Map<String, String>>
PROPERTIES_TO_BE_SAVED;
+
+ static {
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES = new HashMap<>();
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("security.protocol",
"PLAINTEXT");
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.mechanism", "GSSAPI");
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.service.name",
null);
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("kerberos-credentials-service",
null);
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.principal",
null);
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.keytab", null);
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.username", null);
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.password", null);
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.token.auth", "false");
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("ssl.context.service", null);
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("topic", null);
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("topic_type", "names");
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("group.id", null);
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("auto.offset.reset", "latest");
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("key-attribute-encoding",
"utf-8");
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("message-demarcator", null);
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("separate-by-key", "false");
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("message-header-encoding",
"UTF-8");
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("header-name-regex", null);
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("max.poll.records", "10000");
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("max-uncommit-offset-wait", "1
secs");
+ CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("Communications Timeout", "60
secs");
+
+ CONSUME_PROPERTIES_TO_BE_SAVED = new HashMap<>();
+ CONSUME_PROPERTIES_TO_BE_SAVED.put("Topic Name", "topic");
+ CONSUME_PROPERTIES_TO_BE_SAVED.put("Group ID", "group.id");
+
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES = new HashMap<>();
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("security.protocol",
"PLAINTEXT");
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.mechanism", "GSSAPI");
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.service.name",
null);
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("kerberos-credentials-service",
null);
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.principal",
null);
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.keytab", null);
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.username", null);
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.password", null);
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.token.auth", "false");
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("ssl.context.service", null);
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("topic", null);
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("acks", null);
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("Failure Strategy", "Route to Failure");
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("transactional-id-prefix",
null);
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("attribute-name-regex", null);
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("message-header-encoding",
"UTF-8");
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("kafka-key", null);
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("key-attribute-encoding",
"utf-8");
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("message-demarcator", null);
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("max.request.size", "1 MB");
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("ack.wait.time", "5 secs");
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("max.block.ms", "5 sec");
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("partitioner.class",
"org.apache.kafka.clients.producer.internals.DefaultPartitioner");
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("partition", null);
+ PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("compression.type", null);
+
+ PUBLISH_PROPERTIES_TO_BE_SAVED = new HashMap<>();
+ PUBLISH_PROPERTIES_TO_BE_SAVED.put("Topic Name", "topic");
+ PUBLISH_PROPERTIES_TO_BE_SAVED.put("Partition", "partition");
+ PUBLISH_PROPERTIES_TO_BE_SAVED.put("Kafka Key", "kafka-key");
+ PUBLISH_PROPERTIES_TO_BE_SAVED.put("Delivery Guarantee", "acks");
+ PUBLISH_PROPERTIES_TO_BE_SAVED.put("Compression Codec", "compression.type");
+
+ CONTROLLER_SERVICES = new HashMap<>();
+ CONTROLLER_SERVICES.put("kerberos-credentials-service",
"org.apache.nifi.kerberos.KerberosCredentialsService");
+ CONTROLLER_SERVICES.put("ssl.context.service",
"org.apache.nifi.ssl.SSLContextService");
+
+ PROPERTIES = new HashMap<>();
+ PROPERTIES.put(KafkaProcessorType.CONSUME,
CONSUME_KAFKA_PROCESSOR_PROPERTIES);
+ PROPERTIES.put(KafkaProcessorType.PUBLISH,
PUBLISH_KAFKA_PROCESSOR_PROPERTIES);
+
+ PROPERTIES_TO_BE_SAVED = new HashMap<>();
+ PROPERTIES_TO_BE_SAVED.put(KafkaProcessorType.CONSUME,
CONSUME_PROPERTIES_TO_BE_SAVED);
+ PROPERTIES_TO_BE_SAVED.put(KafkaProcessorType.PUBLISH,
PUBLISH_PROPERTIES_TO_BE_SAVED);
+ }
+
+ private final KafkaProcessorType processorType;
+
+ public KafkaProcessorDescriptor(final KafkaProcessorType processorType) {
+ this.processorType = processorType;
+ }
+
+ @Override
+ public Map<String, String> getProcessorProperties() {
+ return Collections.unmodifiableMap(PROPERTIES.get(processorType));
+ }
+
+ @Override
+ public Map<String, String> getPropertiesToBeSaved() {
+ return
Collections.unmodifiableMap(PROPERTIES_TO_BE_SAVED.get(processorType));
+ }
+
+ @Override
+ public Map<String, String> getControllerServicesForTemplates() {
+ return Collections.unmodifiableMap(CONTROLLER_SERVICES);
+ }
+}
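The two map families above work together during migration: the processor properties map supplies the defaults for the new processor, and the "to be saved" map names the old properties whose values are carried over under a new key. The following is a minimal sketch of that merge using plain maps, not the toolkit's own code. The class name `PropertyCarryOverSketch`, the method `migrate`, and the sample old property `Zookeeper Connection String` are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the commit: shows how defaults and
// carried-over values could be merged into one property map.
final class PropertyCarryOverSketch {

    static Map<String, String> migrate(final Map<String, String> defaults,
                                       final Map<String, String> propertiesToBeSaved,
                                       final Map<String, String> oldProperties) {
        // Start from the defaults of the new processor.
        final Map<String, String> result = new HashMap<>(defaults);
        for (final Map.Entry<String, String> old : oldProperties.entrySet()) {
            // Only old properties listed in propertiesToBeSaved survive, under their new key.
            final String newKey = propertiesToBeSaved.get(old.getKey());
            if (newKey != null) {
                final String value = old.getValue();
                // Empty values are stored as null, so no value element would be written for them.
                result.put(newKey, (value == null || value.isEmpty()) ? null : value);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final Map<String, String> defaults = new HashMap<>();
        defaults.put("topic", null);
        defaults.put("group.id", null);
        defaults.put("auto.offset.reset", "latest");

        final Map<String, String> toBeSaved = new HashMap<>();
        toBeSaved.put("Topic Name", "topic");
        toBeSaved.put("Group ID", "group.id");

        final Map<String, String> oldProperties = new HashMap<>();
        oldProperties.put("Topic Name", "orders");
        oldProperties.put("Group ID", "");
        oldProperties.put("Zookeeper Connection String", "zk:2181"); // assumed sample, dropped by the merge

        final Map<String, String> migrated = migrate(defaults, toBeSaved, oldProperties);
        System.out.println("topic = " + migrated.get("topic"));
        System.out.println("group.id = " + migrated.get("group.id"));
        System.out.println("auto.offset.reset = " + migrated.get("auto.offset.reset"));
    }
}
```

Old properties without an entry in the "to be saved" map are simply not copied, which matches the remove-then-re-add approach taken by the migrator classes in this commit.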
diff --git
a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorType.java
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorType.java
new file mode 100644
index 0000000000..e5f8a5dde7
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorType.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.descriptor;
+
+public enum KafkaProcessorType {
+ PUBLISH("Publish"),
+ CONSUME("Consume"),
+ PUT("Put");
+
+ private final String processorType;
+
+ KafkaProcessorType(String processorType) {
+ this.processorType = processorType;
+ }
+
+ public String getProcessorType() {
+ return processorType;
+ }
+}
diff --git
a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/ProcessorDescriptor.java
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/ProcessorDescriptor.java
new file mode 100644
index 0000000000..9a5f798b7d
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/ProcessorDescriptor.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.descriptor;
+
+import java.util.Map;
+
+public interface ProcessorDescriptor {
+ Map<String, String> getProcessorProperties();
+ Map<String, String> getPropertiesToBeSaved();
+ Map<String, String> getControllerServicesForTemplates();
+
+}
diff --git
a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/PropertyXpathDescriptor.java
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/PropertyXpathDescriptor.java
new file mode 100644
index 0000000000..ffb266f8c5
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/PropertyXpathDescriptor.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.descriptor;
+
+public interface PropertyXpathDescriptor {
+ String getXpathForProperties();
+ String getPropertyKeyTagName();
+ String getPropertyTagName();
+ String getXpathForTransactionProperty();
+ String getTransactionTagName();
+}
diff --git
a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/TemplatePropertyXpathDescriptor.java
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/TemplatePropertyXpathDescriptor.java
new file mode 100644
index 0000000000..3276439375
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/TemplatePropertyXpathDescriptor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.descriptor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TemplatePropertyXpathDescriptor implements
PropertyXpathDescriptor {
+
+ private static final Map<String, String> CONSUME_TRANSACTION_PROPERTIES;
+ private static final Map<String, String> PUBLISH_TRANSACTION_PROPERTIES;
+ private static final Map<KafkaProcessorType, Map<String, String>>
TRANSACTION_PROPERTIES;
+ static {
+ CONSUME_TRANSACTION_PROPERTIES = new HashMap<>();
+ CONSUME_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty",
"entry[key=\"honor-transactions\"]/value");
+ CONSUME_TRANSACTION_PROPERTIES.put("transactionTagName",
"honor-transactions");
+ PUBLISH_TRANSACTION_PROPERTIES = new HashMap<>();
+ PUBLISH_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty",
"entry[key=\"use-transactions\"]/value");
+ PUBLISH_TRANSACTION_PROPERTIES.put("transactionTagName",
"use-transactions");
+ TRANSACTION_PROPERTIES = new HashMap<>();
+ TRANSACTION_PROPERTIES.put(KafkaProcessorType.CONSUME,
CONSUME_TRANSACTION_PROPERTIES);
+ TRANSACTION_PROPERTIES.put(KafkaProcessorType.PUBLISH,
PUBLISH_TRANSACTION_PROPERTIES);
+ }
+
+ private final KafkaProcessorType processorType;
+
+ public TemplatePropertyXpathDescriptor(final KafkaProcessorType
processorType) {
+ this.processorType = processorType;
+ }
+
+ @Override
+ public String getXpathForProperties() {
+ return "entry";
+ }
+
+ @Override
+ public String getPropertyKeyTagName() {
+ return "key";
+ }
+
+ @Override
+ public String getPropertyTagName() {
+ return "entry";
+ }
+
+ @Override
+ public String getXpathForTransactionProperty() {
+ return
TRANSACTION_PROPERTIES.get(processorType).get("xpathForTransactionProperty");
+ }
+
+ @Override
+ public String getTransactionTagName() {
+ return
TRANSACTION_PROPERTIES.get(processorType).get("transactionTagName");
+ }
+}
diff --git
a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/AbstractKafkaMigrator.java
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/AbstractKafkaMigrator.java
new file mode 100644
index 0000000000..ec02bff12e
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/AbstractKafkaMigrator.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.migrator;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class AbstractKafkaMigrator implements Migrator {
+ static final XPath XPATH = XPathFactory.newInstance().newXPath();
+ private static final String NEW_KAFKA_PROCESSOR_VERSION = "_2_0";
+ private static final String ARTIFACT = "nifi-kafka-2-0-nar";
+ private static final String PATH_FOR_ARTIFACT = "bundle/artifact";
+
+ final boolean isVersion8Processor;
+ final boolean isKafkaBrokersPresent;
+
+ final Map<String, String> kafkaProcessorProperties;
+ final Map<String, String> propertiesToBeSaved;
+ final Map<String, String> controllerServices;
+
+ final String xpathForProperties;
+ final String propertyKeyTagName;
+ final String propertyTagName;
+
+ final String xpathForTransactionProperty;
+ final String transactionTagName;
+ final boolean transaction;
+
+
+ public AbstractKafkaMigrator(final MigratorConfiguration configuration) {
+ final String kafkaBrokers = configuration.getKafkaBrokers();
+ this.isKafkaBrokersPresent = !kafkaBrokers.isEmpty();
+ this.isVersion8Processor = configuration.isVersion8Processor();
+ this.kafkaProcessorProperties = new
HashMap<>(configuration.getProcessorDescriptor().getProcessorProperties());
+ this.propertiesToBeSaved =
configuration.getProcessorDescriptor().getPropertiesToBeSaved();
+ this.controllerServices =
configuration.getProcessorDescriptor().getControllerServicesForTemplates();
+ this.xpathForProperties =
configuration.getPropertyXpathDescriptor().getXpathForProperties();
+ this.propertyKeyTagName =
configuration.getPropertyXpathDescriptor().getPropertyKeyTagName();
+ this.propertyTagName =
configuration.getPropertyXpathDescriptor().getPropertyTagName();
+ this.xpathForTransactionProperty =
configuration.getPropertyXpathDescriptor().getXpathForTransactionProperty();
+ this.transactionTagName =
configuration.getPropertyXpathDescriptor().getTransactionTagName();
+ this.transaction = configuration.isTransaction();
+
+ if (isKafkaBrokersPresent) {
+ kafkaProcessorProperties.put("bootstrap.servers", kafkaBrokers);
+ }
+ }
+
+ @Override
+ public void configureProperties(final Node node) throws
XPathExpressionException {
+ if (isVersion8Processor && isKafkaBrokersPresent) {
+ final NodeList properties = (NodeList)
XPATH.evaluate(xpathForProperties, node, XPathConstants.NODESET);
+ for (int i = 0; i < properties.getLength(); i++) {
+ final Node property = properties.item(i);
+ saveRequiredProperties(property);
+ removeElement(node, property);
+ }
+ addNewProperties(node);
+ }
+ }
+
+ @Override
+ public void configureDescriptors(final Node node) throws
XPathExpressionException {
+ if (isVersion8Processor && isKafkaBrokersPresent) {
+ final Element descriptorElement = (Element)
XPATH.evaluate("config/descriptors", node, XPathConstants.NODE);
+ final NodeList descriptors = (NodeList) XPATH.evaluate("entry",
descriptorElement, XPathConstants.NODESET);
+ for (int i = 0; i < descriptors.getLength(); i++) {
+ final Node descriptor = descriptors.item(i);
+ removeElement(descriptorElement, descriptor);
+ }
+ addNewDescriptors(descriptorElement);
+ }
+ }
+
+ @Override
+ public void configureComponentSpecificSteps(final Node node) throws
XPathExpressionException {
+ final String transactionString = Boolean.toString(transaction);
+ final Element transactionsElement = (Element)
XPATH.evaluate(xpathForTransactionProperty, node, XPathConstants.NODE);
+
+ if (transactionsElement != null) {
+ transactionsElement.setTextContent(transactionString);
+ } else {
+ addNewProperty(node, transactionTagName, transactionString);
+ }
+
+ kafkaProcessorProperties.put(transactionTagName, transactionString);
+ }
+
+ public void replaceClassName(final Element className) {
+ final String processorName =
StringUtils.substringAfterLast(className.getTextContent(), ".");
+ final String newClassName =
replaceClassNameWithNewProcessorName(className.getTextContent(), processorName);
+ className.setTextContent(newClassName);
+ }
+
+ public void replaceArtifact(final Node processor) throws
XPathExpressionException {
+ ((Element) XPATH.evaluate(PATH_FOR_ARTIFACT, processor,
XPathConstants.NODE)).setTextContent(ARTIFACT);
+ }
+
+ private static String replaceClassNameWithNewProcessorName(final String
className, final String processorName) {
+ final String newProcessorName = StringUtils.replaceEach(processorName,
new String[]{"Get", "Put"}, new String[]{"pubsub.Consume", "pubsub.Publish"});
+ final String processorNameWithNewVersion =
+ newProcessorName.replaceFirst("$|_0_1\\d?",
NEW_KAFKA_PROCESSOR_VERSION);
+ return StringUtils.replace(className, processorName,
processorNameWithNewVersion);
+ }
+
+ private void addNewDescriptors(final Node node) {
+ for (String key: kafkaProcessorProperties.keySet()) {
+ final Element descriptorElement =
node.getOwnerDocument().createElement("entry");
+ node.appendChild(descriptorElement);
+
+ final Element descriptorKeyElement =
descriptorElement.getOwnerDocument().createElement("key");
+ descriptorKeyElement.setTextContent(key);
+ descriptorElement.appendChild(descriptorKeyElement);
+
+ final Element descriptorValueElement =
descriptorElement.getOwnerDocument().createElement("value");
+ descriptorElement.appendChild(descriptorValueElement);
+
+ final Element descriptorNameElement =
descriptorValueElement.getOwnerDocument().createElement("name");
+ descriptorNameElement.setTextContent(key);
+ descriptorValueElement.appendChild(descriptorNameElement);
+
+ if (controllerServices.containsKey(key)) {
+ final Element controllerServiceElement = descriptorValueElement.getOwnerDocument().createElement("identifiesControllerService");
+ controllerServiceElement.setTextContent(controllerServices.get(key));
+ descriptorValueElement.appendChild(controllerServiceElement);
+ }
+ }
+ }
+
+ private void saveRequiredProperties(final Node property) throws
XPathExpressionException {
+ final String propertyToBeSaved =
propertiesToBeSaved.get(XPATH.evaluate(propertyKeyTagName, property));
+
+ if (propertyToBeSaved != null) {
+ String propertyValue = XPATH.evaluate("value", property);
+ kafkaProcessorProperties.put(propertyToBeSaved,
convert(propertyValue));
+ }
+ }
+
+ private String convert(final String propertyValue) {
+ return propertyValue.isEmpty() ? null : propertyValue;
+ }
+
+ private void addNewProperties(final Node node) {
+ for (Map.Entry<String, String> entry :
kafkaProcessorProperties.entrySet()) {
+ addNewProperty(node, entry.getKey(), entry.getValue());
+ }
+ }
+
+ private void addNewProperty(final Node node, final String key, final
String value) {
+ final Element propertyElement =
node.getOwnerDocument().createElement(propertyTagName);
+ node.appendChild(propertyElement);
+
+ final Element propertyKeyElement =
propertyElement.getOwnerDocument().createElement(propertyKeyTagName);
+ propertyKeyElement.setTextContent(key);
+
+ propertyElement.appendChild(propertyKeyElement);
+
+ if (value != null) {
+ final Element propertyValueElement =
propertyElement.getOwnerDocument().createElement("value");
+ propertyValueElement.setTextContent(value);
+
+ propertyElement.appendChild(propertyValueElement);
+ }
+ }
+
+ private void removeElement(final Node node, final Node element) {
+ node.removeChild(element);
+ }
+}
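The class-name rewrite in `replaceClassNameWithNewProcessorName` is compact, so a worked example may help. The sketch below reproduces the same steps with JDK string methods only (chained `String.replace` calls stand in for `StringUtils.replaceEach`) and assumes a fully qualified class name as input. The class name `ClassNameRewriteSketch` and the sample inputs are illustrative assumptions, not taken from the commit.

```java
// Hypothetical stand-alone sketch of the class-name rewrite; JDK only.
final class ClassNameRewriteSketch {
    private static final String NEW_KAFKA_PROCESSOR_VERSION = "_2_0";

    static String rewrite(final String className) {
        // Simple name after the last dot (assumes the input contains a package).
        final String processorName = className.substring(className.lastIndexOf('.') + 1);
        // Get*/Put* processors become pubsub.Consume*/pubsub.Publish*.
        final String newProcessorName = processorName
                .replace("Get", "pubsub.Consume")
                .replace("Put", "pubsub.Publish");
        // Either replace an existing _0_1x suffix or, if there is none,
        // match the end of the string and append the new version there.
        final String withNewVersion =
                newProcessorName.replaceFirst("$|_0_1\\d?", NEW_KAFKA_PROCESSOR_VERSION);
        return className.replace(processorName, withNewVersion);
    }

    public static void main(final String[] args) {
        // prints org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_0
        System.out.println(rewrite("org.apache.nifi.processors.kafka.PutKafka"));
        // prints org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0
        System.out.println(rewrite("org.apache.nifi.processors.kafka.GetKafka"));
        // prints org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0
        System.out.println(rewrite("org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10"));
    }
}
```

The alternation order in the pattern does not matter for these inputs: `replaceFirst` takes the leftmost match, and `$` can only match at the end of the string, so an existing `_0_1x` suffix is always found first.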
diff --git
a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaFlowMigrator.java
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaFlowMigrator.java
new file mode 100644
index 0000000000..022c2db933
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaFlowMigrator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.migrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+import javax.xml.xpath.XPathExpressionException;
+
+public class ConsumeKafkaFlowMigrator extends AbstractKafkaMigrator {
+
+ public ConsumeKafkaFlowMigrator(final MigratorConfiguration configuration)
{
+ super(configuration);
+ }
+
+ @Override
+ public void migrate(final Element className, final Node processor) throws
XPathExpressionException {
+ configureProperties(processor);
+ configureComponentSpecificSteps(processor);
+ replaceClassName(className);
+ replaceArtifact(processor);
+ }
+}
diff --git
a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaTemplateMigrator.java
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaTemplateMigrator.java
new file mode 100644
index 0000000000..3abb118a3c
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaTemplateMigrator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.migrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+
+public class ConsumeKafkaTemplateMigrator extends AbstractKafkaMigrator {
+
+ public ConsumeKafkaTemplateMigrator(final MigratorConfiguration
configuration) {
+ super(configuration);
+ }
+
+ @Override
+ public void configureProperties(final Node node) throws
XPathExpressionException {
+ final Element propertyElement = (Element)
XPATH.evaluate("config/properties", node, XPathConstants.NODE);
+ super.configureProperties(propertyElement);
+ }
+
+ @Override
+ public void configureComponentSpecificSteps(final Node node) throws
XPathExpressionException {
+ final Element propertyElement = (Element)
XPATH.evaluate("config/properties", node, XPathConstants.NODE);
+ super.configureComponentSpecificSteps(propertyElement);
+ }
+
+ @Override
+ public void migrate(final Element className, final Node processor) throws
XPathExpressionException {
+ configureProperties(processor);
+ configureComponentSpecificSteps(processor);
+ configureDescriptors(processor);
+ replaceClassName(className);
+ replaceArtifact(processor);
+ }
+}
diff --git
a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/Migrator.java
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/Migrator.java
new file mode 100644
index 0000000000..cf41451099
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/Migrator.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.migrator;
+
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+import javax.xml.xpath.XPathExpressionException;
+
+public interface Migrator {
+ void configureProperties(final Node node) throws XPathExpressionException;
+ void configureDescriptors(final Node node) throws XPathExpressionException;
+ void configureComponentSpecificSteps(final Node node) throws
XPathExpressionException;
+ void migrate(final Element className, final Node node) throws
XPathExpressionException;
+}
diff --git
a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaFlowMigrator.java
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaFlowMigrator.java
new file mode 100644
index 0000000000..7ef865d25c
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaFlowMigrator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.migrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+
+public class PublishKafkaFlowMigrator extends AbstractKafkaMigrator {
+
+ public PublishKafkaFlowMigrator(final MigratorConfiguration configuration)
{
+ super(configuration);
+ }
+
+ @Override
+ public void configureComponentSpecificSteps(final Node node) throws
XPathExpressionException {
+ final Element deliveryGuaranteeValue = (Element)
XPATH.evaluate("property[name=\"acks\"]/value", node, XPathConstants.NODE);
+ if (this.transaction && deliveryGuaranteeValue != null) {
+ deliveryGuaranteeValue.setTextContent("all");
+ }
+ super.configureComponentSpecificSteps(node);
+ }
+
+ @Override
+ public void migrate(final Element className, final Node processor) throws
XPathExpressionException {
+ configureProperties(processor);
+ configureComponentSpecificSteps(processor);
+ replaceClassName(className);
+ replaceArtifact(processor);
+ }
+}
diff --git
a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaTemplateMigrator.java
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaTemplateMigrator.java
new file mode 100644
index 0000000000..7b0538462a
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaTemplateMigrator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.migrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+
+public class PublishKafkaTemplateMigrator extends AbstractKafkaMigrator {
+
+ public PublishKafkaTemplateMigrator(final MigratorConfiguration
configuration) {
+ super(configuration);
+ }
+
+ @Override
+ public void configureProperties(final Node node) throws
XPathExpressionException {
+ final Element propertyElement = (Element)
XPATH.evaluate("config/properties", node, XPathConstants.NODE);
+ super.configureProperties(propertyElement);
+ }
+
+ @Override
+ public void configureComponentSpecificSteps(final Node node) throws
XPathExpressionException {
+ // Kafka transactions require acks=all: overwrite an existing "acks" value when transactions are enabled
+ final Element propertyElement = (Element)
XPATH.evaluate("config/properties", node, XPathConstants.NODE);
+ final Element deliveryGuaranteeValue = (Element)
XPATH.evaluate("entry[key=\"acks\"]/value", propertyElement,
XPathConstants.NODE);
+ if (this.transaction && deliveryGuaranteeValue != null) {
+ deliveryGuaranteeValue.setTextContent("all");
+ }
+ super.configureComponentSpecificSteps(propertyElement);
+ }
+
+ @Override
+ public void migrate(final Element className, final Node processor) throws
XPathExpressionException {
+ configureProperties(processor);
+ configureComponentSpecificSteps(processor);
+ configureDescriptors(processor);
+ replaceClassName(className);
+ replaceArtifact(processor);
+ }
+}
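To illustrate the XPath lookup used by the template migrator, here is a small self-contained sketch that evaluates `entry[key="acks"]/value` against an in-memory XML fragment and forces the value to `all` when transactions are enabled. The XML layout, the root element name, and the class `AcksXpathSketch` are assumptions made for the example; a real template file may be structured differently.

```java
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import java.io.StringReader;

// Hypothetical sketch: mirrors the acks handling on a simplified, assumed XML shape.
final class AcksXpathSketch {

    static String forceAcksAll(final String xml, final boolean transaction) throws Exception {
        final Document document = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new InputSource(new StringReader(xml)));
        final XPath xpath = XPathFactory.newInstance().newXPath();

        // Locate the properties container, then the value of the "acks" entry inside it.
        final Element properties = (Element) xpath.evaluate(
                "/processors/config/properties", document, XPathConstants.NODE);
        final Element acksValue = (Element) xpath.evaluate(
                "entry[key=\"acks\"]/value", properties, XPathConstants.NODE);

        // Only an existing value element is overwritten; a missing one is left alone.
        if (transaction && acksValue != null) {
            acksValue.setTextContent("all");
        }
        // String evaluation returns "" when no matching node exists.
        return xpath.evaluate("entry[key=\"acks\"]/value", properties);
    }

    public static void main(final String[] args) throws Exception {
        final String xml = "<processors><config><properties>"
                + "<entry><key>acks</key><value>1</value></entry>"
                + "</properties></config></processors>";
        System.out.println(forceAcksAll(xml, true));
        System.out.println(forceAcksAll(xml, false));
    }
}
```

The predicate `[key="acks"]` compares the text of the child `key` element, which is why the flow variant in this commit uses `property[name="acks"]/value` instead: only the tag names differ between the two file formats.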
diff --git
a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaFlowMigrationService.java
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaFlowMigrationService.java
new file mode 100644
index 0000000000..f2cb98d19b
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaFlowMigrationService.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.service;
+
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaFlowMigrator;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaFlowMigrator;
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.FlowPropertyXpathDescriptor;
+
+
+public class KafkaFlowMigrationService implements KafkaMigrationService {
+ private static final String XPATH_FOR_PROCESSORS_IN_FLOW = ".//processor";
+ private static final String CLASS_TAG_NAME = "class";
+
+ public KafkaFlowMigrationService() {
+ }
+
+ @Override
+ public String getPathForProcessors() {
+ return XPATH_FOR_PROCESSORS_IN_FLOW;
+ }
+
+ @Override
+ public String getPathForClass() {
+ return CLASS_TAG_NAME;
+ }
+
+ @Override
+ public Migrator createPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) {
+ configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR)
+ .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH))
+ .setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.PUBLISH));
+ return new PublishKafkaFlowMigrator(configurationBuilder.build());
+ }
+
+ @Override
+ public Migrator createConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) {
+ configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR)
+ .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME))
+ .setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.CONSUME));
+ return new ConsumeKafkaFlowMigrator(configurationBuilder.build());
+ }
+
+ @Override
+ public Migrator createVersionEightPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) {
+ configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR)
+ .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH))
+ .setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.PUBLISH));
+ return new PublishKafkaFlowMigrator(configurationBuilder.build());
+ }
+
+ @Override
+ public Migrator createVersionEightConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) {
+ configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR)
+ .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME))
+ .setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.CONSUME));
+ return new ConsumeKafkaFlowMigrator(configurationBuilder.build());
+ }
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaMigrationService.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaMigrationService.java
new file mode 100644
index 0000000000..1428002077
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaMigrationService.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.service;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
+
+public interface KafkaMigrationService {
+
+ String REGEX_FOR_REPLACEABLE_PROCESSOR_NAMES = "(Get|Put|Consume|Publish)Kafka(Record)?(_0_1\\d)?";
+ boolean IS_VERSION_EIGHT_PROCESSOR = Boolean.TRUE;
+ boolean IS_NOT_VERSION_EIGHT_PROCESSOR = Boolean.FALSE;
+
+ String getPathForProcessors();
+ String getPathForClass();
+ Migrator createPublishMigrator(final MigratorConfigurationBuilder configurationBuilder);
+ Migrator createConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder);
+ Migrator createVersionEightPublishMigrator(final MigratorConfigurationBuilder configurationBuilder);
+ Migrator createVersionEightConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder);
+
+ default void replaceKafkaProcessors(final Document document, final MigratorConfigurationBuilder configurationBuilder) throws XPathExpressionException {
+ Migrator migrator;
+ final XPath xPath = XPathFactory.newInstance().newXPath();
+
+ final NodeList processors = (NodeList) xPath.evaluate(getPathForProcessors(), document, XPathConstants.NODESET);
+ for (int i = 0; i < processors.getLength(); i++) {
+ final Node processor = processors.item(i);
+ final Element className = ((Element) xPath.evaluate(getPathForClass(), processor, XPathConstants.NODE));
+ final String processorName = StringUtils.substringAfterLast(className.getTextContent(), ".");
+
+ if (processorName.matches(REGEX_FOR_REPLACEABLE_PROCESSOR_NAMES)) {
+ if (processorName.contains(KafkaProcessorType.PUBLISH.getProcessorType())) {
+ migrator = createPublishMigrator(configurationBuilder);
+ } else if (processorName.contains(KafkaProcessorType.PUT.getProcessorType())) {
+ migrator = createVersionEightPublishMigrator(configurationBuilder);
+ } else if (processorName.contains(KafkaProcessorType.CONSUME.getProcessorType())) {
+ migrator = createConsumeMigrator(configurationBuilder);
+ } else {
+ migrator = createVersionEightConsumeMigrator(configurationBuilder);
+ }
+
+ migrator.migrate(className, processor);
+ }
+ }
+ }
+}
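
Reviewer note (not part of the patch): the dispatch in replaceKafkaProcessors above can be hard to follow from the regex alone, so here is a minimal standalone sketch of how simple class names would be routed. It reuses the pattern string from the diff, but the class name ProcessorNameDispatchSketch, the helper pickMigrator, and its string labels are hypothetical. The substrings "Publish", "Put" and "Consume" are assumed values for KafkaProcessorType.getProcessorType(), which is not shown in this part of the patch.

```java
import java.util.regex.Pattern;

// Hypothetical illustration only; none of these names exist in the commit.
final class ProcessorNameDispatchSketch {

    // Same pattern string as REGEX_FOR_REPLACEABLE_PROCESSOR_NAMES in the diff.
    private static final Pattern REPLACEABLE =
            Pattern.compile("(Get|Put|Consume|Publish)Kafka(Record)?(_0_1\\d)?");

    // Mirrors the if/else chain in replaceKafkaProcessors, returning a label
    // instead of a Migrator. "none" means the processor is left untouched.
    static String pickMigrator(final String className) {
        final int lastDot = className.lastIndexOf('.');
        // Like StringUtils.substringAfterLast: empty string when no separator is found.
        final String simpleName = lastDot < 0 ? "" : className.substring(lastDot + 1);

        // matches() requires the whole name to match, so a suffix such as _2_0 is rejected.
        if (!REPLACEABLE.matcher(simpleName).matches()) {
            return "none";
        }
        if (simpleName.contains("Publish")) {   // assumed value of PUBLISH.getProcessorType()
            return "publish";
        } else if (simpleName.contains("Put")) { // assumed value of PUT.getProcessorType()
            return "publish-v8";
        } else if (simpleName.contains("Consume")) { // assumed value of CONSUME.getProcessorType()
            return "consume";
        }
        return "consume-v8"; // only GetKafka variants remain at this point
    }

    public static void main(final String[] args) {
        final String[] samples = {
            "org.apache.nifi.processors.kafka.PutKafka",
            "org.apache.nifi.processors.kafka.GetKafka",
            "org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10",
            "org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_11",
            "org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0"
        };
        for (final String sample : samples) {
            System.out.println(sample + " -> " + pickMigrator(sample));
        }
    }
}
```

Under these assumptions a name that already carries the _2_0 suffix does not match the pattern and is skipped, which is consistent with KafkaMigrationServiceTest expecting the first processor in the test flow to stay unchanged.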
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaTemplateMigrationService.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaTemplateMigrationService.java
new file mode 100644
index 0000000000..fa4b06d00a
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaTemplateMigrationService.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator.service;
+
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaTemplateMigrator;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaTemplateMigrator;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.TemplatePropertyXpathDescriptor;
+
+public class KafkaTemplateMigrationService implements KafkaMigrationService {
+ private static final String XPATH_FOR_PROCESSORS_IN_TEMPLATE = ".//processors";
+ private static final String TYPE_TAG_NAME = "type";
+
+ public KafkaTemplateMigrationService() {
+ }
+
+ @Override
+ public String getPathForProcessors() {
+ return XPATH_FOR_PROCESSORS_IN_TEMPLATE;
+ }
+
+ @Override
+ public String getPathForClass() {
+ return TYPE_TAG_NAME;
+ }
+
+ @Override
+ public Migrator createPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) {
+ configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR)
+ .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH))
+ .setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.PUBLISH));
+ return new PublishKafkaTemplateMigrator(configurationBuilder.build());
+ }
+
+ @Override
+ public Migrator createConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) {
+ configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR)
+ .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME))
+ .setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.CONSUME));
+ return new ConsumeKafkaTemplateMigrator(configurationBuilder.build());
+ }
+
+ @Override
+ public Migrator createVersionEightPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) {
+ configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR)
+ .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH))
+ .setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.PUBLISH));
+ return new PublishKafkaTemplateMigrator(configurationBuilder.build());
+ }
+
+ @Override
+ public Migrator createVersionEightConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) {
+ configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR)
+ .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME))
+ .setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.CONSUME));
+ return new ConsumeKafkaTemplateMigrator(configurationBuilder.build());
+ }
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationServiceTest.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationServiceTest.java
new file mode 100644
index 0000000000..5c3a54fb9c
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationServiceTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.service.KafkaFlowMigrationService;
+import org.apache.nifi.toolkit.kafkamigrator.service.KafkaTemplateMigrationService;
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
+import org.junit.jupiter.api.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class KafkaMigrationServiceTest {
+
+ private static final List<String> EXPECTED_CLASS_OR_TYPE_NAMES =
+ Arrays.asList("org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0",
+ "org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0",
+ "org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_0");
+
+ private static final List<String> EXPECTED_ARTIFACTS =
+ Arrays.asList("nifi-kafka-2-0-nar", "nifi-kafka-2-0-nar", "nifi-kafka-2-0-nar");
+
+ private static final MigratorConfigurationBuilder CONFIGURATION_BUILDER =
+ new MigratorConfigurationBuilder().setKafkaBrokers("kafkaBrokers, localhost:1234")
+ .setTransaction(Boolean.FALSE);
+ private static final XPath XPATH = XPathFactory.newInstance().newXPath();
+ private static final String PATH_FOR_PROCESSORS_IN_FLOWS = ".//processor";
+ private static final String PATH_FOR_PROCESSORS_IN_TEMPLATES = ".//processors";
+ private static final String PATH_FOR_CLASS_ELEMENT = "class";
+ private static final String PATH_FOR_TYPE_ELEMENT = "type";
+ private static final String PATH_FOR_ARTIFACT_ELEMENT = "bundle/artifact";
+
+
+ @Test
+ public void testClassReplacement() throws XPathExpressionException, IOException {
+ final KafkaFlowMigrationService kafkaMigrationService = new KafkaFlowMigrationService();
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final List<String> originalClassNames = createClassResultList(document);
+
+ kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER);
+ final List<String> actualClassNames = createClassResultList(document);
+
+ assertSuccess(EXPECTED_CLASS_OR_TYPE_NAMES, actualClassNames, originalClassNames);
+ }
+
+ @Test
+ public void testTypeReplacement() throws XPathExpressionException, IOException {
+ final KafkaTemplateMigrationService kafkaMigrationService = new KafkaTemplateMigrationService();
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final List<String> originalTypeNames = createTypeResultList(document);
+
+ kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER);
+ final List<String> actualTypeNames = createTypeResultList(document);
+
+ assertSuccess(EXPECTED_CLASS_OR_TYPE_NAMES, actualTypeNames, originalTypeNames);
+ }
+
+ @Test
+ public void testArtifactReplacementInTemplate() throws XPathExpressionException, IOException {
+ final KafkaTemplateMigrationService kafkaMigrationService = new KafkaTemplateMigrationService();
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final List<String> originalArtifacts = createArtifactResultListForTemplate(document);
+
+ kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER);
+ final List<String> actualArtifacts = createArtifactResultListForTemplate(document);
+
+ assertSuccess(EXPECTED_ARTIFACTS, actualArtifacts, originalArtifacts);
+ }
+
+ @Test
+ public void testArtifactReplacementInFlow() throws XPathExpressionException, IOException {
+ final KafkaFlowMigrationService kafkaMigrationService = new KafkaFlowMigrationService();
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final List<String> originalArtifacts = createArtifactResultListForFlow(document);
+
+ kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER);
+ final List<String> actualArtifacts = createArtifactResultListForFlow(document);
+
+ assertSuccess(EXPECTED_ARTIFACTS, actualArtifacts, originalArtifacts);
+ }
+
+ private List<String> createClassResultList(final Document document) throws XPathExpressionException {
+ return createProcessorResultListForFlow(document, PATH_FOR_CLASS_ELEMENT);
+ }
+
+ private List<String> createArtifactResultListForFlow(final Document document) throws XPathExpressionException {
+ return createProcessorResultListForFlow(document, PATH_FOR_ARTIFACT_ELEMENT);
+ }
+
+ private List<String> createTypeResultList(final Document document) throws XPathExpressionException {
+ return createProcessorResultListForTemplate(document, PATH_FOR_TYPE_ELEMENT);
+ }
+
+ private List<String> createArtifactResultListForTemplate(final Document document) throws XPathExpressionException {
+ return createProcessorResultListForTemplate(document, PATH_FOR_ARTIFACT_ELEMENT);
+ }
+
+ private List<String> createProcessorResultListForFlow(final Document document, final String elementPath) throws XPathExpressionException {
+ return createProcessorResultList(document, PATH_FOR_PROCESSORS_IN_FLOWS, elementPath);
+ }
+
+ private List<String> createProcessorResultListForTemplate(final Document document, final String elementPath) throws XPathExpressionException {
+ return createProcessorResultList(document, PATH_FOR_PROCESSORS_IN_TEMPLATES, elementPath);
+ }
+
+ private List<String> createProcessorResultList(final Document document, final String processorPath, final String elementPath) throws XPathExpressionException {
+ final List<String> resultList = new ArrayList<>();
+ final NodeList processors = (NodeList) XPATH.evaluate(processorPath, document, XPathConstants.NODESET);
+ for (int i = 0; i < processors.getLength(); i++) {
+ resultList.add(XPATH.evaluate(elementPath, processors.item(i)));
+ }
+ return resultList;
+ }
+
+ private void assertSuccess(final List<String> expectedArtifacts, final List<String> actualArtifacts, final List<String> originalArtifacts) {
+ assertArrayEquals(expectedArtifacts.toArray(), actualArtifacts.toArray());
+ assertNoReplacementHappened(originalArtifacts, actualArtifacts);
+ assertReplacementHappened(originalArtifacts, actualArtifacts);
+ }
+
+ private void assertNoReplacementHappened(final List<String> originalArtifacts, final List<String> actualArtifacts) {
+ assertEquals(originalArtifacts.get(0), actualArtifacts.get(0));
+ }
+
+ private void assertReplacementHappened(final List<String> originalArtifacts, final List<String> actualArtifacts) {
+ assertNotEquals(originalArtifacts.get(1), actualArtifacts.get(1));
+ assertNotEquals(originalArtifacts.get(2), actualArtifacts.get(2));
+ }
+}
\ No newline at end of file
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationUtil.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationUtil.java
new file mode 100644
index 0000000000..b095e1cbc4
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationUtil.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator;
+
+import org.apache.nifi.xml.processing.parsers.DocumentProvider;
+import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
+import org.w3c.dom.Document;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+public class KafkaMigrationUtil {
+ public static Document parseDocument() throws IOException {
+ final DocumentProvider documentProvider = new StandardDocumentProvider();
+ return documentProvider.parse(Files.newInputStream(Paths.get("src/test/resources/flow.xml")));
+ }
+}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorTest.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorTest.java
new file mode 100644
index 0000000000..6394516ece
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.kafkamigrator;
+
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.FlowPropertyXpathDescriptor;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.PropertyXpathDescriptor;
+import org.apache.nifi.toolkit.kafkamigrator.descriptor.TemplatePropertyXpathDescriptor;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaFlowMigrator;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaTemplateMigrator;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaFlowMigrator;
+import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaTemplateMigrator;
+import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
+import org.junit.jupiter.api.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KafkaMigratorTest {
+ private static final XPath XPATH = XPathFactory.newInstance().newXPath();
+ private static final String XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW = ".//processor[class='org.apache.nifi.processors.kafka.PutKafka']";
+ private static final String XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE = ".//processors[type='org.apache.nifi.processors.kafka.PutKafka']";
+
+ private static final String XPATH_FOR_CONSUME_PROCESSOR_IN_FLOW = ".//processor[class='org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10']";
+ private static final String XPATH_FOR_CONSUME_PROCESSOR_IN_TEMPLATE = ".//processors[type='org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10']";
+ private static final KafkaProcessorDescriptor PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR = new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH);
+ private static final boolean WITH_TRANSACTION = Boolean.TRUE;
+ private static final boolean WITHOUT_TRANSACTION = Boolean.FALSE;
+ private static final boolean IS_VERSION_EIGHT_PROCESSOR = Boolean.TRUE;
+ private static final boolean IS_NOT_VERSION_EIGHT_PROCESSOR = Boolean.FALSE;
+ private static final String FLOW = "Flow";
+ private static final String TEMPLATE = "Template";
+
+ @Test
+ public void testPropertiesRemoved() throws XPathExpressionException, IOException {
+ final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW);
+ final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
+
+ flowMigrator.configureProperties(processor);
+
+ assertPropertyRemoveSuccess(processor);
+ }
+
+ @Test
+ public void testPropertiesAdded() throws XPathExpressionException, IOException {
+ final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW);
+ final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
+
+ flowMigrator.configureProperties(processor);
+
+ assertPropertyAddSuccess(processor);
+ }
+
+ @Test
+ public void testPropertiesSaved() throws XPathExpressionException, IOException {
+ final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW);
+ final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
+ final List<String> oldValues = getOldValues(processor);
+
+ flowMigrator.configureProperties(processor);
+
+ final List<String> newValues = getNewValues(processor);
+ assertEquals(oldValues, newValues);
+ }
+
+ @Test
+ public void testDescriptorsAdded() throws XPathExpressionException, IOException {
+ final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE);
+ final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration);
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
+
+ templateMigrator.configureDescriptors(processor);
+
+ assertDescriptorAddSuccess(processor);
+ }
+
+ @Test
+ public void testDescriptorsRemoved() throws XPathExpressionException, IOException {
+ final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE);
+ final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration);
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
+
+ templateMigrator.configureDescriptors(processor);
+
+ assertDescriptorRemoveSuccess(processor);
+ }
+
+ @Test
+ public void testTransactionFlowPropertyForConsumeProcessorWithTrue() throws XPathExpressionException, IOException {
+ final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, FLOW);
+ final ConsumeKafkaFlowMigrator flowMigrator = new ConsumeKafkaFlowMigrator(configuration);
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
+
+ flowMigrator.configureComponentSpecificSteps(processor);
+
+ assertEquals("true", XPATH.evaluate("property[name='honor-transactions']/value", processor));
+ }
+
+ @Test
+ public void testTransactionTemplatePropertyForConsumeProcessorWithTrue() throws XPathExpressionException, IOException {
+ final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, TEMPLATE);
+ final ConsumeKafkaTemplateMigrator templateMigrator = new ConsumeKafkaTemplateMigrator(configuration);
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
+
+ templateMigrator.configureComponentSpecificSteps(processor);
+
+ assertEquals("true", XPATH.evaluate("config/properties/entry[key='honor-transactions']/value", processor));
+ }
+
+ @Test
+ public void testTransactionFlowPropertyForConsumeProcessorWithFalse() throws XPathExpressionException, IOException {
+ final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, FLOW);
+ final ConsumeKafkaFlowMigrator flowMigrator = new ConsumeKafkaFlowMigrator(configuration);
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
+
+ flowMigrator.configureComponentSpecificSteps(processor);
+
+ assertEquals("false", XPATH.evaluate("property[name='honor-transactions']/value", processor));
+ }
+
+ @Test
+ public void testTransactionTemplatePropertyForConsumeProcessorWithFalse() throws XPathExpressionException, IOException {
+ final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, TEMPLATE);
+ final ConsumeKafkaTemplateMigrator templateMigrator = new ConsumeKafkaTemplateMigrator(configuration);
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
+
+ templateMigrator.configureComponentSpecificSteps(processor);
+
+ assertEquals("false", XPATH.evaluate("config/properties/entry[key='honor-transactions']/value", processor));
+ }
+
+ @Test
+ public void testTransactionFlowPropertyForPublishProcessorWithTrue() throws XPathExpressionException, IOException {
+ final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW);
+ final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
+
+ flowMigrator.configureComponentSpecificSteps(processor);
+
+ assertEquals("true", XPATH.evaluate("property[name='use-transactions']/value", processor));
+ assertEquals("", XPATH.evaluate("property[name='acks']/value", processor));
+
+ }
+
+ @Test
+ public void testTransactionTemplatePropertyForPublishProcessorWithTrue() throws XPathExpressionException, IOException {
+ final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE);
+ final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration);
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
+
+ templateMigrator.configureComponentSpecificSteps(processor);
+
+ assertEquals("true", XPATH.evaluate("config/properties/entry[key='use-transactions']/value", processor));
+ assertEquals("", XPATH.evaluate("config/properties/entry[key='acks']/value", processor));
+ }
+
+ @Test
+ public void testTransactionFlowPropertyForPublishProcessorWithFalse() throws XPathExpressionException, IOException {
+ final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW);
+ final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
+
+ flowMigrator.configureComponentSpecificSteps(processor);
+
+ assertEquals("false", XPATH.evaluate("property[name='use-transactions']/value", processor));
+ }
+
+ @Test
+ public void testTransactionTemplatePropertyForPublishProcessorWithFalse() throws XPathExpressionException, IOException {
+ final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE);
+ final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration);
+ final Document document = KafkaMigrationUtil.parseDocument();
+ final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
+
+ templateMigrator.configureComponentSpecificSteps(processor);
+
+ assertEquals("false", XPATH.evaluate("config/properties/entry[key='use-transactions']/value", processor));
+ }
+
+
+ private List<String> getValues(final Collection<String> properties, final Node node) throws XPathExpressionException {
+ final List<String> result = new ArrayList<>();
+ for (String propertyName : properties) {
+ result.add(XPATH.evaluate(String.format("property[name='%s']/value", propertyName), node));
+ }
+ return result;
+ }
+
+ private List<String> getOldValues(final Node node) throws XPathExpressionException {
+ return getValues(PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR.getPropertiesToBeSaved().keySet(), node);
+ }
+
+ private List<String> getNewValues(final Node node) throws XPathExpressionException {
+ return getValues(PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR.getPropertiesToBeSaved().values(), node);
+ }
+
+ private void assertPropertyRemoveSuccess(final Node node) throws XPathExpressionException {
+ assertTrue(XPATH.evaluate("property[name='Known Brokers']", node).isEmpty());
+ }
+
+ private void assertDescriptorRemoveSuccess(final Node node) throws XPathExpressionException {
+ assertTrue(XPATH.evaluate("config/descriptors/entry[key='Known Brokers']", node).isEmpty());
+ }
+
+ private void assertAddSuccess(final String xpath, final Node node) throws XPathExpressionException {
+ for (String propertyName : PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR.getProcessorProperties().keySet()) {
+ assertFalse(XPATH.evaluate(String.format(xpath, propertyName), node).isEmpty());
+ }
+ }
+ private void assertPropertyAddSuccess(final Node node) throws XPathExpressionException {
+ assertAddSuccess("property[name='%s']/name", node);
+ }
+
+ private void assertDescriptorAddSuccess(final Node node) throws XPathExpressionException {
+ assertAddSuccess("config/descriptors/entry[key='%s']/key", node);
+ }
+
+ private MigratorConfiguration getConfiguration(final boolean transaction, final boolean isVersion8Processor,
+ final KafkaProcessorType processorType, final String migrationType) {
+ final MigratorConfigurationBuilder configurationBuilder = new MigratorConfigurationBuilder();
+ final PropertyXpathDescriptor propertyXpathDescriptor;
+
+ if (migrationType.equalsIgnoreCase("Flow")) {
+ propertyXpathDescriptor = new FlowPropertyXpathDescriptor(processorType);
+ } else {
+ propertyXpathDescriptor = new TemplatePropertyXpathDescriptor(processorType);
+ }
+
+ return configurationBuilder.setKafkaBrokers("kafkaBrokers, localhost:1234")
+ .setTransaction(transaction)
+ .setIsVersion8Processor(isVersion8Processor)
+ .setProcessorDescriptor(new KafkaProcessorDescriptor(processorType))
+ .setPropertyXpathDescriptor(propertyXpathDescriptor)
+ .build();
+ }
+}
\ No newline at end of file
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/resources/flow.xml b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/resources/flow.xml
new file mode 100644
index 0000000000..b1cc3b5216
--- /dev/null
+++ b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/resources/flow.xml
@@ -0,0 +1,136 @@
+<rootGroup>
+ <processGroup>
+ <processor>
+ <class>org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0</class>
+ <bundle>
+ <group>org.apache.nifi</group>
+ <artifact>nifi-kafka-2-0-nar</artifact>
+ <version>1.13.2.2.1.2.0-283</version>
+ </bundle>
+ <property>
+ <name>bootstrap.servers</name>
+ <value>localhost:9092</value>
+ </property>
+ </processor>
+ <processor>
+ <class>org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10</class>
+ <bundle>
+ <group>org.apache.nifi</group>
+ <artifact>nifi-kafka-0-10-nar</artifact>
+ <version>1.13.2.2.1.2.0-283</version>
+ </bundle>
+ <property>
+ <name>bootstrap.servers</name>
+ <value>localhost:9092</value>
+ </property>
+ </processor>
+ <processor>
+ <class>org.apache.nifi.processors.kafka.PutKafka</class>
+ <bundle>
+ <group>org.apache.nifi</group>
+ <artifact>nifi-kafka-0-8-nar</artifact>
+ <version>1.13.2.2.1.2.0-283</version>
+ </bundle>
+ <property>
+ <name>Known Brokers</name>
+ </property>
+ <property>
+ <name>Topic Name</name>
+ <value>test-topic</value>
+ </property>
+ <property>
+ <name>Partition</name>
+ <value>test-partition</value>
+ </property>
+ <property>
+ <name>Kafka Key</name>
+ <value>kafka-key</value>
+ </property>
+ <property>
+ <name>Delivery Guarantee</name>
+ <value>1</value>
+ </property>
+ <property>
+ <name>Compression Codec</name>
+ <value>gzip</value>
+ </property>
+ </processor>
+ </processGroup>
+ <template encoding-version="1.3">
+ <snippet>
+ <processGroups>
+ <contents>
+ <processors>
+ <bundle>
+ <artifact>nifi-kafka-2-0-nar</artifact>
+ <group>org.apache.nifi</group>
+ <version>1.13.2.2.1.2.0-283</version>
+ </bundle>
+ <config>
+ <descriptors>
+ <entry>
+ <key>Known Brokers</key>
+ <value>
+ <name>Known Brokers</name>
+ </value>
+ </entry>
+ </descriptors>
+ <properties>
+ <entry>
+ <key>Known Brokers</key>
+ </entry>
+ </properties>
+ </config>
+ <type>org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0</type>
+ </processors>
+ <processors>
+ <bundle>
+ <artifact>nifi-kafka-0-10-nar</artifact>
+ <group>org.apache.nifi</group>
+ <version>1.13.2.2.1.2.0-283</version>
+ </bundle>
+ <config>
+ <descriptors>
+ <entry>
+ <key>Known Brokers</key>
+ <value>
+ <name>Known Brokers</name>
+ </value>
+ </entry>
+ </descriptors>
+ <properties>
+ <entry>
+ <key>Known Brokers</key>
+ </entry>
+ </properties>
+ </config>
+ <type>org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10</type>
+ </processors>
+ <processors>
+ <bundle>
+ <artifact>nifi-kafka-0-8-nar</artifact>
+ <group>org.apache.nifi</group>
+ <version>1.13.2.2.1.2.0-283</version>
+ </bundle>
+ <config>
+ <descriptors>
+ <entry>
+ <key>Known Brokers</key>
+ <value>
+ <name>Known Brokers</name>
+ </value>
+ </entry>
+ </descriptors>
+ <properties>
+ <entry>
+ <key>Known Brokers</key>
+ </entry>
+ </properties>
+ </config>
+ <type>org.apache.nifi.processors.kafka.PutKafka</type>
+ </processors>
+ </contents>
+ </processGroups>
+ </snippet>
+ </template>
+</rootGroup>
\ No newline at end of file
diff --git a/nifi-toolkit/pom.xml b/nifi-toolkit/pom.xml
index 206d24b7a9..3f64eecde1 100644
--- a/nifi-toolkit/pom.xml
+++ b/nifi-toolkit/pom.xml
@@ -33,6 +33,7 @@
<module>nifi-toolkit-flowanalyzer</module>
<module>nifi-toolkit-cli</module>
<module>nifi-toolkit-api</module>
+ <module>nifi-toolkit-kafka-migrator</module>
</modules>
<properties>
<toolkit.groovy.version>3.0.8</toolkit.groovy.version>