Repository: nifi Updated Branches: refs/heads/master 24e298101 -> 7314066b6
NIFI-3716 Created flow analyzer in NiFi Toolkit. This closes #1747. Signed-off-by: Andy LoPresto <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7314066b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7314066b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7314066b Branch: refs/heads/master Commit: 7314066b6a2d5716e00884e9b6992ad60823d4a3 Parents: 24e2981 Author: Eric <[email protected]> Authored: Sun Apr 30 21:33:12 2017 -0400 Committer: Andy LoPresto <[email protected]> Committed: Mon Jun 26 15:24:30 2017 -0700 ---------------------------------------------------------------------- nifi-toolkit/nifi-toolkit-assembly/pom.xml | 4 + .../src/main/resources/bin/flow-analyzer.bat | 39 +++++ .../src/main/resources/bin/flow-analyzer.sh | 120 ++++++++++++++ nifi-toolkit/nifi-toolkit-flowanalyzer/pom.xml | 43 +++++ .../flowanalyzer/FlowAnalyzerDriver.java | 165 +++++++++++++++++++ .../nifi/toolkit/FlowAnalyzerDriverTest.java | 61 +++++++ nifi-toolkit/pom.xml | 1 + pom.xml | 5 + 8 files changed, 438 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/7314066b/nifi-toolkit/nifi-toolkit-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-assembly/pom.xml b/nifi-toolkit/nifi-toolkit-assembly/pom.xml index cee7c36..9286455 100644 --- a/nifi-toolkit/nifi-toolkit-assembly/pom.xml +++ b/nifi-toolkit/nifi-toolkit-assembly/pom.xml @@ -80,6 +80,10 @@ language governing permissions and limitations under the License. --> <groupId>org.apache.nifi</groupId> <artifactId>nifi-toolkit-zookeeper-migrator</artifactId> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-toolkit-flowanalyzer</artifactId> + </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> http://git-wip-us.apache.org/repos/asf/nifi/blob/7314066b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/flow-analyzer.bat ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/flow-analyzer.bat b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/flow-analyzer.bat new file mode 100644 index 0000000..4c7e738 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/flow-analyzer.bat @@ -0,0 +1,39 @@ +@echo off +rem +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. +rem + +rem Use JAVA_HOME if it's set; otherwise, just use java + +if "%JAVA_HOME%" == "" goto noJavaHome +if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome +set JAVA_EXE=%JAVA_HOME%\bin\java.exe +goto startConfig + +:noJavaHome +echo The JAVA_HOME environment variable is not defined correctly. +echo Instead the PATH will be used to find the java executable. +echo. +set JAVA_EXE=java +goto startConfig + +:startConfig +set LIB_DIR=%~dp0..\classpath;%~dp0..\lib + +SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.toolkit.flowanalyzer.FlowAnalyzerDriver + +cmd.exe /C ""%JAVA_EXE%" %JAVA_PARAMS% %* "" + http://git-wip-us.apache.org/repos/asf/nifi/blob/7314066b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/flow-analyzer.sh ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/flow-analyzer.sh b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/flow-analyzer.sh new file mode 100755 index 0000000..1ef1842 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-assembly/src/main/resources/bin/flow-analyzer.sh @@ -0,0 +1,120 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +# Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches + +SCRIPT_DIR=$(dirname "$0") +SCRIPT_NAME=$(basename "$0") +NIFI_TOOLKIT_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd) +PROGNAME=$(basename "$0") + + +warn() { + (>&2 echo "${PROGNAME}: $*") +} + +die() { + warn "$*" + exit 1 +} + +detectOS() { + # OS specific support (must be 'true' or 'false'). + cygwin=false; + aix=false; + os400=false; + darwin=false; + case "$(uname)" in + CYGWIN*) + cygwin=true + ;; + AIX*) + aix=true + ;; + OS400*) + os400=true + ;; + Darwin) + darwin=true + ;; + esac + # For AIX, set an environment variable + if ${aix}; then + export LDR_CNTRL=MAXDATA=0xB0000000@DSA + echo ${LDR_CNTRL} + fi +} + +locateJava() { + # Setup the Java Virtual Machine + if $cygwin ; then + [ -n "${JAVA}" ] && JAVA=$(cygpath --unix "${JAVA}") + [ -n "${JAVA_HOME}" ] && JAVA_HOME=$(cygpath --unix "${JAVA_HOME}") + fi + + if [ "x${JAVA}" = "x" ] && [ -r /etc/gentoo-release ] ; then + JAVA_HOME=$(java-config --jre-home) + fi + if [ "x${JAVA}" = "x" ]; then + if [ "x${JAVA_HOME}" != "x" ]; then + if [ ! -d "${JAVA_HOME}" ]; then + die "JAVA_HOME is not valid: ${JAVA_HOME}" + fi + JAVA="${JAVA_HOME}/bin/java" + else + warn "JAVA_HOME not set; results may vary" + JAVA=$(type java) + JAVA=$(expr "${JAVA}" : '.* \(/.*\)$') + if [ "x${JAVA}" = "x" ]; then + die "java command not found" + fi + fi + fi +} + +init() { + # Determine if there is special OS handling we must perform + detectOS + + # Locate the Java VM to execute + locateJava "$1" +} + +run() { + LIBS="${NIFI_TOOLKIT_HOME}/lib/*" + + sudo_cmd_prefix="" + if $cygwin; then + NIFI_TOOLKIT_HOME=$(cygpath --path --windows "${NIFI_TOOLKIT_HOME}") + CLASSPATH="$NIFI_TOOLKIT_HOME/classpath;$(cygpath --path --windows "${LIBS}")" + else + CLASSPATH="$NIFI_TOOLKIT_HOME/classpath:${LIBS}" + fi + + export JAVA_HOME="$JAVA_HOME" + export NIFI_TOOLKIT_HOME="$NIFI_TOOLKIT_HOME" + + umask 0077 + "${JAVA}" -cp "${CLASSPATH}" -Xms12m -Xmx24m org.apache.nifi.toolkit.flowanalyzer.FlowAnalyzerDriver "$@" + return $? +} + + +init "$1" +run "$@" http://git-wip-us.apache.org/repos/asf/nifi/blob/7314066b/nifi-toolkit/nifi-toolkit-flowanalyzer/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-flowanalyzer/pom.xml b/nifi-toolkit/nifi-toolkit-flowanalyzer/pom.xml new file mode 100644 index 0000000..eb6b93e --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-flowanalyzer/pom.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>nifi-toolkit-flowanalyzer</artifactId> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-toolkit</artifactId> + <version>1.3.0-SNAPSHOT</version> + </parent> + + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>src/test/resources/test-data.json</exclude> + <exclude>src/test/resources/test-data-user-pass.json</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/7314066b/nifi-toolkit/nifi-toolkit-flowanalyzer/src/main/java/org/apache/nifi/toolkit/flowanalyzer/FlowAnalyzerDriver.java ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-flowanalyzer/src/main/java/org/apache/nifi/toolkit/flowanalyzer/FlowAnalyzerDriver.java b/nifi-toolkit/nifi-toolkit-flowanalyzer/src/main/java/org/apache/nifi/toolkit/flowanalyzer/FlowAnalyzerDriver.java new file mode 100644 index 0000000..216f2bd --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-flowanalyzer/src/main/java/org/apache/nifi/toolkit/flowanalyzer/FlowAnalyzerDriver.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.flowanalyzer; + +import java.io.FileInputStream; +import java.io.InputStream; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.zip.GZIPInputStream; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; + +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +public class FlowAnalyzerDriver { + final static String CONST_BYTES_GB_CONV = "1000000000"; + final static String CONST_BYTES_MB_CONV = "1000000"; + final static String CONST_BYTES_KB_CONV = "1000"; + final static int DIVIDE_SCALE = 9; + final static String CONST_XMLNODE_CONNECTION = "connection"; + + private static void printUsage() { + System.out.println("This application seeks to produce a report to analyze the flow.xml.gz file"); + System.out.println( + "Currently the reports supported by this application are Total Storage for all queues " + + "backpressure, average storage of all queues backpressure, and min and max of all queues " + + "backpressure over the entire flow."); + System.out.println("\n\n\n"); + System.out.println("Usage: flow-analyzer.sh <path to flow.xml.gz>"); + } + + public static void main(String[] args) throws Exception { + BigDecimal totalDataSize = new BigDecimal("0.0"); + BigDecimal max = new BigDecimal("0.0"); + BigDecimal min = new BigDecimal("0.0"); + BigDecimal avg = new BigDecimal("0.0"); + long maxQueueSize = 0L; + long minQueueSize = 0L; + long totalQueueSize = 0L; + + int numberOfConnections = 0; + + if (helpRequested(args)) { + printUsage(); + return; + } + + String input = args[0]; + if (!input.contains("xml.gz")) + input = input + "/flow.xml.gz"; + + InputStream fileStream = new FileInputStream(input); + InputStream gzipStream = new GZIPInputStream(fileStream); + + System.out.println("Using flow=" + input); + + DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder documentBuilder; + try { + documentBuilder = documentBuilderFactory.newDocumentBuilder(); + Document document = documentBuilder.parse(gzipStream); + NodeList connectionNode = document.getElementsByTagName(CONST_XMLNODE_CONNECTION); + + for (int x = 0; x < connectionNode.getLength(); x++) { + Node nNode = connectionNode.item(x); + if (nNode.getNodeType() == Node.ELEMENT_NODE) { + Element maxWorkQueueSize = (Element) nNode; + String maxDataSize = maxWorkQueueSize.getElementsByTagName("maxWorkQueueDataSize").item(0) + .getTextContent(); + BigDecimal byteValue = (convertSizeToByteValue(maxDataSize)) != null + ? convertSizeToByteValue(maxDataSize) : new BigDecimal("0.0"); + numberOfConnections++; + avg = avg.add(byteValue); + String dataQueueSize = maxWorkQueueSize.getElementsByTagName("maxWorkQueueSize").item(0) + .getTextContent(); + Long dataQueueSizeL = new Long(dataQueueSize); + totalQueueSize = dataQueueSizeL + totalQueueSize; + if(dataQueueSizeL > maxQueueSize) + maxQueueSize = dataQueueSizeL; + if(dataQueueSizeL < minQueueSize || minQueueSize == 0) + minQueueSize = dataQueueSizeL; + if (max.compareTo(byteValue) < 0) + max = byteValue; + + if (byteValue.compareTo(min) < 0 || min.compareTo(new BigDecimal("0.0")) == 0) + min = byteValue; + + totalDataSize = totalDataSize.add(byteValue); + } + + } + + System.out.println("Total Bytes Utilized by System=" + convertBytesToGB(totalDataSize).toPlainString() + + " GB\nMax Back Pressure Size=" + convertBytesToGB(max).toPlainString() + + " GB\nMin Back Pressure Size=" + convertBytesToGB(min).toPlainString() + + " GB\nAverage Back Pressure Size=" + + convertBytesToGB(avg.divide(new BigDecimal(numberOfConnections), DIVIDE_SCALE, RoundingMode.HALF_UP)) + " GB"); + System.out.println("Max Flowfile Queue Size=" + maxQueueSize + "\nMin Flowfile Queue Size=" + minQueueSize + + "\nAvg Flowfile Queue Size=" + new BigDecimal(totalQueueSize).divide(new BigDecimal(numberOfConnections), DIVIDE_SCALE, RoundingMode.HALF_UP)); + gzipStream.close(); + fileStream.close(); + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Exception occurred while attempting to parse flow.xml.gz. Cause: " + e.getCause()); + } + + } + + private static boolean helpRequested(String[] args) { + return args.length == 0 || (args.length > 0 && (args[0].equalsIgnoreCase("-h") || args[0].equalsIgnoreCase("--help"))); + } + + /** + * + * @param value to convert to bytes + * @return BigDecimal Byte size + */ + public static BigDecimal convertSizeToByteValue(String value) { + BigDecimal size = null; + + if (value.contains("GB")) { + String numericValue = value.substring(0, value.indexOf("G") - 1); + size = new BigDecimal(numericValue).multiply(new BigDecimal(CONST_BYTES_GB_CONV)); + } + + if (value.contains("MB")) { + String numericValue = value.substring(0, value.indexOf("M") - 1); + size = new BigDecimal(numericValue).multiply(new BigDecimal(CONST_BYTES_MB_CONV)); + } + + if (value.contains("KB")) { + String numericValue = value.substring(0, value.indexOf("K") - 1); + size = new BigDecimal(numericValue).multiply(new BigDecimal(CONST_BYTES_KB_CONV)); + } + + return size; + } + + /** + * @param bytes to convert to GB + * @return BigDecimal bytes to GB + */ + public static BigDecimal convertBytesToGB(BigDecimal bytes) { + return bytes.divide(new BigDecimal(CONST_BYTES_GB_CONV), DIVIDE_SCALE, RoundingMode.HALF_UP).stripTrailingZeros(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/7314066b/nifi-toolkit/nifi-toolkit-flowanalyzer/src/test/java/org/apache/nifi/toolkit/FlowAnalyzerDriverTest.java ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-flowanalyzer/src/test/java/org/apache/nifi/toolkit/FlowAnalyzerDriverTest.java b/nifi-toolkit/nifi-toolkit-flowanalyzer/src/test/java/org/apache/nifi/toolkit/FlowAnalyzerDriverTest.java new file mode 100644 index 0000000..9f1243b --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-flowanalyzer/src/test/java/org/apache/nifi/toolkit/FlowAnalyzerDriverTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit; + +import static org.junit.Assert.assertEquals; + +import java.math.BigDecimal; + +import org.apache.nifi.toolkit.flowanalyzer.FlowAnalyzerDriver; +import org.junit.Test; + +public class FlowAnalyzerDriverTest { + + @Test + public void testConvertSizeToValue() { + + String gbTest = "13 GB"; + String kbTest = "103 KB"; + String mbTest = "20 MB"; + + BigDecimal gbBigDecimal = FlowAnalyzerDriver.convertSizeToByteValue(gbTest); + assertEquals(gbBigDecimal.toPlainString(), "13000000000"); + + BigDecimal kbBigDecimal = FlowAnalyzerDriver.convertSizeToByteValue(kbTest); + assertEquals(kbBigDecimal.toPlainString(), "103000"); + + BigDecimal mbBigDecimal = FlowAnalyzerDriver.convertSizeToByteValue(mbTest); + assertEquals(mbBigDecimal.toPlainString(), "20000000"); + } + + @Test + public void convertBytesToGB() { + + BigDecimal gbBigDecimal = new BigDecimal("13000000000"); + BigDecimal kbBigDecimal = new BigDecimal("103000"); + BigDecimal mbBigDecimal = new BigDecimal("20000000"); + + BigDecimal result = FlowAnalyzerDriver.convertBytesToGB(gbBigDecimal); + assertEquals("13", result.stripTrailingZeros().toPlainString()); + + result = FlowAnalyzerDriver.convertBytesToGB(mbBigDecimal); + assertEquals("0.02", result.stripTrailingZeros().toPlainString()); + + result = FlowAnalyzerDriver.convertBytesToGB(kbBigDecimal); + assertEquals("0.000103", result.stripTrailingZeros().toPlainString()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/7314066b/nifi-toolkit/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-toolkit/pom.xml b/nifi-toolkit/pom.xml index c2f6846..5a6b0dc 100644 --- a/nifi-toolkit/pom.xml +++ b/nifi-toolkit/pom.xml @@ -30,6 +30,7 @@ <module>nifi-toolkit-zookeeper-migrator</module> <module>nifi-toolkit-flowfile-repo</module> <module>nifi-toolkit-assembly</module> + <module>nifi-toolkit-flowanalyzer</module> </modules> <dependencyManagement> <dependencies> http://git-wip-us.apache.org/repos/asf/nifi/blob/7314066b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9aa9b88..e79d1f6 100644 --- a/pom.xml +++ b/pom.xml @@ -947,6 +947,11 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-toolkit-flowanalyzer</artifactId> + <version>1.3.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-registry-service</artifactId> <version>1.4.0-SNAPSHOT</version> </dependency>
