Repository: incubator-nifi
Updated Branches:
  refs/heads/bootstrap d517b3fd0 -> 76f54f861


NIFI-145: Allow a run.as user to be set in bootstrap.conf file; addressed 
concerns where an unprivileged user could issue commands to a running NiFi 
instance to shut it down; addressed concerns where an unprivileged user could 
push large amounts of data to the Bootstrap or NiFi, causing an OutOfMemoryError (OOME)


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/76f54f86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/76f54f86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/76f54f86

Branch: refs/heads/bootstrap
Commit: 76f54f86115e42d16f700aa0d1c5bce22f830734
Parents: d517b3f
Author: Mark Payne <[email protected]>
Authored: Fri Dec 12 09:47:19 2014 -0500
Committer: Mark Payne <[email protected]>
Committed: Fri Dec 12 09:47:19 2014 -0500

----------------------------------------------------------------------
 .../repository/VolatileContentRepository.java   |   2 +-
 .../src/main/resources/conf/bootstrap.conf      |   2 +-
 .../java/org/apache/nifi/BootstrapListener.java |  46 ++++++--
 .../src/main/java/org/apache/nifi/NiFi.java     |   2 -
 .../apache/nifi/util/LimitingInputStream.java   | 107 +++++++++++++++++++
 .../apache/nifi/bootstrap/BootstrapCodec.java   |   6 +-
 .../org/apache/nifi/bootstrap/NiFiListener.java |  18 +++-
 .../java/org/apache/nifi/bootstrap/RunNiFi.java |  81 ++++++++++----
 .../bootstrap/util/LimitingInputStream.java     | 107 +++++++++++++++++++
 9 files changed, 332 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/76f54f86/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
 
b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index e14ec5d..1a44725 100644
--- 
a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ 
b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -92,7 +92,7 @@ public class VolatileContentRepository implements 
ContentRepository {
     public static final String MAX_SIZE_PROPERTY = 
"nifi.volatile.content.repository.max.size";
     public static final String BLOCK_SIZE_PROPERTY = 
"nifi.volatile.content.repository.block.size";
 
-    private final ScheduledExecutorService executor = new FlowEngine(3, 
"VolatileContentRepository Workers");
+    private final ScheduledExecutorService executor = new FlowEngine(3, 
"VolatileContentRepository Workers", true);
     private final ConcurrentMap<ContentClaim, ContentBlock> claimMap = new 
ConcurrentHashMap<>(256);
     private final AtomicLong repoSize = new AtomicLong(0L);
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/76f54f86/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf
 
b/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf
index 6b32b2b..37ec474 100644
--- 
a/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf
+++ 
b/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf
@@ -13,7 +13,7 @@ java.arg.2=-Xms256m
 java.arg.3=-Xmx512m
 
 # Enable Remote Debugging
-#java.arg.2=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000
+#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000
 
 # Java command to use when running NiFi
 java=java

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/76f54f86/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
 
b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
index 31f336c..3393952 100644
--- 
a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
+++ 
b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java
@@ -27,9 +27,11 @@ import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.apache.nifi.util.LimitingInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,14 +40,16 @@ public class BootstrapListener {
        
        private final NiFi nifi;
        private final int bootstrapPort;
-
+       private final String secretKey;
+       
        private volatile Listener listener;
        private volatile ServerSocket serverSocket;
        
        
-       public BootstrapListener(final NiFi nifi, final int port) {
+       public BootstrapListener(final NiFi nifi, final int bootstrapPort) {
                this.nifi = nifi;
-               this.bootstrapPort = port;
+               this.bootstrapPort = bootstrapPort;
+               secretKey = UUID.randomUUID().toString();
        }
        
        public void start() throws IOException {
@@ -71,7 +75,7 @@ public class BootstrapListener {
                        socket.setSoTimeout(60000);
                        
                        final OutputStream out = socket.getOutputStream();
-                       out.write(("PORT " + localPort + 
"\n").getBytes(StandardCharsets.UTF_8));
+                       out.write(("PORT " + localPort + " " + secretKey + 
"\n").getBytes(StandardCharsets.UTF_8));
                        out.flush();
                        
                        logger.debug("Awaiting response from Bootstrap...");
@@ -121,6 +125,7 @@ public class BootstrapListener {
                                try {
                                        final Socket socket;
                                        try {
+                                           logger.debug("Listening for 
Bootstrap Requests");
                                                socket = serverSocket.accept();
                                        } catch (final SocketTimeoutException 
ste) {
                                                if ( stopped ) {
@@ -136,6 +141,9 @@ public class BootstrapListener {
                                                throw ioe;
                                        }
                                        
+                                       logger.debug("Received connection from 
Bootstrap");
+                                       socket.setSoTimeout(5000);
+                                       
                                        executor.submit(new Runnable() {
                                                @Override
                                                public void run() {
@@ -184,27 +192,42 @@ public class BootstrapListener {
                out.flush();
        }
        
-       private BootstrapRequest readRequest(final InputStream in) throws 
IOException {
-               final BufferedReader reader = new BufferedReader(new 
InputStreamReader(in));
+       
+       @SuppressWarnings("resource")  // we don't want to close the stream, as 
the caller will do that
+    private BootstrapRequest readRequest(final InputStream in) throws 
IOException {
+           // We want to ensure that we don't try to read data from an 
InputStream directly
+           // by a BufferedReader because any user on the system could open a 
socket and send
+           // a multi-gigabyte file without any new lines in order to crash 
the NiFi instance
+           // (or at least cause OutOfMemoryErrors, which can wreak havoc on 
the running instance).
+           // So we will limit the Input Stream to only 4 KB, which should be 
plenty for any request.
+           final LimitingInputStream limitingIn = new LimitingInputStream(in, 
4096);
+               final BufferedReader reader = new BufferedReader(new 
InputStreamReader(limitingIn));
                
                final String line = reader.readLine();
                final String[] splits = line.split(" ");
                if ( splits.length < 0 ) {
-                       throw new IOException("Received invalid command from 
NiFi: " + line);
+                       throw new IOException("Received invalid request from 
Bootstrap: " + line);
                }
                
                final String requestType = splits[0];
                final String[] args;
                if ( splits.length == 1 ) {
-                       args = new String[0];
+                       throw new IOException("Received invalid request from 
Bootstrap; request did not have a secret key; request type = " + requestType);
+               } else if ( splits.length == 2 ) {
+                   args = new String[0];
                } else {
-                       args = Arrays.copyOfRange(splits, 1, splits.length);
+                       args = Arrays.copyOfRange(splits, 2, splits.length);
+               }
+               
+               final String requestKey = splits[1];
+               if ( !secretKey.equals(requestKey) ) {
+                   throw new IOException("Received invalid Secret Key for 
request type " + requestType);
                }
                
                try {
                        return new BootstrapRequest(requestType, args);
                } catch (final Exception e) {
-                       throw new IOException("Received invalid request from 
bootstrap; request type = " + requestType);
+                       throw new IOException("Received invalid request from 
Bootstrap; request type = " + requestType);
                }
        }
        
@@ -227,7 +250,8 @@ public class BootstrapListener {
                        return requestType;
                }
                
-               public String[] getArgs() {
+               @SuppressWarnings("unused")
+        public String[] getArgs() {
                        return args;
                }
        }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/76f54f86/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java
 
b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java
index 13cd4d6..98489af 100644
--- 
a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java
+++ 
b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java
@@ -36,7 +36,6 @@ import org.apache.nifi.nar.NarClassLoaders;
 import org.apache.nifi.nar.NarUnpacker;
 import org.apache.nifi.util.FileUtils;
 import org.apache.nifi.util.NiFiProperties;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.bridge.SLF4JBridgeHandler;
@@ -61,7 +60,6 @@ public class NiFi {
 
         // register the shutdown hook
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-
             @Override
             public void run() {
                 // shutdown the jetty server

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/76f54f86/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/util/LimitingInputStream.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/util/LimitingInputStream.java
 
b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/util/LimitingInputStream.java
new file mode 100644
index 0000000..ce3a6db
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/util/LimitingInputStream.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class LimitingInputStream extends InputStream {
+
+    private final InputStream in;
+    private final long limit;
+    private long bytesRead = 0;
+
+    public LimitingInputStream(final InputStream in, final long limit) {
+        this.in = in;
+        this.limit = limit;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int val = in.read();
+        if (val > -1) {
+            bytesRead++;
+        }
+        return val;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int maxToRead = (int) Math.min(b.length, limit - bytesRead);
+
+        final int val = in.read(b, 0, maxToRead);
+        if (val > 0) {
+            bytesRead += val;
+        }
+        return val;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int maxToRead = (int) Math.min(len, limit - bytesRead);
+
+        final int val = in.read(b, off, maxToRead);
+        if (val > 0) {
+            bytesRead += val;
+        }
+        return val;
+    }
+
+    @Override
+    public long skip(final long n) throws IOException {
+        final long skipped = in.skip(Math.min(n, limit - bytesRead));
+        bytesRead += skipped;
+        return skipped;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return in.available();
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    @Override
+    public void mark(int readlimit) {
+        in.mark(readlimit);
+    }
+
+    @Override
+    public boolean markSupported() {
+        return in.markSupported();
+    }
+
+    @Override
+    public void reset() throws IOException {
+        in.reset();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/76f54f86/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java
----------------------------------------------------------------------
diff --git 
a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java
index fb10930..f03bf1e 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java
@@ -64,7 +64,7 @@ public class BootstrapCodec {
        private void processRequest(final String cmd, final String[] args) 
throws InvalidCommandException, IOException {
                switch (cmd) {
                        case "PORT": {
-                               if ( args.length != 1 ) {
+                               if ( args.length != 2 ) {
                                        throw new InvalidCommandException();
                                }
                                
@@ -78,8 +78,10 @@ public class BootstrapCodec {
                                if ( port < 1 || port > 65535 ) {
                                        throw new 
InvalidCommandException("Invalid Port number; should be integer between 1 and 
65535");
                                }
+
+                               final String secretKey = args[1];
                                
-                               runner.setNiFiCommandControlPort(port);
+                               runner.setNiFiCommandControlPort(port, 
secretKey);
                                writer.write("OK");
                                writer.newLine();
                                writer.flush();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/76f54f86/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java
index c831351..f05d45a 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.bootstrap;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -24,6 +25,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.bootstrap.util.LimitingInputStream;
+
 public class NiFiListener {
        private ServerSocket serverSocket;
        private volatile Listener listener;
@@ -92,17 +95,26 @@ public class NiFiListener {
                                                throw ioe;
                                        }
                                        
-                                       
                                        executor.submit(new Runnable() {
                                                @Override
                                                public void run() {
                                                        try {
-                                                               final 
BootstrapCodec codec = new BootstrapCodec(runner, socket.getInputStream(), 
socket.getOutputStream());
+                                                           // we want to 
ensure that we don't try to read data from an InputStream directly
+                                                       // by a BufferedReader 
because any user on the system could open a socket and send
+                                                       // a multi-gigabyte 
file without any new lines in order to crash the Bootstrap,
+                                                           // which in turn 
may cause the Shutdown Hook to shutdown NiFi.
+                                                       // So we will limit the 
amount of data to read to 4 KB
+                                                       final InputStream 
limitingIn = new LimitingInputStream(socket.getInputStream(), 4096);
+                                                               final 
BootstrapCodec codec = new BootstrapCodec(runner, limitingIn, 
socket.getOutputStream());
                                                                
codec.communicate();
-                                                               socket.close();
                                                        } catch (final 
Throwable t) {
                                                                
System.out.println("Failed to communicate with NiFi due to " + t);
                                                                
t.printStackTrace();
+                                                       } finally {
+                                                           try {
+                                                               socket.close();
+                                                           } catch (final 
IOException ioe) {
+                                                           }
                                                        }
                                                }
                                        });

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/76f54f86/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
----------------------------------------------------------------------
diff --git 
a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
index a5987bc..a230711 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
@@ -31,7 +31,10 @@ import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.attribute.PosixFilePermission;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -205,6 +208,20 @@ public class RunNiFi {
        
        private synchronized void saveProperties(final Properties nifiProps) 
throws IOException {
            final File statusFile = getStatusFile();
+           if ( statusFile.exists() && !statusFile.delete() ) {
+               logger.warning("Failed to delete " + statusFile);
+           }
+
+           if ( !statusFile.createNewFile() ) {
+               throw new IOException("Failed to create file " + statusFile);
+           }
+
+           try {
+               Files.setPosixFilePermissions(statusFile.toPath(), 
Collections.singleton(PosixFilePermission.OWNER_READ));
+           } catch (final Exception e) {
+               logger.warning("Failed to set permissions so that only the 
owner can read status file " + statusFile + "; this may allows others to have 
access to the key needed to communicate with NiFi. Permissions should be 
changed so that only the owner can read this file");
+           }
+           
         try (final FileOutputStream fos = new FileOutputStream(statusFile)) {
             nifiProps.store(fos, null);
             fos.getFD().sync();
@@ -213,16 +230,16 @@ public class RunNiFi {
         logger.fine("Saved Properties " + nifiProps + " to " + statusFile);
        }
 
-       private boolean isPingSuccessful(final int port) {
+       private boolean isPingSuccessful(final int port, final String 
secretKey) {
            logger.fine("Pinging " + port);
            
            try (final Socket socket = new Socket("localhost", port)) {
             final OutputStream out = socket.getOutputStream();
-            out.write((PING_CMD + "\n").getBytes(StandardCharsets.UTF_8));
+            out.write((PING_CMD + " " + secretKey + 
"\n").getBytes(StandardCharsets.UTF_8));
             out.flush();
 
             logger.fine("Sent PING command");
-            
+            socket.setSoTimeout(5000);
             final InputStream in = socket.getInputStream();
             final BufferedReader reader = new BufferedReader(new 
InputStreamReader(in));
             final String response = reader.readLine();
@@ -245,7 +262,7 @@ public class RunNiFi {
                }
                
                final int port = Integer.parseInt(portVal);
-           final boolean success = isPingSuccessful(port);
+           final boolean success = isPingSuccessful(port, 
props.getProperty("secret.key"));
            if ( success ) {
                logger.fine("Successful PING on port " + port);
                return port;
@@ -271,10 +288,7 @@ public class RunNiFi {
                // We use the "ps" command to check if the process is still 
running.
                final ProcessBuilder builder = new ProcessBuilder();
                
-               // ps -p <pid> -o comm=
-               // -> -p <pid> to filter just the pid we care about
-               // -> -o comm= to remove headers from the output
-               builder.command("ps", "-p", pid, "-o", "comm=");
+               builder.command("ps", "-p", pid, "--no-headers");
                final Process proc = builder.start();
                
                // Read how many lines are output by the 'ps' command
@@ -321,6 +335,7 @@ public class RunNiFi {
            
         final String portValue = props.getProperty("port");
         final String pid = props.getProperty("pid");
+        final String secretKey = props.getProperty("secret.key");
         
         if ( portValue == null && pid == null ) {
             return new Status(null, null, false, false);
@@ -331,7 +346,7 @@ public class RunNiFi {
         if ( portValue != null ) {
             try {
                 port = Integer.parseInt(portValue);
-                pingSuccess = isPingSuccessful(port);
+                pingSuccess = isPingSuccessful(port, secretKey);
             } catch (final NumberFormatException nfe) {
                 return new Status(null, null, false, false);
             }
@@ -373,14 +388,19 @@ public class RunNiFi {
                        return;
                }
                
+               final Properties nifiProps = loadProperties();
+               final String secretKey = nifiProps.getProperty("secret.key");
+               
                try (final Socket socket = new Socket()) {
+                   logger.fine("Connecting to NiFi instance");
                        socket.setSoTimeout(60000);
                        socket.connect(new InetSocketAddress("localhost", 
port));
+                       logger.fine("Established connection to NiFi instance.");
                        socket.setSoTimeout(60000);
                        
                        logger.fine("Sending SHUTDOWN Command to port " + port);
                        final OutputStream out = socket.getOutputStream();
-                       out.write((SHUTDOWN_CMD + 
"\n").getBytes(StandardCharsets.UTF_8));
+                       out.write((SHUTDOWN_CMD + " " + secretKey + 
"\n").getBytes(StandardCharsets.UTF_8));
                        out.flush();
                        
                        final InputStream in = socket.getInputStream();
@@ -392,10 +412,8 @@ public class RunNiFi {
                        if ( SHUTDOWN_CMD.equals(response) ) {
                                logger.info("Apache NiFi has accepted the 
Shutdown Command and is shutting down now");
                                
-                               final Properties nifiProps = loadProperties();
                                final String pid = nifiProps.getProperty("pid");
                                if ( pid != null ) {
-
                                final Properties bootstrapProperties = new 
Properties();
                                try (final FileInputStream fis = new 
FileInputStream(bootstrapConfigFile)) {
                                    bootstrapProperties.load(fis);
@@ -418,7 +436,7 @@ public class RunNiFi {
                                        if ( isProcessRunning(pid) ) {
                                            logger.warning("NiFi has not 
finished shutting down after " + gracefulShutdownSeconds + " seconds. Killing 
process.");
                                            try {
-                                               killProcess(pid);
+                                               killProcessTree(pid);
                                            } catch (final IOException ioe) {
                                                logger.severe("Failed to kill 
Process with PID " + pid);
                                            }
@@ -448,7 +466,31 @@ public class RunNiFi {
        }
        
        
-       private static void killProcess(final String pid) throws IOException {
+       private static List<String> getChildProcesses(final String ppid) throws 
IOException {
+           final Process proc = Runtime.getRuntime().exec(new String[] {"ps", 
"-o", "pid", "--no-headers", "--ppid", ppid});
+           final List<String> childPids = new ArrayList<>();
+           try (final InputStream in = proc.getInputStream();
+                final BufferedReader reader = new BufferedReader(new 
InputStreamReader(in))) {
+               
+               String line;
+               while ((line = reader.readLine()) != null) {
+                   childPids.add(line.trim());
+               }
+           }
+           
+           return childPids;
+       }
+       
+       private void killProcessTree(final String pid) throws IOException {
+           logger.fine("Killing Process Tree for PID " + pid);
+           
+           final List<String> children = getChildProcesses(pid);
+           logger.fine("Children of PID " + pid + ": " + children);
+           
+           for ( final String childPid : children ) {
+               killProcessTree(childPid);
+           }
+           
            Runtime.getRuntime().exec(new String[] {"kill", "-9", pid});
        }
        
@@ -620,7 +662,7 @@ public class RunNiFi {
                 nifiPid = pid;
                 final Properties nifiProps = new Properties();
                 nifiProps.setProperty("pid", String.valueOf(nifiPid));
-                saveProperties(properties);
+                saveProperties(nifiProps);
             }
                        
                        ShutdownHook shutdownHook = new ShutdownHook(process, 
this, gracefulShutdownSeconds);
@@ -651,7 +693,7 @@ public class RunNiFi {
                                        nifiPid = pid;
                                        final Properties nifiProps = new 
Properties();
                                        nifiProps.setProperty("pid", 
String.valueOf(nifiPid));
-                                       saveProperties(properties);
+                                       saveProperties(nifiProps);
                                    }
                                                
                                                shutdownHook = new 
ShutdownHook(process, this, gracefulShutdownSeconds);
@@ -677,7 +719,7 @@ public class RunNiFi {
                            nifiPid = pid;
                 final Properties nifiProps = new Properties();
                 nifiProps.setProperty("pid", String.valueOf(nifiPid));
-                saveProperties(properties);
+                saveProperties(nifiProps);
                        }
                        
                        boolean started = waitForStart();
@@ -758,9 +800,8 @@ public class RunNiFi {
                this.autoRestartNiFi = restart;
        }
        
-       void setNiFiCommandControlPort(final int port) {
+       void setNiFiCommandControlPort(final int port, final String secretKey) {
                this.ccPort = port;
-
                final File statusFile = getStatusFile();
                
                final Properties nifiProps = new Properties();
@@ -768,6 +809,8 @@ public class RunNiFi {
                    nifiProps.setProperty("pid", String.valueOf(nifiPid));
                }
                nifiProps.setProperty("port", String.valueOf(ccPort));
+               nifiProps.setProperty("secret.key", secretKey);
+               
                try {
                    saveProperties(nifiProps);
                } catch (final IOException ioe) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/76f54f86/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java
 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java
new file mode 100644
index 0000000..2149342
--- /dev/null
+++ 
b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.bootstrap.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class LimitingInputStream extends InputStream {
+
+    private final InputStream in;
+    private final long limit;
+    private long bytesRead = 0;
+
+    public LimitingInputStream(final InputStream in, final long limit) {
+        this.in = in;
+        this.limit = limit;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int val = in.read();
+        if (val > -1) {
+            bytesRead++;
+        }
+        return val;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int maxToRead = (int) Math.min(b.length, limit - bytesRead);
+
+        final int val = in.read(b, 0, maxToRead);
+        if (val > 0) {
+            bytesRead += val;
+        }
+        return val;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int maxToRead = (int) Math.min(len, limit - bytesRead);
+
+        final int val = in.read(b, off, maxToRead);
+        if (val > 0) {
+            bytesRead += val;
+        }
+        return val;
+    }
+
+    @Override
+    public long skip(final long n) throws IOException {
+        final long skipped = in.skip(Math.min(n, limit - bytesRead));
+        bytesRead += skipped;
+        return skipped;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return in.available();
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    @Override
+    public void mark(int readlimit) {
+        in.mark(readlimit);
+    }
+
+    @Override
+    public boolean markSupported() {
+        return in.markSupported();
+    }
+
+    @Override
+    public void reset() throws IOException {
+        in.reset();
+    }
+}

Reply via email to