Author: ivol37 at gmail.com
Date: Wed Jan  5 12:58:02 2011
New Revision: 563

Log:
[AMDATU-242] The cassandra daemon service now publishes keyspace added/dropped 
events which are picked up by the ColumnFamilyHandler. Enhanced the integration 
test to properly test the ColumnFamilyProvider registration.

Added:
   
trunk/integration-tests/src/test/java/org/amdatu/test/integration/mock/ColumnFamilyProviderImpl.java
Modified:
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/osgi/Activator.java
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
   
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
   trunk/integration-tests/pom.xml
   
trunk/integration-tests/src/test/java/org/amdatu/test/integration/base/IntegrationTestBase.java
   
trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/CassandraDaemonIntegrationTest.java

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
     (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
     Wed Jan  5 12:58:02 2011
@@ -32,6 +32,23 @@
  */
 public interface CassandraDaemonService {
     /**
+     * Represents the topic under which events are sent by this service to 
EventAdmin.
+     */
+    String EVENT_ADMIN_TOPIC = "org/amdatu/cassandra/application/service";
+
+    /**
+     * When a keyspace is added, an event is sent in which the property with 
this name contains the name of the keyspace
+     * that has been added.
+     */
+    String EVENT_ADMIN_KEYSPACE_ADDED = "keyspace_added";
+
+    /**
+     * When a keyspace is dropped, an event is sent in which the property with 
this name contains the name of the keyspace
+     * that has been dropped.
+     */
+    String EVENT_ADMIN_KEYSPACE_DROPPED = "keyspace_dropped";
+
+    /**
      * Returns the Cassandra server which represents the Cassandra Thrift API.
      * 
      * @see http://wiki.apache.org/cassandra/API

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/osgi/Activator.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/osgi/Activator.java
     (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/osgi/Activator.java
     Wed Jan  5 12:58:02 2011
@@ -24,6 +24,7 @@
 import org.apache.felix.dm.DependencyActivatorBase;
 import org.apache.felix.dm.DependencyManager;
 import org.osgi.framework.BundleContext;
+import org.osgi.service.event.EventAdmin;
 import org.osgi.service.log.LogService;
 
 
@@ -36,20 +37,21 @@
     public void init(BundleContext context, DependencyManager manager) throws 
Exception {
         // Register the Cassandra configuration service
         manager.add(
-                createComponent()
-                .setImplementation(CassandraConfigurationServiceImpl.class)
-                .setInterface(CassandraConfigurationService.class.getName(), 
null)
-                
.add(createServiceDependency().setService(LogService.class).setRequired(true))
-                
.add(createServiceDependency().setService(ConfigTemplateManager.class).setRequired(true))
-                
.add(createConfigurationDependency().setPid(CassandraConfigurationServiceImpl.PID)));
+            createComponent()
+            .setImplementation(CassandraConfigurationServiceImpl.class)
+            .setInterface(CassandraConfigurationService.class.getName(), null)
+            
.add(createServiceDependency().setService(LogService.class).setRequired(true))
+            
.add(createServiceDependency().setService(ConfigTemplateManager.class).setRequired(true))
+            
.add(createConfigurationDependency().setPid(CassandraConfigurationServiceImpl.PID)));
 
         // Register the Cassandra daemon service
-        manager.add( 
-                createComponent()
-                .setImplementation(CassandraDaemonServiceImpl.class)
-                .setInterface(CassandraDaemonService.class.getName(), null)
-                
.add(createServiceDependency().setService(LogService.class).setRequired(true))
-                
.add(createServiceDependency().setService(CassandraConfigurationService.class).setRequired(true)));
+        manager.add(
+            createComponent()
+            .setImplementation(CassandraDaemonServiceImpl.class)
+            .setInterface(CassandraDaemonService.class.getName(), null)
+            
.add(createServiceDependency().setService(LogService.class).setRequired(true))
+            
.add(createServiceDependency().setService(EventAdmin.class).setRequired(true))
+            
.add(createServiceDependency().setService(CassandraConfigurationService.class).setRequired(true)));
     }
 
     @Override

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
 (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
 Wed Jan  5 12:58:02 2011
@@ -17,17 +17,21 @@
 package org.amdatu.cassandra.application.service;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.amdatu.cassandra.application.CassandraDaemonService;
-import org.apache.cassandra.db.Table;
 import org.apache.cassandra.avro.CassandraDaemon;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.thrift.CassandraServer;
 import org.apache.cassandra.thrift.CfDef;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.KsDef;
 import org.apache.cassandra.thrift.NotFoundException;
 import org.apache.thrift.TException;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
 import org.osgi.service.log.LogService;
 
 /**
@@ -38,11 +42,12 @@
 public class CassandraDaemonServiceImpl implements CassandraDaemonService {
     // The default placement strategy
     private final String DEFAULT_PLACEMENT_STRATEGY = 
"org.apache.cassandra.locator.SimpleStrategy";
-    
+
     private final int DEFAULT_REPLICATION_FACTOR = 1;
 
     // Service dependencies, injected by the framework
     private volatile LogService m_logService;
+    private volatile EventAdmin m_eventAdmin;
 
     // The CassandraDaemon cannot be stopped/started without stopping and 
updating (to enforce classloader
     // to be destroyed) this bundle. For that reason we block any attempts to 
stop/start this service since
@@ -108,7 +113,7 @@
         }
         return false;
     }
-    
+
     public List<String> getKeyspaces() throws TException, 
InvalidRequestException {
         List<String> keyspaceNames = new ArrayList<String>();
         List<KsDef> keyspaces = m_cassandraServer.describe_keyspaces();
@@ -122,10 +127,20 @@
         List<CfDef> empty = new ArrayList<CfDef>();
         KsDef ksDef = new KsDef(name, DEFAULT_PLACEMENT_STRATEGY, 
DEFAULT_REPLICATION_FACTOR, empty);
         m_cassandraServer.system_add_keyspace(ksDef);
+
+        // Publish an event that a new keyspace has been added
+        Map<String, String> properties = new HashMap<String, String>();
+        properties.put(EVENT_ADMIN_KEYSPACE_ADDED, name);
+        m_eventAdmin.sendEvent(new Event(EVENT_ADMIN_TOPIC, properties));
     }
-    
+
     public void dropKeyspace(String keyspace) throws InvalidRequestException, 
TException {
         m_cassandraServer.system_drop_keyspace(keyspace);
+
+        // Publish an event that a keyspace has been dropped
+        Map<String, String> properties = new HashMap<String, String>();
+        properties.put(EVENT_ADMIN_KEYSPACE_DROPPED, keyspace);
+        m_eventAdmin.sendEvent(new Event(EVENT_ADMIN_TOPIC, properties));
     }
 
     public boolean columnFamilyExists(String keyspaceName, String 
columnFamilyName) throws NotFoundException, InvalidRequestException {
@@ -148,7 +163,7 @@
         }
         return cfNames;
     }
-    
+
     public void addColumnFamily(String keyspace, String cfName, String 
columnType, String comparatorType,
         String subComparatorType) throws InvalidRequestException, TException {
         if (keyspace.equals(Table.SYSTEM_TABLE)) {

Modified: 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
      (original)
+++ 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
      Wed Jan  5 12:58:02 2011
@@ -16,7 +16,11 @@
  */
 package org.amdatu.cassandra.listener.service;
 
-import java.util.*;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
 
 import org.amdatu.cassandra.application.CassandraDaemonService;
 import org.amdatu.cassandra.listener.ColumnFamilyAvailable;
@@ -26,12 +30,19 @@
 import org.amdatu.cassandra.persistencemanager.CassandraPersistenceManager;
 import 
org.amdatu.cassandra.persistencemanager.CassandraPersistenceManagerFactory;
 import org.apache.cassandra.db.Table;
-import org.apache.cassandra.thrift.*;
-import org.apache.felix.dm.*;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
 import org.apache.thrift.TException;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.InvalidSyntaxException;
 import org.osgi.framework.ServiceReference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
 import org.osgi.service.log.LogService;
 import org.osgi.util.tracker.ServiceTracker;
 
@@ -41,7 +52,7 @@
  * add the respective ColumnFamily to Cassandra, and registers the service as 
soon as the
  * ColumnFamily is actually available.
  */
-public class ColumnFamilyHandler {
+public class ColumnFamilyHandler implements EventHandler {
 
     private volatile LogService m_logService;
     private volatile CassandraDaemonService m_daemonService;
@@ -51,8 +62,14 @@
     private volatile BundleContext m_context;
 
     private final Map<KeySpaceColumnFamilyCombination, Component> m_services =
-            new HashMap<KeySpaceColumnFamilyCombination, Component>();
+        new HashMap<KeySpaceColumnFamilyCombination, Component>();
 
+    @SuppressWarnings("unchecked")
+    public void init() {
+        Dictionary d = new Hashtable();
+        d.put(EventConstants.EVENT_TOPIC, new 
String[]{CassandraDaemonService.EVENT_ADMIN_TOPIC});
+        m_context.registerService( EventHandler.class.getName(), this, d );
+    }
     public void start() {
         try {
             // Register all currently available keyspace/columnfamily 
combinations.
@@ -142,7 +159,7 @@
         if (!m_daemonService.columnFamilyExists(ksName, cfName)) {
             m_daemonService.addColumnFamily(ksName, cfName, columnType, 
comparatorType, subComparatorType);
             waitForColumnFamilyAndRegisterService(ksName, cfName);
-            m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + cfName + 
"' added");
+            m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + cfName + 
"' added to keyspace '" + ksName + "'");
         }
         else {
             // Since Cassandra does not (yet) support updating columnType, 
comparatorType or subComparatorType
@@ -153,7 +170,7 @@
                     + "' has been changed, but changes in columnType, 
comparatorType "
                     + "and subComparatorType are not supported by Cassandra");
             }
-            m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + cfName + 
"' not changed");
+            m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + cfName + 
"' not changed in keyspace '" + ksName + "'");
         }
     }
 
@@ -191,13 +208,13 @@
 
         if (!found) {
             throw new IllegalStateException("After waiting for " + 10000 + 
"ms, columnfamily " +
-                    columnFamily + " is not yet available.");
+                columnFamily + " is not yet available.");
         }
     }
 
     private CassandraPersistenceManager getPersistenceManager(String keyspace) 
throws InterruptedException {
         String objectClassFilter = "(" + 
org.osgi.framework.Constants.OBJECTCLASS + "="
-                + CassandraPersistenceManager.class.getName() + ")";
+        + CassandraPersistenceManager.class.getName() + ")";
         String keyspaceFilter = "(" + 
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
         String persistenceManagerFilter = "(&" + objectClassFilter + 
keyspaceFilter + ")";
 
@@ -225,7 +242,7 @@
 
         if (persistenceManager == null) {
             throw new IllegalStateException("After waiting for " + 5000 + "ms, 
we don't have a "
-                    + "persistencemanager for " + keyspace + " yet.");
+                + "persistencemanager for " + keyspace + " yet.");
         }
         return persistenceManager;
     }
@@ -240,8 +257,8 @@
         serviceProps.put(ColumnFamilyAvailable.FILTER_NAME, columnFamily);
 
         Component component = m_dependencyManager.createComponent()
-                .setInterface(ColumnFamilyAvailable.class.getName(), 
serviceProps)
-                .setImplementation(ColumnFamilyAvailableImpl.class);
+        .setInterface(ColumnFamilyAvailable.class.getName(), serviceProps)
+        .setImplementation(ColumnFamilyAvailableImpl.class);
 
         m_services.put(new KeySpaceColumnFamilyCombination(keyspace, 
columnFamily), component);
         m_dependencyManager.add(component);
@@ -254,4 +271,39 @@
             put("columnFamily", columnFamily);
         }
     }
+
+    private boolean isKeyspaceGlobal(ColumnFamilyProvider provider) {
+        for (ColumnFamilyDefinition cfDef : provider.getColumnFamilies()) {
+            if (cfDef.getKeyspaces() == null) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public void handleEvent(Event event) {
+        try {
+            Object keyspaceAdded = 
event.getProperty(CassandraDaemonService.EVENT_ADMIN_KEYSPACE_ADDED);
+            if (keyspaceAdded != null) {
+                // If a keyspace was added, we must add ColumnFamilies for all 
keyspace-global ColumnFamilyProviders
+                // (these are all ColumnFamilyProviders that define a ColumnFamily whose 
keyspaces are null)
+                m_logService.log(LogService.LOG_DEBUG, "Recieved keyspace 
added event for keyspace '" + keyspaceAdded + "' ");
+
+                ServiceReference[] servRefs = 
m_context.getAllServiceReferences(ColumnFamilyProvider.class.getName(), null);
+                if (servRefs != null) {
+                    for (ServiceReference ref : servRefs) {
+                        ColumnFamilyProvider provider = (ColumnFamilyProvider) 
m_context.getService(ref);
+                        if (isKeyspaceGlobal(provider)) {
+                            m_logService.log(LogService.LOG_DEBUG, "Updating 
ColumnFamilyProvider '" + provider.getClass().getName() + "' as it is 
keyspace-global");
+                            columnFamilyProviderAdded(provider);
+                        }
+                    }
+                }
+            }
+        }
+        catch (InvalidSyntaxException e) {
+            m_logService.log(LogService.LOG_ERROR, "Could not handle event '" 
+ event.getTopic() + "' ", e);
+        }
+    }
 }

Modified: trunk/integration-tests/pom.xml
==============================================================================
--- trunk/integration-tests/pom.xml     (original)
+++ trunk/integration-tests/pom.xml     Wed Jan  5 12:58:02 2011
@@ -25,7 +25,7 @@
 
     <!-- This is required to be first so that pax-exam classloader is not 
messed up with a newer version of felix
       which would lead to java.lang.NoSuchMethodError: 
org.apache.felix.framework.Logger.<init>(I)V -->
-      
+
     <dependency>
       <groupId>org.ops4j.pax.exam</groupId>
       <artifactId>pax-exam</artifactId>
@@ -139,7 +139,7 @@
       <version>${platform.version}</version>
       <scope>test</scope>
       <type>bundle</type>
-    </dependency>    
+    </dependency>
     <dependency>
       <groupId>org.amdatu.web.rest</groupId>
       <artifactId>jaxrs</artifactId>
@@ -188,45 +188,45 @@
       <version>${platform.version}</version>
       <scope>test</scope>
       <type>bundle</type>
-    </dependency>    
+    </dependency>
     <dependency>
       <groupId>org.amdatu.authentication.oauth</groupId>
-      <artifactId>api</artifactId>    
+      <artifactId>api</artifactId>
       <version>${platform.version}</version>
       <scope>test</scope>
       <type>bundle</type>
-    </dependency>    
+    </dependency>
     <dependency>
       <groupId>org.amdatu.authentication.oauth</groupId>
-      <artifactId>server</artifactId>    
+      <artifactId>server</artifactId>
       <version>${platform.version}</version>
       <scope>test</scope>
       <type>bundle</type>
     </dependency>
     <dependency>
       <groupId>org.amdatu.authentication.oauth</groupId>
-      <artifactId>client</artifactId>    
+      <artifactId>client</artifactId>
       <version>${platform.version}</version>
       <scope>test</scope>
       <type>bundle</type>
-    </dependency>    
+    </dependency>
     <dependency>
       <groupId>org.amdatu.authentication.oauth</groupId>
-      <artifactId>consumerregistry-fs</artifactId>    
+      <artifactId>consumerregistry-fs</artifactId>
       <version>${platform.version}</version>
       <scope>test</scope>
       <type>bundle</type>
-    </dependency>    
+    </dependency>
     <dependency>
       <groupId>org.amdatu.authentication</groupId>
-      <artifactId>tokenprovider</artifactId>    
+      <artifactId>tokenprovider</artifactId>
       <version>${platform.version}</version>
       <scope>test</scope>
       <type>bundle</type>
-    </dependency>     
+    </dependency>
     <dependency>
       <groupId>org.amdatu.authentication.tokenstore</groupId>
-      <artifactId>mem</artifactId>    
+      <artifactId>mem</artifactId>
       <version>${platform.version}</version>
       <scope>test</scope>
       <type>bundle</type>
@@ -237,7 +237,7 @@
       <version>${platform.version}</version>
       <scope>test</scope>
       <type>bundle</type>
-    </dependency>    
+    </dependency>
     <dependency>
       <groupId>org.amdatu.semanticweb</groupId>
       <artifactId>rdf2go.api</artifactId>
@@ -251,10 +251,16 @@
       <version>${platform.version}</version>
       <scope>test</scope>
       <type>bundle</type>
-    </dependency>    
+    </dependency>
 
     <dependency>
       <groupId>org.apache.felix</groupId>
+      <artifactId>org.apache.felix.eventadmin</artifactId>
+      <version>${org.apache.felix.eventadmin.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.felix</groupId>
       <artifactId>org.apache.felix.configadmin</artifactId>
       <version>${org.apache.felix.configadmin.version}</version>
       <scope>test</scope>

Modified: 
trunk/integration-tests/src/test/java/org/amdatu/test/integration/base/IntegrationTestBase.java
==============================================================================
--- 
trunk/integration-tests/src/test/java/org/amdatu/test/integration/base/IntegrationTestBase.java
     (original)
+++ 
trunk/integration-tests/src/test/java/org/amdatu/test/integration/base/IntegrationTestBase.java
     Wed Jan  5 12:58:02 2011
@@ -97,6 +97,7 @@
                     compendium(),
                     dependencyManager(),
                     configAdmin(),
+                    eventAdmin(),
                     felixLog(),
 
                     // Amdatu platform bundles
@@ -413,8 +414,7 @@
     }
 
     protected static MavenArtifactProvisionOption felixHttpServiceWhiteboard() 
{
-        return 
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.http.whiteboard")
-        .versionAsInProject();
+        return 
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.http.whiteboard").versionAsInProject();
     }
 
     protected static MavenArtifactProvisionOption felixLog() {
@@ -422,13 +422,15 @@
     }
 
     protected static MavenArtifactProvisionOption configAdmin() {
-        return 
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.configadmin")
-        .versionAsInProject();
+        return 
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.configadmin").versionAsInProject();
+    }
+
+    protected static MavenArtifactProvisionOption eventAdmin() {
+        return 
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.eventadmin").versionAsInProject();
     }
 
     protected static MavenArtifactProvisionOption dependencyManager() {
-        return 
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.dependencymanager")
-        .versionAsInProject();
+        return 
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.dependencymanager").versionAsInProject();
     }
 
     protected static MavenArtifactProvisionOption compendium() {

Added: 
trunk/integration-tests/src/test/java/org/amdatu/test/integration/mock/ColumnFamilyProviderImpl.java
==============================================================================
--- (empty file)
+++ 
trunk/integration-tests/src/test/java/org/amdatu/test/integration/mock/ColumnFamilyProviderImpl.java
        Wed Jan  5 12:58:02 2011
@@ -0,0 +1,36 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.test.integration.mock;
+
+import org.amdatu.cassandra.listener.ColumnFamilyDefinition;
+import org.amdatu.cassandra.listener.ColumnFamilyProvider;
+import org.amdatu.cassandra.listener.ColumnFamilyDefinition.ColumnType;
+import org.amdatu.cassandra.listener.ColumnFamilyDefinition.CompareType;
+
+public class ColumnFamilyProviderImpl implements ColumnFamilyProvider {
+    private String[] m_keyspaces;
+    private String m_columnFamily;
+
+    public ColumnFamilyProviderImpl(String[] keyspaces, String columnFamily) {
+        m_keyspaces = keyspaces;
+        m_columnFamily = columnFamily;
+    }
+    public ColumnFamilyDefinition[] getColumnFamilies() {
+        return new ColumnFamilyDefinition[] {new ColumnFamilyDefinition(
+            m_columnFamily, m_keyspaces, ColumnType.SUPER, 
CompareType.BYTESTYPE, CompareType.BYTESTYPE)};
+    }
+}

Modified: 
trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/CassandraDaemonIntegrationTest.java
==============================================================================
--- 
trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/CassandraDaemonIntegrationTest.java
 (original)
+++ 
trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/CassandraDaemonIntegrationTest.java
 Wed Jan  5 12:58:02 2011
@@ -24,15 +24,19 @@
 import junit.framework.Assert;
 
 import org.amdatu.cassandra.application.CassandraDaemonService;
+import org.amdatu.cassandra.listener.ColumnFamilyProvider;
 import org.amdatu.cassandra.listener.ColumnFamilyDefinition.ColumnType;
 import org.amdatu.cassandra.listener.ColumnFamilyDefinition.CompareType;
 import org.amdatu.cassandra.persistencemanager.CassandraPersistenceManager;
 import org.amdatu.test.integration.base.ConfigProvider;
 import org.amdatu.test.integration.base.IntegrationTestBase;
+import org.amdatu.test.integration.mock.ColumnFamilyProviderImpl;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.NotFoundException;
 import org.apache.felix.dm.Component;
 import org.apache.felix.dm.DependencyManager;
+import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,6 +46,7 @@
 import org.ops4j.pax.exam.junit.JUnit4TestRunner;
 import org.osgi.framework.BundleException;
 import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.log.LogService;
 
 /**
  * This class provides an integration test for the Cassandra Daemon service.
@@ -51,10 +56,15 @@
 @RunWith(JUnit4TestRunner.class)
 public class CassandraDaemonIntegrationTest extends IntegrationTestBase {
     private final static String KEYSPACE = "IntegrationTestKeySpace";
+    private final static String KEYSPACE2 = "IntegrationTestKeySpace2";
+    private final static String KEYSPACE3 = "IntegrationTestKeySpace3";
     private final static String COLUMNFAMILY = "IntegrationTestColumnFamily";
+    private final static String COLUMNFAMILY2 = "IntegrationTestColumnFamily2";
 
     private volatile CassandraDaemonService m_daemonService;
     private volatile ConfigurationAdmin m_configAdmin;
+    private volatile DependencyManager m_dependencyManager;
+    private volatile LogService m_logService;
 
     @Configuration
     public Option[] configure() {
@@ -64,8 +74,8 @@
     protected Option provisionBundles() {
         // Deploy bundles needed by cassandra daemon integration test
         return provision(
-               javaxServlet(), // Required if the httpservice is not deployed
-               amdatuConfigTemplateManager(), // Required for placeholders in 
cassandra.yaml
+            javaxServlet(), // Required if the httpservice is not deployed
+            amdatuConfigTemplateManager(), // Required for placeholders in 
cassandra.yaml
             amdatuCassandraApplication(),
             amdatuCassandraListener(),
             amdatuCassandraPersistenceManager());
@@ -77,11 +87,12 @@
         // during this integration test. If we don't, it is likely that the 
two schema updates conflict
         // with each other (see org.apache.cassandra.db.migration.Migration)
         String filter = "(" + CassandraPersistenceManager.KEYSPACE_AWARE_KEY + 
"="
-            + Table.SYSTEM_TABLE + ")";
+        + Table.SYSTEM_TABLE + ")";
         return new Component[] { manager.createComponent()
             .setImplementation(this)
             
.add(manager.createServiceDependency().setService(CassandraDaemonService.class).setRequired(true))
             
.add(manager.createServiceDependency().setService(ConfigurationAdmin.class).setRequired(true))
+            
.add(manager.createServiceDependency().setService(LogService.class).setRequired(true))
             
.add(manager.createServiceDependency().setService(CassandraPersistenceManager.class,
 filter).setRequired(true)) };
     }
 
@@ -100,21 +111,21 @@
             }
         }
     }
-    
-    @Before 
+
+    @Before
     public void initConfig() throws IOException, BundleException {
-       m_configAdmin = getService(ConfigurationAdmin.class);
-       
-       // Add cassandra and templates configs
-       ConfigProvider configProvider = new ConfigProvider();
-       configProvider.addCassandraConfig(m_configAdmin);
-       configProvider.addTemplateConfig(m_configAdmin);
+        m_configAdmin = getService(ConfigurationAdmin.class);
+
+        // Add cassandra and templates configs
+        ConfigProvider configProvider = new ConfigProvider();
+        configProvider.addCassandraConfig(m_configAdmin);
+        configProvider.addTemplateConfig(m_configAdmin);
     }
 
     @Test
     public void testCassandraDaemonService() throws Exception {
-       m_daemonService = getService(CassandraDaemonService.class);
-       
+        m_daemonService = getService(CassandraDaemonService.class);
+
         // -1- Test adding/updating/removing keyspaces
         int beforeCount = m_daemonService.getKeyspaces().size();
         m_daemonService.addKeyspace(KEYSPACE);
@@ -163,6 +174,9 @@
         m_daemonService.addColumnFamily(KEYSPACE, COLUMNFAMILY.toUpperCase(), 
ColumnType.STANDARD.value,
             CompareType.BYTESTYPE.value, null);
 
+        // Test registration of column families and keyspaces using 
ColumnFamilyProviders
+        testColumnFamilyProvider();
+
         // Sleep a second, this increases the probability that write requests 
have been handled
         Thread.sleep(1000);
 
@@ -170,8 +184,63 @@
         Assert.assertTrue(allColumnFamilies.contains(COLUMNFAMILY));
         
Assert.assertTrue(allColumnFamilies.contains(COLUMNFAMILY.toLowerCase()));
         
Assert.assertTrue(allColumnFamilies.contains(COLUMNFAMILY.toUpperCase()));
-        Assert.assertTrue("Expected available ColumnFamily's = 3, but actual = 
"
-            + allColumnFamilies.size(), allColumnFamilies.size() == 3);
+        Assert.assertTrue("Expected available ColumnFamily's = 4, but actual = 
"
+            + allColumnFamilies.size(), allColumnFamilies.size() == 4);
+
+    }
 
+    // This tests the proper working of ColumnFamily providers. If I register 
a new columnfamily provider,
+    // a CF should be added for it to the proper cassandra keyspace(s).
+    private void testColumnFamilyProvider() throws TException, 
InvalidRequestException, NotFoundException, InterruptedException {
+        // -1- Register a ColumnFamilyProvider for one specific (new) keyspace.
+        m_dependencyManager.add(
+            m_dependencyManager.createComponent()
+            .setImplementation(new ColumnFamilyProviderImpl(new 
String[]{KEYSPACE2}, COLUMNFAMILY))
+            .setInterface(new String[] {ColumnFamilyProvider.class.getName()}, 
null));
+        m_logService.log(LogService.LOG_DEBUG, "TestColumnFamilyProvider 
service registered");
+
+        // Give the cassandra listener 3 seconds time to pick up the column 
family provider and add a keyspace/CF for it
+        Thread.sleep(3000);
+
+        // -2- Check if a new KEYSPACE2 is available with column family 
COLUMNFAMILY
+        Assert.assertTrue("A ColumnFamilyProvider was registered for keyspace 
'" + KEYSPACE2 + "', but the keyspace is " +
+            "unavailable in the cassandra daemon service", 
m_daemonService.getKeyspaces().contains(KEYSPACE2));
+        Assert.assertTrue("A ColumnFamilyProvider was registered for keyspace 
'" + KEYSPACE2 + "', but the CF '" + COLUMNFAMILY + "' is " +
+            "unavailable in the cassandra daemon service", 
m_daemonService.getColumnFamilies(KEYSPACE2).contains(COLUMNFAMILY));
+
+        // -3- Now register another ColumnFamilyProvider with keyspace null.
+        m_dependencyManager.add(
+            m_dependencyManager.createComponent()
+            .setImplementation(new ColumnFamilyProviderImpl(null, 
COLUMNFAMILY2))
+            .setInterface(new String[] {ColumnFamilyProvider.class.getName()}, 
null));
+        m_logService.log(LogService.LOG_DEBUG, "TestColumnFamilyProvider2 
service registered");
+
+        // Give the cassandra listener 3 seconds time to pick up the column 
family provider and add a keyspace/CF for it
+        Thread.sleep(3000);
+
+        // -4- Verify that the CF comes available in all keyspaces
+        Assert.assertTrue("A ColumnFamilyProvider was registered for keyspace 
'" + KEYSPACE + "', but the keyspace is " +
+            "unavailable in the cassandra daemon service", 
m_daemonService.getKeyspaces().contains(KEYSPACE));
+        Assert.assertTrue("A ColumnFamilyProvider was registered for keyspace 
'" + KEYSPACE2 + "', but the keyspace is " +
+            "unavailable in the cassandra daemon service", 
m_daemonService.getKeyspaces().contains(KEYSPACE2));
+
+        Assert.assertTrue("A ColumnFamilyProvider was registered for keyspace 
'" + KEYSPACE+ "', but the CF '" + COLUMNFAMILY2 + "' is " +
+            "unavailable in the cassandra daemon service", 
m_daemonService.getColumnFamilies(KEYSPACE).contains(COLUMNFAMILY2));
+        Assert.assertTrue("A ColumnFamilyProvider was registered for keyspace 
'" + KEYSPACE2 + "', but the CF '" + COLUMNFAMILY2 + "' is " +
+            "unavailable in the cassandra daemon service", 
m_daemonService.getColumnFamilies(KEYSPACE2).contains(COLUMNFAMILY2));
+
+        // -5- Add another keyspace
+        m_daemonService.addKeyspace(KEYSPACE3);
+
+        // Give the cassandra listener 3 seconds time to pick up the added 
keyspace and add a CF for it
+        Thread.sleep(3000);
+
+        // -6- Verify that our ColumnFamily is available in this new keyspace
+        Assert.assertTrue("Keyspace '" + KEYSPACE3 + "' added, but 
keyspace-global ColumnFamily '" + COLUMNFAMILY2 + "' does not come " +
+            "available in this keyspace", 
m_daemonService.getColumnFamilies(KEYSPACE3).contains(COLUMNFAMILY2));
+
+        // -7- Cleanup: remove KEYSPACE2, KEYSPACE3
+        m_daemonService.dropKeyspace(KEYSPACE2);
+        m_daemonService.dropKeyspace(KEYSPACE3);
     }
 }

Reply via email to