Repository: falcon Updated Branches: refs/heads/master 4ad28f630 -> 6bbfe2366
FALCON-348 Add shutdown hook for Falcon (Contributed by Sandeep Samudrala) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/6bbfe236 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/6bbfe236 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/6bbfe236 Branch: refs/heads/master Commit: 6bbfe2366b32d885a5b67fc4accb11ed76b889ca Parents: 4ad28f6 Author: Pallavi Rao <[email protected]> Authored: Wed Sep 9 14:31:19 2015 +0530 Committer: Pallavi Rao <[email protected]> Committed: Wed Sep 9 14:31:19 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + prism/src/main/java/org/apache/falcon/Main.java | 19 +++++++++++++++-- .../rerun/handler/AbstractRerunConsumer.java | 22 +++++++++++++------- .../rerun/handler/AbstractRerunHandler.java | 4 ++++ .../falcon/rerun/handler/LateRerunHandler.java | 10 ++++++++- .../falcon/rerun/handler/RetryHandler.java | 9 +++++++- .../apache/falcon/rerun/queue/ActiveMQueue.java | 3 --- .../falcon/rerun/service/LateRunService.java | 6 ++++-- 8 files changed, 57 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 30f2b8c..196490d 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -15,6 +15,7 @@ Trunk (Unreleased) FALCON-1250 Throw error when keys in startup.properties do not start with "*." or domain+"."(Narayan Periwal via Ajay Yadava) + FALCON-348 Add shutdown hook for Falcon (Sandeep Samudrala via Pallavi Rao) OPTIMIZATIONS http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/prism/src/main/java/org/apache/falcon/Main.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/Main.java b/prism/src/main/java/org/apache/falcon/Main.java index 96e003c..d8bbfbd 100644 --- a/prism/src/main/java/org/apache/falcon/Main.java +++ b/prism/src/main/java/org/apache/falcon/Main.java @@ -38,6 +38,8 @@ public final class Main { private static final Logger LOG = LoggerFactory.getLogger(Main.class); private static final String APP_PATH = "app"; private static final String APP_PORT = "port"; + private static EmbeddedServer server; + private static BrokerService broker; /** * Prevent users from constructing this. @@ -60,7 +62,19 @@ public final class Main { return new GnuParser().parse(options, args); } + static class ShutDown extends Thread { + public void run() { + try { + LOG.info("calling shutdown hook"); + server.stop(); + broker.stop(); + } catch (Exception e) { + LOG.error("Server shutdown failed with " , e); + } + } + } public static void main(String[] args) throws Exception { + Runtime.getRuntime().addShutdownHook(new ShutDown()); CommandLine cmd = parseArgs(args); String projectVersion = BuildProperties.get().getProperty("project.version"); String appPath = "webapp/target/falcon-webapp-" + projectVersion; @@ -79,7 +93,7 @@ public final class Main { LOG.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"); LOG.info("Server starting with TLS ? {} on port {}", enableTLS, appPort); LOG.info("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"); - EmbeddedServer server = EmbeddedServer.newServer(appPort, appPath, enableTLS); + server = EmbeddedServer.newServer(appPort, appPath, enableTLS); server.start(); } @@ -109,12 +123,13 @@ public final class Main { int mqport = Integer.valueOf(System.getProperty("falcon.embeddedmq.port", "61616")); LOG.info("Starting ActiveMQ at port {} with data dir {}", mqport, dataDir); - BrokerService broker = new BrokerService(); + broker = new BrokerService(); broker.setUseJmx(false); broker.setDataDirectory(dataDir); broker.addConnector("vm://localhost"); broker.addConnector("tcp://0.0.0.0:" + mqport); broker.setSchedulerSupport(true); + broker.setUseShutdownHook(false); broker.start(); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java index 9ee94c5..582cb15 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.rerun.handler; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.falcon.FalconException; import org.apache.falcon.aspect.GenericAlert; import org.apache.falcon.entity.v0.Frequency; @@ -50,20 +51,25 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst int attempt = 1; AbstractRerunPolicy policy = new ExpBackoffPolicy(); Frequency frequency = new Frequency("minutes(1)"); - while (true) { + while (!Thread.currentThread().isInterrupted()) { try { T message; try { message = handler.takeFromQueue(); attempt = 1; } catch (FalconException e) { - LOG.error("Error while reading message from the queue", e); - GenericAlert.alertRerunConsumerFailed( - "Error while reading message from the queue: ", e); - Thread.sleep(policy.getDelay(frequency, attempt)); - handler.reconnect(); - attempt++; - continue; + if (ExceptionUtils.getRootCause(e) instanceof InterruptedException){ + LOG.info("Rerun handler daemon has been interrupted"); + return; + } else { + LOG.error("Error while reading message from the queue", e); + GenericAlert.alertRerunConsumerFailed( + "Error while reading message from the queue: ", e); + Thread.sleep(policy.getDelay(frequency, attempt)); + handler.reconnect(); + attempt++; + continue; + } } // Login the user to access WfEngine as this user http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java index f019737..64c566e 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java @@ -48,6 +48,10 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay this.delayQueue.init(); } + public void close() throws FalconException { + this.delayQueue.close(); + } + //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck public abstract void handleRerun(String clusterName, String entityType, String entityName, String nominalTime, String runId, http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java index c2cb09e..785dce8 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java @@ -52,6 +52,7 @@ import java.util.Date; */ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends AbstractRerunHandler<LaterunEvent, M> { + private Thread daemon; @Override //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck @@ -188,13 +189,20 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends @Override public void init(M aDelayQueue) throws FalconException { super.init(aDelayQueue); - Thread daemon = new Thread(new LateRerunConsumer(this)); + daemon = new Thread(new LateRerunConsumer(this)); daemon.setName("LaterunHandler"); daemon.setDaemon(true); daemon.start(); LOG.info("Laterun Handler thread started"); } + @Override + public void close() throws FalconException { + daemon.interrupt(); + super.close(); + } + + public Path getLateLogPath(String logDir, String nominalTime, String srcClusterName) { //SrcClusterName valid only in case of feed http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java index c6bc36f..b952bbe 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java @@ -38,6 +38,7 @@ import org.apache.falcon.workflow.WorkflowExecutionContext; */ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends AbstractRerunHandler<RetryEvent, M> { + private Thread daemon; @Override //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck @@ -85,7 +86,7 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends @Override public void init(M aDelayQueue) throws FalconException { super.init(aDelayQueue); - Thread daemon = new Thread(new RetryConsumer(this)); + daemon = new Thread(new RetryConsumer(this)); daemon.setName("RetryHandler"); daemon.setDaemon(true); daemon.start(); @@ -93,6 +94,12 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends } @Override + public void close() throws FalconException { + daemon.interrupt(); + super.close(); + } + + @Override public void onSuccess(WorkflowExecutionContext context) throws FalconException { // do nothing since retry does not apply for failed workflows } http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java index 021e4cc..3168c31 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java @@ -68,7 +68,6 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> { event.toString(), event.getDelay(TimeUnit.MILLISECONDS)); return true; } catch (Exception e) { - LOG.error("Unable to offer event: {} to ActiveMQ", event, e); throw new FalconException("Unable to offer event:" + event + " to ActiveMQ", e); } } @@ -91,7 +90,6 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> { LOG.debug("Dequeued Message: {}", event.toString()); return event; } catch (Exception e) { - LOG.error("Error getting the message from ActiveMQ", e); throw new FalconException("Error getting the message from ActiveMQ: ", e); } } @@ -111,7 +109,6 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> { consumer = session.createConsumer(destination); LOG.info("Initialized Queue on ActiveMQ: {}", destinationName); } catch (Exception e) { - LOG.error("Error starting ActiveMQ connection for delayed queue", e); throw new RuntimeException("Error starting ActiveMQ connection for delayed queue", e); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/6bbfe236/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java index 2bb198b..8be6810 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java @@ -39,6 +39,8 @@ public class LateRunService implements FalconService { private ActiveMQueue<LaterunEvent> queue; + private AbstractRerunHandler<LaterunEvent, ActiveMQueue<LaterunEvent>> rerunHandler; + @Override public String getName() { return LateRunService.class.getName(); @@ -50,8 +52,7 @@ public class LateRunService implements FalconService { throw new FalconException("WorkflowJobEndNotificationService must be configured ahead"); } - AbstractRerunHandler<LaterunEvent, ActiveMQueue<LaterunEvent>> rerunHandler = - RerunHandlerFactory.getRerunHandler(RerunType.LATE); + rerunHandler = RerunHandlerFactory.getRerunHandler(RerunType.LATE); queue = new ActiveMQueue<LaterunEvent>( StartupProperties.get() .getProperty("broker.url", "failover:(tcp://localhost:61616)?initialReconnectDelay=5000"), @@ -64,6 +65,7 @@ public class LateRunService implements FalconService { @Override public void destroy() throws FalconException { + rerunHandler.close(); closeQuietly(); LOG.info("LateRun thread destroyed"); }
