Author: [email protected]
Date: Fri Dec  2 17:11:48 2011
New Revision: 1804

Log:


Added:
   sandbox/ivol/amdatu-zookeeper/
   sandbox/ivol/amdatu-zookeeper/org.amdatu.zookeeper.cfg
   sandbox/ivol/amdatu-zookeeper/pom.xml
   sandbox/ivol/amdatu-zookeeper/src/
   sandbox/ivol/amdatu-zookeeper/src/main/
   sandbox/ivol/amdatu-zookeeper/src/main/java/
   sandbox/ivol/amdatu-zookeeper/src/main/java/org/
   sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/
   sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/
   
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/ZooKeeperService.java
   sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/osgi/
   
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/osgi/Activator.java
   sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/
   
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/DataMonitor.java
   
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/Executor.java
   
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/ZooKeeperExecutorImpl.java
   
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/ZooKeeperServiceImpl.java

Added: sandbox/ivol/amdatu-zookeeper/org.amdatu.zookeeper.cfg
==============================================================================
--- (empty file)
+++ sandbox/ivol/amdatu-zookeeper/org.amdatu.zookeeper.cfg      Fri Dec  2 
17:11:48 2011
@@ -0,0 +1,12 @@
+# The number of milliseconds of each tick
+tickTime=2000
+# The number of ticks that the initial
+# synchronization phase can take
+initLimit=10
+# The number of ticks that can pass between
+# sending a request and getting an acknowledgement
+syncLimit=5
+# the directory where the snapshot is stored.
+dataDir=export/crawlspace/mahadev/zookeeper/server1/data
+# the port at which the clients will connect
+clientPort=2181

Added: sandbox/ivol/amdatu-zookeeper/pom.xml
==============================================================================
--- (empty file)
+++ sandbox/ivol/amdatu-zookeeper/pom.xml       Fri Dec  2 17:11:48 2011
@@ -0,0 +1,161 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Copyright (c) 2010, 2011 The Amdatu Foundation
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.verning permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.amdatu</groupId>
+    <artifactId>amdatu</artifactId>
+    <version>4</version>
+  </parent>
+  <groupId>org.amdatu.zookeeper</groupId>
+  <artifactId>org.amdatu.zookeeper</artifactId>
+  <packaging>bundle</packaging>
+  <version>1.0.0-SNAPSHOT</version>
+  <name>Amdatu ZooKeeper</name>
+
+  <repositories>
+    <repository>
+      <id>amdatu.releases</id>
+      <name>Amdatu Release Repository</name>
+      <url>http://repository.amdatu.org/releases</url>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+    <repository>
+      <id>amdatu.snapshots</id>
+      <name>Amdatu Snapshot Repository</name>
+      <url>http://repository.amdatu.org/snapshots</url>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.4.0</version>
+      <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jmx</groupId>
+          <artifactId>jmxri</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jdmk</groupId>
+          <artifactId>jmxtools</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.jms</groupId>
+          <artifactId>jms</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+<build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <configuration>
+          <instructions>
+            
<Bundle-Activator>org.amdatu.zookeeper.osgi.Activator</Bundle-Activator>
+            <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+            <Export-Package>org.amdatu.zookeeper</Export-Package>
+            <Embed-Dependency>*;scope=compile</Embed-Dependency>
+            <Embed-Transitive>true</Embed-Transitive>
+            <Import-Package>
+              !com.google.protobuf,
+              !com.sun.jdmk.comm,
+              !javax.jms,
+              !org.apache.commons.logging,
+              !org.jboss.logging,
+              !sun.security.krb5,
+              *
+            </Import-Package>
+          </instructions>
+        </configuration>
+      </plugin>
+
+      <!-- Plugin configuration for -Pdeploy -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <version>1.6</version>
+        <dependencies>
+          <dependency>
+            <groupId>ant-contrib</groupId>
+            <artifactId>ant-contrib</artifactId>
+            <version>1.0b3</version>
+            <exclusions>
+              <exclusion>
+                <groupId>ant</groupId>
+                <artifactId>ant</artifactId>
+              </exclusion>
+            </exclusions>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.ant</groupId>
+            <artifactId>ant-nodeps</artifactId>
+            <version>1.8.1</version>
+          </dependency>
+        </dependencies>
+        <executions>
+          <execution>
+            <phase>install</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <target>
+
+                <taskdef resource="net/sf/antcontrib/antcontrib.properties" 
classpathref="maven.compile.classpath" />
+                <available 
file="${project.build.directory}/${project.build.finalName}.jar" 
property="fileExists" value="true" />
+
+                <if>
+                  <isset property="fileExists" />
+                  <then>
+                    <if>
+                      <isset property="amdatu.deploy.directory" />
+                      <then>
+                        <copy 
file="${project.build.directory}/${project.build.finalName}.jar" 
tofile="${amdatu.deploy.directory}/${project.artifactId}-${project.version}.jar"
 overwrite="true" />
+                      </then>
+                      <else>
+                        <echo message="Property amdatu.deploy.directory not 
defined, skipping copy to deploy dir task" />
+                      </else>
+                    </if>
+                  </then>
+                </if>
+              </target>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

Added: 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/ZooKeeperService.java
==============================================================================
--- (empty file)
+++ 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/ZooKeeperService.java
      Fri Dec  2 17:11:48 2011
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2010, 2011 The Amdatu Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.amdatu.zookeeper;
+
+public interface ZooKeeperService {
+    String PID = "org.amdatu.zookeeper";
+}
\ No newline at end of file

Added: 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/osgi/Activator.java
==============================================================================
--- (empty file)
+++ 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/osgi/Activator.java
        Fri Dec  2 17:11:48 2011
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2010, 2011 The Amdatu Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.amdatu.zookeeper.osgi;
+
+import org.amdatu.zookeeper.ZooKeeperService;
+import org.amdatu.zookeeper.service.ZooKeeperExecutorImpl;
+import org.amdatu.zookeeper.service.ZooKeeperServiceImpl;
+import org.apache.felix.dm.DependencyActivatorBase;
+import org.apache.felix.dm.DependencyManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.cm.ManagedService;
+import org.osgi.service.log.LogService;
+
+/**
+ * This is the activator for the authentication bundle.
+ * 
+ * @author ivol
+ */
+public class Activator extends DependencyActivatorBase {
+    @Override
+    public void init(final BundleContext context, final DependencyManager 
manager) throws Exception {
+        manager.add(
+            createComponent()
+                .setInterface(new String[] { ZooKeeperService.class.getName(), 
ManagedService.class.getName() }, null)
+                .setImplementation(ZooKeeperServiceImpl.class)
+                
.add(createConfigurationDependency().setPid(ZooKeeperService.PID))
+                
.add(createServiceDependency().setService(LogService.class).setRequired(true)));
+        
+        manager.add(
+            createComponent()
+                .setImplementation(ZooKeeperExecutorImpl.class));
+    }
+
+    @Override
+    public void destroy(final BundleContext context, final DependencyManager 
manager) throws Exception {
+    }
+}

Added: 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/DataMonitor.java
==============================================================================
--- (empty file)
+++ 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/DataMonitor.java
   Fri Dec  2 17:11:48 2011
@@ -0,0 +1,127 @@
+package org.amdatu.zookeeper.service;
+
+/**
+ * A simple class that monitors the data and existence of a ZooKeeper
+ * node. It uses asynchronous ZooKeeper APIs.
+ */
+import java.util.Arrays;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+
+public class DataMonitor implements Watcher, StatCallback {
+
+    ZooKeeper zk;
+
+    String znode;
+
+    Watcher chainedWatcher;
+
+    boolean dead;
+
+    DataMonitorListener listener;
+
+    byte prevData[];
+
+    public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
+            DataMonitorListener listener) {
+        this.zk = zk;
+        this.znode = znode;
+        this.chainedWatcher = chainedWatcher;
+        this.listener = listener;
+        // Get things started by checking if the node exists. We are going
+        // to be completely event driven
+        zk.exists(znode, true, this, null);
+    }
+
+    /**
+     * Other classes use the DataMonitor by implementing this method
+     */
+    public interface DataMonitorListener {
+        /**
+         * The existence status of the node has changed.
+         */
+        void exists(byte data[]);
+
+        /**
+         * The ZooKeeper session is no longer valid.
+         *
+         * @param rc
+         *                the ZooKeeper reason code
+         */
+        void closing(int rc);
+    }
+
+    public void process(WatchedEvent event) {
+        String path = event.getPath();
+        if (event.getType() == Event.EventType.None) {
+            // We are are being told that the state of the
+            // connection has changed
+            switch (event.getState()) {
+            case SyncConnected:
+                // In this particular example we don't need to do anything
+                // here - watches are automatically re-registered with 
+                // server and any watches triggered while the client was 
+                // disconnected will be delivered (in order of course)
+                break;
+            case Expired:
+                // It's all over
+                dead = true;
+                listener.closing(KeeperException.Code.SessionExpired);
+                break;
+            }
+        } else {
+            if (path != null && path.equals(znode)) {
+                // Something has changed on the node, let's find out
+                zk.exists(znode, true, this, null);
+            }
+        }
+        if (chainedWatcher != null) {
+            chainedWatcher.process(event);
+        }
+    }
+
+    public void processResult(int rc, String path, Object ctx, Stat stat) {
+        boolean exists;
+        switch (rc) {
+        case Code.Ok:
+            exists = true;
+            break;
+        case Code.NoNode:
+            exists = false;
+            break;
+        case Code.SessionExpired:
+        case Code.NoAuth:
+            dead = true;
+            listener.closing(rc);
+            return;
+        default:
+            // Retry errors
+            zk.exists(znode, true, this, null);
+            return;
+        }
+
+        byte b[] = null;
+        if (exists) {
+            try {
+                b = zk.getData(znode, false, null);
+            } catch (KeeperException e) {
+                // We don't need to worry about recovering now. The watch
+                // callbacks will kick off any exception handling
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                return;
+            }
+        }
+        if ((b == null && b != prevData)
+                || (b != null && !Arrays.equals(prevData, b))) {
+            listener.exists(b);
+            prevData = b;
+        }
+    }
+}
\ No newline at end of file

Added: 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/Executor.java
==============================================================================
--- (empty file)
+++ 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/Executor.java
      Fri Dec  2 17:11:48 2011
@@ -0,0 +1,167 @@
+package org.amdatu.zookeeper.service;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+public class Executor extends Thread implements Watcher, Runnable, 
DataMonitor.DataMonitorListener 
+{
+    String znode;
+
+    DataMonitor dm;
+
+    ZooKeeper zk;
+
+    String filename;
+
+    String exec[];
+
+    Process child;
+    
+
+    public Executor(String hostPort, String znode, String filename,
+        String exec[]) throws KeeperException, IOException {
+        this.filename = filename;
+        this.exec = exec;
+        zk = new ZooKeeper(hostPort, 3000, this);
+        dm = new DataMonitor(zk, znode, null, this);
+    }
+
+    public ZooKeeper getClient() {
+        return zk;
+    }
+    
+    /**
+     * @param args
+     */
+    public static Executor main(String[] args) {
+        if (args.length < 4) {
+            System.err
+                .println("USAGE: Executor hostPort znode filename program 
[args ...]");
+            System.exit(2);
+        }
+        String hostPort = args[0];
+        String znode = args[1];
+        String filename = args[2];
+        String exec[] = new String[args.length - 3];
+        System.arraycopy(args, 3, exec, 0, exec.length);
+        try {
+            Executor executor = new Executor(hostPort, znode, filename, exec);
+            executor.start();
+            return executor;
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+
+    
/***************************************************************************
+     * We do process any events ourselves, we just need to forward them on.
+     * 
+     * @see 
org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent)
+     */
+    public void process(WatchedEvent event) {
+        dm.process(event);
+    }
+
+    public void run() {
+        try {
+            synchronized (this) {
+                while (!dm.dead && !m_interrupt) {
+                    wait();
+                }
+            }
+        }
+        catch (InterruptedException e) {}
+    }
+    
+    boolean m_interrupt = false;
+    
+    @Override
+    public void interrupt() {
+        // TODO Auto-generated method stub
+        super.interrupt();
+        m_interrupt = true;
+    }
+    
+   
+    public void closing(int rc) {
+        synchronized (this) {
+            notifyAll();
+        }
+    }
+
+    static class StreamWriter extends Thread {
+        OutputStream os;
+
+        InputStream is;
+
+        StreamWriter(InputStream is, OutputStream os) {
+            this.is = is;
+            this.os = os;
+            start();
+        }
+
+        public void run() {
+            byte b[] = new byte[80];
+            int rc;
+            try {
+                while ((rc = is.read(b)) > 0) {
+                    os.write(b, 0, rc);
+                }
+            }
+            catch (IOException e) {}
+
+        }
+    }
+
+    public void exists(byte[] data) {
+        if (data == null) {
+            if (child != null) {
+                System.out.println("Killing process");
+                child.destroy();
+                try {
+                    child.waitFor();
+                }
+                catch (InterruptedException e) {}
+            }
+            child = null;
+        }
+        else {
+            if (child != null) {
+                System.out.println("Stopping child");
+                child.destroy();
+                try {
+                    child.waitFor();
+                }
+                catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+            try {
+                FileOutputStream fos = new FileOutputStream(filename);
+                fos.write(data);
+                fos.close();
+            }
+            catch (IOException e) {
+                e.printStackTrace();
+            }
+            try {
+                System.out.println("Starting child");
+                child = Runtime.getRuntime().exec(exec);
+                new StreamWriter(child.getInputStream(), System.out);
+                new StreamWriter(child.getErrorStream(), System.err);
+            }
+            catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}

Added: 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/ZooKeeperExecutorImpl.java
==============================================================================
--- (empty file)
+++ 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/ZooKeeperExecutorImpl.java
 Fri Dec  2 17:11:48 2011
@@ -0,0 +1,50 @@
+package org.amdatu.zookeeper.service;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+
+public class ZooKeeperExecutorImpl{
+ 
+    private Executor executor;
+    
+    public void start() {
+       String[] args = new String[4];
+       args[0] = "localhost:2181";
+       args[1] = "/"; // path to monitor, '/' monitors all
+       args[2] = "work/zookeeper/executor"; // filename to write changes to
+       args[3] = "cmd.exe"; // process to be executed when a child is added. 
This is just an example
+       
+       // TODO: Add a distributed lock service, which we will be using for 
cassandra
+       // in cassandra we will set a lock to /cassandra/keyspace1/schema
+       // see http://zookeeper.apache.org/doc/trunk/recipes.html#Shared+Locks. 
+       executor = Executor.main(args);
+       
+       ZooKeeper client = getClient();
+       try {
+        byte[] result = client.getData("/zk_test", false, new Stat());
+        String g = new String(result);
+        int gss=0;
+    }
+    catch (KeeperException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+    }
+    catch (InterruptedException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+    }
+    }
+    
+    public ZooKeeper getClient() {
+        return executor.getClient();
+    }
+
+    public void stop() {
+        executor.interrupt();
+    }
+ 
+    
+}
+

Added: 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/ZooKeeperServiceImpl.java
==============================================================================
--- (empty file)
+++ 
sandbox/ivol/amdatu-zookeeper/src/main/java/org/amdatu/zookeeper/service/ZooKeeperServiceImpl.java
  Fri Dec  2 17:11:48 2011
@@ -0,0 +1,170 @@
+/*
+ * Copyright (c) 2010, 2011 The Amdatu Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.amdatu.zookeeper.service;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.Properties;
+
+import org.amdatu.zookeeper.ZooKeeperService;
+import org.apache.felix.dm.Component;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedService;
+import org.osgi.service.log.LogService;
+
+public class ZooKeeperServiceImpl implements ZooKeeperService, ManagedService {
+    private volatile LogService m_logService;
+    private volatile BundleContext bundleContext;
+
+    private MyZooKeeperServerMain m_zooKeeperServer = null;
+    private Thread m_zooKeeperMainThread = null;
+    private Properties m_serviceProperties = null;
+
+    public synchronized void init(final Component component) {
+        if (m_serviceProperties != null) {
+            component.setServiceProperties(m_serviceProperties);
+        }
+    }
+
+    public synchronized void start() throws IOException, ConfigException {
+        startZooKeeper();
+    }
+
+    public synchronized void stop() {
+        stopZooKeeper();
+    }
+
+    @SuppressWarnings("rawtypes")
+    public synchronized void updated(Dictionary properties) throws 
ConfigurationException {
+        if (properties == null) {
+            m_serviceProperties = null;
+        }
+        else {
+            try {
+                m_serviceProperties = new Properties();
+                for (Enumeration e = properties.keys(); e.hasMoreElements();) {
+                    Object key = e.nextElement();
+                    m_serviceProperties.put(key, properties.get(key));
+                }
+                setDefaults(m_serviceProperties);
+
+                if (m_zooKeeperServer != null) {
+                    try {
+                        restartZooKeeper();
+                    }
+                    catch (ConfigException e) {
+                        m_logService.log(LogService.LOG_ERROR,
+                            "Problem restarting ZooKeeper after configuration 
update: " + properties, e);
+                        throw new ConfigurationException("unknown", "unknown", 
e);
+                    }
+                }
+            }
+            catch (IOException e) {
+                m_logService.log(LogService.LOG_ERROR, "Problem applying 
configuration update: " + properties, e);
+                throw new ConfigurationException("unknown", "unknown", e);
+            }
+        }
+    }
+
+    private void startZooKeeper() throws IOException, ConfigException {
+        if (m_serviceProperties.get("clientPort") == null) {
+            m_logService.log(LogService.LOG_INFO, "Cannot start ZooKeeper, 
required property 'clientPort' isn't set.");
+            return;
+        }
+
+        QuorumPeerConfig config = new QuorumPeerConfig();
+        config.parseProperties(m_serviceProperties);
+        final ServerConfig serverConfig = new ServerConfig();
+        serverConfig.readFrom(config);
+
+        m_zooKeeperServer = getZooKeeperMain();
+        m_zooKeeperMainThread = new Thread(new Runnable() {
+            public void run() {
+                try {
+                    m_zooKeeperServer.runFromConfig(serverConfig);
+                }
+                catch (IOException e) {
+                    m_logService.log(LogService.LOG_ERROR, "Problem running 
ZooKeeper server.", e);
+                }
+            }
+        });
+        startThread();
+
+        m_logService.log(LogService.LOG_INFO, "Applied configuration update :" 
+ m_serviceProperties);
+    }
+
+    private void stopZooKeeper() {
+        if (m_zooKeeperServer != null) {
+            m_logService.log(LogService.LOG_INFO, "Shutting down ZooKeeper 
server");
+            m_zooKeeperServer.shutdown();
+            try {
+                m_zooKeeperMainThread.join();
+            }
+            catch (InterruptedException e) {
+                // ignore
+            }
+            m_zooKeeperServer = null;
+            m_zooKeeperMainThread = null;
+        }
+    }
+
+    private void restartZooKeeper() throws IOException, ConfigException {
+        stopZooKeeper();
+        startZooKeeper();
+    }
+
+    @SuppressWarnings("rawtypes")
+    private void setDefaults(Dictionary dict) throws IOException {
+        setDefault(dict, "tickTime", "2000");
+        setDefault(dict, "initLimit", "10");
+        setDefault(dict, "syncLimit", "5");
+        setDefault(dict, "dataDir", new File(bundleContext.getDataFile(""), 
"zkdata").getCanonicalPath());
+        setDefault(dict, Constants.SERVICE_PID, 
"org.apache.cxf.dosgi.discovery.zookeeper.server");
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private void setDefault(Dictionary dict, String key, String value) {
+        if (dict.get(key) == null) {
+            dict.put(key, value);
+        }
+    }
+
+    // Isolated for testing
+    private void startThread() {
+        m_zooKeeperMainThread.start();
+    }
+
+    // Isolated for testing
+    private MyZooKeeperServerMain getZooKeeperMain() {
+        return new MyZooKeeperServerMain();
+    }
+
+    private static class MyZooKeeperServerMain extends ZooKeeperServerMain {
+        @Override
+        protected void shutdown() {
+            super.shutdown();
+            // Make the shutdown accessible from here.
+        }
+    }
+}
\ No newline at end of file
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits

Reply via email to