Author: ivol37 at gmail.com
Date: Wed Jan 19 10:45:51 2011
New Revision: 677

Log:
[AMDATU-254] Refactored the dependency of the daemon service on the 
availability of the internal Cassandra daemon, such that starting Cassandra can 
be interrupted when needed (for example, when bootstrapping fails and the daemon 
never reaches the 'Normal' operation mode).

Added:
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonAvailable.java
Modified:
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/osgi/Activator.java
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/osgi/Activator.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/osgi/Activator.java
     (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/osgi/Activator.java
     Wed Jan 19 10:45:51 2011
@@ -19,6 +19,8 @@
 import org.amdatu.cassandra.application.CassandraConfigurationService;
 import org.amdatu.cassandra.application.CassandraDaemonService;
 import 
org.amdatu.cassandra.application.service.CassandraConfigurationServiceImpl;
+import org.amdatu.cassandra.application.service.CassandraDaemonActivatorImpl;
+import org.amdatu.cassandra.application.service.CassandraDaemonAvailable;
 import org.amdatu.cassandra.application.service.CassandraDaemonServiceImpl;
 import org.amdatu.core.config.templates.ConfigTemplateManager;
 import org.apache.felix.dm.DependencyActivatorBase;
@@ -44,11 +46,19 @@
             
.add(createServiceDependency().setService(ConfigTemplateManager.class).setRequired(true))
             
.add(createConfigurationDependency().setPid(CassandraConfigurationServiceImpl.PID)));
 
+        // Register the Cassandra daemon activator
+        manager.add(
+            createComponent()
+            .setImplementation(CassandraDaemonActivatorImpl.class)
+            
.add(createServiceDependency().setService(LogService.class).setRequired(true))
+            
.add(createServiceDependency().setService(CassandraConfigurationService.class).setRequired(true)));
+
         // Register the Cassandra daemon service
         manager.add(
             createComponent()
             .setImplementation(CassandraDaemonServiceImpl.class)
             .setInterface(CassandraDaemonService.class.getName(), null)
+            
.add(createServiceDependency().setService(CassandraDaemonAvailable.class).setRequired(true))
             
.add(createServiceDependency().setService(LogService.class).setRequired(true))
             
.add(createServiceDependency().setService(EventAdmin.class).setRequired(true))
             
.add(createServiceDependency().setService(CassandraConfigurationService.class).setRequired(true)));

Added: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
==============================================================================
--- (empty file)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
       Wed Jan 19 10:45:51 2011
@@ -0,0 +1,179 @@
+/*
+    Copyright (C) 2010 Amdatu.org
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.cassandra.application.service;
+
+import org.amdatu.cassandra.application.CassandraConfigurationService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CassandraDaemon;
+import org.apache.felix.dm.DependencyManager;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.osgi.service.log.LogService;
+
+public class CassandraDaemonActivatorImpl implements CassandraDaemonAvailable {
+    // Timeout for the initial Thrift connection
+    private static final int THRIFT_RETRIES = 3;
+    private static final int THRIFT_TIMEOUT = 3000;
+    private static final int DAEMON_TIMEOUT = 5000;
+
+    // Service dependencies, injected by the framework
+    private volatile LogService m_logService;
+    private volatile CassandraConfigurationService m_configuration = null;
+    private volatile DependencyManager m_dependencyManager;
+
+    // The Cassandra daemon
+    private CassandraDaemon m_daemon = null;
+
+    // The CassandraDaemon cannot be stopped/started without stopping and 
updating (to enforce classloader
+    // to be destroyed) this bundle. For that reason we block any attempts to 
stop/start this service since
+    // that will fail.
+    private static boolean m_daemonHasShutdown = false;
+
+    /**
+     * The init() method is invoked by the Felix dependency manager.
+     */
+    public void init() {
+        if (m_daemonHasShutdown) {
+            throw new RuntimeException("CassandraDaemon has already been 
shutdown and cannot be restarted.");
+        }
+
+        m_logService.log(LogService.LOG_INFO, getClass().getName() + " service 
initialized");
+
+        try {
+            // Setup the cassandra daemon
+            m_daemon = new CassandraDaemon();
+            m_logService.log(LogService.LOG_INFO, getClass().getName() + " 
service started.");
+        }
+        catch (Throwable t) {
+            m_logService.log(LogService.LOG_ERROR, "An error occurred while 
starting Cassandra service", t);
+        }
+    }
+
+    public void start() {
+        if (m_daemonHasShutdown) {
+            throw new RuntimeException("CassandraDaemon has already been 
shutdown and cannot be restarted.");
+        }
+
+        m_logService.log(LogService.LOG_INFO, "Starting Cassandra Daemon with 
configuration: ");
+        m_logService.log(LogService.LOG_INFO, "  Auto bootstrap mode = " + 
m_configuration.isAutoBootstrapMode());
+        m_logService.log(LogService.LOG_INFO, "  Default replication factor = 
" + m_configuration.getDefaultReplicationFactor());
+        m_logService.log(LogService.LOG_INFO, "  Read consistency level= " + 
m_configuration.getReadConsistencyLevel());
+        m_logService.log(LogService.LOG_INFO, "  Write consistency level = " + 
m_configuration.getWriteConsistencyLevel());
+        m_logService.log(LogService.LOG_INFO, "  RPC address = " + 
m_configuration.getRPCAddress());
+        m_logService.log(LogService.LOG_INFO, "  RPC Port = " + 
m_configuration.getRPCPort());
+
+        // Activate the daemon from a separate thread, as the activate() 
method never returns
+        new CassandraDaemonActivateThread().start();
+
+        // Start a new daemon listener thread. It will register a 
CassandraDaemonAvailableService
+        // as soon as the daemon reached operation mode 'Normal' and a Thrift 
connection could be
+        // established
+        new CassandraDaemonListenerThread().start();
+    }
+
+    public void stop() {
+        m_logService.log(LogService.LOG_INFO, "Shutting down Cassandra 
Daemon");
+        m_daemon.deactivate();
+        m_daemonHasShutdown = true;
+        m_logService.log(LogService.LOG_INFO, "Cassandra Daemon stopped");
+    }
+
+    public void destroy() {
+        m_logService.log(LogService.LOG_INFO, getClass().getName() + " service 
destroyed");
+    }
+
+    class CassandraDaemonActivateThread extends Thread {
+        public void run() {
+            m_daemon.activate();
+        }
+    }
+
+    class CassandraDaemonListenerThread extends Thread {
+        public void run() {
+            try {
+                // First wait until we can establish a Thrift connection to 
the daemon, the connection is
+                // established as soon as the daemon is running.
+                testThriftConnection();
+
+                // Now wait until the operation mode of the daemon becomes 
"Normal". In auto bootstrap mode this can take quite a
+                // while (2 minutes minimum). In a single node cluster this 
will be almost immediately. Unfortunately the operation
+                // mode is not covered by any enum value.
+                String prevOperationMode = "";
+                String operationMode = 
StorageService.instance.getOperationMode();
+                while (!operationMode.equals("Normal") && !isInterrupted() && 
isAlive()) {
+                    if (!operationMode.equals(prevOperationMode)) {
+                        m_logService.log(LogService.LOG_INFO, "Current 
Cassandra Daemon operation mode is '" + operationMode
+                            + "', waiting for daemon to reach operation mode 
'Normal'");
+                    }
+                    prevOperationMode = operationMode;
+                    operationMode = StorageService.instance.getOperationMode();
+                    Thread.sleep(DAEMON_TIMEOUT);
+                }
+
+                if 
("Normal".equals(StorageService.instance.getOperationMode())) {
+                    m_logService.log(LogService.LOG_INFO, "Operation mode is 
now 'Normal', continuing starting Cassandra");
+
+                    // Register a new CassandraDaemonAvailable service
+                    CassandraDaemonAvailable service = new 
CassandraDaemonAvailable() {};
+                    m_dependencyManager.add(
+                        m_dependencyManager.createComponent()
+                        
.setInterface(CassandraDaemonAvailable.class.getName(), null)
+                        .setImplementation(service));
+                }
+            }
+            catch (TTransportException e) {
+                m_logService.log(LogService.LOG_ERROR, "Could not establish 
Thrift connection to Cassandra daemon, daemon could not be started.");
+            }
+            catch (InterruptedException e) {
+                m_logService.log(LogService.LOG_ERROR, "Starting Cassandra 
daemon interrupted.");
+            }
+        }
+
+        /**
+         * This methods opens a Thrift connection to the Cassandra daemon and 
returns if the connection
+         * has been established. This is usefull to ensure that the daemon is 
running before continuing.
+         * @throws TTransportException
+         */
+        private void testThriftConnection() throws TTransportException {
+            int retry = 0;
+            try {
+                String thrift = m_configuration.getRPCAddress() + ":" + 
m_configuration.getRPCPort();
+                m_logService.log(LogService.LOG_INFO, "Establishing Thrift 
connection to the Cassandra Daemon on " + thrift);
+                TTransport tr = new TSocket(m_configuration.getRPCAddress(), 
m_configuration.getRPCPort(), THRIFT_TIMEOUT);
+                TProtocol proto = new TBinaryProtocol(tr);
+                new Cassandra.Client(proto);
+                tr.open();
+            } catch (TTransportException e) {
+                retry++;
+                if (retry >= THRIFT_RETRIES) {
+                    throw e;
+                }
+                try {
+                    Thread.sleep(THRIFT_TIMEOUT);
+                }
+                catch (InterruptedException e1) {
+                }
+                m_logService.log(LogService.LOG_INFO, "Thrift connection 
cannot yet be established, retrying... (" + retry + ")");
+            }
+            m_logService.log(LogService.LOG_INFO, "Thrift connection 
established successfully");
+        }
+    }
+}
\ No newline at end of file

Added: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonAvailable.java
==============================================================================
--- (empty file)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonAvailable.java
   Wed Jan 19 10:45:51 2011
@@ -0,0 +1,26 @@
+/*
+    Copyright (C) 2010 Amdatu.org
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.cassandra.application.service;
+
+/**
+ * The sole purpose of this interface is to allow the CassandraDaemonService 
to define a service
+ * dependency on the availability of the internal Cassandra daemon.
+ * 
+ * @author ivol
+ */
+public interface CassandraDaemonAvailable {
+}

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
 (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
 Wed Jan 19 10:45:51 2011
@@ -25,19 +25,12 @@
 import org.amdatu.cassandra.application.CassandraDaemonService;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.CassandraDaemon;
 import org.apache.cassandra.thrift.CassandraServer;
 import org.apache.cassandra.thrift.CfDef;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.KsDef;
 import org.apache.cassandra.thrift.NotFoundException;
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.service.log.LogService;
@@ -48,11 +41,6 @@
  * @author ivol
  */
 public class CassandraDaemonServiceImpl implements CassandraDaemonService {
-    // Timeout for the initial Thrift connection
-    private final int THRIFT_RETRIES = 3;
-    private final int THRIFT_TIMEOUT = 3000;
-    private final int DAEMON_TIMEOUT = 5000;
-
     // The default placement strategy
     private final String DEFAULT_PLACEMENT_STRATEGY = 
"org.apache.cassandra.locator.SimpleStrategy";
 
@@ -61,85 +49,21 @@
     private volatile EventAdmin m_eventAdmin;
     private volatile CassandraConfigurationService m_configuration = null;
 
-    // The CassandraDaemon cannot be stopped/started without stopping and 
updating (to enforce classloader
-    // to be destroyed) this bundle. For that reason we block any attempts to 
stop/start this service since
-    // that will fail.
-    private static boolean m_daemonHasShutdown = false;
-
-    private CassandraDaemon m_daemon = null;
     private CassandraServer m_cassandraServer = null;
 
     /**
      * The init() method is invoked by the Felix dependency manager.
      */
     public void init() {
-        if (m_daemonHasShutdown) {
-            throw new RuntimeException("CassandraDaemon has already been 
shutdown and cannot be restarted.");
-        }
-
         m_logService.log(LogService.LOG_INFO, getClass().getName() + " service 
initialized");
-
-        try {
-            // Setup the cassandra daemon
-            m_daemon = new CassandraDaemon();
-            m_logService.log(LogService.LOG_INFO, getClass().getName() + " 
service started.");
-        }
-        catch (Throwable t) {
-            m_logService.log(LogService.LOG_ERROR, "An error occurred while 
starting Cassandra service", t);
-        }
     }
 
     public void start() {
-        if (m_daemonHasShutdown) {
-            throw new RuntimeException("CassandraDaemon has already been 
shutdown and cannot be restarted.");
-        }
-
-        m_logService.log(LogService.LOG_INFO, "Starting Cassandra Daemon with 
configuration: ");
-        m_logService.log(LogService.LOG_INFO, "  Auto bootstrap mode = " + 
m_configuration.isAutoBootstrapMode());
-        m_logService.log(LogService.LOG_INFO, "  Default replication factor = 
" + m_configuration.getDefaultReplicationFactor());
-        m_logService.log(LogService.LOG_INFO, "  Read consistency level= " + 
m_configuration.getReadConsistencyLevel());
-        m_logService.log(LogService.LOG_INFO, "  Write consistency level = " + 
m_configuration.getWriteConsistencyLevel());
-        m_logService.log(LogService.LOG_INFO, "  RPC address = " + 
m_configuration.getRPCAddress());
-        m_logService.log(LogService.LOG_INFO, "  RPC Port = " + 
m_configuration.getRPCPort());
-
-        // Activate the daemon from a separate thread, as the activate() 
method never returns
-        new CassandraDaemonActivateThread().start();
-        try {
-            // Now establish a Thrift connection to the daemon, the connection 
is established as soon
-            // as the daemon is running.
-            testThriftConnection();
-
-            // Now wait until the operation mode of Cassandra becomes 
"Normal". In auto bootstrap mode this can take quite a
-            // while (2 minutes minimum). In a single node cluster this will 
be amost immediately. Unfortunately the operation
-            // mode is not covered by any enum value.
-            String prevOperationMode = "";
-            String operationMode = StorageService.instance.getOperationMode();
-            while (!operationMode.equals("Normal")) {
-                if (!operationMode.equals(prevOperationMode)) {
-                    m_logService.log(LogService.LOG_INFO, "Current Cassandra 
Daemon operation mode is '" + operationMode
-                        + "', waiting for daemon to reach operation mode 
'Normal'");
-                }
-                Thread.sleep(DAEMON_TIMEOUT);
-            }
-            m_logService.log(LogService.LOG_INFO, "Operation mode is now '" + 
StorageService.instance.getOperationMode()
-                + "', continuing starting Cassandra");
-
-            // Create the cassandra server
-            m_cassandraServer = new CassandraServer();
-        }
-        catch (TTransportException e) {
-            m_logService.log(LogService.LOG_INFO, "Could not establish a 
Thrift connection to the Cassandra Daemon", e);
-        }
-        catch (InterruptedException e) {
-            m_logService.log(LogService.LOG_INFO, "Waiting for Cassandra 
Daemon to reach normal operation mode interrupted", e);
-        }
+        m_cassandraServer = new CassandraServer();
     }
 
     public void stop() {
-        m_logService.log(LogService.LOG_INFO, "Shutting down Cassandra 
Daemon");
-        m_daemon.deactivate();
-        m_daemonHasShutdown = true;
-        m_logService.log(LogService.LOG_INFO, "Cassandra Daemon stopped");
+        m_logService.log(LogService.LOG_INFO, getClass().getName() +  "service 
stopped");
     }
 
     public void destroy() {
@@ -304,40 +228,4 @@
         }
         return false;
     }
-
-    class CassandraDaemonActivateThread extends Thread {
-        public void run() {
-            m_daemon.activate();
-        }
-    }
-
-    /**
-     * This methods opens a Thrift connection to the Cassandra daemon and 
returns if the connection
-     * has been established. This is usefull to ensure that the daemon is 
running before continuing.
-     * @throws TTransportException
-     */
-    private void testThriftConnection() throws TTransportException {
-        int retry = 0;
-        try {
-            String thrift = m_configuration.getRPCAddress() + ":" + 
m_configuration.getRPCPort();
-            m_logService.log(LogService.LOG_INFO, "Establishing Thrift 
connection to the Cassandra Daemon on " + thrift);
-            TTransport tr = new TSocket(m_configuration.getRPCAddress(), 
m_configuration.getRPCPort(), THRIFT_TIMEOUT);
-            TProtocol proto = new TBinaryProtocol(tr);
-            new Cassandra.Client(proto);
-            tr.open();
-        } catch (TTransportException e) {
-            retry++;
-            if (retry >= THRIFT_RETRIES) {
-                throw e;
-            }
-            try {
-                Thread.sleep(THRIFT_TIMEOUT);
-            }
-            catch (InterruptedException e1) {
-            }
-            m_logService.log(LogService.LOG_INFO, "Thrift connection cannot 
yet be established, retrying... (" + retry + ")");
-        }
-        m_logService.log(LogService.LOG_INFO, "Thrift connection established 
successfully");
-    }
-
 }

Reply via email to