NIFI-259: Update ZooKeeperStateServer so that the thread that starts it does not
block indefinitely, which was preventing NiFi from shutting down properly. Also
applied the patch for NIFI-1415.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d696391f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d696391f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d696391f

Branch: refs/heads/master
Commit: d696391f76cb14bf4bdaedb4e88d129aa2e54dc2
Parents: 593f128
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Jan 20 11:52:41 2016 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Wed Jan 20 11:52:41 2016 -0500

----------------------------------------------------------------------
 .../state/server/ZooKeeperStateServer.java      | 48 +++++++++++++++++---
 .../nifi/processors/standard/FetchSFTP.java     | 18 --------
 2 files changed, 42 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d696391f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
index 1a646b0..e65c375 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
@@ -25,8 +25,11 @@ import java.io.InputStream;
 import java.util.Properties;
 
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.slf4j.Logger;
@@ -35,8 +38,12 @@ import org.slf4j.LoggerFactory;
 public class ZooKeeperStateServer extends ZooKeeperServerMain {
     private static final Logger logger = 
LoggerFactory.getLogger(ZooKeeperStateServer.class);
 
-    private QuorumPeerConfig quorumPeerConfig;
-    private boolean started = false;
+    private final QuorumPeerConfig quorumPeerConfig;
+    private volatile boolean started = false;
+
+    private ServerCnxnFactory connectionFactory;
+    private FileTxnSnapLog transactionLog;
+    private ZooKeeperServer embeddedZkServer;
 
     private ZooKeeperStateServer(final Properties zkProperties) throws 
IOException, ConfigException {
         quorumPeerConfig = new QuorumPeerConfig();
@@ -46,11 +53,25 @@ public class ZooKeeperStateServer extends 
ZooKeeperServerMain {
     public synchronized void start() throws IOException {
         logger.info("Starting Embedded ZooKeeper Server");
 
-        final ServerConfig serverConfig = new ServerConfig();
-        serverConfig.readFrom(quorumPeerConfig);
+        final ServerConfig config = new ServerConfig();
+        config.readFrom(quorumPeerConfig);
         try {
-            runFromConfig(serverConfig);
             started = true;
+
+            transactionLog = new FileTxnSnapLog(new 
File(config.getDataLogDir()), new File(config.getDataDir()));
+
+            embeddedZkServer = new ZooKeeperServer();
+            embeddedZkServer.setTxnLogFactory(transactionLog);
+            embeddedZkServer.setTickTime(config.getTickTime());
+            
embeddedZkServer.setMinSessionTimeout(config.getMinSessionTimeout());
+            
embeddedZkServer.setMaxSessionTimeout(config.getMaxSessionTimeout());
+
+            connectionFactory = ServerCnxnFactory.createFactory();
+            connectionFactory.configure(config.getClientPortAddress(), 
config.getMaxClientCnxns());
+            connectionFactory.startup(embeddedZkServer);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.warn("Embedded ZooKeeper Server interrupted", e);
         } catch (final IOException ioe) {
             throw new IOException("Failed to start embedded ZooKeeper Server", 
ioe);
         } catch (final Exception e) {
@@ -62,7 +83,22 @@ public class ZooKeeperStateServer extends 
ZooKeeperServerMain {
     public synchronized void shutdown() {
         if (started) {
             started = false;
-            super.shutdown();
+
+            if (transactionLog != null) {
+                try {
+                    transactionLog.close();
+                } catch (final IOException ioe) {
+                    logger.warn("Failed to close Transaction Log", ioe);
+                }
+            }
+
+            if (connectionFactory != null) {
+                connectionFactory.shutdown();
+            }
+
+            if (embeddedZkServer != null && embeddedZkServer.isRunning()) {
+                embeddedZkServer.shutdown();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d696391f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
index 8379987..dd6d880 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
@@ -18,8 +18,6 @@
 package org.apache.nifi.processors.standard;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -30,8 +28,6 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.standard.util.FileTransfer;
 import org.apache.nifi.processors.standard.util.SFTPTransfer;
@@ -73,20 +69,6 @@ public class FetchSFTP extends FetchFileTransfer {
     }
 
     @Override
-    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
-        if (!validationContext.getProperty(SFTPTransfer.PASSWORD).isSet() && 
!(validationContext.getProperty(SFTPTransfer.PRIVATE_KEY_PASSPHRASE).isSet()
-            && 
validationContext.getProperty(SFTPTransfer.PRIVATE_KEY_PATH).isSet())) {
-            return Collections.singleton(new ValidationResult.Builder()
-                .subject("Password")
-                .valid(false)
-                .explanation("Must set either password or Private Key Path & 
Passphrase")
-                .build());
-        }
-
-        return Collections.emptyList();
-    }
-
-    @Override
     protected FileTransfer createFileTransfer(final ProcessContext context) {
         return new SFTPTransfer(context, getLogger());
     }

Reply via email to