Author: ivol37 at gmail.com
Date: Mon Jan 24 10:14:53 2011
New Revision: 689

Log:
[AMDATU-252] Fixed timing issue by using snapshots. On each run a snapshot is 
made and compared to the previous snapshot; if there are any changes, those 
changes, as contained in the new snapshot, are handled. This prevents the set 
of keyspaces and CFs from changing (being added, removed or updated) while the 
updates are being handled. Previously, such changes could fail to be handled as 
it all depended on timing; when a new keyspace was added just after new 
keyspaces were handled, the change would not be picked up.

Modified:
   
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java

Modified: 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
      (original)
+++ 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
      Mon Jan 24 10:14:53 2011
@@ -21,6 +21,7 @@
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.amdatu.cassandra.application.CassandraDaemonService;
 import org.amdatu.cassandra.listener.ColumnFamilyAvailable;
@@ -85,7 +86,7 @@
      *
      */
     class InpectKeyspaceColumnFamilyThread extends Thread {
-        Map<String, List<String>> m_keyspaceColumnFamilyMap = new 
HashMap<String, List<String>>();
+        Map<String, List<String>> m_snapshot = new HashMap<String, 
List<String>>();
 
         @Override
         public void run() {
@@ -93,15 +94,19 @@
                 while(!isInterrupted()) {
                     // Inspect available keyspaces
                     try {
-                        // Only compare keyspaces with CPM's if there was a 
keyspace change
-                        // in the meantime
-                        if (checkForUpdates()) {
+                        // Only compare keyspaces with CPM's if there was a 
keyspace change in the meantime
+                        // We make a snapshot before we start checking if any 
updates are needed and performing them.
+                        // Updates are needed when the snaphot taken now 
differs from the one taken previously
+                        Map<String, List<String>> newSnapshot = getSnapshot();
+                        if (!m_snapshot.equals(newSnapshot)) {
+                            m_snapshot = newSnapshot;
                             onKeyspaceAdded();
                             onKeyspaceDropped();
                             onColumnFamilyAdded();
                             onColumnFamilyRemoved();
+                        } else {
+                            m_snapshot = newSnapshot;
                         }
-                        m_keyspaceColumnFamilyMap = 
getKeyspaceColumnFamilyMap();
                     }
                     catch (TException e) {
                         m_logService.log(LogService.LOG_ERROR, "Could not 
retrieve keyspaces. Cause: " + e.getMessage(), e);
@@ -120,10 +125,12 @@
                 }
             }
             catch (InterruptedException e) {
+                m_logService.log(LogService.LOG_INFO, "Cassandra update 
listener thread interrupted.");
             }
         }
 
-        private Map<String, List<String>> getKeyspaceColumnFamilyMap() throws 
TException, InvalidRequestException, NotFoundException {
+        // Creates and returns a snapshot of the currently available keyspaces 
and columnfamilies
+        private Map<String, List<String>> getSnapshot() throws TException, 
InvalidRequestException, NotFoundException {
             Map<String, List<String>> map = new HashMap<String, 
List<String>>();
             for (String keyspace : m_daemonService.getKeyspaces()) {
                 map.put(keyspace, m_daemonService.getColumnFamilies(keyspace));
@@ -131,21 +138,10 @@
             return map;
         }
 
-        private boolean checkForUpdates() throws TException, 
InvalidRequestException, NotFoundException {
-            if (m_keyspaceColumnFamilyMap.keySet() == null && 
m_daemonService.getKeyspaces() == null) {
-                return false;
-            } else if (m_daemonService.getKeyspaces() == null) {
-                return true;
-            } else {
-
-                return 
!getKeyspaceColumnFamilyMap().equals(m_keyspaceColumnFamilyMap);
-            }
-        }
-
         // Loop over all keyspaces and register a Cassandra persistence 
manager when there is none
         // available for that keyspace
         private void onKeyspaceAdded() throws InvalidSyntaxException, 
TException, InvalidRequestException, NotFoundException {
-            List<String> keyspaces = m_daemonService.getKeyspaces();
+            Set<String> keyspaces = m_snapshot.keySet();
             if (keyspaces != null) {
                 for (String keyspace : keyspaces) {
                     String filter = "(" + 
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
@@ -170,13 +166,13 @@
                 for (ServiceReference ref : servRefs) {
                     ColumnFamilyProvider provider = (ColumnFamilyProvider) 
m_context.getService(ref);
                     if (isKeyspaceGlobal(provider)) {
-                        List<String> keyspaces = 
m_daemonService.getKeyspaces();
+                        Set<String> keyspaces = m_snapshot.keySet();
                         if (keyspaces != null) {
                             for (String keyspace : keyspaces) {
                                 if (!Table.SYSTEM_TABLE.equals(keyspace)) {
                                     // Verify that the ColumnFamily for this 
keyspace global provider is available in this keyspace
                                     for (ColumnFamilyDefinition cfDef : 
provider.getColumnFamilies()) {
-                                        if 
(!m_daemonService.getColumnFamilies(keyspace).contains(cfDef.getName())) {
+                                        if 
(!m_snapshot.get(keyspace).contains(cfDef.getName())) {
                                             
m_logService.log(LogService.LOG_DEBUG, "Adding ColumnFamily '" + 
cfDef.getName() + "' to keyspace '"
                                                 + keyspace + "' for the 
keyspace-global ColumnFamilyProvider '" + provider.getClass().getName() + "'");
                                             final String cfName = 
cfDef.getName();
@@ -206,7 +202,7 @@
         // Loop over all Cassandra Persistence Managers and unregister them if 
the corresponding keyspace
         // is removed from Cassandra
         private void onKeyspaceDropped() throws InvalidSyntaxException, 
TException, InvalidRequestException {
-            List<String> keyspaces = m_daemonService.getKeyspaces();
+            Set<String> keyspaces = m_snapshot.keySet();
             ServiceReference[] servRefs = 
m_context.getAllServiceReferences(CassandraPersistenceManager.class.getName(), 
null);
             if (servRefs != null) {
                 for (ServiceReference servRef : servRefs) {
@@ -221,10 +217,10 @@
         }
 
         private void onColumnFamilyAdded() throws TException, 
InvalidRequestException, InvalidSyntaxException, NotFoundException {
-            List<String> keyspaces = m_daemonService.getKeyspaces();
+            Set<String> keyspaces = m_snapshot.keySet();
             if (keyspaces != null) {
                 for (String keyspace : keyspaces) {
-                    List<String> columnFamilies = 
m_daemonService.getColumnFamilies(keyspace);
+                    List<String> columnFamilies = m_snapshot.get(keyspace);
                     for (String columnFamily : columnFamilies) {
                         String ksFilter = "(" + 
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
                         String cfFilter = "(" + 
ColumnFamilyAvailable.FILTER_NAME  + "=" + columnFamily + ")";
@@ -244,18 +240,7 @@
                 for (ServiceReference servRef : servRefs) {
                     String keyspace = 
servRef.getProperty(CassandraPersistenceManager.KEYSPACE_AWARE_KEY).toString();
                     String columnFamily = 
servRef.getProperty(ColumnFamilyAvailable.FILTER_NAME).toString();
-                    boolean remove = false;
-                    try {
-                        if 
(!m_daemonService.getColumnFamilies(keyspace).contains(columnFamily)) {
-                            remove = true;
-                        }
-
-                    } catch (NotFoundException e) {
-                        // This exception is thrown when the keyspace could 
not be found anymore, in which
-                        // case also the ColumnFamilyAvailable for that 
keyspace should be removed
-                        remove = true;
-                    }
-                    if (remove) {
+                    if (m_snapshot.get(keyspace) == null || 
!m_snapshot.get(keyspace).contains(columnFamily)) {
                         m_context.ungetService(servRef);
                         m_logService.log(LogService.LOG_INFO, 
"ColumnFamilyAvailable service for keyspace '" + keyspace
                             + "' and ColumnFamily '" + columnFamily + "' 
unregistered");

Reply via email to