Author: [email protected]
Date: Wed Feb 29 16:50:59 2012
New Revision: 2132

Log:
[AMDATUCASSANDRA-176] Added a read/write lock to guard the Keyspace -> ColumnFamily type map

Modified:
   branches/amdatu-cassandra-0.2.3/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java

Modified: 
branches/amdatu-cassandra-0.2.3/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
==============================================================================
--- 
branches/amdatu-cassandra-0.2.3/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
  (original)
+++ 
branches/amdatu-cassandra-0.2.3/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
  Wed Feb 29 16:50:59 2012
@@ -36,6 +36,10 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.cassandra.thrift.CfDef;
 import org.osgi.service.event.Event;
@@ -61,7 +65,9 @@
     private volatile CassandraClientConfigurationService m_configuration = 
null;
 
     // Local cache of ColumnTypes of all ColumnFamilies for each keyspace
-    private Map<String, Map<String, String>> m_keyspaceColumnFamilyTypes = new 
HashMap<String, Map<String, String>>();
+    private final Map<String, Map<String, String>> m_keyspaceColumnFamilyTypes 
=
+        new ConcurrentHashMap<String, Map<String, String>>();
+    private final ReentrantReadWriteLock m_ksCfMaplock = new 
ReentrantReadWriteLock();
 
     /**
      * The init() method is invoked by the Felix dependency manager.
@@ -173,7 +179,7 @@
         while (agreed == null && System.currentTimeMillis() < expires) {
             m_logService.log(LogService.LOG_INFO,
                 "Schema definitions are not yet promulgated throughout the 
cluster, waiting for schema agreement."
-                + " Cluster schema's:" + getSchemaVersions());
+                    + " Cluster schema's:" + getSchemaVersions());
             try {
                 // Wait for 2 seconds, then try again
                 Thread.sleep(2000);
@@ -343,7 +349,15 @@
                 getHectorCluster().dropKeyspace(keyspace, true);
 
                 // Purge the keyspace from the keyspace -> CF type map
-                m_keyspaceColumnFamilyTypes.remove(keyspace);
+                // Get a write lock before removing the keyspace from the map
+                WriteLock writeLock = m_ksCfMaplock.writeLock();
+                try {
+                    writeLock.lock();
+                    m_keyspaceColumnFamilyTypes.remove(keyspace);
+                }
+                finally {
+                    writeLock.unlock();
+                }
 
                 // Publish an event that a keyspace has been dropped
                 Map<String, String> properties = new HashMap<String, String>();
@@ -384,34 +398,49 @@
         return cfDef != null;
     }
 
-    public synchronized String getColumnType(final String keyspaceName, final 
String columnFamilyName) {
+    public String getColumnType(final String keyspaceName, final String 
columnFamilyName) {
         Map<String, String> columnFamilyTypeMap;
-        if (m_keyspaceColumnFamilyTypes.containsKey(keyspaceName)) {
-            columnFamilyTypeMap = 
m_keyspaceColumnFamilyTypes.get(keyspaceName);
-        }
-        else {
-            columnFamilyTypeMap = new HashMap<String, String>();
+
+        // Get a read lock before getting the keyspace from the map
+        ReadLock readLock = m_ksCfMaplock.readLock();
+        try {
+            readLock.lock();
+            if (m_keyspaceColumnFamilyTypes.containsKey(keyspaceName)) {
+                columnFamilyTypeMap = 
m_keyspaceColumnFamilyTypes.get(keyspaceName);
+            }
+            else {
+                columnFamilyTypeMap = new HashMap<String, String>();
+            }
+            if (columnFamilyTypeMap.containsKey(columnFamilyName)) {
+                return columnFamilyTypeMap.get(columnFamilyName);
+            }
         }
-        if (columnFamilyTypeMap.containsKey(columnFamilyName)) {
-            return columnFamilyTypeMap.get(columnFamilyName);
+        finally {
+            readLock.unlock();
         }
-        else {
-            try {
-                KeyspaceDefinition ksDef = 
getHectorCluster().describeKeyspace(keyspaceName);
-                List<ColumnFamilyDefinition> cfDefs = ksDef.getCfDefs();
-                for (ColumnFamilyDefinition cfDef : cfDefs) {
-                    if (cfDef.getName().equals(columnFamilyName)) {
-                        columnFamilyTypeMap.put(columnFamilyName, 
cfDef.getColumnType().getValue());
-                        m_keyspaceColumnFamilyTypes.put(keyspaceName, 
columnFamilyTypeMap);
-                        return cfDef.getColumnType().getValue();
-                    }
+
+        // Get a write lock before update the keyspace in the map
+        WriteLock writeLock = m_ksCfMaplock.writeLock();
+        try {
+            writeLock.lock();
+
+            KeyspaceDefinition ksDef = 
getHectorCluster().describeKeyspace(keyspaceName);
+            List<ColumnFamilyDefinition> cfDefs = ksDef.getCfDefs();
+            for (ColumnFamilyDefinition cfDef : cfDefs) {
+                if (cfDef.getName().equals(columnFamilyName)) {
+                    columnFamilyTypeMap.put(columnFamilyName, 
cfDef.getColumnType().getValue());
+                    m_keyspaceColumnFamilyTypes.put(keyspaceName, 
columnFamilyTypeMap);
+                    return cfDef.getColumnType().getValue();
                 }
-                return null;
-            }
-            catch (HectorException e) {
-                // Throw the unchecked Hector exception as a new unchecked 
Thrift exception
-                throw new 
ThriftException(e).setKeyspace(keyspaceName).setColumnFamily(columnFamilyName);
             }
+            return null;
+        }
+        catch (HectorException e) {
+            // Throw the unchecked Hector exception as a new unchecked Thrift 
exception
+            throw new 
ThriftException(e).setKeyspace(keyspaceName).setColumnFamily(columnFamilyName);
+        }
+        finally {
+            writeLock.unlock();
         }
     }
 
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits

Reply via email to