http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
----------------------------------------------------------------------
diff --git 
a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
new file mode 100644
index 0000000..a48a1de
--- /dev/null
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
@@ -0,0 +1,1008 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.bootstrap;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * <p>
+ * The class which bootstraps Apache NiFi. This class looks for the
+ * bootstrap.conf file by looking in the following places (in order):</p>
+ * <ol>
+ * <li>Java System Property named
+ * {@code org.apache.nifi.bootstrap.config.file}</li>
+ * <li>${NIFI_HOME}/./conf/bootstrap.conf, where ${NIFI_HOME} references an
+ * environment variable {@code NIFI_HOME}</li>
+ * <li>./conf/bootstrap.conf, where {@code ./} represents the working
+ * directory.</li>
+ * </ol>
+ *
+ * If the {@code bootstrap.conf} file cannot be found, throws a
+ * {@code FileNotFoundException}.
+ */
+public class RunNiFi {
+
+    /** Bootstrap configuration path; resolved against {@code NIFI_HOME} when set, otherwise used as given. */
+    public static final String DEFAULT_CONFIG_FILE = "./conf/bootstrap.conf";
+    /** Relative path of the NiFi properties file. */
+    public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties";
+    /** Java executable used when the bootstrap configuration does not define the {@code java} property. */
+    public static final String DEFAULT_JAVA_CMD = "java";
+
+    /** Bootstrap property holding the number of seconds to wait for NiFi to shut down before killing it. */
+    public static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds";
+    /** Value used when {@link #GRACEFUL_SHUTDOWN_PROP} is absent from the bootstrap configuration. */
+    public static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20";
+
+    public static final String RUN_AS_PROP = "run.as";
+
+    public static final int MAX_RESTART_ATTEMPTS = 5;
+    public static final int STARTUP_WAIT_SECONDS = 60;
+
+    // Commands written to the NiFi bootstrap socket; each is sent as "<COMMAND> <secret key>\n"
+    public static final String SHUTDOWN_CMD = "SHUTDOWN";
+    public static final String PING_CMD = "PING";
+    public static final String DUMP_CMD = "DUMP";
+
+    // consulted by start() when the NiFi process exits, to decide whether to launch it again
+    private volatile boolean autoRestartNiFi = true;
+    private volatile int ccPort = -1;
+    // PID of the most recently launched NiFi process; stays -1 until a PID could be determined
+    private volatile long nifiPid = -1L;
+    private volatile String secretKey;
+    // JVM shutdown hook registered for the currently running NiFi process
+    private volatile ShutdownHook shutdownHook;
+
+    private final Lock lock = new ReentrantLock();
+    private final Condition startupCondition = lock.newCondition();
+
+    private final File bootstrapConfigFile;
+
+    // used for logging initial info; these will be logged to console by default when the app is started
+    private final Logger cmdLogger = LoggerFactory.getLogger("org.apache.nifi.bootstrap.Command");
+    // used for logging all info. These by default will be written to the log file
+    private final Logger defaultLogger = LoggerFactory.getLogger(RunNiFi.class);
+
+
+    private final ExecutorService loggingExecutor;
+    private volatile Set<Future<?>> loggingFutures = new HashSet<>(2);
+
+    /**
+     * Creates a bootstrap instance bound to the given configuration file and sets up a
+     * fixed pool of two daemon threads, each named "NiFi logging handler".
+     *
+     * @param bootstrapConfigFile the bootstrap.conf file this instance works from
+     * @param verbose accepted for the command line's {@code -verbose} flag; not consulted here
+     */
+    public RunNiFi(final File bootstrapConfigFile, final boolean verbose) {
+        this.bootstrapConfigFile = bootstrapConfigFile;
+
+        // daemon threads, so that pending logging work never keeps the bootstrap JVM alive
+        final ThreadFactory daemonFactory = new ThreadFactory() {
+            @Override
+            public Thread newThread(final Runnable task) {
+                final Thread handler = Executors.defaultThreadFactory().newThread(task);
+                handler.setName("NiFi logging handler");
+                handler.setDaemon(true);
+                return handler;
+            }
+        };
+        loggingExecutor = Executors.newFixedThreadPool(2, daemonFactory);
+    }
+
+    /**
+     * Prints the command-line usage text, one line at a time, to standard out.
+     */
+    private static void printUsage() {
+        // an empty entry produces a blank line
+        final String[] usageLines = {
+            "Usage:",
+            "",
+            "java org.apache.nifi.bootstrap.RunNiFi [<-verbose>] <command> [options]",
+            "",
+            "Valid commands include:",
+            "",
+            "Start : Start a new instance of Apache NiFi",
+            "Stop : Stop a running instance of Apache NiFi",
+            "Restart : Stop Apache NiFi, if it is running, and then start a new instance",
+            "Status : Determine if there is a running instance of Apache NiFi",
+            "Dump : Write a Thread Dump to the file specified by [options], or to the log if no file is given",
+            "Run : Start a new instance of Apache NiFi and monitor the Process, restarting if the instance dies",
+            ""
+        };
+
+        for (final String usageLine : usageLines) {
+            System.out.println(usageLine);
+        }
+    }
+
+    /**
+     * Returns a copy of the given array without its first element.
+     *
+     * @param orig the array to copy from; must contain at least one element
+     * @return a new array holding elements 1..n-1 of {@code orig}
+     */
+    private static String[] shift(final String[] orig) {
+        final int endExclusive = orig.length;
+        final String[] remainder = Arrays.copyOfRange(orig, 1, endExclusive);
+        return remainder;
+    }
+
+    /**
+     * Command-line entry point. Expects an optional {@code -verbose} flag followed by one of
+     * the commands start, run, stop, status, restart or dump (case-insensitive); dump may be
+     * followed by the name of the file to write the thread dump to. Anything else prints the
+     * usage text and returns.
+     *
+     * @param args the command-line arguments
+     * @throws IOException if the selected command fails with an I/O error
+     * @throws InterruptedException declared by {@link #start()}
+     */
+    public static void main(String[] args) throws IOException, InterruptedException {
+        if (args.length < 1 || args.length > 3) {
+            printUsage();
+            return;
+        }
+
+        boolean verbose = false;
+        if (args[0].equals("-verbose")) {
+            verbose = true;
+            args = shift(args);
+        }
+
+        // "-verbose" on its own leaves no command behind; without this guard args[0] would throw
+        if (args.length == 0) {
+            printUsage();
+            return;
+        }
+
+        // normalize once so that validation, dump-file handling and dispatch all agree on case
+        final String cmd = args[0].toLowerCase();
+        switch (cmd) {
+            case "start":
+            case "run":
+            case "stop":
+            case "status":
+            case "dump":
+            case "restart":
+                break;
+            default:
+                printUsage();
+                return;
+        }
+
+        // only the dump command takes an (optional) file argument
+        final File dumpFile = ("dump".equals(cmd) && args.length > 1) ? new File(args[1]) : null;
+
+        // lookup order: system property, then ${NIFI_HOME}/conf/bootstrap.conf, then ./conf/bootstrap.conf
+        String configFilename = System.getProperty("org.apache.nifi.bootstrap.config.file");
+
+        if (configFilename == null) {
+            final String nifiHome = System.getenv("NIFI_HOME");
+            if (nifiHome != null) {
+                final File nifiHomeFile = new File(nifiHome.trim());
+                final File homeConfigFile = new File(nifiHomeFile, DEFAULT_CONFIG_FILE);
+                configFilename = homeConfigFile.getAbsolutePath();
+            }
+        }
+
+        if (configFilename == null) {
+            configFilename = DEFAULT_CONFIG_FILE;
+        }
+
+        final File configFile = new File(configFilename);
+
+        final RunNiFi runNiFi = new RunNiFi(configFile, verbose);
+
+        switch (cmd) {
+            case "start":
+            case "run":
+                runNiFi.start();
+                break;
+            case "stop":
+                runNiFi.stop();
+                break;
+            case "status":
+                runNiFi.status();
+                break;
+            case "restart":
+                runNiFi.stop();
+                runNiFi.start();
+                break;
+            case "dump":
+                runNiFi.dump(dumpFile);
+                break;
+        }
+    }
+
+    /**
+     * Locates the status file, logging through the default (log file) logger.
+     *
+     * @return the status file, which may or may not exist
+     */
+    File getStatusFile() {
+        final Logger logger = defaultLogger;
+        return getStatusFile(logger);
+    }
+
+    /**
+     * Locates the status file {@code bin/nifi.pid} underneath the directory that contains the
+     * bootstrap configuration file's parent directory.
+     *
+     * @param logger the logger that the resolved location is written to at DEBUG level
+     * @return the status file, which may or may not exist
+     */
+    public File getStatusFile(final Logger logger) {
+        // Resolve against the absolute path, as start() does: a relative config path without a
+        // parent component (e.g. "bootstrap.conf") has no parent file and would cause an NPE below.
+        final File confDir = bootstrapConfigFile.getAbsoluteFile().getParentFile();
+        final File nifiHome = confDir.getParentFile();
+        final File bin = new File(nifiHome, "bin");
+        final File statusFile = new File(bin, "nifi.pid");
+
+        logger.debug("Status File: {}", statusFile);
+
+        return statusFile;
+    }
+
+    /**
+     * Reads the status file into a {@link Properties} object.
+     *
+     * @param logger the logger used for DEBUG output; the loaded properties are logged without
+     *            the {@code secret.key} entry
+     * @return the properties stored in the status file, or an empty set of properties if the
+     *         status file does not exist
+     * @throws IOException if the status file exists but cannot be read
+     */
+    private Properties loadProperties(final Logger logger) throws IOException {
+        final Properties props = new Properties();
+        final File statusFile = getStatusFile(logger);
+        if (statusFile == null || !statusFile.exists()) {
+            logger.debug("No status file to load properties from");
+            return props;
+        }
+
+        // reuse the file resolved above rather than resolving (and debug-logging) it a second time
+        try (final FileInputStream fis = new FileInputStream(statusFile)) {
+            props.load(fis);
+        }
+
+        // never write the secret key to the log
+        final Map<Object, Object> modified = new HashMap<>(props);
+        modified.remove("secret.key");
+        logger.debug("Properties: {}", modified);
+
+        return props;
+    }
+
+    /**
+     * Replaces the status file with the given properties. The file is re-created, restricted
+     * to owner read/write where the file system allows it, written and synced to disk.
+     *
+     * @param nifiProps the properties to store
+     * @param logger the logger used for warnings and DEBUG output; the saved properties are
+     *            logged without the {@code secret.key} entry
+     * @throws IOException if the status file cannot be created or written
+     */
+    private synchronized void saveProperties(final Properties nifiProps, final Logger logger) throws IOException {
+        final File statusFile = getStatusFile(logger);
+        if (statusFile.exists() && !statusFile.delete()) {
+            logger.warn("Failed to delete {}", statusFile);
+        }
+
+        if (!statusFile.createNewFile()) {
+            throw new IOException("Failed to create file " + statusFile);
+        }
+
+        try {
+            final Set<PosixFilePermission> perms = new HashSet<>();
+            perms.add(PosixFilePermission.OWNER_READ);
+            perms.add(PosixFilePermission.OWNER_WRITE);
+            Files.setPosixFilePermissions(statusFile.toPath(), perms);
+        } catch (final Exception e) {
+            // deliberately broad and best-effort: file systems without POSIX permission
+            // support reject the call with an unchecked exception
+            logger.warn("Failed to set permissions so that only the owner can read status file {}; "
+                + "this may allow others to have access to the key needed to communicate with NiFi. "
+                + "Permissions should be changed so that only the owner can read this file", statusFile);
+        }
+
+        try (final FileOutputStream fos = new FileOutputStream(statusFile)) {
+            nifiProps.store(fos, null);
+            fos.getFD().sync();
+        }
+
+        // keep the secret key out of the log, exactly as loadProperties does
+        final Map<Object, Object> loggable = new HashMap<>(nifiProps);
+        loggable.remove("secret.key");
+        logger.debug("Saved Properties {} to {}", new Object[]{loggable, statusFile});
+    }
+
+    /**
+     * Sends a PING command to {@code localhost} on the given port and checks whether the first
+     * line of the reply is the PING command itself.
+     *
+     * @param port the port to connect to
+     * @param secretKey the key sent along with the command
+     * @param logger the logger used for DEBUG output
+     * @return {@code true} if the expected reply was received; {@code false} if the reply
+     *         differed or any I/O error (including a read timeout) occurred
+     */
+    private boolean isPingSuccessful(final int port, final String secretKey, final Logger logger) {
+        logger.debug("Pinging {}", port);
+
+        // closing the socket also closes the streams obtained from it
+        try (final Socket socket = new Socket("localhost", port)) {
+            // bound the wait for a reply before any traffic is exchanged
+            socket.setSoTimeout(5000);
+
+            final OutputStream out = socket.getOutputStream();
+            out.write((PING_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
+            out.flush();
+            logger.debug("Sent PING command");
+
+            // decode the reply with the same charset the request was encoded with
+            final InputStream in = socket.getInputStream();
+            final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
+            final String response = reader.readLine();
+            logger.debug("PING response: {}", response);
+
+            return PING_CMD.equals(response);
+        } catch (final IOException ioe) {
+            logger.debug("PING on port {} failed: {}", port, ioe.toString());
+            return false;
+        }
+    }
+
+    /**
+     * Determines the port recorded in the status file, provided the NiFi instance behind it
+     * still looks alive: either it answers a PING, or the recorded PID is still running.
+     *
+     * @param logger the logger used for DEBUG output
+     * @return the recorded port, or {@code null} if no port is recorded or the instance neither
+     *         answers the PING nor has a running process
+     * @throws IOException if the status file cannot be read
+     */
+    private Integer getCurrentPort(final Logger logger) throws IOException {
+        final Properties statusProps = loadProperties(logger);
+
+        final String portText = statusProps.getProperty("port");
+        if (portText == null) {
+            logger.debug("No Port found in status file");
+            return null;
+        }
+        logger.debug("Port defined in status file: {}", portText);
+
+        final int port = Integer.parseInt(portText);
+        if (isPingSuccessful(port, statusProps.getProperty("secret.key"), logger)) {
+            logger.debug("Successful PING on port {}", port);
+            return port;
+        }
+
+        // no answer to the PING: fall back to checking whether the recorded process still exists
+        final String pid = statusProps.getProperty("pid");
+        logger.debug("PID in status file is {}", pid);
+        if (pid == null) {
+            return null;
+        }
+
+        if (isProcessRunning(pid, logger)) {
+            return port;
+        }
+        return null;
+    }
+
+    /**
+     * Checks whether a process with the given PID exists by running {@code ps -p <pid>} and
+     * looking for an output line that starts with the PID.
+     *
+     * @param pid the process id to look for
+     * @param logger the logger used for DEBUG output
+     * @return {@code true} if the PID shows up in the {@code ps} output; {@code false} if it
+     *         does not or if {@code ps} could not be run
+     */
+    private boolean isProcessRunning(final String pid, final Logger logger) {
+        try {
+            // We use the "ps" command to check if the process is still running.
+            final Process psProcess = new ProcessBuilder("ps", "-p", pid).start();
+
+            // Scan all of the output; any line beginning with the pid means the process exists.
+            boolean found = false;
+            try (final InputStream stdout = psProcess.getInputStream();
+                final Reader decoder = new InputStreamReader(stdout);
+                final BufferedReader lines = new BufferedReader(decoder)) {
+
+                for (String line = lines.readLine(); line != null; line = lines.readLine()) {
+                    if (line.trim().startsWith(pid)) {
+                        found = true;
+                    }
+                }
+            }
+
+            final String outcome = found ? "Process with PID {} is running" : "Process with PID {} is not running";
+            logger.debug(outcome, pid);
+
+            return found;
+        } catch (final IOException ioe) {
+            System.err.println("Failed to determine if Process " + pid + " is running; assuming that it is not");
+            return false;
+        }
+    }
+
+    /**
+     * Builds a snapshot of the NiFi instance described by the status file: its port, its PID,
+     * whether it answers a PING and whether its process is running.
+     *
+     * @param logger the logger used for DEBUG output
+     * @return the status; all-empty (no port, no PID, not responding, not running) when the
+     *         status file cannot be read, records neither port nor PID, or holds a port that
+     *         is not a number
+     */
+    private Status getStatus(final Logger logger) {
+        final Properties statusProps;
+        try {
+            statusProps = loadProperties(logger);
+        } catch (final IOException ioe) {
+            return new Status(null, null, false, false);
+        }
+
+        if (statusProps == null) {
+            return new Status(null, null, false, false);
+        }
+
+        final String portText = statusProps.getProperty("port");
+        final String pid = statusProps.getProperty("pid");
+        final String secretKey = statusProps.getProperty("secret.key");
+
+        if (portText == null && pid == null) {
+            return new Status(null, null, false, false);
+        }
+
+        Integer port = null;
+        boolean answersPing = false;
+        if (portText != null) {
+            try {
+                port = Integer.parseInt(portText);
+            } catch (final NumberFormatException nfe) {
+                return new Status(null, null, false, false);
+            }
+            answersPing = isPingSuccessful(port, secretKey, logger);
+        }
+
+        // an instance that answers the PING is necessarily running
+        if (answersPing) {
+            return new Status(port, pid, true, true);
+        }
+
+        final boolean alive = pid != null && isProcessRunning(pid, logger);
+        return new Status(port, pid, false, alive);
+    }
+
+    /**
+     * Reports through the command logger whether NiFi is running, based on the status file,
+     * a PING of the recorded port and a check of the recorded PID.
+     *
+     * @throws IOException declared for callers; the visible implementation does not throw it
+     */
+    public void status() throws IOException {
+        final Logger logger = cmdLogger;
+        final Status status = getStatus(logger);
+        if (status.isRespondingToPing()) {
+            logger.info("Apache NiFi is currently running, listening to Bootstrap on port {}, PID={}",
+                new Object[]{status.getPort(), status.getPid() == null ? "unknown" : status.getPid()});
+            return;
+        }
+
+        if (status.isProcessRunning()) {
+            logger.info("Apache NiFi is running at PID {} but is not responding to ping requests", status.getPid());
+            return;
+        }
+
+        if (status.getPort() == null) {
+            logger.info("Apache NiFi is not running");
+            return;
+        }
+
+        // A port is recorded but nothing answered on it. Without a PID there is no way to tell
+        // a dead process from a hung one; with a PID the process check above already failed.
+        if (status.getPid() == null) {
+            logger.info("Apache NiFi is not responding to Ping requests. The process may have died or may be hung");
+        } else {
+            logger.info("Apache NiFi is not running");
+        }
+    }
+
+    /**
+     * Writes a NiFi thread dump to the given file; if file is null, logs at
+     * INFO level instead. Logs a message and returns without doing anything
+     * if NiFi does not appear to be running.
+     *
+     * @param dumpFile the file to write the dump content to
+     * @throws IOException if any issues occur while requesting the dump or writing the dump file
+     */
+    public void dump(final File dumpFile) throws IOException {
+        final Logger logger = defaultLogger;    // dump to bootstrap log file by default
+        final Integer port = getCurrentPort(logger);
+        if (port == null) {
+            logger.info("Apache NiFi is not currently running");
+            return;
+        }
+
+        final Properties nifiProps = loadProperties(logger);
+        final String secretKey = nifiProps.getProperty("secret.key");
+
+        final StringBuilder sb = new StringBuilder();
+        try (final Socket socket = new Socket()) {
+            logger.debug("Connecting to NiFi instance");
+            socket.setSoTimeout(60000);
+            socket.connect(new InetSocketAddress("localhost", port));
+            logger.debug("Established connection to NiFi instance.");
+
+            logger.debug("Sending DUMP Command to port {}", port);
+            final OutputStream out = socket.getOutputStream();
+            out.write((DUMP_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
+            out.flush();
+
+            // decode with the charset used for the request and for the dump file below
+            final InputStream in = socket.getInputStream();
+            try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    sb.append(line).append("\n");
+                }
+            }
+        }
+
+        final String dump = sb.toString();
+        if (dumpFile == null) {
+            logger.info(dump);
+        } else {
+            try (final FileOutputStream fos = new FileOutputStream(dumpFile)) {
+                fos.write(dump.getBytes(StandardCharsets.UTF_8));
+            }
+            // we want to log to the console (by default) that we wrote the thread dump to the specified file
+            cmdLogger.info("Successfully wrote thread dump to {}", dumpFile.getAbsolutePath());
+        }
+    }
+
+    /**
+     * Asks the running NiFi instance to shut down. The status file is deleted, a SHUTDOWN
+     * command is sent to the recorded port and, if the command is acknowledged and a PID is
+     * recorded, the process is polled until it exits. When the configured graceful shutdown
+     * period is greater than zero and elapses first, the process tree is killed.
+     *
+     * @throws IOException if the status file cannot be read; failures while talking to NiFi
+     *             are logged instead of being thrown
+     */
+    public void stop() throws IOException {
+        final Logger logger = cmdLogger;
+        final Integer port = getCurrentPort(logger);
+        if (port == null) {
+            logger.info("Apache NiFi is not currently running");
+            return;
+        }
+
+        final Properties nifiProps = loadProperties(logger);
+        final String secretKey = nifiProps.getProperty("secret.key");
+
+        // start() refuses to restart NiFi once the status file is gone, so remove it first
+        final File statusFile = getStatusFile(logger);
+        if (statusFile.exists() && !statusFile.delete()) {
+            logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
+        }
+
+        try (final Socket socket = new Socket()) {
+            logger.debug("Connecting to NiFi instance");
+            socket.setSoTimeout(60000);
+            socket.connect(new InetSocketAddress("localhost", port));
+            logger.debug("Established connection to NiFi instance.");
+
+            logger.debug("Sending SHUTDOWN Command to port {}", port);
+            final OutputStream out = socket.getOutputStream();
+            out.write((SHUTDOWN_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
+            out.flush();
+            socket.shutdownOutput();
+
+            // read the reply until the other side closes the connection
+            final InputStream in = socket.getInputStream();
+            int lastChar;
+            final StringBuilder sb = new StringBuilder();
+            while ((lastChar = in.read()) > -1) {
+                sb.append((char) lastChar);
+            }
+            final String response = sb.toString().trim();
+
+            logger.debug("Received response to SHUTDOWN command: {}", response);
+
+            if (SHUTDOWN_CMD.equals(response)) {
+                logger.info("Apache NiFi has accepted the Shutdown Command and is shutting down now");
+
+                final String pid = nifiProps.getProperty("pid");
+                if (pid != null) {
+                    final Properties bootstrapProperties = new Properties();
+                    try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) {
+                        bootstrapProperties.load(fis);
+                    }
+
+                    final String gracefulShutdown = bootstrapProperties.getProperty(GRACEFUL_SHUTDOWN_PROP, DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
+                    int gracefulShutdownSeconds;
+                    try {
+                        gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown);
+                    } catch (final NumberFormatException nfe) {
+                        gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
+                    }
+
+                    final long startWait = System.nanoTime();
+                    while (isProcessRunning(pid, logger)) {
+                        logger.info("Waiting for Apache NiFi to finish shutting down...");
+                        final long waitNanos = System.nanoTime() - startWait;
+                        final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
+                        // a period of zero (or less) never triggers the kill; polling continues until exit
+                        if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) {
+                            if (isProcessRunning(pid, logger)) {
+                                logger.warn("NiFi has not finished shutting down after {} seconds. Killing process.", gracefulShutdownSeconds);
+                                try {
+                                    killProcessTree(pid, logger);
+                                } catch (final IOException ioe) {
+                                    // trailing exception argument: logged together with its stack trace
+                                    logger.error("Failed to kill Process with PID {}", new Object[]{pid, ioe});
+                                }
+                            }
+                            break;
+                        } else {
+                            try {
+                                Thread.sleep(2000L);
+                            } catch (final InterruptedException ie) {
+                                // intentionally ignored: keep polling until the process is gone or the period elapses
+                            }
+                        }
+                    }
+
+                    logger.info("NiFi has finished shutting down.");
+                }
+            } else {
+                logger.error("When sending SHUTDOWN command to NiFi, got unexpected response {}", response);
+            }
+        } catch (final IOException ioe) {
+            logger.error("Failed to send shutdown command to port {} due to {}", new Object[]{port, ioe.toString(), ioe});
+        }
+    }
+
+    /**
+     * Lists the direct children of a process by running
+     * {@code ps -o pid --no-headers --ppid <ppid>} and collecting each trimmed output line.
+     *
+     * @param ppid the PID of the parent process
+     * @return the output lines of {@code ps}, one entry per line; empty if there was no output
+     * @throws IOException if {@code ps} cannot be started or its output cannot be read
+     */
+    private static List<String> getChildProcesses(final String ppid) throws IOException {
+        final String[] psCommand = {"ps", "-o", "pid", "--no-headers", "--ppid", ppid};
+        final Process psProcess = Runtime.getRuntime().exec(psCommand);
+
+        final List<String> pids = new ArrayList<>();
+        try (final InputStream stdout = psProcess.getInputStream();
+            final BufferedReader lines = new BufferedReader(new InputStreamReader(stdout))) {
+
+            for (String line = lines.readLine(); line != null; line = lines.readLine()) {
+                pids.add(line.trim());
+            }
+        }
+
+        return pids;
+    }
+
+    /**
+     * Kills a process and all of its descendants with {@code kill -9}, descendants first.
+     *
+     * @param pid the PID at the root of the tree to kill
+     * @param logger the logger used for DEBUG output
+     * @throws IOException if the child processes cannot be listed or {@code kill} cannot be started
+     */
+    private void killProcessTree(final String pid, final Logger logger) throws IOException {
+        logger.debug("Killing Process Tree for PID {}", pid);
+
+        final List<String> childPids = getChildProcesses(pid);
+        logger.debug("Children of PID {}: {}", new Object[]{pid, childPids});
+
+        // depth-first: every subtree is taken down before its parent
+        for (int i = 0; i < childPids.size(); i++) {
+            killProcessTree(childPids.get(i), logger);
+        }
+
+        final String[] killCommand = {"kill", "-9", pid};
+        Runtime.getRuntime().exec(killCommand);
+    }
+
+    /**
+     * Tells whether the given process is still running, based on whether its exit value
+     * can be obtained yet.
+     *
+     * @param process the process to check
+     * @return {@code true} if no exit value is available yet, {@code false} once the process has exited
+     */
+    public static boolean isAlive(final Process process) {
+        boolean exited;
+        try {
+            // throws as long as the process has not terminated
+            process.exitValue();
+            exited = true;
+        } catch (final IllegalStateException | IllegalThreadStateException stillRunning) {
+            exited = false;
+        }
+        return !exited;
+    }
+
+    /**
+     * Launches NiFi as a child process, unless an instance is already running, and then keeps
+     * monitoring it. The java command, classpath and JVM arguments are derived from the
+     * bootstrap configuration file. When the process exits it is launched again as long as
+     * auto-restart is enabled and the status file still exists; otherwise this method returns.
+     *
+     * @throws IOException if the bootstrap configuration cannot be read, the status file cannot
+     *             be written or the process cannot be started
+     * @throws InterruptedException declared for callers
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void start() throws IOException, InterruptedException {
+        final Integer port = getCurrentPort(cmdLogger);
+        if (port != null) {
+            cmdLogger.info("Apache NiFi is already running, listening to Bootstrap on port " + port);
+            return;
+        }
+
+        final ProcessBuilder builder = new ProcessBuilder();
+
+        if (!bootstrapConfigFile.exists()) {
+            throw new FileNotFoundException(bootstrapConfigFile.getAbsolutePath());
+        }
+
+        final Properties properties = new Properties();
+        try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) {
+            properties.load(fis);
+        }
+
+        final Map<String, String> props = new HashMap<>();
+        props.putAll((Map) properties);
+
+        final String specifiedWorkingDir = props.get("working.dir");
+        if (specifiedWorkingDir != null) {
+            builder.directory(new File(specifiedWorkingDir));
+        }
+
+        // the default working directory is the parent of the directory holding bootstrap.conf
+        final File bootstrapConfigAbsoluteFile = bootstrapConfigFile.getAbsoluteFile();
+        final File binDir = bootstrapConfigAbsoluteFile.getParentFile();
+        final File workingDir = binDir.getParentFile();
+
+        if (specifiedWorkingDir == null) {
+            builder.directory(workingDir);
+        }
+
+        final String libFilename = replaceNull(props.get("lib.dir"), "./lib").trim();
+        File libDir = getFile(libFilename, workingDir);
+
+        final String confFilename = replaceNull(props.get("conf.dir"), "./conf").trim();
+        File confDir = getFile(confFilename, workingDir);
+
+        String nifiPropsFilename = props.get("props.file");
+        if (nifiPropsFilename == null) {
+            if (confDir.exists()) {
+                nifiPropsFilename = new File(confDir, "nifi.properties").getAbsolutePath();
+            } else {
+                // fall back to the default nifi.properties location, not to the bootstrap config file
+                nifiPropsFilename = DEFAULT_NIFI_PROPS_FILE;
+            }
+        }
+
+        nifiPropsFilename = nifiPropsFilename.trim();
+
+        // NOTE(review): props is a HashMap, so the order in which java.arg.* values are passed
+        // to the JVM is unspecified - confirm that no argument depends on its position
+        final List<String> javaAdditionalArgs = new ArrayList<>();
+        for (final Map.Entry<String, String> entry : props.entrySet()) {
+            final String key = entry.getKey();
+            final String value = entry.getValue();
+
+            if (key.startsWith("java.arg")) {
+                javaAdditionalArgs.add(value);
+            }
+        }
+
+        final File[] libFiles = libDir.listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(final File dir, final String filename) {
+                return filename.toLowerCase().endsWith(".jar");
+            }
+        });
+
+        if (libFiles == null || libFiles.length == 0) {
+            throw new RuntimeException("Could not find lib directory at " + libDir.getAbsolutePath());
+        }
+
+        final File[] confFiles = confDir.listFiles();
+        if (confFiles == null || confFiles.length == 0) {
+            throw new RuntimeException("Could not find conf directory at " + confDir.getAbsolutePath());
+        }
+
+        // classpath: the conf directory itself, followed by every jar in the lib directory
+        final List<String> cpFiles = new ArrayList<>(confFiles.length + libFiles.length);
+        cpFiles.add(confDir.getAbsolutePath());
+        for (final File file : libFiles) {
+            cpFiles.add(file.getAbsolutePath());
+        }
+
+        final StringBuilder classPathBuilder = new StringBuilder();
+        for (int i = 0; i < cpFiles.size(); i++) {
+            final String filename = cpFiles.get(i);
+            classPathBuilder.append(filename);
+            if (i < cpFiles.size() - 1) {
+                classPathBuilder.append(File.pathSeparatorChar);
+            }
+        }
+
+        final String classPath = classPathBuilder.toString();
+        String javaCmd = props.get("java");
+        if (javaCmd == null) {
+            javaCmd = DEFAULT_JAVA_CMD;
+        }
+        // when no specific executable is configured, prefer the one under JAVA_HOME if it is usable
+        if (javaCmd.equals(DEFAULT_JAVA_CMD)) {
+            String javaHome = System.getenv("JAVA_HOME");
+            if (javaHome != null) {
+                String fileExtension = isWindows() ? ".exe" : "";
+                File javaFile = new File(javaHome + File.separatorChar + "bin"
+                    + File.separatorChar + "java" + fileExtension);
+                if (javaFile.exists() && javaFile.canExecute()) {
+                    javaCmd = javaFile.getAbsolutePath();
+                }
+            }
+        }
+
+        final NiFiListener listener = new NiFiListener();
+        final int listenPort = listener.start(this);
+
+        final List<String> cmd = new ArrayList<>();
+
+        cmd.add(javaCmd);
+        cmd.add("-classpath");
+        cmd.add(classPath);
+        cmd.addAll(javaAdditionalArgs);
+        cmd.add("-Dnifi.properties.file.path=" + nifiPropsFilename);
+        cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
+        cmd.add("-Dapp=NiFi");
+        cmd.add("org.apache.nifi.NiFi");
+
+        builder.command(cmd);
+
+        final StringBuilder cmdBuilder = new StringBuilder();
+        for (final String s : cmd) {
+            cmdBuilder.append(s).append(" ");
+        }
+
+        cmdLogger.info("Starting Apache NiFi...");
+        cmdLogger.info("Working Directory: {}", workingDir.getAbsolutePath());
+        cmdLogger.info("Command: {}", cmdBuilder.toString());
+
+        String gracefulShutdown = props.get(GRACEFUL_SHUTDOWN_PROP);
+        if (gracefulShutdown == null) {
+            gracefulShutdown = DEFAULT_GRACEFUL_SHUTDOWN_VALUE;
+        }
+
+        final int gracefulShutdownSeconds;
+        try {
+            gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown);
+        } catch (final NumberFormatException nfe) {
+            throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File "
+                + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer");
+        }
+
+        if (gracefulShutdownSeconds < 0) {
+            throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File "
+                + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer");
+        }
+
+        Process process = builder.start();
+        handleLogging(process);
+        Long pid = getPid(process, cmdLogger);
+        if (pid != null) {
+            nifiPid = pid;
+            final Properties nifiProps = new Properties();
+            nifiProps.setProperty("pid", String.valueOf(nifiPid));
+            saveProperties(nifiProps, cmdLogger);
+        }
+
+        shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor);
+        final Runtime runtime = Runtime.getRuntime();
+        runtime.addShutdownHook(shutdownHook);
+
+        // monitor the child process; poll once per second while it is alive
+        while (true) {
+            final boolean alive = isAlive(process);
+
+            if (alive) {
+                try {
+                    Thread.sleep(1000L);
+                } catch (final InterruptedException ie) {
+                }
+            } else {
+                try {
+                    runtime.removeShutdownHook(shutdownHook);
+                } catch (final IllegalStateException ise) {
+                    // happens when already shutting down
+                }
+
+                if (autoRestartNiFi) {
+                    // stop() deletes the status file, so a missing file means the exit was requested
+                    final File statusFile = getStatusFile(defaultLogger);
+                    if (!statusFile.exists()) {
+                        defaultLogger.debug("Status File no longer exists. Will not restart NiFi");
+                        return;
+                    }
+
+                    defaultLogger.warn("Apache NiFi appears to have died. Restarting...");
+                    process = builder.start();
+                    handleLogging(process);
+
+                    pid = getPid(process, defaultLogger);
+                    if (pid != null) {
+                        nifiPid = pid;
+                        final Properties nifiProps = new Properties();
+                        nifiProps.setProperty("pid", String.valueOf(nifiPid));
+                        saveProperties(nifiProps, defaultLogger);
+                    }
+
+                    shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor);
+                    runtime.addShutdownHook(shutdownHook);
+
+                    final boolean started = waitForStart();
+
+                    if (started) {
+                        defaultLogger.info("Successfully started Apache NiFi{}", (pid == null ? "" : " with PID " + pid));
+                    } else {
+                        defaultLogger.error("Apache NiFi does not appear to have started");
+                    }
+                } else {
+                    return;
+                }
+            }
+        }
+    }
+
+    private void handleLogging(final Process process) {
+        final Set<Future<?>> existingFutures = loggingFutures;
+        if (existingFutures != null) {
+            for (final Future<?> future : existingFutures) {
+                future.cancel(false);
+            }
+        }
+
+        final Future<?> stdOutFuture = loggingExecutor.submit(new Runnable() {
+            @Override
+            public void run() {
+                final Logger stdOutLogger = 
LoggerFactory.getLogger("org.apache.nifi.StdOut");
+                final InputStream in = process.getInputStream();
+                try (final BufferedReader reader = new BufferedReader(new 
InputStreamReader(in))) {
+                    String line;
+                    while ((line = reader.readLine()) != null) {
+                        stdOutLogger.info(line);
+                    }
+                } catch (IOException e) {
+                    defaultLogger.error("Failed to read from NiFi's Standard 
Out stream", e);
+                }
+            }
+        });
+
+        final Future<?> stdErrFuture = loggingExecutor.submit(new Runnable() {
+            @Override
+            public void run() {
+                final Logger stdErrLogger = 
LoggerFactory.getLogger("org.apache.nifi.StdErr");
+                final InputStream in = process.getErrorStream();
+                try (final BufferedReader reader = new BufferedReader(new 
InputStreamReader(in))) {
+                    String line;
+                    while ((line = reader.readLine()) != null) {
+                        stdErrLogger.error(line);
+                    }
+                } catch (IOException e) {
+                    defaultLogger.error("Failed to read from NiFi's Standard 
Error stream", e);
+                }
+            }
+        });
+
+        final Set<Future<?>> futures = new HashSet<>();
+        futures.add(stdOutFuture);
+        futures.add(stdErrFuture);
+        this.loggingFutures = futures;
+    }
+
+    /**
+     * Makes a best-effort attempt to determine the operating-system process id of the given
+     * process by reflectively reading a field named {@code pid} declared on its runtime class.
+     *
+     * @param process the child process to inspect
+     * @param logger the logger used for debug output
+     * @return the process id, or {@code null} if the field does not exist, cannot be accessed,
+     *         or does not hold a {@link Number}
+     */
+    private Long getPid(final Process process, final Logger logger) {
+        try {
+            final Class<?> procClass = process.getClass();
+            final Field pidField = procClass.getDeclaredField("pid");
+            pidField.setAccessible(true);
+            final Object pidObject = pidField.get(process);
+
+            logger.debug("PID Object = {}", pidObject);
+
+            if (pidObject instanceof Number) {
+                return ((Number) pidObject).longValue();
+            }
+            return null;
+        } catch (final IllegalAccessException | NoSuchFieldException | RuntimeException e) {
+            // getDeclaredField/setAccessible may also fail with unchecked exceptions (e.g. SecurityException).
+            // Finding the PID is optional, so such a failure must not abort the launch of the child process.
+            // The message is passed as the placeholder value and the exception itself as the trailing argument.
+            logger.debug("Could not find PID for child process due to {}", e.toString(), e);
+            return null;
+        }
+    }
+
+    /**
+     * Indicates whether the {@code os.name} system property contains the text "win", ignoring case.
+     *
+     * @return {@code true} if the property is set and contains "win"; {@code false} otherwise
+     */
+    private boolean isWindows() {
+        final String osName = System.getProperty("os.name");
+        // Locale.ROOT keeps the case conversion independent of the JVM's default locale
+        return osName != null && osName.toLowerCase(java.util.Locale.ROOT).contains("win");
+    }
+
+    /**
+     * Waits, while holding {@code lock}, until {@code ccPort} has been set to a positive value.
+     * The port is re-checked each time the wait on {@code startupCondition} returns, which
+     * happens after at most one second.
+     *
+     * @return {@code true} once {@code ccPort} is at least 1; {@code false} if the thread is
+     *         interrupted while waiting or more than {@code STARTUP_WAIT_SECONDS} whole seconds elapse
+     */
+    private boolean waitForStart() {
+        lock.lock();
+        try {
+            final long begin = System.nanoTime();
+
+            while (ccPort < 1) {
+                try {
+                    startupCondition.await(1, TimeUnit.SECONDS);
+                } catch (final InterruptedException ie) {
+                    return false;
+                }
+
+                final long elapsedSeconds = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - begin);
+                if (elapsedSeconds > STARTUP_WAIT_SECONDS) {
+                    return false;
+                }
+            }
+
+            return true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private File getFile(final String filename, final File workingDir) {
+        File file = new File(filename);
+        if (!file.isAbsolute()) {
+            file = new File(workingDir, filename);
+        }
+
+        return file;
+    }
+
+    /**
+     * Substitutes a replacement for a {@code null} value.
+     *
+     * @param value the value to check
+     * @param replacement the value to return when {@code value} is {@code null}
+     * @return {@code value} if it is not {@code null}; otherwise {@code replacement}
+     */
+    private String replaceNull(final String value, final String replacement) {
+        if (value != null) {
+            return value;
+        }
+        return replacement;
+    }
+
+    /**
+     * Sets the flag that records whether NiFi should be restarted automatically.
+     *
+     * @param restart the new value of the auto-restart flag
+     */
+    void setAutoRestartNiFi(final boolean restart) {
+        autoRestartNiFi = restart;
+    }
+
+    /**
+     * Records the port and secret key that NiFi reported for bootstrap requests, hands the key
+     * to the registered shutdown hook (if any), and persists the pid (when known), port and
+     * secret key through {@code saveProperties}. A failure to persist is logged as a warning
+     * and is not propagated.
+     *
+     * @param port the port on which NiFi listens for bootstrap requests
+     * @param secretKey the secret key that must accompany bootstrap requests
+     */
+    void setNiFiCommandControlPort(final int port, final String secretKey) {
+        this.ccPort = port;
+        this.secretKey = secretKey;
+
+        if (shutdownHook != null) {
+            shutdownHook.setSecretKey(secretKey);
+        }
+
+        // Resolved up front; only needed for the warning message below
+        final File statusFile = getStatusFile(defaultLogger);
+
+        final Properties statusProps = new Properties();
+        if (nifiPid != -1) {
+            // -1 is treated as "pid unknown", in which case no pid entry is written
+            statusProps.setProperty("pid", String.valueOf(nifiPid));
+        }
+        statusProps.setProperty("port", String.valueOf(ccPort));
+        statusProps.setProperty("secret.key", secretKey);
+
+        try {
+            saveProperties(statusProps, defaultLogger);
+        } catch (final IOException ioe) {
+            defaultLogger.warn("Apache NiFi has started but failed to persist NiFi Port information to {} due to {}", new Object[]{statusFile.getAbsolutePath(), ioe});
+        }
+
+        defaultLogger.info("Apache NiFi now running and listening for Bootstrap requests on port {}", port);
+    }
+
+    /**
+     * @return the port most recently recorded by {@code setNiFiCommandControlPort}
+     */
+    int getNiFiCommandControlPort() {
+        return ccPort;
+    }
+
+    /**
+     * Immutable holder for a port, a pid, and two nullable flags describing whether the
+     * instance responds to ping and whether its process is running.
+     */
+    private static class Status {
+
+        private final Integer port;
+        private final String pid;
+
+        private final Boolean respondingToPing;
+        private final Boolean processRunning;
+
+        public Status(final Integer port, final String pid, final Boolean respondingToPing, final Boolean processRunning) {
+            this.port = port;
+            this.pid = pid;
+            this.respondingToPing = respondingToPing;
+            this.processRunning = processRunning;
+        }
+
+        /**
+         * @return the pid as given to the constructor; may be {@code null}
+         */
+        public String getPid() {
+            return pid;
+        }
+
+        /**
+         * @return the port as given to the constructor; may be {@code null}
+         */
+        public Integer getPort() {
+            return port;
+        }
+
+        /**
+         * @return {@code true} only if the ping flag is non-null and true
+         */
+        public boolean isRespondingToPing() {
+            return respondingToPing != null && respondingToPing.booleanValue();
+        }
+
+        /**
+         * @return {@code true} only if the process-running flag is non-null and true
+         */
+        public boolean isProcessRunning() {
+            return processRunning != null && processRunning.booleanValue();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
----------------------------------------------------------------------
diff --git 
a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
new file mode 100644
index 0000000..a594f60
--- /dev/null
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.bootstrap;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Thread intended to be registered as a JVM shutdown hook. When run, it asks the NiFi process
+ * to shut down by sending a {@code SHUTDOWN} command to its command-control port, waits for
+ * the process to exit (destroying it once the graceful period has elapsed), and finally
+ * deletes the status file.
+ */
+public class ShutdownHook extends Thread {
+
+    private final Process nifiProcess;
+    private final RunNiFi runner;
+    private final int gracefulShutdownSeconds;
+    private final ExecutorService executor;
+
+    // volatile: updated through setSecretKey and read from run()
+    private volatile String secretKey;
+
+    /**
+     * @param nifiProcess the NiFi child process to shut down
+     * @param runner the runner that supplies the command-control port and the status file
+     * @param secretKey the secret key sent along with the SHUTDOWN command
+     * @param gracefulShutdownSeconds number of seconds to wait before destroying the process;
+     *            a value that is not greater than 0 disables the forced kill
+     * @param executor an executor that is shut down when this hook runs
+     */
+    public ShutdownHook(final Process nifiProcess, final RunNiFi runner, final String secretKey, final int gracefulShutdownSeconds, final ExecutorService executor) {
+        this.nifiProcess = nifiProcess;
+        this.runner = runner;
+        this.secretKey = secretKey;
+        this.gracefulShutdownSeconds = gracefulShutdownSeconds;
+        this.executor = executor;
+    }
+
+    /**
+     * Replaces the secret key that will be sent with the SHUTDOWN command.
+     *
+     * @param secretKey the new secret key
+     */
+    void setSecretKey(final String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    @Override
+    public void run() {
+        executor.shutdown();
+        runner.setAutoRestartNiFi(false);
+        final int ccPort = runner.getNiFiCommandControlPort();
+        if (ccPort > 0) {
+            System.out.println("Initiating Shutdown of NiFi...");
+
+            // try-with-resources closes the socket even when obtaining the stream or writing fails
+            try (final Socket socket = new Socket("localhost", ccPort)) {
+                final OutputStream out = socket.getOutputStream();
+                out.write(("SHUTDOWN " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
+                out.flush();
+            } catch (final IOException ioe) {
+                System.out.println("Failed to Shutdown NiFi due to " + ioe);
+            }
+        }
+
+        System.out.println("Waiting for Apache NiFi to finish shutting down...");
+        final long startWait = System.nanoTime();
+        while (RunNiFi.isAlive(nifiProcess)) {
+            final long waitNanos = System.nanoTime() - startWait;
+            final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
+            // The forced kill only applies when a positive graceful period is configured
+            if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) {
+                if (RunNiFi.isAlive(nifiProcess)) {
+                    System.out.println("NiFi has not finished shutting down after " + gracefulShutdownSeconds + " seconds. Killing process.");
+                    nifiProcess.destroy();
+                }
+                break;
+            } else {
+                try {
+                    Thread.sleep(1000L);
+                } catch (final InterruptedException ie) {
+                    // deliberately ignored: keep polling until the process exits or the graceful period elapses
+                }
+            }
+        }
+
+        final File statusFile = runner.getStatusFile();
+        if (!statusFile.delete()) {
+            System.err.println("Failed to delete status file " + statusFile.getAbsolutePath() + "; this file should be cleaned up manually");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java
----------------------------------------------------------------------
diff --git 
a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java
 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java
new file mode 100644
index 0000000..52e36b9
--- /dev/null
+++ 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.bootstrap.exception;
+
+/**
+ * Checked exception offering the four standard {@link Exception} constructors.
+ */
+public class InvalidCommandException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates an exception with neither a message nor a cause.
+     */
+    public InvalidCommandException() {
+    }
+
+    /**
+     * @param message the detail message
+     */
+    public InvalidCommandException(final String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause the underlying cause
+     */
+    public InvalidCommandException(final Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message
+     * @param cause the underlying cause
+     */
+    public InvalidCommandException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java
 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java
new file mode 100644
index 0000000..2149342
--- /dev/null
+++ 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.bootstrap.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An {@link InputStream} wrapper that exposes at most {@code limit} bytes of the underlying
+ * stream. Once the limit has been consumed, reads report end-of-stream even if the wrapped
+ * stream has more data. Closing this stream closes the wrapped stream.
+ */
+public class LimitingInputStream extends InputStream {
+
+    private final InputStream in;
+    private final long limit;
+    private long bytesRead = 0;
+    // value of bytesRead when mark() was last called, or -1 if there is no mark to restore
+    private long markOffset = -1L;
+
+    /**
+     * @param in the stream to read from
+     * @param limit the maximum number of bytes that may be consumed from {@code in}
+     */
+    public LimitingInputStream(final InputStream in, final long limit) {
+        this.in = in;
+        this.limit = limit;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int val = in.read();
+        if (val > -1) {
+            bytesRead++;
+        }
+        return val;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        // never request more than what is left of the limit
+        final int maxToRead = (int) Math.min(len, limit - bytesRead);
+
+        final int val = in.read(b, off, maxToRead);
+        if (val > 0) {
+            bytesRead += val;
+        }
+        return val;
+    }
+
+    @Override
+    public long skip(final long n) throws IOException {
+        final long remaining = limit - bytesRead;
+        if (remaining <= 0) {
+            // limit exhausted: nothing may be skipped
+            return 0L;
+        }
+
+        final long skipped = in.skip(Math.min(n, remaining));
+        bytesRead += skipped;
+        return skipped;
+    }
+
+    @Override
+    public int available() throws IOException {
+        final long remaining = limit - bytesRead;
+        if (remaining <= 0) {
+            return 0;
+        }
+
+        // do not report more bytes than this stream will actually hand out
+        return (int) Math.min(in.available(), remaining);
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    @Override
+    public void mark(int readlimit) {
+        in.mark(readlimit);
+        markOffset = bytesRead;
+    }
+
+    @Override
+    public boolean markSupported() {
+        return in.markSupported();
+    }
+
+    @Override
+    public void reset() throws IOException {
+        in.reset();
+
+        // rewind the byte count too, so bytes read again after the reset are not counted twice
+        if (markOffset >= 0) {
+            bytesRead = markOffset;
+        }
+        markOffset = -1L;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/.gitignore
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/.gitignore 
b/nifi-commons/nifi-data-provenance-utils/.gitignore
new file mode 100755
index 0000000..19f2e00
--- /dev/null
+++ b/nifi-commons/nifi-data-provenance-utils/.gitignore
@@ -0,0 +1,2 @@
+/target
+/target

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/pom.xml 
b/nifi-commons/nifi-data-provenance-utils/pom.xml
new file mode 100644
index 0000000..5481cd2
--- /dev/null
+++ b/nifi-commons/nifi-data-provenance-utils/pom.xml
@@ -0,0 +1,34 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-commons</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-data-provenance-utils</artifactId>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncLineageSubmission.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncLineageSubmission.java
 
b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncLineageSubmission.java
new file mode 100644
index 0000000..4a52a89
--- /dev/null
+++ 
b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncLineageSubmission.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.lineage.LineageComputationType;
+
+/**
+ * A {@link ComputeLineageSubmission} that is assigned a random UUID as its lineage identifier
+ * and the time of construction as its submission time, and that owns the
+ * {@link StandardLineageResult} created for the request.
+ */
+public class AsyncLineageSubmission implements ComputeLineageSubmission {
+
+    private final String lineageIdentifier = UUID.randomUUID().toString();
+    private final Date submissionTime = new Date();
+
+    private final LineageComputationType computationType;
+    private final Long eventId;
+    private final Collection<String> lineageFlowFileUuids;
+
+    private volatile boolean canceled = false;
+
+    private final StandardLineageResult result;
+
+    /**
+     * @param computationType the type of lineage computation requested
+     * @param eventId the value later returned by {@link #getExpandedEventId()}
+     * @param lineageFlowFileUuids the FlowFile UUIDs passed on to the result and returned by
+     *            {@link #getLineageFlowFileUuids()}
+     * @param numSteps the step count passed to the {@link StandardLineageResult}
+     */
+    public AsyncLineageSubmission(final LineageComputationType computationType, final Long eventId, final Collection<String> lineageFlowFileUuids, final int numSteps) {
+        this.computationType = computationType;
+        this.eventId = eventId;
+        this.lineageFlowFileUuids = lineageFlowFileUuids;
+        this.result = new StandardLineageResult(numSteps, lineageFlowFileUuids);
+    }
+
+    @Override
+    public StandardLineageResult getResult() {
+        return result;
+    }
+
+    @Override
+    public Date getSubmissionTime() {
+        return submissionTime;
+    }
+
+    @Override
+    public String getLineageIdentifier() {
+        return lineageIdentifier;
+    }
+
+    /**
+     * Marks this submission as canceled.
+     */
+    @Override
+    public void cancel() {
+        canceled = true;
+    }
+
+    @Override
+    public boolean isCanceled() {
+        return canceled;
+    }
+
+    @Override
+    public LineageComputationType getLineageComputationType() {
+        return computationType;
+    }
+
+    @Override
+    public Long getExpandedEventId() {
+        return eventId;
+    }
+
+    @Override
+    public Collection<String> getLineageFlowFileUuids() {
+        return lineageFlowFileUuids;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java
 
b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java
new file mode 100644
index 0000000..00c6170
--- /dev/null
+++ 
b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QuerySubmission;
+
+/**
+ * A {@link QuerySubmission} that records its construction time as the submission time and
+ * owns the {@link StandardQueryResult} created for the query.
+ */
+public class AsyncQuerySubmission implements QuerySubmission {
+
+    /** Sixty seconds, expressed in milliseconds. */
+    public static final int TTL = (int) TimeUnit.SECONDS.toMillis(60);
+
+    private final Date submissionTime = new Date();
+    private final Query query;
+
+    private volatile boolean canceled = false;
+    private final StandardQueryResult queryResult;
+
+    /**
+     * Constructs an AsyncQuerySubmission with the given query and the given
+     * number of steps, indicating how many results must be added to this
+     * AsyncQuerySubmission before it is considered finished
+     *
+     * @param query the query to execute
+     * @param numSteps how many steps to include
+     */
+    public AsyncQuerySubmission(final Query query, final int numSteps) {
+        this.query = query;
+        this.queryResult = new StandardQueryResult(query, numSteps);
+    }
+
+    @Override
+    public Date getSubmissionTime() {
+        return submissionTime;
+    }
+
+    @Override
+    public String getQueryIdentifier() {
+        return query.getIdentifier();
+    }
+
+    /**
+     * Marks this submission as canceled and cancels the underlying query result.
+     */
+    @Override
+    public void cancel() {
+        canceled = true;
+        queryResult.cancel();
+    }
+
+    @Override
+    public boolean isCanceled() {
+        return canceled;
+    }
+
+    @Override
+    public Query getQuery() {
+        return query;
+    }
+
+    @Override
+    public StandardQueryResult getResult() {
+        return queryResult;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/NamedSearchableField.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/NamedSearchableField.java
 
b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/NamedSearchableField.java
new file mode 100644
index 0000000..dc2903f
--- /dev/null
+++ 
b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/NamedSearchableField.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.provenance.search.SearchableFieldType;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Immutable {@link SearchableField} described by an identifier, a searchable name, a friendly
+ * name, a flag telling whether the field is an attribute, and a field type. Equality and hash
+ * code are based on the searchable name and the attribute flag only.
+ */
+public class NamedSearchableField implements SearchableField {
+
+    private final String identifier;
+    private final String searchableName;
+    private final SearchableFieldType fieldType;
+    private final String friendlyName;
+    private final boolean attribute;
+
+    /**
+     * Creates a field of type {@link SearchableFieldType#STRING}.
+     *
+     * @param identifier the field identifier; must not be null
+     * @param searchableName the name used when searching; must not be null
+     * @param friendlyName the name returned by {@link #toString()}; must not be null
+     * @param attribute whether the field is an attribute
+     */
+    NamedSearchableField(final String identifier, final String searchableName, final String friendlyName, final boolean attribute) {
+        this(identifier, searchableName, friendlyName, attribute, SearchableFieldType.STRING);
+    }
+
+    /**
+     * @param identifier the field identifier; must not be null
+     * @param searchableName the name used when searching; must not be null
+     * @param friendlyName the name returned by {@link #toString()}; must not be null
+     * @param attribute whether the field is an attribute
+     * @param fieldType the type of the field; must not be null
+     * @throws NullPointerException if any of the reference arguments is null
+     */
+    NamedSearchableField(final String identifier, final String searchableName, final String friendlyName, final boolean attribute, final SearchableFieldType fieldType) {
+        this.identifier = requireNonNull(identifier);
+        this.searchableName = requireNonNull(searchableName);
+        this.friendlyName = requireNonNull(friendlyName);
+        // a primitive boolean can never be null, so no null check applies here
+        this.attribute = attribute;
+        this.fieldType = requireNonNull(fieldType);
+    }
+
+    @Override
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public String getSearchableFieldName() {
+        return searchableName;
+    }
+
+    @Override
+    public String getFriendlyName() {
+        return friendlyName;
+    }
+
+    @Override
+    public boolean isAttribute() {
+        return attribute;
+    }
+
+    @Override
+    public SearchableFieldType getFieldType() {
+        return fieldType;
+    }
+
+    @Override
+    public String toString() {
+        return friendlyName;
+    }
+
+    @Override
+    public int hashCode() {
+        return 298347 + searchableName.hashCode() + (attribute ? 1 : 0);
+    }
+
+    /**
+     * Compares against any {@link SearchableField}, not only instances of this class.
+     *
+     * @param obj the object to compare with
+     * @return {@code true} if {@code obj} is a SearchableField with the same searchable field
+     *         name and the same attribute flag
+     */
+    @Override
+    public boolean equals(final Object obj) {
+        // instanceof is false for null, so no separate null check is required
+        if (!(obj instanceof SearchableField)) {
+            return false;
+        }
+
+        final SearchableField other = (SearchableField) obj;
+        return searchableName.equals(other.getSearchableFieldName()) && attribute == other.isAttribute();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFieldParser.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFieldParser.java
 
b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFieldParser.java
new file mode 100644
index 0000000..6a934b1
--- /dev/null
+++ 
b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFieldParser.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.provenance.search.SearchableField;
+
+public class SearchableFieldParser {
+
+    /**
+     * Parses a comma-separated list of field names into searchable fields.
+     * Names are trimmed and blank entries are skipped.
+     *
+     * @param indexedFieldString comma-separated field names; may be null, in
+     *            which case an empty list is returned
+     * @param predefinedField if true, each name is looked up via
+     *            {@link SearchableFields#getSearchableField(String)}; otherwise a
+     *            new field is created for it via
+     *            {@link SearchableFields#newSearchableAttribute(String)}
+     * @return a new, mutable list of the resolved fields, in input order
+     * @throws RuntimeException if a name does not resolve to a field
+     */
+    public static List<SearchableField> extractSearchableFields(final String indexedFieldString, final boolean predefinedField) {
+        final List<SearchableField> parsedFields = new ArrayList<>();
+        if (indexedFieldString == null) {
+            return parsedFields;
+        }
+
+        for (final String rawName : indexedFieldString.split(",")) {
+            final String fieldName = rawName.trim();
+            if (!fieldName.isEmpty()) {
+                final SearchableField resolved = predefinedField
+                        ? SearchableFields.getSearchableField(fieldName)
+                        : SearchableFields.newSearchableAttribute(fieldName);
+
+                if (resolved == null) {
+                    throw new RuntimeException("Invalid Configuration: Provenance Repository configured to Index field '" + fieldName + "', but this is not a valid field");
+                }
+                parsedFields.add(resolved);
+            }
+        }
+
+        return parsedFields;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFields.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFields.java
 
b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFields.java
new file mode 100644
index 0000000..de62bca
--- /dev/null
+++ 
b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFields.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.provenance.search.SearchableFieldType;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Holder for the predefined {@link SearchableField} constants plus a lookup
+ * table ({@code standardFields}) that maps a field's identifier to the field.
+ * The class only exposes static members and cannot be instantiated.
+ *
+ * NOTE(review): the NamedSearchableField constructors are not visible here; the
+ * arguments below are presumably (identifier, searchableName, friendlyName,
+ * attribute[, fieldType]) -- confirm against NamedSearchableField.
+ */
+public class SearchableFields {
+
+    // NOTE(review): Identifier is declared here but is not listed in the array
+    // that populates standardFields below, so getSearchableField("Identifier")
+    // returns null and getStandardFields() does not contain it -- confirm this
+    // is intentional.
+    public static final SearchableField Identifier = new 
NamedSearchableField("Identifier", "identifier", "Identifier", false);
+    public static final SearchableField EventTime = new 
NamedSearchableField("EventTime", "time", "Event Time", false, 
SearchableFieldType.DATE);
+    public static final SearchableField FlowFileUUID = new 
NamedSearchableField("FlowFileUUID", "uuid", "FlowFile UUID", false);
+    public static final SearchableField Filename = new 
NamedSearchableField("Filename", "filename", "Filename", false);
+    public static final SearchableField EventType = new 
NamedSearchableField("EventType", "eventType", "Event Type", false);
+    public static final SearchableField TransitURI = new 
NamedSearchableField("TransitURI", "transitUri", "Transit URI", false);
+    // The constant is named ComponentID but its first two arguments are
+    // "ProcessorID" / "processorId".
+    public static final SearchableField ComponentID = new 
NamedSearchableField("ProcessorID", "processorId", "Component ID", false);
+    public static final SearchableField AlternateIdentifierURI = new 
NamedSearchableField("AlternateIdentifierURI", "alternateIdentifierUri", 
"Alternate Identifier URI", false);
+    public static final SearchableField FileSize = new 
NamedSearchableField("FileSize", "fileSize", "File Size", false, 
SearchableFieldType.DATA_SIZE);
+    public static final SearchableField Details = new 
NamedSearchableField("Details", "details", "Details", false, 
SearchableFieldType.STRING);
+    public static final SearchableField Relationship = new 
NamedSearchableField("Relationship", "relationship", "Relationship", false, 
SearchableFieldType.STRING);
+
+    public static final SearchableField LineageStartDate
+            = new NamedSearchableField("LineageStartDate", "lineageStartDate", 
"Lineage Start Date", false, SearchableFieldType.DATE);
+    // The first argument here is plural ("LineageIdentifiers") while the
+    // constant name and the second argument are singular.
+    public static final SearchableField LineageIdentifier
+            = new NamedSearchableField("LineageIdentifiers", 
"lineageIdentifier", "Lineage Identifier", false, SearchableFieldType.STRING);
+
+    public static final SearchableField ContentClaimSection
+            = new NamedSearchableField("ContentClaimSection", 
"contentClaimSection", "Content Claim Section", false, 
SearchableFieldType.STRING);
+    public static final SearchableField ContentClaimContainer
+            = new NamedSearchableField("ContentClaimContainer", 
"contentClaimContainer", "Content Claim Container", false, 
SearchableFieldType.STRING);
+    public static final SearchableField ContentClaimIdentifier
+            = new NamedSearchableField("ContentClaimIdentifier", 
"contentClaimIdentifier", "Content Claim Identifier", false, 
SearchableFieldType.STRING);
+    public static final SearchableField ContentClaimOffset
+            = new NamedSearchableField("ContentClaimOffset", 
"contentClaimOffset", "Content Claim Offset", false, SearchableFieldType.LONG);
+    public static final SearchableField SourceQueueIdentifier
+            = new NamedSearchableField("SourceQueueIdentifier", 
"sourceQueueIdentifier", "Source Queue Identifier", false, 
SearchableFieldType.STRING);
+
+    // Unmodifiable lookup table keyed by SearchableField.getIdentifier().
+    private static final Map<String, SearchableField> standardFields;
+
+    // Builds the lookup table once, at class-initialization time.
+    static {
+        final SearchableField[] searchableFields = new SearchableField[]{
+            EventTime, FlowFileUUID, Filename, EventType, TransitURI,
+            ComponentID, AlternateIdentifierURI, FileSize, Relationship, 
Details,
+            LineageStartDate, LineageIdentifier, ContentClaimSection, 
ContentClaimContainer, ContentClaimIdentifier,
+            ContentClaimOffset, SourceQueueIdentifier};
+
+        final Map<String, SearchableField> fields = new HashMap<>();
+        for (final SearchableField field : searchableFields) {
+            fields.put(field.getIdentifier(), field);
+        }
+
+        standardFields = Collections.unmodifiableMap(fields);
+    }
+
+    // Not instantiable: the class only offers static members.
+    private SearchableFields() {
+    }
+
+    /**
+     * @return the values of the unmodifiable standard-field table (backed by a
+     *         HashMap, so iteration order is unspecified)
+     */
+    public static Collection<SearchableField> getStandardFields() {
+        return standardFields.values();
+    }
+
+    /**
+     * @param fieldIdentifier identifier to look up in the standard-field table
+     * @return the field registered under that identifier, or null if there is none
+     */
+    public static SearchableField getSearchableField(final String 
fieldIdentifier) {
+        return standardFields.get(fieldIdentifier);
+    }
+
+    /**
+     * Creates a new field that uses the given name for all three of its name
+     * arguments and passes {@code true} as the attribute flag.
+     *
+     * @param attributeName the name to use
+     * @return a newly created field (not added to the standard-field table)
+     */
+    public static SearchableField newSearchableAttribute(final String 
attributeName) {
+        return new NamedSearchableField(attributeName, attributeName, 
attributeName, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
 
b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
new file mode 100644
index 0000000..63c53d0
--- /dev/null
+++ 
b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.EdgeNode;
+import org.apache.nifi.provenance.lineage.EventNode;
+import org.apache.nifi.provenance.lineage.FlowFileNode;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+
+/**
+ * Collects the provenance events gathered for a lineage computation and, once
+ * every expected step has reported, builds the graph of lineage nodes and
+ * edges from them.
+ *
+ * All mutable state except {@code canceled} is guarded by a read/write lock;
+ * {@code canceled} is a volatile flag. The result carries an expiration date
+ * that is pushed {@link #TTL} milliseconds into the future on construction and
+ * whenever a step completes.
+ */
+public class StandardLineageResult implements ComputeLineageResult {
+
+    /** Time-to-live of a result, in milliseconds (30 minutes). */
+    public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES);
+    private static final Logger logger = LoggerFactory.getLogger(StandardLineageResult.class);
+
+    private final Collection<String> flowFileUuids;
+    private final Collection<ProvenanceEventRecord> relevantRecords = new ArrayList<>();
+    private final Set<LineageNode> nodes = new HashSet<>();
+    private final Set<LineageEdge> edges = new HashSet<>();
+    private final int numSteps;
+    private final long creationNanos;
+    private long computationNanos;
+
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Lock readLock = rwLock.readLock();
+    private final Lock writeLock = rwLock.writeLock();
+
+    private Date expirationDate = null;
+    private String error = null;
+    private int numCompletedSteps = 0;
+
+    private volatile boolean canceled = false;
+
+    /**
+     * @param numSteps number of steps ({@link #update(Collection)} or
+     *            {@link #setError(String)} calls) expected before the result is finished
+     * @param flowFileUuids the FlowFile UUIDs of interest; child FlowFiles of
+     *            FORK/JOIN/REPLAY/CLONE events only get a node when their UUID is in
+     *            this collection. The collection is stored as-is, not copied.
+     */
+    public StandardLineageResult(final int numSteps, final Collection<String> flowFileUuids) {
+        this.numSteps = numSteps;
+        this.creationNanos = System.nanoTime();
+        this.flowFileUuids = flowFileUuids;
+
+        updateExpiration();
+    }
+
+    /**
+     * @return a snapshot copy of the lineage nodes computed so far
+     */
+    @Override
+    public List<LineageNode> getNodes() {
+        readLock.lock();
+        try {
+            return new ArrayList<>(nodes);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * @return a snapshot copy of the lineage edges computed so far
+     */
+    @Override
+    public List<LineageEdge> getEdges() {
+        readLock.lock();
+        try {
+            return new ArrayList<>(edges);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * @return the current number of edges
+     */
+    public int getNumberOfEdges() {
+        readLock.lock();
+        try {
+            return edges.size();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * @return the current number of nodes
+     */
+    public int getNumberOfNodes() {
+        readLock.lock();
+        try {
+            return nodes.size();
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * @param timeUnit the unit to express the result in
+     * @return the time between construction and completion of the last step,
+     *         or 0 if the last step has not completed yet
+     */
+    public long getComputationTime(final TimeUnit timeUnit) {
+        readLock.lock();
+        try {
+            return timeUnit.convert(computationNanos, TimeUnit.NANOSECONDS);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * @return the current expiration date of this result
+     */
+    @Override
+    public Date getExpiration() {
+        readLock.lock();
+        try {
+            return expirationDate;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * @return the recorded error message, or null if no error has been recorded
+     */
+    @Override
+    public String getError() {
+        readLock.lock();
+        try {
+            return error;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * @return completed steps as a percentage of expected steps, never more
+     *         than 100; always 100 when fewer than one step is expected
+     */
+    @Override
+    public int getPercentComplete() {
+        readLock.lock();
+        try {
+            if (numSteps < 1) {
+                return 100;
+            }
+
+            final int percent = (int) (((float) numCompletedSteps / (float) numSteps) * 100.0F);
+            // Guard against more completions being reported than steps were expected.
+            return Math.min(100, percent);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * @return true once all expected steps have completed or the computation was canceled
+     */
+    @Override
+    public boolean isFinished() {
+        readLock.lock();
+        try {
+            return numCompletedSteps >= numSteps || canceled;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Records a failed step: stores the error, counts the step as completed
+     * and refreshes the expiration. If this was the last expected step, the
+     * computation time is captured.
+     *
+     * @param error the error message to record
+     */
+    public void setError(final String error) {
+        writeLock.lock();
+        try {
+            this.error = error;
+            numCompletedSteps++;
+
+            updateExpiration();
+
+            if (numCompletedSteps >= numSteps) {
+                computationNanos = System.nanoTime() - creationNanos;
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Records a successful step: stores its records, counts the step as
+     * completed and refreshes the expiration. If this was the last expected
+     * step and no error has been recorded, the lineage graph is computed and
+     * the computation time is captured.
+     *
+     * @param records the provenance events produced by the step
+     */
+    public void update(final Collection<ProvenanceEventRecord> records) {
+        writeLock.lock();
+        try {
+            relevantRecords.addAll(records);
+
+            numCompletedSteps++;
+            updateExpiration();
+
+            if (numCompletedSteps >= numSteps && error == null) {
+                computeLineage();
+                computationNanos = System.nanoTime() - creationNanos;
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Computes the lineage from the relevant Provenance Event Records. This
+     * method must be called with the write lock held and is only going to be
+     * useful after all of the records have been successfully obtained.
+     *
+     * If two events claim to have generated the same FlowFile, the error is
+     * logged and recorded and the method returns early, leaving the nodes and
+     * edges built up to that point in place.
+     */
+    private void computeLineage() {
+        final long startNanos = System.nanoTime();
+
+        nodes.clear();
+        edges.clear();
+
+        // maps FlowFile UUID to last event for that FlowFile
+        final Map<String, LineageNode> lastEventMap = new HashMap<>();
+        final List<ProvenanceEventRecord> sortedRecords = new ArrayList<>(relevantRecords);
+        Collections.sort(sortedRecords, new Comparator<ProvenanceEventRecord>() {
+            @Override
+            public int compare(final ProvenanceEventRecord o1, final ProvenanceEventRecord o2) {
+                // Sort on Event Time, then Event ID.
+                final int eventTimeComparison = Long.compare(o1.getEventTime(), o2.getEventTime());
+                if (eventTimeComparison == 0) {
+                    return Long.compare(o1.getEventId(), o2.getEventId());
+                } else {
+                    return eventTimeComparison;
+                }
+            }
+        });
+
+        // convert the provenance event records into Lineage nodes (FlowFileNode, EventNodes).
+        for (final ProvenanceEventRecord record : sortedRecords) {
+            final LineageNode lineageNode = new EventNode(record);
+            final boolean added = nodes.add(lineageNode);
+            if (!added) {
+                logger.debug("Did not add {} because it already exists in the 'nodes' set", lineageNode);
+            }
+
+            // Create an edge that connects this node to the previous node for the same FlowFile UUID.
+            final LineageNode lastNode = lastEventMap.get(record.getFlowFileUuid());
+            if (lastNode != null) {
+                // The Edge UUID depends on the event type. For JOIN, CLONE and REPLAY events we use
+                // the previous node's UUID, because such an event's own UUID pertains to only one of
+                // (potentially) many UUIDs associated with the event. For every other event type the
+                // UUID of this record is appropriate, so we just use it.
+                final String edgeUuid;
+
+                switch (record.getEventType()) {
+                    case JOIN:
+                    case CLONE:
+                    case REPLAY:
+                        edgeUuid = lastNode.getFlowFileUuid();
+                        break;
+                    default:
+                        edgeUuid = record.getFlowFileUuid();
+                        break;
+                }
+
+                edges.add(new EdgeNode(edgeUuid, lastNode, lineageNode));
+            }
+
+            lastEventMap.put(record.getFlowFileUuid(), lineageNode);
+
+            switch (record.getEventType()) {
+                case FORK:
+                case JOIN:
+                case REPLAY:
+                case CLONE: {
+                    // For events that create FlowFile nodes, we need to create the FlowFile Nodes and associated Edges, as appropriate
+                    for (final String childUuid : record.getChildUuids()) {
+                        if (flowFileUuids.contains(childUuid)) {
+                            final FlowFileNode childNode = new FlowFileNode(childUuid, record.getEventTime());
+                            final boolean isNewFlowFile = nodes.add(childNode);
+                            if (!isNewFlowFile) {
+                                final String msg = "Unable to generate Lineage Graph because multiple "
+                                        + "events were registered claiming to have generated the same FlowFile (UUID = " + childNode.getFlowFileUuid() + ")";
+                                failLineage(msg);
+                                return;
+                            }
+
+                            edges.add(new EdgeNode(childNode.getFlowFileUuid(), lineageNode, childNode));
+                            lastEventMap.put(childUuid, childNode);
+                        }
+                    }
+                    for (final String parentUuid : record.getParentUuids()) {
+                        final LineageNode lastNodeForParent = lastEventMap.get(parentUuid);
+                        if (lastNodeForParent != null && !lastNodeForParent.equals(lineageNode)) {
+                            edges.add(new EdgeNode(parentUuid, lastNodeForParent, lineageNode));
+                        }
+
+                        lastEventMap.put(parentUuid, lineageNode);
+                    }
+                }
+                break;
+                case RECEIVE:
+                case CREATE: {
+                    // for a RECEIVE or CREATE event, we want to create a FlowFile Node that represents the
+                    // FlowFile and create an edge from the event to the FlowFile Node
+                    final LineageNode flowFileNode = new FlowFileNode(record.getFlowFileUuid(), record.getEventTime());
+                    final boolean isNewFlowFile = nodes.add(flowFileNode);
+                    if (!isNewFlowFile) {
+                        final String msg = "Found cycle in graph. This indicates that multiple events "
+                                + "were registered claiming to have generated the same FlowFile (UUID = " + flowFileNode.getFlowFileUuid() + ")";
+                        failLineage(msg);
+                        return;
+                    }
+                    edges.add(new EdgeNode(record.getFlowFileUuid(), lineageNode, flowFileNode));
+                    lastEventMap.put(record.getFlowFileUuid(), flowFileNode);
+                }
+                break;
+                default:
+                    break;
+            }
+        }
+
+        final long nanos = System.nanoTime() - startNanos;
+        logger.debug("Finished building lineage with {} nodes and {} edges in {} millis", nodes.size(), edges.size(), TimeUnit.NANOSECONDS.toMillis(nanos));
+    }
+
+    /**
+     * Logs and records a failure detected while building the graph. Must be
+     * called with the write lock held.
+     *
+     * Unlike {@link #setError(String)} this does not increment the completed
+     * step counter: the step being processed was already counted by
+     * {@link #update(Collection)}, and counting it twice would push
+     * {@link #getPercentComplete()} above 100.
+     */
+    private void failLineage(final String msg) {
+        logger.error(msg);
+        this.error = msg;
+    }
+
+    /**
+     * Marks the computation as canceled, which makes {@link #isFinished()} return true.
+     */
+    void cancel() {
+        this.canceled = true;
+    }
+
+    /**
+     * Pushes the expiration date {@link #TTL} milliseconds past the current
+     * time. Must be called with write lock! (The constructor is the one
+     * exception: the object is not yet shared at that point.)
+     */
+    private void updateExpiration() {
+        expirationDate = new Date(System.currentTimeMillis() + TTL);
+    }
+}

Reply via email to