This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c19ec90030 Revert "NIFI-11044 Script/commands to migrate Kafka processors"
c19ec90030 is described below
commit c19ec90030ac401315d425a2fb26e6bf3ecf8e66
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Tue Feb 14 20:06:06 2023 +0100
Revert "NIFI-11044 Script/commands to migrate Kafka processors"
This reverts commit 00985edd803b06ace9a12b9fd19f29585e191330.
Reason for reverting: 0.x Kafka processors are not present in NiFi 2.x.
The migration tool needs to be added on the 1.x branch only.
---
nifi-docs/src/main/asciidoc/toolkit-guide.adoc | 69 +----
nifi-toolkit/nifi-toolkit-assembly/pom.xml | 5 -
.../src/main/resources/bin/kafka-migrator.bat | 41 ---
.../src/main/resources/bin/kafka-migrator.sh | 119 ---------
nifi-toolkit/nifi-toolkit-kafka-migrator/pom.xml | 51 ----
.../toolkit/kafkamigrator/KafkaMigratorMain.java | 130 ----------
.../kafkamigrator/MigratorConfiguration.java | 95 -------
.../descriptor/FlowPropertyXpathDescriptor.java | 69 -----
.../descriptor/KafkaProcessorDescriptor.java | 128 ----------
.../descriptor/KafkaProcessorType.java | 33 ---
.../descriptor/ProcessorDescriptor.java | 26 --
.../descriptor/PropertyXpathDescriptor.java | 25 --
.../TemplatePropertyXpathDescriptor.java | 69 -----
.../migrator/AbstractKafkaMigrator.java | 193 --------------
.../migrator/ConsumeKafkaFlowMigrator.java | 38 ---
.../migrator/ConsumeKafkaTemplateMigrator.java | 52 ----
.../toolkit/kafkamigrator/migrator/Migrator.java | 29 ---
.../migrator/PublishKafkaFlowMigrator.java | 48 ----
.../migrator/PublishKafkaTemplateMigrator.java | 57 -----
.../service/KafkaFlowMigrationService.java | 76 ------
.../service/KafkaMigrationService.java | 72 ------
.../service/KafkaTemplateMigrationService.java | 75 ------
.../kafkamigrator/KafkaMigrationServiceTest.java | 155 ------------
.../toolkit/kafkamigrator/KafkaMigrationUtil.java | 32 ---
.../toolkit/kafkamigrator/KafkaMigratorTest.java | 278 ---------------------
.../src/test/resources/flow.xml | 136 ----------
nifi-toolkit/pom.xml | 1 -
27 files changed, 1 insertion(+), 2101 deletions(-)
diff --git a/nifi-docs/src/main/asciidoc/toolkit-guide.adoc b/nifi-docs/src/main/asciidoc/toolkit-guide.adoc
index 4c05dd6980..6e24472adf 100644
--- a/nifi-docs/src/main/asciidoc/toolkit-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/toolkit-guide.adoc
@@ -1583,71 +1583,4 @@ NOTE: As of NiFi 1.10.x, because of an upgrade to ZooKeeper 3.5.x, the migrator
* For a ZooKeeper using Kerberos for authentication:
** `zk-migrator.sh -s -z destinationHostname:destinationClientPort/destinationRootPath/components -k /path/to/jaasconfig/jaas-config.conf -f /path/to/export/zk-source-data.json`
-6. Once the migration has completed successfully, start the processors in the NiFi flow. Processing should continue from the point at which it was stopped when the NiFi flow was stopped.
-
-[[kafka_migrator]]
-== Kafka Processor Migrator
-With NiFi version 1.15.3, Kafka processor versions 0.8, 0.9, 0.10 and 0.11 were removed.
-In large flows having many numbers of components it is challenging to replace these processors manually.
-This tool can be used to update a flow in an automated way.
-
-=== Usage
-Running the script requires 3 mandatory and 1 optional parameters:
-
-* Input file, the full path of the flow.xml.gz in which the replacement is required.
-* Output file, the full path of the file where the results should be saved.
-* Transaction, whether the new processors should be configured with or without transaction usage.
-* Optional: Kafka Brokers, a comma separated list of Kafka Brokers in <host>:<port> format.
-
-Different input and output files must be used.
-Kafka Broker argument can be omitted if flow does not contain GetKafka or PutKafka processors.
-
-1. Run script, a possible example:
-
- ./bin/kafka-migrator.sh -i "/tmp/flow/flow.xml.gz" -o "/tmp/flow/flow_result.xml.gz" -t false -k "mykafkaserver1:1234,mykafkaserver2:1235"
-
-2. Rename flow_result.xml.gz file to flow.xml.gz, do not overwrite your input file.
-3. Copy flow.xml.gz file to all the NiFi nodes conf directory
-4. Start NiFi
-5. Verify the results.
-
-=== Expected Behaviour
-* Flow replacement:
-* For all replaced processors:
-** changing class and artifact
-** configure transaction as true
-*** 'Delivery Guarantee' property will be set to 'Replicated'
-*** if 'Honor-Transactions' and 'Use-Transactions' properties are present in the file they will be set to true
-*** if 'Honor-Transactions' and 'Use-Transactions' not present they will be translated as true in NiFi
-** configure transaction as false
-*** 'Delivery Guarantee' property will keep its original setting.
-*** 'Honor-Transactions' and 'Use-Transactions' will be set to false
-* For version 0.8 processors (when kafka broker list argument provided)
-** remove all version 0.8 properties
-** add version 2.0 properties with default value except for 'Topic Name', 'Group ID',
-'Partition', 'Kafka Key', 'Delivery Guarantee' (if transaction false) and
-'Compression Codec' values which will be copied over
-
-* Template replacement:
-* For all replaced processors:
-** changing type and artifact
-** configure transaction as true
-*** 'Delivery Guarantee' property will be set to 'Replicated'
-*** if 'Honor-Transactions' and 'Use-Transactions' properties are present in the file they will be set to true
-*** if 'Honor-Transactions' and 'Use-Transactions' not present they will be translated as true in NiFi
-** configure transaction as false
-*** 'Delivery Guarantee' property will keep its original setting.
-*** 'Honor-Transactions' and 'Use-Transactions' will be set to false
-* For version 0.8 processors (when kafka broker list argument provided)
-** remove all version 0.8 properties and descriptors
-** add version 2.0 properties with default value except for 'Topic Name', 'Group ID',
-'Partition', 'Kafka Key', 'Delivery Guarantee' (if transaction false) and
-'Compression Codec' values which will be copied over
-*** add version 2.0 descriptors
-
-=== Limitations
-* All deprecated Kafka processors will be replaced with version 2.0 processors.
-* Script will not rename the processors, only their type will be changed.
-* Transaction setting will be applied to all the replaced processors.
-* The flow.xml.gz file needs to be fit in memory and process time depends on the file size.
-* Processors in templates will be replaced as well, please download the original templates if desired.
\ No newline at end of file
+6. Once the migration has completed successfully, start the processors in the NiFi flow. Processing should continue from the point at which it was stopped when the NiFi flow was stopped.
\ No newline at end of file
diff --git a/nifi-toolkit/nifi-toolkit-assembly/pom.xml b/nifi-toolkit/nifi-toolkit-assembly/pom.xml
index d3fde95ace..8b858102dc 100644
--- a/nifi-toolkit/nifi-toolkit-assembly/pom.xml
+++ b/nifi-toolkit/nifi-toolkit-assembly/pom.xml
@@ -93,11 +93,6 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-toolkit-flowanalyzer</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-toolkit-kafka-migrator</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-toolkit-cli</artifactId>
diff --git a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.bat b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.bat
deleted file mode 100644
index 36fa74c722..0000000000
--- a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.bat
+++ /dev/null
@@ -1,41 +0,0 @@
-@echo off
-rem
-rem Licensed to the Apache Software Foundation (ASF) under one or more
-rem contributor license agreements. See the NOTICE file distributed with
-rem this work for additional information regarding copyright ownership.
-rem The ASF licenses this file to You under the Apache License, Version 2.0
-rem (the "License"); you may not use this file except in compliance with
-rem the License. You may obtain a copy of the License at
-rem
-rem http://www.apache.org/licenses/LICENSE-2.0
-rem
-rem Unless required by applicable law or agreed to in writing, software
-rem distributed under the License is distributed on an "AS IS" BASIS,
-rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-rem See the License for the specific language governing permissions and
-rem limitations under the License.
-rem
-
-rem Use JAVA_HOME if it's set; otherwise, just use java
-
-if "%JAVA_HOME%" == "" goto noJavaHome
-if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome
-set JAVA_EXE=%JAVA_HOME%\bin\java.exe
-goto startConfig
-
-:noJavaHome
-echo The JAVA_HOME environment variable is not defined correctly.
-echo Instead the PATH will be used to find the java executable.
-echo.
-set JAVA_EXE=java
-goto startConfig
-
-:startConfig
-set LIB_DIR=%~dp0..\classpath;%~dp0..\lib
-
-if "%JAVA_OPTS%" == "" set JAVA_OPTS=-Xms12m -Xmx24m
-
-SET JAVA_PARAMS=-cp %LIB_DIR%\* %JAVA_OPTS% org.apache.nifi.toolkit.kafkamigrator.KafkaMigratorMain
-
-cmd.exe /C ""%JAVA_EXE%" %JAVA_PARAMS% %* ""
-
diff --git a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.sh b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.sh
deleted file mode 100755
index c4925ff87b..0000000000
--- a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/kafka-migrator.sh
+++ /dev/null
@@ -1,119 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-
-# Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches
-
-SCRIPT_DIR=$(dirname "$0")
-SCRIPT_NAME=$(basename "$0")
-NIFI_TOOLKIT_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd)
-PROGNAME=$(basename "$0")
-
-
-warn() {
- (>&2 echo "${PROGNAME}: $*")
-}
-
-die() {
- warn "$*"
- exit 1
-}
-
-detectOS() {
- # OS specific support (must be 'true' or 'false').
- cygwin=false;
- aix=false;
- os400=false;
- darwin=false;
- case "$(uname)" in
- CYGWIN*)
- cygwin=true
- ;;
- AIX*)
- aix=true
- ;;
- OS400*)
- os400=true
- ;;
- Darwin)
- darwin=true
- ;;
- esac
- # For AIX, set an environment variable
- if ${aix}; then
- export LDR_CNTRL=MAXDATA=0xB0000000@DSA
- echo ${LDR_CNTRL}
- fi
-}
-
-locateJava() {
- # Setup the Java Virtual Machine
- if $cygwin ; then
- [ -n "${JAVA}" ] && JAVA=$(cygpath --unix "${JAVA}")
- [ -n "${JAVA_HOME}" ] && JAVA_HOME=$(cygpath --unix "${JAVA_HOME}")
- fi
-
- if [ "x${JAVA}" = "x" ] && [ -r /etc/gentoo-release ] ; then
- JAVA_HOME=$(java-config --jre-home)
- fi
- if [ "x${JAVA}" = "x" ]; then
- if [ "x${JAVA_HOME}" != "x" ]; then
- if [ ! -d "${JAVA_HOME}" ]; then
- die "JAVA_HOME is not valid: ${JAVA_HOME}"
- fi
- JAVA="${JAVA_HOME}/bin/java"
- else
- warn "JAVA_HOME not set; results may vary"
- JAVA=$(type java)
- JAVA=$(expr "${JAVA}" : '.* \(/.*\)$')
- if [ "x${JAVA}" = "x" ]; then
- die "java command not found"
- fi
- fi
- fi
-}
-
-init() {
- # Determine if there is special OS handling we must perform
- detectOS
-
- # Locate the Java VM to execute
- locateJava "$1"
-}
-
-run() {
- LIBS="${NIFI_TOOLKIT_HOME}/lib/*"
-
- sudo_cmd_prefix=""
- if $cygwin; then
- NIFI_TOOLKIT_HOME=$(cygpath --path --windows "${NIFI_TOOLKIT_HOME}")
- CLASSPATH="$NIFI_TOOLKIT_HOME/classpath;$(cygpath --path --windows "${LIBS}")"
- else
- CLASSPATH="$NIFI_TOOLKIT_HOME/classpath:${LIBS}"
- fi
-
- export JAVA_HOME="$JAVA_HOME"
- export NIFI_TOOLKIT_HOME="$NIFI_TOOLKIT_HOME"
-
- umask 0077
- exec "${JAVA}" -cp "${CLASSPATH}" ${JAVA_OPTS:--Xms12m -Xmx24m} org.apache.nifi.toolkit.kafkamigrator.KafkaMigratorMain "$@"
-}
-
-
-init "$1"
-run "$@"
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/pom.xml b/nifi-toolkit/nifi-toolkit-kafka-migrator/pom.xml
deleted file mode 100644
index 8fd37c4ea3..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/pom.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-toolkit</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>nifi-toolkit-kafka-migrator</artifactId>
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-xml-processing</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
- </dependencies>
-
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- <configuration>
- <excludes combine.children="append">
- <exclude>src/test/resources/flow.xml</exclude>
- </excludes>
- </configuration>
- </plugin>
- </plugins>
- </build>
-</project>
\ No newline at end of file
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java
deleted file mode 100644
index 29b26c94f8..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorMain.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator;
-
-import org.apache.nifi.toolkit.kafkamigrator.service.KafkaFlowMigrationService;
-import org.apache.nifi.toolkit.kafkamigrator.service.KafkaTemplateMigrationService;
-import org.apache.nifi.xml.processing.parsers.DocumentProvider;
-import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
-import org.apache.nifi.xml.processing.transform.StandardTransformProvider;
-import org.w3c.dom.Document;
-
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
-import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
-
-public class KafkaMigratorMain {
-
- private static void printUsage() {
- System.out.println("This application replaces Kafka processors from version 0.8, 0.9, 0.10 and 0.11 to version 2.0 processors" +
- " in a flow.xml.gz file.");
- System.out.println("\n");
- System.out.println("Usage: kafka-migrator.sh -i <path to input flow.xml.gz> -o <path to output flow.xml.gz>" +
- " -t <use transaction true or false>\noptional: -k <comma separated kafka brokers in <host>:<port> format. " +
- "Required for version 0.8 processors only>");
- }
-
- public static void main(final String[] args) throws Exception {
- if (showingUsageNeeded(args)) {
- printUsage();
- return;
- }
-
- String input = "";
- if (args[0].equalsIgnoreCase("-i")) {
- input = args[1];
- }
-
- String output = "";
- if (args[2].equalsIgnoreCase("-o")) {
- output = args[3];
- }
- if (input.equalsIgnoreCase(output)) {
- System.out.println("Input and output files should be different.");
- return;
- }
-
- String transaction = "";
- if (args[4].equalsIgnoreCase("-t")) {
- transaction = args[5];
- }
-
- if (!(transaction.equalsIgnoreCase("true") || transaction.equalsIgnoreCase("false"))) {
- System.out.println("Transaction argument should be either true or false.");
- return;
- }
-
- String kafkaBrokers = "";
- if (args.length == 8) {
- if (args[6].equalsIgnoreCase("-k") && args[7].matches(".+:\\d+")) {
- kafkaBrokers = args[7];
- } else {
- System.out.println("Kafka Brokers must be in a <host>:<port> format, can be separated by comma. " +
- "For example: hostname:1234, host:5678");
- return;
- }
- }
-
- final MigratorConfigurationBuilder configurationBuilder = new MigratorConfigurationBuilder();
- configurationBuilder.setKafkaBrokers(kafkaBrokers)
- .setTransaction(Boolean.parseBoolean(transaction));
-
- final InputStream fileStream = Files.newInputStream(Paths.get(input));
- final OutputStream outputStream = Files.newOutputStream(Paths.get(output));
- final InputStream gzipStream = new GZIPInputStream(fileStream);
- final OutputStream gzipOutStream = new GZIPOutputStream(outputStream);
-
- System.out.println("Using flow=" + input);
-
- try {
- final DocumentProvider documentProvider = new StandardDocumentProvider();
- final Document document = documentProvider.parse(gzipStream);
-
- final KafkaFlowMigrationService flowMigrationService = new KafkaFlowMigrationService();
- final KafkaTemplateMigrationService templateMigrationService = new KafkaTemplateMigrationService();
-
- System.out.println("Replacing processors.");
- flowMigrationService.replaceKafkaProcessors(document, configurationBuilder);
- templateMigrationService.replaceKafkaProcessors(document, configurationBuilder);
-
- final StreamResult streamResult = new StreamResult(gzipOutStream);
- final StandardTransformProvider transformProvider = new StandardTransformProvider();
- transformProvider.setIndent(true);
- transformProvider.transform(new DOMSource(document), streamResult);
- System.out.println("Replacing completed.");
- } catch (Exception e) {
- e.printStackTrace();
- System.out.println("Exception occurred while attempting to parse flow.xml.gz. Cause: " + e.getCause());
- } finally {
- gzipOutStream.close();
- outputStream.close();
- gzipStream.close();
- fileStream.close();
- }
- }
-
- private static boolean showingUsageNeeded(String[] args) {
- return args.length < 6 || args[0].equalsIgnoreCase("-h") || args[0].equalsIgnoreCase("--help");
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/MigratorConfiguration.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/MigratorConfiguration.java
deleted file mode 100644
index 3ffc33d0db..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/MigratorConfiguration.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator;
-
-import org.apache.nifi.toolkit.kafkamigrator.descriptor.ProcessorDescriptor;
-import org.apache.nifi.toolkit.kafkamigrator.descriptor.PropertyXpathDescriptor;
-
-public class MigratorConfiguration {
- final private String kafkaBrokers;
- final private boolean transaction;
- final private boolean isVersion8Processor;
- final private ProcessorDescriptor processorDescriptor;
- final private PropertyXpathDescriptor propertyXpathDescriptor;
-
- public MigratorConfiguration(final String kafkaBrokers, final boolean transaction, final boolean isVersion8Processor,
- final ProcessorDescriptor processorDescriptor, final PropertyXpathDescriptor propertyXpathDescriptor) {
- this.kafkaBrokers = kafkaBrokers;
- this.transaction = transaction;
- this.isVersion8Processor = isVersion8Processor;
- this.processorDescriptor = processorDescriptor;
- this.propertyXpathDescriptor = propertyXpathDescriptor;
- }
-
- public String getKafkaBrokers() {
- return kafkaBrokers;
- }
-
- public boolean isTransaction() {
- return transaction;
- }
-
- public boolean isVersion8Processor() {
- return isVersion8Processor;
- }
-
- public ProcessorDescriptor getProcessorDescriptor() {
- return processorDescriptor;
- }
-
- public PropertyXpathDescriptor getPropertyXpathDescriptor() {
- return propertyXpathDescriptor;
- }
-
- public static class MigratorConfigurationBuilder {
- private String kafkaBrokers;
- private boolean transaction;
- private boolean isVersion8Processor;
- private ProcessorDescriptor processorDescriptor;
- private PropertyXpathDescriptor propertyXpathDescriptor;
-
- public MigratorConfigurationBuilder setKafkaBrokers(final String kafkaBrokers) {
- this.kafkaBrokers = kafkaBrokers;
- return this;
- }
-
- public MigratorConfigurationBuilder setTransaction(final boolean transaction) {
- this.transaction = transaction;
- return this;
- }
-
- public MigratorConfigurationBuilder setIsVersion8Processor(final boolean isVersion8Processor) {
- this.isVersion8Processor = isVersion8Processor;
- return this;
- }
-
- public MigratorConfigurationBuilder setProcessorDescriptor(final ProcessorDescriptor processorDescriptor) {
- this.processorDescriptor = processorDescriptor;
- return this;
- }
-
- public MigratorConfigurationBuilder setPropertyXpathDescriptor(final PropertyXpathDescriptor propertyXpathDescriptor) {
- this.propertyXpathDescriptor = propertyXpathDescriptor;
- return this;
- }
-
- public MigratorConfiguration build() {
- return new MigratorConfiguration(kafkaBrokers, transaction, isVersion8Processor,
- processorDescriptor, propertyXpathDescriptor);
- }
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/FlowPropertyXpathDescriptor.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/FlowPropertyXpathDescriptor.java
deleted file mode 100644
index ad0aa453b0..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/FlowPropertyXpathDescriptor.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator.descriptor;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class FlowPropertyXpathDescriptor implements PropertyXpathDescriptor {
-
- private static final Map<String, String> CONSUME_TRANSACTION_PROPERTIES;
- private static final Map<String, String> PUBLISH_TRANSACTION_PROPERTIES;
- private static final Map<KafkaProcessorType, Map<String, String>> TRANSACTION_PROPERTIES;
- static {
- CONSUME_TRANSACTION_PROPERTIES = new HashMap<>();
- CONSUME_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty", "property[name=\"honor-transactions\"]/value");
- CONSUME_TRANSACTION_PROPERTIES.put("transactionTagName", "honor-transactions");
- PUBLISH_TRANSACTION_PROPERTIES = new HashMap<>();
- PUBLISH_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty", "property[name=\"use-transactions\"]/value");
- PUBLISH_TRANSACTION_PROPERTIES.put("transactionTagName", "use-transactions");
- TRANSACTION_PROPERTIES = new HashMap<>();
- TRANSACTION_PROPERTIES.put(KafkaProcessorType.CONSUME, CONSUME_TRANSACTION_PROPERTIES);
- TRANSACTION_PROPERTIES.put(KafkaProcessorType.PUBLISH, PUBLISH_TRANSACTION_PROPERTIES);
- }
-
- private final KafkaProcessorType processorType;
-
- public FlowPropertyXpathDescriptor(final KafkaProcessorType processorType) {
- this.processorType = processorType;
- }
-
- @Override
- public String getXpathForProperties() {
- return "property";
- }
-
- @Override
- public String getPropertyKeyTagName() {
- return "name";
- }
-
- @Override
- public String getPropertyTagName() {
- return "property";
- }
-
- @Override
- public String getXpathForTransactionProperty() {
- return TRANSACTION_PROPERTIES.get(processorType).get("xpathForTransactionProperty");
- }
-
- @Override
- public String getTransactionTagName() {
- return TRANSACTION_PROPERTIES.get(processorType).get("transactionTagName");
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorDescriptor.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorDescriptor.java
deleted file mode 100644
index 11f507aaec..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorDescriptor.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator.descriptor;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-public class KafkaProcessorDescriptor implements ProcessorDescriptor {
- private static final Map<String, String> CONSUME_KAFKA_PROCESSOR_PROPERTIES;
- private static final Map<String, String> CONSUME_PROPERTIES_TO_BE_SAVED;
- private static final Map<String, String> PUBLISH_KAFKA_PROCESSOR_PROPERTIES;
- private static final Map<String, String> PUBLISH_PROPERTIES_TO_BE_SAVED;
- private static final Map<String, String> CONTROLLER_SERVICES;
- private static final Map<KafkaProcessorType, Map<String, String>> PROPERTIES;
- private static final Map<KafkaProcessorType, Map<String, String>> PROPERTIES_TO_BE_SAVED;
-
- static {
- CONSUME_KAFKA_PROCESSOR_PROPERTIES = new HashMap<>();
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("security.protocol", "PLAINTEXT");
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.mechanism", "GSSAPI");
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.service.name", null);
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("kerberos-credentials-service", null);
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.principal", null);
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.keytab", null);
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.username", null);
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.password", null);
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("sasl.token.auth", "false");
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("ssl.context.service", null);
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("topic", null);
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("topic_type", "names");
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("group.id", null);
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("auto.offset.reset", "latest");
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("key-attribute-encoding", "utf-8");
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("message-demarcator", null);
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("separate-by-key", "false");
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("message-header-encoding", "UTF-8");
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("header-name-regex", null);
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("max.poll.records", "10000");
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("max-uncommit-offset-wait", "1 secs");
- CONSUME_KAFKA_PROCESSOR_PROPERTIES.put("Communications Timeout", "60 secs");
-
- CONSUME_PROPERTIES_TO_BE_SAVED = new HashMap<>();
- CONSUME_PROPERTIES_TO_BE_SAVED.put("Topic Name", "topic");
- CONSUME_PROPERTIES_TO_BE_SAVED.put("Group ID", "group.id");
-
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES = new HashMap<>();
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("security.protocol",
"PLAINTEXT");
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.mechanism", "GSSAPI");
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.service.name",
null);
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("kerberos-credentials-service",
null);
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.principal",
null);
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.kerberos.keytab", null);
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.username", null);
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.password", null);
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("sasl.token.auth", "false");
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("ssl.context.service", null);
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("topic", null);
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("acks", null);
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("Failure Strategy", "Route to Failure");
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("transactional-id-prefix", null);
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("attribute-name-regex", null);
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("message-header-encoding", "UTF-8");
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("kafka-key", null);
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("key-attribute-encoding", "utf-8");
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("message-demarcator", null);
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("max.request.size", "1 MB");
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("ack.wait.time", "5 secs");
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("max.block.ms", "5 sec");
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("partition", null);
- PUBLISH_KAFKA_PROCESSOR_PROPERTIES.put("compression.type", null);
-
- PUBLISH_PROPERTIES_TO_BE_SAVED = new HashMap<>();
- PUBLISH_PROPERTIES_TO_BE_SAVED.put("Topic Name", "topic");
- PUBLISH_PROPERTIES_TO_BE_SAVED.put("Partition", "partition");
- PUBLISH_PROPERTIES_TO_BE_SAVED.put("Kafka Key", "kafka-key");
- PUBLISH_PROPERTIES_TO_BE_SAVED.put("Delivery Guarantee", "acks");
- PUBLISH_PROPERTIES_TO_BE_SAVED.put( "Compression Codec", "compression.type");
-
- CONTROLLER_SERVICES = new HashMap<>();
- CONTROLLER_SERVICES.put("kerberos-credentials-service", "org.apache.nifi.kerberos.KerberosCredentialsService");
- CONTROLLER_SERVICES.put("ssl.context.service", "org.apache.nifi.ssl.SSLContextService");
-
- PROPERTIES = new HashMap<>();
- PROPERTIES.put(KafkaProcessorType.CONSUME, CONSUME_KAFKA_PROCESSOR_PROPERTIES);
- PROPERTIES.put(KafkaProcessorType.PUBLISH, PUBLISH_KAFKA_PROCESSOR_PROPERTIES);
-
- PROPERTIES_TO_BE_SAVED = new HashMap<>();
- PROPERTIES_TO_BE_SAVED.put(KafkaProcessorType.CONSUME, CONSUME_PROPERTIES_TO_BE_SAVED);
- PROPERTIES_TO_BE_SAVED.put(KafkaProcessorType.PUBLISH, PUBLISH_PROPERTIES_TO_BE_SAVED);
- }
-
- private final KafkaProcessorType processorType;
-
- public KafkaProcessorDescriptor(final KafkaProcessorType processorType) {
- this.processorType = processorType;
- }
-
- @Override
- public Map<String, String> getProcessorProperties() {
- return Collections.unmodifiableMap(PROPERTIES.get(processorType));
- }
-
- @Override
- public Map<String, String> getPropertiesToBeSaved() {
- return Collections.unmodifiableMap(PROPERTIES_TO_BE_SAVED.get(processorType));
- }
-
- @Override
- public Map<String, String> getControllerServicesForTemplates() {
- return Collections.unmodifiableMap(CONTROLLER_SERVICES);
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorType.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorType.java
deleted file mode 100644
index e5f8a5dde7..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/KafkaProcessorType.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator.descriptor;
-
-public enum KafkaProcessorType {
- PUBLISH("Publish"),
- CONSUME("Consume"),
- PUT("Put");
-
- private final String processorType;
-
- KafkaProcessorType(String processorType) {
- this.processorType = processorType;
- }
-
- public String getProcessorType() {
- return processorType;
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/ProcessorDescriptor.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/ProcessorDescriptor.java
deleted file mode 100644
index 9a5f798b7d..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/ProcessorDescriptor.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator.descriptor;
-
-import java.util.Map;
-
-public interface ProcessorDescriptor {
- Map<String, String> getProcessorProperties();
- Map<String, String> getPropertiesToBeSaved();
- Map<String, String> getControllerServicesForTemplates();
-
-}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/PropertyXpathDescriptor.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/PropertyXpathDescriptor.java
deleted file mode 100644
index ffb266f8c5..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/PropertyXpathDescriptor.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator.descriptor;
-
-public interface PropertyXpathDescriptor {
- String getXpathForProperties();
- String getPropertyKeyTagName();
- String getPropertyTagName();
- String getXpathForTransactionProperty();
- String getTransactionTagName();
-}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/TemplatePropertyXpathDescriptor.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/TemplatePropertyXpathDescriptor.java
deleted file mode 100644
index 3276439375..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/descriptor/TemplatePropertyXpathDescriptor.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator.descriptor;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class TemplatePropertyXpathDescriptor implements PropertyXpathDescriptor {
-
- private static final Map<String, String> CONSUME_TRANSACTION_PROPERTIES;
- private static final Map<String, String> PUBLISH_TRANSACTION_PROPERTIES;
- private static final Map<KafkaProcessorType, Map<String, String>> TRANSACTION_PROPERTIES;
- static {
- CONSUME_TRANSACTION_PROPERTIES = new HashMap<>();
- CONSUME_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty", "entry[key=\"honor-transactions\"]/value");
- CONSUME_TRANSACTION_PROPERTIES.put("transactionTagName", "honor-transactions");
- PUBLISH_TRANSACTION_PROPERTIES = new HashMap<>();
- PUBLISH_TRANSACTION_PROPERTIES.put("xpathForTransactionProperty", "entry[key=\"use-transactions\"]/value");
- PUBLISH_TRANSACTION_PROPERTIES.put("transactionTagName", "use-transactions");
- TRANSACTION_PROPERTIES = new HashMap<>();
- TRANSACTION_PROPERTIES.put(KafkaProcessorType.CONSUME, CONSUME_TRANSACTION_PROPERTIES);
- TRANSACTION_PROPERTIES.put(KafkaProcessorType.PUBLISH, PUBLISH_TRANSACTION_PROPERTIES);
- }
-
- private final KafkaProcessorType processorType;
-
- public TemplatePropertyXpathDescriptor(final KafkaProcessorType processorType) {
- this.processorType = processorType;
- }
-
- @Override
- public String getXpathForProperties() {
- return "entry";
- }
-
- @Override
- public String getPropertyKeyTagName() {
- return "key";
- }
-
- @Override
- public String getPropertyTagName() {
- return "entry";
- }
-
- @Override
- public String getXpathForTransactionProperty() {
- return TRANSACTION_PROPERTIES.get(processorType).get("xpathForTransactionProperty");
- }
-
- @Override
- public String getTransactionTagName() {
- return TRANSACTION_PROPERTIES.get(processorType).get("transactionTagName");
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/AbstractKafkaMigrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/AbstractKafkaMigrator.java
deleted file mode 100644
index ec02bff12e..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/AbstractKafkaMigrator.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator.migrator;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpressionException;
-import javax.xml.xpath.XPathFactory;
-import java.util.HashMap;
-import java.util.Map;
-
-public abstract class AbstractKafkaMigrator implements Migrator {
- static final XPath XPATH = XPathFactory.newInstance().newXPath();
- private final static String NEW_KAFKA_PROCESSOR_VERSION = "_2_0";
- private final static String ARTIFACT = "nifi-kafka-2-0-nar";
- private final static String PATH_FOR_ARTIFACT = "bundle/artifact";
-
- final boolean isVersion8Processor;
- final boolean isKafkaBrokersPresent;
-
- final Map<String, String> kafkaProcessorProperties;
- final Map<String, String> propertiesToBeSaved;
- final Map<String, String> controllerServices;
-
- final String xpathForProperties;
- final String propertyKeyTagName;
- final String propertyTagName;
-
- final String xpathForTransactionProperty;
- final String transactionTagName;
- final boolean transaction;
-
-
- public AbstractKafkaMigrator(final MigratorConfiguration configuration) {
- final String kafkaBrokers = configuration.getKafkaBrokers();
- this.isKafkaBrokersPresent = !kafkaBrokers.isEmpty();
- this.isVersion8Processor = configuration.isVersion8Processor();
- this.kafkaProcessorProperties = new HashMap<>(configuration.getProcessorDescriptor().getProcessorProperties());
- this.propertiesToBeSaved = configuration.getProcessorDescriptor().getPropertiesToBeSaved();
- this.controllerServices = configuration.getProcessorDescriptor().getControllerServicesForTemplates();
- this.xpathForProperties = configuration.getPropertyXpathDescriptor().getXpathForProperties();
- this.propertyKeyTagName = configuration.getPropertyXpathDescriptor().getPropertyKeyTagName();
- this.propertyTagName = configuration.getPropertyXpathDescriptor().getPropertyTagName();
- this.xpathForTransactionProperty = configuration.getPropertyXpathDescriptor().getXpathForTransactionProperty();
- this.transactionTagName = configuration.getPropertyXpathDescriptor().getTransactionTagName();
- this.transaction = configuration.isTransaction();
-
- if (isKafkaBrokersPresent) {
- kafkaProcessorProperties.put("bootstrap.servers", kafkaBrokers);
- }
- }
-
- @Override
- public void configureProperties(final Node node) throws XPathExpressionException {
- if (isVersion8Processor && isKafkaBrokersPresent) {
- final NodeList properties = (NodeList) XPATH.evaluate(xpathForProperties, node, XPathConstants.NODESET);
- for (int i = 0; i < properties.getLength(); i++) {
- final Node property = properties.item(i);
- saveRequiredProperties(property);
- removeElement(node, property);
- }
- addNewProperties(node);
- }
- }
-
- @Override
- public void configureDescriptors(final Node node) throws XPathExpressionException {
- if(isVersion8Processor && isKafkaBrokersPresent) {
- final Element descriptorElement = (Element) XPATH.evaluate("config/descriptors", node, XPathConstants.NODE);
- final NodeList descriptors = (NodeList) XPATH.evaluate("entry", descriptorElement, XPathConstants.NODESET);
- for (int i = 0; i < descriptors.getLength(); i++) {
- final Node descriptor = descriptors.item(i);
- removeElement(descriptorElement, descriptor);
- }
- addNewDescriptors(descriptorElement);
- }
- }
-
- @Override
- public void configureComponentSpecificSteps(final Node node) throws XPathExpressionException {
- final String transactionString = Boolean.toString(transaction);
- final Element transactionsElement = (Element) XPATH.evaluate(xpathForTransactionProperty, node, XPathConstants.NODE);
-
- if (transactionsElement != null) {
- transactionsElement.setTextContent(transactionString);
- } else {
- addNewProperty(node, transactionTagName, transactionString);
- }
-
- kafkaProcessorProperties.put(transactionTagName, transactionString);
- }
-
- public void replaceClassName(final Element className) {
- final String processorName = StringUtils.substringAfterLast(className.getTextContent(), ".");
- final String newClassName = replaceClassNameWithNewProcessorName(className.getTextContent(), processorName);
- className.setTextContent(newClassName);
- }
-
- public void replaceArtifact(final Node processor) throws XPathExpressionException {
- ((Element) XPATH.evaluate(PATH_FOR_ARTIFACT, processor, XPathConstants.NODE)).setTextContent(ARTIFACT);
- }
-
- private static String replaceClassNameWithNewProcessorName(final String className, final String processorName) {
- final String newProcessorName = StringUtils.replaceEach(processorName, new String[]{"Get", "Put"}, new String[]{"pubsub.Consume", "pubsub.Publish"});
- final String processorNameWithNewVersion =
- newProcessorName.replaceFirst("$|_0_1\\d?", NEW_KAFKA_PROCESSOR_VERSION);
- return StringUtils.replace(className, processorName, processorNameWithNewVersion);
- }
-
- private void addNewDescriptors(final Node node) {
- for (String key: kafkaProcessorProperties.keySet()) {
- final Element descriptorElement = node.getOwnerDocument().createElement("entry");
- node.appendChild(descriptorElement);
-
- final Element descriptorKeyElement = descriptorElement.getOwnerDocument().createElement("key");
- descriptorKeyElement.setTextContent(key);
- descriptorElement.appendChild(descriptorKeyElement);
-
- final Element descriptorValueElement = descriptorElement.getOwnerDocument().createElement("value");
- descriptorElement.appendChild(descriptorValueElement);
-
- final Element descriptorNameElement = descriptorValueElement.getOwnerDocument().createElement("name");
- descriptorNameElement.setTextContent(key);
- descriptorValueElement.appendChild(descriptorNameElement);
-
- if (controllerServices.containsKey(key)) {
- final Element controllerServiceElement = descriptorValueElement.getOwnerDocument().createElement("identifiesControllerService");
- controllerServiceElement.setTextContent(controllerServices.get(key));
- descriptorValueElement.appendChild(controllerServiceElement);
- }
- }
- }
-
- private void saveRequiredProperties(final Node property) throws XPathExpressionException {
- final String propertyToBeSaved = propertiesToBeSaved.get(XPATH.evaluate(propertyKeyTagName, property));
-
- if (propertyToBeSaved != null) {
- String propertyValue = XPATH.evaluate("value", property);
- kafkaProcessorProperties.put(propertyToBeSaved, convert(propertyValue));
- }
- }
-
- private String convert(final String propertyValue) {
- return propertyValue.isEmpty() ? null : propertyValue;
- }
-
- private void addNewProperties(final Node node) {
- for (Map.Entry<String, String> entry : kafkaProcessorProperties.entrySet()) {
- addNewProperty(node, entry.getKey(), entry.getValue());
- }
- }
-
- private void addNewProperty(final Node node, final String key, final String value) {
- final Element propertyElement = node.getOwnerDocument().createElement(propertyTagName);
- node.appendChild(propertyElement);
-
- final Element propertyKeyElement = propertyElement.getOwnerDocument().createElement(propertyKeyTagName);
- propertyKeyElement.setTextContent(key);
-
- propertyElement.appendChild(propertyKeyElement);
-
- if (value != null) {
- final Element propertyValueElement = propertyElement.getOwnerDocument().createElement("value");
- propertyValueElement.setTextContent(value);
-
- propertyElement.appendChild(propertyValueElement);
- }
- }
-
- private void removeElement(final Node node, final Node element) {
- node.removeChild(element);
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaFlowMigrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaFlowMigrator.java
deleted file mode 100644
index 022c2db933..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaFlowMigrator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator.migrator;
-
-import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-
-import javax.xml.xpath.XPathExpressionException;
-
-public class ConsumeKafkaFlowMigrator extends AbstractKafkaMigrator {
-
- public ConsumeKafkaFlowMigrator(final MigratorConfiguration configuration) {
- super(configuration);
- }
-
- @Override
- public void migrate(final Element className, final Node processor) throws XPathExpressionException {
- configureProperties(processor);
- configureComponentSpecificSteps(processor);
- replaceClassName(className);
- replaceArtifact(processor);
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaTemplateMigrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaTemplateMigrator.java
deleted file mode 100644
index 3abb118a3c..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/ConsumeKafkaTemplateMigrator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator.migrator;
-
-import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpressionException;
-
-public class ConsumeKafkaTemplateMigrator extends AbstractKafkaMigrator {
-
- public ConsumeKafkaTemplateMigrator(final MigratorConfiguration configuration) {
- super(configuration);
- }
-
- @Override
- public void configureProperties(final Node node) throws XPathExpressionException {
- final Element propertyElement = (Element) XPATH.evaluate("config/properties", node, XPathConstants.NODE);
- super.configureProperties(propertyElement);
- }
-
- @Override
- public void configureComponentSpecificSteps(final Node node) throws XPathExpressionException {
- final Element propertyElement = (Element) XPATH.evaluate("config/properties", node, XPathConstants.NODE);
- super.configureComponentSpecificSteps(propertyElement);
- }
-
- @Override
- public void migrate(final Element className, final Node processor) throws XPathExpressionException {
- configureProperties(processor);
- configureComponentSpecificSteps(processor);
- configureDescriptors(processor);
- replaceClassName(className);
- replaceArtifact(processor);
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/Migrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/Migrator.java
deleted file mode 100644
index cf41451099..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/Migrator.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator.migrator;
-
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-
-import javax.xml.xpath.XPathExpressionException;
-
-public interface Migrator {
- void configureProperties(final Node node) throws XPathExpressionException;
- void configureDescriptors(final Node node) throws XPathExpressionException;
- void configureComponentSpecificSteps(final Node node) throws XPathExpressionException;
- void migrate(final Element className, final Node node) throws XPathExpressionException;
-}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaFlowMigrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaFlowMigrator.java
deleted file mode 100644
index 7ef865d25c..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaFlowMigrator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator.migrator;
-
-import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpressionException;
-
-public class PublishKafkaFlowMigrator extends AbstractKafkaMigrator {
-
- public PublishKafkaFlowMigrator(final MigratorConfiguration configuration) {
- super(configuration);
- }
-
- @Override
- public void configureComponentSpecificSteps(final Node node) throws XPathExpressionException {
- final Element deliveryGuaranteeValue = (Element) XPATH.evaluate("property[name=\"acks\"]/value", node, XPathConstants.NODE);
- if (this.transaction && deliveryGuaranteeValue != null) {
- deliveryGuaranteeValue.setTextContent("all");
- }
- super.configureComponentSpecificSteps(node);
- }
-
- @Override
- public void migrate(final Element className, final Node processor) throws XPathExpressionException {
- configureProperties(processor);
- configureComponentSpecificSteps(processor);
- replaceClassName(className);
- replaceArtifact(processor);
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaTemplateMigrator.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaTemplateMigrator.java
deleted file mode 100644
index 7b0538462a..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/migrator/PublishKafkaTemplateMigrator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator.migrator;
-
-import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpressionException;
-
-public class PublishKafkaTemplateMigrator extends AbstractKafkaMigrator {
-
- public PublishKafkaTemplateMigrator(final MigratorConfiguration configuration) {
- super(configuration);
- }
-
- @Override
- public void configureProperties(final Node node) throws XPathExpressionException {
- final Element propertyElement = (Element) XPATH.evaluate("config/properties", node, XPathConstants.NODE);
- super.configureProperties(propertyElement);
- }
-
- @Override
- public void configureComponentSpecificSteps(final Node node) throws XPathExpressionException {
- //add value if null
- final Element propertyElement = (Element) XPATH.evaluate("config/properties", node, XPathConstants.NODE);
- final Element deliveryGuaranteeValue = (Element) XPATH.evaluate("entry[key=\"acks\"]/value", propertyElement, XPathConstants.NODE);
- if (this.transaction && deliveryGuaranteeValue != null) {
- deliveryGuaranteeValue.setTextContent("all");
- }
- super.configureComponentSpecificSteps(propertyElement);
- }
-
- @Override
- public void migrate(final Element className, final Node processor) throws XPathExpressionException {
- configureProperties(processor);
- configureComponentSpecificSteps(processor);
- configureDescriptors(processor);
- replaceClassName(className);
- replaceArtifact(processor);
- }
-}
diff --git
a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaFlowMigrationService.java
b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaFlowMigrationService.java
deleted file mode 100644
index f2cb98d19b..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaFlowMigrationService.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator.service;
-
-import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor;
-import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType;
-import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaFlowMigrator;
-import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator;
-import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaFlowMigrator;
-import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
-import org.apache.nifi.toolkit.kafkamigrator.descriptor.FlowPropertyXpathDescriptor;
-
-
-public class KafkaFlowMigrationService implements KafkaMigrationService {
- private static final String XPATH_FOR_PROCESSORS_IN_FLOW = ".//processor";
- private static final String CLASS_TAG_NAME = "class";
-
- public KafkaFlowMigrationService() {
- }
-
- @Override
- public String getPathForProcessors() {
- return XPATH_FOR_PROCESSORS_IN_FLOW;
- }
-
- @Override
- public String getPathForClass() {
- return CLASS_TAG_NAME;
- }
-
- @Override
- public Migrator createPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) {
- configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR)
- .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH))
- .setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.PUBLISH));
- return new PublishKafkaFlowMigrator(configurationBuilder.build());
- }
-
- @Override
- public Migrator createConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) {
- configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR)
- .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME))
- .setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.CONSUME));
- return new ConsumeKafkaFlowMigrator(configurationBuilder.build());
- }
-
- @Override
- public Migrator createVersionEightPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) {
- configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR)
- .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH))
- .setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.PUBLISH));
- return new PublishKafkaFlowMigrator(configurationBuilder.build());
- }
-
- @Override
- public Migrator createVersionEightConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) {
- configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR)
- .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME))
- .setPropertyXpathDescriptor(new FlowPropertyXpathDescriptor(KafkaProcessorType.CONSUME));
- return new ConsumeKafkaFlowMigrator(configurationBuilder.build());
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaMigrationService.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaMigrationService.java
deleted file mode 100644
index 1428002077..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaMigrationService.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator.service;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType;
-import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpressionException;
-import javax.xml.xpath.XPathFactory;
-
-import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
-
-public interface KafkaMigrationService {
-
- String REGEX_FOR_REPLACEABLE_PROCESSOR_NAMES = "(Get|Put|Consume|Publish)Kafka(Record)?(_0_1\\d)?";
- boolean IS_VERSION_EIGHT_PROCESSOR = Boolean.TRUE;
- boolean IS_NOT_VERSION_EIGHT_PROCESSOR = Boolean.FALSE;
-
- String getPathForProcessors();
- String getPathForClass();
- Migrator createPublishMigrator(final MigratorConfigurationBuilder configurationBuilder);
- Migrator createConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder);
- Migrator createVersionEightPublishMigrator(final MigratorConfigurationBuilder configurationBuilder);
- Migrator createVersionEightConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder);
-
- default void replaceKafkaProcessors(final Document document, final MigratorConfigurationBuilder configurationBuilder) throws XPathExpressionException {
- Migrator migrator;
- final XPath xPath = XPathFactory.newInstance().newXPath();
-
- final NodeList processors = (NodeList) xPath.evaluate(getPathForProcessors(), document, XPathConstants.NODESET);
- for (int i = 0; i < processors.getLength(); i++) {
- final Node processor = processors.item(i);
- final Element className = ((Element) xPath.evaluate(getPathForClass(), processor, XPathConstants.NODE));
- final String processorName = StringUtils.substringAfterLast(className.getTextContent(), ".");
-
- if (processorName.matches(REGEX_FOR_REPLACEABLE_PROCESSOR_NAMES)) {
- if (processorName.contains(KafkaProcessorType.PUBLISH.getProcessorType())) {
- migrator = createPublishMigrator(configurationBuilder);
- } else if (processorName.contains(KafkaProcessorType.PUT.getProcessorType())) {
- migrator = createVersionEightPublishMigrator(configurationBuilder);
- } else if (processorName.contains(KafkaProcessorType.CONSUME.getProcessorType())) {
- migrator = createConsumeMigrator(configurationBuilder);
- } else {
- migrator = createVersionEightConsumeMigrator(configurationBuilder);
- }
-
- migrator.migrate(className, processor);
- }
- }
- }
-}
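The name-based dispatch in the removed `replaceKafkaProcessors` can be sketched on its own as follows. This is an illustration, not the reverted code: the class `ProcessorNameDispatchSketch` and its `classify` method are hypothetical, the returned labels stand in for the four `create...Migrator` calls, and the literal strings `"Publish"`, `"Put"` and `"Consume"` are assumed to be what `KafkaProcessorType.getProcessorType()` returns, since that enum is not shown in this part of the diff.

```java
import java.util.regex.Pattern;

// Hypothetical stand-alone sketch of the processor-name dispatch.
public class ProcessorNameDispatchSketch {

    // Same expression as REGEX_FOR_REPLACEABLE_PROCESSOR_NAMES in the diff.
    private static final Pattern REPLACEABLE =
            Pattern.compile("(Get|Put|Consume|Publish)Kafka(Record)?(_0_1\\d)?");

    // Returns a label for the migrator that would be chosen, or "skip" when
    // the processor is not one of the replaceable Kafka processors.
    static String classify(final String className) {
        // Text after the last dot, or an empty string when there is no dot.
        final int lastDot = className.lastIndexOf('.');
        final String simpleName = lastDot < 0 ? "" : className.substring(lastDot + 1);

        if (!REPLACEABLE.matcher(simpleName).matches()) {
            return "skip";
        }
        if (simpleName.contains("Publish")) {   // assumed PUBLISH type string
            return "publish";
        } else if (simpleName.contains("Put")) { // assumed PUT type string
            return "publish-v8";
        } else if (simpleName.contains("Consume")) { // assumed CONSUME type string
            return "consume";
        }
        return "consume-v8"; // only Get... names remain at this point
    }

    public static void main(final String[] args) {
        System.out.println(classify("org.apache.nifi.processors.kafka.PutKafka"));
        System.out.println(classify("org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10"));
        System.out.println(classify("org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0"));
    }
}
```

Because the whole simple name must match the pattern, a processor that already ends in `_2_0` falls through to `"skip"`, which is consistent with the service test expecting the first processor in its fixture to stay unchanged.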
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaTemplateMigrationService.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaTemplateMigrationService.java
deleted file mode 100644
index fa4b06d00a..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/main/java/org/apache/nifi/toolkit/kafkamigrator/service/KafkaTemplateMigrationService.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator.service;
-
-import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
-import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor;
-import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType;
-import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaTemplateMigrator;
-import org.apache.nifi.toolkit.kafkamigrator.migrator.Migrator;
-import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaTemplateMigrator;
-import org.apache.nifi.toolkit.kafkamigrator.descriptor.TemplatePropertyXpathDescriptor;
-
-public class KafkaTemplateMigrationService implements KafkaMigrationService {
- private static final String XPATH_FOR_PROCESSORS_IN_TEMPLATE = ".//processors";
- private static final String TYPE_TAG_NAME = "type";
-
- public KafkaTemplateMigrationService() {
- }
-
- @Override
- public String getPathForProcessors() {
- return XPATH_FOR_PROCESSORS_IN_TEMPLATE;
- }
-
- @Override
- public String getPathForClass() {
- return TYPE_TAG_NAME;
- }
-
- @Override
- public Migrator createPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) {
- configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR)
- .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH))
- .setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.PUBLISH));
- return new PublishKafkaTemplateMigrator(configurationBuilder.build());
- }
-
- @Override
- public Migrator createConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) {
- configurationBuilder.setIsVersion8Processor(IS_NOT_VERSION_EIGHT_PROCESSOR)
- .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME))
- .setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.CONSUME));
- return new ConsumeKafkaTemplateMigrator(configurationBuilder.build());
- }
-
- @Override
- public Migrator createVersionEightPublishMigrator(final MigratorConfigurationBuilder configurationBuilder) {
- configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR)
- .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH))
- .setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.PUBLISH));
- return new PublishKafkaTemplateMigrator(configurationBuilder.build());
- }
-
- @Override
- public Migrator createVersionEightConsumeMigrator(final MigratorConfigurationBuilder configurationBuilder) {
- configurationBuilder.setIsVersion8Processor(IS_VERSION_EIGHT_PROCESSOR)
- .setProcessorDescriptor(new KafkaProcessorDescriptor(KafkaProcessorType.CONSUME))
- .setPropertyXpathDescriptor(new TemplatePropertyXpathDescriptor(KafkaProcessorType.CONSUME));
- return new ConsumeKafkaTemplateMigrator(configurationBuilder.build());
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationServiceTest.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationServiceTest.java
deleted file mode 100644
index 5c3a54fb9c..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationServiceTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator;
-
-import org.apache.nifi.toolkit.kafkamigrator.service.KafkaFlowMigrationService;
-import org.apache.nifi.toolkit.kafkamigrator.service.KafkaTemplateMigrationService;
-import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
-import org.junit.jupiter.api.Test;
-import org.w3c.dom.Document;
-import org.w3c.dom.NodeList;
-
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpressionException;
-import javax.xml.xpath.XPathFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-
-public class KafkaMigrationServiceTest {
-
- private static final List<String> EXPECTED_CLASS_OR_TYPE_NAMES =
- Arrays.asList("org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0",
- "org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0",
- "org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_0");
-
- private static final List<String> EXPECTED_ARTIFACTS =
- Arrays.asList("nifi-kafka-2-0-nar", "nifi-kafka-2-0-nar", "nifi-kafka-2-0-nar");
-
- private static final MigratorConfigurationBuilder CONFIGURATION_BUILDER =
- new MigratorConfigurationBuilder().setKafkaBrokers("kafkaBrokers, localhost:1234")
- .setTransaction(Boolean.FALSE);
- private static final XPath XPATH = XPathFactory.newInstance().newXPath();
- private static final String PATH_FOR_PROCESSORS_IN_FLOWS = ".//processor";
- private static final String PATH_FOR_PROCESSORS_IN_TEMPLATES = ".//processors";
- private static final String PATH_FOR_CLASS_ELEMENT = "class";
- private static final String PATH_FOR_TYPE_ELEMENT = "type";
- private static final String PATH_FOR_ARTIFACT_ELEMENT = "bundle/artifact";
-
-
- @Test
- public void testClassReplacement() throws XPathExpressionException, IOException {
- final KafkaFlowMigrationService kafkaMigrationService = new KafkaFlowMigrationService();
- final Document document = KafkaMigrationUtil.parseDocument();
- final List<String> originalClassNames = createClassResultList(document);
-
- kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER);
- final List<String> actualClassNames = createClassResultList(document);
-
- assertSuccess(EXPECTED_CLASS_OR_TYPE_NAMES, actualClassNames, originalClassNames);
- }
-
- @Test
- public void testTypeReplacement() throws XPathExpressionException, IOException {
- final KafkaTemplateMigrationService kafkaMigrationService = new KafkaTemplateMigrationService();
- final Document document = KafkaMigrationUtil.parseDocument();
- final List<String> originalTypeNames = createTypeResultList(document);
-
- kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER);
- final List<String> actualTypeNames = createTypeResultList(document);
-
- assertSuccess(EXPECTED_CLASS_OR_TYPE_NAMES, actualTypeNames, originalTypeNames);
- }
-
- @Test
- public void testArtifactReplacementInTemplate() throws XPathExpressionException, IOException {
- final KafkaTemplateMigrationService kafkaMigrationService = new KafkaTemplateMigrationService();
- final Document document = KafkaMigrationUtil.parseDocument();
- final List<String> originalArtifacts = createArtifactResultListForTemplate(document);
-
- kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER);
- final List<String> actualArtifacts = createArtifactResultListForTemplate(document);
-
- assertSuccess(EXPECTED_ARTIFACTS, actualArtifacts, originalArtifacts);
- }
-
- @Test
- public void testArtifactReplacementInFlow() throws XPathExpressionException, IOException {
- final KafkaFlowMigrationService kafkaMigrationService = new KafkaFlowMigrationService();
- final Document document = KafkaMigrationUtil.parseDocument();
- final List<String> originalArtifacts = createArtifactResultListForFlow(document);
-
- kafkaMigrationService.replaceKafkaProcessors(document, CONFIGURATION_BUILDER);
- final List<String> actualArtifacts = createArtifactResultListForFlow(document);
-
- assertSuccess(EXPECTED_ARTIFACTS, actualArtifacts, originalArtifacts);
- }
-
- private List<String> createClassResultList(final Document document) throws XPathExpressionException {
- return createProcessorResultListForFlow(document, PATH_FOR_CLASS_ELEMENT);
- }
-
- private List<String> createArtifactResultListForFlow(final Document document) throws XPathExpressionException {
- return createProcessorResultListForFlow(document, PATH_FOR_ARTIFACT_ELEMENT);
- }
-
- private List<String> createTypeResultList(final Document document) throws XPathExpressionException {
- return createProcessorResultListForTemplate(document, PATH_FOR_TYPE_ELEMENT);
- }
-
- private List<String> createArtifactResultListForTemplate(final Document document) throws XPathExpressionException {
- return createProcessorResultListForTemplate(document, PATH_FOR_ARTIFACT_ELEMENT);
- }
-
- private List<String> createProcessorResultListForFlow(final Document document, final String elementPath) throws XPathExpressionException {
- return createProcessorResultList(document, PATH_FOR_PROCESSORS_IN_FLOWS, elementPath);
- }
-
- private List<String> createProcessorResultListForTemplate(final Document document, final String elementPath) throws XPathExpressionException {
- return createProcessorResultList(document, PATH_FOR_PROCESSORS_IN_TEMPLATES, elementPath);
- }
-
- private List<String> createProcessorResultList(final Document document, final String processorPath, final String elementPath) throws XPathExpressionException {
- final List<String> resultList = new ArrayList<>();
- final NodeList processors = (NodeList) XPATH.evaluate(processorPath, document, XPathConstants.NODESET);
- for (int i = 0; i < processors.getLength(); i++) {
- resultList.add(XPATH.evaluate(elementPath, processors.item(i)));
- }
- return resultList;
- }
-
- private void assertSuccess(final List<String> expectedArtifacts, final List<String> actualArtifacts, final List<String> originalArtifacts) {
- assertArrayEquals(expectedArtifacts.toArray(), actualArtifacts.toArray());
- assertNoReplacementHappened(originalArtifacts, actualArtifacts);
- assertReplacementHappened(originalArtifacts, actualArtifacts);
- }
-
- private void assertNoReplacementHappened(final List<String> originalArtifacts, final List<String> actualArtifacts) {
- assertEquals(originalArtifacts.get(0), actualArtifacts.get(0));
- }
-
- private void assertReplacementHappened(final List<String> originalArtifacts, final List<String> actualArtifacts) {
- assertNotEquals(originalArtifacts.get(1), actualArtifacts.get(1));
- assertNotEquals(originalArtifacts.get(2), actualArtifacts.get(2));
- }
-}
\ No newline at end of file
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationUtil.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationUtil.java
deleted file mode 100644
index b095e1cbc4..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigrationUtil.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator;
-
-import org.apache.nifi.xml.processing.parsers.DocumentProvider;
-import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
-import org.w3c.dom.Document;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-
-public class KafkaMigrationUtil {
- public static Document parseDocument() throws IOException {
- final DocumentProvider documentProvider = new StandardDocumentProvider();
- return documentProvider.parse(Files.newInputStream(Paths.get("src/test/resources/flow.xml")));
- }
-}
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorTest.java b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorTest.java
deleted file mode 100644
index 6394516ece..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/java/org/apache/nifi/toolkit/kafkamigrator/KafkaMigratorTest.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.toolkit.kafkamigrator;
-
-import org.apache.nifi.toolkit.kafkamigrator.descriptor.FlowPropertyXpathDescriptor;
-import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorDescriptor;
-import org.apache.nifi.toolkit.kafkamigrator.descriptor.KafkaProcessorType;
-import org.apache.nifi.toolkit.kafkamigrator.descriptor.PropertyXpathDescriptor;
-import org.apache.nifi.toolkit.kafkamigrator.descriptor.TemplatePropertyXpathDescriptor;
-import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaFlowMigrator;
-import org.apache.nifi.toolkit.kafkamigrator.migrator.ConsumeKafkaTemplateMigrator;
-import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaFlowMigrator;
-import org.apache.nifi.toolkit.kafkamigrator.migrator.PublishKafkaTemplateMigrator;
-import org.apache.nifi.toolkit.kafkamigrator.MigratorConfiguration.MigratorConfigurationBuilder;
-import org.junit.jupiter.api.Test;
-import org.w3c.dom.Document;
-import org.w3c.dom.Node;
-
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpressionException;
-import javax.xml.xpath.XPathFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class KafkaMigratorTest {
- private static final XPath XPATH = XPathFactory.newInstance().newXPath();
- private static final String XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW = ".//processor[class='org.apache.nifi.processors.kafka.PutKafka']";
- private static final String XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE = ".//processors[type='org.apache.nifi.processors.kafka.PutKafka']";
-
- private static final String XPATH_FOR_CONSUME_PROCESSOR_IN_FLOW = ".//processor[class='org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10']";
- private static final String XPATH_FOR_CONSUME_PROCESSOR_IN_TEMPLATE = ".//processors[type='org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10']";
- private static final KafkaProcessorDescriptor PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR = new KafkaProcessorDescriptor(KafkaProcessorType.PUBLISH);
- private static final boolean WITH_TRANSACTION = Boolean.TRUE;
- private static final boolean WITHOUT_TRANSACTION = Boolean.FALSE;
- private static final boolean IS_VERSION_EIGHT_PROCESSOR = Boolean.TRUE;
- private static final boolean IS_NOT_VERSION_EIGHT_PROCESSOR = Boolean.FALSE;
- private static final String FLOW = "Flow";
- private static final String TEMPLATE = "Template";
-
- @Test
- public void testPropertiesRemoved() throws XPathExpressionException, IOException {
- final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW);
- final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
- final Document document = KafkaMigrationUtil.parseDocument();
- final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
-
- flowMigrator.configureProperties(processor);
-
- assertPropertyRemoveSuccess(processor);
- }
-
- @Test
- public void testPropertiesAdded() throws XPathExpressionException, IOException {
- final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR,KafkaProcessorType.PUBLISH, FLOW);
- final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
- final Document document = KafkaMigrationUtil.parseDocument();
- final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
-
- flowMigrator.configureProperties(processor);
-
- assertPropertyAddSuccess(processor);
- }
-
- @Test
- public void testPropertiesSaved() throws XPathExpressionException, IOException {
- final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, FLOW);
- final PublishKafkaFlowMigrator flowMigrator = new PublishKafkaFlowMigrator(configuration);
- final Document document = KafkaMigrationUtil.parseDocument();
- final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
- final List<String> oldValues = getOldValues(processor);
-
- flowMigrator.configureProperties(processor);
-
- final List<String> newValues = getNewValues(processor);
- assertEquals(oldValues, newValues);
- }
-
- @Test
- public void testDescriptorsAdded() throws XPathExpressionException, IOException {
- final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE);
- final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration);
- final Document document = KafkaMigrationUtil.parseDocument();
- final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
-
- templateMigrator.configureDescriptors(processor);
-
- assertDescriptorAddSuccess(processor);
- }
-
- @Test
- public void testDescriptorsRemoved() throws XPathExpressionException, IOException {
- final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.PUBLISH, TEMPLATE);
- final PublishKafkaTemplateMigrator templateMigrator = new PublishKafkaTemplateMigrator(configuration);
- final Document document = KafkaMigrationUtil.parseDocument();
- final Node processor = (Node) XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
-
- templateMigrator.configureDescriptors(processor);
-
- assertDescriptorRemoveSuccess(processor);
- }
-
- @Test
- public void testTransactionFlowPropertyForConsumeProcessorWithTrue() throws XPathExpressionException, IOException {
- final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, FLOW);
- final ConsumeKafkaFlowMigrator flowMigrator = new ConsumeKafkaFlowMigrator(configuration);
- final Document document = KafkaMigrationUtil.parseDocument();
- final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
-
- flowMigrator.configureComponentSpecificSteps(processor);
-
- assertEquals("true", XPATH.evaluate("property[name='honor-transactions']/value", processor));
- }
-
- @Test
- public void testTransactionTemplatePropertyForConsumeProcessorWithTrue() throws XPathExpressionException, IOException {
- final MigratorConfiguration configuration = getConfiguration(WITH_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, TEMPLATE);
- final ConsumeKafkaTemplateMigrator templateMigrator = new ConsumeKafkaTemplateMigrator(configuration);
- final Document document = KafkaMigrationUtil.parseDocument();
- final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
-
- templateMigrator.configureComponentSpecificSteps(processor);
-
- assertEquals("true", XPATH.evaluate("config/properties/entry[key='honor-transactions']/value", processor));
- }
-
- @Test
- public void testTransactionFlowPropertyForConsumeProcessorWithFalse() throws XPathExpressionException, IOException {
- final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, FLOW);
- final ConsumeKafkaFlowMigrator flowMigrator = new ConsumeKafkaFlowMigrator(configuration);
- final Document document = KafkaMigrationUtil.parseDocument();
- final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_FLOW, document, XPathConstants.NODE);
-
- flowMigrator.configureComponentSpecificSteps(processor);
-
- assertEquals("false", XPATH.evaluate("property[name='honor-transactions']/value", processor));
- }
-
- @Test
- public void testTransactionTemplatePropertyForConsumeProcessorWithFalse() throws XPathExpressionException, IOException {
- final MigratorConfiguration configuration = getConfiguration(WITHOUT_TRANSACTION, IS_NOT_VERSION_EIGHT_PROCESSOR, KafkaProcessorType.CONSUME, TEMPLATE);
- final ConsumeKafkaTemplateMigrator templateMigrator = new ConsumeKafkaTemplateMigrator(configuration);
- final Document document = KafkaMigrationUtil.parseDocument();
- final Node processor = (Node) XPATH.evaluate(XPATH_FOR_CONSUME_PROCESSOR_IN_TEMPLATE, document, XPathConstants.NODE);
-
- templateMigrator.configureComponentSpecificSteps(processor);
-
- assertEquals("false", XPATH.evaluate("config/properties/entry[key='honor-transactions']/value", processor));
- }
-
- @Test
- public void testTransactionFlowPropertyForPublishProcessorWithTrue()
throws XPathExpressionException, IOException {
- final MigratorConfiguration configuration =
getConfiguration(WITH_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR,
KafkaProcessorType.PUBLISH, FLOW);
- final PublishKafkaFlowMigrator flowMigrator = new
PublishKafkaFlowMigrator(configuration);
- final Document document = KafkaMigrationUtil.parseDocument();
- final Node processor = (Node)
XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document,
XPathConstants.NODE);
-
- flowMigrator.configureComponentSpecificSteps(processor);
-
- assertEquals("true",
XPATH.evaluate("property[name='use-transactions']/value", processor));
- assertEquals("", XPATH.evaluate("property[name='acks']/value",
processor));
-
- }
-
- @Test
- public void testTransactionTemplatePropertyForPublishProcessorWithTrue()
throws XPathExpressionException, IOException {
- final MigratorConfiguration configuration =
getConfiguration(WITH_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR,
KafkaProcessorType.PUBLISH, TEMPLATE);
- final PublishKafkaTemplateMigrator templateMigrator = new
PublishKafkaTemplateMigrator(configuration);
- final Document document = KafkaMigrationUtil.parseDocument();
- final Node processor = (Node)
XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document,
XPathConstants.NODE);
-
- templateMigrator.configureComponentSpecificSteps(processor);
-
- assertEquals("true",
XPATH.evaluate("config/properties/entry[key='use-transactions']/value",
processor));
- assertEquals("",
XPATH.evaluate("config/properties/entry[key='acks']/value", processor));
- }
-
- @Test
- public void testTransactionFlowPropertyForPublishProcessorWithFalse()
throws XPathExpressionException, IOException {
- final MigratorConfiguration configuration =
getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR,
KafkaProcessorType.PUBLISH, FLOW);
- final PublishKafkaFlowMigrator flowMigrator = new
PublishKafkaFlowMigrator(configuration);
- final Document document = KafkaMigrationUtil.parseDocument();
- final Node processor = (Node)
XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_FLOW, document,
XPathConstants.NODE);
-
- flowMigrator.configureComponentSpecificSteps(processor);
-
- assertEquals("false",
XPATH.evaluate("property[name='use-transactions']/value", processor));
- }
-
- @Test
- public void testTransactionTemplatePropertyForPublishProcessorWithFalse()
throws XPathExpressionException, IOException {
- final MigratorConfiguration configuration =
getConfiguration(WITHOUT_TRANSACTION, IS_VERSION_EIGHT_PROCESSOR,
KafkaProcessorType.PUBLISH, TEMPLATE);
- final PublishKafkaTemplateMigrator templateMigrator = new
PublishKafkaTemplateMigrator(configuration);
- final Document document = KafkaMigrationUtil.parseDocument();
- final Node processor = (Node)
XPATH.evaluate(XPATH_FOR_PUBLISH_PROCESSOR_IN_TEMPLATE, document,
XPathConstants.NODE);
-
- templateMigrator.configureComponentSpecificSteps(processor);
-
- assertEquals("false",
XPATH.evaluate("config/properties/entry[key='use-transactions']/value",
processor));
- }
-
-
- private List<String> getValues(final Collection<String> properties, final
Node node) throws XPathExpressionException {
- final List<String> result = new ArrayList<>();
- for (String propertyName : properties) {
- result.add(XPATH.evaluate(String.format("property[name='%s']/value", propertyName), node));
- }
- return result;
- }
-
- private List<String> getOldValues(final Node node) throws
XPathExpressionException {
- return
getValues(PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR.getPropertiesToBeSaved().keySet(),
node);
- }
-
- private List<String> getNewValues(final Node node) throws
XPathExpressionException {
- return
getValues(PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR.getPropertiesToBeSaved().values(),
node);
- }
-
- private void assertPropertyRemoveSuccess(final Node node) throws
XPathExpressionException {
- assertTrue(XPATH.evaluate("property[name='Known Brokers']",
node).isEmpty());
- }
-
- private void assertDescriptorRemoveSuccess(final Node node) throws
XPathExpressionException {
- assertTrue(XPATH.evaluate("config/descriptors/entry[key='Known Brokers']", node).isEmpty());
- }
-
- private void assertAddSuccess(final String xpath, final Node node) throws
XPathExpressionException {
- for (String propertyName:
PUBLISH_KAFKA_PROCESSOR_DESCRIPTOR.getProcessorProperties().keySet()) {
- assertFalse(XPATH.evaluate(String.format(xpath, propertyName),
node).isEmpty());
- }
- }
- private void assertPropertyAddSuccess(final Node node) throws
XPathExpressionException {
- assertAddSuccess("property[name='%s']/name", node);
- }
-
- private void assertDescriptorAddSuccess(final Node node) throws
XPathExpressionException {
- assertAddSuccess("config/descriptors/entry[key='%s']/key", node);
- }
-
- private MigratorConfiguration getConfiguration(final boolean transaction, final boolean isVersion8Processor,
- final KafkaProcessorType processorType, final String migrationType) {
- final MigratorConfigurationBuilder configurationBuilder = new
MigratorConfigurationBuilder();
- final PropertyXpathDescriptor propertyXpathDescriptor;
-
- if (migrationType.equalsIgnoreCase("Flow")) {
- propertyXpathDescriptor = new
FlowPropertyXpathDescriptor(processorType);
- } else {
- propertyXpathDescriptor= new
TemplatePropertyXpathDescriptor(processorType);
- }
-
- return configurationBuilder.setKafkaBrokers("kafkaBrokers, localhost:1234")
- .setTransaction(transaction)
- .setIsVersion8Processor(isVersion8Processor)
- .setProcessorDescriptor(new KafkaProcessorDescriptor(processorType))
- .setPropertyXpathDescriptor(propertyXpathDescriptor)
- .build();
- }
-}
\ No newline at end of file
diff --git a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/resources/flow.xml b/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/resources/flow.xml
deleted file mode 100644
index b1cc3b5216..0000000000
--- a/nifi-toolkit/nifi-toolkit-kafka-migrator/src/test/resources/flow.xml
+++ /dev/null
@@ -1,136 +0,0 @@
-<rootGroup>
- <processGroup>
- <processor>
- <class>org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0</class>
- <bundle>
- <group>org.apache.nifi</group>
- <artifact>nifi-kafka-2-0-nar</artifact>
- <version>1.13.2.2.1.2.0-283</version>
- </bundle>
- <property>
- <name>bootstrap.servers</name>
- <value>localhost:9092</value>
- </property>
- </processor>
- <processor>
- <class>org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10</class>
- <bundle>
- <group>org.apache.nifi</group>
- <artifact>nifi-kafka-0-10-nar</artifact>
- <version>1.13.2.2.1.2.0-283</version>
- </bundle>
- <property>
- <name>bootstrap.servers</name>
- <value>localhost:9092</value>
- </property>
- </processor>
- <processor>
- <class>org.apache.nifi.processors.kafka.PutKafka</class>
- <bundle>
- <group>org.apache.nifi</group>
- <artifact>nifi-kafka-0-8-nar</artifact>
- <version>1.13.2.2.1.2.0-283</version>
- </bundle>
- <property>
- <name>Known Brokers</name>
- </property>
- <property>
- <name>Topic Name</name>
- <value>test-topic</value>
- </property>
- <property>
- <name>Partition</name>
- <value>test-partition</value>
- </property>
- <property>
- <name>Kafka Key</name>
- <value>kafka-key</value>
- </property>
- <property>
- <name>Delivery Guarantee</name>
- <value>1</value>
- </property>
- <property>
- <name>Compression Codec</name>
- <value>gzip</value>
- </property>
- </processor>
- </processGroup>
- <template encoding-version="1.3">
- <snippet>
- <processGroups>
- <contents>
- <processors>
- <bundle>
- <artifact>nifi-kafka-2-0-nar</artifact>
- <group>org.apache.nifi</group>
- <version>1.13.2.2.1.2.0-283</version>
- </bundle>
- <config>
- <descriptors>
- <entry>
- <key>Known Brokers</key>
- <value>
- <name>Known Brokers</name>
- </value>
- </entry>
- </descriptors>
- <properties>
- <entry>
- <key>Known Brokers</key>
- </entry>
- </properties>
- </config>
- <type>org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0</type>
- </processors>
- <processors>
- <bundle>
- <artifact>nifi-kafka-0-10-nar</artifact>
- <group>org.apache.nifi</group>
- <version>1.13.2.2.1.2.0-283</version>
- </bundle>
- <config>
- <descriptors>
- <entry>
- <key>Known Brokers</key>
- <value>
- <name>Known Brokers</name>
- </value>
- </entry>
- </descriptors>
- <properties>
- <entry>
- <key>Known Brokers</key>
- </entry>
- </properties>
- </config>
- <type>org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10</type>
- </processors>
- <processors>
- <bundle>
- <artifact>nifi-kafka-0-8-nar</artifact>
- <group>org.apache.nifi</group>
- <version>1.13.2.2.1.2.0-283</version>
- </bundle>
- <config>
- <descriptors>
- <entry>
- <key>Known Brokers</key>
- <value>
- <name>Known Brokers</name>
- </value>
- </entry>
- </descriptors>
- <properties>
- <entry>
- <key>Known Brokers</key>
- </entry>
- </properties>
- </config>
- <type>org.apache.nifi.processors.kafka.PutKafka</type>
- </processors>
- </contents>
- </processGroups>
- </snippet>
- </template>
-</rootGroup>
\ No newline at end of file
diff --git a/nifi-toolkit/pom.xml b/nifi-toolkit/pom.xml
index 3f64eecde1..206d24b7a9 100644
--- a/nifi-toolkit/pom.xml
+++ b/nifi-toolkit/pom.xml
@@ -33,7 +33,6 @@
<module>nifi-toolkit-flowanalyzer</module>
<module>nifi-toolkit-cli</module>
<module>nifi-toolkit-api</module>
- <module>nifi-toolkit-kafka-migrator</module>
</modules>
<properties>
<toolkit.groovy.version>3.0.8</toolkit.groovy.version>