MINIFI-17 Adding error handling for configurations that fail to start, and a couple of other small changes
This closes #15 Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/66dbda90 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/66dbda90 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/66dbda90 Branch: refs/heads/master Commit: 66dbda90c904a679337633a3631ea09554887ec7 Parents: 0c04fbb Author: Joseph Percivall <[email protected]> Authored: Thu Apr 21 16:50:19 2016 -0400 Committer: Joseph Percivall <[email protected]> Committed: Mon Apr 25 11:53:32 2016 -0400 ---------------------------------------------------------------------- minifi-assembly/pom.xml | 2 +- .../apache/nifi/minifi/bootstrap/RunMiNiFi.java | 270 +++++++---- .../nifi/minifi/bootstrap/ShutdownHook.java | 2 + .../ConfigurationChangeException.java | 42 ++ .../ConfigurationChangeListener.java | 6 +- .../ConfigurationChangeNotifier.java | 4 +- .../configuration/FileChangeNotifier.java | 183 -------- .../configuration/ListenerHandleResult.java | 55 +++ .../configuration/RestChangeNotifier.java | 259 ----------- .../notifiers/FileChangeNotifier.java | 202 ++++++++ .../notifiers/RestChangeNotifier.java | 289 ++++++++++++ .../bootstrap/util/ConfigTransformer.java | 459 +++++++++++-------- .../configuration/TestFileChangeNotifier.java | 206 --------- .../configuration/TestRestChangeNotifier.java | 51 --- .../TestRestChangeNotifierSSL.java | 96 ---- .../notifiers/TestFileChangeNotifier.java | 208 +++++++++ .../notifiers/TestRestChangeNotifier.java | 51 +++ .../notifiers/TestRestChangeNotifierSSL.java | 96 ++++ .../notifiers/util/MockChangeListener.java | 51 +++ .../util/TestRestChangeNotifierCommon.java | 89 ++++ .../configuration/util/MockChangeListener.java | 46 -- .../util/TestRestChangeNotifierCommon.java | 89 ---- .../bootstrap/util/TestConfigTransformer.java | 27 ++ .../src/test/resources/config-empty.yml | 18 + minifi-bootstrap/src/test/resources/default.yml | 101 ++++ 
.../src/main/resources/conf/bootstrap.conf | 4 +- 26 files changed, 1699 insertions(+), 1207 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-assembly/pom.xml b/minifi-assembly/pom.xml index 7a0e6b5..7c5266d 100644 --- a/minifi-assembly/pom.xml +++ b/minifi-assembly/pom.xml @@ -261,7 +261,7 @@ limitations under the License. <!-- nifi.properties: web properties --> <nifi.web.war.directory>./lib</nifi.web.war.directory> <nifi.web.http.host /> - <nifi.web.http.port>8080</nifi.web.http.port> + <nifi.web.http.port>8081</nifi.web.http.port> <nifi.web.https.host /> <nifi.web.https.port /> <nifi.jetty.work.dir>./work/jetty</nifi.jetty.work.dir> http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java index 98d06f3..82b583f 100644 --- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java @@ -34,6 +34,7 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Paths; import java.nio.file.attribute.PosixFilePermission; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -51,10 +52,12 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException; import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener; import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier; import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer; @@ -64,6 +67,8 @@ import org.apache.nifi.util.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; + /** * <p> * The class which bootstraps Apache MiNiFi. This class looks for the @@ -116,7 +121,6 @@ public class RunMiNiFi { private final Lock startedLock = new ReentrantLock(); private final Lock lock = new ReentrantLock(); private final Condition startupCondition = lock.newCondition(); - private final File bootstrapConfigFile; // used for logging initial info; these will be logged to console by default when the app is started @@ -130,7 +134,10 @@ public class RunMiNiFi { private volatile int gracefulShutdownSeconds; private Set<ConfigurationChangeNotifier> changeNotifiers; - private ConfigurationChangeListener changeListener; + private MiNiFiConfigurationChangeListener changeListener; + + // Is set to true after the MiNiFi instance shuts down in preparation to be reloaded. Will be set to false after MiNiFi is successfully started again. 
+ private AtomicBoolean reloading = new AtomicBoolean(false); private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); @@ -273,12 +280,21 @@ public class RunMiNiFi { final File confDir = bootstrapConfigFile.getParentFile(); final File nifiHome = confDir.getParentFile(); final File bin = new File(nifiHome, "bin"); - final File lockFile = new File(bin, "minifi.reload.lock"); + final File reloadFile = new File(bin, "minifi.reload.lock"); - logger.debug("Reload File: {}", lockFile); - return lockFile; + logger.debug("Reload File: {}", reloadFile); + return reloadFile; + } + + public File getSwapFile(final Logger logger) { + final File confDir = bootstrapConfigFile.getParentFile(); + final File swapFile = new File(confDir, "swap.yml"); + + logger.debug("Swap File: {}", swapFile); + return swapFile; } + private Properties loadProperties(final Logger logger) throws IOException { final Properties props = new Properties(); final File statusFile = getStatusFile(logger); @@ -663,7 +679,8 @@ public class RunMiNiFi { } } - logger.info("MiNiFi has finished shutting down."); + reloading.set(true); + logger.info("MiNiFi has finished shutting down and will be reloaded."); } } else { logger.error("When sending RELOAD command to MiNiFi, got unexpected response {}", response); @@ -1035,6 +1052,15 @@ public class RunMiNiFi { @SuppressWarnings({"rawtypes", "unchecked"}) public void start() throws IOException, InterruptedException { + final String confDir = getBootstrapProperties().getProperty(CONF_DIR_KEY); + final File configFile = new File(getBootstrapProperties().getProperty(MINIFI_CONFIG_FILE_KEY)); + try { + performTransformation(new FileInputStream(configFile), confDir); + } catch (ConfigurationChangeException e) { + defaultLogger.error("The config file is malformed, unable to start.", e); + return; + } + Tuple<ProcessBuilder, Process> tuple = startMiNiFi(); if (tuple == null) { cmdLogger.info("Start method returned null, ending start command."); @@ 
-1048,76 +1074,115 @@ public class RunMiNiFi { ProcessBuilder builder = tuple.getKey(); Process process = tuple.getValue(); - while (true) { - final boolean alive = isAlive(process); + try { + while (true) { + final boolean alive = isAlive(process); - if (alive) { - try { - Thread.sleep(1000L); - } catch (final InterruptedException ie) { - } - } else { - final Runtime runtime = Runtime.getRuntime(); - try { - runtime.removeShutdownHook(shutdownHook); - } catch (final IllegalStateException ise) { - // happens when already shutting down - } + if (alive) { + try { + Thread.sleep(1000L); - String now = sdf.format(System.currentTimeMillis()); - if (autoRestartNiFi) { - final File statusFile = getStatusFile(defaultLogger); - if (!statusFile.exists()) { - defaultLogger.info("Status File no longer exists. Will not restart MiNiFi"); - return; - } + if (reloading.get() && getNifiStarted()) { + final File swapConfigFile = getSwapFile(defaultLogger); + if (swapConfigFile.exists()) { + defaultLogger.info("MiNiFi has finished reloading successfully and swap file exists. Deleting old configuration."); - final File lockFile = getLockFile(defaultLogger); - if (lockFile.exists()) { - defaultLogger.info("A shutdown was initiated. Will not restart MiNiFi"); - return; - } + if (swapConfigFile.delete()) { + defaultLogger.info("Swap file was successfully deleted."); + } else { + defaultLogger.info("Swap file was not deleted."); + } + } - final File reloadFile = getReloadFile(defaultLogger); - if (reloadFile.exists()) { - defaultLogger.info("Currently reloading configuration. Will not restart MiNiFi."); - Thread.sleep(5000L); - continue; - } + reloading.set(false); + } - final boolean previouslyStarted = getNifiStarted(); - if (!previouslyStarted) { - defaultLogger.info("MiNiFi never started. 
Will not restart MiNiFi"); - return; - } else { - setNiFiStarted(false); + } catch (final InterruptedException ie) { + } + } else { + final Runtime runtime = Runtime.getRuntime(); + try { + runtime.removeShutdownHook(shutdownHook); + } catch (final IllegalStateException ise) { + // happens when already shutting down } - process = builder.start(); - handleLogging(process); + if (autoRestartNiFi) { + final File statusFile = getStatusFile(defaultLogger); + if (!statusFile.exists()) { + defaultLogger.info("Status File no longer exists. Will not restart MiNiFi"); + return; + } - Long pid = getPid(process, defaultLogger); - if (pid != null) { - nifiPid = pid; - final Properties nifiProps = new Properties(); - nifiProps.setProperty("pid", String.valueOf(nifiPid)); - saveProperties(nifiProps, defaultLogger); - } + final File lockFile = getLockFile(defaultLogger); + if (lockFile.exists()) { + defaultLogger.info("A shutdown was initiated. Will not restart MiNiFi"); + return; + } - shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor); - runtime.addShutdownHook(shutdownHook); + final File reloadFile = getReloadFile(defaultLogger); + if (reloadFile.exists()) { + defaultLogger.info("Currently reloading configuration. Will wait to restart MiNiFi."); + Thread.sleep(5000L); + continue; + } + + final boolean previouslyStarted = getNifiStarted(); + if (!previouslyStarted) { + final File swapConfigFile = getSwapFile(defaultLogger); + if (swapConfigFile.exists()) { + defaultLogger.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration."); + + try { + performTransformation(new FileInputStream(swapConfigFile), confDir); + } catch (ConfigurationChangeException e) { + defaultLogger.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. 
Swap File should be cleaned up manually."); + return; + } - final boolean started = waitForStart(); + Files.copy(swapConfigFile.toPath(), Paths.get(getBootstrapProperties().getProperty(MINIFI_CONFIG_FILE_KEY)), REPLACE_EXISTING); + + defaultLogger.info("Replacing config file with swap file and deleting swap file"); + if (!swapConfigFile.delete()) { + defaultLogger.warn("The swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually."); + } + reloading.set(false); + } else { + defaultLogger.info("MiNiFi either never started or failed to restart. Will not attempt to restart MiNiFi"); + return; + } + } else { + setNiFiStarted(false); + } - if (started) { - defaultLogger.info("Successfully started Apache MiNiFi{}", (pid == null ? "" : " with PID " + pid)); + process = builder.start(); + handleLogging(process); + + Long pid = getPid(process, defaultLogger); + if (pid != null) { + nifiPid = pid; + final Properties nifiProps = new Properties(); + nifiProps.setProperty("pid", String.valueOf(nifiPid)); + saveProperties(nifiProps, defaultLogger); + } + + shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor); + runtime.addShutdownHook(shutdownHook); + + final boolean started = waitForStart(); + + if (started) { + defaultLogger.info("Successfully spawned the thread to start Apache MiNiFi{}", (pid == null ? 
"" : " with PID " + pid)); + } else { + defaultLogger.error("Apache MiNiFi does not appear to have started"); + } } else { - defaultLogger.error("Apache MiNiFi does not appear to have started"); + return; } - } else { - return; } } + } finally { + shutdownChangeNotifiers(); } } @@ -1255,7 +1320,7 @@ public class RunMiNiFi { defaultLogger.warn("Apache MiNiFi has started but failed to persist MiNiFi Port information to {} due to {}", new Object[]{statusFile.getAbsolutePath(), ioe}); } - defaultLogger.info("Apache MiNiFi now running and listening for Bootstrap requests on port {}", port); + defaultLogger.info("The thread to run Apache MiNiFi is now running and listening for Bootstrap requests on port {}", port); } int getNiFiCommandControlPort() { @@ -1328,7 +1393,7 @@ public class RunMiNiFi { } @Override - public void handleChange(InputStream configInputStream) { + public void handleChange(InputStream configInputStream) throws ConfigurationChangeException { logger.info("Received notification of a change"); try { final Properties bootstrapProperties = runner.getBootstrapProperties(); @@ -1346,26 +1411,51 @@ public class RunMiNiFi { final ByteArrayInputStream newConfigBais = new ByteArrayInputStream(bufferedConfigOs.toByteArray()); newConfigBais.mark(-1); - logger.info("Persisting changes to {}", configFile.getAbsolutePath()); - saveFile(newConfigBais, configFile); - - // Reset the input stream to provide to the transformer - newConfigBais.reset(); - - final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY); - logger.info("Performing transformation for input and saving outputs to {}", configFile); - performTransformation(newConfigBais, confDir); - - logger.info("Reloading instance with new configuration"); - restartInstance(); + final File swapConfigFile = runner.getSwapFile(logger); + logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath()); + Files.copy(new FileInputStream(configFile), swapConfigFile.toPath(), 
REPLACE_EXISTING); + try { + logger.info("Persisting changes to {}", configFile.getAbsolutePath()); + saveFile(newConfigBais, configFile); + + try { + // Reset the input stream to provide to the transformer + newConfigBais.reset(); + + final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY); + logger.info("Performing transformation for input and saving outputs to {}", confDir); + performTransformation(newConfigBais, confDir); + + logger.info("Reloading instance with new configuration"); + restartInstance(); + } catch (Exception e){ + logger.debug("Transformation of new config file failed after replacing original with the swap file, reverting."); + Files.copy(new FileInputStream(swapConfigFile), configFile.toPath(), REPLACE_EXISTING); + throw e; + } + } catch (Exception e){ + logger.debug("Transformation of new config file failed after swap file was created, deleting it."); + if(!swapConfigFile.delete()){ + logger.warn("The swap file failed to delete after a failed handling of a change. 
It should be cleaned up manually."); + } + throw e; + } + } catch (ConfigurationChangeException e){ + logger.error("Unable to carry out reloading of configuration on receipt of notification event", e); + throw e; } catch (IOException ioe) { logger.error("Unable to carry out reloading of configuration on receipt of notification event", ioe); - throw new IllegalStateException("Unable to perform reload of received configuration change", ioe); + throw new ConfigurationChangeException("Unable to perform reload of received configuration change", ioe); } } - private void saveFile(final InputStream configInputStream, File configFile) { + @Override + public String getDescriptor() { + return "MiNiFiConfigurationChangeListener"; + } + + private void saveFile(final InputStream configInputStream, File configFile) throws IOException { try { try (final FileOutputStream configFileOutputStream = new FileOutputStream(configFile)) { byte[] copyArray = new byte[1024]; @@ -1375,29 +1465,31 @@ public class RunMiNiFi { } } } catch (IOException ioe) { - throw new IllegalStateException("Unable to save updated configuration to the configured config file location", ioe); + throw new IOException("Unable to save updated configuration to the configured config file location", ioe); } } - private void performTransformation(InputStream configIs, String configDestinationPath) { - try { - ConfigTransformer.transformConfigFile(configIs, configDestinationPath); - } catch (Exception e) { - logger.error("Unable to successfully transform the provided configuration", e); - throw new IllegalStateException("Unable to successfully transform the provided configuration", e); - } - } - private void restartInstance() { - logger.info("Restarting MiNiFi with new configuration"); + + private void restartInstance() throws IOException { try { runner.reload(); } catch (IOException e) { - throw new IllegalStateException("Unable to successfully restart MiNiFi instance after configuration change.", e); + throw new 
IOException("Unable to successfully restart MiNiFi instance after configuration change.", e); } } } + private static void performTransformation(InputStream configIs, String configDestinationPath) throws ConfigurationChangeException, IOException { + try { + ConfigTransformer.transformConfigFile(configIs, configDestinationPath); + } catch (ConfigurationChangeException e){ + throw e; + } catch (Exception e) { + throw new IOException("Unable to successfully transform the provided configuration", e); + } + } + private static class Status { private final Integer port; http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java index 13f0d16..ad3a2df 100644 --- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/ShutdownHook.java @@ -101,5 +101,7 @@ public class ShutdownHook extends Thread { if (!statusFile.delete()) { System.err.println("Failed to delete status file " + statusFile.getAbsolutePath() + "; this file should be cleaned up manually"); } + + System.out.println("MiNiFi is done shutting down"); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeException.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeException.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeException.java new file mode 100644 index 0000000..04bbb02 --- /dev/null 
+++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.bootstrap.configuration; + +/** + * Exception to indicate there was a problem handling a change to the configuration + */ + +public class ConfigurationChangeException extends Exception { + + public ConfigurationChangeException() { + super(); + } + + public ConfigurationChangeException(String message) { + super(message); + } + + public ConfigurationChangeException(String message, Throwable cause) { + super(message, cause); + } + + public ConfigurationChangeException(Throwable cause) { + super(cause); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java index 
7d9183a..756b051 100644 --- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeListener.java @@ -29,6 +29,10 @@ public interface ConfigurationChangeListener { * * @param is stream of the detected content received from the change notifier */ - void handleChange(InputStream is); + void handleChange(InputStream is) throws ConfigurationChangeException; + /** + * Returns a succinct string identifying this particular listener + */ + String getDescriptor(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java index 7ad32f1..745ce6c 100644 --- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeNotifier.java @@ -17,6 +17,7 @@ package org.apache.nifi.minifi.bootstrap.configuration; import java.io.Closeable; +import java.util.Collection; import java.util.Properties; import java.util.Set; @@ -52,6 +53,5 @@ public interface ConfigurationChangeNotifier extends Closeable { /** * Provide the mechanism by which listeners are notified */ - void notifyListeners(); - + Collection<ListenerHandleResult> notifyListeners(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java 
---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java deleted file mode 100644 index d3f51f7..0000000 --- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/FileChangeNotifier.java +++ /dev/null @@ -1,183 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.minifi.bootstrap.configuration; - -import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; -import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.nio.file.FileSystems; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.WatchEvent; -import java.nio.file.WatchKey; -import java.nio.file.WatchService; -import java.util.Collections; -import java.util.HashSet; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -/** - * FileChangeNotifier provides a simple FileSystem monitor for detecting changes for a specified file as generated from its corresponding {@link Path}. Upon modifications to the associated file, - * associated listeners receive notification of a change allowing configuration logic to be reanalyzed. The backing implementation is associated with a {@link ScheduledExecutorService} that - * ensures continuity of monitoring. 
- */ -public class FileChangeNotifier implements Runnable, ConfigurationChangeNotifier { - - private Path configFile; - private WatchService watchService; - private long pollingSeconds; - - private ScheduledExecutorService executorService; - private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>(); - - protected static final String CONFIG_FILE_PATH_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.config.path"; - protected static final String POLLING_PERIOD_INTERVAL_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.polling.period.seconds"; - - protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15; - protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS; - - @Override - public Set<ConfigurationChangeListener> getChangeListeners() { - return Collections.unmodifiableSet(configurationChangeListeners); - } - - @Override - public void notifyListeners() { - final File fileToRead = configFile.toFile(); - for (final ConfigurationChangeListener listener : getChangeListeners()) { - try (final FileInputStream fis = new FileInputStream(fileToRead);) { - listener.handleChange(fis); - } catch (IOException ex) { - throw new IllegalStateException("Unable to read the changed file " + configFile, ex); - } - } - } - - @Override - public boolean registerListener(ConfigurationChangeListener listener) { - return this.configurationChangeListeners.add(listener); - } - - protected boolean targetChanged() { - boolean targetChanged = false; - - final WatchKey watchKey = this.watchService.poll(); - - if (watchKey == null) { - return targetChanged; - } - - for (WatchEvent<?> watchEvt : watchKey.pollEvents()) { - final WatchEvent.Kind<?> evtKind = watchEvt.kind(); - - final WatchEvent<Path> pathEvent = (WatchEvent<Path>) watchEvt; - final Path changedFile = pathEvent.context(); - - // determine target change by verifying if the changed file corresponds to the config file monitored for this path - targetChanged = (evtKind == ENTRY_MODIFY && 
changedFile.equals(configFile.getName(configFile.getNameCount() - 1))); - } - - // After completing inspection, reset for detection of subsequent change events - boolean valid = watchKey.reset(); - if (!valid) { - throw new IllegalStateException("Unable to reinitialize file system watcher."); - } - - return targetChanged; - } - - protected static WatchService initializeWatcher(Path filePath) { - try { - final WatchService fsWatcher = FileSystems.getDefault().newWatchService(); - final Path watchDirectory = filePath.getParent(); - watchDirectory.register(fsWatcher, ENTRY_MODIFY); - - return fsWatcher; - } catch (IOException ioe) { - throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe); - } - } - - @Override - public void run() { - if (targetChanged()) { - notifyListeners(); - } - } - - @Override - public void initialize(Properties properties) { - final String rawPath = properties.getProperty(CONFIG_FILE_PATH_KEY); - final String rawPollingDuration = properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL)); - - if (rawPath == null || rawPath.isEmpty()) { - throw new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified."); - } - - try { - setConfigFile(Paths.get(rawPath)); - setPollingPeriod(Long.parseLong(rawPollingDuration), DEFAULT_POLLING_PERIOD_UNIT); - setWatchService(initializeWatcher(configFile)); - } catch (Exception e) { - throw new IllegalStateException("Could not successfully initialize file change notifier.", e); - } - } - - protected void setConfigFile(Path configFile) { - this.configFile = configFile; - } - - protected void setWatchService(WatchService watchService) { - this.watchService = watchService; - } - - protected void setPollingPeriod(long duration, TimeUnit unit) { - if (duration < 0) { - throw new IllegalArgumentException("Cannot specify a polling period with duration <=0"); - } - 
this.pollingSeconds = TimeUnit.SECONDS.convert(duration, unit); - } - - @Override - public void start() { - executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() { - @Override - public Thread newThread(final Runnable r) { - final Thread t = Executors.defaultThreadFactory().newThread(r); - t.setName("File Change Notifier Thread"); - t.setDaemon(true); - return t; - } - }); - this.executorService.scheduleWithFixedDelay(this, 0, pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT); - } - - @Override - public void close() { - if (this.executorService != null) { - this.executorService.shutdownNow(); - } - } -} - http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java new file mode 100644 index 0000000..8ac4cea --- /dev/null +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ListenerHandleResult.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.bootstrap.configuration; + +public class ListenerHandleResult { + + private final ConfigurationChangeListener configurationChangeListener; + private final Exception failureCause; + + public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener){ + this.configurationChangeListener = configurationChangeListener; + failureCause = null; + } + + public ListenerHandleResult(ConfigurationChangeListener configurationChangeListener, Exception failureCause){ + this.configurationChangeListener = configurationChangeListener; + this.failureCause = failureCause; + } + + public boolean succeeded(){ + return failureCause == null; + } + + public String getDescriptor(){ + return configurationChangeListener.getDescriptor(); + } + + public Exception getFailureCause(){ + return failureCause; + } + + @Override + public String toString() { + if(failureCause == null){ + return getDescriptor() + " successfully handled the configuration change"; + } else { + return getDescriptor() + " FAILED to handle the configuration change due to: '" + failureCause.getMessage() + "'"; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/RestChangeNotifier.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/RestChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/RestChangeNotifier.java deleted file mode 100644 index 5807f89..0000000 --- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/RestChangeNotifier.java +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.minifi.bootstrap.configuration; - -import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.server.handler.AbstractHandler; -import org.eclipse.jetty.server.handler.HandlerCollection; -import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.eclipse.jetty.util.thread.QueuedThreadPool; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.PrintWriter; -import java.net.URI; -import java.util.HashSet; -import java.util.Properties; -import java.util.Set; - -import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX; - - -public class RestChangeNotifier implements ConfigurationChangeNotifier { - - private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>(); - private final static Logger logger = LoggerFactory.getLogger(RestChangeNotifier.class); - private String configFile = null; - private final Server jetty; - public static final String 
GET_TEXT = "This is a config change listener for an Apache NiFi - MiNiFi instance.\n" + - "Use this rest server to upload a conf.yml to configure the MiNiFi instance.\n" + - "Send a POST http request to '/' to upload the file."; - public static final String POST_TEXT ="Configuration received, notifying listeners.\n"; - public static final String OTHER_TEXT ="This is not a support HTTP operation. Please use GET to get more information or POST to upload a new config.yml file.\n"; - - - public static final String POST = "POST"; - public static final String GET = "GET"; - - public static final String PORT_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.port"; - public static final String HOST_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.host"; - public static final String TRUSTSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.location"; - public static final String TRUSTSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.password"; - public static final String TRUSTSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.type"; - public static final String KEYSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.location"; - public static final String KEYSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.password"; - public static final String KEYSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.type"; - public static final String NEED_CLIENT_AUTH_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.need.client.auth"; - - public RestChangeNotifier(){ - QueuedThreadPool queuedThreadPool = new QueuedThreadPool(); - queuedThreadPool.setDaemon(true); - jetty = new Server(queuedThreadPool); - } - - @Override - public void initialize(Properties properties) { - logger.info("Initializing"); - - // create the secure connector if keystore location is specified - if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) { - createSecureConnector(properties); - } else { - // create the unsecure connector otherwise - 
createConnector(properties); - } - - HandlerCollection handlerCollection = new HandlerCollection(true); - handlerCollection.addHandler(new JettyHandler()); - jetty.setHandler(handlerCollection); - } - - - @Override - public Set<ConfigurationChangeListener> getChangeListeners() { - return configurationChangeListeners; - } - - @Override - public boolean registerListener(ConfigurationChangeListener listener) { - return configurationChangeListeners.add(listener); - } - - @Override - public void notifyListeners() { - if (configFile == null){ - throw new IllegalStateException("Attempting to notify listeners when there is no new config file."); - } - - for (final ConfigurationChangeListener listener : getChangeListeners()) { - try (final ByteArrayInputStream fis = new ByteArrayInputStream(configFile.getBytes());) { - listener.handleChange(fis); - } catch (IOException ex) { - throw new IllegalStateException("Unable to read the changed file " + configFile, ex); - } - } - - configFile = null; - } - - @Override - public void start(){ - try { - jetty.start(); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - - - @Override - public void close() throws IOException { - logger.warn("Shutting down the jetty server"); - try { - jetty.stop(); - jetty.destroy(); - } catch (Exception e) { - throw new IOException(e); - } - logger.warn("Done shutting down the jetty server"); - } - - public URI getURI(){ - return jetty.getURI(); - } - - public int getPort(){ - if (!jetty.isStarted()) { - throw new IllegalStateException("Jetty server not started"); - } - return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort(); - } - - public String getConfigString(){ - return configFile; - } - - private void setConfigFile(String configFile){ - this.configFile = configFile; - } - - private void createConnector(Properties properties) { - final ServerConnector http = new ServerConnector(jetty); - - http.setPort(Integer.parseInt(properties.getProperty(PORT_KEY, "0"))); - 
http.setHost(properties.getProperty(HOST_KEY, "localhost")); - - // Severely taxed or distant environments may have significant delays when executing. - http.setIdleTimeout(30000L); - jetty.addConnector(http); - - logger.info("Added an http connector on the host '{}' and port '{}'", new Object[]{http.getHost(), http.getPort()}); - } - - private void createSecureConnector(Properties properties) { - SslContextFactory ssl = new SslContextFactory(); - - if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) { - ssl.setKeyStorePath(properties.getProperty(KEYSTORE_LOCATION_KEY)); - ssl.setKeyStorePassword(properties.getProperty(KEYSTORE_PASSWORD_KEY)); - ssl.setKeyStoreType(properties.getProperty(KEYSTORE_TYPE_KEY)); - } - - if (properties.getProperty(TRUSTSTORE_LOCATION_KEY) != null) { - ssl.setTrustStorePath(properties.getProperty(TRUSTSTORE_LOCATION_KEY)); - ssl.setTrustStorePassword(properties.getProperty(TRUSTSTORE_PASSWORD_KEY)); - ssl.setTrustStoreType(properties.getProperty(TRUSTSTORE_TYPE_KEY)); - ssl.setNeedClientAuth(Boolean.parseBoolean(properties.getProperty(NEED_CLIENT_AUTH_KEY, "true"))); - } - - // build the connector - final ServerConnector https = new ServerConnector(jetty, ssl); - - // set host and port - https.setPort(Integer.parseInt(properties.getProperty(PORT_KEY,"0"))); - https.setHost(properties.getProperty(HOST_KEY, "localhost")); - - // Severely taxed environments may have significant delays when executing. 
- https.setIdleTimeout(30000L); - - // add the connector - jetty.addConnector(https); - - logger.info("Added an https connector on the host '{}' and port '{}'", new Object[]{https.getHost(), https.getPort()}); - } - - - public class JettyHandler extends AbstractHandler { - - @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) - throws IOException, ServletException { - - logRequest(request); - - baseRequest.setHandled(true); - - if(POST.equals(request.getMethod())) { - final StringBuilder configBuilder = new StringBuilder(); - BufferedReader reader = request.getReader(); - if(reader != null && reader.ready()){ - String line; - while ((line = reader.readLine()) != null) { - configBuilder.append(line); - configBuilder.append(System.getProperty("line.separator")); - } - } - setConfigFile(configBuilder.substring(0,configBuilder.length()-1)); - notifyListeners(); - writeOutput(response, POST_TEXT, 200); - } else if(GET.equals(request.getMethod())) { - writeOutput(response, GET_TEXT, 200); - } else { - writeOutput(response, OTHER_TEXT, 404); - } - } - - private void writeOutput(HttpServletResponse response, String responseText, int responseCode) throws IOException { - response.setStatus(responseCode); - response.setContentType("text/plain"); - response.setContentLength(responseText.length()); - try (PrintWriter writer = response.getWriter()) { - writer.print(responseText); - writer.flush(); - } - } - - private void logRequest(HttpServletRequest request){ - logger.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"); - logger.info("request method = " + request.getMethod()); - logger.info("request url = " + request.getRequestURL()); - logger.info("context path = " + request.getContextPath()); - logger.info("request content type = " + request.getContentType()); - logger.info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"); - } - - } -} 
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java new file mode 100644 index 0000000..faba2f0 --- /dev/null +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/FileChangeNotifier.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.minifi.bootstrap.configuration.notifiers; + +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier; +import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; +import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * FileChangeNotifier provides a simple FileSystem monitor for detecting changes for a specified file as generated from its corresponding {@link Path}. Upon modifications to the associated file, + * associated listeners receive notification of a change allowing configuration logic to be reanalyzed. The backing implementation is associated with a {@link ScheduledExecutorService} that + * ensures continuity of monitoring. 
+ */ +public class FileChangeNotifier implements Runnable, ConfigurationChangeNotifier { + + private Path configFile; + private WatchService watchService; + private long pollingSeconds; + + private final static Logger logger = LoggerFactory.getLogger(FileChangeNotifier.class); + private ScheduledExecutorService executorService; + private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>(); + + protected static final String CONFIG_FILE_PATH_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.config.path"; + protected static final String POLLING_PERIOD_INTERVAL_KEY = NOTIFIER_PROPERTY_PREFIX + ".file.polling.period.seconds"; + + protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15; + protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS; + + @Override + public Set<ConfigurationChangeListener> getChangeListeners() { + return Collections.unmodifiableSet(configurationChangeListeners); + } + + @Override + public Collection<ListenerHandleResult> notifyListeners() { + logger.info("Notifying Listeners of a change"); + final File fileToRead = configFile.toFile(); + + Collection<ListenerHandleResult> listenerHandleResults = new ArrayList<>(configurationChangeListeners.size()); + for (final ConfigurationChangeListener listener : getChangeListeners()) { + ListenerHandleResult result; + try (final FileInputStream fis = new FileInputStream(fileToRead);) { + listener.handleChange(fis); + result = new ListenerHandleResult(listener); + } catch (IOException | ConfigurationChangeException ex) { + result = new ListenerHandleResult(listener, ex); + } + listenerHandleResults.add(result); + logger.info("Listener notification result:" + result.toString()); + } + return listenerHandleResults; + } + + @Override + public boolean registerListener(ConfigurationChangeListener listener) { + return this.configurationChangeListeners.add(listener); + } + + protected boolean targetChanged() { + boolean targetChanged = false; + + final WatchKey 
watchKey = this.watchService.poll(); + + if (watchKey == null) { + return targetChanged; + } + + for (WatchEvent<?> watchEvt : watchKey.pollEvents()) { + final WatchEvent.Kind<?> evtKind = watchEvt.kind(); + + final WatchEvent<Path> pathEvent = (WatchEvent<Path>) watchEvt; + final Path changedFile = pathEvent.context(); + + // determine target change by verifying if the changed file corresponds to the config file monitored for this path + targetChanged = (evtKind == ENTRY_MODIFY && changedFile.equals(configFile.getName(configFile.getNameCount() - 1))); + } + + // After completing inspection, reset for detection of subsequent change events + boolean valid = watchKey.reset(); + if (!valid) { + throw new IllegalStateException("Unable to reinitialize file system watcher."); + } + + return targetChanged; + } + + protected static WatchService initializeWatcher(Path filePath) { + try { + final WatchService fsWatcher = FileSystems.getDefault().newWatchService(); + final Path watchDirectory = filePath.getParent(); + watchDirectory.register(fsWatcher, ENTRY_MODIFY); + + return fsWatcher; + } catch (IOException ioe) { + throw new IllegalStateException("Unable to initialize a file system watcher for the path " + filePath, ioe); + } + } + + @Override + public void run() { + logger.debug("Checking for a change"); + if (targetChanged()) { + notifyListeners(); + } + } + + @Override + public void initialize(Properties properties) { + final String rawPath = properties.getProperty(CONFIG_FILE_PATH_KEY); + final String rawPollingDuration = properties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL)); + + if (rawPath == null || rawPath.isEmpty()) { + throw new IllegalArgumentException("Property, " + CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified."); + } + + try { + setConfigFile(Paths.get(rawPath)); + setPollingPeriod(Long.parseLong(rawPollingDuration), DEFAULT_POLLING_PERIOD_UNIT); + 
setWatchService(initializeWatcher(configFile)); + } catch (Exception e) { + throw new IllegalStateException("Could not successfully initialize file change notifier.", e); + } + } + + protected void setConfigFile(Path configFile) { + this.configFile = configFile; + } + + protected void setWatchService(WatchService watchService) { + this.watchService = watchService; + } + + protected void setPollingPeriod(long duration, TimeUnit unit) { + if (duration < 0) { + throw new IllegalArgumentException("Cannot specify a polling period with duration <=0"); + } + this.pollingSeconds = TimeUnit.SECONDS.convert(duration, unit); + } + + @Override + public void start() { + executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() { + @Override + public Thread newThread(final Runnable r) { + final Thread t = Executors.defaultThreadFactory().newThread(r); + t.setName("File Change Notifier Thread"); + t.setDaemon(true); + return t; + } + }); + this.executorService.scheduleWithFixedDelay(this, 0, pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT); + } + + @Override + public void close() { + if (this.executorService != null) { + this.executorService.shutdownNow(); + } + } +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/66dbda90/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java ---------------------------------------------------------------------- diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java new file mode 100644 index 0000000..777214f --- /dev/null +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/notifiers/RestChangeNotifier.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.bootstrap.configuration.notifiers; + +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier; +import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +import static 
org.apache.nifi.minifi.bootstrap.RunMiNiFi.NOTIFIER_PROPERTY_PREFIX; + + +public class RestChangeNotifier implements ConfigurationChangeNotifier { + + private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>(); + private final static Logger logger = LoggerFactory.getLogger(RestChangeNotifier.class); + private String configFile = null; + private final Server jetty; + public static final String GET_TEXT = "This is a config change listener for an Apache NiFi - MiNiFi instance.\n" + + "Use this rest server to upload a conf.yml to configure the MiNiFi instance.\n" + + "Send a POST http request to '/' to upload the file."; + public static final String OTHER_TEXT ="This is not a support HTTP operation. Please use GET to get more information or POST to upload a new config.yml file.\n"; + + + public static final String POST = "POST"; + public static final String GET = "GET"; + + public static final String PORT_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.port"; + public static final String HOST_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.host"; + public static final String TRUSTSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.location"; + public static final String TRUSTSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.password"; + public static final String TRUSTSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.truststore.type"; + public static final String KEYSTORE_LOCATION_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.location"; + public static final String KEYSTORE_PASSWORD_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.password"; + public static final String KEYSTORE_TYPE_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.keystore.type"; + public static final String NEED_CLIENT_AUTH_KEY = NOTIFIER_PROPERTY_PREFIX + ".http.need.client.auth"; + + public RestChangeNotifier(){ + QueuedThreadPool queuedThreadPool = new QueuedThreadPool(); + queuedThreadPool.setDaemon(true); + jetty = new Server(queuedThreadPool); + } + + 
@Override + public void initialize(Properties properties) { + logger.info("Initializing"); + + // create the secure connector if keystore location is specified + if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) { + createSecureConnector(properties); + } else { + // create the unsecure connector otherwise + createConnector(properties); + } + + HandlerCollection handlerCollection = new HandlerCollection(true); + handlerCollection.addHandler(new JettyHandler()); + jetty.setHandler(handlerCollection); + } + + @Override + public Set<ConfigurationChangeListener> getChangeListeners() { + return configurationChangeListeners; + } + + @Override + public boolean registerListener(ConfigurationChangeListener listener) { + return configurationChangeListeners.add(listener); + } + + @Override + public Collection<ListenerHandleResult> notifyListeners() { + if (configFile == null){ + throw new IllegalStateException("Attempting to notify listeners when there is no new config file."); + } + + Collection<ListenerHandleResult> listenerHandleResults = new ArrayList<>(configurationChangeListeners.size()); + for (final ConfigurationChangeListener listener : getChangeListeners()) { + ListenerHandleResult result; + try (final ByteArrayInputStream fis = new ByteArrayInputStream(configFile.getBytes())) { + listener.handleChange(fis); + result = new ListenerHandleResult(listener); + } catch (IOException | ConfigurationChangeException ex) { + result = new ListenerHandleResult(listener, ex); + } + listenerHandleResults.add(result); + logger.info("Listener notification result:" + result.toString()); + } + + configFile = null; + return listenerHandleResults; + } + + @Override + public void start(){ + try { + jetty.start(); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + + @Override + public void close() throws IOException { + logger.warn("Shutting down the jetty server"); + try { + jetty.stop(); + jetty.destroy(); + } catch (Exception e) { + throw new 
IOException(e); + } + logger.warn("Done shutting down the jetty server"); + } + + public URI getURI(){ + return jetty.getURI(); + } + + public int getPort(){ + if (!jetty.isStarted()) { + throw new IllegalStateException("Jetty server not started"); + } + return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort(); + } + + public String getConfigString(){ + return configFile; + } + + private void setConfigFile(String configFile){ + this.configFile = configFile; + } + + private void createConnector(Properties properties) { + final ServerConnector http = new ServerConnector(jetty); + + http.setPort(Integer.parseInt(properties.getProperty(PORT_KEY, "0"))); + http.setHost(properties.getProperty(HOST_KEY, "localhost")); + + // Severely taxed or distant environments may have significant delays when executing. + http.setIdleTimeout(30000L); + jetty.addConnector(http); + + logger.info("Added an http connector on the host '{}' and port '{}'", new Object[]{http.getHost(), http.getPort()}); + } + + private void createSecureConnector(Properties properties) { + SslContextFactory ssl = new SslContextFactory(); + + if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) { + ssl.setKeyStorePath(properties.getProperty(KEYSTORE_LOCATION_KEY)); + ssl.setKeyStorePassword(properties.getProperty(KEYSTORE_PASSWORD_KEY)); + ssl.setKeyStoreType(properties.getProperty(KEYSTORE_TYPE_KEY)); + } + + if (properties.getProperty(TRUSTSTORE_LOCATION_KEY) != null) { + ssl.setTrustStorePath(properties.getProperty(TRUSTSTORE_LOCATION_KEY)); + ssl.setTrustStorePassword(properties.getProperty(TRUSTSTORE_PASSWORD_KEY)); + ssl.setTrustStoreType(properties.getProperty(TRUSTSTORE_TYPE_KEY)); + ssl.setNeedClientAuth(Boolean.parseBoolean(properties.getProperty(NEED_CLIENT_AUTH_KEY, "true"))); + } + + // build the connector + final ServerConnector https = new ServerConnector(jetty, ssl); + + // set host and port + https.setPort(Integer.parseInt(properties.getProperty(PORT_KEY,"0"))); + 
https.setHost(properties.getProperty(HOST_KEY, "localhost")); + + // Severely taxed environments may have significant delays when executing. + https.setIdleTimeout(30000L); + + // add the connector + jetty.addConnector(https); + + logger.info("Added an https connector on the host '{}' and port '{}'", new Object[]{https.getHost(), https.getPort()}); + } + + + public class JettyHandler extends AbstractHandler { + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { + + logRequest(request); + + baseRequest.setHandled(true); + + if(POST.equals(request.getMethod())) { + final StringBuilder configBuilder = new StringBuilder(); + BufferedReader reader = request.getReader(); + if(reader != null && reader.ready()){ + String line; + while ((line = reader.readLine()) != null) { + configBuilder.append(line); + configBuilder.append(System.getProperty("line.separator")); + } + } + setConfigFile(configBuilder.substring(0,configBuilder.length()-1)); + Collection<ListenerHandleResult> listenerHandleResults = notifyListeners(); + + int statusCode = 200; + for (ListenerHandleResult result: listenerHandleResults){ + if(!result.succeeded()){ + statusCode = 500; + break; + } + } + + writeOutput(response, getPostText(listenerHandleResults), statusCode); + } else if(GET.equals(request.getMethod())) { + writeOutput(response, GET_TEXT, 200); + } else { + writeOutput(response, OTHER_TEXT, 404); + } + } + + private String getPostText(Collection<ListenerHandleResult> listenerHandleResults){ + StringBuilder postResult = new StringBuilder("The result of notifying listeners:\n"); + + for (ListenerHandleResult result : listenerHandleResults) { + postResult.append(result.toString()); + postResult.append("\n"); + } + + return postResult.toString(); + } + + private void writeOutput(HttpServletResponse response, String responseText, int responseCode) throws IOException { + 
response.setStatus(responseCode); + response.setContentType("text/plain"); + response.setContentLength(responseText.length()); + try (PrintWriter writer = response.getWriter()) { + writer.print(responseText); + writer.flush(); + } + } + + private void logRequest(HttpServletRequest request){ + logger.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"); + logger.info("request method = " + request.getMethod()); + logger.info("request url = " + request.getRequestURL()); + logger.info("context path = " + request.getContextPath()); + logger.info("request content type = " + request.getContentType()); + logger.info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"); + } + + } +}
