Author: angelo.vandersijpt at luminis.eu
Date: Wed Dec 15 18:57:28 2010
New Revision: 503
Log:
AMDATU-232 Removed the CassandraDaemonListener. All columnfamily-business now
is handled by the ColumnFamilyHandler; this should make our Cassandra-setup a
bit more robust.
Added:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
Removed:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraDaemonServiceListener.java
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyProviderListener.java
Modified:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/osgi/Activator.java
Modified:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/osgi/Activator.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/osgi/Activator.java
(original)
+++
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/osgi/Activator.java
Wed Dec 15 18:57:28 2010
@@ -18,8 +18,7 @@
import org.amdatu.cassandra.application.CassandraDaemonService;
import org.amdatu.cassandra.listener.ColumnFamilyProvider;
-import org.amdatu.cassandra.listener.service.CassandraDaemonServiceListener;
-import org.amdatu.cassandra.listener.service.ColumnFamilyProviderListener;
+import org.amdatu.cassandra.listener.service.ColumnFamilyHandler;
import
org.amdatu.cassandra.persistencemanager.CassandraPersistenceManagerFactory;
import org.apache.felix.dm.DependencyActivatorBase;
import org.apache.felix.dm.DependencyManager;
@@ -34,24 +33,17 @@
public class Activator extends DependencyActivatorBase {
@Override
public void init(BundleContext context, DependencyManager manager) throws
Exception {
- // Register the cassandra daemon service listener
- manager.add(
- createComponent()
- .setImplementation(CassandraDaemonServiceListener.class)
-
.add(createServiceDependency().setService(LogService.class).setRequired(true))
-
.add(createServiceDependency().setService(CassandraDaemonService.class).setRequired(true))
-
.add(createServiceDependency().setService(CassandraPersistenceManagerFactory.class).setRequired(true)));
-
// Register the CassandraColumnFamilyProvider listener
manager
.add(
createComponent()
- .setImplementation(ColumnFamilyProviderListener.class)
+ .setImplementation(ColumnFamilyHandler.class)
.add(createServiceDependency().setService(LogService.class).setRequired(true))
.add(createServiceDependency().setService(CassandraDaemonService.class).setRequired(true))
.add(createServiceDependency().setService(CassandraPersistenceManagerFactory.class).setRequired(true))
- .add(
-
createServiceDependency().setService(ColumnFamilyProvider.class).setCallbacks("onAdded",
"onRemoved")));
+ .add(createServiceDependency()
+ .setService(ColumnFamilyProvider.class)
+ .setCallbacks("columnFamilyProviderAdded",
"columnFamilyProviderRemoved")));
}
@Override
Added:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
==============================================================================
--- (empty file)
+++
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
Wed Dec 15 18:57:28 2010
@@ -0,0 +1,256 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.cassandra.listener.service;
+
+import java.util.*;
+
+import org.amdatu.cassandra.application.CassandraDaemonService;
+import org.amdatu.cassandra.listener.ColumnFamilyAvailable;
+import org.amdatu.cassandra.listener.ColumnFamilyDefinition;
+import org.amdatu.cassandra.listener.ColumnFamilyProvider;
+import org.amdatu.cassandra.persistencemanager.CassandraException;
+import org.amdatu.cassandra.persistencemanager.CassandraPersistenceManager;
+import
org.amdatu.cassandra.persistencemanager.CassandraPersistenceManagerFactory;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.thrift.*;
+import org.apache.felix.dm.*;
+import org.apache.thrift.TException;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.log.LogService;
+import org.osgi.util.tracker.ServiceTracker;
+
+/**
+ * This class makes sure that all ColumnFamilies that are available in
Cassandra get a
+ * corresponding ColumnFamilyAvailable service. Also, it listens for
ColumnFamilyDefinitions,
+ * add the respective ColumnFamily to Cassandra, and registers the service as
soon as the
+ * ColumnFamily is actually available.
+ */
+public class ColumnFamilyHandler {
+
+ private volatile LogService m_logService;
+ private volatile CassandraDaemonService m_daemonService;
+ private volatile CassandraPersistenceManagerFactory m_pmFactory;
+
+ private volatile DependencyManager m_dependencyManager;
+ private volatile BundleContext m_context;
+
+ private final Map<KeySpaceColumnFamilyCombination, Component> m_services =
+ new HashMap<KeySpaceColumnFamilyCombination, Component>();
+
+ public void start() {
+ try {
+ // Register all currently available keyspace/columnfamily
combinations.
+ List<KsDef> keyspaces =
m_daemonService.getCassandraServer().describe_keyspaces();
+ for (KsDef keyspace : keyspaces) {
+ String keyspaceName = keyspace.getName();
+ m_pmFactory.createCassandraPersistenceManager(keyspaceName);
+
+ List<CfDef> columnFamilies = keyspace.getCf_defs();
+ for (CfDef columnFamily : columnFamilies) {
+ addServiceFor(keyspaceName, columnFamily.getName());
+ }
+ }
+ }
+ catch (TException e) {
+ m_logService.log(LogService.LOG_ERROR, "Error registering
ColumnFamilyAvailable services for existing families.", e);
+ }
+ catch (InvalidRequestException e) {
+ m_logService.log(LogService.LOG_ERROR, "Error registering
ColumnFamilyAvailable services for existing families.", e);
+ }
+ }
+
+ private void stop() {
+ // Since we registered all services, we should also remove them.
+ for (Component component : m_services.values()) {
+ m_dependencyManager.remove(component);
+ }
+ }
+
+ public void columnFamilyProviderAdded(ColumnFamilyProvider provider) {
+ try {
+ List<KsDef> keyspaceDefinitions =
m_daemonService.getCassandraServer().describe_keyspaces();
+ ColumnFamilyDefinition[] columnFamilies =
provider.getColumnFamilies();
+ for (ColumnFamilyDefinition columnFamily : columnFamilies) {
+
+ // Add the columnfamilies to the requested keyspaces, or to
_all_ available keyspaces.
+ String[] keyspaces = columnFamily.getKeyspaces();
+ if (keyspaces != null) {
+ for (String keyspace : keyspaces) {
+ // Never add ColumnFamily's to Cassandra's system
keyspace, this is a reserved keyspace
+ if (!Table.SYSTEM_TABLE.equals(keyspace)) {
+ // Create if it does not yet exist
+ if (!m_daemonService.keyspaceExists(keyspace)) {
+ m_daemonService.addKeyspace(keyspace);
+
m_pmFactory.createCassandraPersistenceManager(keyspace);
+
+ m_logService.log(LogService.LOG_INFO,
"Keyspace '" + keyspace + "' added");
+ }
+ addOrUpdateColumnFamily(keyspace, columnFamily);
+ }
+ }
+ }
+ else {
+ for (KsDef keyspaceDef : keyspaceDefinitions) {
+ if (!Table.SYSTEM_TABLE.equals(keyspaceDef.getName()))
{
+ addOrUpdateColumnFamily(keyspaceDef.getName(),
columnFamily);
+ }
+ }
+ }
+ }
+ }
+ catch (TException e) {
+ m_logService.log(LogService.LOG_ERROR, "Failed to register
keyspaces and/or ColumnFamily's for provider '"
+ + provider.toString() + "'", e);
+ }
+ catch (InvalidRequestException e) {
+ m_logService.log(LogService.LOG_ERROR, "Failed to register
keyspaces and/or ColumnFamily's for provider '"
+ + provider.toString() + "'", e);
+ }
+ catch (NotFoundException e) {
+ m_logService.log(LogService.LOG_ERROR, "Failed to register
keyspaces and/or ColumnFamily's for provider '"
+ + provider.toString() + "'", e);
+ }
+ }
+
+ public void columnFamilyProviderRemoved(ColumnFamilyProvider provider) {
+ // We don't act on this yet.
+ }
+
+ private void addOrUpdateColumnFamily(final String ksName,
ColumnFamilyDefinition colDef) throws InvalidRequestException,
+ TException, NotFoundException {
+ final String cfName = colDef.getName();
+ String columnType = colDef.getColumnType().value;
+ String comparatorType = colDef.getCompareWith().value;
+ String subComparatorType = colDef.getCompareSubcolumnsWith().value;
+
+ if (!m_daemonService.columnFamilyExists(ksName, cfName)) {
+ m_daemonService.addColumnFamily(ksName, cfName, columnType,
comparatorType, subComparatorType);
+ waitForColumnFamilyAndRegisterService(ksName, cfName);
+ m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + cfName +
"' added");
+ }
+ else {
+ // Since Cassandra does not (yet) support updating columnType,
comparatorType or subComparatorType
+ // of existing ColumnFamily's, we throw an exception if one of
these has been changed by the provider.
+ // If there are no changes, we do nothing
+ if (m_daemonService.isColumnFamilyChanged(ksName, cfName,
columnType, comparatorType, subComparatorType)) {
+ throw new InvalidRequestException("Definition of ColumnFamily
'" + cfName
+ + "' has been changed, but changes in columnType,
comparatorType "
+ + "and subComparatorType are not supported by Cassandra");
+ }
+ m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + cfName +
"' not changed");
+ }
+ }
+
+ private void waitForColumnFamilyAndRegisterService(final String keyspace,
final String columnFamily) {
+ new Thread("waiting for keyspace " + keyspace + ", columnfamily " +
columnFamily) {
+ public void run() {
+ waitFor(keyspace, columnFamily);
+ addServiceFor(keyspace, columnFamily);
+ }
+
+ }.start();
+ }
+
+ private void waitFor(String keyspace, String columnFamily) {
+ CassandraPersistenceManager persistenceManager =
getPersistenceManager(keyspace);
+
+ long startTime = System.currentTimeMillis();
+ boolean found = false;
+ while (System.currentTimeMillis() - startTime < 10000 && found ==
false) {
+ try {
+ found |= persistenceManager.exists(columnFamily);
+ }
+ catch (CassandraException e) {
+ // apparently our columnFamily isn't available yet... try
again next round.
+ }
+
+ try {
+ Thread.sleep(500);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ if (!found) {
+ throw new IllegalStateException("After waiting for " + 10000 +
"ms, columnfamily " +
+ columnFamily + " is not yet available.");
+ }
+ }
+
+ private CassandraPersistenceManager getPersistenceManager(String keyspace)
{
+ String objectClassFilter = "(" +
org.osgi.framework.Constants.OBJECTCLASS + "="
+ + CassandraPersistenceManager.class.getName() + ")";
+ String keyspaceFilter = "(" +
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
+ String persistenceManagerFilter = "(&" + objectClassFilter +
keyspaceFilter + ")";
+
+ ServiceTracker tracker;
+ try {
+ tracker = new ServiceTracker(m_context,
m_context.createFilter(persistenceManagerFilter), null);
+ }
+ catch (InvalidSyntaxException e) {
+ /*
+ * This should not happen, since we construct the filter
ourselves. If it does, the keyspace or
+ * columnFamily is invalid.
+ */
+ m_logService.log(LogService.LOG_ERROR, "Could not create filter: "
+ persistenceManagerFilter, e);
+ return null; // let it NPE one level up.
+ }
+ tracker.open();
+
+ CassandraPersistenceManager persistenceManager = null;
+ try {
+ persistenceManager = (CassandraPersistenceManager)
tracker.waitForService(5000);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ if (persistenceManager == null) {
+ throw new IllegalStateException("After waiting for " + 5000 + "ms,
we don't have a "
+ + "persistencemanager for " + keyspace + " yet.");
+ }
+ return persistenceManager;
+ }
+
+ private void addServiceFor(String keyspace, String columnFamily) {
+ if (m_services.containsKey(new
KeySpaceColumnFamilyCombination(keyspace, columnFamily))) {
+ return;
+ }
+
+ Dictionary<String, String> serviceProps = new Hashtable<String,
String>();
+ serviceProps.put(CassandraPersistenceManager.KEYSPACE_AWARE_KEY,
keyspace);
+ serviceProps.put(ColumnFamilyAvailable.FILTER_NAME, columnFamily);
+
+ Component component = m_dependencyManager.createComponent()
+ .setInterface(ColumnFamilyAvailable.class.getName(),
serviceProps)
+ .setImplementation(ColumnFamilyAvailableImpl.class);
+
+ m_services.put(new KeySpaceColumnFamilyCombination(keyspace,
columnFamily), component);
+ m_dependencyManager.add(component);
+ m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + columnFamily
+ "' is now available");
+ }
+
+ private static class KeySpaceColumnFamilyCombination extends
HashMap<String, Object> {
+ public KeySpaceColumnFamilyCombination(String keyspace, String
columnFamily) {
+ put("keyspace", keyspace);
+ put("columnFamily", columnFamily);
+ }
+ }
+}