Author: ivol37@gmail.com
Date: Fri Oct 29 14:19:56 2010
New Revision: 217

Log:
[AMDATU-137] Improved and enhanced the Cassandra-related integration tests, taking
into account the asynchronous behavior of Cassandra write operations. Also fixed a
minor bug in the Shindig bundle when running under Pax Exam, and added the loghandler
bundle so that integration test results contain more logging information.
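
The gist of the workaround for Cassandra's asynchronous writes, as used by the changes
below, is a write-then-verify retry loop. A minimal sketch of that idea in isolation,
with made-up names rather than the Amdatu API:

    import java.util.Arrays;

    public final class WriteThenVerify {
        /** Minimal store abstraction for the sketch; not an Amdatu interface. */
        interface ByteStore {
            void set(String key, byte[] value) throws Exception;
            byte[] get(String key) throws Exception;
        }

        /** Write, read back, and retry until the read observes the write or retries run out. */
        static boolean setAndVerify(ByteStore store, String key, byte[] value, int maxRetries)
            throws Exception {
            for (int attempt = 0; attempt < maxRetries; attempt++) {
                store.set(key, value);
                if (Arrays.equals(value, store.get(key))) {
                    return true;
                }
                Thread.sleep(10); // give the asynchronous write a moment to become visible
            }
            return false;
        }
    }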

Added:
   trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/CassandraPersistenceManagerTest.java
Modified:
   trunk/integration-tests/pom.xml
   trunk/integration-tests/src/test/java/org/amdatu/test/integration/base/IntegrationTestBase.java
   trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/CassandraDaemonIntegrationTest.java
   trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/TenantManagementServiceTest.java
   trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/UserAdminStoreTest.java
   trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/service/CassandraDaemonServiceImpl.java
   trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/CassandraPersistenceManager.java
   trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/service/CassandraPersistenceManagerImpl.java
   trunk/platform-bundles/shindig-application/src/main/java/org/amdatu/platform/shindig/application/service/ShindigRegistrationServiceImpl.java

Modified: trunk/integration-tests/pom.xml
==============================================================================
--- trunk/integration-tests/pom.xml     (original)
+++ trunk/integration-tests/pom.xml     Fri Oct 29 14:19:56 2010
@@ -55,12 +55,19 @@
     </dependency>
     <dependency>
       <groupId>org.amdatu.platform</groupId>
-      <artifactId>shindig-application</artifactId>
+      <artifactId>loghandler</artifactId>
+      <version>${platform.version}</version>
       <scope>test</scope>
       <type>bundle</type>
     </dependency>    
     <dependency>
       <groupId>org.amdatu.platform</groupId>
+      <artifactId>shindig-application</artifactId>
+      <scope>test</scope>
+      <type>bundle</type>
+    </dependency>       
+    <dependency>
+      <groupId>org.amdatu.platform</groupId>
       <artifactId>tenant-service</artifactId>
       <scope>test</scope>
       <type>bundle</type>
@@ -152,6 +159,7 @@
            phase such that we can use Pax Exam for integration testing -->
       <plugin>
         <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.6</version>
         <executions>
           <execution>
             <id>run-integration-test</id>

Modified: trunk/integration-tests/src/test/java/org/amdatu/test/integration/base/IntegrationTestBase.java
==============================================================================
--- trunk/integration-tests/src/test/java/org/amdatu/test/integration/base/IntegrationTestBase.java     (original)
+++ trunk/integration-tests/src/test/java/org/amdatu/test/integration/base/IntegrationTestBase.java     Fri Oct 29 14:19:56 2010
@@ -101,6 +101,7 @@
                     amdatuTenantService(),
                     amdatuUserAdminCassandraStore(),
                     amdatuWink(),
+                    amdatuLogHandler(),
 
                 // And finally deploy ourselves
                 bundle(integrationTestJarFile().toURI().toString())
@@ -190,6 +191,9 @@
         serviceTracker.open();
         try {
             serviceInstance = (T) serviceTracker.waitForService(SERVICE_TIMEOUT * 1000);
+            if (serviceInstance != null) {
+                return serviceInstance;
+            }
         }
         catch (InterruptedException e) {
             e.printStackTrace();
@@ -268,6 +272,10 @@
         return mavenBundle().groupId("org.amdatu.platform").artifactId("config-template-manager").versionAsInProject();
     }
 
+    protected static MavenArtifactProvisionOption amdatuLogHandler() {
+        return mavenBundle().groupId("org.amdatu.platform").artifactId("loghandler").versionAsInProject();
+    }
+    
     protected static MavenArtifactProvisionOption amdatuHttpContext() {
         return mavenBundle().groupId("org.amdatu.platform").artifactId("httpcontext").versionAsInProject();
     }

Modified: trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/CassandraDaemonIntegrationTest.java
==============================================================================
--- trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/CassandraDaemonIntegrationTest.java     (original)
+++ trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/CassandraDaemonIntegrationTest.java     Fri Oct 29 14:19:56 2010
@@ -27,6 +27,8 @@
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.felix.dm.Component;
 import org.apache.felix.dm.DependencyManager;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.Option;
@@ -52,13 +54,35 @@
 
     @Override
     protected Component[] getDependencies(DependencyManager manager) {
-        return new Component[] {
-                manager.createComponent()
-                    .setImplementation(this)
-                    .add(manager.createServiceDependency()
-                        .setService(CassandraDaemonService.class)
-                        .setRequired(true))
-        };
+        return new Component[] { manager.createComponent()
+            .setImplementation(this)
+            .add(manager.createServiceDependency()
+            .setService(CassandraDaemonService.class)
+            .setRequired(true)) };
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        // FIXME: Temporary sleep, there should be a better way
+        Thread.sleep(10000);
+        
+        m_daemonService = getService(CassandraDaemonService.class);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // Clean up our keyspaces.
+        try {
+            m_daemonService.dropKeyspace(KEYSPACE);
+        }
+        finally {
+            try {
+                m_daemonService.dropKeyspace(KEYSPACE.toLowerCase());
+            }
+            finally {
+                m_daemonService.dropKeyspace(KEYSPACE.toUpperCase());
+            }
+        }
     }
 
     @Test
@@ -80,6 +104,9 @@
         // Case sensitivity check: Cassandra is case-sensitive!
         m_daemonService.addKeyspace(KEYSPACE.toLowerCase());
         m_daemonService.addKeyspace(KEYSPACE.toUpperCase());
+        
+        // Sleep a second; this increases the probability that the write requests have been handled
+        Thread.sleep(1000);
 
         // Get all keyspaces and check that all three have been created
         List<String> allKeyspaces = m_daemonService.getKeyspaces();
@@ -110,6 +137,9 @@
         m_daemonService.addColumnFamily(KEYSPACE, COLUMNFAMILY.toUpperCase(), 
ColumnType.STANDARD.value,
             CompareType.BYTESTYPE.value, CompareType.BYTESTYPE.value);
 
+        // Sleep a second; this increases the probability that the write requests have been handled
+        Thread.sleep(1000);
+        
         List<String> allColumnFamilies = 
m_daemonService.getColumnFamilies(KEYSPACE);
         Assert.assertTrue(allColumnFamilies.contains(COLUMNFAMILY));
         
Assert.assertTrue(allColumnFamilies.contains(COLUMNFAMILY.toLowerCase()));
@@ -117,9 +147,5 @@
         Assert.assertTrue("Expected available ColumnFamily's = 3, but actual = 
"
             + allColumnFamilies.size(), allColumnFamilies.size() == 3);
 
-        // -3- Clean up our keyspaces.
-        m_daemonService.dropKeyspace(KEYSPACE);
-        m_daemonService.dropKeyspace(KEYSPACE.toLowerCase());
-        m_daemonService.dropKeyspace(KEYSPACE.toUpperCase());
     }
 }
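
The FIXME'd Thread.sleep(10000) in setUp() above is acknowledged as a stopgap for waiting
until the services under test are available. A hedged alternative, sketched with assumed
names and not part of this change, is to block on an OSGi ServiceTracker until the service
is registered, much like IntegrationTestBase.getService() already does:

    import org.osgi.framework.BundleContext;
    import org.osgi.util.tracker.ServiceTracker;

    public final class ServiceAwait {
        /** Waits up to timeoutMillis for a service of the given type; returns null on timeout. */
        @SuppressWarnings("unchecked")
        static <T> T awaitService(BundleContext context, Class<T> type, long timeoutMillis)
            throws InterruptedException {
            ServiceTracker tracker = new ServiceTracker(context, type.getName(), null);
            tracker.open();
            try {
                return (T) tracker.waitForService(timeoutMillis);
            }
            finally {
                tracker.close();
            }
        }
    }

Assuming a BundleContext is available (Pax Exam can inject one, as UserAdminStoreTest does
below), a test could call awaitService(context, CassandraDaemonService.class, 10000) in its
@Before method instead of sleeping a fixed ten seconds.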

Added: trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/CassandraPersistenceManagerTest.java
==============================================================================
--- (empty file)
+++ trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/CassandraPersistenceManagerTest.java     Fri Oct 29 14:19:56 2010
@@ -0,0 +1,135 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.test.integration.tests;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+import junit.framework.Assert;
+
+import org.amdatu.platform.cassandra.application.CassandraDaemonService;
+import org.amdatu.platform.cassandra.listener.ColumnFamilyDefinition.ColumnType;
+import org.amdatu.platform.cassandra.listener.ColumnFamilyDefinition.CompareType;
+import org.amdatu.platform.cassandra.persistencemanager.CassandraException;
+import org.amdatu.platform.cassandra.persistencemanager.CassandraPersistenceManager;
+import org.amdatu.platform.cassandra.persistencemanager.CassandraPersistenceManagerFactory;
+import org.amdatu.test.integration.base.IntegrationTestBase;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.Configuration;
+import org.ops4j.pax.exam.junit.JUnit4TestRunner;
+
+/**
+ * This class provides an integration test for the Cassandra Persistence Manager.
+ * 
+ * @author ivol
+ */
+@RunWith(JUnit4TestRunner.class)
+public class CassandraPersistenceManagerTest extends IntegrationTestBase {
+    // Test keyspace and ColumnFamily
+    private final static String KEYSPACE = "PMTestKeySpace";
+    private final static String COLUMNFAMILY = "IntegrationTestColumnFamily";
+    private final static String DEFAULT_CHARSET = "UTF-8";
+
+    // Services under test
+    private CassandraDaemonService m_daemonService;
+    private CassandraPersistenceManagerFactory m_pmFactory;
+    private CassandraPersistenceManager m_pm;
+
+    @Configuration
+    public Option[] configure() {
+        return super.configure();
+    }
+
+    @Override
+    protected Component[] getDependencies(DependencyManager manager) {
+        return new Component[] { manager.createComponent()
+            .setImplementation(this)
+            .add(manager.createServiceDependency()
+            .setService(CassandraDaemonService.class)
+            .setRequired(true)) };
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        // FIXME: Temporary sleep, there should be a better way
+        Thread.sleep(10000);
+        
+        m_daemonService = getService(CassandraDaemonService.class);
+
+        // Create the test Keyspace and ColumnFamily
+        m_daemonService.addKeyspace(KEYSPACE);
+        m_daemonService.addColumnFamily(KEYSPACE, COLUMNFAMILY, ColumnType.SUPER.value,
+            CompareType.BYTESTYPE.value, CompareType.BYTESTYPE.value);
+
+        // Retrieve the CassandraPersistenceManager for the keyspace we just created
+        m_pmFactory = getService(CassandraPersistenceManagerFactory.class);
+        m_pmFactory.createCassandraPersistenceManager(KEYSPACE);
+        String filter = "(" + CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + KEYSPACE + ")";
+        m_pm = getService(CassandraPersistenceManager.class, filter);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // Clean up our keyspaces, also when the test fails
+        m_daemonService.dropKeyspace(KEYSPACE);
+    }
+
+    @Test
+    public void testCassandraDaemonService() throws Exception {
+        // Perform intensive getValue/setValue test (see AMDATU-137)
+        String rowKey = "test";
+        String superColumn = "TestSuperColumn";
+        String column = "TestColumn";
+        int failures = 0;
+        for (int i = 0; i < 10000; i++) {
+            int randomNumber = new Double(Math.floor(10000 * Math.random())).intValue();
+            String value = "amdatutest-" + randomNumber;
+            byte[] byteValue = value.getBytes(DEFAULT_CHARSET);
+            String failure = verifySetValue(KEYSPACE, COLUMNFAMILY, rowKey, superColumn, column, byteValue);
+            if (failure != null) {
+                failures++;
+            }
+        }
+        if (failures > 0) {
+            Assert.fail("Cassandra setValue/getValue test failed. " + failures + " failures detected.");
+        }
+    }
+
+    /**
+     * This method invokes setValue and immediately verifies that getValue returns the value
+     * that has just been set.
+     * 
+     * @throws UnsupportedEncodingException
+     */
+    private String verifySetValue(String keyspace, String columnFamilyName, String rowKey, String superColumnName,
+        String columnName, byte[] value) throws CassandraException, UnsupportedEncodingException {
+        m_pm.setValueSynchronously(columnFamilyName, rowKey, superColumnName, columnName, value);
+        byte[] persistentByteValue = m_pm.getValue(COLUMNFAMILY, rowKey, superColumnName, columnName);
+        if (!Arrays.equals(value, persistentByteValue)) {
+            return new String(persistentByteValue, DEFAULT_CHARSET);
+        }
+        else {
+            return null;
+        }
+    }
+}

Modified: trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/TenantManagementServiceTest.java
==============================================================================
--- trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/TenantManagementServiceTest.java     (original)
+++ trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/TenantManagementServiceTest.java     Fri Oct 29 14:19:56 2010
@@ -24,11 +24,17 @@
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
-import org.amdatu.platform.tenant.*;
+
+import org.amdatu.platform.tenant.Tenant;
+import org.amdatu.platform.tenant.TenantEntity;
+import org.amdatu.platform.tenant.TenantException;
+import org.amdatu.platform.tenant.TenantManagementService;
+import org.amdatu.platform.tenant.TenantStorageProvider;
 import org.amdatu.test.integration.base.IntegrationTestBase;
 import org.amdatu.test.integration.mock.InMemoryTenantStorageProvider;
 import org.apache.felix.dm.Component;
 import org.apache.felix.dm.DependencyManager;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.Option;
@@ -54,6 +60,12 @@
     public Option[] configure() {
         return super.configure();
     }
+    
+    @Before
+    public void setUp() throws Exception {
+        // FIXME: Temporary sleep, there should be a better way
+        Thread.sleep(10000);
+    }
 
     public Component[] getDependencies(DependencyManager manager) {
         Component testComponent = manager.createComponent()

Modified: trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/UserAdminStoreTest.java
==============================================================================
--- trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/UserAdminStoreTest.java     (original)
+++ trunk/integration-tests/src/test/java/org/amdatu/test/integration/tests/UserAdminStoreTest.java     Fri Oct 29 14:19:56 2010
@@ -19,6 +19,7 @@
 import junit.framework.Assert;
 
 import org.amdatu.test.integration.base.IntegrationTestBase;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.Inject;
@@ -38,6 +39,10 @@
  */
 @RunWith(JUnit4TestRunner.class)
 public class UserAdminStoreTest extends IntegrationTestBase {
+    // Number of milliseconds to wait between WRITE and READ operations concerning the same row and column.
+    // See AMDATU-137.
+    private static int WAIT = 10;
+
     @Inject
     protected BundleContext m_bundleContext;
 
@@ -47,17 +52,30 @@
     public Option[] configure() {
         return super.configure();
     }
-
+    
+    private void sleep() throws InterruptedException {
+        // Wait for WAIT milliseconds; remember that Cassandra handles write operations asynchronously
+        Thread.sleep(WAIT);
+    }
+    
+    @Before
+    public void setUp() throws Exception {
+        // FIXME: Temporary sleep, there should be a better way
+        Thread.sleep(10000);
+        
+        m_userAdmin = getService(UserAdmin.class);
+    }
+    
+    @SuppressWarnings("unchecked")
     @Test
     public void testAdminStore() throws Exception {
-        m_userAdmin = getService(UserAdmin.class);
-
         // Start the test, first remove all existing roles
         Role[] allRoles = m_userAdmin.getRoles(null);
         if (allRoles != null) {
             for (Role role : allRoles) {
                 String roleName = role.getName();
                 m_userAdmin.removeRole(roleName);
+                sleep();
                 Assert.assertTrue("Role '" + roleName + "' removed but still 
returned", m_userAdmin
                     .getRole(roleName) == null);
             }
@@ -80,8 +98,8 @@
         guestUser.getCredentials().put("password", "guestpasswd");
 
         // Now see if we can find these users from credentials
-        Assert.assertTrue("TestAdmin user could not be found",
-            m_userAdmin.getUser("name", "TestAdministrator") != null);
+        sleep();
+        Assert.assertTrue("TestAdmin user could not be found", 
m_userAdmin.getUser("name", "TestAdministrator") != null);
         Assert.assertTrue("TestEditor user could not be found", 
m_userAdmin.getUser("name", "TestEditor") != null);
         Assert.assertTrue("TestGuest user could not be found", 
m_userAdmin.getUser("name", "TestGuest") != null);
 
@@ -90,17 +108,17 @@
         adminUser.getProperties().put("lastName", "Doe".getBytes());
 
         // And check if they were set correctly
-        byte[] firstName =
-            (byte[]) m_userAdmin.getUser("name", "TestAdministrator").getProperties().get("firstName");
+        sleep();
+        byte[] firstName = (byte[]) m_userAdmin.getUser("name", "TestAdministrator").getProperties().get("firstName");
         Assert.assertTrue("firstName not properly set", new String(firstName, "UTF-8").equals("John"));
         byte[] lastName = (byte[]) m_userAdmin.getUser("name", "TestAdministrator").getProperties().get("lastName");
         Assert.assertTrue("lastName not properly set", new String(lastName, "UTF-8").equals("Doe"));
 
         // Remove last name
         adminUser.getProperties().remove("lastName");
+        sleep();
         Assert.assertTrue("lastName not properly removed from property set", 
m_userAdmin.getUser("name",
-            "TestAdministrator")
-            .getProperties().get("lastName") == null);
+            "TestAdministrator").getProperties().get("lastName") == null);
 
         // Now create "Administrator", "Editor" and "Guest" group and do some 
assignments
         Group adminGroup = (Group) m_userAdmin.createRole("TestAdminUsers", 
Role.GROUP);
@@ -110,24 +128,30 @@
         adminGroup.addRequiredMember(adminUser);
 
         // Excessive test of TestEditUsers group since this failed before
+        sleep();
         assertBasicMemberCount("TestEditUsers", 0);
         assertRequiredMemberCount("TestEditUsers", 0);
         editorGroup.addMember(adminUser);
+        sleep();
         assertBasicMemberCount("TestEditUsers", 1);
         assertRequiredMemberCount("TestEditUsers", 0);
         editorGroup.addMember(editorUser);
+        sleep();
         assertBasicMemberCount("TestEditUsers", 2);
         assertRequiredMemberCount("TestEditUsers", 0);
         editorGroup.addRequiredMember(adminUser);
+        sleep();
         assertBasicMemberCount("TestEditUsers", 2);
         assertRequiredMemberCount("TestEditUsers", 1);
         editorGroup.addRequiredMember(editorUser);
+        sleep();
         assertBasicMemberCount("TestEditUsers", 2);
         assertRequiredMemberCount("TestEditUsers", 2);
 
         guestGroup.addMember(adminUser);
         guestGroup.addMember(editorUser);
         guestGroup.addMember(guestUser);
+        sleep();
 
         assertBasicMemberCount("TestAdminUsers", 1);
         assertRequiredMemberCount("TestAdminUsers", 1);
@@ -138,10 +162,12 @@
 
         // Now remove some members
         guestGroup.removeMember(adminUser);
+        sleep();
         assertBasicMemberCount("TestGuestUsers", 2);
 
         // Remove the editor user, is it removed from the admin group?
         m_userAdmin.removeRole("TestEditor");
+        sleep();
         Assert.assertTrue("TestEditor should have been removed", 
m_userAdmin.getRole("TestEditor") == null);
 
         // Finally, remove all test users and groups
@@ -150,6 +176,7 @@
             for (Role role : allRoles) {
                 String roleName = role.getName();
                 m_userAdmin.removeRole(roleName);
+                sleep();
                 Assert.assertTrue("Role '" + roleName + "' removed but still 
returned", m_userAdmin
                     .getRole(roleName) == null);
             }
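
The fixed sleep(WAIT) between each write and the assertion that reads it back is pragmatic
but timing-dependent. A rough alternative sketch (helper name and timeout are illustrative,
not part of this commit) polls the condition until it holds or a deadline passes:

    import java.util.concurrent.Callable;

    public final class Await {
        /** Polls the condition every 10 ms until it returns true or the timeout expires. */
        static void awaitTrue(Callable<Boolean> condition, long timeoutMillis) throws Exception {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (System.currentTimeMillis() < deadline) {
                if (Boolean.TRUE.equals(condition.call())) {
                    return;
                }
                Thread.sleep(10);
            }
            throw new AssertionError("condition not met within " + timeoutMillis + " ms");
        }
    }

An assertion such as "removed but still returned" could then wait for
m_userAdmin.getRole(roleName) == null via an anonymous Callable instead of relying on a
single short pause.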

Modified: trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/service/CassandraDaemonServiceImpl.java
==============================================================================
--- trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/service/CassandraDaemonServiceImpl.java     (original)
+++ trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/service/CassandraDaemonServiceImpl.java     Fri Oct 29 14:19:56 2010
@@ -20,8 +20,8 @@
 import java.util.List;
 
 import org.amdatu.platform.cassandra.application.CassandraDaemonService;
-import org.apache.cassandra.avro.CassandraDaemon;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.avro.CassandraDaemon;
 import org.apache.cassandra.thrift.CassandraServer;
 import org.apache.cassandra.thrift.CfDef;
 import org.apache.cassandra.thrift.InvalidRequestException;
@@ -38,6 +38,8 @@
 public class CassandraDaemonServiceImpl implements CassandraDaemonService {
     // The default placement strategy
     private final String DEFAULT_PLACEMENT_STRATEGY = 
"org.apache.cassandra.locator.SimpleStrategy";
+    
+    private final int DEFAULT_REPLICATION_FACTOR = 1;
 
     // Service dependencies, injected by the framework
     private volatile LogService m_logService;
@@ -82,6 +84,7 @@
     }
 
     public void stop() {
+        m_logService.log(LogService.LOG_INFO, "Shutting down Cassandra Daemon");
         m_daemon.deactivate();
         m_daemonHasShutdown = true;
         m_logService.log(LogService.LOG_INFO, "Cassandra Daemon stopped");
@@ -117,7 +120,7 @@
 
     public void addKeyspace(String name) throws InvalidRequestException, 
TException {
         List<CfDef> empty = new ArrayList<CfDef>();
-        KsDef ksDef = new KsDef(name, DEFAULT_PLACEMENT_STRATEGY, 1, empty);
+        KsDef ksDef = new KsDef(name, DEFAULT_PLACEMENT_STRATEGY, DEFAULT_REPLICATION_FACTOR, empty);
         m_cassandraServer.system_add_keyspace(ksDef);
     }
     

Modified: trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/CassandraPersistenceManager.java
==============================================================================
--- trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/CassandraPersistenceManager.java     (original)
+++ trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/CassandraPersistenceManager.java     Fri Oct 29 14:19:56 2010
@@ -19,12 +19,22 @@
 import java.util.List;
 import java.util.Map;
 
-
 /**
 * This interface defines the Apache Cassandra persistence manager. Note that the persistence manager is already tied
 * to a particular keyspace, since the persistence manager is tenant aware; an instance exists for each tenant and
 * each tenant is mapped to exactly one keyspace. For that reason most API methods do not take the keyspace as an input
 * parameter.
+ * For all READ operations a ConsistencyLevel of ALL is used; for WRITES a ConsistencyLevel of ONE is used. Note that
+ * Cassandra handles WRITE operations asynchronously: an invocation of setValue, for example, will append a
+ * 'RowMutation' with the change to the CommitLog of nodes in the cluster. However, the method returns before the
+ * write is actually finished, so subsequent reads might not yet return the new value. In Cassandra there is no way to
+ * ensure that such a write is 'finished'. In the deployment model where Cassandra is typically used this hardly
+ * matters; content changes continuously on all nodes in the cluster and is synchronized between them. There is no
+ * transaction support, so you can never be sure that when you write value 'X' to a certain row in your Java program,
+ * the value has been changed in Cassandra before the next line of your Java program executes. This is also the case
+ * when running only one node and only one thread.
+ * 
  * @author ivol
  */
 public interface CassandraPersistenceManager {
@@ -33,29 +43,26 @@
      * are not tenant specific, like the tenants themselves.
      */
     String DEFAULT_KEYSPACE = "Default";
-    
+
     /**
      * The property key with which Keyspace aware services should register 
their service.
      * So a keyspace aware service should add a property named 
KEYSPACE_AWARE_KEY
      * with a value that equals the keyspace id in its service properties.
      * For example:
-     * <pre>
-     * {@code
-     *   private volatile org.apache.felix.dm.Component m_component;
-     * 
-     *   public void init() {
-     *     String keyspaceId = "example_keyspace_id";
-     *     Dictionary<String, String> serviceProperties = new 
Hashtable<String, String>();
-     *     serviceProperties.put(KEYSPACE_AWARE_PROPERTY_KEY, keyspaceId);
-     *     m_component.setServiceProperties(serviceProperties);
-     *   }
-     * }
-     * </pre>
+     * <pre> {@code
+     * private volatile org.apache.felix.dm.Component m_component;
+     * 
+     * public void init() {
+     *     String keyspaceId = "example_keyspace_id";
+     *     Dictionary<String, String> serviceProperties = new Hashtable<String, String>();
+     *     serviceProperties.put(KEYSPACE_AWARE_KEY, keyspaceId);
+     *     m_component.setServiceProperties(serviceProperties);
+     * }
+     * } </pre>
      */
     String KEYSPACE_AWARE_KEY = "keyspaceid";
-    
+
     /**
      * Returns if the specified ColumnFamily exists.
+     * 
      * @param columnFamilyName The name of the ColumnFamily to check its 
existence for
      * @return true if a ColumnFamily with that name exists in the current 
keyspace
      */
@@ -63,6 +70,7 @@
 
     /**
      * Returns if a row with the specified key exists in the specified 
ColumnFamily.
+     * 
      * @param columnFamilyName Name of the column family to check
      * @param rowKey The row key to check
      * @return true if a record with the row key exists in the specified 
ColumnFamily.
@@ -72,6 +80,7 @@
     /**
      * Returns if the specified ColumnFamily exists and if it contains a row 
with the specified row key with a
      * SuperColumn.Column as specified.
+     * 
      * @param columnFamilyName Name of the ColumnFamily to check
      * @param rowKey The row key to check
      * @param superColumName The name of the SuperColumn to search for
@@ -80,10 +89,11 @@
      *         SuperColumn.Column as specified.
      */
     boolean exists(String columnFamilyName, String rowKey, String 
superColumnName, String columnName)
-            throws CassandraException;
+        throws CassandraException;
 
     /**
      * Returns a list of row keys of all key slices contained by the specified 
ColumnFamily.
+     * 
      * @param ColumnFamily The ColumnFamily to return all key names for
      * @return list of row key of all key slices contained by the specified 
ColumnFamily
      */
@@ -91,6 +101,7 @@
 
     /**
      * Retrieves the column names of all columns or super columns in the 
specified ColumnFamily for the given row key.
+     * 
      * @param columnFamilyName The name of the ColumnFamily for which to 
retrieve the column names
      * @param rowKey The row key for which to retrieve the column names
      * @return the column names for the given ColumnFamily and row key.
@@ -102,9 +113,10 @@
      * specified ColumnFamily. This method assumes that the ColumnFamily is of 
type 'Super' hence the name
      * 'getSuperValues'. Note that each value in Cassandra is stored as 
byte[], hence the map returns the values as
      * byte[].
+     * 
      * @param columnFamilyName The name of the ColumnFamily to retrieve the 
row from
      * @return Map that maps the row key onto a (map that maps the name of the 
super column onto a (map
-     * that maps the name of the column onto the value of the column)))
+     *         that maps the name of the column onto the value of the column)))
      */
     Map<String, Map<String, Map<String, byte[]>>> getSuperValues(String 
columnFamilyName) throws CassandraException;
 
@@ -113,10 +125,11 @@
      * specified ColumnFamily. This method assumes that the ColumnFamily is of 
type 'Super' hence the name
      * 'getSuperValues'. Note that each value in Cassandra is stored as 
byte[], hence the map returns the values as
      * byte[].
+     * 
      * @param columnFamilyName The name of the ColumnFamily to retrieve the 
row from
      * @param rowKey The key of the row to retrieve
      * @return map that maps the name of the super column onto a (map that 
maps the name of the column onto
-     * the value of the column))
+     *         the value of the column))
      */
     Map<String, Map<String, byte[]>> getSuperValues(String columnFamilyName, 
String rowKey) throws CassandraException;
 
@@ -125,17 +138,20 @@
      * specified ColumnFamily as String. This method assumes that the 
ColumnFamily is of type 'Super' hence the name
      * 'getSuperValues'. Note that each value in Cassandra is stored as 
byte[], but the map returns the values converted
      * to String values.
+     * 
      * @param columnFamilyName The name of the ColumnFamily to retrieve the 
row from
      * @param rowKey The key of the row to retrieve
      * @return map that maps the name of the super column onto a (map that 
maps the name of the column onto
-     * the value of the column))
+     *         the value of the column))
      */
-    Map<String, Map<String, String>> getSuperStringValues(String 
columnFamilyName, String rowKey) throws CassandraException;
+    Map<String, Map<String, String>> getSuperStringValues(String 
columnFamilyName, String rowKey)
+        throws CassandraException;
 
     /**
      * Retrieves a single value from the specified ColumnFamily, row, super 
column and column.
      * Throws an exception if no such column exists in the specified row, 
ColumnFamily and supercolumn.
      * Returns null if the column exists but its value is null.
+     * 
      * @param columnFamilyName The name of the ColumnFamily to retrieve the 
value from
      * @param rowKey The key of the row to retrieve
      * @param superColumnName The name of the super column to retrieve the 
value from
@@ -143,12 +159,14 @@
      * @return The value of the specified ColumnFamily, row, super column and 
column
      * @throws CassandraException In case an exception occurred during 
retrieval
      */
-    byte[] getValue(String columnFamilyName, String rowKey, String 
superColumnName, String columnName) throws CassandraException;
+    byte[] getValue(String columnFamilyName, String rowKey, String 
superColumnName, String columnName)
+        throws CassandraException;
 
     /**
      * Retrieves a single String value from the specified ColumnFamily, row, 
super column and column.
      * Throws an exception if no such column exists in the specified row, 
ColumnFamily and supercolumn.
      * Returns null if the column exists but its value is null.
+     * 
      * @param columnFamilyName The name of the ColumnFamily to retrieve the 
value from
      * @param rowKey The key of the row to retrieve
      * @param superColumnName The name of the super column to retrieve the 
value from
@@ -156,51 +174,85 @@
      * @return The value of the specified ColumnFamily, row, super column and 
column converted to a String
      * @throws CassandraException In case an exception occurred during 
retrieval
      */
-    String getStringValue(String columnFamilyName, String rowKey, String 
superColumnName, String columnName) throws CassandraException;
+    String getStringValue(String columnFamilyName, String rowKey, String 
superColumnName, String columnName)
+        throws CassandraException;
 
     /**
      * Returns the value of all columns in the specified supercolumn for the 
ColumnFamily in the given row.
+     * 
      * @param columnFamilyName The name of the ColumnFamily to retrieve the 
values from
      * @param rowKey The key of the row to retrieve
      * @param superColumnName The name of the super column to retrieve the 
values from
      * @return Map that maps the name of each column onto its value
      */
-    Map<String, byte[]> getValues(String columnFamilyName, String rowKey, 
String superColumnName) throws CassandraException;
+    Map<String, byte[]> getValues(String columnFamilyName, String rowKey, 
String superColumnName)
+        throws CassandraException;
 
     /**
      * Returns the String value of all columns in the specified supercolumn 
for the ColumnFamily in the given row.
+     * 
      * @param columnFamilyName The name of the ColumnFamily to retrieve the 
values from
      * @param rowKey The key of the row to retrieve
      * @param superColumnName The name of the super column to retrieve the 
values from
      * @return Map that maps the name of each column onto its String value
      */
-    Map<String, String> getStringValues(String columnFamilyName, String 
rowKey, String superColumnName) throws CassandraException;
+    Map<String, String> getStringValues(String columnFamilyName, String 
rowKey, String superColumnName)
+        throws CassandraException;
 
     /**
-     * Sets the value of the specified column in the super column for the 
ColumnFamily to the specified value. If no such
+     * Sets the value of the specified column in the super column for the 
ColumnFamily to the specified value. If no
+     * such
      * column or row exists, it will be inserted.
+     * 
      * @param columnFamilyName The name of the ColumnFamily to set the values 
on
      * @param rowKey The key of the row to set the values on
      * @param superColumnName The name of the super column to set the values on
      * @param columnName The name of the column to set the value for
      * @param value The value to set
      */
-    void setValue(String columnFamilyName, String rowKey, String 
superColumnName, String columnName, byte[] value) throws CassandraException;
+    void setValue(String columnFamilyName, String rowKey, String 
superColumnName, String columnName, byte[] value)
+        throws CassandraException;
 
     /**
-     * Sets the value as String of the specified column in the supercolumn for the ColumnFamily to the specified value. If no such
+     * Sets the value of the specified column in the super column for the ColumnFamily to the specified value. If no
+     * such column or row exists, it will be inserted.
+     * Although Cassandra write operations are always asynchronous, this method tries to simulate a synchronous write
+     * by verifying the result and waiting for the write operation to be finished on the current node (with a maximum
+     * of 10 retries). Usually, if you perform a setValue('x') on row r and immediately invoke getValue() on the same
+     * row r, it is quite unlikely that 'x' will be returned; more likely, the previous value of row r is still
+     * returned. This is because writes are handled asynchronously and setValue will return before the write operation
+     * is finished (where 'finished' means that a subsequent call to getValue would return the new value). This method
+     * simulates a synchronous write operation by verifying that the result written by setValue is returned by getValue
+     * and retrying this for a maximum of 10 times. If after 10 retries getValue still doesn't return the value set
+     * with setValue, a CassandraException is thrown.
+     * 
+     * @param columnFamilyName The name of the ColumnFamily to set the values on
+     * @param rowKey The key of the row to set the values on
+     * @param superColumnName The name of the super column to set the values on
+     * @param columnName The name of the column to set the value for
+     * @param value The value to set
+     */
+    public void setValueSynchronously(String columnFamilyName, String rowKey, String superColumnName,
+        String columnName, byte[] value)
+        throws CassandraException;
+
+    /**
+     * Sets the value as String of the specified column in the supercolumn for the ColumnFamily to the specified value.
+     * If no such
      * column or row exists, it will be inserted.
+     * 
      * @param columnFamilyName The name of the ColumnFamily to set the values 
on
      * @param rowKey The key of the row to set the values on
      * @param superColumnName The name of the super column to set the values on
      * @param columnName The name of the column to set the value for
      * @param value The value to set
      */
-    void setStringValue(String columnFamilyName, String rowKey, String 
superColumnName, String columnName, String value) throws CassandraException;
-
+    void setStringValue(String columnFamilyName, String rowKey, String 
superColumnName, String columnName, String value)
+        throws CassandraException;
 
     /**
      * Deletes an entire row from the specified ColumnFamily.
+     * 
      * @param columnFamilyName The name of the ColumnFamily to remove the row 
from
      * @param rowKey The key of the row to remove.
      */
@@ -208,6 +260,7 @@
 
     /**
      * Deletes a specific super column from a row in the specified 
ColumnFamily.
+     * 
      * @param columnFamilyName The name of the ColumnFamily to remove the 
super column from
      * @param rowKey The key of the row to remove the super column from
      * @param superColumnName The name of the super column to remove
@@ -216,23 +269,27 @@
 
     /**
      * Deletes a specific column from the super column in a row in the 
specified ColumnFamily.
+     * 
      * @param columnFamilyName The name of the ColumnFamily to remove the 
column from
      * @param rowKey The key of the row to remove the column from
      * @param superColumnName The name of the super column to remove the 
column from
      * @param columnName The name of the column in the super column to remove
      */
-    void deleteColumn(String columnFamilyName, String rowKey, String 
superColumnName, String columnName) throws CassandraException;
+    void deleteColumn(String columnFamilyName, String rowKey, String 
superColumnName, String columnName)
+        throws CassandraException;
 
     /**
      * Persists the properties of the specified bean using reflection.
+     * 
      * @param bean The bean to persist. It is assumed that the bean has a 
method getId() which returns the identifier
-     *            ('primary key') of the object to persist, that the classname 
equals the columnFamily name and that the
-     *            superColumn name is 'Default'.
+     *        ('primary key') of the object to persist, that the classname 
equals the columnFamily name and that the
+     *        superColumn name is 'Default'.
      */
-    void persistBean(Object bean) throws CassandraException ;
+    void persistBean(Object bean) throws CassandraException;
 
     /**
      * Persists the properties of the specified bean using reflection.
+     * 
      * @param bean The bean to persist
      * @param id The id ('primary key') of the object to persist
      * @param tabe The columnFamily name to persist the object to
@@ -242,15 +299,17 @@
 
     /**
      * Loads the properties the person with the specified id.
+     * 
      * @param clazz The class of the bean to return.
      * @param id The id of the record ('primary key') to load. It is assumed 
that the classname equals the columnFamily
-     *            name and that the superColumn name is 'Default'.
+     *        name and that the superColumn name is 'Default'.
      * @return The bean loaded from the keyspace or null if no record exists 
with the specified id
      */
     <T> T loadBean(Class<T> clazz, String id) throws CassandraException;
 
     /**
      * Loads the properties the person with the specified id.
+     * 
      * @param clazz The class of the bean to return
      * @param id The id of the record ('primary key') to load
      * @param tabe The columnFamily name to read the object from
@@ -259,7 +318,6 @@
      */
     <T> T loadBean(Class<T> clazz, String id, String columnFamily, String 
superColumn) throws CassandraException;
 
-
     // WTF is this?
     List<byte[]> getSuperRow(String table, String key, List<byte[]> columns);
 }
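
From a caller's perspective the new method differs from setValue only in its blocking,
verify-and-retry behavior. A short usage sketch against the interface above (the column
family, row and column names are made up for illustration):

    import java.util.Arrays;

    import org.amdatu.platform.cassandra.persistencemanager.CassandraPersistenceManager;

    public final class SynchronousWriteExample {
        static void writeAndReadBack(CassandraPersistenceManager pm) throws Exception {
            byte[] payload = "amdatutest".getBytes("UTF-8");
            // Retries internally until a read-back observes the value, or throws CassandraException.
            pm.setValueSynchronously("ExampleCF", "row-1", "ExampleSuper", "ExampleColumn", payload);
            byte[] readBack = pm.getValue("ExampleCF", "row-1", "ExampleSuper", "ExampleColumn");
            assert Arrays.equals(payload, readBack);
        }
    }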

Modified: trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/service/CassandraPersistenceManagerImpl.java
==============================================================================
--- trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/service/CassandraPersistenceManagerImpl.java     (original)
+++ trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/service/CassandraPersistenceManagerImpl.java     Fri Oct 29 14:19:56 2010
@@ -16,8 +16,10 @@
  */
 package org.amdatu.platform.cassandra.persistencemanager.service;
 
+import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -59,6 +61,18 @@
     // Maximum amount of columns to retrieve in queries
     private final static int COLUMN_LIMIT = 1000000;
 
+    // Empty byte array
+    private final static byte[] EMPTY = new byte[0];
+
+    // The consistency level to use for READ operations
+    private final static ConsistencyLevel READ_CONSISTENCY_LEVEL = ConsistencyLevel.ALL;
+    
+    // The consistency level to use for WRITE operations
+    private final static ConsistencyLevel WRITE_CONSISTENCY_LEVEL = ConsistencyLevel.ONE;
+
+    // Investigation showed that retrying succeeds after about 10 attempts.
+    private final static int MAX_RETRIES = 10;
+
     // Default characterset to use
     private final static String DEFAULT_CHARSET = "UTF-8";
 
@@ -99,15 +113,15 @@
         try {
             ColumnParent columnParent = new ColumnParent(columnFamilyName);
             SlicePredicate p = new SlicePredicate();
-            SliceRange sliceRange = new SliceRange(new byte[0], new byte[0], 
false, COLUMN_LIMIT);
+            SliceRange sliceRange = new SliceRange(EMPTY, EMPTY, false, 
COLUMN_LIMIT);
             p.setSlice_range(sliceRange);
 
             KeyRange range = new KeyRange(100000);
-            range.setStart_key(new byte[0]);
-            range.setEnd_key(new byte[0]);
+            range.setStart_key(EMPTY);
+            range.setEnd_key(EMPTY);
             Iface cs = m_daemonService.getCassandraServer();
             cs.set_keyspace(m_keyspace);
-            cs.get_range_slices(columnParent, p, range, ConsistencyLevel.ONE);
+            cs.get_range_slices(columnParent, p, range, 
READ_CONSISTENCY_LEVEL);
             return true;
         }
         catch (Exception e) {
@@ -124,15 +138,15 @@
         try {
             ColumnParent columnParent = new ColumnParent(columnFamilyName);
             SlicePredicate p = new SlicePredicate();
-            SliceRange sliceRange = new SliceRange(new byte[0], new byte[0], 
false, COLUMN_LIMIT);
+            SliceRange sliceRange = new SliceRange(EMPTY, EMPTY, false, 
COLUMN_LIMIT);
             p.setSlice_range(sliceRange);
 
             KeyRange range = new KeyRange(1);
-            range.setStart_key(rowKey.getBytes(DEFAULT_CHARSET));
-            range.setEnd_key(rowKey.getBytes(DEFAULT_CHARSET));
+            range.setStart_key(toBytes(rowKey));
+            range.setEnd_key(toBytes(rowKey));
             Iface cs = m_daemonService.getCassandraServer();
             cs.set_keyspace(m_keyspace);
-            List<KeySlice> keySlices = cs.get_range_slices(columnParent, p, 
range, ConsistencyLevel.ONE);
+            List<KeySlice> keySlices = cs.get_range_slices(columnParent, p, 
range, READ_CONSISTENCY_LEVEL);
             return keySlices != null && keySlices.size() > 0 && 
keySlices.get(0).getColumns().size() > 0;
         }
         catch (Exception e) {
@@ -150,11 +164,11 @@
         throws CassandraException {
         try {
             ColumnPath columnPath = new ColumnPath(columnFamilyName);
-            
columnPath.setSuper_column(superColumnName.getBytes(DEFAULT_CHARSET));
-            columnPath.setColumn(columnName.getBytes(DEFAULT_CHARSET));
+            columnPath.setSuper_column(toBytes(superColumnName));
+            columnPath.setColumn(toBytes(columnName));
             Iface cs = m_daemonService.getCassandraServer();
             cs.set_keyspace(m_keyspace);
-            ColumnOrSuperColumn sosc = 
cs.get(rowKey.getBytes(DEFAULT_CHARSET), columnPath, ConsistencyLevel.ONE);
+            ColumnOrSuperColumn sosc = cs.get(toBytes(rowKey), columnPath, 
READ_CONSISTENCY_LEVEL);
             sosc.getColumn();
             return true;
         }
@@ -182,17 +196,17 @@
             SlicePredicate p = new SlicePredicate();
 
             // Create and set slice range with a maximum of COLUMN_LIMIT 
columns
-            SliceRange sliceRange = new SliceRange(new byte[0], new byte[0], 
false, COLUMN_LIMIT);
+            SliceRange sliceRange = new SliceRange(EMPTY, EMPTY, false, 
COLUMN_LIMIT);
             p.setSlice_range(sliceRange);
 
             // Set key range to maximum ROW_LIMIT results
             KeyRange range = new KeyRange(ROW_LIMIT);
-            range.setStart_key(new byte[0]);
-            range.setEnd_key(new byte[0]);
+            range.setStart_key(EMPTY);
+            range.setEnd_key(EMPTY);
 
             Iface cs = m_daemonService.getCassandraServer();
             cs.set_keyspace(m_keyspace);
-            List<KeySlice> keySlices = cs.get_range_slices(columnParent, p, 
range, ConsistencyLevel.ONE);
+            List<KeySlice> keySlices = cs.get_range_slices(columnParent, p, 
range, READ_CONSISTENCY_LEVEL);
 
             List<String> keys = new ArrayList<String>();
             for (KeySlice keySlice : keySlices) {
@@ -222,14 +236,14 @@
             ColumnParent columnParent = new ColumnParent(columnFamilyName);
 
             // read entire row
-            SliceRange range = new SliceRange(new byte[0], new byte[0], false, 
COLUMN_LIMIT);
+            SliceRange range = new SliceRange(EMPTY, EMPTY, false, 
COLUMN_LIMIT);
             SlicePredicate predicate = new SlicePredicate();
             predicate.setSlice_range(range);
 
             Iface cs = m_daemonService.getCassandraServer();
             cs.set_keyspace(m_keyspace);
             List<ColumnOrSuperColumn> slice =
-                cs.get_slice(rowKey.getBytes(DEFAULT_CHARSET), columnParent, 
predicate, ConsistencyLevel.ONE);
+                cs.get_slice(toBytes(rowKey), columnParent, predicate, 
READ_CONSISTENCY_LEVEL);
 
             for (ColumnOrSuperColumn columnOrSuperColumn : slice) {
                 if (columnOrSuperColumn.isSetSuper_column()) {
@@ -258,16 +272,16 @@
         try {
             ColumnParent columnParent = new ColumnParent(columnFamilyName);
             SlicePredicate p = new SlicePredicate();
-            SliceRange sliceRange = new SliceRange(new byte[0], new byte[0], 
false, COLUMN_LIMIT);
+            SliceRange sliceRange = new SliceRange(EMPTY, EMPTY, false, 
COLUMN_LIMIT);
             p.setSlice_range(sliceRange);
 
             KeyRange range = new KeyRange(100000);
-            range.setStart_key(new byte[0]);
-            range.setEnd_key("zzzzz".getBytes(DEFAULT_CHARSET));
+            range.setStart_key(EMPTY);
+            range.setEnd_key(toBytes("zzzzz"));
             Iface cs = m_daemonService.getCassandraServer();
             cs.set_keyspace(m_keyspace);
             List<KeySlice> getRangeSlices =
-                cs.get_range_slices(columnParent, p, range, 
ConsistencyLevel.ONE);
+                cs.get_range_slices(columnParent, p, range, 
READ_CONSISTENCY_LEVEL);
             return flattenSuper(getRangeSlices);
         }
         catch (Exception e) {
@@ -285,16 +299,16 @@
         try {
             ColumnParent columnParent = new ColumnParent(columnFamilyName);
             SlicePredicate p = new SlicePredicate();
-            SliceRange sliceRange = new SliceRange(new byte[0], new byte[0], 
false, COLUMN_LIMIT);
+            SliceRange sliceRange = new SliceRange(EMPTY, EMPTY, false, 
COLUMN_LIMIT);
             p.setSlice_range(sliceRange);
 
             KeyRange range = new KeyRange(ROW_LIMIT);
-            range.setStart_key(rowKey.getBytes(DEFAULT_CHARSET));
-            range.setEnd_key(rowKey.getBytes(DEFAULT_CHARSET));
+            range.setStart_key(toBytes(rowKey));
+            range.setEnd_key(toBytes(rowKey));
             List<KeySlice> getRangeSlices;
             Iface cs = m_daemonService.getCassandraServer();
             cs.set_keyspace(m_keyspace);
-            getRangeSlices = cs.get_range_slices(columnParent, p, range, 
ConsistencyLevel.ONE);
+            getRangeSlices = cs.get_range_slices(columnParent, p, range, 
READ_CONSISTENCY_LEVEL);
             Map<String, Map<String, Map<String, byte[]>>> flattenSuper = 
flattenSuper(getRangeSlices);
             Set<Entry<String, Map<String, Map<String, byte[]>>>> entrySet = 
flattenSuper.entrySet();
             for (Entry<String, Map<String, Map<String, byte[]>>> first : 
entrySet) {
@@ -318,16 +332,16 @@
         try {
             ColumnParent columnParent = new ColumnParent(columnFamilyName);
             SlicePredicate p = new SlicePredicate();
-            SliceRange sliceRange = new SliceRange(new byte[0], new byte[0], 
false, COLUMN_LIMIT);
+            SliceRange sliceRange = new SliceRange(EMPTY, EMPTY, false, 
COLUMN_LIMIT);
             p.setSlice_range(sliceRange);
 
             KeyRange range = new KeyRange(ROW_LIMIT);
-            range.setStart_key(rowKey.getBytes(DEFAULT_CHARSET));
-            range.setEnd_key(rowKey.getBytes(DEFAULT_CHARSET));
+            range.setStart_key(toBytes(rowKey));
+            range.setEnd_key(toBytes(rowKey));
             List<KeySlice> getRangeSlices;
             Iface cs = m_daemonService.getCassandraServer();
             cs.set_keyspace(m_keyspace);
-            getRangeSlices = cs.get_range_slices(columnParent, p, range, 
ConsistencyLevel.ONE);
+            getRangeSlices = cs.get_range_slices(columnParent, p, range, 
READ_CONSISTENCY_LEVEL);
             Map<String, Map<String, Map<String, String>>> flattenSuper = 
flattenStringSuper(getRangeSlices);
             Set<Entry<String, Map<String, Map<String, String>>>> entrySet = 
flattenSuper.entrySet();
             for (Entry<String, Map<String, Map<String, String>>> first : 
entrySet) {
@@ -350,11 +364,16 @@
         throws CassandraException {
         try {
             ColumnPath columnPath = new ColumnPath(columnFamilyName);
-            
columnPath.setSuper_column(superColumnName.getBytes(DEFAULT_CHARSET));
-            columnPath.setColumn(columnName.getBytes(DEFAULT_CHARSET));
+            columnPath.setSuper_column(toBytes(superColumnName));
+            columnPath.setColumn(toBytes(columnName));
             Iface cs = m_daemonService.getCassandraServer();
             cs.set_keyspace(m_keyspace);
-            ColumnOrSuperColumn columnOrSuperColumn = 
cs.get(rowKey.getBytes(), columnPath, ConsistencyLevel.ONE);
+
+            // Perform read-repair
+            cs.get(toBytes(rowKey), columnPath, READ_CONSISTENCY_LEVEL);
+
+            // Now read
+            ColumnOrSuperColumn columnOrSuperColumn = cs.get(toBytes(rowKey), columnPath, READ_CONSISTENCY_LEVEL);
             byte[] value = columnOrSuperColumn.getColumn().getValue();
             return value;
         }
@@ -375,7 +394,7 @@
         try {
             byte[] value = getValue(columnFamilyName, rowKey, superColumnName, 
columnName);
             if (value != null) {
-                return new String(value, DEFAULT_CHARSET);
+                return toString(value);
             }
             else {
                 return null;
@@ -397,19 +416,19 @@
         throws CassandraException {
         try {
             ColumnParent columnParent = new ColumnParent(columnFamilyName);
-            
columnParent.setSuper_column(superColumnName.getBytes(DEFAULT_CHARSET));
+            columnParent.setSuper_column(toBytes(superColumnName));
 
             // read entire row
-            SliceRange range = new SliceRange(new byte[0], new byte[0], false, 
COLUMN_LIMIT);
+            SliceRange range = new SliceRange(EMPTY, EMPTY, false, 
COLUMN_LIMIT);
             SlicePredicate predicate = new SlicePredicate();
             predicate.setSlice_range(range);
             Iface cs = m_daemonService.getCassandraServer();
             cs.set_keyspace(m_keyspace);
             List<ColumnOrSuperColumn> slice =
-                cs.get_slice(rowKey.getBytes(), columnParent, predicate, 
ConsistencyLevel.ONE);
+                cs.get_slice(toBytes(rowKey), columnParent, predicate, 
READ_CONSISTENCY_LEVEL);
             Map<String, byte[]> result = new HashMap<String, byte[]>();
             for (ColumnOrSuperColumn columnOrSuperColumn : slice) {
-                String name = new 
String(columnOrSuperColumn.getColumn().getName(), DEFAULT_CHARSET);
+                String name = 
toString(columnOrSuperColumn.getColumn().getName());
                 byte[] value = columnOrSuperColumn.getColumn().getValue();
                 result.put(name, value);
             }
@@ -434,7 +453,7 @@
             if (byteValues != null) {
                 Map<String, String> stringValues = new HashMap<String, 
String>();
                 for (String key : byteValues.keySet()) {
-                    stringValues.put(key, new String(byteValues.get(key), 
"UTF-8"));
+                    stringValues.put(key, toString(byteValues.get(key)));
                 }
                 return stringValues;
             }
@@ -459,11 +478,11 @@
         try {
             long timestamp = System.currentTimeMillis();
             ColumnParent column_parent = new ColumnParent(columnFamilyName);
-            
column_parent.setSuper_column(superColumnName.getBytes(DEFAULT_CHARSET));
+            column_parent.setSuper_column(toBytes(superColumnName));
             Column column = new Column(columnName.getBytes(), value, 
timestamp);
             Iface cs = m_daemonService.getCassandraServer();
             cs.set_keyspace(m_keyspace);
-            cs.insert(rowKey.getBytes(DEFAULT_CHARSET), column_parent, column, 
ConsistencyLevel.ONE);
+            cs.insert(rowKey.getBytes(DEFAULT_CHARSET), column_parent, column, 
WRITE_CONSISTENCY_LEVEL);
         }
         catch (Exception e) {
             if (!(e instanceof RuntimeException)) {
@@ -477,13 +496,50 @@
         }
     }
 
+    // Since AMDATU-137 we know that Cassandra handles write operations asynchronously. This is a convenience method
+    // that significantly increases the probability that, on a standalone installation with a single thread,
+    // getValue() will return the value that has previously been set using setValue().
+    public void setValueSynchronously(String columnFamilyName, String rowKey, String superColumnName, String columnName, byte[] value)
+        throws CassandraException {
+        int retry = 0;
+        boolean success = false;
+        while (retry < MAX_RETRIES && !success) {
+            // First asynchronously set the value
+            setValue(columnFamilyName, rowKey, superColumnName, columnName, 
value);
+            
+            // Now verify the result
+            byte[] persistentByteValue = getValue(columnFamilyName, rowKey, 
superColumnName, columnName);
+            success = Arrays.equals(value, persistentByteValue);
+            if (!success) {
+                try {
+                    Thread.sleep(10);
+                }
+                catch (InterruptedException e) {
+                }
+            }
+            retry++;
+        }
+        if (!success) {
+            try {
+                String errorMsg = "setValueSynchronously failed for 
ColumnFamily '" + columnFamilyName + "', rowKey '" + rowKey
+                        + "', SuperColumn '" + superColumnName + "', 
columnName '" + columnName + "', value '"
+                        + toString(value) + "'. See 
http://jira.amdatu.org/jira/browse/AMDATU-137. Failed to correct the error " 
+                        + "after retrying " + (retry - 1) + " times.";
+                throw new CassandraException(errorMsg);
+            }
+            catch (UnsupportedEncodingException e) {
+                m_logService.log(LogService.LOG_ERROR, "Could not write 
setValueSynchronously failure message", e);
+            }
+        }
+    }
+
     public void setStringValue(String columnFamilyName, String rowKey, String 
superColumnName, String columnName,
         String value) throws CassandraException {
         try {
             byte[] bytes = null;
 
             if (value != null) {
-                bytes = value.getBytes(DEFAULT_CHARSET);
+                bytes = toBytes(value);
             }
             setValue(columnFamilyName, rowKey, superColumnName, columnName, 
bytes);
         }
@@ -520,14 +576,14 @@
             long timestamp = System.currentTimeMillis();
             ColumnPath columnPath = new ColumnPath(columnFamilyName);
             if (superColumnName != null) {
-                
columnPath.setSuper_column(superColumnName.getBytes(DEFAULT_CHARSET));
+                columnPath.setSuper_column(toBytes(superColumnName));
             }
             if (columnName != null) {
-                columnPath.setColumn(columnName.getBytes(DEFAULT_CHARSET));
+                columnPath.setColumn(toBytes(columnName));
             }
             Iface cs = m_daemonService.getCassandraServer();
             cs.set_keyspace(m_keyspace);
-            cs.remove(rowKey.getBytes(), columnPath, timestamp, 
ConsistencyLevel.ONE);
+            cs.remove(rowKey.getBytes(), columnPath, timestamp, 
WRITE_CONSISTENCY_LEVEL);
         }
         catch (Exception e) {
             if (!(e instanceof RuntimeException)) {
@@ -640,11 +696,11 @@
             p.setColumn_names(columnNames);
 
             KeyRange range = new KeyRange(1);
-            range.setStart_key(key.getBytes(DEFAULT_CHARSET));
-            range.setEnd_key(key.getBytes(DEFAULT_CHARSET));
+            range.setStart_key(toBytes(key));
+            range.setEnd_key(toBytes(key));
             Iface cs = m_daemonService.getCassandraServer();
             List<ColumnOrSuperColumn> getSlice =
-                cs.get_slice(key.getBytes(DEFAULT_CHARSET), columnParent, p, 
ConsistencyLevel.ONE);
+                cs.get_slice(toBytes(key), columnParent, p, 
READ_CONSISTENCY_LEVEL);
 
             // byte[][] result = new byte[getSlice.size()][0];
             List<byte[]> result = new ArrayList<byte[]>(getSlice.size());
@@ -666,7 +722,7 @@
         Map<String, Map<String, Map<String, byte[]>>> result = new 
HashMap<String, Map<String, Map<String, byte[]>>>();
         try {
             for (KeySlice slice : getRangeSlices) {
-                String key = new String(slice.getKey(), DEFAULT_CHARSET);
+                String key = toString(slice.getKey());
                 Map<String, Map<String, byte[]>> value = new HashMap<String, 
Map<String, byte[]>>();
                 List<ColumnOrSuperColumn> columns = slice.getColumns();
                 for (ColumnOrSuperColumn column : columns) {
@@ -674,10 +730,10 @@
                     List<Column> columns2 = superColumn.getColumns();
                     Map<String, byte[]> subvalue = new HashMap<String, 
byte[]>();
                     for (Column column3 : columns2) {
-                        String name = new String(column3.getName(), 
DEFAULT_CHARSET);
+                        String name = toString(column3.getName());
                         subvalue.put(name, column3.getValue());
                     }
-                    value.put(new String(superColumn.getName(), 
DEFAULT_CHARSET), subvalue);
+                    value.put(toString(superColumn.getName()), subvalue);
                 }
 
                 result.put(key, value);
@@ -694,7 +750,7 @@
         Map<String, Map<String, Map<String, String>>> result = new 
HashMap<String, Map<String, Map<String, String>>>();
         try {
             for (KeySlice slice : getRangeSlices) {
-                String key = new String(slice.getKey(), DEFAULT_CHARSET);
+                String key = toString(slice.getKey());
                 Map<String, Map<String, String>> value = new HashMap<String, 
Map<String, String>>();
                 List<ColumnOrSuperColumn> columns = slice.getColumns();
                 for (ColumnOrSuperColumn column : columns) {
@@ -702,10 +758,10 @@
                     List<Column> columns2 = superColumn.getColumns();
                     Map<String, String> subvalue = new HashMap<String, 
String>();
                     for (Column column3 : columns2) {
-                        String name = new String(column3.getName(), 
DEFAULT_CHARSET);
-                        subvalue.put(name, new String(column3.getValue(), 
DEFAULT_CHARSET));
+                        String name = toString(column3.getName());
+                        subvalue.put(name, toString(column3.getValue()));
                     }
-                    value.put(new String(superColumn.getName(), 
DEFAULT_CHARSET), subvalue);
+                    value.put(toString(superColumn.getName()), subvalue);
                 }
 
                 result.put(key, value);
@@ -717,4 +773,16 @@
 
         return result;
     }
+
+    private byte[] toBytes(String value) throws UnsupportedEncodingException {
+        return value.getBytes(DEFAULT_CHARSET);
+    }
+
+    private String toString(byte[] bytes) throws UnsupportedEncodingException {
+        return new String(bytes, DEFAULT_CHARSET);
+    }
+
+    public String toString() {
+        return "CassandraPersistenceManager for keyspace '" + m_keyspace + "'";
+    }
 }
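
Note on the write-then-verify pattern added above: the retry loop in
setValueSynchronously() simply re-reads the value after each write until the
read matches, sleeping briefly between attempts. The following is a minimal,
self-contained sketch of that pattern in plain Java, assuming a hypothetical
in-memory KeyValueStore in place of the Cassandra Thrift Iface and a
MAX_RETRIES constant in the same role as the one used in the patch; it is an
illustration of the idea, not the committed implementation.

    import java.util.Arrays;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    /**
     * Sketch of the write-then-verify retry pattern from AMDATU-137.
     * KeyValueStore is a hypothetical stand-in for the Thrift calls.
     */
    public class WriteVerifySketch {

        // Value chosen for this sketch; the real constant is defined elsewhere in the class.
        private static final int MAX_RETRIES = 10;

        static class KeyValueStore {
            private final ConcurrentMap<String, byte[]> data = new ConcurrentHashMap<String, byte[]>();
            void set(String key, byte[] value) { data.put(key, value); }
            byte[] get(String key) { return data.get(key); }
        }

        /** Write the value, then read it back until it matches or retries run out. */
        static void setValueSynchronously(KeyValueStore store, String key, byte[] value) {
            int retry = 0;
            boolean success = false;
            while (retry < MAX_RETRIES && !success) {
                store.set(key, value);                          // asynchronous write in the real code
                success = Arrays.equals(value, store.get(key)); // verify the write is visible
                if (!success) {
                    try {
                        Thread.sleep(10);                       // give the write time to propagate
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                retry++;
            }
            if (!success) {
                throw new IllegalStateException("Write not visible after " + retry + " attempts for key '" + key + "'");
            }
        }

        public static void main(String[] args) {
            KeyValueStore store = new KeyValueStore();
            setValueSynchronously(store, "row-1", "hello".getBytes());
            System.out.println(new String(store.get("row-1")));
        }
    }

The same change also replaces the hard-coded ConsistencyLevel.ONE arguments with
READ_CONSISTENCY_LEVEL and WRITE_CONSISTENCY_LEVEL constants and funnels the
charset conversions through the new toBytes()/toString() helpers, so the
consistency levels and encoding are defined in one place.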

Modified: 
trunk/platform-bundles/shindig-application/src/main/java/org/amdatu/platform/shindig/application/service/ShindigRegistrationServiceImpl.java
==============================================================================
--- 
trunk/platform-bundles/shindig-application/src/main/java/org/amdatu/platform/shindig/application/service/ShindigRegistrationServiceImpl.java
        (original)
+++ 
trunk/platform-bundles/shindig-application/src/main/java/org/amdatu/platform/shindig/application/service/ShindigRegistrationServiceImpl.java
        Fri Oct 29 14:19:56 2010
@@ -95,7 +95,7 @@
     private volatile SocialApiModule m_socialApiModule;
     private volatile OAuthModule m_oAuthModule;
     private volatile ConfigurationAdminGuiceModule 
m_shindigConfigurationModule;
-    
+
     // Other instance variables
     private boolean m_jmxInitialized;
     private ServletContext m_servletContext = null;
@@ -173,6 +173,7 @@
         // since the Shindig implementation requires this (uses new File(...) 
to load it).
         URL tokenUrl = 
m_bundleContext.getBundle().getResource("/conf/securitytokenkey.txt");
         String confDir = System.getProperty("user.dir") + File.separator + 
"conf";
+        new File(confDir).mkdir();
         String targetFile = confDir + File.separator + "securitytokenkey.txt";
         InputStream is = null;
         FileOutputStream fos = null;
@@ -187,14 +188,18 @@
                 }
             } finally {
                 try {
-                    is.close();
+                    if (is != null) {
+                        is.close();
+                    }
                 } finally {
-                    fos.close();
+                    if (fos != null) {
+                        fos.close();
+                    }
                 }
             } 
         } catch (IOException e) {
-           m_logService.log(LogService.LOG_ERROR, "Could not copy security 
token key file from '" +
-               tokenUrl.toString() + "' to '" + targetFile + "'", e);
+            m_logService.log(LogService.LOG_ERROR, "Could not copy security 
token key file from '" +
+                tokenUrl.toString() + "' to '" + targetFile + "'", e);
         } 
     }
 
@@ -237,13 +242,13 @@
             // Register filters
             m_authenticationServletFilter = new AuthenticationServletFilter();
             m_httpService.registerFilter(m_authenticationServletFilter, new 
String[]{
-                    GADGET_SERVLET_BASE, 
-                    MAKEREQUEST_BASE, 
-                    REST_BASE + "/*", 
-                    JSON_RPC_BASE + "/*",
-                    GADGETS_REST_BASE + "/*",
-                    WINK_REST_BASE + "/*",
-                    GADGETS_RPC_BASE + "/*"}, null, null, m_httpContext);
+                GADGET_SERVLET_BASE, 
+                MAKEREQUEST_BASE, 
+                REST_BASE + "/*", 
+                JSON_RPC_BASE + "/*",
+                GADGETS_REST_BASE + "/*",
+                WINK_REST_BASE + "/*",
+                GADGETS_RPC_BASE + "/*"}, null, null, m_httpContext);
 
             // Register static resources
             m_httpService.registerResources("/gadgets", "/files", 
m_httpContext);
@@ -344,7 +349,7 @@
 
     private String[] getGuiceModules() {
         return new String[]{
-                "org.apache.shindig.gadgets.DefaultGuiceModule",
+            "org.apache.shindig.gadgets.DefaultGuiceModule",
         "org.apache.shindig.common.cache.ehcache.EhCacheModule"};
     }
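
Note on the stream handling fixed above: the copy of securitytokenkey.txt now
creates the conf directory before writing and closes both streams null-safely
in nested finally blocks. The sketch below shows the same pattern in isolation,
using Class.getResourceAsStream() and a placeholder resource path instead of
the bundle's getResource() call; it is an illustration of the pattern, not the
Shindig bundle's code.

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;

    /** Sketch: copy a resource to a file with null-safe stream handling. */
    public class ResourceCopySketch {

        public static void copyResource(Class<?> owner, String resource, File targetFile) throws IOException {
            // Create the target directory up front, mirroring the
            // new File(confDir).mkdir() call added in the patch.
            File parent = targetFile.getParentFile();
            if (parent != null) {
                parent.mkdirs();
            }
            InputStream is = null;
            FileOutputStream fos = null;
            try {
                is = owner.getResourceAsStream(resource);
                if (is == null) {
                    throw new IOException("Resource not found: " + resource);
                }
                fos = new FileOutputStream(targetFile);
                byte[] buffer = new byte[4096];
                int read;
                while ((read = is.read(buffer)) != -1) {
                    fos.write(buffer, 0, read);
                }
            } finally {
                // Nested null-safe closes: a failure closing the input stream
                // does not prevent the output stream from being closed.
                try {
                    if (is != null) {
                        is.close();
                    }
                } finally {
                    if (fos != null) {
                        fos.close();
                    }
                }
            }
        }

        public static void main(String[] args) throws IOException {
            // Hypothetical usage with a placeholder resource and target file.
            copyResource(ResourceCopySketch.class, "/example/securitytokenkey.txt",
                new File(System.getProperty("user.dir"), "conf" + File.separator + "securitytokenkey.txt"));
        }
    }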
 
