NIFI-883: Fix the issue that HandleHttpRequest had with PrimaryNodeOnly scheduling
Signed-off-by: Mark Payne <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2ae49026 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2ae49026 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2ae49026 Branch: refs/heads/NIFI-730 Commit: 2ae49026e898ef9ea154990f4d3a5da3ee8d8129 Parents: 37e2f17 Author: Joseph Percivall <[email protected]> Authored: Mon Nov 2 10:17:32 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Mon Nov 2 10:32:14 2015 -0500 ---------------------------------------------------------------------- .../processors/standard/HandleHttpRequest.java | 23 +++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/2ae49026/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java index 49bad40..1be8dd9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java @@ -34,6 +34,7 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import javax.security.cert.X509Certificate; @@ -226,6 
+227,7 @@ public class HandleHttpRequest extends AbstractProcessor { .build(); private volatile Server server; + private AtomicBoolean initialized = new AtomicBoolean(false); private final BlockingQueue<HttpRequestContainer> containerQueue = new LinkedBlockingQueue<>(50); @Override @@ -255,7 +257,15 @@ public class HandleHttpRequest extends AbstractProcessor { } @OnScheduled - public void initializeServer(final ProcessContext context) throws Exception { + public void clearInit(){ + initialized.set(false); + } + + private synchronized void initializeServer(final ProcessContext context) throws Exception { + if(initialized.get()){ + return; + } + final String host = context.getProperty(HOSTNAME).getValue(); final int port = context.getProperty(PORT).asInteger(); final SSLContextService sslService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class); @@ -402,6 +412,8 @@ public class HandleHttpRequest extends AbstractProcessor { server.start(); getLogger().info("Server started and listening on port " + getPort()); + + initialized.set(true); } protected int getPort() { @@ -452,6 +464,15 @@ public class HandleHttpRequest extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + try { + if(!initialized.get()) { + initializeServer(context); + } + } catch (Exception e) { + context.yield(); + throw new ProcessException("Failed to initialize the server",e); + } + final HttpRequestContainer container = containerQueue.poll(); if (container == null) { return;
