Author: [email protected]
Date: Tue Nov  1 11:59:42 2011
New Revision: 1698

Log:
[AMDATUCASSANDRA-127] Merged fix from 0.2.1 branch


Modified:
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
   
trunk/amdatu-cassandra/cassandra-persistencemanager-hector/src/test/java/org/amdatu/cassandra/persistencemanager/hector/mock/CassandraConfigurationServiceMock.java
   
trunk/amdatu-cassandra/config/src/main/resources/org.amdatu.core.cassandra.application.cfg
   
trunk/amdatu-cassandra/test-unit/framework/src/main/java/org/amdatu/cassandra/test/unit/framework/mock/CassandraConfigurationServiceMock.java

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
      (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
      Tue Nov  1 11:59:42 2011
@@ -113,6 +113,11 @@
      * Configuration key that stores the name of the cluster that this node is 
part of.
      */
     String CLUSTER_NAME = "clustername";
+    
+    /**
+     * Configuration key that stores the schema agreement timeout (in seconds).
+     */
+    String SCHEMA_AGREEMENT_TIMEOUT = "schema_agreement_timeout";
 
     /**
      * Returns the default replication factor for new keyspaces. The 
replication factor determines
@@ -186,4 +191,13 @@
      * @return the name of the cluster that this node is part of.
      */
     String getClustername();
+    
+    /**
+     * Returns the timeout (in seconds) to wait for schema agreement. When a 
Keyspace or ColumnFamily is added or dropped, 
+     * the thread is blocked until the while cluster agrees upon the schema 
version (neglecting unreachable nodes). The thread
+     * is however no longer blocked then the schema agreement timeout. If the 
timeout kicks in, the Column Family or Keyspace
+     * is created or dropped, resulting in a possible 
SchemaDisagreementException internally thrown by Cassandra.
+     * @return the timeout (in seconds) to wait for schema agreement
+     */
+    int getSchemaAgreementTimeout();
 }

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
  (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
  Tue Nov  1 11:59:42 2011
@@ -40,6 +40,8 @@
     // Statics
     private static final String STORAGE_CONF_SOURCE = "conf/cassandra.yaml";
     private static final String LOG4J_CONF_SOURCE = "conf/log4j.properties";
+    private static final int DEFAULT_SCHEMA_AGREEMENT_TIMEOUT = 30;
+    
 
     // Reference to the LogService
     private volatile LogService m_logService;
@@ -56,6 +58,7 @@
     private int m_rpcPort;
     private int m_storagePort;
     private String m_clusterName;
+    private int m_schemaAgreementTimeout;
 
     /**
      * The init() method is invoked by the Felix dependency manager. It allows 
us to initialize our service. In this
@@ -130,6 +133,12 @@
             m_rpcPort = toInt(dictionary.get(RPC_PORT));
             m_storagePort = toInt(dictionary.get(STORAGE_PORT));
             m_clusterName = dictionary.get(CLUSTER_NAME).toString();
+            if (dictionary.get(SCHEMA_AGREEMENT_TIMEOUT) != null) {
+                m_schemaAgreementTimeout = 
toInt(dictionary.get(SCHEMA_AGREEMENT_TIMEOUT));
+            } else {
+                // For backwards compatibility
+                m_schemaAgreementTimeout = DEFAULT_SCHEMA_AGREEMENT_TIMEOUT;
+            }
         }
     }
 
@@ -206,4 +215,8 @@
     public String getClustername() {
         return m_clusterName;
     }
+    
+    public int getSchemaAgreementTimeout() {
+        return m_schemaAgreementTimeout;
+    }
 }

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
 (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
 Tue Nov  1 11:59:42 2011
@@ -24,6 +24,7 @@
 import org.amdatu.cassandra.application.CassandraDaemonService;
 import org.amdatu.cassandra.application.ThriftException;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.CassandraServer;
 import org.apache.cassandra.thrift.CfDef;
@@ -67,7 +68,7 @@
         m_logService.log(LogService.LOG_INFO, getClass().getName() + " service 
initialized");
     }
 
-    public void start() {
+    public void start() throws TException, InvalidRequestException {
         m_cassandraServer = new CassandraServer();
     }
 
@@ -113,12 +114,96 @@
         }
     }
 
+    private void waitForSchemaAgreement() {
+        String agreed = checkSchemaAgreement();
+        long expires = System.currentTimeMillis() + 1000 * 
m_configuration.getSchemaAgreementTimeout();
+        boolean pastAgreement = (agreed != null);
+        while (agreed == null && System.currentTimeMillis() < expires) {
+            m_logService.log(LogService.LOG_INFO,
+                "Schema definitions are not yet promulgated throughout the 
cluster, waiting for schema agreement. Cluster schema's:"
+                    + getSchemaVersions());
+            try {
+                // Wait for 2 seconds, then try again
+                Thread.sleep(2000);
+            }
+            catch (InterruptedException e) {}
+            agreed = checkSchemaAgreement();
+        }
+        if (System.currentTimeMillis() >= expires) {
+            m_logService.log(LogService.LOG_WARNING,
+                "Schema agreement timeout reached (which is " + 
m_configuration.getSchemaAgreementTimeout()
+                    + " seconds. This may cause ScmeDisagreementExceptions to 
be thrown.");
+        } else if (pastAgreement) {
+            // Schema agreement was not an issue, so log on debug level
+            m_logService.log(LogService.LOG_DEBUG, "Schema agreed between all 
nodes in the cluster, agreed version='"
+                + agreed + "'");
+        } else {
+            // Schema agreement was an issue, so log on info level
+            m_logService.log(LogService.LOG_INFO, "Schema agreed between all 
nodes in the cluster, agreed version='"
+                            + agreed + "'");
+        }
+    }
+
+    private String checkSchemaAgreement() {
+        try {
+            Map<String, List<String>> schemas = 
m_cassandraServer.describe_schema_versions();
+            String version = null;
+            for (String key : schemas.keySet()) {
+                if (StorageProxy.UNREACHABLE.equals(key)) {
+                    m_logService.log(LogService.LOG_DEBUG, "Ignoring version 
of unreacheable node(s): " + key + "="
+                        + schemas.get(key));
+                }
+                else {
+                    if (version == null) {
+                        version = key;
+                    }
+                    else {
+                        if (!version.equals(key)) {
+                            return null;
+                        }
+                    }
+                }
+            }
+            return version;
+        }
+        catch (TException e) {
+            return null;
+        }
+        catch (InvalidRequestException e) {
+            return null;
+        }
+    }
+
+    private String getSchemaVersions() {
+        try {
+            String result = "";
+            Map<String, List<String>> schemas = 
m_cassandraServer.describe_schema_versions();
+            for (String key : schemas.keySet()) {
+                result += "version '" + key + "' : ";
+                for (String entry : schemas.get(key)) {
+                    result += entry + " ";
+                }
+                result += System.getProperty("line.separator");
+            }
+            return result;
+        }
+        catch (TException e) {
+            return null;
+        }
+        catch (InvalidRequestException e) {
+            return null;
+        }
+    }
+
     public synchronized boolean addKeyspace(final String name) {
         return addKeyspace(name, 
m_configuration.getDefaultReplicationFactor());
     }
 
     public synchronized boolean addKeyspace(final String name, final int 
replicationFactor) {
         try {
+            // Before we create the keyspace, the cluster must agree upon the 
schema
+            waitForSchemaAgreement();
+
             if (!keyspaceExists(name)) {
                 List<CfDef> empty = new ArrayList<CfDef>();
                 KsDef ksDef = new KsDef(name, DEFAULT_PLACEMENT_STRATEGY, 
empty);
@@ -163,7 +248,7 @@
 
     private boolean compareReplicationFactor(KsDef keyspace, int rf) {
         if (keyspace.getStrategy_optionsSize() > 0
-                        && 
keyspace.getStrategy_options().containsKey(RF_STRATEGY_OPTION)) {
+            && keyspace.getStrategy_options().containsKey(RF_STRATEGY_OPTION)) 
{
             int currentRF = 
Integer.parseInt(keyspace.getStrategy_options().get(RF_STRATEGY_OPTION));
             return currentRF == rf;
         }
@@ -224,6 +309,9 @@
 
     public synchronized boolean dropKeyspace(final String keyspace) {
         try {
+            // Before we drop the keyspace, the cluster must agree upon the 
schema
+            waitForSchemaAgreement();
+
             if (keyspaceExists(keyspace)) {
                 m_cassandraServer.system_drop_keyspace(keyspace);
 
@@ -326,6 +414,9 @@
 
     public synchronized boolean addColumnFamily(final String keyspace, final 
String cfName, final String columnType,
         final String comparatorType, final String subComparatorType) {
+        // Before we create the columnFamily, the cluster must agree upon the 
schema
+        waitForSchemaAgreement();
+
         if (keyspace.equals(Table.SYSTEM_TABLE)) {
             throw new ThriftException("ColumnFamily's cannot be added to 
Cassandra's system keyspace");
         }

Modified: 
trunk/amdatu-cassandra/cassandra-persistencemanager-hector/src/test/java/org/amdatu/cassandra/persistencemanager/hector/mock/CassandraConfigurationServiceMock.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-persistencemanager-hector/src/test/java/org/amdatu/cassandra/persistencemanager/hector/mock/CassandraConfigurationServiceMock.java
 (original)
+++ 
trunk/amdatu-cassandra/cassandra-persistencemanager-hector/src/test/java/org/amdatu/cassandra/persistencemanager/hector/mock/CassandraConfigurationServiceMock.java
 Tue Nov  1 11:59:42 2011
@@ -55,5 +55,9 @@
 
     public String getClustername() {
         return "Amdatu Unit Test Cluster";
+    }
+    
+    public int getSchemaAgreementTimeout() {
+        return 30;
     }
 }

Modified: 
trunk/amdatu-cassandra/config/src/main/resources/org.amdatu.core.cassandra.application.cfg
==============================================================================
--- 
trunk/amdatu-cassandra/config/src/main/resources/org.amdatu.core.cassandra.application.cfg
  (original)
+++ 
trunk/amdatu-cassandra/config/src/main/resources/org.amdatu.core.cassandra.application.cfg
  Tue Nov  1 11:59:42 2011
@@ -96,4 +96,11 @@
 rpc_port=${cassandra.rpc_port}
 
 # storage port
-storage_port=${cassandra.storage_port}
\ No newline at end of file
+storage_port=${cassandra.storage_port}
+
+# The timeout (in seconds) to wait for schema agreement. When a Keyspace or 
ColumnFamily is added or dropped,
+# the thread is blocked until the while cluster agrees upon the schema version 
(neglecting unreachable nodes). The thread
+# is however no longer blocked then the schema agreement timeout. If the 
timeout kicks in, the Column Family or Keyspace
+# is created or dropped, resulting in a possible SchemaDisagreementException 
internally thrown by Cassandra.
+# The default is 30 seconds, schema agreement usually takes place within 5-10 
seconds.
+schema_agreement_timeout=30
\ No newline at end of file

Modified: 
trunk/amdatu-cassandra/test-unit/framework/src/main/java/org/amdatu/cassandra/test/unit/framework/mock/CassandraConfigurationServiceMock.java
==============================================================================
--- 
trunk/amdatu-cassandra/test-unit/framework/src/main/java/org/amdatu/cassandra/test/unit/framework/mock/CassandraConfigurationServiceMock.java
       (original)
+++ 
trunk/amdatu-cassandra/test-unit/framework/src/main/java/org/amdatu/cassandra/test/unit/framework/mock/CassandraConfigurationServiceMock.java
       Tue Nov  1 11:59:42 2011
@@ -57,4 +57,8 @@
     public String getClustername() {
         return "Amdatu Unit Test Cluster";
     }
+    
+    public int getSchemaAgreementTimeout() {
+        return 30;
+    }
 }
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits

Reply via email to