Author: ivol37 at gmail.com
Date: Wed Jan 19 15:17:10 2011
New Revision: 678
Log:
[AMDATU-252] Refactored registration mechanism for ColumnFamilyProviders and
CassandraPersistenceManagers to work in a clustered setup
Added:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/osgi/Activator.java
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
trunk/amdatu-cassandra/cassandra-persistencemanager/src/main/java/org/amdatu/cassandra/persistencemanager/service/CassandraPersistenceManagerFactoryImpl.java
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
Wed Jan 19 15:17:10 2011
@@ -116,16 +116,12 @@
// Now wait until the operation mode of the daemon becomes
"Normal". In auto bootstrap mode this can take quite a
// while (2 minutes minimum). In a single node cluster this
will be almost immediately. Unfortunately the operation
// mode is not covered by any enum value.
- String prevOperationMode = "";
String operationMode =
StorageService.instance.getOperationMode();
- while (!operationMode.equals("Normal") && !isInterrupted() &&
isAlive()) {
- if (!operationMode.equals(prevOperationMode)) {
- m_logService.log(LogService.LOG_INFO, "Current
Cassandra Daemon operation mode is '" + operationMode
- + "', waiting for daemon to reach operation mode
'Normal'");
- }
- prevOperationMode = operationMode;
- operationMode = StorageService.instance.getOperationMode();
+ while (!"Normal".equals(operationMode) && !isInterrupted() &&
isAlive()) {
+ m_logService.log(LogService.LOG_INFO, "Current Cassandra
Daemon operation mode is '" + operationMode
+ + "', waiting for daemon to reach operation mode
'Normal'");
Thread.sleep(DAEMON_TIMEOUT);
+ operationMode = StorageService.instance.getOperationMode();
}
if
("Normal".equals(StorageService.instance.getOperationMode())) {
Modified:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/osgi/Activator.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/osgi/Activator.java
(original)
+++
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/osgi/Activator.java
Wed Jan 19 15:17:10 2011
@@ -18,6 +18,7 @@
import org.amdatu.cassandra.application.CassandraDaemonService;
import org.amdatu.cassandra.listener.ColumnFamilyProvider;
+import org.amdatu.cassandra.listener.service.CassandraUpdateListenerImpl;
import org.amdatu.cassandra.listener.service.ColumnFamilyHandler;
import
org.amdatu.cassandra.persistencemanager.CassandraPersistenceManagerFactory;
import org.apache.felix.dm.DependencyActivatorBase;
@@ -33,17 +34,23 @@
public class Activator extends DependencyActivatorBase {
@Override
public void init(BundleContext context, DependencyManager manager) throws
Exception {
+ manager.add(createComponent()
+ .setImplementation(CassandraUpdateListenerImpl.class)
+
.add(createServiceDependency().setService(LogService.class).setRequired(true))
+
.add(createServiceDependency().setService(CassandraDaemonService.class).setRequired(true))
+
.add(createServiceDependency().setService(CassandraPersistenceManagerFactory.class).setRequired(true)));
+
// Register the CassandraColumnFamilyProvider listener
manager
- .add(
+ .add(
createComponent()
- .setImplementation(ColumnFamilyHandler.class)
-
.add(createServiceDependency().setService(LogService.class).setRequired(true))
-
.add(createServiceDependency().setService(CassandraDaemonService.class).setRequired(true))
-
.add(createServiceDependency().setService(CassandraPersistenceManagerFactory.class).setRequired(true))
- .add(createServiceDependency()
- .setService(ColumnFamilyProvider.class)
- .setCallbacks("columnFamilyProviderAdded",
"columnFamilyProviderRemoved")));
+ .setImplementation(ColumnFamilyHandler.class)
+
.add(createServiceDependency().setService(LogService.class).setRequired(true))
+
.add(createServiceDependency().setService(CassandraDaemonService.class).setRequired(true))
+
.add(createServiceDependency().setService(CassandraPersistenceManagerFactory.class).setRequired(true))
+ .add(createServiceDependency()
+ .setService(ColumnFamilyProvider.class)
+ .setCallbacks("columnFamilyProviderAdded",
"columnFamilyProviderRemoved")));
}
@Override
Added:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
==============================================================================
--- (empty file)
+++
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
Wed Jan 19 15:17:10 2011
@@ -0,0 +1,280 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.cassandra.listener.service;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.amdatu.cassandra.application.CassandraDaemonService;
+import org.amdatu.cassandra.listener.ColumnFamilyAvailable;
+import org.amdatu.cassandra.listener.ColumnFamilyDefinition;
+import org.amdatu.cassandra.listener.ColumnFamilyProvider;
+import org.amdatu.cassandra.persistencemanager.CassandraPersistenceManager;
+import
org.amdatu.cassandra.persistencemanager.CassandraPersistenceManagerFactory;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.apache.thrift.TException;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.log.LogService;
+
+/**
+ * This class is responsible for listening to changes (add or remove) in the
Cassandra database with respect to
+ * keyspaces and ColumnFamilies. When a keyspace is added, it will register a
new CassandraPersistenceManager.
+ * When a keyspace is dropped, it will unregister the corresponding
CassandraPersistenceManager.
+ * When a ColumnFamily is added, it will register a new ColumnFamilyAvailable
service. When a ColumnFamily is dropped
+ * it will unregister the corresponding ColumnFamilyAvailable service.
+ * Note that in a clustered setup, keyspaces and columnfamilies can be added
or removed by other nodes in the cluster.
+ * Since there is no event mechanism for triggering those changes, we create
an inspect thread that inspects available
+ * keyspaces and columnfamilies each x seconds.
+ *
+ * @author ivol
+ */
+public class CassandraUpdateListenerImpl {
+ // Services injected by the dependency manager
+ private volatile LogService m_logService;
+ private volatile CassandraDaemonService m_daemonService;
+ private volatile CassandraPersistenceManagerFactory m_pmFactory;
+ private volatile BundleContext m_context;
+ private volatile DependencyManager m_dependencyManager;
+
+ // The thread that inspects the cassandra db for changes
+ private InpectKeyspaceColumnFamilyThread m_inspectThread;
+
+ // The interval for each individual inspect
+ private int INSPECT_INTERVAL = 5000;
+
+ public void start() {
+ // Now start the inspect thread
+ m_inspectThread = new InpectKeyspaceColumnFamilyThread();
+ m_inspectThread.start();
+ }
+
+ public void stop() {
+ // Stop the inspect thread
+ m_inspectThread.interrupt();
+ }
+
+ /**
+ * This Thread inspects available Keyspaces and ColumnFamilies in
Cassandra and compares these to
+ * the registered CassandraPersistenceManager and ColumnFamilyAvailable
services. Since other nodes
+ * in the cluster may add, update or drop keyspaces and columnfamilies and
Cassandra does not support
+ * any event mechanism to act upon these events we will continuously
inspect this ourselves.
+ * @author ivol
+ *
+ */
+ class InpectKeyspaceColumnFamilyThread extends Thread {
+ Map<String, List<String>> m_keyspaceColumnFamilyMap = new
HashMap<String, List<String>>();
+
+ @Override
+ public void run() {
+ try {
+ while(!isInterrupted()) {
+ // Inspect available keyspaces
+ try {
+ // Only compare keyspaces with CPM's if there was a
keyspace change
+ // in the meantime
+ if (checkForUpdates()) {
+ onKeyspaceAdded();
+ onKeyspaceDropped();
+ onColumnFamilyAdded();
+ onColumnFamilyRemoved();
+ }
+ m_keyspaceColumnFamilyMap =
getKeyspaceColumnFamilyMap();
+ }
+ catch (TException e) {
+ m_logService.log(LogService.LOG_ERROR, "Could not
retrieve keyspaces", e);
+ }
+ catch (InvalidRequestException e) {
+ m_logService.log(LogService.LOG_ERROR, "Could not
retrieve keyspaces", e);
+ }
+ catch (InvalidSyntaxException e) {
+ m_logService.log(LogService.LOG_ERROR,"Could not
retrieve Cassandra Persistence Manager services", e);
+ }
+ catch (NotFoundException e) {
+ m_logService.log(LogService.LOG_ERROR,"An error
occurred while synchronizing ColumnFamilyAvailable services", e);
+ }
+
+ Thread.sleep(INSPECT_INTERVAL);
+ }
+ }
+ catch (InterruptedException e) {
+ }
+ }
+
+ private Map<String, List<String>> getKeyspaceColumnFamilyMap() throws
TException, InvalidRequestException, NotFoundException {
+ Map<String, List<String>> map = new HashMap<String,
List<String>>();
+ for (String keyspace : m_daemonService.getKeyspaces()) {
+ map.put(keyspace, m_daemonService.getColumnFamilies(keyspace));
+ }
+ return map;
+ }
+
+ private boolean checkForUpdates() throws TException,
InvalidRequestException, NotFoundException {
+ if (m_keyspaceColumnFamilyMap.keySet() == null &&
m_daemonService.getKeyspaces() == null) {
+ return false;
+ } else if (m_daemonService.getKeyspaces() == null) {
+ return true;
+ } else {
+
+ return
!getKeyspaceColumnFamilyMap().equals(m_keyspaceColumnFamilyMap);
+ }
+ }
+
+ // Loop over all keyspaces and register a Cassandra persistence
manager when there is none
+ // available for that keyspace
+ private void onKeyspaceAdded() throws InvalidSyntaxException,
TException, InvalidRequestException, NotFoundException {
+ List<String> keyspaces = m_daemonService.getKeyspaces();
+ if (keyspaces != null) {
+ for (String keyspace : keyspaces) {
+ String filter = "(" +
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
+ ServiceReference[] servRefs =
m_context.getAllServiceReferences(CassandraPersistenceManager.class.getName(),
filter);
+ if (servRefs == null || servRefs.length == 0) {
+ // No Cassandra persistence manager available for this
keyspace, register it now
+
m_pmFactory.createCassandraPersistenceManager(keyspace);
+ }
+ }
+
+ // Now verify the keyspace global CF providers and add
ColumnFamilies for new keyspaces when needed
+ verifyKeyspaceGlobalProviders();
+ }
+ }
+
+ // Special use case: when a ColumnFamily{Provider is registered as a
keyspace global service (so keyspace equals null),
+ // the ColumnFamily should be present in all keyspaces. So when a
keyspace is added, the ColumnFamily should be added too
+ // for such a provider
+ public void verifyKeyspaceGlobalProviders() throws NotFoundException,
InvalidRequestException, TException, InvalidSyntaxException {
+ ServiceReference[] servRefs =
m_context.getAllServiceReferences(ColumnFamilyProvider.class.getName(), null);
+ if (servRefs != null) {
+ for (ServiceReference ref : servRefs) {
+ ColumnFamilyProvider provider = (ColumnFamilyProvider)
m_context.getService(ref);
+ if (isKeyspaceGlobal(provider)) {
+ List<String> keyspaces =
m_daemonService.getKeyspaces();
+ if (keyspaces != null) {
+ for (String keyspace : keyspaces) {
+ if (!Table.SYSTEM_TABLE.equals(keyspace)) {
+ // Verify that the ColumnFamily for this
keyspace global provider is available in this keyspace
+ for (ColumnFamilyDefinition cfDef :
provider.getColumnFamilies()) {
+ if
(!m_daemonService.getColumnFamilies(keyspace).contains(cfDef.getName())) {
+
m_logService.log(LogService.LOG_DEBUG, "Adding ColumnFamily '" +
cfDef.getName() + "' to keyspace '"
+ + keyspace + "' for the
keyspace-global ColumnFamilyProvider '" + provider.getClass().getName() + "'");
+ final String cfName =
cfDef.getName();
+ String columnType =
cfDef.getColumnType().value;
+ String comparatorType =
cfDef.getCompareWith().value;
+ String subComparatorType =
cfDef.getCompareSubcolumnsWith().value;
+
m_daemonService.addColumnFamily(keyspace, cfName, columnType, comparatorType,
subComparatorType);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private boolean isKeyspaceGlobal(ColumnFamilyProvider provider) {
+ for (ColumnFamilyDefinition cfDef : provider.getColumnFamilies()) {
+ if (cfDef.getKeyspaces() == null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Loop over all Cassandra Persistence Managers and unregister them if
the corresponding keyspace
+ // is removed from Cassandra
+ private void onKeyspaceDropped() throws InvalidSyntaxException,
TException, InvalidRequestException {
+ List<String> keyspaces = m_daemonService.getKeyspaces();
+ ServiceReference[] servRefs =
m_context.getAllServiceReferences(CassandraPersistenceManager.class.getName(),
null);
+ if (servRefs != null) {
+ for (ServiceReference servRef : servRefs) {
+ String keyspace =
servRef.getProperty(CassandraPersistenceManager.KEYSPACE_AWARE_KEY).toString();
+ if (keyspaces == null || !keyspaces.contains(keyspace)){
+ // No keyspace available for this Cassandra
persistence manager, unregister the service now
+ m_context.ungetService(servRef);
+ m_logService.log(LogService.LOG_INFO, "Cassandra
Persistence Manager service for keyspace '" + keyspace + "' unregistered");
+ }
+ }
+ }
+ }
+
+ private void onColumnFamilyAdded() throws TException,
InvalidRequestException, InvalidSyntaxException, NotFoundException {
+ List<String> keyspaces = m_daemonService.getKeyspaces();
+ if (keyspaces != null) {
+ for (String keyspace : keyspaces) {
+ List<String> columnFamilies =
m_daemonService.getColumnFamilies(keyspace);
+ for (String columnFamily : columnFamilies) {
+ String ksFilter = "(" +
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
+ String cfFilter = "(" +
ColumnFamilyAvailable.FILTER_NAME + "=" + columnFamily + ")";
+ String filter = "(&" + ksFilter + cfFilter + ")";
+ ServiceReference[] servRefs =
m_context.getAllServiceReferences(ColumnFamilyAvailable.class.getName(),
filter);
+ if (servRefs == null || servRefs.length == 0) {
+ registerColumnamilyAvailableService(keyspace,
columnFamily);
+ }
+ }
+ }
+ }
+ }
+
+ private void onColumnFamilyRemoved() throws TException,
InvalidRequestException, InvalidSyntaxException, NotFoundException {
+ ServiceReference[] servRefs =
m_context.getAllServiceReferences(ColumnFamilyAvailable.class.getName(), null);
+ if (servRefs != null) {
+ for (ServiceReference servRef : servRefs) {
+ String keyspace =
servRef.getProperty(CassandraPersistenceManager.KEYSPACE_AWARE_KEY).toString();
+ String columnFamily =
servRef.getProperty(ColumnFamilyAvailable.FILTER_NAME).toString();
+ boolean remove = false;
+ try {
+ if
(!m_daemonService.getColumnFamilies(keyspace).contains(columnFamily)) {
+ remove = true;
+ }
+
+ } catch (NotFoundException e) {
+ // This exception is thrown when the keyspace could
not be found anymore, in which
+ // case also the ColumnFamilyAvailable for that
keyspace should be removed
+ remove = true;
+ }
+ if (remove) {
+ m_context.ungetService(servRef);
+ m_logService.log(LogService.LOG_INFO,
"ColumnFamilyAvailable service for keyspace '" + keyspace
+ + "' and ColumnFamily '" + columnFamily + "'
unregistered");
+ }
+ }
+ }
+ }
+
+ private void registerColumnamilyAvailableService(String keyspace,
String columnFamily) {
+ Dictionary<String, String> serviceProps = new Hashtable<String,
String>();
+ serviceProps.put(CassandraPersistenceManager.KEYSPACE_AWARE_KEY,
keyspace);
+ serviceProps.put(ColumnFamilyAvailable.FILTER_NAME, columnFamily);
+
+ Component component = m_dependencyManager.createComponent()
+ .setInterface(ColumnFamilyAvailable.class.getName(), serviceProps)
+ .setImplementation(ColumnFamilyAvailableImpl.class);
+
+ m_dependencyManager.add(component);
+ m_logService.log(LogService.LOG_INFO, "ColumnFamilyAvailable
service for keyspace '" + keyspace + "' and ColumnFamily '" + columnFamily +
"' registered");
+ }
+ }
+}
Modified:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
(original)
+++
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
Wed Jan 19 15:17:10 2011
@@ -16,88 +16,29 @@
*/
package org.amdatu.cassandra.listener.service;
-import java.util.Dictionary;
-import java.util.HashMap;
-import java.util.Hashtable;
import java.util.List;
-import java.util.Map;
import org.amdatu.cassandra.application.CassandraDaemonService;
-import org.amdatu.cassandra.listener.ColumnFamilyAvailable;
import org.amdatu.cassandra.listener.ColumnFamilyDefinition;
import org.amdatu.cassandra.listener.ColumnFamilyProvider;
-import org.amdatu.cassandra.persistencemanager.CassandraException;
-import org.amdatu.cassandra.persistencemanager.CassandraPersistenceManager;
-import
org.amdatu.cassandra.persistencemanager.CassandraPersistenceManagerFactory;
import org.apache.cassandra.db.Table;
-import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.NotFoundException;
-import org.apache.felix.dm.Component;
-import org.apache.felix.dm.DependencyManager;
import org.apache.thrift.TException;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.InvalidSyntaxException;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.event.Event;
-import org.osgi.service.event.EventConstants;
-import org.osgi.service.event.EventHandler;
import org.osgi.service.log.LogService;
-import org.osgi.util.tracker.ServiceTracker;
/**
- * This class makes sure that all ColumnFamilies that are available in
Cassandra get a
- * corresponding ColumnFamilyAvailable service. Also, it listens for
ColumnFamilyDefinitions,
- * add the respective ColumnFamily to Cassandra, and registers the service as
soon as the
- * ColumnFamily is actually available.
+ * This service listens to ColumnFamilyProvider services and adds or updates
keyspaces or ColumnFamilies
+ * if they are not yet available in Cassandra. Note that this service is not
responsible for registration
+ * of corresponding CassandraPersistenceManager and ColumnFamilyAvailable
services, that is up to the
+ * CassandraUpdateListener since that should work independent of this OSGi
framework (in a Cassandra cluster
+ * data is synchronized between Cassandra nodes).
*/
-public class ColumnFamilyHandler implements EventHandler {
-
+public class ColumnFamilyHandler {
+ // Service dependencies, injected by the dependency manager
private volatile LogService m_logService;
private volatile CassandraDaemonService m_daemonService;
- private volatile CassandraPersistenceManagerFactory m_pmFactory;
-
- private volatile DependencyManager m_dependencyManager;
- private volatile BundleContext m_context;
-
- private final Map<KeySpaceColumnFamilyCombination, Component> m_services =
- new HashMap<KeySpaceColumnFamilyCombination, Component>();
-
- @SuppressWarnings("unchecked")
- public void init() {
- Dictionary d = new Hashtable();
- d.put(EventConstants.EVENT_TOPIC, new
String[]{CassandraDaemonService.EVENT_ADMIN_TOPIC});
- m_context.registerService( EventHandler.class.getName(), this, d );
- }
-
- public void start() {
- try {
- // Register all currently available keyspace/columnfamily
combinations.
- List<KsDef> keyspaces =
m_daemonService.getCassandraServer().describe_keyspaces();
- for (KsDef keyspace : keyspaces) {
- String keyspaceName = keyspace.getName();
- m_pmFactory.createCassandraPersistenceManager(keyspaceName);
- List<CfDef> columnFamilies = keyspace.getCf_defs();
- for (CfDef columnFamily : columnFamilies) {
- addServiceFor(keyspaceName, columnFamily.getName());
- }
- }
- }
- catch (TException e) {
- m_logService.log(LogService.LOG_ERROR, "Error registering
ColumnFamilyAvailable services for existing families.", e);
- }
- catch (InvalidRequestException e) {
- m_logService.log(LogService.LOG_ERROR, "Error registering
ColumnFamilyAvailable services for existing families.", e);
- }
- }
-
- public void stop() {
- // Since we registered all services, we should also remove them.
- for (Component component : m_services.values()) {
- m_dependencyManager.remove(component);
- }
- }
public void columnFamilyProviderAdded(ColumnFamilyProvider provider) {
try {
@@ -113,14 +54,15 @@
if (!Table.SYSTEM_TABLE.equals(keyspace)) {
// Create if it does not yet exist
m_daemonService.addKeyspace(keyspace);
- addOrUpdateColumnFamily(keyspace, columnFamily);
+ addOrUpdateColumnFamily(m_daemonService, keyspace,
columnFamily);
}
}
}
else {
+ // Add to all available keyspaces
for (KsDef keyspaceDef : keyspaceDefinitions) {
if (!Table.SYSTEM_TABLE.equals(keyspaceDef.getName()))
{
- addOrUpdateColumnFamily(keyspaceDef.getName(),
columnFamily);
+ addOrUpdateColumnFamily(m_daemonService,
keyspaceDef.getName(), columnFamily);
}
}
}
@@ -144,21 +86,18 @@
// We don't act on this yet.
}
- private void addOrUpdateColumnFamily(final String ksName,
ColumnFamilyDefinition colDef) throws InvalidRequestException,
+ private void addOrUpdateColumnFamily(CassandraDaemonService daemonService,
final String ksName, ColumnFamilyDefinition colDef) throws
InvalidRequestException,
TException, NotFoundException {
final String cfName = colDef.getName();
String columnType = colDef.getColumnType().value;
String comparatorType = colDef.getCompareWith().value;
String subComparatorType = colDef.getCompareSubcolumnsWith().value;
- if (m_daemonService.addColumnFamily(ksName, cfName, columnType,
comparatorType, subComparatorType)) {
- waitForColumnFamilyAndRegisterService(ksName, cfName);
- }
- else {
+ if (!daemonService.addColumnFamily(ksName, cfName, columnType,
comparatorType, subComparatorType)) {
// Since Cassandra does not (yet) support updating columnType,
comparatorType or subComparatorType
// of existing ColumnFamily's, we throw an exception if one of
these has been changed by the provider.
// If there are no changes, we do nothing
- if (m_daemonService.isColumnFamilyChanged(ksName, cfName,
columnType, comparatorType, subComparatorType)) {
+ if (daemonService.isColumnFamilyChanged(ksName, cfName,
columnType, comparatorType, subComparatorType)) {
throw new InvalidRequestException("Definition of ColumnFamily
'" + cfName
+ "' has been changed, but changes in columnType,
comparatorType "
+ "and subComparatorType are not supported by Cassandra");
@@ -166,142 +105,4 @@
m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + cfName +
"' not changed in keyspace '" + ksName + "'");
}
}
-
- private void waitForColumnFamilyAndRegisterService(final String keyspace,
final String columnFamily) {
- new Thread("waiting for keyspace " + keyspace + ", columnfamily " +
columnFamily) {
- public void run() {
- try {
- waitFor(keyspace, columnFamily);
- }
- catch (InterruptedException e) {
- // Right, someone wants us to stop. Abort!
- return;
- }
- addServiceFor(keyspace, columnFamily);
- }
- }.start();
- }
-
- private void waitFor(String keyspace, String columnFamily) throws
InterruptedException {
- CassandraPersistenceManager persistenceManager =
getPersistenceManager(keyspace);
-
- long startTime = System.currentTimeMillis();
- boolean found = false;
- while (System.currentTimeMillis() - startTime < 10000 && found ==
false) {
- try {
- found |= persistenceManager.exists(columnFamily);
- }
- catch (CassandraException e) {
- // apparently our columnFamily isn't available yet... try
again next round.
- }
-
- Thread.sleep(500);
- }
-
- if (!found) {
- throw new IllegalStateException("After waiting for " + 10000 +
"ms, columnfamily " +
- columnFamily + " is not yet available.");
- }
- }
-
- private CassandraPersistenceManager getPersistenceManager(String keyspace)
throws InterruptedException {
- String objectClassFilter = "(" +
org.osgi.framework.Constants.OBJECTCLASS + "="
- + CassandraPersistenceManager.class.getName() + ")";
- String keyspaceFilter = "(" +
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
- String persistenceManagerFilter = "(&" + objectClassFilter +
keyspaceFilter + ")";
-
- ServiceTracker tracker;
- try {
- tracker = new ServiceTracker(m_context,
m_context.createFilter(persistenceManagerFilter), null);
- }
- catch (InvalidSyntaxException e) {
- /*
- * This should not happen, since we construct the filter
ourselves. If it does, the keyspace or
- * columnFamily is invalid.
- */
- m_logService.log(LogService.LOG_ERROR, "Could not create filter: "
+ persistenceManagerFilter, e);
- return null; // let it NPE one level up.
- }
- tracker.open();
-
- CassandraPersistenceManager persistenceManager = null;
- try {
- persistenceManager = (CassandraPersistenceManager)
tracker.waitForService(5000);
- }
- finally {
- tracker.close();
- }
-
- if (persistenceManager == null) {
- throw new IllegalStateException("After waiting for " + 5000 + "ms,
we don't have a "
- + "persistencemanager for " + keyspace + " yet.");
- }
- return persistenceManager;
- }
-
- private void addServiceFor(String keyspace, String columnFamily) {
- if (m_services.containsKey(new
KeySpaceColumnFamilyCombination(keyspace, columnFamily))) {
- return;
- }
-
- Dictionary<String, String> serviceProps = new Hashtable<String,
String>();
- serviceProps.put(CassandraPersistenceManager.KEYSPACE_AWARE_KEY,
keyspace);
- serviceProps.put(ColumnFamilyAvailable.FILTER_NAME, columnFamily);
-
- Component component = m_dependencyManager.createComponent()
- .setInterface(ColumnFamilyAvailable.class.getName(), serviceProps)
- .setImplementation(ColumnFamilyAvailableImpl.class);
-
- m_services.put(new KeySpaceColumnFamilyCombination(keyspace,
columnFamily), component);
- m_dependencyManager.add(component);
- m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + columnFamily
+ "' is now available in keyspace '" + keyspace + "'");
- }
-
- private static class KeySpaceColumnFamilyCombination extends
HashMap<String, Object> {
- private static final long serialVersionUID = 6574039194678276636L;
-
- public KeySpaceColumnFamilyCombination(String keyspace, String
columnFamily) {
- put("keyspace", keyspace);
- put("columnFamily", columnFamily);
- }
- }
-
- private boolean isKeyspaceGlobal(ColumnFamilyProvider provider) {
- for (ColumnFamilyDefinition cfDef : provider.getColumnFamilies()) {
- if (cfDef.getKeyspaces() == null) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public void handleEvent(Event event) {
- try {
- Object keyspaceAdded =
event.getProperty(CassandraDaemonService.EVENT_ADMIN_KEYSPACE_ADDED);
- if (keyspaceAdded != null) {
- // If a keyspace was added, we must add ColumnFamily's for all
keyspace-global ColumnFamilyProvider's
- // (this are all ColumnFamilyProvider's that defined keyspace
null)
- String keyspaceName = keyspaceAdded.toString();
- m_logService.log(LogService.LOG_DEBUG, "Recieved keyspace
added event for keyspace '" + keyspaceName + "' ");
-
- // First register a new cassandra persistence manager for this
new keyspace
- m_pmFactory.createCassandraPersistenceManager(keyspaceName);
-
- ServiceReference[] servRefs =
m_context.getAllServiceReferences(ColumnFamilyProvider.class.getName(), null);
- if (servRefs != null) {
- for (ServiceReference ref : servRefs) {
- ColumnFamilyProvider provider = (ColumnFamilyProvider)
m_context.getService(ref);
- if (isKeyspaceGlobal(provider)) {
- m_logService.log(LogService.LOG_DEBUG, "Updating
ColumnFamilyProvider '" + provider.getClass().getName() + "' as it is
keyspace-global");
- columnFamilyProviderAdded(provider);
- }
- }
- }
- }
- }
- catch (InvalidSyntaxException e) {
- m_logService.log(LogService.LOG_ERROR, "Could not handle event '"
+ event.getTopic() + "' ", e);
- }
- }
}
Modified:
trunk/amdatu-cassandra/cassandra-persistencemanager/src/main/java/org/amdatu/cassandra/persistencemanager/service/CassandraPersistenceManagerFactoryImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-persistencemanager/src/main/java/org/amdatu/cassandra/persistencemanager/service/CassandraPersistenceManagerFactoryImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-persistencemanager/src/main/java/org/amdatu/cassandra/persistencemanager/service/CassandraPersistenceManagerFactoryImpl.java
Wed Jan 19 15:17:10 2011
@@ -49,5 +49,6 @@
.setRequired(true));
m_dependencyManager.add(component);
+ m_logService.log(LogService.LOG_INFO, "Cassandra Persistence Manager
service for keyspace '" + keyspaceId + "' registered");
}
}