http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java ---------------------------------------------------------------------- diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java new file mode 100644 index 0000000..a48a1de --- /dev/null +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java @@ -0,0 +1,1008 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.bootstrap; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Reader; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.attribute.PosixFilePermission; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * <p> + * The class which bootstraps Apache NiFi. This class looks for the + * bootstrap.conf file by looking in the following places (in order):</p> + * <ol> + * <li>Java System Property named + * {@code org.apache.nifi.bootstrap.config.file}</li> + * <li>${NIFI_HOME}/./conf/bootstrap.conf, where ${NIFI_HOME} references an + * environment variable {@code NIFI_HOME}</li> + * <li>./conf/bootstrap.conf, where {@code ./} represents the working + * directory.</li> + * </ol> + * + * If the {@code bootstrap.conf} file cannot be found, throws a {@code FileNotFoundException}. + */ +public class RunNiFi { + + public static final String DEFAULT_CONFIG_FILE = "./conf/bootstrap.conf"; + public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties"; + public static final String DEFAULT_JAVA_CMD = "java"; + + public static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds"; + public static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20"; + + public static final String RUN_AS_PROP = "run.as"; + + public static final int MAX_RESTART_ATTEMPTS = 5; + public static final int STARTUP_WAIT_SECONDS = 60; + + public static final String SHUTDOWN_CMD = "SHUTDOWN"; + public static final String PING_CMD = "PING"; + public static final String DUMP_CMD = "DUMP"; + + private volatile boolean autoRestartNiFi = true; + private volatile int ccPort = -1; + private volatile long nifiPid = -1L; + private volatile String secretKey; + private volatile ShutdownHook shutdownHook; + + private final Lock lock = new ReentrantLock(); + private final Condition startupCondition = lock.newCondition(); + + private final File bootstrapConfigFile; + + // used for logging initial info; these will be logged to console by default when the app is started + private final Logger cmdLogger = LoggerFactory.getLogger("org.apache.nifi.bootstrap.Command"); + // used for logging all info. These by default will be written to the log file + private final Logger defaultLogger = LoggerFactory.getLogger(RunNiFi.class); + + + private final ExecutorService loggingExecutor; + private volatile Set<Future<?>> loggingFutures = new HashSet<>(2); + + public RunNiFi(final File bootstrapConfigFile, final boolean verbose) { + this.bootstrapConfigFile = bootstrapConfigFile; + + loggingExecutor = Executors.newFixedThreadPool(2, new ThreadFactory() { + @Override + public Thread newThread(final Runnable runnable) { + final Thread t = Executors.defaultThreadFactory().newThread(runnable); + t.setDaemon(true); + t.setName("NiFi logging handler"); + return t; + } + }); + } + + private static void printUsage() { + System.out.println("Usage:"); + System.out.println(); + System.out.println("java org.apache.nifi.bootstrap.RunNiFi [<-verbose>] <command> [options]"); + System.out.println(); + System.out.println("Valid commands include:"); + System.out.println(""); + System.out.println("Start : Start a new instance of Apache NiFi"); + System.out.println("Stop : Stop a running instance of Apache NiFi"); + System.out.println("Restart : Stop Apache NiFi, if it is running, and then start a new instance"); + System.out.println("Status : Determine if there is a running instance of Apache NiFi"); + System.out.println("Dump : Write a Thread Dump to the file specified by [options], or to the log if no file is given"); + System.out.println("Run : Start a new instance of Apache NiFi and monitor the Process, restarting if the instance dies"); + System.out.println(); + } + + private static String[] shift(final String[] orig) { + return Arrays.copyOfRange(orig, 1, orig.length); + } + + public static void main(String[] args) throws IOException, InterruptedException { + if (args.length < 1 || args.length > 3) { + printUsage(); + return; + } + + File dumpFile = null; + boolean verbose = false; + if (args[0].equals("-verbose")) { + verbose = true; + args = shift(args); + } + + final String cmd = args[0]; + if (cmd.equals("dump")) { + if (args.length > 1) { + dumpFile = new File(args[1]); + } else { + dumpFile = null; + } + } + + switch (cmd.toLowerCase()) { + case "start": + case "run": + case "stop": + case "status": + case "dump": + case "restart": + break; + default: + printUsage(); + return; + } + + String configFilename = System.getProperty("org.apache.nifi.bootstrap.config.file"); + + if (configFilename == null) { + final String nifiHome = System.getenv("NIFI_HOME"); + if (nifiHome != null) { + final File nifiHomeFile = new File(nifiHome.trim()); + final File configFile = new File(nifiHomeFile, DEFAULT_CONFIG_FILE); + configFilename = configFile.getAbsolutePath(); + } + } + + if (configFilename == null) { + configFilename = DEFAULT_CONFIG_FILE; + } + + final File configFile = new File(configFilename); + + final RunNiFi runNiFi = new RunNiFi(configFile, verbose); + + switch (cmd.toLowerCase()) { + case "start": + runNiFi.start(); + break; + case "run": + runNiFi.start(); + break; + case "stop": + runNiFi.stop(); + break; + case "status": + runNiFi.status(); + break; + case "restart": + runNiFi.stop(); + runNiFi.start(); + break; + case "dump": + runNiFi.dump(dumpFile); + break; + } + } + + File getStatusFile() { + return getStatusFile(defaultLogger); + } + + public File getStatusFile(final Logger logger) { + final File confDir = bootstrapConfigFile.getParentFile(); + final File nifiHome = confDir.getParentFile(); + final File bin = new File(nifiHome, "bin"); + final File statusFile = new File(bin, "nifi.pid"); + + logger.debug("Status File: {}", statusFile); + + return statusFile; + } + + private Properties loadProperties(final Logger logger) throws IOException { + final Properties props = new Properties(); + final File statusFile = getStatusFile(logger); + if (statusFile == null || !statusFile.exists()) { + logger.debug("No status file to load properties from"); + return props; + } + + try (final FileInputStream fis = new FileInputStream(getStatusFile(logger))) { + props.load(fis); + } + + final Map<Object, Object> modified = new HashMap<>(props); + modified.remove("secret.key"); + logger.debug("Properties: {}", modified); + + return props; + } + + private synchronized void saveProperties(final Properties nifiProps, final Logger logger) throws IOException { + final File statusFile = getStatusFile(logger); + if (statusFile.exists() && !statusFile.delete()) { + logger.warn("Failed to delete {}", statusFile); + } + + if (!statusFile.createNewFile()) { + throw new IOException("Failed to create file " + statusFile); + } + + try { + final Set<PosixFilePermission> perms = new HashSet<>(); + perms.add(PosixFilePermission.OWNER_READ); + perms.add(PosixFilePermission.OWNER_WRITE); + Files.setPosixFilePermissions(statusFile.toPath(), perms); + } catch (final Exception e) { + logger.warn("Failed to set permissions so that only the owner can read status file {}; " + + "this may allows others to have access to the key needed to communicate with NiFi. " + + "Permissions should be changed so that only the owner can read this file", statusFile); + } + + try (final FileOutputStream fos = new FileOutputStream(statusFile)) { + nifiProps.store(fos, null); + fos.getFD().sync(); + } + + logger.debug("Saved Properties {} to {}", new Object[]{nifiProps, statusFile}); + } + + private boolean isPingSuccessful(final int port, final String secretKey, final Logger logger) { + logger.debug("Pinging {}", port); + + try (final Socket socket = new Socket("localhost", port)) { + final OutputStream out = socket.getOutputStream(); + out.write((PING_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + + logger.debug("Sent PING command"); + socket.setSoTimeout(5000); + final InputStream in = socket.getInputStream(); + final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + final String response = reader.readLine(); + logger.debug("PING response: {}", response); + out.close(); + reader.close(); + + return PING_CMD.equals(response); + } catch (final IOException ioe) { + return false; + } + } + + private Integer getCurrentPort(final Logger logger) throws IOException { + final Properties props = loadProperties(logger); + final String portVal = props.getProperty("port"); + if (portVal == null) { + logger.debug("No Port found in status file"); + return null; + } else { + logger.debug("Port defined in status file: {}", portVal); + } + + final int port = Integer.parseInt(portVal); + final boolean success = isPingSuccessful(port, props.getProperty("secret.key"), logger); + if (success) { + logger.debug("Successful PING on port {}", port); + return port; + } + + final String pid = props.getProperty("pid"); + logger.debug("PID in status file is {}", pid); + if (pid != null) { + final boolean procRunning = isProcessRunning(pid, logger); + if (procRunning) { + return port; + } else { + return null; + } + } + + return null; + } + + private boolean isProcessRunning(final String pid, final Logger logger) { + try { + // We use the "ps" command to check if the process is still running. + final ProcessBuilder builder = new ProcessBuilder(); + + builder.command("ps", "-p", pid); + final Process proc = builder.start(); + + // Look for the pid in the output of the 'ps' command. + boolean running = false; + String line; + try (final InputStream in = proc.getInputStream(); + final Reader streamReader = new InputStreamReader(in); + final BufferedReader reader = new BufferedReader(streamReader)) { + + while ((line = reader.readLine()) != null) { + if (line.trim().startsWith(pid)) { + running = true; + } + } + } + + // If output of the ps command had our PID, the process is running. + if (running) { + logger.debug("Process with PID {} is running", pid); + } else { + logger.debug("Process with PID {} is not running", pid); + } + + return running; + } catch (final IOException ioe) { + System.err.println("Failed to determine if Process " + pid + " is running; assuming that it is not"); + return false; + } + } + + private Status getStatus(final Logger logger) { + final Properties props; + try { + props = loadProperties(logger); + } catch (final IOException ioe) { + return new Status(null, null, false, false); + } + + if (props == null) { + return new Status(null, null, false, false); + } + + final String portValue = props.getProperty("port"); + final String pid = props.getProperty("pid"); + final String secretKey = props.getProperty("secret.key"); + + if (portValue == null && pid == null) { + return new Status(null, null, false, false); + } + + Integer port = null; + boolean pingSuccess = false; + if (portValue != null) { + try { + port = Integer.parseInt(portValue); + pingSuccess = isPingSuccessful(port, secretKey, logger); + } catch (final NumberFormatException nfe) { + return new Status(null, null, false, false); + } + } + + if (pingSuccess) { + return new Status(port, pid, true, true); + } + + final boolean alive = (pid == null) ? false : isProcessRunning(pid, logger); + return new Status(port, pid, pingSuccess, alive); + } + + public void status() throws IOException { + final Logger logger = cmdLogger; + final Status status = getStatus(logger); + if (status.isRespondingToPing()) { + logger.info("Apache NiFi is currently running, listening to Bootstrap on port {}, PID={}", + new Object[]{status.getPort(), status.getPid() == null ? "unknkown" : status.getPid()}); + return; + } + + if (status.isProcessRunning()) { + logger.info("Apache NiFi is running at PID {} but is not responding to ping requests", status.getPid()); + return; + } + + if (status.getPort() == null) { + logger.info("Apache NiFi is not running"); + return; + } + + if (status.getPid() == null) { + logger.info("Apache NiFi is not responding to Ping requests. The process may have died or may be hung"); + } else { + logger.info("Apache NiFi is not running"); + } + } + + /** + * Writes a NiFi thread dump to the given file; if file is null, logs at + * INFO level instead. + * + * @param dumpFile the file to write the dump content to + * @throws IOException if any issues occur while writing the dump file + */ + public void dump(final File dumpFile) throws IOException { + final Logger logger = defaultLogger; // dump to bootstrap log file by default + final Integer port = getCurrentPort(logger); + if (port == null) { + logger.info("Apache NiFi is not currently running"); + return; + } + + final Properties nifiProps = loadProperties(logger); + final String secretKey = nifiProps.getProperty("secret.key"); + + final StringBuilder sb = new StringBuilder(); + try (final Socket socket = new Socket()) { + logger.debug("Connecting to NiFi instance"); + socket.setSoTimeout(60000); + socket.connect(new InetSocketAddress("localhost", port)); + logger.debug("Established connection to NiFi instance."); + socket.setSoTimeout(60000); + + logger.debug("Sending DUMP Command to port {}", port); + final OutputStream out = socket.getOutputStream(); + out.write((DUMP_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + + final InputStream in = socket.getInputStream(); + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + String line; + while ((line = reader.readLine()) != null) { + sb.append(line).append("\n"); + } + } + } + + final String dump = sb.toString(); + if (dumpFile == null) { + logger.info(dump); + } else { + try (final FileOutputStream fos = new FileOutputStream(dumpFile)) { + fos.write(dump.getBytes(StandardCharsets.UTF_8)); + } + // we want to log to the console (by default) that we wrote the thread dump to the specified file + cmdLogger.info("Successfully wrote thread dump to {}", dumpFile.getAbsolutePath()); + } + } + + public void stop() throws IOException { + final Logger logger = cmdLogger; + final Integer port = getCurrentPort(logger); + if (port == null) { + logger.info("Apache NiFi is not currently running"); + return; + } + + final Properties nifiProps = loadProperties(logger); + final String secretKey = nifiProps.getProperty("secret.key"); + + final File statusFile = getStatusFile(logger); + if (statusFile.exists() && !statusFile.delete()) { + logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile); + } + + try (final Socket socket = new Socket()) { + logger.debug("Connecting to NiFi instance"); + socket.setSoTimeout(60000); + socket.connect(new InetSocketAddress("localhost", port)); + logger.debug("Established connection to NiFi instance."); + socket.setSoTimeout(60000); + + logger.debug("Sending SHUTDOWN Command to port {}", port); + final OutputStream out = socket.getOutputStream(); + out.write((SHUTDOWN_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + socket.shutdownOutput(); + + final InputStream in = socket.getInputStream(); + int lastChar; + final StringBuilder sb = new StringBuilder(); + while ((lastChar = in.read()) > -1) { + sb.append((char) lastChar); + } + final String response = sb.toString().trim(); + + logger.debug("Received response to SHUTDOWN command: {}", response); + + if (SHUTDOWN_CMD.equals(response)) { + logger.info("Apache NiFi has accepted the Shutdown Command and is shutting down now"); + + final String pid = nifiProps.getProperty("pid"); + if (pid != null) { + final Properties bootstrapProperties = new Properties(); + try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) { + bootstrapProperties.load(fis); + } + + String gracefulShutdown = bootstrapProperties.getProperty(GRACEFUL_SHUTDOWN_PROP, DEFAULT_GRACEFUL_SHUTDOWN_VALUE); + int gracefulShutdownSeconds; + try { + gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown); + } catch (final NumberFormatException nfe) { + gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE); + } + + final long startWait = System.nanoTime(); + while (isProcessRunning(pid, logger)) { + logger.info("Waiting for Apache NiFi to finish shutting down..."); + final long waitNanos = System.nanoTime() - startWait; + final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos); + if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) { + if (isProcessRunning(pid, logger)) { + logger.warn("NiFi has not finished shutting down after {} seconds. Killing process.", gracefulShutdownSeconds); + try { + killProcessTree(pid, logger); + } catch (final IOException ioe) { + logger.error("Failed to kill Process with PID {}", pid); + } + } + break; + } else { + try { + Thread.sleep(2000L); + } catch (final InterruptedException ie) { + } + } + } + + logger.info("NiFi has finished shutting down."); + } + } else { + logger.error("When sending SHUTDOWN command to NiFi, got unexpected response {}", response); + } + } catch (final IOException ioe) { + logger.error("Failed to send shutdown command to port {} due to {}", new Object[]{port, ioe.toString(), ioe}); + } + } + + private static List<String> getChildProcesses(final String ppid) throws IOException { + final Process proc = Runtime.getRuntime().exec(new String[]{"ps", "-o", "pid", "--no-headers", "--ppid", ppid}); + final List<String> childPids = new ArrayList<>(); + try (final InputStream in = proc.getInputStream(); + final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + + String line; + while ((line = reader.readLine()) != null) { + childPids.add(line.trim()); + } + } + + return childPids; + } + + private void killProcessTree(final String pid, final Logger logger) throws IOException { + logger.debug("Killing Process Tree for PID {}", pid); + + final List<String> children = getChildProcesses(pid); + logger.debug("Children of PID {}: {}", new Object[]{pid, children}); + + for (final String childPid : children) { + killProcessTree(childPid, logger); + } + + Runtime.getRuntime().exec(new String[]{"kill", "-9", pid}); + } + + public static boolean isAlive(final Process process) { + try { + process.exitValue(); + return false; + } catch (final IllegalStateException | IllegalThreadStateException itse) { + return true; + } + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public void start() throws IOException, InterruptedException { + final Integer port = getCurrentPort(cmdLogger); + if (port != null) { + cmdLogger.info("Apache NiFi is already running, listening to Bootstrap on port " + port); + return; + } + + final ProcessBuilder builder = new ProcessBuilder(); + + if (!bootstrapConfigFile.exists()) { + throw new FileNotFoundException(bootstrapConfigFile.getAbsolutePath()); + } + + final Properties properties = new Properties(); + try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) { + properties.load(fis); + } + + final Map<String, String> props = new HashMap<>(); + props.putAll((Map) properties); + + final String specifiedWorkingDir = props.get("working.dir"); + if (specifiedWorkingDir != null) { + builder.directory(new File(specifiedWorkingDir)); + } + + final File bootstrapConfigAbsoluteFile = bootstrapConfigFile.getAbsoluteFile(); + final File binDir = bootstrapConfigAbsoluteFile.getParentFile(); + final File workingDir = binDir.getParentFile(); + + if (specifiedWorkingDir == null) { + builder.directory(workingDir); + } + + final String libFilename = replaceNull(props.get("lib.dir"), "./lib").trim(); + File libDir = getFile(libFilename, workingDir); + + final String confFilename = replaceNull(props.get("conf.dir"), "./conf").trim(); + File confDir = getFile(confFilename, workingDir); + + String nifiPropsFilename = props.get("props.file"); + if (nifiPropsFilename == null) { + if (confDir.exists()) { + nifiPropsFilename = new File(confDir, "nifi.properties").getAbsolutePath(); + } else { + nifiPropsFilename = DEFAULT_CONFIG_FILE; + } + } + + nifiPropsFilename = nifiPropsFilename.trim(); + + final List<String> javaAdditionalArgs = new ArrayList<>(); + for (final Map.Entry<String, String> entry : props.entrySet()) { + final String key = entry.getKey(); + final String value = entry.getValue(); + + if (key.startsWith("java.arg")) { + javaAdditionalArgs.add(value); + } + } + + final File[] libFiles = libDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(final File dir, final String filename) { + return filename.toLowerCase().endsWith(".jar"); + } + }); + + if (libFiles == null || libFiles.length == 0) { + throw new RuntimeException("Could not find lib directory at " + libDir.getAbsolutePath()); + } + + final File[] confFiles = confDir.listFiles(); + if (confFiles == null || confFiles.length == 0) { + throw new RuntimeException("Could not find conf directory at " + confDir.getAbsolutePath()); + } + + final List<String> cpFiles = new ArrayList<>(confFiles.length + libFiles.length); + cpFiles.add(confDir.getAbsolutePath()); + for (final File file : libFiles) { + cpFiles.add(file.getAbsolutePath()); + } + + final StringBuilder classPathBuilder = new StringBuilder(); + for (int i = 0; i < cpFiles.size(); i++) { + final String filename = cpFiles.get(i); + classPathBuilder.append(filename); + if (i < cpFiles.size() - 1) { + classPathBuilder.append(File.pathSeparatorChar); + } + } + + final String classPath = classPathBuilder.toString(); + String javaCmd = props.get("java"); + if (javaCmd == null) { + javaCmd = DEFAULT_JAVA_CMD; + } + if (javaCmd.equals(DEFAULT_JAVA_CMD)) { + String javaHome = System.getenv("JAVA_HOME"); + if (javaHome != null) { + String fileExtension = isWindows() ? ".exe" : ""; + File javaFile = new File(javaHome + File.separatorChar + "bin" + + File.separatorChar + "java" + fileExtension); + if (javaFile.exists() && javaFile.canExecute()) { + javaCmd = javaFile.getAbsolutePath(); + } + } + } + + final NiFiListener listener = new NiFiListener(); + final int listenPort = listener.start(this); + + final List<String> cmd = new ArrayList<>(); + + cmd.add(javaCmd); + cmd.add("-classpath"); + cmd.add(classPath); + cmd.addAll(javaAdditionalArgs); + cmd.add("-Dnifi.properties.file.path=" + nifiPropsFilename); + cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort); + cmd.add("-Dapp=NiFi"); + cmd.add("org.apache.nifi.NiFi"); + + builder.command(cmd); + + final StringBuilder cmdBuilder = new StringBuilder(); + for (final String s : cmd) { + cmdBuilder.append(s).append(" "); + } + + cmdLogger.info("Starting Apache NiFi..."); + cmdLogger.info("Working Directory: {}", workingDir.getAbsolutePath()); + cmdLogger.info("Command: {}", cmdBuilder.toString()); + + String gracefulShutdown = props.get(GRACEFUL_SHUTDOWN_PROP); + if (gracefulShutdown == null) { + gracefulShutdown = DEFAULT_GRACEFUL_SHUTDOWN_VALUE; + } + + final int gracefulShutdownSeconds; + try { + gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown); + } catch (final NumberFormatException nfe) { + throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File " + + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer"); + } + + if (gracefulShutdownSeconds < 0) { + throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File " + + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer"); + } + + Process process = builder.start(); + handleLogging(process); + Long pid = getPid(process, cmdLogger); + if (pid != null) { + nifiPid = pid; + final Properties nifiProps = new Properties(); + nifiProps.setProperty("pid", String.valueOf(nifiPid)); + saveProperties(nifiProps, cmdLogger); + } + + shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor); + final Runtime runtime = Runtime.getRuntime(); + runtime.addShutdownHook(shutdownHook); + + while (true) { + final boolean alive = isAlive(process); + + if (alive) { + try { + Thread.sleep(1000L); + } catch (final InterruptedException ie) { + } + } else { + try { + runtime.removeShutdownHook(shutdownHook); + } catch (final IllegalStateException ise) { + // happens when already shutting down + } + + if (autoRestartNiFi) { + final File statusFile = getStatusFile(defaultLogger); + if (!statusFile.exists()) { + defaultLogger.debug("Status File no longer exists. Will not restart NiFi"); + return; + } + + defaultLogger.warn("Apache NiFi appears to have died. Restarting..."); + process = builder.start(); + handleLogging(process); + + pid = getPid(process, defaultLogger); + if (pid != null) { + nifiPid = pid; + final Properties nifiProps = new Properties(); + nifiProps.setProperty("pid", String.valueOf(nifiPid)); + saveProperties(nifiProps, defaultLogger); + } + + shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor); + runtime.addShutdownHook(shutdownHook); + + final boolean started = waitForStart(); + + if (started) { + defaultLogger.info("Successfully started Apache NiFi{}", (pid == null ? "" : " with PID " + pid)); + } else { + defaultLogger.error("Apache NiFi does not appear to have started"); + } + } else { + return; + } + } + } + } + + private void handleLogging(final Process process) { + final Set<Future<?>> existingFutures = loggingFutures; + if (existingFutures != null) { + for (final Future<?> future : existingFutures) { + future.cancel(false); + } + } + + final Future<?> stdOutFuture = loggingExecutor.submit(new Runnable() { + @Override + public void run() { + final Logger stdOutLogger = LoggerFactory.getLogger("org.apache.nifi.StdOut"); + final InputStream in = process.getInputStream(); + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + String line; + while ((line = reader.readLine()) != null) { + stdOutLogger.info(line); + } + } catch (IOException e) { + defaultLogger.error("Failed to read from NiFi's Standard Out stream", e); + } + } + }); + + final Future<?> stdErrFuture = loggingExecutor.submit(new Runnable() { + @Override + public void run() { + final Logger stdErrLogger = LoggerFactory.getLogger("org.apache.nifi.StdErr"); + final InputStream in = process.getErrorStream(); + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + String line; + while ((line = reader.readLine()) != null) { + stdErrLogger.error(line); + } + } catch (IOException e) { + defaultLogger.error("Failed to read from NiFi's Standard Error stream", e); + } + } + }); + + final Set<Future<?>> futures = new HashSet<>(); + futures.add(stdOutFuture); + futures.add(stdErrFuture); + this.loggingFutures = futures; + } + + private Long getPid(final Process process, final Logger logger) { + try { + final Class<?> procClass = process.getClass(); + final Field pidField = procClass.getDeclaredField("pid"); + pidField.setAccessible(true); + final Object pidObject = pidField.get(process); + + logger.debug("PID Object = {}", pidObject); + + if (pidObject instanceof Number) { + return ((Number) pidObject).longValue(); + } + return null; + } catch (final IllegalAccessException | NoSuchFieldException nsfe) { + logger.debug("Could not find PID for child process due to {}", nsfe); + return null; + } + } + + private boolean isWindows() { + final String osName = System.getProperty("os.name"); + return osName != null && osName.toLowerCase().contains("win"); + } + + private boolean waitForStart() { + lock.lock(); + try { + final long startTime = System.nanoTime(); + + while (ccPort < 1) { + try { + startupCondition.await(1, TimeUnit.SECONDS); + } catch (final InterruptedException ie) { + return false; + } + + final long waitNanos = System.nanoTime() - startTime; + final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos); + if (waitSeconds > STARTUP_WAIT_SECONDS) { + return false; + } + } + } finally { + lock.unlock(); + } + return true; + } + + private File getFile(final String filename, final File workingDir) { + File file = new File(filename); + if (!file.isAbsolute()) { + file = new File(workingDir, filename); + } + + return file; + } + + private String replaceNull(final String value, final String replacement) { + return (value == null) ? replacement : value; + } + + void setAutoRestartNiFi(final boolean restart) { + this.autoRestartNiFi = restart; + } + + void setNiFiCommandControlPort(final int port, final String secretKey) { + this.ccPort = port; + this.secretKey = secretKey; + + if (shutdownHook != null) { + shutdownHook.setSecretKey(secretKey); + } + + final File statusFile = getStatusFile(defaultLogger); + + final Properties nifiProps = new Properties(); + if (nifiPid != -1) { + nifiProps.setProperty("pid", String.valueOf(nifiPid)); + } + nifiProps.setProperty("port", String.valueOf(ccPort)); + nifiProps.setProperty("secret.key", secretKey); + + try { + saveProperties(nifiProps, defaultLogger); + } catch (final IOException ioe) { + defaultLogger.warn("Apache NiFi has started but failed to persist NiFi Port information to {} due to {}", new Object[]{statusFile.getAbsolutePath(), ioe}); + } + + defaultLogger.info("Apache NiFi now running and listening for Bootstrap requests on port {}", port); + } + + int getNiFiCommandControlPort() { + return this.ccPort; + } + + private static class Status { + + private final Integer port; + private final String pid; + + private final Boolean respondingToPing; + private final Boolean processRunning; + + public Status(final Integer port, final String pid, final Boolean respondingToPing, final Boolean processRunning) { + this.port = port; + this.pid = pid; + this.respondingToPing = respondingToPing; + this.processRunning = processRunning; + } + + public String getPid() { + return pid; + } + + public Integer getPort() { + return port; + } + + public boolean isRespondingToPing() { + return Boolean.TRUE.equals(respondingToPing); + } + + public boolean isProcessRunning() { + return Boolean.TRUE.equals(processRunning); + } + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java ---------------------------------------------------------------------- diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java new file mode 100644 index 0000000..a594f60 --- /dev/null +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.bootstrap; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +public class ShutdownHook extends Thread { + + private final Process nifiProcess; + private final RunNiFi runner; + private final int gracefulShutdownSeconds; + private final ExecutorService executor; + + private volatile String secretKey; + + public ShutdownHook(final Process nifiProcess, final RunNiFi runner, final String secretKey, final int gracefulShutdownSeconds, final ExecutorService executor) { + this.nifiProcess = nifiProcess; + this.runner = runner; + this.secretKey = secretKey; + this.gracefulShutdownSeconds = gracefulShutdownSeconds; + this.executor = executor; + } + + void setSecretKey(final String secretKey) { + this.secretKey = secretKey; + } + + @Override + public void run() { + executor.shutdown(); + runner.setAutoRestartNiFi(false); + final int ccPort = runner.getNiFiCommandControlPort(); + if (ccPort > 0) { + System.out.println("Initiating Shutdown of NiFi..."); + + try { + final Socket socket = new Socket("localhost", ccPort); + final OutputStream out = socket.getOutputStream(); + out.write(("SHUTDOWN " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + + socket.close(); + } catch (final IOException ioe) { + System.out.println("Failed to Shutdown NiFi due to " + ioe); + } + } + + System.out.println("Waiting for Apache NiFi to finish shutting down..."); + final long startWait = System.nanoTime(); + while (RunNiFi.isAlive(nifiProcess)) { + final long waitNanos = System.nanoTime() - startWait; + final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos); + if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) { + if (RunNiFi.isAlive(nifiProcess)) { + System.out.println("NiFi has not finished shutting down after " + gracefulShutdownSeconds + " seconds. Killing process."); + nifiProcess.destroy(); + } + break; + } else { + try { + Thread.sleep(1000L); + } catch (final InterruptedException ie) { + } + } + } + + final File statusFile = runner.getStatusFile(); + if (!statusFile.delete()) { + System.err.println("Failed to delete status file " + statusFile.getAbsolutePath() + "; this file should be cleaned up manually"); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java ---------------------------------------------------------------------- diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java new file mode 100644 index 0000000..52e36b9 --- /dev/null +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.bootstrap.exception; + +public class InvalidCommandException extends Exception { + + private static final long serialVersionUID = 1L; + + public InvalidCommandException() { + super(); + } + + public InvalidCommandException(final String message) { + super(message); + } + + public InvalidCommandException(final Throwable t) { + super(t); + } + + public InvalidCommandException(final String message, final Throwable t) { + super(message, t); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java ---------------------------------------------------------------------- diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java new file mode 100644 index 0000000..2149342 --- /dev/null +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.bootstrap.util; + +import java.io.IOException; +import java.io.InputStream; + +public class LimitingInputStream extends InputStream { + + private final InputStream in; + private final long limit; + private long bytesRead = 0; + + public LimitingInputStream(final InputStream in, final long limit) { + this.in = in; + this.limit = limit; + } + + @Override + public int read() throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int val = in.read(); + if (val > -1) { + bytesRead++; + } + return val; + } + + @Override + public int read(final byte[] b) throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int maxToRead = (int) Math.min(b.length, limit - bytesRead); + + final int val = in.read(b, 0, maxToRead); + if (val > 0) { + bytesRead += val; + } + return val; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int maxToRead = (int) Math.min(len, limit - bytesRead); + + final int val = in.read(b, off, maxToRead); + if (val > 0) { + bytesRead += val; + } + return val; + } + + @Override + public long skip(final long n) throws IOException { + final long skipped = in.skip(Math.min(n, limit - bytesRead)); + bytesRead += skipped; + return skipped; + } + + @Override + public int available() throws IOException { + return in.available(); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public void mark(int readlimit) { + in.mark(readlimit); + } + + @Override + public boolean markSupported() { + return in.markSupported(); + } + + @Override + public void reset() throws IOException { + in.reset(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/.gitignore ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-data-provenance-utils/.gitignore b/nifi-commons/nifi-data-provenance-utils/.gitignore new file mode 100755 index 0000000..19f2e00 --- /dev/null +++ b/nifi-commons/nifi-data-provenance-utils/.gitignore @@ -0,0 +1,2 @@ +/target +/target http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-data-provenance-utils/pom.xml b/nifi-commons/nifi-data-provenance-utils/pom.xml new file mode 100644 index 0000000..5481cd2 --- /dev/null +++ b/nifi-commons/nifi-data-provenance-utils/pom.xml @@ -0,0 +1,34 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-commons</artifactId> + <version>0.3.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-data-provenance-utils</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncLineageSubmission.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncLineageSubmission.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncLineageSubmission.java new file mode 100644 index 0000000..4a52a89 --- /dev/null +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncLineageSubmission.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance; + +import java.util.Collection; +import java.util.Date; +import java.util.UUID; + +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import org.apache.nifi.provenance.lineage.LineageComputationType; + +/** + * + */ +public class AsyncLineageSubmission implements ComputeLineageSubmission { + + private final String lineageIdentifier = UUID.randomUUID().toString(); + private final Date submissionTime = new Date(); + + private final LineageComputationType computationType; + private final Long eventId; + private final Collection<String> lineageFlowFileUuids; + + private volatile boolean canceled = false; + + private final StandardLineageResult result; + + public AsyncLineageSubmission(final LineageComputationType computationType, final Long eventId, final Collection<String> lineageFlowFileUuids, final int numSteps) { + this.computationType = computationType; + this.eventId = eventId; + this.lineageFlowFileUuids = lineageFlowFileUuids; + this.result = new StandardLineageResult(numSteps, lineageFlowFileUuids); + } + + @Override + public StandardLineageResult getResult() { + return result; + } + + @Override + public Date getSubmissionTime() { + return submissionTime; + } + + @Override + public String getLineageIdentifier() { + return lineageIdentifier; + } + + @Override + public void cancel() { + this.canceled = true; + } + + @Override + public boolean isCanceled() { + return canceled; + } + + @Override + public LineageComputationType getLineageComputationType() { + return computationType; + } + + @Override + public Long getExpandedEventId() { + return eventId; + } + + @Override + public Collection<String> getLineageFlowFileUuids() { + return lineageFlowFileUuids; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java new file mode 100644 index 0000000..00c6170 --- /dev/null +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance; + +import java.util.Date; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.search.QuerySubmission; + +public class AsyncQuerySubmission implements QuerySubmission { + + public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(60, TimeUnit.SECONDS); + + private final Date submissionTime = new Date(); + private final Query query; + + private volatile boolean canceled = false; + private final StandardQueryResult queryResult; + + /** + * Constructs an AsyncQuerySubmission with the given query and the given + * number of steps, indicating how many results must be added to this + * AsyncQuerySubmission before it is considered finished + * + * @param query the query to execute + * @param numSteps how many steps to include + */ + public AsyncQuerySubmission(final Query query, final int numSteps) { + this.query = query; + queryResult = new StandardQueryResult(query, numSteps); + } + + @Override + public Date getSubmissionTime() { + return submissionTime; + } + + @Override + public String getQueryIdentifier() { + return query.getIdentifier(); + } + + @Override + public void cancel() { + this.canceled = true; + queryResult.cancel(); + } + + @Override + public boolean isCanceled() { + return canceled; + } + + @Override + public Query getQuery() { + return query; + } + + @Override + public StandardQueryResult getResult() { + return queryResult; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/NamedSearchableField.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/NamedSearchableField.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/NamedSearchableField.java new file mode 100644 index 0000000..dc2903f --- /dev/null +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/NamedSearchableField.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance; + +import org.apache.nifi.provenance.search.SearchableField; +import org.apache.nifi.provenance.search.SearchableFieldType; + +import static java.util.Objects.requireNonNull; + +/** + * + */ +public class NamedSearchableField implements SearchableField { + + private final String identifier; + private final String searchableName; + private final SearchableFieldType fieldType; + private final String friendlyName; + private final boolean attribute; + + NamedSearchableField(final String identifier, final String searchableName, final String friendlyName, final boolean attribute) { + this(identifier, searchableName, friendlyName, attribute, SearchableFieldType.STRING); + } + + NamedSearchableField(final String identifier, final String searchableName, final String friendlyName, final boolean attribute, final SearchableFieldType fieldType) { + this.identifier = requireNonNull(identifier); + this.searchableName = requireNonNull(searchableName); + this.friendlyName = requireNonNull(friendlyName); + this.attribute = requireNonNull(attribute); + this.fieldType = requireNonNull(fieldType); + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public String getSearchableFieldName() { + return searchableName; + } + + @Override + public String getFriendlyName() { + return friendlyName; + } + + @Override + public boolean isAttribute() { + return attribute; + } + + @Override + public SearchableFieldType getFieldType() { + return fieldType; + } + + @Override + public String toString() { + return friendlyName; + } + + @Override + public int hashCode() { + return 298347 + searchableName.hashCode() + (attribute ? 1 : 0); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + + if (!(obj instanceof SearchableField)) { + return false; + } + + final SearchableField other = (SearchableField) obj; + return (this.searchableName.equals(other.getSearchableFieldName()) && attribute == other.isAttribute()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFieldParser.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFieldParser.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFieldParser.java new file mode 100644 index 0000000..6a934b1 --- /dev/null +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFieldParser.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.provenance.search.SearchableField; + +public class SearchableFieldParser { + + public static List<SearchableField> extractSearchableFields(final String indexedFieldString, final boolean predefinedField) { + final List<SearchableField> searchableFields = new ArrayList<>(); + if (indexedFieldString != null) { + final String[] split = indexedFieldString.split(","); + for (String fieldName : split) { + fieldName = fieldName.trim(); + if (fieldName.isEmpty()) { + continue; + } + + final SearchableField searchableField; + if (predefinedField) { + searchableField = SearchableFields.getSearchableField(fieldName); + } else { + searchableField = SearchableFields.newSearchableAttribute(fieldName); + } + + if (searchableField == null) { + throw new RuntimeException("Invalid Configuration: Provenance Repository configured to Index field '" + fieldName + "', but this is not a valid field"); + } + searchableFields.add(searchableField); + } + } + + return searchableFields; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFields.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFields.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFields.java new file mode 100644 index 0000000..de62bca --- /dev/null +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFields.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance; + +import org.apache.nifi.provenance.search.SearchableField; +import org.apache.nifi.provenance.search.SearchableFieldType; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * + */ +public class SearchableFields { + + public static final SearchableField Identifier = new NamedSearchableField("Identifier", "identifier", "Identifier", false); + public static final SearchableField EventTime = new NamedSearchableField("EventTime", "time", "Event Time", false, SearchableFieldType.DATE); + public static final SearchableField FlowFileUUID = new NamedSearchableField("FlowFileUUID", "uuid", "FlowFile UUID", false); + public static final SearchableField Filename = new NamedSearchableField("Filename", "filename", "Filename", false); + public static final SearchableField EventType = new NamedSearchableField("EventType", "eventType", "Event Type", false); + public static final SearchableField TransitURI = new NamedSearchableField("TransitURI", "transitUri", "Transit URI", false); + public static final SearchableField ComponentID = new NamedSearchableField("ProcessorID", "processorId", "Component ID", false); + public static final SearchableField AlternateIdentifierURI = new NamedSearchableField("AlternateIdentifierURI", "alternateIdentifierUri", "Alternate Identifier URI", false); + public static final SearchableField FileSize = new NamedSearchableField("FileSize", "fileSize", "File Size", false, SearchableFieldType.DATA_SIZE); + public static final SearchableField Details = new NamedSearchableField("Details", "details", "Details", false, SearchableFieldType.STRING); + public static final SearchableField Relationship = new NamedSearchableField("Relationship", "relationship", "Relationship", false, SearchableFieldType.STRING); + + public static final SearchableField LineageStartDate + = new NamedSearchableField("LineageStartDate", "lineageStartDate", "Lineage Start Date", false, SearchableFieldType.DATE); + public static final SearchableField LineageIdentifier + = new NamedSearchableField("LineageIdentifiers", "lineageIdentifier", "Lineage Identifier", false, SearchableFieldType.STRING); + + public static final SearchableField ContentClaimSection + = new NamedSearchableField("ContentClaimSection", "contentClaimSection", "Content Claim Section", false, SearchableFieldType.STRING); + public static final SearchableField ContentClaimContainer + = new NamedSearchableField("ContentClaimContainer", "contentClaimContainer", "Content Claim Container", false, SearchableFieldType.STRING); + public static final SearchableField ContentClaimIdentifier + = new NamedSearchableField("ContentClaimIdentifier", "contentClaimIdentifier", "Content Claim Identifier", false, SearchableFieldType.STRING); + public static final SearchableField ContentClaimOffset + = new NamedSearchableField("ContentClaimOffset", "contentClaimOffset", "Content Claim Offset", false, SearchableFieldType.LONG); + public static final SearchableField SourceQueueIdentifier + = new NamedSearchableField("SourceQueueIdentifier", "sourceQueueIdentifier", "Source Queue Identifier", false, SearchableFieldType.STRING); + + private static final Map<String, SearchableField> standardFields; + + static { + final SearchableField[] searchableFields = new SearchableField[]{ + EventTime, FlowFileUUID, Filename, EventType, TransitURI, + ComponentID, AlternateIdentifierURI, FileSize, Relationship, Details, + LineageStartDate, LineageIdentifier, ContentClaimSection, ContentClaimContainer, ContentClaimIdentifier, + ContentClaimOffset, SourceQueueIdentifier}; + + final Map<String, SearchableField> fields = new HashMap<>(); + for (final SearchableField field : searchableFields) { + fields.put(field.getIdentifier(), field); + } + + standardFields = Collections.unmodifiableMap(fields); + } + + private SearchableFields() { + } + + public static Collection<SearchableField> getStandardFields() { + return standardFields.values(); + } + + public static SearchableField getSearchableField(final String fieldIdentifier) { + return standardFields.get(fieldIdentifier); + } + + public static SearchableField newSearchableAttribute(final String attributeName) { + return new NamedSearchableField(attributeName, attributeName, attributeName, true); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java new file mode 100644 index 0000000..63c53d0 --- /dev/null +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.nifi.provenance.lineage.ComputeLineageResult; +import org.apache.nifi.provenance.lineage.EdgeNode; +import org.apache.nifi.provenance.lineage.EventNode; +import org.apache.nifi.provenance.lineage.FlowFileNode; +import org.apache.nifi.provenance.lineage.LineageEdge; +import org.apache.nifi.provenance.lineage.LineageNode; + +/** + * + */ +public class StandardLineageResult implements ComputeLineageResult { + + public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES); + private static final Logger logger = LoggerFactory.getLogger(StandardLineageResult.class); + + private final Collection<String> flowFileUuids; + private final Collection<ProvenanceEventRecord> relevantRecords = new ArrayList<>(); + private final Set<LineageNode> nodes = new HashSet<>(); + private final Set<LineageEdge> edges = new HashSet<>(); + private final int numSteps; + private final long creationNanos; + private long computationNanos; + + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + private final Lock writeLock = rwLock.writeLock(); + + private Date expirationDate = null; + private String error = null; + private int numCompletedSteps = 0; + + private volatile boolean canceled = false; + + public StandardLineageResult(final int numSteps, final Collection<String> flowFileUuids) { + this.numSteps = numSteps; + this.creationNanos = System.nanoTime(); + this.flowFileUuids = flowFileUuids; + + updateExpiration(); + } + + @Override + public List<LineageNode> getNodes() { + readLock.lock(); + try { + return new ArrayList<>(nodes); + } finally { + readLock.unlock(); + } + } + + @Override + public List<LineageEdge> getEdges() { + readLock.lock(); + try { + return new ArrayList<>(edges); + } finally { + readLock.unlock(); + } + } + + public int getNumberOfEdges() { + readLock.lock(); + try { + return edges.size(); + } finally { + readLock.unlock(); + } + } + + public int getNumberOfNodes() { + readLock.lock(); + try { + return nodes.size(); + } finally { + readLock.unlock(); + } + } + + public long getComputationTime(final TimeUnit timeUnit) { + readLock.lock(); + try { + return timeUnit.convert(computationNanos, TimeUnit.NANOSECONDS); + } finally { + readLock.unlock(); + } + } + + @Override + public Date getExpiration() { + readLock.lock(); + try { + return expirationDate; + } finally { + readLock.unlock(); + } + } + + @Override + public String getError() { + readLock.lock(); + try { + return error; + } finally { + readLock.unlock(); + } + } + + @Override + public int getPercentComplete() { + readLock.lock(); + try { + return (numSteps < 1) ? 100 : (int) (((float) numCompletedSteps / (float) numSteps) * 100.0F); + } finally { + readLock.unlock(); + } + } + + @Override + public boolean isFinished() { + readLock.lock(); + try { + return numCompletedSteps >= numSteps || canceled; + } finally { + readLock.unlock(); + } + } + + public void setError(final String error) { + writeLock.lock(); + try { + this.error = error; + numCompletedSteps++; + + updateExpiration(); + + if (numCompletedSteps >= numSteps) { + computationNanos = System.nanoTime() - creationNanos; + } + } finally { + writeLock.unlock(); + } + } + + public void update(final Collection<ProvenanceEventRecord> records) { + writeLock.lock(); + try { + relevantRecords.addAll(records); + + numCompletedSteps++; + updateExpiration(); + + if (numCompletedSteps >= numSteps && error == null) { + computeLineage(); + computationNanos = System.nanoTime() - creationNanos; + } + } finally { + writeLock.unlock(); + } + } + + /** + * Computes the lineage from the relevant Provenance Event Records. This + * method must be called with the write lock held and is only going to be + * useful after all of the records have been successfully obtained + */ + private void computeLineage() { + final long startNanos = System.nanoTime(); + + nodes.clear(); + edges.clear(); + + Map<String, LineageNode> lastEventMap = new HashMap<>(); // maps FlowFile UUID to last event for that FlowFile + final List<ProvenanceEventRecord> sortedRecords = new ArrayList<>(relevantRecords); + Collections.sort(sortedRecords, new Comparator<ProvenanceEventRecord>() { + @Override + public int compare(final ProvenanceEventRecord o1, final ProvenanceEventRecord o2) { + // Sort on Event Time, then Event ID. + final int eventTimeComparison = Long.compare(o1.getEventTime(), o2.getEventTime()); + if (eventTimeComparison == 0) { + return Long.compare(o1.getEventId(), o2.getEventId()); + } else { + return eventTimeComparison; + } + } + }); + + // convert the StandardProvenanceRecord objects into Lineage nodes (FlowFileNode, EventNodes). + for (final ProvenanceEventRecord record : sortedRecords) { + final LineageNode lineageNode = new EventNode(record); + final boolean added = nodes.add(lineageNode); + if (!added) { + logger.debug("Did not add {} because it already exists in the 'nodes' set", lineageNode); + } + + // Create an edge that connects this node to the previous node for the same FlowFile UUID. + final LineageNode lastNode = lastEventMap.get(record.getFlowFileUuid()); + if (lastNode != null) { + // We calculate the Edge UUID based on whether or not this event is a SPAWN. + // If this event is a SPAWN, then we want to use the previous node's UUID because a + // SPAWN Event's UUID is not necessarily what we want, since a SPAWN Event's UUID pertains to + // only one of (potentially) many UUIDs associated with the event. Otherwise, we know that + // the UUID of this record is appropriate, so we just use it. + final String edgeUuid; + + switch (record.getEventType()) { + case JOIN: + case CLONE: + case REPLAY: + edgeUuid = lastNode.getFlowFileUuid(); + break; + default: + edgeUuid = record.getFlowFileUuid(); + break; + } + + edges.add(new EdgeNode(edgeUuid, lastNode, lineageNode)); + } + + lastEventMap.put(record.getFlowFileUuid(), lineageNode); + + switch (record.getEventType()) { + case FORK: + case JOIN: + case REPLAY: + case CLONE: { + // For events that create FlowFile nodes, we need to create the FlowFile Nodes and associated Edges, as appropriate + for (final String childUuid : record.getChildUuids()) { + if (flowFileUuids.contains(childUuid)) { + final FlowFileNode childNode = new FlowFileNode(childUuid, record.getEventTime()); + final boolean isNewFlowFile = nodes.add(childNode); + if (!isNewFlowFile) { + final String msg = "Unable to generate Lineage Graph because multiple " + + "events were registered claiming to have generated the same FlowFile (UUID = " + childNode.getFlowFileUuid() + ")"; + logger.error(msg); + setError(msg); + return; + } + + edges.add(new EdgeNode(childNode.getFlowFileUuid(), lineageNode, childNode)); + lastEventMap.put(childUuid, childNode); + } + } + for (final String parentUuid : record.getParentUuids()) { + LineageNode lastNodeForParent = lastEventMap.get(parentUuid); + if (lastNodeForParent != null && !lastNodeForParent.equals(lineageNode)) { + edges.add(new EdgeNode(parentUuid, lastNodeForParent, lineageNode)); + } + + lastEventMap.put(parentUuid, lineageNode); + } + } + break; + case RECEIVE: + case CREATE: { + // for a receive event, we want to create a FlowFile Node that represents the FlowFile received + // and create an edge from the Receive Event to the FlowFile Node + final LineageNode flowFileNode = new FlowFileNode(record.getFlowFileUuid(), record.getEventTime()); + final boolean isNewFlowFile = nodes.add(flowFileNode); + if (!isNewFlowFile) { + final String msg = "Found cycle in graph. This indicates that multiple events " + + "were registered claiming to have generated the same FlowFile (UUID = " + flowFileNode.getFlowFileUuid() + ")"; + setError(msg); + logger.error(msg); + return; + } + edges.add(new EdgeNode(record.getFlowFileUuid(), lineageNode, flowFileNode)); + lastEventMap.put(record.getFlowFileUuid(), flowFileNode); + } + break; + default: + break; + } + } + + final long nanos = System.nanoTime() - startNanos; + logger.debug("Finished building lineage with {} nodes and {} edges in {} millis", nodes.size(), edges.size(), TimeUnit.NANOSECONDS.toMillis(nanos)); + } + + void cancel() { + this.canceled = true; + } + + /** + * Must be called with write lock! + */ + private void updateExpiration() { + expirationDate = new Date(System.currentTimeMillis() + TTL); + } +}
