Author: ivol37 at gmail.com
Date: Wed Jan 5 12:58:02 2011
New Revision: 563
Log:
[AMDATU-242] The cassandra daemon service now published keyspace added/dropped
events which are picked up by the ColumnFamilyHandler. Enhanced the integration
test to properly test the ColumnFamilyProvider registration.
Added:
trunk/integration-tests/src/test/java/org/amdatu/test/integration/mock/ColumnFamilyProviderImpl.java
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/osgi/Activator.java
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
trunk/integration-tests/pom.xml
trunk/integration-tests/src/test/java/org/amdatu/test/integration/base/IntegrationTestBase.java
trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/CassandraDaemonIntegrationTest.java
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
Wed Jan 5 12:58:02 2011
@@ -32,6 +32,23 @@
*/
public interface CassandraDaemonService {
/**
+ * Represents the topic under which events are send by this service to
EventAdmin.
+ */
+ String EVENT_ADMIN_TOPIC = "org/amdatu/cassandra/application/service";
+
+ /**
+ * When a keyspace is added, an event thrown where this property name
contains the name of the keyspace
+ * that has been added.
+ */
+ String EVENT_ADMIN_KEYSPACE_ADDED = "keyspace_added";
+
+ /**
+ * When a keyspace is dropped, an event thrown where this property name
contains the name of the keyspace
+ * that has been dropped.
+ */
+ String EVENT_ADMIN_KEYSPACE_DROPPED = "keyspace_dropped";
+
+ /**
* Returns the Cassandra server which represents the Cassandra Thrift API.
*
* @see http://wiki.apache.org/cassandra/API
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/osgi/Activator.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/osgi/Activator.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/osgi/Activator.java
Wed Jan 5 12:58:02 2011
@@ -24,6 +24,7 @@
import org.apache.felix.dm.DependencyActivatorBase;
import org.apache.felix.dm.DependencyManager;
import org.osgi.framework.BundleContext;
+import org.osgi.service.event.EventAdmin;
import org.osgi.service.log.LogService;
@@ -36,20 +37,21 @@
public void init(BundleContext context, DependencyManager manager) throws
Exception {
// Register the Cassandra configuration service
manager.add(
- createComponent()
- .setImplementation(CassandraConfigurationServiceImpl.class)
- .setInterface(CassandraConfigurationService.class.getName(),
null)
-
.add(createServiceDependency().setService(LogService.class).setRequired(true))
-
.add(createServiceDependency().setService(ConfigTemplateManager.class).setRequired(true))
-
.add(createConfigurationDependency().setPid(CassandraConfigurationServiceImpl.PID)));
+ createComponent()
+ .setImplementation(CassandraConfigurationServiceImpl.class)
+ .setInterface(CassandraConfigurationService.class.getName(), null)
+
.add(createServiceDependency().setService(LogService.class).setRequired(true))
+
.add(createServiceDependency().setService(ConfigTemplateManager.class).setRequired(true))
+
.add(createConfigurationDependency().setPid(CassandraConfigurationServiceImpl.PID)));
// Register the Cassandra daemon service
- manager.add(
- createComponent()
- .setImplementation(CassandraDaemonServiceImpl.class)
- .setInterface(CassandraDaemonService.class.getName(), null)
-
.add(createServiceDependency().setService(LogService.class).setRequired(true))
-
.add(createServiceDependency().setService(CassandraConfigurationService.class).setRequired(true)));
+ manager.add(
+ createComponent()
+ .setImplementation(CassandraDaemonServiceImpl.class)
+ .setInterface(CassandraDaemonService.class.getName(), null)
+
.add(createServiceDependency().setService(LogService.class).setRequired(true))
+
.add(createServiceDependency().setService(EventAdmin.class).setRequired(true))
+
.add(createServiceDependency().setService(CassandraConfigurationService.class).setRequired(true)));
}
@Override
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
Wed Jan 5 12:58:02 2011
@@ -17,17 +17,21 @@
package org.amdatu.cassandra.application.service;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.amdatu.cassandra.application.CassandraDaemonService;
-import org.apache.cassandra.db.Table;
import org.apache.cassandra.avro.CassandraDaemon;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.thrift.CassandraServer;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.thrift.TException;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
import org.osgi.service.log.LogService;
/**
@@ -38,11 +42,12 @@
public class CassandraDaemonServiceImpl implements CassandraDaemonService {
// The default placement strategy
private final String DEFAULT_PLACEMENT_STRATEGY =
"org.apache.cassandra.locator.SimpleStrategy";
-
+
private final int DEFAULT_REPLICATION_FACTOR = 1;
// Service dependencies, injected by the framework
private volatile LogService m_logService;
+ private volatile EventAdmin m_eventAdmin;
// The CassandraDaemon cannot be stopped/started without stopping and
updating (to enforce classloader
// to be destroyed) this bundle. For that reason we block any attempts to
stop/start this service since
@@ -108,7 +113,7 @@
}
return false;
}
-
+
public List<String> getKeyspaces() throws TException,
InvalidRequestException {
List<String> keyspaceNames = new ArrayList<String>();
List<KsDef> keyspaces = m_cassandraServer.describe_keyspaces();
@@ -122,10 +127,20 @@
List<CfDef> empty = new ArrayList<CfDef>();
KsDef ksDef = new KsDef(name, DEFAULT_PLACEMENT_STRATEGY,
DEFAULT_REPLICATION_FACTOR, empty);
m_cassandraServer.system_add_keyspace(ksDef);
+
+ // Publish an event that a new keyspace has been added
+ Map<String, String> properties = new HashMap<String, String>();
+ properties.put(EVENT_ADMIN_KEYSPACE_ADDED, name);
+ m_eventAdmin.sendEvent(new Event(EVENT_ADMIN_TOPIC, properties));
}
-
+
public void dropKeyspace(String keyspace) throws InvalidRequestException,
TException {
m_cassandraServer.system_drop_keyspace(keyspace);
+
+ // Publish an event that a keyspace has been dropped
+ Map<String, String> properties = new HashMap<String, String>();
+ properties.put(EVENT_ADMIN_KEYSPACE_DROPPED, keyspace);
+ m_eventAdmin.sendEvent(new Event(EVENT_ADMIN_TOPIC, properties));
}
public boolean columnFamilyExists(String keyspaceName, String
columnFamilyName) throws NotFoundException, InvalidRequestException {
@@ -148,7 +163,7 @@
}
return cfNames;
}
-
+
public void addColumnFamily(String keyspace, String cfName, String
columnType, String comparatorType,
String subComparatorType) throws InvalidRequestException, TException {
if (keyspace.equals(Table.SYSTEM_TABLE)) {
Modified:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
(original)
+++
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
Wed Jan 5 12:58:02 2011
@@ -16,7 +16,11 @@
*/
package org.amdatu.cassandra.listener.service;
-import java.util.*;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
import org.amdatu.cassandra.application.CassandraDaemonService;
import org.amdatu.cassandra.listener.ColumnFamilyAvailable;
@@ -26,12 +30,19 @@
import org.amdatu.cassandra.persistencemanager.CassandraPersistenceManager;
import
org.amdatu.cassandra.persistencemanager.CassandraPersistenceManagerFactory;
import org.apache.cassandra.db.Table;
-import org.apache.cassandra.thrift.*;
-import org.apache.felix.dm.*;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
import org.apache.thrift.TException;
import org.osgi.framework.BundleContext;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
import org.osgi.service.log.LogService;
import org.osgi.util.tracker.ServiceTracker;
@@ -41,7 +52,7 @@
* add the respective ColumnFamily to Cassandra, and registers the service as
soon as the
* ColumnFamily is actually available.
*/
-public class ColumnFamilyHandler {
+public class ColumnFamilyHandler implements EventHandler {
private volatile LogService m_logService;
private volatile CassandraDaemonService m_daemonService;
@@ -51,8 +62,14 @@
private volatile BundleContext m_context;
private final Map<KeySpaceColumnFamilyCombination, Component> m_services =
- new HashMap<KeySpaceColumnFamilyCombination, Component>();
+ new HashMap<KeySpaceColumnFamilyCombination, Component>();
+ @SuppressWarnings("unchecked")
+ public void init() {
+ Dictionary d = new Hashtable();
+ d.put(EventConstants.EVENT_TOPIC, new
String[]{CassandraDaemonService.EVENT_ADMIN_TOPIC});
+ m_context.registerService( EventHandler.class.getName(), this, d );
+ }
public void start() {
try {
// Register all currently available keyspace/columnfamily
combinations.
@@ -142,7 +159,7 @@
if (!m_daemonService.columnFamilyExists(ksName, cfName)) {
m_daemonService.addColumnFamily(ksName, cfName, columnType,
comparatorType, subComparatorType);
waitForColumnFamilyAndRegisterService(ksName, cfName);
- m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + cfName +
"' added");
+ m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + cfName +
"' added to keyspace '" + ksName + "'");
}
else {
// Since Cassandra does not (yet) support updating columnType,
comparatorType or subComparatorType
@@ -153,7 +170,7 @@
+ "' has been changed, but changes in columnType,
comparatorType "
+ "and subComparatorType are not supported by Cassandra");
}
- m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + cfName +
"' not changed");
+ m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + cfName +
"' not changed in keyspace '" + ksName + "'");
}
}
@@ -191,13 +208,13 @@
if (!found) {
throw new IllegalStateException("After waiting for " + 10000 +
"ms, columnfamily " +
- columnFamily + " is not yet available.");
+ columnFamily + " is not yet available.");
}
}
private CassandraPersistenceManager getPersistenceManager(String keyspace)
throws InterruptedException {
String objectClassFilter = "(" +
org.osgi.framework.Constants.OBJECTCLASS + "="
- + CassandraPersistenceManager.class.getName() + ")";
+ + CassandraPersistenceManager.class.getName() + ")";
String keyspaceFilter = "(" +
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
String persistenceManagerFilter = "(&" + objectClassFilter +
keyspaceFilter + ")";
@@ -225,7 +242,7 @@
if (persistenceManager == null) {
throw new IllegalStateException("After waiting for " + 5000 + "ms,
we don't have a "
- + "persistencemanager for " + keyspace + " yet.");
+ + "persistencemanager for " + keyspace + " yet.");
}
return persistenceManager;
}
@@ -240,8 +257,8 @@
serviceProps.put(ColumnFamilyAvailable.FILTER_NAME, columnFamily);
Component component = m_dependencyManager.createComponent()
- .setInterface(ColumnFamilyAvailable.class.getName(),
serviceProps)
- .setImplementation(ColumnFamilyAvailableImpl.class);
+ .setInterface(ColumnFamilyAvailable.class.getName(), serviceProps)
+ .setImplementation(ColumnFamilyAvailableImpl.class);
m_services.put(new KeySpaceColumnFamilyCombination(keyspace,
columnFamily), component);
m_dependencyManager.add(component);
@@ -254,4 +271,39 @@
put("columnFamily", columnFamily);
}
}
+
+ private boolean isKeyspaceGlobal(ColumnFamilyProvider provider) {
+ for (ColumnFamilyDefinition cfDef : provider.getColumnFamilies()) {
+ if (cfDef.getKeyspaces() == null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void handleEvent(Event event) {
+ try {
+ Object keyspaceAdded =
event.getProperty(CassandraDaemonService.EVENT_ADMIN_KEYSPACE_ADDED);
+ if (keyspaceAdded != null) {
+ // If a keyspace was added, we must add ColumnFamily's for all
keyspace-global ColumnFamilyProvider's
+ // (this are all ColumnFamilyProvider's that defined keyspace
null)
+ m_logService.log(LogService.LOG_DEBUG, "Recieved keyspace
added event for keyspace '" + keyspaceAdded + "' ");
+
+ ServiceReference[] servRefs =
m_context.getAllServiceReferences(ColumnFamilyProvider.class.getName(), null);
+ if (servRefs != null) {
+ for (ServiceReference ref : servRefs) {
+ ColumnFamilyProvider provider = (ColumnFamilyProvider)
m_context.getService(ref);
+ if (isKeyspaceGlobal(provider)) {
+ m_logService.log(LogService.LOG_DEBUG, "Updating
ColumnFamilyProvider '" + provider.getClass().getName() + "' as it is
keyspace-global");
+ columnFamilyProviderAdded(provider);
+ }
+ }
+ }
+ }
+ }
+ catch (InvalidSyntaxException e) {
+ m_logService.log(LogService.LOG_ERROR, "Could not handle event '"
+ event.getTopic() + "' ", e);
+ }
+ }
}
Modified: trunk/integration-tests/pom.xml
==============================================================================
--- trunk/integration-tests/pom.xml (original)
+++ trunk/integration-tests/pom.xml Wed Jan 5 12:58:02 2011
@@ -25,7 +25,7 @@
<!-- This is required to be first so that pax-exam classloader is not
messed up with a newer version of felix
which would lead to java.lang.NoSuchMethodError:
org.apache.felix.framework.Logger.<init>(I)V -->
-
+
<dependency>
<groupId>org.ops4j.pax.exam</groupId>
<artifactId>pax-exam</artifactId>
@@ -139,7 +139,7 @@
<version>${platform.version}</version>
<scope>test</scope>
<type>bundle</type>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.amdatu.web.rest</groupId>
<artifactId>jaxrs</artifactId>
@@ -188,45 +188,45 @@
<version>${platform.version}</version>
<scope>test</scope>
<type>bundle</type>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.amdatu.authentication.oauth</groupId>
- <artifactId>api</artifactId>
+ <artifactId>api</artifactId>
<version>${platform.version}</version>
<scope>test</scope>
<type>bundle</type>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.amdatu.authentication.oauth</groupId>
- <artifactId>server</artifactId>
+ <artifactId>server</artifactId>
<version>${platform.version}</version>
<scope>test</scope>
<type>bundle</type>
</dependency>
<dependency>
<groupId>org.amdatu.authentication.oauth</groupId>
- <artifactId>client</artifactId>
+ <artifactId>client</artifactId>
<version>${platform.version}</version>
<scope>test</scope>
<type>bundle</type>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.amdatu.authentication.oauth</groupId>
- <artifactId>consumerregistry-fs</artifactId>
+ <artifactId>consumerregistry-fs</artifactId>
<version>${platform.version}</version>
<scope>test</scope>
<type>bundle</type>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.amdatu.authentication</groupId>
- <artifactId>tokenprovider</artifactId>
+ <artifactId>tokenprovider</artifactId>
<version>${platform.version}</version>
<scope>test</scope>
<type>bundle</type>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.amdatu.authentication.tokenstore</groupId>
- <artifactId>mem</artifactId>
+ <artifactId>mem</artifactId>
<version>${platform.version}</version>
<scope>test</scope>
<type>bundle</type>
@@ -237,7 +237,7 @@
<version>${platform.version}</version>
<scope>test</scope>
<type>bundle</type>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.amdatu.semanticweb</groupId>
<artifactId>rdf2go.api</artifactId>
@@ -251,10 +251,16 @@
<version>${platform.version}</version>
<scope>test</scope>
<type>bundle</type>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.eventadmin</artifactId>
+ <version>${org.apache.felix.eventadmin.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.configadmin</artifactId>
<version>${org.apache.felix.configadmin.version}</version>
<scope>test</scope>
Modified:
trunk/integration-tests/src/test/java/org/amdatu/test/integration/base/IntegrationTestBase.java
==============================================================================
---
trunk/integration-tests/src/test/java/org/amdatu/test/integration/base/IntegrationTestBase.java
(original)
+++
trunk/integration-tests/src/test/java/org/amdatu/test/integration/base/IntegrationTestBase.java
Wed Jan 5 12:58:02 2011
@@ -97,6 +97,7 @@
compendium(),
dependencyManager(),
configAdmin(),
+ eventAdmin(),
felixLog(),
// Amdatu platform bundles
@@ -413,8 +414,7 @@
}
protected static MavenArtifactProvisionOption felixHttpServiceWhiteboard()
{
- return
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.http.whiteboard")
- .versionAsInProject();
+ return
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.http.whiteboard").versionAsInProject();
}
protected static MavenArtifactProvisionOption felixLog() {
@@ -422,13 +422,15 @@
}
protected static MavenArtifactProvisionOption configAdmin() {
- return
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.configadmin")
- .versionAsInProject();
+ return
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.configadmin").versionAsInProject();
+ }
+
+ protected static MavenArtifactProvisionOption eventAdmin() {
+ return
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.eventadmin").versionAsInProject();
}
protected static MavenArtifactProvisionOption dependencyManager() {
- return
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.dependencymanager")
- .versionAsInProject();
+ return
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.dependencymanager").versionAsInProject();
}
protected static MavenArtifactProvisionOption compendium() {
Added:
trunk/integration-tests/src/test/java/org/amdatu/test/integration/mock/ColumnFamilyProviderImpl.java
==============================================================================
--- (empty file)
+++
trunk/integration-tests/src/test/java/org/amdatu/test/integration/mock/ColumnFamilyProviderImpl.java
Wed Jan 5 12:58:02 2011
@@ -0,0 +1,36 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.test.integration.mock;
+
+import org.amdatu.cassandra.listener.ColumnFamilyDefinition;
+import org.amdatu.cassandra.listener.ColumnFamilyProvider;
+import org.amdatu.cassandra.listener.ColumnFamilyDefinition.ColumnType;
+import org.amdatu.cassandra.listener.ColumnFamilyDefinition.CompareType;
+
+public class ColumnFamilyProviderImpl implements ColumnFamilyProvider {
+ private String[] m_keyspaces;
+ private String m_columnFamily;
+
+ public ColumnFamilyProviderImpl(String[] keyspaces, String columnFamily) {
+ m_keyspaces = keyspaces;
+ m_columnFamily = columnFamily;
+ }
+ public ColumnFamilyDefinition[] getColumnFamilies() {
+ return new ColumnFamilyDefinition[] {new ColumnFamilyDefinition(
+ m_columnFamily, m_keyspaces, ColumnType.SUPER,
CompareType.BYTESTYPE, CompareType.BYTESTYPE)};
+ }
+}
Modified:
trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/CassandraDaemonIntegrationTest.java
==============================================================================
---
trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/CassandraDaemonIntegrationTest.java
(original)
+++
trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/CassandraDaemonIntegrationTest.java
Wed Jan 5 12:58:02 2011
@@ -24,15 +24,19 @@
import junit.framework.Assert;
import org.amdatu.cassandra.application.CassandraDaemonService;
+import org.amdatu.cassandra.listener.ColumnFamilyProvider;
import org.amdatu.cassandra.listener.ColumnFamilyDefinition.ColumnType;
import org.amdatu.cassandra.listener.ColumnFamilyDefinition.CompareType;
import org.amdatu.cassandra.persistencemanager.CassandraPersistenceManager;
import org.amdatu.test.integration.base.ConfigProvider;
import org.amdatu.test.integration.base.IntegrationTestBase;
+import org.amdatu.test.integration.mock.ColumnFamilyProviderImpl;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.NotFoundException;
import org.apache.felix.dm.Component;
import org.apache.felix.dm.DependencyManager;
+import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -42,6 +46,7 @@
import org.ops4j.pax.exam.junit.JUnit4TestRunner;
import org.osgi.framework.BundleException;
import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.log.LogService;
/**
* This class provides an integration test for the Cassandra Daemon service.
@@ -51,10 +56,15 @@
@RunWith(JUnit4TestRunner.class)
public class CassandraDaemonIntegrationTest extends IntegrationTestBase {
private final static String KEYSPACE = "IntegrationTestKeySpace";
+ private final static String KEYSPACE2 = "IntegrationTestKeySpace2";
+ private final static String KEYSPACE3 = "IntegrationTestKeySpace3";
private final static String COLUMNFAMILY = "IntegrationTestColumnFamily";
+ private final static String COLUMNFAMILY2 = "IntegrationTestColumnFamily2";
private volatile CassandraDaemonService m_daemonService;
private volatile ConfigurationAdmin m_configAdmin;
+ private volatile DependencyManager m_dependencyManager;
+ private volatile LogService m_logService;
@Configuration
public Option[] configure() {
@@ -64,8 +74,8 @@
protected Option provisionBundles() {
// Deploy bundles needed by cassandra daemon integration test
return provision(
- javaxServlet(), // Required if the httpservice is not deployed
- amdatuConfigTemplateManager(), // Required for placeholders in
cassandra.yaml
+ javaxServlet(), // Required if the httpservice is not deployed
+ amdatuConfigTemplateManager(), // Required for placeholders in
cassandra.yaml
amdatuCassandraApplication(),
amdatuCassandraListener(),
amdatuCassandraPersistenceManager());
@@ -77,11 +87,12 @@
// during this integration test. If we don't, it is likely that the
two schema updates conflict
// with each other (see org.apache.cassandra.db.migration.Migration)
String filter = "(" + CassandraPersistenceManager.KEYSPACE_AWARE_KEY +
"="
- + Table.SYSTEM_TABLE + ")";
+ + Table.SYSTEM_TABLE + ")";
return new Component[] { manager.createComponent()
.setImplementation(this)
.add(manager.createServiceDependency().setService(CassandraDaemonService.class).setRequired(true))
.add(manager.createServiceDependency().setService(ConfigurationAdmin.class).setRequired(true))
+
.add(manager.createServiceDependency().setService(LogService.class).setRequired(true))
.add(manager.createServiceDependency().setService(CassandraPersistenceManager.class,
filter).setRequired(true)) };
}
@@ -100,21 +111,21 @@
}
}
}
-
- @Before
+
+ @Before
public void initConfig() throws IOException, BundleException {
- m_configAdmin = getService(ConfigurationAdmin.class);
-
- // Add cassandra and templates configs
- ConfigProvider configProvider = new ConfigProvider();
- configProvider.addCassandraConfig(m_configAdmin);
- configProvider.addTemplateConfig(m_configAdmin);
+ m_configAdmin = getService(ConfigurationAdmin.class);
+
+ // Add cassandra and templates configs
+ ConfigProvider configProvider = new ConfigProvider();
+ configProvider.addCassandraConfig(m_configAdmin);
+ configProvider.addTemplateConfig(m_configAdmin);
}
@Test
public void testCassandraDaemonService() throws Exception {
- m_daemonService = getService(CassandraDaemonService.class);
-
+ m_daemonService = getService(CassandraDaemonService.class);
+
// -1- Test adding/updating/removing keyspaces
int beforeCount = m_daemonService.getKeyspaces().size();
m_daemonService.addKeyspace(KEYSPACE);
@@ -163,6 +174,9 @@
m_daemonService.addColumnFamily(KEYSPACE, COLUMNFAMILY.toUpperCase(),
ColumnType.STANDARD.value,
CompareType.BYTESTYPE.value, null);
+ // Rest registration of column family's and keyspaces using
ColumnFamilyProvider's
+ testColumnFamilyProvider();
+
// Sleep a second, this increases the probability that write requests
have been handled
Thread.sleep(1000);
@@ -170,8 +184,63 @@
Assert.assertTrue(allColumnFamilies.contains(COLUMNFAMILY));
Assert.assertTrue(allColumnFamilies.contains(COLUMNFAMILY.toLowerCase()));
Assert.assertTrue(allColumnFamilies.contains(COLUMNFAMILY.toUpperCase()));
- Assert.assertTrue("Expected available ColumnFamily's = 3, but actual =
"
- + allColumnFamilies.size(), allColumnFamilies.size() == 3);
+ Assert.assertTrue("Expected available ColumnFamily's = 4, but actual =
"
+ + allColumnFamilies.size(), allColumnFamilies.size() == 4);
+
+ }
+ // This tests the proper working of ColumnFamily providers. If I register
a new columnfamily provider,
+ // a CF should be added for it to the proper cassandra keyspace(s).
+ private void testColumnFamilyProvider() throws TException,
InvalidRequestException, NotFoundException, InterruptedException {
+ // -1- Register a ColumnFamilyProvider for one specific (new) keyspace.
+ m_dependencyManager.add(
+ m_dependencyManager.createComponent()
+ .setImplementation(new ColumnFamilyProviderImpl(new
String[]{KEYSPACE2}, COLUMNFAMILY))
+ .setInterface(new String[] {ColumnFamilyProvider.class.getName()},
null));
+ m_logService.log(LogService.LOG_DEBUG, "TestColumnFamilyProvider
service registered");
+
+ // Give the cassandra listener 3 seconds time to pick up the column
family provider and add a keyspace/CF for it
+ Thread.sleep(3000);
+
+ // -2- Check if a new KEYSPACE2 is available with column family
COLUMNFAMILY
+ Assert.assertTrue("A ColumnFamilyProvider was registered for keyspace
'" + KEYSPACE2 + "', but the keyspace is " +
+ "unavailable in the cassandra daemon service",
m_daemonService.getKeyspaces().contains(KEYSPACE2));
+ Assert.assertTrue("A ColumnFamilyProvider was registered for keyspace
'" + KEYSPACE2 + "', but the CF '" + COLUMNFAMILY + "' is " +
+ "unavailable in the cassandra daemon service",
m_daemonService.getColumnFamilies(KEYSPACE2).contains(COLUMNFAMILY));
+
+ // -3- Now register another ColumnFamilyProvider with keyspace null.
+ m_dependencyManager.add(
+ m_dependencyManager.createComponent()
+ .setImplementation(new ColumnFamilyProviderImpl(null,
COLUMNFAMILY2))
+ .setInterface(new String[] {ColumnFamilyProvider.class.getName()},
null));
+ m_logService.log(LogService.LOG_DEBUG, "TestColumnFamilyProvider2
service registered");
+
+ // Give the cassandra listener 3 seconds time to pick up the column
family provider and add a keyspace/CF for it
+ Thread.sleep(3000);
+
+ // -4- Verify that the CF comes available in all keyspaces
+ Assert.assertTrue("A ColumnFamilyProvider was registered for keyspace
'" + KEYSPACE + "', but the keyspace is " +
+ "unavailable in the cassandra daemon service",
m_daemonService.getKeyspaces().contains(KEYSPACE));
+ Assert.assertTrue("A ColumnFamilyProvider was registered for keyspace
'" + KEYSPACE2 + "', but the keyspace is " +
+ "unavailable in the cassandra daemon service",
m_daemonService.getKeyspaces().contains(KEYSPACE2));
+
+ Assert.assertTrue("A ColumnFamilyProvider was registered for keyspace
'" + KEYSPACE+ "', but the CF '" + COLUMNFAMILY2 + "' is " +
+ "unavailable in the cassandra daemon service",
m_daemonService.getColumnFamilies(KEYSPACE).contains(COLUMNFAMILY2));
+ Assert.assertTrue("A ColumnFamilyProvider was registered for keyspace
'" + KEYSPACE2 + "', but the CF '" + COLUMNFAMILY2 + "' is " +
+ "unavailable in the cassandra daemon service",
m_daemonService.getColumnFamilies(KEYSPACE2).contains(COLUMNFAMILY2));
+
+ // -5- Add another keyspace
+ m_daemonService.addKeyspace(KEYSPACE3);
+
+ // Give the cassandra listener 3 seconds time to pick up the added
keyspace and a CF for it
+ Thread.sleep(3000);
+
+ // -6- Verify that our ColumnFamily is available in this new keyspace
+ Assert.assertTrue("Keyspace '" + KEYSPACE3 + "' added, but
keyspace-global ColumnFamily '" + COLUMNFAMILY2 + "' does not come " +
+ "available in this keyspace",
m_daemonService.getColumnFamilies(KEYSPACE3).contains(COLUMNFAMILY2));
+
+ // -7- Cleanup: remove KEYSPACE2, KEYSPACE3
+ m_daemonService.dropKeyspace(KEYSPACE2);
+ m_daemonService.dropKeyspace(KEYSPACE3);
}
}