NIFI-259: Construct QuorumPeer object instead of ZooKeeperServer object because 
we want to join as part of a cluster when using the embedded ZK Server


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/329e1fe2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/329e1fe2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/329e1fe2

Branch: refs/heads/master
Commit: 329e1fe2f4f17832d38e5058f39cef483bde986c
Parents: fcf837b
Author: Mark Payne <marka...@hotmail.com>
Authored: Mon Jan 25 14:33:22 2016 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Mon Jan 25 14:33:22 2016 -0500

----------------------------------------------------------------------
 .../state/server/ZooKeeperStateServer.java      | 46 ++++++++++++--------
 1 file changed, 27 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/329e1fe2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
index e65c375..de41764 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
@@ -26,10 +26,10 @@ import java.util.Properties;
 
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.slf4j.Logger;
@@ -43,7 +43,7 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
 
     private ServerCnxnFactory connectionFactory;
     private FileTxnSnapLog transactionLog;
-    private ZooKeeperServer embeddedZkServer;
+    private QuorumPeer quorumPeer;
 
     private ZooKeeperStateServer(final Properties zkProperties) throws 
IOException, ConfigException {
         quorumPeerConfig = new QuorumPeerConfig();
@@ -53,25 +53,33 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
     public synchronized void start() throws IOException {
         logger.info("Starting Embedded ZooKeeper Server");
 
-        final ServerConfig config = new ServerConfig();
-        config.readFrom(quorumPeerConfig);
         try {
             started = true;
 
-            transactionLog = new FileTxnSnapLog(new 
File(config.getDataLogDir()), new File(config.getDataDir()));
-
-            embeddedZkServer = new ZooKeeperServer();
-            embeddedZkServer.setTxnLogFactory(transactionLog);
-            embeddedZkServer.setTickTime(config.getTickTime());
-            
embeddedZkServer.setMinSessionTimeout(config.getMinSessionTimeout());
-            
embeddedZkServer.setMaxSessionTimeout(config.getMaxSessionTimeout());
+            transactionLog = new FileTxnSnapLog(new 
File(quorumPeerConfig.getDataLogDir()), new 
File(quorumPeerConfig.getDataDir()));
 
             connectionFactory = ServerCnxnFactory.createFactory();
-            connectionFactory.configure(config.getClientPortAddress(), 
config.getMaxClientCnxns());
-            connectionFactory.startup(embeddedZkServer);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            logger.warn("Embedded ZooKeeper Server interrupted", e);
+            
connectionFactory.configure(quorumPeerConfig.getClientPortAddress(), 
quorumPeerConfig.getMaxClientCnxns());
+
+            quorumPeer = new QuorumPeer();
+            
quorumPeer.setClientPortAddress(quorumPeerConfig.getClientPortAddress());
+            quorumPeer.setTxnFactory(new FileTxnSnapLog(new 
File(quorumPeerConfig.getDataLogDir()), new 
File(quorumPeerConfig.getDataDir())));
+            quorumPeer.setQuorumPeers(quorumPeerConfig.getServers());
+            quorumPeer.setElectionType(quorumPeerConfig.getElectionAlg());
+            quorumPeer.setMyid(quorumPeerConfig.getServerId());
+            quorumPeer.setTickTime(quorumPeerConfig.getTickTime());
+            
quorumPeer.setMinSessionTimeout(quorumPeerConfig.getMinSessionTimeout());
+            
quorumPeer.setMaxSessionTimeout(quorumPeerConfig.getMaxSessionTimeout());
+            quorumPeer.setInitLimit(quorumPeerConfig.getInitLimit());
+            quorumPeer.setSyncLimit(quorumPeerConfig.getSyncLimit());
+            quorumPeer.setQuorumVerifier(quorumPeerConfig.getQuorumVerifier());
+            quorumPeer.setCnxnFactory(connectionFactory);
+            quorumPeer.setZKDatabase(new 
ZKDatabase(quorumPeer.getTxnFactory()));
+            quorumPeer.setLearnerType(quorumPeerConfig.getPeerType());
+            quorumPeer.setSyncEnabled(quorumPeerConfig.getSyncEnabled());
+            
quorumPeer.setQuorumListenOnAllIPs(quorumPeerConfig.getQuorumListenOnAllIPs());
+
+            quorumPeer.start();
         } catch (final IOException ioe) {
             throw new IOException("Failed to start embedded ZooKeeper Server", 
ioe);
         } catch (final Exception e) {
@@ -96,8 +104,8 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
                 connectionFactory.shutdown();
             }
 
-            if (embeddedZkServer != null && embeddedZkServer.isRunning()) {
-                embeddedZkServer.shutdown();
+            if (quorumPeer != null && quorumPeer.isRunning()) {
+                quorumPeer.shutdown();
             }
         }
     }

Reply via email to