Author: [email protected]
Date: Fri Dec 16 14:40:32 2011
New Revision: 1852

Log:
[AMDATUCASSANDRA-138] Added support for concurrent schema modifications. The 
internal Cassandra client in the CassandraDaemonService is replaced by a Thrift 
client to the first node in the lexicographic sorted list of IP addresses of 
live nodes. As a result all schema modifications on all nodes in a cluster are 
send to the same node (to which we refer as the schema master).

Added:
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CfDefComparator.java
Modified:
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
   
trunk/amdatu-cassandra/cassandra-application/src/test/java/org/amdatu/cassandra/test/unit/application/CassandraUnitTestBase.java
   
trunk/amdatu-cassandra/cassandra-application/src/test/java/org/amdatu/cassandra/test/unit/application/mock/CassandraConfigurationServiceMock.java
   
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/ColumnFamilyDefinition.java
   
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
   
trunk/amdatu-cassandra/cassandra-listener/src/test/java/org/amdatu/cassandra/test/unit/listener/ListenerTest.java
   
trunk/amdatu-cassandra/config/src/main/resources/org.amdatu.core.cassandra.application.cfg
   
trunk/amdatu-cassandra/test-unit/framework/src/main/java/org/amdatu/cassandra/test/unit/framework/mock/CassandraConfigurationServiceMock.java

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
      (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
      Fri Dec 16 14:40:32 2011
@@ -118,6 +118,11 @@
      * Configuration key that stores the schema agreement timeout (in seconds).
      */
     String SCHEMA_AGREEMENT_TIMEOUT = "schema_agreement_timeout";
+    
+    /**
+     * Configuration key that stores the Thrift timeout (in seconds).
+     */
+    String THRIFT_TIMEOUT = "thrift_timeout";
 
     /**
      * Returns the default replication factor for new keyspaces. The 
replication factor determines
@@ -200,4 +205,17 @@
      * @return the timeout (in seconds) to wait for schema agreement
      */
     int getSchemaAgreementTimeout();
+    
+    /**
+     * The timeout (in seconds) for Thrift connections used by the Cassandra 
daemon service. All schema changes
+     * are handled by this service, which opens a Thrift client to a central 
'schema master' to send all changes to
+     * (schema modifications cannot be handles by different nodes 
concurrently).
+     * This timeout determines how long the client will wait for a response. 
When a timeout occurs, the schema update 
+     * will fail.
+     * Note that if the central 'schema master' is down, this timeout 
determines how long it will take for the daemon
+     * to detect that it is down and switch to the new schema master. It is 
therefore recommended not to increase this
+     * timeout too much.
+     * @return the timeout (in seconds) to wait for a Thrift response
+     */
+    int getThriftTimeout();
 }

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
     (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
     Fri Dec 16 14:40:32 2011
@@ -17,7 +17,6 @@
 
 import java.util.List;
 
-import org.apache.cassandra.thrift.CassandraServer;
 import org.apache.cassandra.thrift.CfDef;
 
 /**
@@ -46,14 +45,6 @@
     String EVENT_ADMIN_KEYSPACE_DROPPED = "keyspace_dropped";
 
     /**
-     * Returns the Cassandra server which represents the Cassandra Thrift API.
-     * 
-     * @see http://wiki.apache.org/cassandra/API
-     * @return the Cassandra server.
-     */
-    CassandraServer getCassandraServer();
-
-    /**
      * Returns a list of all available keyspaces. Note that keyspace names are 
case-sensitive
      * in Cassandra.
      * 

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
  (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
  Fri Dec 16 14:40:32 2011
@@ -41,7 +41,7 @@
     private static final String STORAGE_CONF_SOURCE = "conf/cassandra.yaml";
     private static final String LOG4J_CONF_SOURCE = "conf/log4j.properties";
     private static final int DEFAULT_SCHEMA_AGREEMENT_TIMEOUT = 30;
-    
+    private static final int DEFAULT_THRIFT_TIMEOUT = 10;
 
     // Reference to the LogService
     private volatile LogService m_logService;
@@ -59,6 +59,7 @@
     private int m_storagePort;
     private String m_clusterName;
     private int m_schemaAgreementTimeout;
+    private int m_thriftTimeout;
 
     /**
      * The init() method is invoked by the Felix dependency manager. It allows 
us to initialize our service. In this
@@ -139,6 +140,13 @@
                 // For backwards compatibility
                 m_schemaAgreementTimeout = DEFAULT_SCHEMA_AGREEMENT_TIMEOUT;
             }
+            
+            if (dictionary.get(THRIFT_TIMEOUT) != null) {
+                m_thriftTimeout = toInt(dictionary.get(THRIFT_TIMEOUT));
+            } else {
+                // For backwards compatibility
+                m_thriftTimeout = DEFAULT_THRIFT_TIMEOUT;
+            }
         }
     }
 
@@ -219,4 +227,8 @@
     public int getSchemaAgreementTimeout() {
         return m_schemaAgreementTimeout;
     }
+    
+    public int getThriftTimeout() {
+        return m_thriftTimeout;
+    }
 }

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
 (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
 Fri Dec 16 14:40:32 2011
@@ -15,7 +15,9 @@
  */
 package org.amdatu.cassandra.application.service;
 
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -26,13 +28,19 @@
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.CassandraServer;
+import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.CfDef;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.KsDef;
 import org.apache.cassandra.thrift.NotFoundException;
 import org.apache.cassandra.thrift.SchemaDisagreementException;
 import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.service.log.LogService;
@@ -50,7 +58,7 @@
     // name for the replication_factor option in Cassandra (internally hard 
coded "replication_factor" is used all over
     // the place), so we define it here for internal use.
     private static final String RF_STRATEGY_OPTION = "replication_factor";
-
+    
     // Service dependencies, injected by the framework
     private volatile LogService m_logService;
     private volatile EventAdmin m_eventAdmin;
@@ -59,7 +67,10 @@
     // Local cache of ColumnTypes of all ColumnFamilies for each keyspace
     private Map<String, Map<String, String>> m_keyspaceColumnFamilyTypes = new 
HashMap<String, Map<String, String>>();
 
-    private CassandraServer m_cassandraServer = null;
+    // Thrift connection to a Cassandra daemon (local or remote)
+    private String m_connectString = null;
+    private TTransport m_transport;
+    private Cassandra.Iface m_cassandraClient = null;
 
     /**
      * The init() method is invoked by the Felix dependency manager.
@@ -68,33 +79,80 @@
         m_logService.log(LogService.LOG_INFO, getClass().getName() + " service 
initialized");
     }
 
-    public void start() throws TException, InvalidRequestException {
-        m_cassandraServer = new CassandraServer();
+    public void start() {
+        m_logService.log(LogService.LOG_INFO, getClass().getName() + "service 
started");
     }
 
     public void stop() {
         m_logService.log(LogService.LOG_INFO, getClass().getName() + "service 
stopped");
     }
 
-    public void destroy() {
-        m_cassandraServer = null;
+    public void destroy() throws Exception {
+        disconnect();
         m_logService.log(LogService.LOG_INFO, getClass().getName() + " service 
destroyed");
     }
 
     public void setConfiguration(CassandraConfigurationService config) {
         m_configuration = config;
     }
-    
+
     public void setLogService(LogService logService) {
         m_logService = logService;
     }
-    
+
     public void setEventAdmin(EventAdmin eventAdmin) {
         m_eventAdmin = eventAdmin;
     }
-    
-    public CassandraServer getCassandraServer() {
-        return m_cassandraServer;
+
+    private synchronized Cassandra.Iface getCassandraClient() throws 
TTransportException {
+        // This returns the IP addresses of all live nodes in the ring
+        List<String> liveNodes = StorageService.instance.getLiveNodes();
+
+        // Sort the list and pick the first record
+        Collections.sort(liveNodes);
+
+        // Verify if the connect string has changed (which may the case if 
nodes joined
+        // or leaved the cluster). If so, reconnect to the new Cassandra node.
+        String host = liveNodes.get(0);
+        int port = m_configuration.getRPCPort();
+        String connectString = host + ":" + port;
+        if (!connectString.equals(m_connectString)) {
+            connect(host, port);
+        }
+        return m_cassandraClient;
+    }
+
+    private void connect(String host, int port) throws TTransportException {
+        // First disconnect from the current daemon, if applicable
+        disconnect();
+
+        m_logService.log(LogService.LOG_INFO, "Establishing Thrift connection 
to the Cassandra server on "
+                        + host + ":" + port);
+        int timeout = m_configuration.getThriftTimeout()*1000;
+        TSocket socket = new TSocket(host, port, timeout);
+        m_transport = new TFramedTransport(socket);
+        try {
+            m_transport.open();
+            TProtocol proto = new TBinaryProtocol(m_transport);
+            m_cassandraClient = new Cassandra.Client(proto);
+            m_connectString = host + ":" + port;
+        }
+        catch (TTransportException e) {
+            m_logService.log(LogService.LOG_ERROR, "Could not establishing 
Thrift connection to the " +
+                            "Cassandra server on " + host + ":" + port, e);
+        }
+    }
+
+    private void disconnect() throws TTransportException {
+        if (m_transport != null && m_transport.isOpen()) {
+            m_logService.log(LogService.LOG_INFO, "Disconnecting Thrift 
connection to the Cassandra server on "
+                            + m_connectString);
+            try {
+               m_transport.flush();
+            } finally {
+                m_transport.close();
+            }
+        }
     }
 
     public synchronized boolean keyspaceExists(final String keyspaceName) {
@@ -110,7 +168,7 @@
     public synchronized List<String> getKeyspaces() {
         try {
             List<String> keyspaceNames = new ArrayList<String>();
-            List<KsDef> keyspaces = m_cassandraServer.describe_keyspaces();
+            List<KsDef> keyspaces = getCassandraClient().describe_keyspaces();
             for (KsDef keyspace : keyspaces) {
                 keyspaceNames.add(keyspace.getName());
             }
@@ -133,7 +191,7 @@
         while (agreed == null && System.currentTimeMillis() < expires) {
             m_logService.log(LogService.LOG_INFO,
                 "Schema definitions are not yet promulgated throughout the 
cluster, waiting for schema agreement. Cluster schema's:"
-                    + getSchemaVersions());
+                                + getSchemaVersions());
             try {
                 // Wait for 2 seconds, then try again
                 Thread.sleep(2000);
@@ -144,12 +202,14 @@
         if (System.currentTimeMillis() >= expires) {
             m_logService.log(LogService.LOG_WARNING,
                 "Schema agreement timeout reached (which is " + 
m_configuration.getSchemaAgreementTimeout()
-                    + " seconds. This may cause ScmeDisagreementExceptions to 
be thrown.");
-        } else if (pastAgreement) {
+                + " seconds. This may cause ScmeDisagreementExceptions to be 
thrown.");
+        }
+        else if (pastAgreement) {
             // Schema agreement was not an issue, so log on debug level
             m_logService.log(LogService.LOG_DEBUG, "Schema agreed between all 
nodes in the cluster, agreed version='"
-                + agreed + "'");
-        } else {
+                            + agreed + "'");
+        }
+        else {
             // Schema agreement was an issue, so log on info level
             m_logService.log(LogService.LOG_INFO, "Schema agreed between all 
nodes in the cluster, agreed version='"
                             + agreed + "'");
@@ -158,12 +218,12 @@
 
     private String checkSchemaAgreement() {
         try {
-            Map<String, List<String>> schemas = 
m_cassandraServer.describe_schema_versions();
+            Map<String, List<String>> schemas = 
getCassandraClient().describe_schema_versions();
             String version = null;
             for (String key : schemas.keySet()) {
                 if (StorageProxy.UNREACHABLE.equals(key)) {
                     m_logService.log(LogService.LOG_DEBUG, "Ignoring version 
of unreacheable node(s): " + key + "="
-                        + schemas.get(key));
+                                    + schemas.get(key));
                 }
                 else {
                     if (version == null) {
@@ -189,7 +249,7 @@
     private String getSchemaVersions() {
         try {
             String result = "";
-            Map<String, List<String>> schemas = 
m_cassandraServer.describe_schema_versions();
+            Map<String, List<String>> schemas = 
getCassandraClient().describe_schema_versions();
             for (String key : schemas.keySet()) {
                 result += "version '" + key + "' : ";
                 for (String entry : schemas.get(key)) {
@@ -221,7 +281,7 @@
                 KsDef ksDef = new KsDef(name, DEFAULT_PLACEMENT_STRATEGY, 
empty);
                 try {
                     ksDef.putToStrategy_options(RF_STRATEGY_OPTION, new 
Integer(replicationFactor).toString());
-                    m_cassandraServer.system_add_keyspace(ksDef);
+                    getCassandraClient().system_add_keyspace(ksDef);
                 }
                 catch (InvalidRequestException e) {
                     // Now this error may appear if some other node in the 
cluster added this keyspace
@@ -229,7 +289,7 @@
                     // keyspace exists after all, and if so, return without 
throwing an exception
                     if (keyspaceExists(name)) {
                         m_logService.log(LogService.LOG_DEBUG, "Keyspace '" + 
name + "' was not added since it "
-                            + "already existed");
+                                        + "already existed");
                         return false;
                     }
                     else {
@@ -246,11 +306,13 @@
             }
             else {
                 m_logService
-                    .log(LogService.LOG_DEBUG, "Keyspace '" + name + "' was 
not added since it already existed");
+                .log(LogService.LOG_DEBUG, "Keyspace '" + name + "' was not 
added since it already existed");
                 return false;
             }
         }
         catch (SchemaDisagreementException e) {
+            m_logService.log(LogService.LOG_ERROR, "A 
SchemaDisagreementException occurred while adding keyspace '" +
+                            name + "'!", e);
             throw new ThriftException(e);
         }
         catch (TException e) {
@@ -260,7 +322,7 @@
 
     private boolean compareReplicationFactor(KsDef keyspace, int rf) {
         if (keyspace.getStrategy_optionsSize() > 0
-            && keyspace.getStrategy_options().containsKey(RF_STRATEGY_OPTION)) 
{
+                        && 
keyspace.getStrategy_options().containsKey(RF_STRATEGY_OPTION)) {
             int currentRF = 
Integer.parseInt(keyspace.getStrategy_options().get(RF_STRATEGY_OPTION));
             return currentRF == rf;
         }
@@ -277,12 +339,13 @@
                     + " may never exceed the amount of nodes "
                     + "in the cluster, which currently is " + clusterSize);
             }
-            List<KsDef> keyspaces = m_cassandraServer.describe_keyspaces();
+            Cassandra.Iface client = getCassandraClient();
+            List<KsDef> keyspaces = client.describe_keyspaces();
             for (KsDef keyspace : keyspaces) {
                 if (!compareReplicationFactor(keyspace, replicationFactor)) {
                     keyspace.putToStrategy_options(RF_STRATEGY_OPTION, new 
Integer(replicationFactor).toString());
                     keyspace.setCf_defs(null);
-                    m_cassandraServer.system_update_keyspace(keyspace);
+                    client.system_update_keyspace(keyspace);
                 }
             }
         }
@@ -293,16 +356,19 @@
             throw new ThriftException(e);
         }
         catch (SchemaDisagreementException e) {
+            m_logService.log(LogService.LOG_ERROR, "A 
SchemaDisagreementException occurred while setting RF to " +
+                            replicationFactor + "!", e);
             throw new ThriftException(e);
         }
     }
 
     public synchronized void setReplicationFactor(final String keyspace, final 
int replicationFactor) {
         try {
-            KsDef ksDef = m_cassandraServer.describe_keyspace(keyspace);
+            Cassandra.Iface client = getCassandraClient();
+            KsDef ksDef = client.describe_keyspace(keyspace);
             if (!compareReplicationFactor(ksDef, replicationFactor)) {
                 ksDef.setCf_defs(null);
-                m_cassandraServer.system_update_keyspace(ksDef);
+                client.system_update_keyspace(ksDef);
             }
         }
         catch (TException e) {
@@ -315,6 +381,8 @@
             throw new ThriftException(e).setKeyspace(keyspace);
         }
         catch (SchemaDisagreementException e) {
+            m_logService.log(LogService.LOG_ERROR, "A 
SchemaDisagreementException occurred while setting RF to " +
+                            replicationFactor + " for keyspace '" + keyspace + 
"'!", e);
             throw new ThriftException(e).setKeyspace(keyspace);
         }
     }
@@ -325,7 +393,7 @@
             waitForSchemaAgreement();
 
             if (keyspaceExists(keyspace)) {
-                m_cassandraServer.system_drop_keyspace(keyspace);
+                getCassandraClient().system_drop_keyspace(keyspace);
 
                 // Publish an event that a keyspace has been dropped
                 Map<String, String> properties = new HashMap<String, String>();
@@ -344,13 +412,15 @@
             throw new ThriftException(e).setKeyspace(keyspace);
         }
         catch (SchemaDisagreementException e) {
+            m_logService.log(LogService.LOG_ERROR, "A 
SchemaDisagreementException occurred while dropping keyspace '" +
+                            keyspace + "'!", e);
             throw new ThriftException(e).setKeyspace(keyspace);
         }
     }
-    
-    public CfDef getColumnFamily(final String keyspaceName, final String 
columnFamilyName) {
+
+    public synchronized CfDef getColumnFamily(final String keyspaceName, final 
String columnFamilyName) {
         try {
-            KsDef ksDef = m_cassandraServer.describe_keyspace(keyspaceName);
+            KsDef ksDef = getCassandraClient().describe_keyspace(keyspaceName);
             List<CfDef> cfDefs = ksDef.getCf_defs();
             for (CfDef cfDef : cfDefs) {
                 if (cfDef.getName().equals(columnFamilyName)) {
@@ -367,14 +437,18 @@
             // Rethrow the checked exception as a new unchecked Thrift 
exception
             throw new 
ThriftException(e).setKeyspace(keyspaceName).setColumnFamily(columnFamilyName);
         }
+        catch (TException e) {
+            // Rethrow the checked exception as a new unchecked Thrift 
exception
+            throw new 
ThriftException(e).setKeyspace(keyspaceName).setColumnFamily(columnFamilyName);
+        }
     }
 
-    public boolean columnFamilyExists(final String keyspaceName, final String 
columnFamilyName) {
+    public synchronized boolean columnFamilyExists(final String keyspaceName, 
final String columnFamilyName) {
         CfDef cfDef = getColumnFamily(keyspaceName, columnFamilyName);
         return cfDef != null;
     }
 
-    public String getColumnType(final String keyspaceName, final String 
columnFamilyName) {
+    public synchronized String getColumnType(final String keyspaceName, final 
String columnFamilyName) {
         Map<String, String> columnFamilyTypeMap;
         if (m_keyspaceColumnFamilyTypes.containsKey(keyspaceName)) {
             columnFamilyTypeMap = 
m_keyspaceColumnFamilyTypes.get(keyspaceName);
@@ -387,7 +461,7 @@
         }
         else {
             try {
-                KsDef ksDef = 
m_cassandraServer.describe_keyspace(keyspaceName);
+                KsDef ksDef = 
getCassandraClient().describe_keyspace(keyspaceName);
                 List<CfDef> cfDefs = ksDef.getCf_defs();
                 for (CfDef cfDef : cfDefs) {
                     if (cfDef.getName().equals(columnFamilyName)) {
@@ -406,13 +480,17 @@
                 // Rethrow the checked exception as a new unchecked Thrift 
exception
                 throw new 
ThriftException(e).setKeyspace(keyspaceName).setColumnFamily(columnFamilyName);
             }
+            catch (TException e) {
+                // Rethrow the checked exception as a new unchecked Thrift 
exception
+                throw new 
ThriftException(e).setKeyspace(keyspaceName).setColumnFamily(columnFamilyName);
+            }
         }
     }
 
-    public List<String> getColumnFamilies(final String keyspaceName) {
+    public synchronized List<String> getColumnFamilies(final String 
keyspaceName) {
         try {
             List<String> cfNames = new ArrayList<String>();
-            KsDef ksDef = m_cassandraServer.describe_keyspace(keyspaceName);
+            KsDef ksDef = getCassandraClient().describe_keyspace(keyspaceName);
             List<CfDef> cfDefs = ksDef.getCf_defs();
             for (CfDef cfDef : cfDefs) {
                 cfNames.add(cfDef.getName());
@@ -427,6 +505,10 @@
             // Rethrow the checked exception as a new unchecked Thrift 
exception
             throw new ThriftException(e).setKeyspace(keyspaceName);
         }
+        catch (TException e) {
+            // Rethrow the checked exception as a new unchecked Thrift 
exception
+            throw new ThriftException(e).setKeyspace(keyspaceName);
+        }
     }
 
     public synchronized boolean addColumnFamily(final String keyspace, final 
String cfName, final String columnType,
@@ -439,9 +521,9 @@
 
         return addColumnFamily(keyspace, cfDef);
     }
-    
+
     @Override
-    public boolean addColumnFamily(String keyspace, CfDef cfDef) {
+    public synchronized boolean addColumnFamily(String keyspace, CfDef cfDef) {
         // Before we create the columnFamily, the cluster must agree upon the 
schema
         waitForSchemaAgreement();
 
@@ -452,15 +534,16 @@
         }
         try {
             if (!columnFamilyExists(keyspace, cfName)) {
-                m_cassandraServer.set_keyspace(keyspace);
-                m_cassandraServer.system_add_column_family(cfDef);
+                Cassandra.Iface client = getCassandraClient();
+                client.set_keyspace(keyspace);
+                client.system_add_column_family(cfDef);
                 m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + 
cfName + "' has been added to keyspace '"
-                    + keyspace + "'");
+                                + keyspace + "'");
                 return true;
             }
             else {
                 m_logService.log(LogService.LOG_DEBUG, "ColumnFamily '" + 
cfName + "' was not added to keyspace '"
-                    + keyspace + "' since it already existed");
+                                + keyspace + "' since it already existed");
                 return false;
             }
         }
@@ -471,12 +554,14 @@
             throw new 
ThriftException(e).setKeyspace(keyspace).setColumnFamily(cfName);
         }
         catch (SchemaDisagreementException e) {
+            m_logService.log(LogService.LOG_ERROR, "A 
SchemaDisagreementException occurred while adding '" +
+                            cfName + "' to the keyspace '" + keyspace + "'!", 
e);
             throw new 
ThriftException(e).setKeyspace(keyspace).setColumnFamily(cfName);
         }
     }
-    
+
     @Override
-    public boolean updateColumnFamily(String keyspace, CfDef cfDef) {
+    public synchronized boolean updateColumnFamily(String keyspace, CfDef 
cfDef) {
         // Before we create the columnFamily, the cluster must agree upon the 
schema
         waitForSchemaAgreement();
 
@@ -488,15 +573,27 @@
         try {
             CfDef oldCfDef = getColumnFamily(keyspace, cfName);
             if (oldCfDef != null) {
-                m_cassandraServer.set_keyspace(keyspace);
-                cfDef.setId(oldCfDef.getId());
-                m_cassandraServer.system_update_column_family(cfDef);
-                m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + 
cfName + "' has been updated in the keyspace '"
-                    + keyspace + "'");
-                return true;
+                // First compare if there is really a difference.
+                if (!CfDefComparator.equal(oldCfDef, cfDef)) {
+                    Cassandra.Iface client = getCassandraClient();
+                    client.set_keyspace(keyspace);
+                    cfDef.setId(oldCfDef.getId());
+                    client.system_update_column_family(cfDef);
+                    m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + 
cfName
+                        + "' has been updated in the keyspace '"
+                        + keyspace + "'");
+                    return true;
+                }
+                else {
+                    m_logService.log(LogService.LOG_DEBUG, "ColumnFamily '" + 
cfName
+                        + "' was not updated in the keyspace '"
+                        + keyspace + "', no changes detected");
+                    return false;
+                }
             }
             else {
-                m_logService.log(LogService.LOG_DEBUG, "ColumnFamily '" + 
cfName + "' was not updated in the keyspace '"
+                m_logService.log(LogService.LOG_DEBUG, "ColumnFamily '" + 
cfName
+                    + "' was not updated in the keyspace '"
                     + keyspace + "' since it does not exist");
                 return false;
             }
@@ -508,6 +605,11 @@
             throw new 
ThriftException(e).setKeyspace(keyspace).setColumnFamily(cfName);
         }
         catch (SchemaDisagreementException e) {
+            m_logService.log(LogService.LOG_ERROR, "A 
SchemaDisagreementException occurred while updating CF '" +
+                            cfName + "'!", e);
+            throw new 
ThriftException(e).setKeyspace(keyspace).setColumnFamily(cfName);
+        }
+        catch (UnsupportedEncodingException e) {
             throw new 
ThriftException(e).setKeyspace(keyspace).setColumnFamily(cfName);
         }
     }
@@ -529,17 +631,19 @@
         return true;
     }
 
-    public boolean isColumnFamilyChanged(final String keyspace, final String 
cfName, final String columnType,
+    public synchronized boolean isColumnFamilyChanged(final String keyspace, 
final String cfName,
+        final String columnType,
         final String comparatorType, final String subComparatorType) {
         try {
-            m_cassandraServer.set_keyspace(keyspace);
-            KsDef ksDef = m_cassandraServer.describe_keyspace(keyspace);
+            Cassandra.Iface client = getCassandraClient();
+            client.set_keyspace(keyspace);
+            KsDef ksDef = client.describe_keyspace(keyspace);
             List<CfDef> cfDefs = ksDef.getCf_defs();
             for (CfDef cfDef : cfDefs) {
                 if (cfDef.getName().equals(cfName)) {
                     if (!cfDef.column_type.equals(columnType)
-                        || !equalComparator(cfDef.comparator_type, 
comparatorType)
-                        || !equalComparator(cfDef.subcomparator_type, 
subComparatorType)) {
+                                    || !equalComparator(cfDef.comparator_type, 
comparatorType)
+                                    || 
!equalComparator(cfDef.subcomparator_type, subComparatorType)) {
                         return true;
                     }
                 }

Added: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CfDefComparator.java
==============================================================================
--- (empty file)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CfDefComparator.java
    Fri Dec 16 14:40:32 2011
@@ -0,0 +1,313 @@
+/*
+ * Copyright (c) 2010, 2011 The Amdatu Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.amdatu.cassandra.application.service;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnDef;
+
+/**
+ * This is a helper class to detect if there are any changes in a given 
ColumnFamily definition
+ * with respect to an existing ColumnFamily definition. Note that the normal 
compareTo method in
+ * CfDef also considers two CfDef's to be different in case the first def has 
a columntype and the
+ * second one did not define it. If a column type is left undefined the 
special flag 'isSetColumn_type'
+ * is set to false and when updating the CF, this property is ignored. As this 
will not result in an
+ * update of the ColumnFamily, we do not consider this as a change.
+ * For that reason, this class implements an equals method that returns only 
false if invoking
+ * updateColumnFamily would effectively change anything in that CF.
+ * 
+ * @author <a href=mailto:[email protected]>Amdatu Project Team</a>
+ */
+public class CfDefComparator {
+    /**
+     * Returns if there are any properties in the new definition that would 
effective change
+     * anything in the ColumnFamily (represented by the old ColumnFamily 
definition) when passed
+     * to updateColumnFamily.
+     * @param oldDef The current ColumnFamily definition
+     * @param newDef The new ColumnFamily definition
+     * @return true if passing the newDef to the updateColumnFamily method 
would effectively change
+     * anything in the CF
+     * @throws UnsupportedEncodingException
+     */
+    public static boolean equal(CfDef oldDef, CfDef newDef) throws 
UnsupportedEncodingException {
+        int lastComparison = 0;
+
+        lastComparison = 
Boolean.valueOf(oldDef.isSetKeyspace()).compareTo(newDef.isSetKeyspace());
+        if (lastComparison != 0) {
+            return false;
+        }
+        if (oldDef.isSetKeyspace()) {
+            lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(oldDef.keyspace, newDef.keyspace);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        lastComparison = 
Boolean.valueOf(oldDef.isSetName()).compareTo(newDef.isSetName());
+        if (lastComparison != 0) {
+            return false;
+        }
+        if (oldDef.isSetName()) {
+            lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(oldDef.name, newDef.name);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetColumn_type()) {
+            lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(oldDef.column_type, newDef.column_type);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetComparator_type()) {
+            lastComparison =
+                
org.apache.thrift.TBaseHelper.compareTo(oldDef.comparator_type, 
newDef.comparator_type);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetSubcomparator_type()) {
+            lastComparison =
+                
org.apache.thrift.TBaseHelper.compareTo(oldDef.subcomparator_type, 
newDef.subcomparator_type);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetComment()) {
+            lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(oldDef.comment, newDef.comment);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetRow_cache_size()) {
+            lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(oldDef.row_cache_size, 
newDef.row_cache_size);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetKey_cache_size()) {
+            lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(oldDef.key_cache_size, 
newDef.key_cache_size);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetRead_repair_chance()) {
+            lastComparison =
+                
org.apache.thrift.TBaseHelper.compareTo(oldDef.read_repair_chance, 
newDef.read_repair_chance);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetGc_grace_seconds()) {
+            lastComparison =
+                
org.apache.thrift.TBaseHelper.compareTo(oldDef.gc_grace_seconds, 
newDef.gc_grace_seconds);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetDefault_validation_class()) {
+            lastComparison =
+                
org.apache.thrift.TBaseHelper.compareTo(oldDef.default_validation_class,
+                    newDef.default_validation_class);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetId()) {
+            lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(oldDef.id, newDef.id);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetMin_compaction_threshold()) {
+            lastComparison =
+                
org.apache.thrift.TBaseHelper.compareTo(oldDef.min_compaction_threshold,
+                    newDef.min_compaction_threshold);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetMax_compaction_threshold()) {
+            lastComparison =
+                
org.apache.thrift.TBaseHelper.compareTo(oldDef.max_compaction_threshold,
+                    newDef.max_compaction_threshold);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetRow_cache_save_period_in_seconds()) {
+            lastComparison =
+                
org.apache.thrift.TBaseHelper.compareTo(oldDef.row_cache_save_period_in_seconds,
+                    newDef.row_cache_save_period_in_seconds);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetKey_cache_save_period_in_seconds()) {
+            lastComparison =
+                
org.apache.thrift.TBaseHelper.compareTo(oldDef.key_cache_save_period_in_seconds,
+                    newDef.key_cache_save_period_in_seconds);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetMemtable_flush_after_mins()) {
+            lastComparison =
+                
org.apache.thrift.TBaseHelper.compareTo(oldDef.memtable_flush_after_mins,
+                    newDef.memtable_flush_after_mins);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetMemtable_throughput_in_mb()) {
+            lastComparison =
+                
org.apache.thrift.TBaseHelper.compareTo(oldDef.memtable_throughput_in_mb,
+                    newDef.memtable_throughput_in_mb);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetMemtable_operations_in_millions()) {
+            lastComparison =
+                
org.apache.thrift.TBaseHelper.compareTo(oldDef.memtable_operations_in_millions,
+                    newDef.memtable_operations_in_millions);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetReplicate_on_write()) {
+            lastComparison =
+                
org.apache.thrift.TBaseHelper.compareTo(oldDef.replicate_on_write, 
newDef.replicate_on_write);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetMerge_shards_chance()) {
+            lastComparison =
+                
org.apache.thrift.TBaseHelper.compareTo(oldDef.merge_shards_chance, 
newDef.merge_shards_chance);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetKey_validation_class()) {
+            lastComparison =
+                
org.apache.thrift.TBaseHelper.compareTo(oldDef.key_validation_class, 
newDef.key_validation_class);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetRow_cache_provider()) {
+            lastComparison =
+                
org.apache.thrift.TBaseHelper.compareTo(oldDef.row_cache_provider, 
newDef.row_cache_provider);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetKey_alias()) {
+            lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(oldDef.key_alias, newDef.key_alias);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        List<ColumnDef> oldColDefs = oldDef.getColumn_metadata();
+        List<ColumnDef> newColDefs = newDef.getColumn_metadata();
+        int newCols = newColDefs == null ? 0 : newColDefs.size();
+        int oldCols = oldColDefs == null ? 0 : oldColDefs.size();
+        if (newCols != oldCols) {
+            return false;
+        }
+        else if (newCols > 0) {
+            // Compare by id
+            int compared = 0;
+            for (ColumnDef oldColDef : oldColDefs) {
+                String oldName = new String(oldColDef.getName(), "UTF-8");
+                for (ColumnDef newColDef : newColDefs) {
+                    String newName = new String(newColDef.getName(), "UTF-8");
+                    if (oldName.equals(newName)) {
+                        compared++;
+                        if (!equal(oldColDef, newColDef)) {
+                            return false;
+                        }
+                    }
+                }
+            }
+            return compared == newCols;
+        }
+        return true;
+    }
+
+    private static boolean equal(ColumnDef oldDef, ColumnDef newDef) {
+        int lastComparison = 0;
+
+        lastComparison = 
Boolean.valueOf(oldDef.isSetName()).compareTo(newDef.isSetName());
+        if (lastComparison != 0) {
+            return false;
+        }
+        if (newDef.isSetName()) {
+            lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(oldDef.name, newDef.name);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetValidation_class()) {
+            lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(oldDef.validation_class, 
newDef.validation_class);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetIndex_type()) {
+            lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(oldDef.index_type, newDef.index_type);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+
+        if (newDef.isSetIndex_name()) {
+            lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(oldDef.index_name, newDef.index_name);
+            if (lastComparison != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+}

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/test/java/org/amdatu/cassandra/test/unit/application/CassandraUnitTestBase.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/test/java/org/amdatu/cassandra/test/unit/application/CassandraUnitTestBase.java
    (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/test/java/org/amdatu/cassandra/test/unit/application/CassandraUnitTestBase.java
    Fri Dec 16 14:40:32 2011
@@ -79,10 +79,10 @@
         startThread.join();
 
         m_daemonService = new CassandraDaemonServiceImpl();
-        m_daemonService.start();
         m_daemonService.setConfiguration(new 
CassandraConfigurationServiceMock());
         m_daemonService.setLogService(new LogServiceMock());
         m_daemonService.setEventAdmin(new EventAdminMock());
+        m_daemonService.start();
     }
 
     public class CassandraDaemonStartThread extends Thread {

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/test/java/org/amdatu/cassandra/test/unit/application/mock/CassandraConfigurationServiceMock.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/test/java/org/amdatu/cassandra/test/unit/application/mock/CassandraConfigurationServiceMock.java
   (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/test/java/org/amdatu/cassandra/test/unit/application/mock/CassandraConfigurationServiceMock.java
   Fri Dec 16 14:40:32 2011
@@ -60,4 +60,8 @@
     public int getSchemaAgreementTimeout() {
         return 30;
     }
+    
+    public int getThriftTimeout() {
+        return 10;
+    }
 }

Modified: 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/ColumnFamilyDefinition.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/ColumnFamilyDefinition.java
   (original)
+++ 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/ColumnFamilyDefinition.java
   Fri Dec 16 14:40:32 2011
@@ -15,6 +15,12 @@
  */
 package org.amdatu.cassandra.listener;
 
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.LexicalUUIDType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.thrift.CfDef;
 
 /**
@@ -67,30 +73,32 @@
         /**
          * A column or super column name comparator of type bytes.
          */
-        BYTESTYPE("BytesType"),
+        BYTESTYPE(BytesType.class.getName()),
 
             /**
              * A column or super column name comparator of type ASCII.
              */
-            ASCIITYPE("AsciiType"),
+            ASCIITYPE(AsciiType.class.getName()),
 
             /**
              * A column or super column name comparator of type UTF-8 string.
              */
-            UTF8TYPE("UTF8Type"),
+            UTF8TYPE(UTF8Type.class.getName()),
 
             /**
              * A column or super column name non-time based comparator. It is 
compared lexically, by byte value.
              */
-            LEXICALUUIDTYPE("LexicalUUIDType"),
+            LEXICALUUIDTYPE(LexicalUUIDType.class.getName()),
+            
             /**
              * A column or super column name non-time based comparator. It 
uses a version 1 UUID.
              */
-            TIMEUUIDTYPE("TimeUUIDType"),
+            TIMEUUIDTYPE(TimeUUIDType.class.getName()),
+            
             /**
              * A column or super column name comparator of type Long.
              */
-            LONGTYPE("LongType");
+            LONGTYPE(LongType.class.getName());
 
         private String m_value;
 
@@ -103,22 +111,22 @@
         }
         
         public static CompareType get(String value) {
-            if (value.equalsIgnoreCase(BYTESTYPE.getValue())) {
+            if (value.equalsIgnoreCase(BYTESTYPE.getValue()) || 
BYTESTYPE.getValue().endsWith("." + value)) {
                 return BYTESTYPE;
             }
-            else if (value.equalsIgnoreCase(ASCIITYPE.getValue())) {
+            else if (value.equalsIgnoreCase(ASCIITYPE.getValue()) || 
ASCIITYPE.getValue().endsWith("." + value)) {
                 return ASCIITYPE;
             }
-            else if (value.equalsIgnoreCase(UTF8TYPE.getValue())) {
+            else if (value.equalsIgnoreCase(UTF8TYPE.getValue()) || 
UTF8TYPE.getValue().endsWith("." + value)) {
                 return UTF8TYPE;
             }
-            else if (value.equalsIgnoreCase(LEXICALUUIDTYPE.getValue())) {
+            else if (value.equalsIgnoreCase(LEXICALUUIDTYPE.getValue()) || 
LEXICALUUIDTYPE.getValue().endsWith("." + value)) {
                 return LEXICALUUIDTYPE;
             }
-            else if (value.equalsIgnoreCase(TIMEUUIDTYPE.getValue())) {
+            else if (value.equalsIgnoreCase(TIMEUUIDTYPE.getValue()) || 
TIMEUUIDTYPE.getValue().endsWith("." + value)) {
                 return TIMEUUIDTYPE;
             }
-            else if (value.equalsIgnoreCase(LONGTYPE.getValue())) {
+            else if (value.equalsIgnoreCase(LONGTYPE.getValue()) || 
LONGTYPE.getValue().endsWith("." + value)) {
                 return LONGTYPE;
             }
             return null;

Modified: 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
      (original)
+++ 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
      Fri Dec 16 14:40:32 2011
@@ -23,7 +23,6 @@
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.thrift.CfDef;
 import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.KsDef;
 import org.apache.cassandra.thrift.NotFoundException;
 import org.apache.thrift.TException;
 import org.osgi.service.log.LogService;
@@ -40,9 +39,9 @@
     private volatile LogService m_logService;
     private volatile CassandraDaemonService m_daemonService;
 
-    public void columnFamilyProviderAdded(final ColumnFamilyProvider provider) 
{
+    public synchronized void columnFamilyProviderAdded(final 
ColumnFamilyProvider provider) {
         try {
-            List<KsDef> keyspaceDefinitions = 
m_daemonService.getCassandraServer().describe_keyspaces();
+            List<String> keyspaceDefinitions = m_daemonService.getKeyspaces();
             ColumnFamilyDefinition[] columnFamilies = 
provider.getColumnFamilies();
             for (ColumnFamilyDefinition columnFamily : columnFamilies) {
 
@@ -66,9 +65,9 @@
                             "it is recommended not to use it anymore. The 
feature may be removed in future versions. " +
                             "For more information, see 
http://jira.amdatu.org/jira/browse/AMDATUCASSANDRA-118";;
                     m_logService.log(LogService.LOG_WARNING, msg);
-                    for (KsDef keyspaceDef : keyspaceDefinitions) {
-                        if (!Table.SYSTEM_TABLE.equals(keyspaceDef.getName())) 
{
-                            addOrUpdateColumnFamily(m_daemonService, 
keyspaceDef.getName(), columnFamily);
+                    for (String keyspaceDef : keyspaceDefinitions) {
+                        if (!Table.SYSTEM_TABLE.equals(keyspaceDef)) {
+                            addOrUpdateColumnFamily(m_daemonService, 
keyspaceDef, columnFamily);
                         }
                     }
                 }

Modified: 
trunk/amdatu-cassandra/cassandra-listener/src/test/java/org/amdatu/cassandra/test/unit/listener/ListenerTest.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-listener/src/test/java/org/amdatu/cassandra/test/unit/listener/ListenerTest.java
   (original)
+++ 
trunk/amdatu-cassandra/cassandra-listener/src/test/java/org/amdatu/cassandra/test/unit/listener/ListenerTest.java
   Fri Dec 16 14:40:32 2011
@@ -174,11 +174,11 @@
     }
 
     private void testUpdateColumnFamily() throws Exception {
-        // Update with the same properties, should succeed
+        // Update with the same properties, should return false (no changes)
         CfDef cfDef = new CfDef(TEST_KS, TEST_CF);
         cfDef.setColumn_type(ColumnType.STANDARD.getValue());
         cfDef.setComparator_type(CompareType.BYTESTYPE.getValue());
-        Assert.assertTrue(m_daemon.updateColumnFamily(TEST_KS, cfDef));
+        Assert.assertFalse(m_daemon.updateColumnFamily(TEST_KS, cfDef));
         
         // Update with change standard -> super, should throw an exception
         cfDef.setColumn_type(ColumnType.SUPER.getValue());
@@ -196,6 +196,9 @@
         cfDef.addToColumn_metadata(cDef);
         Assert.assertTrue(m_daemon.updateColumnFamily(TEST_KS, cfDef));
         
+        // Invoke update without an effective change, should return false
+        Assert.assertFalse(m_daemon.updateColumnFamily(TEST_KS, cfDef));
+        
         // Update the ColumnDefinition without the ColDef, the ColDef should 
be removed
         cfDef = new CfDef(TEST_KS, TEST_CF);
         cfDef.setColumn_type(ColumnType.STANDARD.getValue());

Modified: 
trunk/amdatu-cassandra/config/src/main/resources/org.amdatu.core.cassandra.application.cfg
==============================================================================
--- 
trunk/amdatu-cassandra/config/src/main/resources/org.amdatu.core.cassandra.application.cfg
  (original)
+++ 
trunk/amdatu-cassandra/config/src/main/resources/org.amdatu.core.cassandra.application.cfg
  Fri Dec 16 14:40:32 2011
@@ -103,4 +103,14 @@
 # is however no longer blocked then the schema agreement timeout. If the 
timeout kicks in, the Column Family or Keyspace
 # is created or dropped, resulting in a possible SchemaDisagreementException 
internally thrown by Cassandra.
 # The default is 30 seconds, schema agreement usually takes place within 5-10 
seconds.
-schema_agreement_timeout=30
\ No newline at end of file
+schema_agreement_timeout=30
+
+# The timeout (in seconds) for Thrift connections used by the Cassandra daemon 
service. All schema changes
+# are handled by this service, which opens a Thrift client to a central 
'schema master' to send all changes to
+# (schema modifications cannot be handles by different nodes concurrently).
+# This timeout determines how long the client will wait for a response. When a 
timeout occurs, the schema update
+# will fail.
+# Note that if the central 'schema master' is down, this timeout determines 
how long it will take for the daemon
+# to detect that it is down and switch to the new schema master. It is 
therefore recommended not to increase this
+# timeout too much.
+thrift_timeout=10
\ No newline at end of file

Modified: 
trunk/amdatu-cassandra/test-unit/framework/src/main/java/org/amdatu/cassandra/test/unit/framework/mock/CassandraConfigurationServiceMock.java
==============================================================================
--- 
trunk/amdatu-cassandra/test-unit/framework/src/main/java/org/amdatu/cassandra/test/unit/framework/mock/CassandraConfigurationServiceMock.java
       (original)
+++ 
trunk/amdatu-cassandra/test-unit/framework/src/main/java/org/amdatu/cassandra/test/unit/framework/mock/CassandraConfigurationServiceMock.java
       Fri Dec 16 14:40:32 2011
@@ -61,4 +61,8 @@
     public int getSchemaAgreementTimeout() {
         return 30;
     }
+    
+    public int getThriftTimeout() {
+        return 10;
+    }
 }
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits

Reply via email to