Author: angelo.vandersijpt at luminis.eu
Date: Wed Dec 15 18:57:28 2010
New Revision: 503

Log:
AMDATU-232 Removed the CassandraDaemonServiceListener and the
ColumnFamilyProviderListener. All column family handling is now done by the
ColumnFamilyHandler; this should make our Cassandra setup a bit more robust.

Added:
   
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
Removed:
   
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraDaemonServiceListener.java
   
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyProviderListener.java
Modified:
   
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/osgi/Activator.java

Modified: 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/osgi/Activator.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/osgi/Activator.java
   (original)
+++ 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/osgi/Activator.java
   Wed Dec 15 18:57:28 2010
@@ -18,8 +18,7 @@
 
 import org.amdatu.cassandra.application.CassandraDaemonService;
 import org.amdatu.cassandra.listener.ColumnFamilyProvider;
-import org.amdatu.cassandra.listener.service.CassandraDaemonServiceListener;
-import org.amdatu.cassandra.listener.service.ColumnFamilyProviderListener;
+import org.amdatu.cassandra.listener.service.ColumnFamilyHandler;
 import 
org.amdatu.cassandra.persistencemanager.CassandraPersistenceManagerFactory;
 import org.apache.felix.dm.DependencyActivatorBase;
 import org.apache.felix.dm.DependencyManager;
@@ -34,24 +33,17 @@
 public class Activator extends DependencyActivatorBase {
     @Override
     public void init(BundleContext context, DependencyManager manager) throws 
Exception {
-        // Register the cassandra daemon service listener
-        manager.add(
-            createComponent()
-            .setImplementation(CassandraDaemonServiceListener.class)
-            
.add(createServiceDependency().setService(LogService.class).setRequired(true))
-            
.add(createServiceDependency().setService(CassandraDaemonService.class).setRequired(true))
-            
.add(createServiceDependency().setService(CassandraPersistenceManagerFactory.class).setRequired(true)));
-
         // Register the CassandraColumnFamilyProvider listener
         manager
             .add(
             createComponent()
-                .setImplementation(ColumnFamilyProviderListener.class)
+                .setImplementation(ColumnFamilyHandler.class)
                 
.add(createServiceDependency().setService(LogService.class).setRequired(true))
                 
.add(createServiceDependency().setService(CassandraDaemonService.class).setRequired(true))
                 
.add(createServiceDependency().setService(CassandraPersistenceManagerFactory.class).setRequired(true))
-                .add(
-                
createServiceDependency().setService(ColumnFamilyProvider.class).setCallbacks("onAdded",
 "onRemoved")));
+                .add(createServiceDependency()
+                        .setService(ColumnFamilyProvider.class)
+                        .setCallbacks("columnFamilyProviderAdded", 
"columnFamilyProviderRemoved")));
     }
 
     @Override

Added: 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
==============================================================================
--- (empty file)
+++ 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
      Wed Dec 15 18:57:28 2010
@@ -0,0 +1,256 @@
+/*
+    Copyright (C) 2010 Amdatu.org
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.cassandra.listener.service;
+
+import java.util.*;
+
+import org.amdatu.cassandra.application.CassandraDaemonService;
+import org.amdatu.cassandra.listener.ColumnFamilyAvailable;
+import org.amdatu.cassandra.listener.ColumnFamilyDefinition;
+import org.amdatu.cassandra.listener.ColumnFamilyProvider;
+import org.amdatu.cassandra.persistencemanager.CassandraException;
+import org.amdatu.cassandra.persistencemanager.CassandraPersistenceManager;
+import 
org.amdatu.cassandra.persistencemanager.CassandraPersistenceManagerFactory;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.thrift.*;
+import org.apache.felix.dm.*;
+import org.apache.thrift.TException;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.log.LogService;
+import org.osgi.util.tracker.ServiceTracker;
+
+/**
+ * This class makes sure that all ColumnFamilies that are available in 
Cassandra get a
+ * corresponding ColumnFamilyAvailable service. Also, it listens for 
ColumnFamilyDefinitions,
+ * adds the respective ColumnFamily to Cassandra, and registers the service as 
soon as the
+ * ColumnFamily is actually available.
+ */
+public class ColumnFamilyHandler {
+
+    private volatile LogService m_logService;
+    private volatile CassandraDaemonService m_daemonService;
+    private volatile CassandraPersistenceManagerFactory m_pmFactory;
+
+    private volatile DependencyManager m_dependencyManager;
+    private volatile BundleContext m_context;
+
+    private final Map<KeySpaceColumnFamilyCombination, Component> m_services =
+            new HashMap<KeySpaceColumnFamilyCombination, Component>();
+
+    public void start() {
+        try {
+            // Register all currently available keyspace/columnfamily 
combinations.
+            List<KsDef> keyspaces = 
m_daemonService.getCassandraServer().describe_keyspaces();
+            for (KsDef keyspace : keyspaces) {
+                String keyspaceName = keyspace.getName();
+                m_pmFactory.createCassandraPersistenceManager(keyspaceName);
+
+                List<CfDef> columnFamilies = keyspace.getCf_defs();
+                for (CfDef columnFamily : columnFamilies) {
+                    addServiceFor(keyspaceName, columnFamily.getName());
+                }
+            }
+        }
+        catch (TException e) {
+            m_logService.log(LogService.LOG_ERROR, "Error registering 
ColumnFamilyAvailable services for existing families.", e);
+        }
+        catch (InvalidRequestException e) {
+            m_logService.log(LogService.LOG_ERROR, "Error registering 
ColumnFamilyAvailable services for existing families.", e);
+        }
+    }
+
+    private void stop() {
+        // Since we registered all services, we should also remove them.
+        for (Component component : m_services.values()) {
+            m_dependencyManager.remove(component);
+        }
+    }
+
+    public void columnFamilyProviderAdded(ColumnFamilyProvider provider) {
+        try {
+            List<KsDef> keyspaceDefinitions = 
m_daemonService.getCassandraServer().describe_keyspaces();
+            ColumnFamilyDefinition[] columnFamilies = 
provider.getColumnFamilies();
+            for (ColumnFamilyDefinition columnFamily : columnFamilies) {
+
+                // Add the columnfamilies to the requested keyspaces, or to 
_all_ available keyspaces.
+                String[] keyspaces = columnFamily.getKeyspaces();
+                if (keyspaces != null) {
+                    for (String keyspace : keyspaces) {
+                        // Never add ColumnFamily's to Cassandra's system 
keyspace, this is a reserved keyspace
+                        if (!Table.SYSTEM_TABLE.equals(keyspace)) {
+                            // Create if it does not yet exist
+                            if (!m_daemonService.keyspaceExists(keyspace)) {
+                                m_daemonService.addKeyspace(keyspace);
+                                
m_pmFactory.createCassandraPersistenceManager(keyspace);
+
+                                m_logService.log(LogService.LOG_INFO, 
"Keyspace '" + keyspace + "' added");
+                            }
+                            addOrUpdateColumnFamily(keyspace, columnFamily);
+                        }
+                    }
+                }
+                else {
+                    for (KsDef keyspaceDef : keyspaceDefinitions) {
+                        if (!Table.SYSTEM_TABLE.equals(keyspaceDef.getName())) 
{
+                            addOrUpdateColumnFamily(keyspaceDef.getName(), 
columnFamily);
+                        }
+                    }
+                }
+            }
+        }
+        catch (TException e) {
+            m_logService.log(LogService.LOG_ERROR, "Failed to register 
keyspaces and/or ColumnFamily's for provider '"
+                + provider.toString() + "'", e);
+        }
+        catch (InvalidRequestException e) {
+            m_logService.log(LogService.LOG_ERROR, "Failed to register 
keyspaces and/or ColumnFamily's for provider '"
+                + provider.toString() + "'", e);
+        }
+        catch (NotFoundException e) {
+            m_logService.log(LogService.LOG_ERROR, "Failed to register 
keyspaces and/or ColumnFamily's for provider '"
+                + provider.toString() + "'", e);
+        }
+    }
+
+    public void columnFamilyProviderRemoved(ColumnFamilyProvider provider) {
+        // We don't act on this yet.
+    }
+
+    private void addOrUpdateColumnFamily(final String ksName, 
ColumnFamilyDefinition colDef) throws InvalidRequestException,
+    TException, NotFoundException {
+        final String cfName = colDef.getName();
+        String columnType = colDef.getColumnType().value;
+        String comparatorType = colDef.getCompareWith().value;
+        String subComparatorType = colDef.getCompareSubcolumnsWith().value;
+
+        if (!m_daemonService.columnFamilyExists(ksName, cfName)) {
+            m_daemonService.addColumnFamily(ksName, cfName, columnType, 
comparatorType, subComparatorType);
+            waitForColumnFamilyAndRegisterService(ksName, cfName);
+            m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + cfName + 
"' added");
+        }
+        else {
+            // Since Cassandra does not (yet) support updating columnType, 
comparatorType or subComparatorType
+            // of existing ColumnFamily's, we throw an exception if one of 
these has been changed by the provider.
+            // If there are no changes, we do nothing
+            if (m_daemonService.isColumnFamilyChanged(ksName, cfName, 
columnType, comparatorType, subComparatorType)) {
+                throw new InvalidRequestException("Definition of ColumnFamily 
'" + cfName
+                    + "' has been changed, but changes in columnType, 
comparatorType "
+                    + "and subComparatorType are not supported by Cassandra");
+            }
+            m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + cfName + 
"' not changed");
+        }
+    }
+
+    private void waitForColumnFamilyAndRegisterService(final String keyspace, 
final String columnFamily) {
+        new Thread("waiting for keyspace " + keyspace + ", columnfamily " + 
columnFamily) {
+            public void run() {
+                waitFor(keyspace, columnFamily);
+                addServiceFor(keyspace, columnFamily);
+            }
+
+        }.start();
+    }
+
+    private void waitFor(String keyspace, String columnFamily) {
+        CassandraPersistenceManager persistenceManager = 
getPersistenceManager(keyspace);
+
+        long startTime = System.currentTimeMillis();
+        boolean found = false;
+        while (System.currentTimeMillis() - startTime < 10000 && found == 
false) {
+            try {
+                found |= persistenceManager.exists(columnFamily);
+            }
+            catch (CassandraException e) {
+                // apparently our columnFamily isn't available yet... try 
again next round.
+            }
+
+            try {
+                Thread.sleep(500);
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        if (!found) {
+            throw new IllegalStateException("After waiting for " + 10000 + 
"ms, columnfamily " +
+                    columnFamily + " is not yet available.");
+        }
+    }
+
+    private CassandraPersistenceManager getPersistenceManager(String keyspace) 
{
+        String objectClassFilter = "(" + 
org.osgi.framework.Constants.OBJECTCLASS + "="
+                + CassandraPersistenceManager.class.getName() + ")";
+        String keyspaceFilter = "(" + 
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
+        String persistenceManagerFilter = "(&" + objectClassFilter + 
keyspaceFilter + ")";
+
+        ServiceTracker tracker;
+        try {
+            tracker = new ServiceTracker(m_context, 
m_context.createFilter(persistenceManagerFilter), null);
+        }
+        catch (InvalidSyntaxException e) {
+            /*
+             * This should not happen, since we construct the filter 
ourselves. If it does, the keyspace
+             * name is invalid (it is the only variable part of the filter).
+             */
+            m_logService.log(LogService.LOG_ERROR, "Could not create filter: " 
+ persistenceManagerFilter, e);
+            return null; // let it NPE one level up.
+        }
+        tracker.open();
+
+        CassandraPersistenceManager persistenceManager = null;
+        try {
+            persistenceManager = (CassandraPersistenceManager) 
tracker.waitForService(5000);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+
+        if (persistenceManager == null) {
+            throw new IllegalStateException("After waiting for " + 5000 + "ms, 
we don't have a "
+                    + "persistencemanager for " + keyspace + " yet.");
+        }
+        return persistenceManager;
+    }
+
+    private void addServiceFor(String keyspace, String columnFamily) {
+        if (m_services.containsKey(new 
KeySpaceColumnFamilyCombination(keyspace, columnFamily))) {
+            return;
+        }
+
+        Dictionary<String, String> serviceProps = new Hashtable<String, 
String>();
+        serviceProps.put(CassandraPersistenceManager.KEYSPACE_AWARE_KEY, 
keyspace);
+        serviceProps.put(ColumnFamilyAvailable.FILTER_NAME, columnFamily);
+
+        Component component = m_dependencyManager.createComponent()
+                .setInterface(ColumnFamilyAvailable.class.getName(), 
serviceProps)
+                .setImplementation(ColumnFamilyAvailableImpl.class);
+
+        m_services.put(new KeySpaceColumnFamilyCombination(keyspace, 
columnFamily), component);
+        m_dependencyManager.add(component);
+        m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + columnFamily 
+ "' is now available");
+    }
+
+    private static class KeySpaceColumnFamilyCombination extends 
HashMap<String, Object> {
+        public KeySpaceColumnFamilyCombination(String keyspace, String 
columnFamily) {
+            put("keyspace", keyspace);
+            put("columnFamily", columnFamily);
+        }
+    }
+}

Reply via email to