Author: ivol
Date: Thu Oct 21 11:49:40 2010
New Revision: 198
Log:
[AMDATU-110] First working version of Cassandra 0.7 persistence layer. TODO's:
- Add integration/unit tests
- Cleanup code, remove unused libraries and other resources, etc
- Remove ServiceTracker if still used
- Improve javadoc
Added:
trunk/platform-bundles/cassandra-application/src/main/resources/cassandra/
trunk/platform-bundles/cassandra-application/src/main/resources/cassandra/apache-cassandra-0.7.0-beta2.jar
(contents, props changed)
trunk/platform-bundles/cassandra-application/src/main/resources/conf/cassandra.yaml
trunk/platform-bundles/cassandra-application/src/main/resources/lib/avro-1.4.0-rc4.jar
(contents, props changed)
trunk/platform-bundles/cassandra-application/src/main/resources/lib/guava-r05.jar
(contents, props changed)
trunk/platform-bundles/cassandra-application/src/main/resources/lib/jetty-6.1.21.jar
(contents, props changed)
trunk/platform-bundles/cassandra-application/src/main/resources/lib/jetty-util-6.1.21.jar
(contents, props changed)
trunk/platform-bundles/cassandra-application/src/main/resources/lib/jug-2.0.0.jar
(contents, props changed)
trunk/platform-bundles/cassandra-application/src/main/resources/lib/libthrift-r959516.jar
(contents, props changed)
trunk/platform-bundles/cassandra-application/src/main/resources/lib/log4j-1.2.16.jar
(contents, props changed)
trunk/platform-bundles/cassandra-application/src/main/resources/lib/servlet-api-2.5-20081211.jar
(contents, props changed)
trunk/platform-bundles/cassandra-application/src/main/resources/lib/snakeyaml-1.6.jar
(contents, props changed)
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/service/ColumnFamilyProviderListener.java
- copied, changed from r195,
/trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/service/ColumnFamilyProviderServiceTracker.java
Removed:
trunk/platform-bundles/cassandra-application/src/main/resources/conf/storage-conf.xml
trunk/platform-bundles/cassandra-application/src/main/resources/lib/apache-cassandra-0.6.4.jar
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/service/ColumnFamilyListenerServiceImpl.java
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/service/ColumnFamilyProviderServiceTracker.java
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/util/ColumnFamilyUpdate.java
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/util/ColumnFamilyUpdateHandler.java
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/util/DOMDocumentHandler.java
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/util/DOMDocumentParser.java
Modified:
trunk/integration-tests/src/test/java/org/amdatu/test/integration/IntegrationTestRunner.java
trunk/integration-tests/src/test/java/org/amdatu/test/integration/TenantManagementServiceTest.java
trunk/integration-tests/src/test/java/org/amdatu/test/integration/UserAdminStoreTest.java
trunk/platform-bundles/cassandra-application/pom.xml
trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/CassandraConfigurationService.java
trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/CassandraDaemonService.java
trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/service/CassandraConfigurationServiceImpl.java
trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/service/CassandraDaemonServiceImpl.java
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/osgi/Activator.java
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/service/CassandraDaemonServiceListener.java
trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/CassandraPersistenceManager.java
trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/service/CassandraPersistenceManagerFactoryImpl.java
trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/service/CassandraPersistenceManagerImpl.java
trunk/platform-bundles/filebased-configuration/src/main/resources/conf/org.amdatu.platform.cassandra.application.cfg
trunk/platform-bundles/useradmin-cassandra-store/src/main/java/org/amdatu/platform/useradmin/store/cassandra/service/CassandraStorageProvider.java
trunk/pom.xml
Modified:
trunk/integration-tests/src/test/java/org/amdatu/test/integration/IntegrationTestRunner.java
==============================================================================
---
trunk/integration-tests/src/test/java/org/amdatu/test/integration/IntegrationTestRunner.java
(original)
+++
trunk/integration-tests/src/test/java/org/amdatu/test/integration/IntegrationTestRunner.java
Thu Oct 21 11:49:40 2010
@@ -37,16 +37,21 @@
abstract class IntegrationTestRunner {
public final static String TEST_PREFIX = "> TESTING: ";
+ private static final int FRAMEWORK_STARTUP_WAIT = 10000;
+
+ private final static String MARKER = "*****************";
+
public Option[] configure() {
- // First get our own JAR
- FileFilter ff = new FileFilter (){
+ // First get our own JAR
+ FileFilter ff = new FileFilter() {
public boolean accept(File pathname) {
- return
pathname.getName().startsWith("org.amdatu.platform.integration-tests-")
- && pathname.getName().endsWith(".jar");
+ return
pathname.getName().startsWith("org.amdatu.platform.integration-tests-")
+ && pathname.getName().endsWith(".jar");
}
};
File integrationTestJarFile = new File("target").listFiles(ff)[0];
- System.out.println("> TESTING: Deploy integration test file: '" +
integrationTestJarFile.getAbsolutePath() + "'");
+ System.out.println("> TESTING: Deploy integration test file: '" +
integrationTestJarFile.getAbsolutePath()
+ + "'");
return options(
mavenConfiguration(),
@@ -57,21 +62,30 @@
// Setting this system property unfortunately is necessary with
the current Cassandra implementation
systemProperty("org.osgi.framework.system.packages.extra").value("sun.misc,com.sun.management"),
+ // Enable this line to allow a remote debugger to attach to the VM
in which Pax Exam runs
+ // new
VMOption("-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"),
+
// Install bundles we need to execute our test
provision(
mavenBundle().groupId("org.osgi").artifactId("org.osgi.compendium").versionAsInProject(),
-
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.dependencymanager").versionAsInProject(),
-
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.configadmin").versionAsInProject(),
+
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.dependencymanager")
+ .versionAsInProject(),
+
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.configadmin")
+ .versionAsInProject(),
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.log").versionAsInProject(),
mavenBundle().groupId("org.ops4j.pax.web").artifactId("pax-web-jetty-bundle").versionAsInProject(),
mavenBundle().groupId("org.ops4j.pax.web").artifactId("pax-web-jsp").versionAsInProject(),
-
mavenBundle().groupId("org.ops4j.pax.useradmin").artifactId("pax-useradmin-service").versionAsInProject(),
-
mavenBundle().groupId("org.apache.sling").artifactId("org.apache.sling.commons.mime").versionAsInProject(),
-
mavenBundle().groupId("org.apache.sling").artifactId("org.apache.sling.commons.osgi").versionAsInProject(),
+
mavenBundle().groupId("org.ops4j.pax.useradmin").artifactId("pax-useradmin-service")
+ .versionAsInProject(),
+
mavenBundle().groupId("org.apache.sling").artifactId("org.apache.sling.commons.mime")
+ .versionAsInProject(),
+
mavenBundle().groupId("org.apache.sling").artifactId("org.apache.sling.commons.osgi")
+ .versionAsInProject(),
+
mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.scr").versionAsInProject(),
// Amdatu platform bundles
// TODO: this works fine when running 'mvn install' since the
artifacts will then be deployed to the maven
- // repository prior to running this integration test. With
'mvn integration-test' however, artifacts will
+ // repository prior to running this integration test. With
'mvn integration-test' however, artifacts will
// not be deployed and so this will only work when artifacts
have been deployed to the maven repository
// before, possibly using outdated artifacts.
mavenBundle().groupId("org.amdatu.platform").artifactId("filebased-configuration").versionAsInProject(),
@@ -80,12 +94,35 @@
mavenBundle().groupId("org.amdatu.platform").artifactId("cassandra-application").versionAsInProject(),
mavenBundle().groupId("org.amdatu.platform").artifactId("shindig-application").versionAsInProject(),
mavenBundle().groupId("org.amdatu.platform").artifactId("cassandra-listener").versionAsInProject(),
-
mavenBundle().groupId("org.amdatu.platform").artifactId("cassandra-persistencemanager").versionAsInProject(),
+
mavenBundle().groupId("org.amdatu.platform").artifactId("cassandra-persistencemanager")
+ .versionAsInProject(),
mavenBundle().groupId("org.amdatu.platform").artifactId("tenant-service").versionAsInProject(),
-
mavenBundle().groupId("org.amdatu.platform").artifactId("useradmin-cassandra-store").versionAsInProject(),
+
mavenBundle().groupId("org.amdatu.platform").artifactId("useradmin-cassandra-store")
+ .versionAsInProject(),
mavenBundle().groupId("org.amdatu.platform").artifactId("wink-application").versionAsInProject(),
-
+
// And finally deploy ourselves
bundle(integrationTestJarFile.toURI().toString())));
}
+
+ public void runTest(String testName) {
+ try {
+ // We need to sleep for some time since we need to make sure that
the framework is 'finished'
+ // starting. If we continue immediately the framework is stopped
immediately after finishing
+ // this test, but at that time it is still busy starting up which
will cause IllegalStateException
+ // errors and "Invalid BundleContext" error messages, and as a
result the container might fail
+ // to shutdown properly preventing further integration tests to
run properly.
+ System.out.println(TEST_PREFIX + MARKER + " Waiting for framework
to complete startup ("
+ + FRAMEWORK_STARTUP_WAIT / 10 + " seconds) " + MARKER);
+ Thread.sleep(FRAMEWORK_STARTUP_WAIT);
+ }
+ catch (InterruptedException e) {}
+
+ // Now run the test
+ System.out.println(TEST_PREFIX + MARKER + " Starting integration
test: " + testName + " " + MARKER);
+ run();
+ System.out.println(TEST_PREFIX + MARKER + " Finished integration
test: " + testName + " " + MARKER);
+ }
+
+ abstract void run();
}
Modified:
trunk/integration-tests/src/test/java/org/amdatu/test/integration/TenantManagementServiceTest.java
==============================================================================
---
trunk/integration-tests/src/test/java/org/amdatu/test/integration/TenantManagementServiceTest.java
(original)
+++
trunk/integration-tests/src/test/java/org/amdatu/test/integration/TenantManagementServiceTest.java
Thu Oct 21 11:49:40 2010
@@ -55,18 +55,11 @@
@Test
public void testTenantManagementService() {
- System.out.println(TEST_PREFIX + "************ Running Tenant
Management Service test ************");
- run();
- System.out.println(TEST_PREFIX + "************ Finished Tenant
Management Service test ************");
+ super.runTest("Tenant Management Service");
}
public void run() {
- // TODO: in the integration test we should use the regular
TenantManagementService. With the current Cassandra
- // implementation, this however cannot work because of the way the
dynamic registration of ColumnFamily's was
- // implemented. However, since this will be replaced by Cassandra 0.7
which supports dynamic registration of
- // keyspaces and columnfamily's this will be fixed in future releases.
Until that, we can use this approach.
-
- // First we register a new Mocked TenantDAO service and register it
with a higher service rank then the default
+ // First we register a new in-memory TenantDAO service and register it
with a higher service rank then the default
// Cassandra DAO such that our test framework will pick it up
DependencyManager depMgr = new DependencyManager(m_bundleContext);
Hashtable<String, Integer> ht = new Hashtable<String, Integer>();
Modified:
trunk/integration-tests/src/test/java/org/amdatu/test/integration/UserAdminStoreTest.java
==============================================================================
---
trunk/integration-tests/src/test/java/org/amdatu/test/integration/UserAdminStoreTest.java
(original)
+++
trunk/integration-tests/src/test/java/org/amdatu/test/integration/UserAdminStoreTest.java
Thu Oct 21 11:49:40 2010
@@ -37,43 +37,44 @@
/**
* This class provides an integration test for testing the User Admin store.
+ *
* @author ivol
*/
@RunWith(JUnit4TestRunner.class)
public class UserAdminStoreTest extends IntegrationTestRunner {
@Inject
protected BundleContext m_bundleContext;
-
+
private UserAdmin m_userAdmin;
@Configuration
public Option[] configure() {
return super.configure();
}
-
+
@Test
- public void testUserAdminStore() {
- System.out.println(TEST_PREFIX + "************ Running User Admin
Store test ************");
- new UserAdminStoreTest().run(m_bundleContext);
- System.out.println(TEST_PREFIX + "************ Finished User Admin
Store test ************");
+ public void testTenantManagementService() {
+ super.runTest("User Admin Store");
}
-
+
@SuppressWarnings("unchecked")
- public void run(BundleContext bundleContext) {
- ServiceTracker serviceTracker = new ServiceTracker(bundleContext,
UserAdmin.class.getName(), null);
+ public void run() {
+ ServiceTracker serviceTracker = new ServiceTracker(m_bundleContext,
UserAdmin.class.getName(), null);
serviceTracker.open();
try {
- System.out.println(TEST_PREFIX + "Waiting for UserAdmin service to
become available (timeout = 30 seconds)");
+ System.out
+ .println(TEST_PREFIX + "Waiting for UserAdmin service to
become available (timeout = 30 seconds)");
serviceTracker.waitForService(30000);
+ System.out.println(TEST_PREFIX + "UserAdmin service available");
}
catch (InterruptedException e) {
e.printStackTrace();
Assert.fail(TEST_PREFIX + "UserAdmin service not available: " +
e.toString());
}
-
+
// First get the UserAdmin service
- ServiceReference servRef =
bundleContext.getServiceReference(UserAdmin.class.getName());
- m_userAdmin = (UserAdmin) bundleContext.getService(servRef);
+ ServiceReference servRef =
m_bundleContext.getServiceReference(UserAdmin.class.getName());
+ m_userAdmin = (UserAdmin) m_bundleContext.getService(servRef);
try {
// Start the test, first remove all existing roles
@@ -82,12 +83,14 @@
for (Role role : allRoles) {
String roleName = role.getName();
m_userAdmin.removeRole(roleName);
- Assert.assertTrue("Role '" + roleName + "' removed but
still returned", m_userAdmin.getRole(roleName) == null);
+ Assert.assertTrue("Role '" + roleName + "' removed but
still returned", m_userAdmin
+ .getRole(roleName) == null);
}
}
// Test if all roles have been removed
- Assert.assertTrue("All roles were removed, but getRoles() still
returns roles", m_userAdmin.getRoles(null) == null
+ Assert.assertTrue("All roles were removed, but getRoles() still
returns roles",
+ m_userAdmin.getRoles(null) == null
|| m_userAdmin.getRoles(null).length == 0);
// Create three test users and set the credentials
@@ -102,7 +105,8 @@
guestUser.getCredentials().put("password", "guestpasswd");
// Now see if we can find these users from credentials
- Assert.assertTrue("TestAdmin user could not be found",
m_userAdmin.getUser("name", "TestAdministrator") != null);
+ Assert.assertTrue("TestAdmin user could not be found",
+ m_userAdmin.getUser("name", "TestAdministrator") != null);
Assert.assertTrue("TestEditor user could not be found",
m_userAdmin.getUser("name", "TestEditor") != null);
Assert.assertTrue("TestGuest user could not be found",
m_userAdmin.getUser("name", "TestGuest") != null);
@@ -111,14 +115,16 @@
adminUser.getProperties().put("lastName", "Doe".getBytes());
// And check if they were set correctly
- byte[] firstName = (byte[]) m_userAdmin.getUser("name",
"TestAdministrator").getProperties().get("firstName");
+ byte[] firstName =
+ (byte[]) m_userAdmin.getUser("name",
"TestAdministrator").getProperties().get("firstName");
Assert.assertTrue("firstName not properly set", new
String(firstName, "UTF-8").equals("John"));
byte[] lastName = (byte[]) m_userAdmin.getUser("name",
"TestAdministrator").getProperties().get("lastName");
Assert.assertTrue("lastName not properly set", new
String(lastName, "UTF-8").equals("Doe"));
// Remove last name
adminUser.getProperties().remove("lastName");
- Assert.assertTrue("lastName not properly removed from property
set", m_userAdmin.getUser("name", "TestAdministrator")
+ Assert.assertTrue("lastName not properly removed from property
set", m_userAdmin.getUser("name",
+ "TestAdministrator")
.getProperties().get("lastName") == null);
// Now create "Administrator", "Editor" and "Guest" group and do
some assignments
@@ -127,10 +133,23 @@
Group guestGroup = (Group)
m_userAdmin.createRole("TestGuestUsers", Role.GROUP);
adminGroup.addMember(adminUser);
adminGroup.addRequiredMember(adminUser);
+
+ // Excessive test of TestEditUsers group since this failed before
+ assertBasicMemberCount("TestEditUsers", 0);
+ assertRequiredMemberCount("TestEditUsers", 0);
editorGroup.addMember(adminUser);
+ assertBasicMemberCount("TestEditUsers", 1);
+ assertRequiredMemberCount("TestEditUsers", 0);
editorGroup.addMember(editorUser);
+ assertBasicMemberCount("TestEditUsers", 2);
+ assertRequiredMemberCount("TestEditUsers", 0);
editorGroup.addRequiredMember(adminUser);
+ assertBasicMemberCount("TestEditUsers", 2);
+ assertRequiredMemberCount("TestEditUsers", 1);
editorGroup.addRequiredMember(editorUser);
+ assertBasicMemberCount("TestEditUsers", 2);
+ assertRequiredMemberCount("TestEditUsers", 2);
+
guestGroup.addMember(adminUser);
guestGroup.addMember(editorUser);
guestGroup.addMember(guestUser);
@@ -148,8 +167,7 @@
// Remove the editor user, is it removed from the admin group?
m_userAdmin.removeRole("TestEditor");
- Assert.assertTrue("EditUsers should have had 2 basic members",
((Group) m_userAdmin.getRole("TestEditUsers"))
- .getMembers().length == 2);
+ Assert.assertTrue("TestEditor should have been removed",
m_userAdmin.getRole("TestEditor") == null);
// Finally, remove all test users and groups
allRoles = m_userAdmin.getRoles(null);
@@ -157,10 +175,12 @@
for (Role role : allRoles) {
String roleName = role.getName();
m_userAdmin.removeRole(roleName);
- Assert.assertTrue("Role '" + roleName + "' removed but
still returned", m_userAdmin.getRole(roleName) == null);
+ Assert.assertTrue("Role '" + roleName + "' removed but
still returned", m_userAdmin
+ .getRole(roleName) == null);
}
}
- } catch (InvalidSyntaxException e) {
+ }
+ catch (InvalidSyntaxException e) {
Assert.fail(TEST_PREFIX + "An error has occurred: " +
e.toString());
e.printStackTrace();
}
@@ -171,7 +191,10 @@
}
private void assertBasicMemberCount(String group, int expected) {
- int count = ((Group) m_userAdmin.getRole(group)).getMembers().length;
+ Assert.assertTrue("Group '" + group + "' is unknown by UserAdmin",
m_userAdmin.getRole(group) != null);
+ int count =
+ ((Group) m_userAdmin.getRole(group)).getMembers() != null ?
((Group) m_userAdmin.getRole(group))
+ .getMembers().length : 0;
if (count != expected) {
Role[] members = ((Group) m_userAdmin.getRole(group)).getMembers();
String sMembers = "";
@@ -184,18 +207,18 @@
}
private void assertRequiredMemberCount(String group, int expected) {
+ Assert.assertTrue("Group '" + group + "' is unknown by UserAdmin",
m_userAdmin.getRole(group) != null);
int count =
((Group) m_userAdmin.getRole(group)).getRequiredMembers() != null
? ((Group) m_userAdmin.getRole(group))
- .getRequiredMembers().length
- : 0;
- if (count != expected) {
- Role[] members = ((Group)
m_userAdmin.getRole(group)).getRequiredMembers();
- String sMembers = "";
- for (Role role : members) {
- sMembers += role.getName() + " ";
- }
- Assert.assertTrue("Group '" + group + "' has " + count
+ " required members. Expected: " + expected
- + ". Members found: " + sMembers, false);
- }
+ .getRequiredMembers().length : 0;
+ if (count != expected) {
+ Role[] members = ((Group)
m_userAdmin.getRole(group)).getRequiredMembers();
+ String sMembers = "";
+ for (Role role : members) {
+ sMembers += role.getName() + " ";
+ }
+ Assert.assertTrue("Group '" + group + "' has " + count + "
required members. Expected: " + expected
+ + ". Members found: " + sMembers, false);
+ }
}
}
Modified: trunk/platform-bundles/cassandra-application/pom.xml
==============================================================================
--- trunk/platform-bundles/cassandra-application/pom.xml (original)
+++ trunk/platform-bundles/cassandra-application/pom.xml Thu Oct 21
11:49:40 2010
@@ -21,12 +21,12 @@
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra</artifactId>
- <version>0.6.1</version>
+ <version>${cassandra.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
- <version>1.2.15</version>
+ <version>1.2.16</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -68,71 +68,43 @@
<Bundle-SymbolicName>org.amdatu.platform.cassandra-application</Bundle-SymbolicName>
<Include-Resource>
{maven-resources},
- lib=src/main/resources/lib
+ lib=src/main/resources/lib,
+
@src/main/resources/cassandra/apache-cassandra-${cassandra.version}.jar
</Include-Resource>
<Bundle-ClassPath>
.,
lib/antlr-3.1.3.jar,
- lib/apache-cassandra-0.6.4.jar,
- lib/avro-1.2.0-dev.jar,
+ lib/avro-1.4.0-rc4.jar,
lib/clhm-production.jar,
lib/commons-cli-1.1.jar,
lib/commons-codec-1.2.jar,
lib/commons-collections-3.2.1.jar,
lib/commons-lang-2.4.jar,
lib/google-collections-1.0.jar,
- lib/hadoop-core-0.20.1.jar,
+ lib/guava-r05.jar,
lib/high-scale-lib.jar,
- lib/ivy-2.1.0.jar,
lib/jackson-core-asl-1.4.0.jar,
lib/jackson-mapper-asl-1.4.0.jar,
- lib/jline-0.9.94.jar,
+ lib/jetty-6.1.21.jar,
+ lib/jetty-util-6.1.21.jar,
lib/json-simple-1.1.jar,
- lib/libthrift-r917130.jar,
- lib/log4j-1.2.15.jar,
+ lib/jug-2.0.0.jar,
+ lib/libthrift-r959516.jar,
+ lib/log4j-1.2.16.jar,
+ lib/servlet-api-2.5-20081211.jar,
lib/slf4j-api-1.5.8.jar,
- lib/slf4j-log4j12-1.5.8.jar
+ lib/slf4j-log4j12-1.5.8.jar,
+ lib/snakeyaml-1.6.jar
</Bundle-ClassPath>
- <DynamicImport-Package>
- com.jcraft.jsch,
- com.sun.jdmk.comm,
- com.thoughtworks.paranamer,
- javax.mail,
- javax.mail.internet,
- joptsimple,
- junit.framework,
- javax.jms,
- org.znerd.xmlenc,
- org.apache.commons.httpclient,
- org.apache.commons.httpclient.auth,
- org.apache.commons.httpclient.methods,
- org.apache.commons.httpclient.params,
- org.apache.commons.logging,
- org.apache.commons.logging.impl,
- org.apache.commons.net.ftp,
- org.apache.commons.vfs,
- org.apache.commons.vfs.impl,
- org.apache.jasper.runtime,
- org.apache.oro.text,
- org.apache.oro.text.regex,
- org.apache.tools.ant,
- org.apache.tools.ant.filters,
- org.apache.tools.ant.taskdefs,
- org.apache.tools.ant.types,
- org.apache.tools.ant.util,
- org.jets3t.service,
- org.jets3t.service.impl.rest.httpclient,
- org.jets3t.service.model,
- org.jets3t.service.security,
- org.joda.time,
- org.kosmix.kosmosfs.access,
- </DynamicImport-Package>
+ <Import-Package>
+ *;resolution:=optional
+ </Import-Package>
<_exportcontents>
org.apache.cassandra.*,
org.apache.thrift.*,
org.amdatu.platform.cassandra.application.*
</_exportcontents>
- </instructions>
+ </instructions>
</configuration>
</plugin>
Modified:
trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/CassandraConfigurationService.java
==============================================================================
---
trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/CassandraConfigurationService.java
(original)
+++
trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/CassandraConfigurationService.java
Thu Oct 21 11:49:40 2010
@@ -18,14 +18,15 @@
/**
* Interface for the Cassandra Configuration Service.
+ *
* @author ivol
*/
public interface CassandraConfigurationService {
/**
* The filename of the storage config xml file of Cassandra.
*/
- public static final String STORAGE_CONF_XML = "storage-conf.xml";
-
+ public static final String STORAGE_CONF_XML = "cassandra.yaml";
+
/**
* The filename of the log4j properties file used by Cassandra.
*/
@@ -40,5 +41,4 @@
* Configuration key for the working directory to use for Cassandra
*/
public static final String CONFIG_WORKDIR = "workdir";
-
}
Modified:
trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/CassandraDaemonService.java
==============================================================================
---
trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/CassandraDaemonService.java
(original)
+++
trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/CassandraDaemonService.java
Thu Oct 21 11:49:40 2010
@@ -17,15 +17,71 @@
package org.amdatu.platform.cassandra.application;
import org.apache.cassandra.thrift.CassandraServer;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.thrift.TException;
/**
* This interface defines the cassandra daemon service.
+ *
* @author ivol
*/
public interface CassandraDaemonService {
/**
* Returns the Cassandra server.
+ *
* @return the Cassandra server.
*/
CassandraServer getCassandraServer();
+
+ /**
+ * Verifies if a keyspace with the specified name exists. The check is
case-sensitive.
+ *
+ * @param keyspaceName Name of the keyspace to check its existence for
+ * @return <code>true</code> if the keyspace exists, <code>false</code>
otherwise
+ * @throws TException In case an error occurred while checking the
existence
+ */
+ boolean keyspaceExists(String keyspaceName) throws TException;
+
+ /**
+ * Adds a keyspace with the specified name and throws an
IllegalArgumentException if
+ * the keyspace already exists.
+ *
+ * @param name Name of thekeyspace to add (case-sensitive)
+ * @throws InvalidRequestException In case an error occurred while adding
the keyspace
+ * @throws TException In case an error occurred while adding the keyspace
+ */
+ void addKeyspace(String name) throws InvalidRequestException, TException;
+
+ /**
+ * Verifies if the specified keyspace contains a ColumnFamily with the
specified name.
+ * The check is case-sensitive.
+ *
+ * @param keyspaceName Name of the keyspace to check
+ * @param columnFamilyName Name of the ColumnFamily to look for
+ * @return <code>true</code> if the specified keyspace contains a
ColumnFamily with the
+ * specified name.
+ */
+ boolean columnFamilyExists(String keyspaceName, String columnFamilyName)
throws NotFoundException;
+
+ /**
+ * Adds a new ColumnFamily to the specified keyspace. If a ColumnFamily
with that name already exists
+ * an exception is thrown.
+ *
+ * @param keyspace
+ * @param cfName
+ * @param columnType
+ * @param comparatorType
+ * @param subComparatorType
+ * @throws InvalidRequestException
+ * @throws TException
+ */
+ void addColumnFamily(String keyspace, String cfName, String columnType,
String comparatorType,
+ String subComparatorType) throws InvalidRequestException, TException;
+
+ boolean isColumnFamilyChanged(String keyspace, String cfName, String
columnType, String comparatorType,
+ String subComparatorType) throws NotFoundException,
InvalidRequestException, TException;
+
+ void updateColumnFamily(String keyspace, String cfName, String columnType,
String comparatorType,
+ String subComparatorType) throws InvalidRequestException, TException;
}
Modified:
trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/service/CassandraConfigurationServiceImpl.java
==============================================================================
---
trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/service/CassandraConfigurationServiceImpl.java
(original)
+++
trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/service/CassandraConfigurationServiceImpl.java
Thu Oct 21 11:49:40 2010
@@ -26,27 +26,25 @@
import org.apache.log4j.PropertyConfigurator;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
-import org.osgi.service.cm.Configuration;
-import org.osgi.service.cm.ConfigurationAdmin;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.cm.ManagedService;
import org.osgi.service.log.LogService;
/**
* This class provides utility methods to prepare Cassandra configuration
before starting it.
+ *
* @author ivol
*/
public class CassandraConfigurationServiceImpl implements
CassandraConfigurationService, ManagedService {
// Statics
- private static final String STORAGE_CONF_SOURCE = "conf/storage-conf.xml";
+ private static final String STORAGE_CONF_SOURCE = "conf/cassandra.yaml";
private static final String LOG4J_CONF_SOURCE = "conf/log4j.properties";
// Reference to the logservice
private volatile LogService m_logService;
private volatile ConfigTemplateManager m_configTemplateManager;
private volatile BundleContext m_bundleContext;
- private volatile ConfigurationAdmin m_configurationAdmin;
-
+
// Private members
private File m_workDir;
@@ -69,8 +67,9 @@
m_configTemplateManager.writeConfiguration(url,
storageConfigFile);
}
// Cassandra uses this system property to find its storage
location.
- System.setProperty("storage-config",
storageConfigFile.getParent());
- } catch (IOException e) {
+ System.setProperty("cassandra.config",
storageConfigFile.toURI().toString());
+ }
+ catch (IOException e) {
m_logService.log(LogService.LOG_ERROR, "Could not replace
configuration entries in storage-conf.xml", e);
}
@@ -81,10 +80,15 @@
m_configTemplateManager.writeConfiguration(url,
log4jPropertiesFile);
PropertyConfigurator.configure(log4jPropertiesFile.getAbsolutePath());
log4jPropertiesFile.deleteOnExit();
- } catch (IOException e) {
+ }
+ catch (IOException e) {
m_logService.log(LogService.LOG_ERROR, "Could not replace
configuration entries in storage-conf.xml", e);
}
+ // cassandra-foreground must be set to true, otherwise Cassandra will
close System err and out streams such
+ // that entries will not be visible in the console anymore.
+ System.setProperty("cassandra-foreground", "true");
+
m_logService.log(LogService.LOG_DEBUG, "Cassandra configuration
preparation completed");
}
Modified:
trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/service/CassandraDaemonServiceImpl.java
==============================================================================
---
trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/service/CassandraDaemonServiceImpl.java
(original)
+++
trunk/platform-bundles/cassandra-application/src/main/java/org/amdatu/platform/cassandra/application/service/CassandraDaemonServiceImpl.java
Thu Oct 21 11:49:40 2010
@@ -16,184 +16,159 @@
*/
package org.amdatu.platform.cassandra.application.service;
-import javax.management.*;
-import java.io.File;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Set;
+import java.util.ArrayList;
+import java.util.List;
import org.amdatu.platform.cassandra.application.CassandraDaemonService;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.CompactionManager;
+import org.apache.cassandra.avro.CassandraDaemon;
import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.CassandraServer;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.thrift.TProcessorFactory;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.thrift.TException;
import org.osgi.service.log.LogService;
/**
* This class provides the Cassandra daemon service.
+ *
* @author ivol
*/
public class CassandraDaemonServiceImpl implements CassandraDaemonService {
+ // The default placement strategy
+ private final String DEFAULT_PLACEMENT_STRATEGY =
"org.apache.cassandra.locator.SimpleStrategy";
+
// Service dependencies, injected by the framework
private volatile LogService m_logService;
- // Instance variables
- private TThreadPoolServer m_serverEngine;
- private TServerSocket m_tServerSocket;
+ // The CassandraDaemon cannot be stopped/started without stopping and
updating (to enforce classloader
+ // to be destroyed) this bundle. For that reason we block any attempts to
stop/start this service since
+ // that will fail.
+ private static boolean m_daemonHasShutdown = false;
+ private CassandraDaemon m_daemon = null;
private CassandraServer m_cassandraServer = null;
/**
* The init() method is invoked by the Felix dependency manager.
*/
public void init() {
+ if (m_daemonHasShutdown) {
+ throw new RuntimeException("CassandraDaemon has already been
shutdown and cannot be restarted.");
+ }
+
m_logService.log(LogService.LOG_INFO, getClass().getName() + " service
initialized");
-
+
try {
- // Setup the cassandra instance
- setup();
+ // Setup the cassandra daemon
+ m_daemon = new CassandraDaemon();
m_logService.log(LogService.LOG_INFO, getClass().getName() + "
service started.");
- } catch (IOException e) {
- m_logService.log(LogService.LOG_ERROR, "An error occurred while
starting Cassandra service", e);
- } catch (TTransportException e) {
- m_logService.log(LogService.LOG_ERROR, "An error occurred while
starting Cassandra service", e);
- } catch (Throwable t) {
+ }
+ catch (Throwable t) {
m_logService.log(LogService.LOG_ERROR, "An error occurred while
starting Cassandra service", t);
}
}
- /**
- * The start() method is invoked by the Felix dependency manager.
- */
public void start() {
- m_logService.log(LogService.LOG_INFO, "Starting " +
getClass().getName() + " service");
- }
-
- /**
- * The stop() method is invoked by the Felix dependency manager.
- */
- public void stop() {
- m_logService.log(LogService.LOG_INFO, "Stopping " +
getClass().getName() + " service");
- if (m_serverEngine != null) {
- m_serverEngine.stop();
- m_serverEngine = null;
+ if (m_daemonHasShutdown) {
+ throw new RuntimeException("CassandraDaemon has already been
shutdown and cannot be restarted.");
}
- if (m_tServerSocket != null) {
- m_tServerSocket.close();
- m_tServerSocket = null;
- }
- m_cassandraServer = null;
-
- try {
- // Unregister cassandra MBeans
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- Set<ObjectInstance> cacheMBeans = mbs.queryMBeans(null, new
ObjectName("org.apache.cassandra.*:*"));
- for (ObjectInstance objectInstance : cacheMBeans) {
- mbs.unregisterMBean(objectInstance.getObjectName());
- }
+ m_daemon.activate();
+ m_cassandraServer = new CassandraServer();
+ m_logService.log(LogService.LOG_INFO, "Cassandra Daemon started");
+ }
- m_logService.log(LogService.LOG_DEBUG, "Cassandra MBeans
unregistered");
- } catch (MalformedObjectNameException e) {
- m_logService.log(LogService.LOG_ERROR, "Failed to unregister
org.apache.cassandra MBeans", e);
- } catch (MBeanRegistrationException e) {
- m_logService.log(LogService.LOG_ERROR, "Failed to unregister
org.apache.cassandra MBeans", e);
- } catch (InstanceNotFoundException e) {
- m_logService.log(LogService.LOG_ERROR, "Failed to unregister
org.apache.cassandra MBeans", e);
- } catch (NullPointerException e) {
- m_logService.log(LogService.LOG_ERROR, "Failed to unregister
org.apache.cassandra MBeans", e);
- }
- m_logService.log(LogService.LOG_INFO, "Service " +
getClass().getName() + " stopped");
+ public void stop() {
+ m_daemon.deactivate();
+ m_daemonHasShutdown = true;
+ m_logService.log(LogService.LOG_INFO, "Cassandra Daemon stopped");
}
- /**
- * The destroy() method is invoked by the Felix dependency manager.
- */
public void destroy() {
- m_logService.log(LogService.LOG_INFO, getClass().getName() + " service
destroyed");
m_cassandraServer = null;
+ m_logService.log(LogService.LOG_INFO, getClass().getName() + " service
destroyed");
}
- /**
- * Setup the Cassandra server instance.
- * @throws IOException
- * @throws TTransportException
- */
- private void setup() throws IOException, TTransportException {
- int listenPort = DatabaseDescriptor.getThriftPort();
- InetAddress listenAddr = DatabaseDescriptor.getThriftAddress();
-
- // If ThriftAddress was left completely unconfigured, then assume the
same default as ListenAddress
- if (listenAddr == null) {
- listenAddr = FBUtilities.getLocalAddress();
- }
+ public CassandraServer getCassandraServer() {
+ return m_cassandraServer;
+ }
- // initialize tables in keyspace
- for (String table : DatabaseDescriptor.getTables()) {
- m_logService.log(LogService.LOG_DEBUG, "Opening keyspace table '"
+ table + "'");
- Table.open(table);
+ public boolean keyspaceExists(String keyspaceName) throws TException {
+ List<KsDef> keyspaces = m_cassandraServer.describe_keyspaces();
+ for (KsDef keyspace : keyspaces) {
+ if (keyspace.getName().equals(keyspaceName)) {
+ return true;
+ }
}
+ return false;
+ }
- // replay the log if necessary and check for compaction candidates
- CommitLog.recover();
- CompactionManager.instance.checkAllColumnFamilies();
-
- // start server internals
- StorageService.instance.initServer();
-
- // now we start listening for clients
- m_cassandraServer = new CassandraServer();
- Cassandra.Processor processor = new
Cassandra.Processor(m_cassandraServer);
-
- // Transport
- m_tServerSocket = new TServerSocket(new InetSocketAddress(listenAddr,
listenPort));
+ public void addKeyspace(String name) throws InvalidRequestException,
TException {
+ List<CfDef> empty = new ArrayList<CfDef>();
+ KsDef ksDef = new KsDef(name, DEFAULT_PLACEMENT_STRATEGY, 1, empty);
+ m_cassandraServer.system_add_keyspace(ksDef);
+ }
- m_logService.log(LogService.LOG_INFO, String.format("Binding thrift
service to %s:%s", listenAddr, listenPort));
+ public boolean columnFamilyExists(String keyspaceName, String
columnFamilyName) throws NotFoundException {
+ KsDef ksDef = m_cassandraServer.describe_keyspace(keyspaceName);
+ List<CfDef> cfDefs = ksDef.getCf_defs();
+ for (CfDef cfDef : cfDefs) {
+ if (cfDef.getName().equals(columnFamilyName)) {
+ return true;
+ }
+ }
+ return false;
+ }
- // Protocol factory
- TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory();
-
- // Transport factory
- TTransportFactory inTransportFactory, outTransportFactory;
- if (DatabaseDescriptor.isThriftFramed()) {
- inTransportFactory = new TFramedTransport.Factory();
- outTransportFactory = new TFramedTransport.Factory();
-
- } else {
- inTransportFactory = new TTransportFactory();
- outTransportFactory = new TTransportFactory();
- }
-
- // ThreadPool Server
- TThreadPoolServer.Options options = new TThreadPoolServer.Options();
- options.minWorkerThreads = 64;
- m_serverEngine =
- new TThreadPoolServer(new TProcessorFactory(processor),
m_tServerSocket, inTransportFactory,
- outTransportFactory, tProtocolFactory,
tProtocolFactory, options);
-
- // Remove pidfile on exit
- String pidFile = System.getProperty("cassandra-pidfile");
- if (pidFile != null) {
- new File(pidFile).deleteOnExit();
+ public void addColumnFamily(String keyspace, String cfName, String
columnType, String comparatorType,
+ String subComparatorType) throws InvalidRequestException, TException {
+ if (keyspace.equals(Table.SYSTEM_TABLE)) {
+ throw new InvalidRequestException("ColumnFamily's cannot be added
to Cassandra's system keyspace");
+ }
+ CfDef cfDef = new CfDef(keyspace, cfName);
+ cfDef.column_type = columnType;
+ cfDef.comparator_type = comparatorType;
+ cfDef.subcomparator_type = subComparatorType;
+
+ m_cassandraServer.set_keyspace(keyspace);
+ m_cassandraServer.system_add_column_family(cfDef);
+ }
+
+ public boolean isColumnFamilyChanged(String keyspace, String cfName,
String columnType, String comparatorType,
+ String subComparatorType) throws NotFoundException,
InvalidRequestException, TException {
+ m_cassandraServer.set_keyspace(keyspace);
+ KsDef ksDef = m_cassandraServer.describe_keyspace(keyspace);
+ List<CfDef> cfDefs = ksDef.getCf_defs();
+ String marshalPackage = "org.apache.cassandra.db.marshal.";
+ for (CfDef cfDef : cfDefs) {
+ if (cfDef.getName().equals(cfName)) {
+ if (!cfDef.column_type.equals(columnType)
+ ||
(!cfDef.comparator_type.equals(comparatorType)
+ &&
!cfDef.comparator_type.equals(marshalPackage + comparatorType)
+ ||
(!cfDef.subcomparator_type.equals(subComparatorType)
+ &&
!cfDef.subcomparator_type.equals(marshalPackage + subComparatorType)))) {
+ return true;
+ }
+ }
}
+ return false;
}
- public CassandraServer getCassandraServer() {
- return m_cassandraServer;
+ public void updateColumnFamily(String keyspace, String cfName, String
columnType, String comparatorType,
+ String subComparatorType) throws InvalidRequestException, TException {
+ CfDef cfDef = new CfDef(keyspace, cfName);
+ cfDef.column_type = columnType;
+ cfDef.comparator_type = comparatorType;
+ cfDef.subcomparator_type = subComparatorType;
+
+ // Cassandra does not support changes of column_type, comparator_type
or subcomparator_type in
+ // existing ColumnFamily's. The update method is intended only for
minor changes, like
+ // comment, row_cache_size, preload_row_cache and key_cache_size for
which we always use the default
+ // values
+ m_cassandraServer.set_keyspace(keyspace);
+ m_cassandraServer.system_update_column_family(cfDef);
}
}
Added:
trunk/platform-bundles/cassandra-application/src/main/resources/cassandra/apache-cassandra-0.7.0-beta2.jar
==============================================================================
Binary file. No diff available.
Added:
trunk/platform-bundles/cassandra-application/src/main/resources/conf/cassandra.yaml
==============================================================================
--- (empty file)
+++
trunk/platform-bundles/cassandra-application/src/main/resources/conf/cassandra.yaml
Thu Oct 21 11:49:40 2010
@@ -0,0 +1,251 @@
+# Cassandra storage config YAML
+
+#NOTE !!!!!!!! NOTE
+# See http://wiki.apache.org/cassandra/StorageConfiguration for
+# full explanations of configuration directives
+#NOTE !!!!!!!! NOTE
+
+# The name of the cluster. This is mainly used to prevent machines in
+# one logical cluster from joining another.
+cluster_name: 'Amdatu Cluster'
+
+# If you are using an order-preserving partitioner and you know your key
+# distribution, you can specify the token for this node to use. (Keys
+# are sent to the node with the "closest" token, so distributing your
+# tokens equally along the key distribution space will spread keys
+# evenly across your cluster.) This setting is only checked the first
+# time a node is started.
+
+# This can also be useful with RandomPartitioner to force equal spacing
+# of tokens around the hash space, especially for clusters with a small
+# number of nodes.
+initial_token:
+
+# Set to true to make new [non-seed] nodes automatically migrate data
+# to themselves from the pre-existing nodes in the cluster. Defaults
+# to false because you can only bootstrap N machines at a time from
+# an existing cluster of N, so if you are bringing up a cluster of
+# 10 machines with 3 seeds you would have to do it in stages. Leaving
+# this off for the initial start simplifies that.
+auto_bootstrap: false
+
+# See http://wiki.apache.org/cassandra/HintedHandoff
+hinted_handoff_enabled: true
+
+# authentication backend, implementing IAuthenticator; used to identify users
+authenticator: org.apache.cassandra.auth.AllowAllAuthenticator
+
+# authorization backend, implementing IAuthority; used to limit access/provide
permissions
+authority: org.apache.cassandra.auth.AllowAllAuthority
+
+# any IPartitioner may be used, including your own as long as it is on
+# the classpath. Out of the box, Cassandra provides
+# org.apache.cassandra.dht.RandomPartitioner
+# org.apache.cassandra.dht.ByteOrderedPartitioner,
+# org.apache.cassandra.dht.OrderPreservingPartitioner, and
+# org.apache.cassandra.dht.CollatingOrderPreservingPartitioner.
+# (CollatingOPP colates according to EN,US rules, not naive byte
+# ordering. Use this as an example if you need locale-aware collation.)
+partitioner: org.apache.cassandra.dht.RandomPartitioner
+
+# directories where Cassandra should store data on disk.
+data_file_directories:
+ - ${org.amdatu.platform.cassandra.application/datafiledir}
+
+# commit log
+commitlog_directory: ${org.amdatu.platform.cassandra.application/commitlogdir}
+
+# Size to allow commitlog to grow to before creating a new segment
+commitlog_rotation_threshold_in_mb: 128
+
+# commitlog_sync may be either "periodic" or "batch."
+# When in batch mode, Cassandra won't ack writes until the commit log
+# has been fsynced to disk. It will wait up to
+# CommitLogSyncBatchWindowInMS milliseconds for other writes, before
+# performing the sync.
+commitlog_sync: periodic
+
+# the other option is "timed," where writes may be acked immediately
+# and the CommitLog is simply synced every commitlog_sync_period_in_ms
+# milliseconds.
+commitlog_sync_period_in_ms: 10000
+
+# Addresses of hosts that are deemed contact points.
+# Cassandra nodes use this list of hosts to find each other and learn
+# the topology of the ring. You must change this if you are running
+# multiple nodes!
+seeds:
+ - 127.0.0.1
+
+# Access mode. mmapped i/o is substantially faster, but only practical on
+# a 64bit machine (which notably does not include EC2 "small" instances)
+# or relatively small datasets. "auto", the safe choice, will enable
+# mmapping on a 64bit JVM. Other values are "mmap", "mmap_index_only"
+# (which may allow you to get part of the benefits of mmap on a 32bit
+# machine by mmapping only index files) and "standard".
+# (The buffer size settings that follow only apply to standard,
+# non-mmapped i/o.)
+disk_access_mode: auto
+
+# Unlike most systems, in Cassandra writes are faster than reads, so
+# you can afford more of those in parallel. A good rule of thumb is 2
+# concurrent reads per processor core. Increase ConcurrentWrites to
+# the number of clients writing at once if you enable CommitLogSync +
+# CommitLogSyncDelay. -->
+concurrent_reads: 8
+concurrent_writes: 32
+
+# This sets the amount of memtable flush writer threads. These will
+# be blocked by disk io, and each one will hold a memtable in memory
+# while blocked. If you have a large heap and many data directories,
+# you can increase this value for better flush performance.
+# By default this will be set to the amount of data directories defined.
+#memtable_flush_writers: 1
+
+# Buffer size to use when performing contiguous column slices.
+# Increase this to the size of the column slices you typically perform
+sliced_buffer_size_in_kb: 64
+
+# TCP port, for commands and data
+storage_port: 7000
+
+# Address to bind to and tell other nodes to connect to. You _must_
+# change this if you want multiple nodes to be able to communicate!
+listen_address: localhost
+
+# The address to bind the Thrift RPC service to
+rpc_address: localhost
+# port for Thrift to listen on
+rpc_port: 9160
+
+# enable or disable keepalive on rpc connections
+rpc_keepalive: true
+
+# uncomment to set socket buffer sizes on rpc connections
+# rpc_send_buff_size_in_bytes:
+# rpc_recv_buff_size_in_bytes:
+
+# Frame size for thrift (maximum field length).
+# 0 disables TFramedTransport in favor of TSocket. This option
+# is deprecated; we strongly recommend using Framed mode.
+thrift_framed_transport_size_in_mb: 15
+
+# The max length of a thrift message, including all fields and
+# internal thrift overhead.
+thrift_max_message_length_in_mb: 16
+
+# Whether or not to take a snapshot before each compaction. Be
+# careful using this option, since Cassandra won't clean up the
+# snapshots for you. Mostly useful if you're paranoid when there
+# is a data format change.
+snapshot_before_compaction: false
+
+# change this to increase the compaction thread's priority. In java, 1 is the
+# lowest priority and that is our default.
+# compaction_thread_priority: 1
+
+# The threshold size in megabytes the binary memtable must grow to,
+# before it's submitted for flushing to disk.
+binary_memtable_throughput_in_mb: 256
+# The maximum time to leave a dirty memtable unflushed.
+# (While any affected columnfamilies have unflushed data from a
+# commit log segment, that segment cannot be deleted.)
+# This needs to be large enough that it won't cause a flush storm
+# of all your memtables flushing at once because none has hit
+# the size or count thresholds yet.
+# defaults to 60
+#memtable_flush_after_mins: 60
+# Size of the memtable in memory before it is flushed
+# if left undefined, 1/8 of the heap will be used
+#memtable_throughput_in_mb: 256
+# Number of objects in millions in the memtable before it is flushed
+# if left undefined, the memtable_throughput_in_mb / 64 * 0.3 will be used
+#memtable_operations_in_millions: 1.2
+
+# Add column indexes to a row after its contents reach this size.
+# Increase if your column values are large, or if you have a very large
+# number of columns. The competing causes are, Cassandra has to
+# deserialize this much of the row to read a single column, so you want
+# it to be small - at least if you do many partial-row reads - but all
+# the index data is read for each access, so you don't want to generate
+# that wastefully either.
+column_index_size_in_kb: 64
+
+# Size limit for rows being compacted in memory. Larger rows will spill
+# over to disk and use a slower two-pass compaction process. A message
+# will be logged specifying the row key.
+in_memory_compaction_limit_in_mb: 64
+
+# Time to wait for a reply from other nodes before failing the command
+rpc_timeout_in_ms: 10000
+
+# phi value that must be reached for a host to be marked down.
+# most users should never need to adjust this.
+# phi_convict_threshold: 8
+
+# endpoint_snitch -- Set this to a class that implements
+# IEndpointSnitch, which will let Cassandra know enough
+# about your network topology to route requests efficiently.
+# Out of the box, Cassandra provides
+# - org.apache.cassandra.locator.SimpleSnitch:
+# Treats Strategy order as proximity. This improves cache locality
+# when disabling read repair, which can further improve throughput.
+# - org.apache.cassandra.locator.RackInferringSnitch:
+# Proximity is determined by rack and data center, which are
+# assumed to correspond to the 3rd and 2nd octet of each node's
+# IP address, respectively
+# org.apache.cassandra.locator.PropertyFileSnitch:
+# - Proximity is determined by rack and data center, which are
+# explicitly configured in cassandra-rack.properties.
+endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
+
+# dynamic_snitch -- This boolean controls whether the above snitch is
+# wrapped with a dynamic snitch, which will monitor read latencies
+# and avoid reading from hosts that have slowed (due to compaction,
+# for instance)
+dynamic_snitch: true
+
+# request_scheduler -- Set this to a class that implements
+# RequestScheduler, which will schedule incoming client requests
+# according to the specific policy. This is useful for multi-tenancy
+# with a single Cassandra cluster.
+# NOTE: This is specifically for requests from the client and does
+# not affect inter node communication.
+# org.apache.cassandra.scheduler.NoScheduler - No scheduling takes place
+# org.apache.cassandra.scheduler.RoundRobinScheduler - Round robin of
+# client requests to a node with a separate queue for each
+# request_scheduler_id. The scheduler is further customized by
+# request_scheduler_options as described below.
+request_scheduler: org.apache.cassandra.scheduler.NoScheduler
+
+# Scheduler Options vary based on the type of scheduler
+# NoScheduler - Has no options
+# RoundRobin
+# - throttle_limit -- The throttle_limit is the number of in-flight
+# requests per client. Requests beyond
+# that limit are queued up until
+# running requests can complete.
+# The value of 80 here is twice the number of
+# concurrent_reads + concurrent_writes.
+# - default_weight -- default_weight is optional and allows for
+# overriding the default which is 1.
+# - weights -- Weights are optional and will default to 1 or the
+# overridden default_weight. The weight translates into how
+# many requests are handled during each turn of the
+# RoundRobin, based on the scheduler id.
+#
+# request_scheduler_options:
+# throttle_limit: 80
+# default_weight: 5
+# weights:
+# Keyspace1: 1
+# Keyspace2: 5
+
+# request_scheduler_id -- An identifer based on which to perform
+# the request scheduling. Currently the only valid option is keyspace.
+# request_scheduler_id: keyspace
+
+# The Index Interval determines how large the sampling of row keys
+# is for a given SSTable. The larger the sampling, the more effective
+# the index is at the cost of space.
+index_interval: 128
\ No newline at end of file
Added:
trunk/platform-bundles/cassandra-application/src/main/resources/lib/avro-1.4.0-rc4.jar
==============================================================================
Binary file. No diff available.
Added:
trunk/platform-bundles/cassandra-application/src/main/resources/lib/guava-r05.jar
==============================================================================
Binary file. No diff available.
Added:
trunk/platform-bundles/cassandra-application/src/main/resources/lib/jetty-6.1.21.jar
==============================================================================
Binary file. No diff available.
Added:
trunk/platform-bundles/cassandra-application/src/main/resources/lib/jetty-util-6.1.21.jar
==============================================================================
Binary file. No diff available.
Added:
trunk/platform-bundles/cassandra-application/src/main/resources/lib/jug-2.0.0.jar
==============================================================================
Binary file. No diff available.
Added:
trunk/platform-bundles/cassandra-application/src/main/resources/lib/libthrift-r959516.jar
==============================================================================
Binary file. No diff available.
Added:
trunk/platform-bundles/cassandra-application/src/main/resources/lib/log4j-1.2.16.jar
==============================================================================
Binary file. No diff available.
Added:
trunk/platform-bundles/cassandra-application/src/main/resources/lib/servlet-api-2.5-20081211.jar
==============================================================================
Binary file. No diff available.
Added:
trunk/platform-bundles/cassandra-application/src/main/resources/lib/snakeyaml-1.6.jar
==============================================================================
Binary file. No diff available.
Modified:
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/osgi/Activator.java
==============================================================================
---
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/osgi/Activator.java
(original)
+++
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/osgi/Activator.java
Thu Oct 21 11:49:40 2010
@@ -17,37 +17,40 @@
package org.amdatu.platform.cassandra.listener.osgi;
import org.amdatu.platform.cassandra.application.CassandraDaemonService;
+import org.amdatu.platform.cassandra.listener.ColumnFamilyProvider;
import
org.amdatu.platform.cassandra.listener.service.CassandraDaemonServiceListener;
-import
org.amdatu.platform.cassandra.listener.service.ColumnFamilyListenerServiceImpl;
+import
org.amdatu.platform.cassandra.listener.service.ColumnFamilyProviderListener;
import
org.amdatu.platform.cassandra.persistencemanager.CassandraPersistenceManagerFactory;
import org.apache.felix.dm.DependencyActivatorBase;
import org.apache.felix.dm.DependencyManager;
import org.osgi.framework.BundleContext;
import org.osgi.service.log.LogService;
-import org.osgi.service.packageadmin.PackageAdmin;
/**
* This is the OSGi activator for this Cassandra listener bundle.
+ *
* @author ivol
*/
public class Activator extends DependencyActivatorBase {
@Override
public void init(BundleContext context, DependencyManager manager) throws
Exception {
- // Register the Cassandra listener service
- manager.add(
- createComponent()
- .setImplementation(ColumnFamilyListenerServiceImpl.class)
-
.add(createServiceDependency().setService(LogService.class).setRequired(true))
-
.add(createServiceDependency().setService(PackageAdmin.class).setRequired(true))
-
.add(createServiceDependency().setService(CassandraPersistenceManagerFactory.class).setRequired(true)));
-
// Register the cassandra daemon service listener
manager.add(
createComponent()
.setImplementation(CassandraDaemonServiceListener.class)
.add(createServiceDependency().setService(LogService.class).setRequired(true))
.add(createServiceDependency().setService(CassandraDaemonService.class).setRequired(true))
-
.add(createServiceDependency().setService(CassandraPersistenceManagerFactory.class).setRequired(true)));
+
.add(createServiceDependency().setService(CassandraPersistenceManagerFactory.class).setRequired(true)));
+
+ // Register the CassandraColumnFamilyProvider listener
+ manager
+ .add(
+ createComponent()
+ .setImplementation(ColumnFamilyProviderListener.class)
+
.add(createServiceDependency().setService(LogService.class).setRequired(true))
+
.add(createServiceDependency().setService(CassandraDaemonService.class).setRequired(true))
+ .add(
+
createServiceDependency().setService(ColumnFamilyProvider.class).setCallbacks("onAdded",
"onRemoved")));
}
@Override
Modified:
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/service/CassandraDaemonServiceListener.java
==============================================================================
---
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/service/CassandraDaemonServiceListener.java
(original)
+++
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/service/CassandraDaemonServiceListener.java
Thu Oct 21 11:49:40 2010
@@ -16,99 +16,82 @@
*/
package org.amdatu.platform.cassandra.listener.service;
-import static
org.amdatu.platform.cassandra.listener.util.ColumnFamilyUpdateHandler.STORAGE_CONFIG_FILENAME;
-
-import java.io.File;
-import java.util.ArrayList;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.List;
import org.amdatu.platform.cassandra.application.CassandraDaemonService;
import org.amdatu.platform.cassandra.listener.ColumnFamilyAvailable;
-import org.amdatu.platform.cassandra.listener.util.ColumnFamilyUpdateHandler;
-import org.amdatu.platform.cassandra.listener.util.DOMDocumentHandler;
-import org.amdatu.platform.cassandra.listener.util.DOMDocumentParser;
import
org.amdatu.platform.cassandra.persistencemanager.CassandraPersistenceManagerFactory;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.KsDef;
import org.apache.felix.dm.Component;
import org.apache.felix.dm.DependencyManager;
+import org.apache.thrift.TException;
import org.osgi.framework.BundleContext;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
import org.osgi.service.log.LogService;
-import org.w3c.dom.Document;
-import org.w3c.dom.NodeList;
+/**
+ * This class listens to the Cassandra Daemon service and when this service
becomes available, it:
+ * -1- Creates and registers Persistence Manager service instances for each
available keyspace
+ * -2- Creates and registers ColumnFamilyAvailable service instances for each
available ColumnFamily
+ *
+ * @author ivol
+ */
public class CassandraDaemonServiceListener {
// Members initialized in the constructor
private volatile BundleContext m_bundleContext;
private volatile LogService m_logService;
private volatile DependencyManager m_dependencyManager;
+ private volatile CassandraDaemonService m_daemonService;
private volatile CassandraPersistenceManagerFactory m_pmFactory;
public void start() {
- synchronized (this) {
- DOMDocumentHandler handler = new DOMDocumentHandler() {
- public boolean handle(Document document) {
- // First loop over available keyspaces and register a
Cassandra Persistence Manager
- // for each available keyspace
- NodeList keyspaces =
document.getElementsByTagName(ColumnFamilyUpdateHandler.KEYSPACE);
- for (int i = 0; i < keyspaces.getLength(); i++) {
- String name =
-
keyspaces.item(i).getAttributes().getNamedItem(ColumnFamilyUpdateHandler.NAME)
- .getNodeValue();
- m_pmFactory.createCassandraPersistenceManager(name);
- }
-
- // Retrieve the current list of available column families
(by name)
- NodeList columnFamilies =
document.getElementsByTagName(ColumnFamilyUpdateHandler.COLUMN_FAMILY);
-
- List<String> availableColumnFamilies = new
ArrayList<String>();
- for (int i = 0; i < columnFamilies.getLength(); i++) {
- String name =
-
columnFamilies.item(i).getAttributes().getNamedItem(ColumnFamilyUpdateHandler.NAME)
- .getNodeValue();
- if (!availableColumnFamilies.contains(name)) {
- availableColumnFamilies.add(name);
+ try {
+ // First loop over available keyspaces and register a Cassandra
Persistence Manager
+ // for each available keyspace
+ List<KsDef> keyspaces =
m_daemonService.getCassandraServer().describe_keyspaces();
+ for (KsDef keyspace : keyspaces) {
+ String name = keyspace.getName();
+ m_pmFactory.createCassandraPersistenceManager(name);
+
+ // Now loop over all ColumnFamily's registered in this keyspace
+ List<CfDef> columnFamilies = keyspace.getCf_defs();
+ for (CfDef columnFamily : columnFamilies) {
+ try {
+ // Register ColumnFamilyAvailable services
+ ServiceReference[] servRefs =
+
m_bundleContext.getServiceReferences(ColumnFamilyAvailable.class.getName(), "("
+ + ColumnFamilyAvailable.FILTER_NAME + "=" +
columnFamily.getName() + ")");
+ if (servRefs == null || servRefs.length == 0) {
+ // Service does not yet exist, instantiate it
+ Dictionary<String, String> serviceProps = new
Hashtable<String, String>();
+
serviceProps.put(ColumnFamilyAvailable.FILTER_NAME, columnFamily.getName());
+ Component component =
m_dependencyManager.createComponent();
+
component.setImplementation(ColumnFamilyAvailableImpl.class);
+
component.setInterface(ColumnFamilyAvailable.class.getName(), serviceProps);
+
+ // Add a service dependency with the Cassandra
Daemon service such that it is destroyed
+ // automatically
+
component.add(m_dependencyManager.createServiceDependency().setService(
+
CassandraDaemonService.class).setRequired(true));
+ m_dependencyManager.add(component);
+ m_logService.log(LogService.LOG_INFO,
"ColumnFamily '" + columnFamily.getName()
+ + "' is now available");
}
- }
- // Instantiate services for each ColumnFamily available
now but was not available before
- for (String colFam : availableColumnFamilies) {
- try {
- // Register ColumnFamilyAvailable services
- ServiceReference[] servRefs =
-
m_bundleContext.getServiceReferences(ColumnFamilyAvailable.class.getName(), "("
- +
ColumnFamilyAvailable.FILTER_NAME + "=" + colFam + ")");
- if (servRefs == null || servRefs.length == 0) {
- // Service does not yet exist, instantiate it
- Dictionary<String, String> serviceProps = new
Hashtable<String, String>();
-
serviceProps.put(ColumnFamilyAvailable.FILTER_NAME, colFam);
- Component component =
m_dependencyManager.createComponent();
-
component.setImplementation(ColumnFamilyAvailableImpl.class);
-
component.setInterface(ColumnFamilyAvailable.class.getName(), serviceProps);
-
- // Add a service dependency with the Cassandra
Daemon service such that it is destroyed
- // automatically
-
component.add(m_dependencyManager.createServiceDependency().setService(
-
CassandraDaemonService.class).setRequired(true));
- m_dependencyManager.add(component);
- m_logService.log(LogService.LOG_INFO,
"ColumnFamily '" + colFam + "' is now available");
- }
-
- } catch (InvalidSyntaxException e) {
- m_logService.log(LogService.LOG_ERROR,
- "Could not start ColumnFamilyAvailable
service for '" + colFam + "'", e);
- }
}
-
- return false;
+ catch (InvalidSyntaxException e) {
+ m_logService.log(LogService.LOG_ERROR,
+ "Could not start ColumnFamilyAvailable service for
'" + columnFamily.getName() + "'", e);
+ }
}
- };
-
- String storageConfig = System.getProperty("storage-config");
- File targetFile = new File(storageConfig + File.separator +
STORAGE_CONFIG_FILENAME);
- new DOMDocumentParser(handler, m_logService).parse(targetFile,
true);
+ }
+ }
+ catch (TException e) {
+ m_logService.log(LogService.LOG_ERROR, "Failed to start
CassandraDaemonServiceListener", e);
}
}
}
Copied:
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/service/ColumnFamilyProviderListener.java
(from r195,
/trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/service/ColumnFamilyProviderServiceTracker.java)
==============================================================================
---
/trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/service/ColumnFamilyProviderServiceTracker.java
(original)
+++
trunk/platform-bundles/cassandra-listener/src/main/java/org/amdatu/platform/cassandra/listener/service/ColumnFamilyProviderListener.java
Thu Oct 21 11:49:40 2010
@@ -16,204 +16,101 @@
*/
package org.amdatu.platform.cassandra.listener.service;
-import static
org.amdatu.platform.cassandra.listener.util.ColumnFamilyUpdateHandler.STORAGE_CONFIG_FILENAME;
-import static
org.amdatu.platform.cassandra.listener.util.ColumnFamilyUpdateHandler.STORAGE_NEW_CONFIG_FILENAME;
-
-import java.io.File;
-import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
+import org.amdatu.platform.cassandra.application.CassandraDaemonService;
import org.amdatu.platform.cassandra.listener.ColumnFamilyDefinition;
import org.amdatu.platform.cassandra.listener.ColumnFamilyProvider;
-import org.amdatu.platform.cassandra.listener.util.ColumnFamilyUpdateHandler;
-import org.amdatu.platform.cassandra.listener.util.DOMDocumentHandler;
-import org.amdatu.platform.cassandra.listener.util.DOMDocumentParser;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.BundleException;
-import org.osgi.framework.ServiceReference;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.thrift.CassandraServer;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.thrift.TException;
import org.osgi.service.log.LogService;
-import org.osgi.service.packageadmin.PackageAdmin;
-import org.osgi.util.tracker.ServiceTracker;
+
/**
* This class tracks ColumnFamilyProvider services that come available or
unavailable and acts upon.
+ *
* @author ivol
*/
-public class ColumnFamilyProviderServiceTracker extends ServiceTracker {
- // Capacity of our ColumnFamily update queue
- private static final int QUEUE_CAPACITY = 200;
-
+public class ColumnFamilyProviderListener {
// Members initialized in the constructor
- private BundleContext m_bundleContext;
- private LogService m_logService;
- private PackageAdmin m_packageAdmin;
-
- // This queue represents the queue of required ColumnFamily updates
- private ArrayBlockingQueue<ColumnFamilyProvider> m_updateQueue =
- new ArrayBlockingQueue<ColumnFamilyProvider>(QUEUE_CAPACITY);
-
- // The thread that polls the queue each n seconds
- private ColumnFamilyUpdateQueueListener m_queueListener;
-
- /**
- * Constructor.
- */
- public ColumnFamilyProviderServiceTracker(BundleContext bundleContext,
LogService logService,
- PackageAdmin packageAdmin) {
- super(bundleContext, ColumnFamilyProvider.class.getName(), null);
- m_bundleContext = bundleContext;
- m_logService = logService;
- m_packageAdmin = packageAdmin;
-
- // Create a timer task such that each n seconds, required updates from
the ColumnFamily queue are executed
- m_queueListener = new ColumnFamilyUpdateQueueListener();
- m_queueListener.start();
- }
+ private volatile LogService m_logService;
+ private volatile CassandraDaemonService m_daemonService;
- @Override
- public void close() {
- super.close();
- m_queueListener.interrupt();
- }
-
- @Override
- public Object addingService(ServiceReference reference) {
- Object service = m_bundleContext.getService(reference);
- if (service instanceof ColumnFamilyProvider) {
- m_updateQueue.add((ColumnFamilyProvider) service);
- }
- return service;
- }
+ public void onAdded(ColumnFamilyProvider provider) {
+ try {
+ CassandraServer cs = m_daemonService.getCassandraServer();
+ List<KsDef> keyspaceDefinitions = cs.describe_keyspaces();
+ ColumnFamilyDefinition[] colDefs = provider.getColumnFamilies();
+ for (ColumnFamilyDefinition colDef : colDefs) {
+
+ // First create keyspaces if they do not yet exist
+ String[] ksNames = colDef.getKeyspaces();
+ if (ksNames != null) {
+ for (String ksName : ksNames) {
+ // Create if it does not yet exist
+ if (!m_daemonService.keyspaceExists(ksName)) {
+ // Add keyspace and ColumnFamily since they do not
yet exist
+ m_daemonService.addKeyspace(ksName);
+ m_logService.log(LogService.LOG_INFO, "Keyspace '"
+ ksName + "' added");
+ }
- class ColumnFamilyUpdateQueueListener extends Thread {
- private boolean m_interrupt = false;
- private static final int POLL_INTERVAL = 2000;
- private boolean m_restartCassandra = false;
-
- @Override
- public void run() {
- while (!m_interrupt) {
- synchronized (this) {
- try {
- wait(POLL_INTERVAL);
- } catch (InterruptedException e) {
- m_interrupt = true;
+ // Never add ColumnFamily's to Cassandra's system
keyspace, this is a reserved keyspace
+ if (!Table.SYSTEM_TABLE.equals(ksName)) {
+ addOrUpdateColumnFamily(ksName, colDef);
+ }
}
}
-
- // Process queue
- List<ColumnFamilyDefinition> updates = new
ArrayList<ColumnFamilyDefinition>();
- synchronized (m_updateQueue) {
- if (m_updateQueue.size() > 0 && !m_interrupt) {
- // There are ColumnFamily's in the queue that need to
be updated. But is the
- // Cassandra bundle active?
- ServiceReference serviceRef =
- m_bundleContext
-
.getServiceReference("org.amdatu.platform.cassandra.application.CassandraDaemonService");
- if (serviceRef != null && !m_interrupt) {
- if (serviceRef.getBundle().getState() ==
Bundle.ACTIVE
- && System.getProperty("storage-config") !=
null && !m_interrupt) {
- // Yes it is. Now update the storage-cof.xml
for all updates in the queue. Lock the
- // bundle until the update of the storage conf
is completed
- synchronized (serviceRef.getBundle()) {
- ColumnFamilyUpdateHandler handler = new
ColumnFamilyUpdateHandler(m_logService);
- while (m_updateQueue.size() > 0) {
- ColumnFamilyProvider provider =
m_updateQueue.poll();
- for (ColumnFamilyDefinition colFamDef
: provider.getColumnFamilies()) {
- updates.add(colFamDef);
- }
- }
-
- DOMDocumentHandler domHandler =
handler.createUpdateHandler(updates);
-
- // Write the new storage config file to a
new config file
- String storageConfig =
System.getProperty("storage-config");
- File targetFile = new File(storageConfig +
File.separator + STORAGE_NEW_CONFIG_FILENAME);
- boolean restartRequired = new
DOMDocumentParser(domHandler, m_logService).parse(targetFile, false);
- m_restartCassandra = m_restartCassandra ||
restartRequired;
- if (restartRequired) {
- m_logService.log(LogService.LOG_INFO,
"Restart of Cassandra scheduled");
- }
- }
- }
+ else {
+ for (KsDef keyspaceDef : keyspaceDefinitions) {
+ // Never add ColumnFamily's to Cassandra's system
keyspace, this is a reserved keyspace
+ if (!Table.SYSTEM_TABLE.equals(keyspaceDef.getName()))
{
+ addOrUpdateColumnFamily(keyspaceDef.getName(),
colDef);
}
}
}
-
- if (m_restartCassandra) {
- // Now restart Cassandra
- restartCassandra();
- }
}
}
-
- @Override
- public void interrupt() {
- m_interrupt = true;
+ catch (TException e) {
+ m_logService.log(LogService.LOG_ERROR, "Failed to register
keyspaces and/or ColumnFamily's for provider '"
+ + provider.toString() + "'", e);
+ }
+ catch (InvalidRequestException e) {
+ m_logService.log(LogService.LOG_ERROR, "Failed to register
keyspaces and/or ColumnFamily's for provider '"
+ + provider.toString() + "'", e);
}
+ catch (NotFoundException e) {
+ m_logService.log(LogService.LOG_ERROR, "Failed to register
keyspaces and/or ColumnFamily's for provider '"
+ + provider.toString() + "'", e);
+ }
+ }
- /**
- * Restarts the Cassandra bundle.
- */
- private void restartCassandra() {
- // Be absolutely sure that no bundle is in status "starting" or
"stopping"
- // before restarting the cassandra bundle
- for (Bundle bundle : m_bundleContext.getBundles()) {
- if (bundle.getState() == Bundle.STARTING || bundle.getState()
== Bundle.STOPPING) {
- return;
- }
- }
+ public void onRemoved(ColumnFamilyProvider provider) {
+ }
- // Note that we must restart the entire Cassandra application
bundle. Restarting the service is not enough
- // since
- // some Cassandra classes load configuration in a static context
(i.e. DatabaseDescriptor). This
- // initialization is thus executed when the class is loaded by the
classloader. Since we cannot unload one
- // single class, the complete classloader of that bundle must be
reloaded and this the complete OSGi bundle
- // must be restarted.
- ServiceReference serviceRef =
- m_bundleContext
-
.getServiceReference("org.amdatu.platform.cassandra.application.CassandraDaemonService");
- if (serviceRef != null) {
- long bundleId = serviceRef.getBundle().getBundleId();
- if (serviceRef.getBundle().getState() == Bundle.ACTIVE) {
- m_logService.log(LogService.LOG_INFO,
- "Cassandra restart triggered by new or updated
ColumnFamilyProvider");
- try {
- // This is where the restart happens
- m_restartCassandra = false;
-
- Bundle bundle = m_bundleContext.getBundle(bundleId);
- bundle.stop();
- m_logService.log(LogService.LOG_DEBUG, "Cassandra
application bundle stopped");
-
- bundle.update();
- m_logService.log(LogService.LOG_DEBUG, "Cassandra
application bundle updated");
-
- m_packageAdmin.refreshPackages(new Bundle[]{bundle});
- m_logService.log(LogService.LOG_DEBUG, "Cassandra
application bundle packages refreshed");
-
- // Copy new config file over old config file and
remove it
- String storageConfig =
System.getProperty("storage-config");
- File targetFile = new File(storageConfig +
File.separator + STORAGE_CONFIG_FILENAME);
- File sourceFile = new File(storageConfig +
File.separator + STORAGE_NEW_CONFIG_FILENAME);
- targetFile.delete();
- sourceFile.renameTo(targetFile);
-
- bundle.start();
- m_logService.log(LogService.LOG_DEBUG, "Cassandra
application bundle started");
- } catch (BundleException e) {
- m_logService.log(LogService.LOG_ERROR, "An error
occurred while restarting Cassandra");
- } catch (NullPointerException e) {
- m_logService.log(LogService.LOG_ERROR, "An error
occurred while restarting Cassandra");
- }
- } else {
- m_logService.log(LogService.LOG_ERROR,
- "A restart of Cassandra was required but not
executed, Cassandra application bundle ("
- + bundleId + ") is not active but its
state currently is "
- + serviceRef.getBundle().getState());
- }
+ private void addOrUpdateColumnFamily(String ksName, ColumnFamilyDefinition
colDef) throws InvalidRequestException,
+ TException, NotFoundException {
+ String cfName = colDef.getName();
+ String columnType = colDef.getColumnType().value;
+ String comparatorType = colDef.getCompareWith().value;
+ String subComparatorType = colDef.getCompareSubcolumnsWith().value;
+
+ if (!m_daemonService.columnFamilyExists(ksName, cfName)) {
+ m_daemonService.addColumnFamily(ksName, cfName, columnType,
comparatorType, subComparatorType);
+ m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + cfName +
"' added");
+ }
+ else {
+ // Since Cassandra does not (yet) support updating columnType,
comparatorType or subComparatorType
+ // of existing ColumnFamily's, we throw an exception if one of
these has been changed by the provider.
+ // If there are no changes, we do nothing
+ if (m_daemonService.isColumnFamilyChanged(ksName, cfName,
columnType, comparatorType, subComparatorType)) {
+ throw new InvalidRequestException("Definition of ColumnFamily
'" + cfName
+ + "' has been changed, but changes in columnType,
comparatorType "
+ + "and subComparatorType are not supported by Cassandra");
}
+ m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + cfName +
"' not changed");
}
}
}
Modified:
trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/CassandraPersistenceManager.java
==============================================================================
---
trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/CassandraPersistenceManager.java
(original)
+++
trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/CassandraPersistenceManager.java
Thu Oct 21 11:49:40 2010
@@ -53,7 +53,7 @@
* </pre>
*/
String KEYSPACE_AWARE_KEY = "keyspaceid";
-
+
/**
* Returns if the specified ColumnFamily exists.
* @param columnFamilyName The name of the ColumnFamily to check its
existence for
@@ -262,6 +262,4 @@
// WTF is this?
List<byte[]> getSuperRow(String table, String key, List<byte[]> columns);
-
- void batch_insert(String table, String key, String supercolum, String
column, String[] values);
}
Modified:
trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/service/CassandraPersistenceManagerFactoryImpl.java
==============================================================================
---
trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/service/CassandraPersistenceManagerFactoryImpl.java
(original)
+++
trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/service/CassandraPersistenceManagerFactoryImpl.java
Thu Oct 21 11:49:40 2010
@@ -32,18 +32,18 @@
public class CassandraPersistenceManagerFactoryImpl implements
CassandraPersistenceManagerFactory {
// Instances injected by the Felix dependency manager
private volatile DependencyManager m_dependencyManager;
-
public void createCassandraPersistenceManager(String keyspaceId) {
Dictionary<String, String> serviceProperties = new Hashtable<String,
String>();
serviceProperties.put(CassandraPersistenceManager.KEYSPACE_AWARE_KEY,
keyspaceId);
-
+
Component component = m_dependencyManager.createComponent();
component.setImplementation(CassandraPersistenceManagerImpl.class);
component.setInterface(CassandraPersistenceManager.class.getName(),
serviceProperties);
component.add(m_dependencyManager.createServiceDependency().setService(LogService.class).setRequired(true));
-
component.add(m_dependencyManager.createServiceDependency().setService(CassandraDaemonService.class).setRequired(true));
-
+
component.add(m_dependencyManager.createServiceDependency().setService(CassandraDaemonService.class)
+ .setRequired(true));
+
m_dependencyManager.add(component);
}
}
Modified:
trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/service/CassandraPersistenceManagerImpl.java
==============================================================================
---
trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/service/CassandraPersistenceManagerImpl.java
(original)
+++
trunk/platform-bundles/cassandra-persistencemanager/src/main/java/org/amdatu/platform/cassandra/persistencemanager/service/CassandraPersistenceManagerImpl.java
Thu Oct 21 11:49:40 2010
@@ -17,14 +17,28 @@
package org.amdatu.platform.cassandra.persistencemanager.service;
import java.lang.reflect.Method;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.Map.Entry;
import org.amdatu.platform.cassandra.application.CassandraDaemonService;
import org.amdatu.platform.cassandra.persistencemanager.CassandraException;
import
org.amdatu.platform.cassandra.persistencemanager.CassandraPersistenceManager;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ColumnPath;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.thrift.SuperColumn;
import org.apache.cassandra.thrift.Cassandra.Iface;
-import org.apache.cassandra.thrift.*;
import org.apache.felix.dm.Component;
import org.osgi.service.log.LogService;
@@ -35,6 +49,7 @@
* on the tenant service. The tenant service however stores the tenants in
Cassandra and needs
* the persistence manager for that. Using this base class both bundles can
use their own
* implementation without needless code duplication.
+ *
* @author ivol
*/
public class CassandraPersistenceManagerImpl implements
CassandraPersistenceManager {
@@ -44,6 +59,9 @@
// Maximum amount of columns to retrieve in queries
private final static int COLUMN_LIMIT = 1000000;
+ // Default characterset to use
+ private final static String DEFAULT_CHARSET = "UTF-8";
+
// Default supercolumn name
private final static String DEFAULT_SUPERCOLUMN_NAME = "Default";
@@ -61,15 +79,20 @@
* The init() method is invoked by the Felix dependency manager.
*/
public void init() {
- m_keyspace =
m_component.getServiceProperties().get(CassandraPersistenceManager.KEYSPACE_AWARE_KEY).toString();
+ Object keyspace =
m_component.getServiceProperties().get(CassandraPersistenceManager.KEYSPACE_AWARE_KEY);
+ if (keyspace != null) {
+ m_keyspace = keyspace.toString();
+ }
}
-
+
public void start() {
- m_logService.log(LogService.LOG_INFO, "Cassandra persistence manager
for keyspace '" + m_keyspace + "' is started");
+ m_logService.log(LogService.LOG_INFO, "Cassandra persistence manager
for keyspace '" + m_keyspace
+ + "' is started");
}
-
+
public void stop() {
- m_logService.log(LogService.LOG_INFO, "Cassandra persistence manager
for keyspace '" + m_keyspace + "' is stopped");
+ m_logService.log(LogService.LOG_INFO, "Cassandra persistence manager
for keyspace '" + m_keyspace
+ + "' is stopped");
}
public boolean exists(String columnFamilyName) throws CassandraException {
@@ -80,18 +103,21 @@
p.setSlice_range(sliceRange);
KeyRange range = new KeyRange(100000);
- range.setStart_key("");
- range.setEnd_key("");
+ range.setStart_key(new byte[0]);
+ range.setEnd_key(new byte[0]);
Iface cs = m_daemonService.getCassandraServer();
- cs.get_range_slices(m_keyspace, columnParent, p, range,
ConsistencyLevel.ONE);
+ cs.set_keyspace(m_keyspace);
+ cs.get_range_slices(columnParent, p, range, ConsistencyLevel.ONE);
return true;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName);
- } else {
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
public boolean exists(String columnFamilyName, String rowKey) throws
CassandraException {
@@ -102,40 +128,49 @@
p.setSlice_range(sliceRange);
KeyRange range = new KeyRange(1);
- range.setStart_key(rowKey);
- range.setEnd_key(rowKey);
+ range.setStart_key(rowKey.getBytes(DEFAULT_CHARSET));
+ range.setEnd_key(rowKey.getBytes(DEFAULT_CHARSET));
Iface cs = m_daemonService.getCassandraServer();
- List<KeySlice> keySlices = cs.get_range_slices(m_keyspace,
columnParent, p, range, ConsistencyLevel.ONE);
+ cs.set_keyspace(m_keyspace);
+ List<KeySlice> keySlices = cs.get_range_slices(columnParent, p,
range, ConsistencyLevel.ONE);
return keySlices != null && keySlices.size() > 0 &&
keySlices.get(0).getColumns().size() > 0;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
- throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(rowKey);
- } else {
+ throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(
+ rowKey);
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
public boolean exists(String columnFamilyName, String rowKey, String
superColumnName, String columnName)
- throws CassandraException {
+ throws CassandraException {
try {
ColumnPath columnPath = new ColumnPath(columnFamilyName);
- columnPath.setSuper_column(superColumnName.getBytes("UTF-8"));
- columnPath.setColumn(columnName.getBytes("UTF-8"));
+
columnPath.setSuper_column(superColumnName.getBytes(DEFAULT_CHARSET));
+ columnPath.setColumn(columnName.getBytes(DEFAULT_CHARSET));
Iface cs = m_daemonService.getCassandraServer();
- ColumnOrSuperColumn sosc = cs.get(m_keyspace, rowKey, columnPath,
ConsistencyLevel.ONE);
+ cs.set_keyspace(m_keyspace);
+ ColumnOrSuperColumn sosc =
cs.get(rowKey.getBytes(DEFAULT_CHARSET), columnPath, ConsistencyLevel.ONE);
sosc.getColumn();
return true;
- } catch (NotFoundException e) {
+ }
+ catch (NotFoundException e) {
return false;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
- throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(rowKey)
- .setSuperColumn(superColumnName).setColumn(columnName);
- } else {
+ throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(
+ rowKey)
+ .setSuperColumn(superColumnName).setColumn(columnName);
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
/**
@@ -152,29 +187,33 @@
// Set key range to maximum ROW_LIMIT results
KeyRange range = new KeyRange(ROW_LIMIT);
- range.setStart_key("");
- range.setEnd_key("");
+ range.setStart_key(new byte[0]);
+ range.setEnd_key(new byte[0]);
Iface cs = m_daemonService.getCassandraServer();
- List<KeySlice> keySlices = cs.get_range_slices(m_keyspace,
columnParent, p, range, ConsistencyLevel.ONE);
+ cs.set_keyspace(m_keyspace);
+ List<KeySlice> keySlices = cs.get_range_slices(columnParent, p,
range, ConsistencyLevel.ONE);
List<String> keys = new ArrayList<String>();
for (KeySlice keySlice : keySlices) {
// This may be a bangling row key marked for removal, check
that!
if (keySlice.getColumnsSize() > 0) {
- keys.add(keySlice.getKey());
+ keys.add(new String(keySlice.getKey(), "UTF-8"));
}
}
- m_logService.log(LogService.LOG_DEBUG, "Found " + keys.size() + "
keys for ColumnFamily '" + columnFamilyName
- + "' in getRowKeys");
+ m_logService.log(LogService.LOG_DEBUG, "Found " + keys.size() + "
keys for ColumnFamily '"
+ + columnFamilyName
+ + "' in getRowKeys");
return keys;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName);
- } else {
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
public List<String> getColumnNames(String columnFamilyName, String rowKey)
throws CassandraException {
@@ -188,8 +227,9 @@
predicate.setSlice_range(range);
Iface cs = m_daemonService.getCassandraServer();
+ cs.set_keyspace(m_keyspace);
List<ColumnOrSuperColumn> slice =
- cs.get_slice(m_keyspace, rowKey, columnParent, predicate,
ConsistencyLevel.ONE);
+ cs.get_slice(rowKey.getBytes(DEFAULT_CHARSET), columnParent,
predicate, ConsistencyLevel.ONE);
for (ColumnOrSuperColumn columnOrSuperColumn : slice) {
if (columnOrSuperColumn.isSetSuper_column()) {
@@ -201,16 +241,20 @@
}
}
return result;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
- throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(rowKey);
- } else {
+ throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(
+ rowKey);
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
- public Map<String, Map<String, Map<String, byte[]>>> getSuperValues(String
columnFamilyName) throws CassandraException {
+ public Map<String, Map<String, Map<String, byte[]>>> getSuperValues(String
columnFamilyName)
+ throws CassandraException {
try {
ColumnParent columnParent = new ColumnParent(columnFamilyName);
SlicePredicate p = new SlicePredicate();
@@ -218,145 +262,173 @@
p.setSlice_range(sliceRange);
KeyRange range = new KeyRange(100000);
- range.setStart_key("");
- range.setEnd_key("zzzzz");
+ range.setStart_key(new byte[0]);
+ range.setEnd_key("zzzzz".getBytes(DEFAULT_CHARSET));
Iface cs = m_daemonService.getCassandraServer();
+ cs.set_keyspace(m_keyspace);
List<KeySlice> getRangeSlices =
- cs.get_range_slices(m_keyspace, columnParent, p, range,
ConsistencyLevel.ONE);
+ cs.get_range_slices(columnParent, p, range,
ConsistencyLevel.ONE);
return flattenSuper(getRangeSlices);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName);
- } else {
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
- public Map<String, Map<String, byte[]>> getSuperValues(String
columnFamilyName, String rowKey) throws CassandraException {
+ public Map<String, Map<String, byte[]>> getSuperValues(String
columnFamilyName, String rowKey)
+ throws CassandraException {
try {
ColumnParent columnParent = new ColumnParent(columnFamilyName);
SlicePredicate p = new SlicePredicate();
- SliceRange sliceRange = new SliceRange(new byte[0], new byte[0],
false, COLUMN_LIMIT);
+ SliceRange sliceRange = new SliceRange(new byte[0], new byte[0],
false, COLUMN_LIMIT);
p.setSlice_range(sliceRange);
KeyRange range = new KeyRange(ROW_LIMIT);
- range.setStart_key(rowKey);
- range.setEnd_key(rowKey);
+ range.setStart_key(rowKey.getBytes(DEFAULT_CHARSET));
+ range.setEnd_key(rowKey.getBytes(DEFAULT_CHARSET));
List<KeySlice> getRangeSlices;
Iface cs = m_daemonService.getCassandraServer();
- getRangeSlices = cs.get_range_slices(m_keyspace, columnParent, p,
range, ConsistencyLevel.ONE);
+ cs.set_keyspace(m_keyspace);
+ getRangeSlices = cs.get_range_slices(columnParent, p, range,
ConsistencyLevel.ONE);
Map<String, Map<String, Map<String, byte[]>>> flattenSuper =
flattenSuper(getRangeSlices);
Set<Entry<String, Map<String, Map<String, byte[]>>>> entrySet =
flattenSuper.entrySet();
for (Entry<String, Map<String, Map<String, byte[]>>> first :
entrySet) {
return first.getValue();
}
return null;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
- throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(rowKey);
- } else {
+ throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(
+ rowKey);
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
public Map<String, Map<String, String>> getSuperStringValues(String
columnFamilyName, String rowKey)
- throws CassandraException {
+ throws CassandraException {
try {
ColumnParent columnParent = new ColumnParent(columnFamilyName);
SlicePredicate p = new SlicePredicate();
- SliceRange sliceRange = new SliceRange(new byte[0], new byte[0],
false, COLUMN_LIMIT);
+ SliceRange sliceRange = new SliceRange(new byte[0], new byte[0],
false, COLUMN_LIMIT);
p.setSlice_range(sliceRange);
KeyRange range = new KeyRange(ROW_LIMIT);
- range.setStart_key(rowKey);
- range.setEnd_key(rowKey);
+ range.setStart_key(rowKey.getBytes(DEFAULT_CHARSET));
+ range.setEnd_key(rowKey.getBytes(DEFAULT_CHARSET));
List<KeySlice> getRangeSlices;
Iface cs = m_daemonService.getCassandraServer();
- getRangeSlices = cs.get_range_slices(m_keyspace, columnParent, p,
range, ConsistencyLevel.ONE);
+ cs.set_keyspace(m_keyspace);
+ getRangeSlices = cs.get_range_slices(columnParent, p, range,
ConsistencyLevel.ONE);
Map<String, Map<String, Map<String, String>>> flattenSuper =
flattenStringSuper(getRangeSlices);
Set<Entry<String, Map<String, Map<String, String>>>> entrySet =
flattenSuper.entrySet();
for (Entry<String, Map<String, Map<String, String>>> first :
entrySet) {
return first.getValue();
}
return null;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
- throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(rowKey);
- } else {
+ throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(
+ rowKey);
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
- public byte[] getValue(String columnFamilyName, String rowKey, String
superColumnName, String columnName) throws CassandraException {
+ public byte[] getValue(String columnFamilyName, String rowKey, String
superColumnName, String columnName)
+ throws CassandraException {
try {
ColumnPath columnPath = new ColumnPath(columnFamilyName);
- columnPath.setSuper_column(superColumnName.getBytes("UTF-8"));
- columnPath.setColumn(columnName.getBytes("UTF-8"));
+
columnPath.setSuper_column(superColumnName.getBytes(DEFAULT_CHARSET));
+ columnPath.setColumn(columnName.getBytes(DEFAULT_CHARSET));
Iface cs = m_daemonService.getCassandraServer();
- ColumnOrSuperColumn columnOrSuperColumn = cs.get(m_keyspace,
rowKey, columnPath, ConsistencyLevel.ONE);
+ cs.set_keyspace(m_keyspace);
+ ColumnOrSuperColumn columnOrSuperColumn =
cs.get(rowKey.getBytes(), columnPath, ConsistencyLevel.ONE);
byte[] value = columnOrSuperColumn.getColumn().getValue();
return value;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
- throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(rowKey)
- .setSuperColumn(superColumnName).setColumn(columnName);
- } else {
+ throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(
+ rowKey)
+ .setSuperColumn(superColumnName).setColumn(columnName);
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
- public String getStringValue(String columnFamilyName, String rowKey,
String superColumnName, String columnName) throws CassandraException {
+ public String getStringValue(String columnFamilyName, String rowKey,
String superColumnName, String columnName)
+ throws CassandraException {
try {
byte[] value = getValue(columnFamilyName, rowKey, superColumnName,
columnName);
if (value != null) {
- return new String(value, "UTF-8");
- } else {
+ return new String(value, DEFAULT_CHARSET);
+ }
+ else {
return null;
}
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
- throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(rowKey)
- .setSuperColumn(superColumnName).setColumn(columnName);
- } else {
+ throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(
+ rowKey)
+ .setSuperColumn(superColumnName).setColumn(columnName);
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
- public Map<String, byte[]> getValues(String columnFamilyName, String
rowKey, String superColumnName) throws CassandraException {
+ public Map<String, byte[]> getValues(String columnFamilyName, String
rowKey, String superColumnName)
+ throws CassandraException {
try {
ColumnParent columnParent = new ColumnParent(columnFamilyName);
- columnParent.setSuper_column(superColumnName.getBytes("UTF-8"));
+
columnParent.setSuper_column(superColumnName.getBytes(DEFAULT_CHARSET));
// read entire row
SliceRange range = new SliceRange(new byte[0], new byte[0], false,
COLUMN_LIMIT);
SlicePredicate predicate = new SlicePredicate();
predicate.setSlice_range(range);
Iface cs = m_daemonService.getCassandraServer();
- List<ColumnOrSuperColumn> slice = cs.get_slice(m_keyspace, rowKey,
columnParent, predicate, ConsistencyLevel.ONE);
+ cs.set_keyspace(m_keyspace);
+ List<ColumnOrSuperColumn> slice =
+ cs.get_slice(rowKey.getBytes(), columnParent, predicate,
ConsistencyLevel.ONE);
Map<String, byte[]> result = new HashMap<String, byte[]>();
for (ColumnOrSuperColumn columnOrSuperColumn : slice) {
- String name = new
String(columnOrSuperColumn.getColumn().getName(), "UTF-8");
+ String name = new
String(columnOrSuperColumn.getColumn().getName(), DEFAULT_CHARSET);
byte[] value = columnOrSuperColumn.getColumn().getValue();
result.put(name, value);
}
return result;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
- throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(rowKey)
- .setSuperColumn(superColumnName);
- } else {
+ throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(
+ rowKey)
+ .setSuperColumn(superColumnName);
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
public Map<String, String> getStringValues(String columnFamilyName, String
rowKey, String superColumnName)
- throws CassandraException {
+ throws CassandraException {
try {
Map<String, byte[]> byteValues = getValues(columnFamilyName,
rowKey, superColumnName);
if (byteValues != null) {
@@ -365,53 +437,66 @@
stringValues.put(key, new String(byteValues.get(key),
"UTF-8"));
}
return stringValues;
- } else {
+ }
+ else {
return null;
}
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
- throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(rowKey)
- .setSuperColumn(superColumnName);
- } else {
+ throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(
+ rowKey)
+ .setSuperColumn(superColumnName);
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
- public void setValue(String columnFamilyName, String rowKey, String
superColumnName, String columnName, byte[] value) throws CassandraException {
+ public void setValue(String columnFamilyName, String rowKey, String
superColumnName, String columnName, byte[] value)
+ throws CassandraException {
try {
long timestamp = System.currentTimeMillis();
- ColumnPath columnPath = new ColumnPath(columnFamilyName);
- columnPath.setSuper_column(superColumnName.getBytes("UTF-8"));
- columnPath.setColumn(columnName.getBytes("UTF-8"));
+ ColumnParent column_parent = new ColumnParent(columnFamilyName);
+
column_parent.setSuper_column(superColumnName.getBytes(DEFAULT_CHARSET));
+ Column column = new Column(columnName.getBytes(), value,
timestamp);
Iface cs = m_daemonService.getCassandraServer();
- cs.insert(m_keyspace, rowKey, columnPath, value, timestamp,
ConsistencyLevel.ONE);
- } catch (Exception e) {
+ cs.set_keyspace(m_keyspace);
+ cs.insert(rowKey.getBytes(DEFAULT_CHARSET), column_parent, column,
ConsistencyLevel.ONE);
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
- throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(rowKey)
-
.setSuperColumn(superColumnName).setColumn(columnName).setValue(value);
- } else {
+ throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(
+ rowKey)
+
.setSuperColumn(superColumnName).setColumn(columnName).setValue(value);
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
- public void setStringValue(String columnFamilyName, String rowKey, String
superColumnName, String columnName, String value) throws CassandraException {
+ public void setStringValue(String columnFamilyName, String rowKey, String
superColumnName, String columnName,
+ String value) throws CassandraException {
try {
byte[] bytes = null;
if (value != null) {
- bytes = value.getBytes("UTF-8");
+ bytes = value.getBytes(DEFAULT_CHARSET);
}
setValue(columnFamilyName, rowKey, superColumnName, columnName,
bytes);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
- throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(rowKey)
-
.setSuperColumn(superColumnName).setColumn(columnName).setValue(value);
- } else {
+ throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(
+ rowKey)
+
.setSuperColumn(superColumnName).setColumn(columnName).setValue(value);
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
public void deleteRow(String columnFamilyName, String rowKey) throws
CassandraException {
@@ -424,37 +509,44 @@
}
}
- public void deleteSuperColumn(String columnFamilyName, String rowKey,
String superColumnName) throws CassandraException {
+ public void deleteSuperColumn(String columnFamilyName, String rowKey,
String superColumnName)
+ throws CassandraException {
deleteColumn(columnFamilyName, rowKey, superColumnName, null);
}
- public void deleteColumn(String columnFamilyName, String rowKey, String
superColumnName, String columnName) throws CassandraException {
+ public void deleteColumn(String columnFamilyName, String rowKey, String
superColumnName, String columnName)
+ throws CassandraException {
try {
long timestamp = System.currentTimeMillis();
ColumnPath columnPath = new ColumnPath(columnFamilyName);
if (superColumnName != null) {
- columnPath.setSuper_column(superColumnName.getBytes("UTF-8"));
+
columnPath.setSuper_column(superColumnName.getBytes(DEFAULT_CHARSET));
}
if (columnName != null) {
- columnPath.setColumn(columnName.getBytes("UTF-8"));
+ columnPath.setColumn(columnName.getBytes(DEFAULT_CHARSET));
}
Iface cs = m_daemonService.getCassandraServer();
- cs.remove(m_keyspace, rowKey, columnPath, timestamp,
ConsistencyLevel.ONE);
- } catch (Exception e) {
+ cs.set_keyspace(m_keyspace);
+ cs.remove(rowKey.getBytes(), columnPath, timestamp,
ConsistencyLevel.ONE);
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
- throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(rowKey)
- .setSuperColumn(superColumnName).setColumn(columnName);
- } else {
+ throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName).setRowKey(
+ rowKey)
+ .setSuperColumn(superColumnName).setColumn(columnName);
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
public <T> T loadBean(Class<T> clazz, String id) throws CassandraException
{
return loadBean(clazz, id, clazz.getName(), DEFAULT_SUPERCOLUMN_NAME);
}
- public <T> T loadBean(Class<T> clazz, String id, String columnFamilyName,
String superColumnName) throws CassandraException {
+ public <T> T loadBean(Class<T> clazz, String id, String columnFamilyName,
String superColumnName)
+ throws CassandraException {
boolean exists = false;
T bean;
try {
@@ -472,7 +564,8 @@
if
(method.getParameterTypes()[0].equals(String.class)) {
method.invoke(bean, value);
exists = true;
- } else if
(method.getParameterTypes()[0].equals(Integer.class)) {
+ }
+ else if
(method.getParameterTypes()[0].equals(Integer.class)) {
method.invoke(bean, Integer.parseInt(value));
}
}
@@ -482,14 +575,16 @@
if (exists) {
return bean;
}
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName)
- .setSuperColumn(superColumnName);
- } else {
+ .setSuperColumn(superColumnName);
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
return null;
}
@@ -501,16 +596,19 @@
String id = idMethod.invoke(bean).toString();
persistBean(bean, id, bean.getClass().getName(),
DEFAULT_SUPERCOLUMN_NAME);
}
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
- throw new
CassandraException(e).setRowKey(bean.getClass().getName());
- } else {
+ throw new
CassandraException(e).setRowKey(bean.getClass().getName());
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
- public void persistBean(Object bean, String id, String columnFamilyName,
String superColumnName) throws CassandraException {
+ public void persistBean(Object bean, String id, String columnFamilyName,
String superColumnName)
+ throws CassandraException {
// FIXME: For now, just persist all bean properties for which getters
are available
Method[] methods = bean.getClass().getMethods();
for (Method method : methods) {
@@ -522,13 +620,15 @@
String value = oVal.toString();
setStringValue(columnFamilyName, id, superColumnName,
key, value);
}
- } catch (Exception e) {
+ }
+ catch (Exception e) {
if (!(e instanceof RuntimeException)) {
- throw new CassandraException(e).setRowKey(id);
- } else {
+ throw new CassandraException(e).setRowKey(id);
+ }
+ else {
throw (RuntimeException) e;
}
- }
+ }
}
}
}
@@ -540,10 +640,11 @@
p.setColumn_names(columnNames);
KeyRange range = new KeyRange(1);
- range.setStart_key(key);
- range.setEnd_key(key);
+ range.setStart_key(key.getBytes(DEFAULT_CHARSET));
+ range.setEnd_key(key.getBytes(DEFAULT_CHARSET));
Iface cs = m_daemonService.getCassandraServer();
- List<ColumnOrSuperColumn> getSlice = cs.get_slice(m_keyspace, key,
columnParent, p, ConsistencyLevel.ONE);
+ List<ColumnOrSuperColumn> getSlice =
+ cs.get_slice(key.getBytes(DEFAULT_CHARSET), columnParent, p,
ConsistencyLevel.ONE);
// byte[][] result = new byte[getSlice.size()][0];
List<byte[]> result = new ArrayList<byte[]>(getSlice.size());
@@ -552,51 +653,20 @@
result.add(name);
}
return result;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
m_logService.log(LogService.LOG_ERROR, "An error occurred for
getSuperRow('" + table + "', '" + key
- + "', '" + columnNames + "')", e);
+ + "', '" + columnNames + "')", e);
}
return null;
}
- public void batch_insert(String table, String key, String supercolum,
String column, String[] values) {
- try {
- long timestamp = System.currentTimeMillis();
-
- Map<String, List<ColumnOrSuperColumn>> insertDataMap = new
HashMap<String, List<ColumnOrSuperColumn>>();
- List<ColumnOrSuperColumn> rowData = new
ArrayList<ColumnOrSuperColumn>();
- List<Column> columns = new ArrayList<Column>();
-
- // These are the supercolumn columns
- for (String value : values) {
- columns.add(new Column(value.getBytes("UTF-8"),
value.getBytes("UTF-8"), timestamp));
- columns.add(new Column(value.getBytes("UTF-8"),
value.getBytes("UTF-8"), timestamp));
- columns.add(new Column(value.getBytes("UTF-8"),
value.getBytes("UTF-8"), timestamp));
- }
-
- // Here the super column
- SuperColumn superColumn = new
SuperColumn(supercolum.getBytes("UTF-8"), columns);
- ColumnOrSuperColumn columnOrSuperColumn = new
ColumnOrSuperColumn();
- columnOrSuperColumn.setSuper_column(superColumn);
-
- rowData.add(columnOrSuperColumn);
- insertDataMap.put(table, rowData);
-
- Iface cs = m_daemonService.getCassandraServer();
- cs.batch_insert(m_keyspace, key, insertDataMap,
ConsistencyLevel.ONE);
-
- } catch (Exception e) {
- m_logService.log(LogService.LOG_ERROR, "An error occurred for
batch_insert('" + table + "','" + key + "','"
- + supercolum + "','" + column + "','" + values + "')");
- }
- }
-
private Map<String, Map<String, Map<String, byte[]>>>
flattenSuper(List<KeySlice> getRangeSlices) {
Map<String, Map<String, Map<String, byte[]>>> result = new
HashMap<String, Map<String, Map<String, byte[]>>>();
try {
for (KeySlice slice : getRangeSlices) {
- String key = slice.getKey();
+ String key = new String(slice.getKey(), DEFAULT_CHARSET);
Map<String, Map<String, byte[]>> value = new HashMap<String,
Map<String, byte[]>>();
List<ColumnOrSuperColumn> columns = slice.getColumns();
for (ColumnOrSuperColumn column : columns) {
@@ -604,15 +674,16 @@
List<Column> columns2 = superColumn.getColumns();
Map<String, byte[]> subvalue = new HashMap<String,
byte[]>();
for (Column column3 : columns2) {
- String name = new String(column3.getName(), "UTF-8");
+ String name = new String(column3.getName(),
DEFAULT_CHARSET);
subvalue.put(name, column3.getValue());
}
- value.put(new String(superColumn.getName(), "UTF-8"),
subvalue);
+ value.put(new String(superColumn.getName(),
DEFAULT_CHARSET), subvalue);
}
result.put(key, value);
}
- } catch (Exception e) {
+ }
+ catch (Exception e) {
m_logService.log(LogService.LOG_ERROR, "An error occurred for
flattenSuper('" + getRangeSlices + "')");
}
@@ -623,7 +694,7 @@
Map<String, Map<String, Map<String, String>>> result = new
HashMap<String, Map<String, Map<String, String>>>();
try {
for (KeySlice slice : getRangeSlices) {
- String key = slice.getKey();
+ String key = new String(slice.getKey(), DEFAULT_CHARSET);
Map<String, Map<String, String>> value = new HashMap<String,
Map<String, String>>();
List<ColumnOrSuperColumn> columns = slice.getColumns();
for (ColumnOrSuperColumn column : columns) {
@@ -631,15 +702,16 @@
List<Column> columns2 = superColumn.getColumns();
Map<String, String> subvalue = new HashMap<String,
String>();
for (Column column3 : columns2) {
- String name = new String(column3.getName(), "UTF-8");
- subvalue.put(name, new String(column3.getValue(),
"UTF-8"));
+ String name = new String(column3.getName(),
DEFAULT_CHARSET);
+ subvalue.put(name, new String(column3.getValue(),
DEFAULT_CHARSET));
}
- value.put(new String(superColumn.getName(), "UTF-8"),
subvalue);
+ value.put(new String(superColumn.getName(),
DEFAULT_CHARSET), subvalue);
}
result.put(key, value);
}
- } catch (Exception e) {
+ }
+ catch (Exception e) {
m_logService.log(LogService.LOG_ERROR, "An error occurred for
flattenSuper('" + getRangeSlices + "')");
}
Modified:
trunk/platform-bundles/filebased-configuration/src/main/resources/conf/org.amdatu.platform.cassandra.application.cfg
==============================================================================
---
trunk/platform-bundles/filebased-configuration/src/main/resources/conf/org.amdatu.platform.cassandra.application.cfg
(original)
+++
trunk/platform-bundles/filebased-configuration/src/main/resources/conf/org.amdatu.platform.cassandra.application.cfg
Thu Oct 21 11:49:40 2010
@@ -5,4 +5,7 @@
commitlogdir=work/cassandra/commitlog
# Directory in which the data files are stored
-datafiledir=work/cassandra/data
\ No newline at end of file
+datafiledir=work/cassandra/data
+
+# Directory in which the caches are saved
+savedcachesdir=work/cassandra/saved_caches
\ No newline at end of file
Modified:
trunk/platform-bundles/useradmin-cassandra-store/src/main/java/org/amdatu/platform/useradmin/store/cassandra/service/CassandraStorageProvider.java
==============================================================================
---
trunk/platform-bundles/useradmin-cassandra-store/src/main/java/org/amdatu/platform/useradmin/store/cassandra/service/CassandraStorageProvider.java
(original)
+++
trunk/platform-bundles/useradmin-cassandra-store/src/main/java/org/amdatu/platform/useradmin/store/cassandra/service/CassandraStorageProvider.java
Thu Oct 21 11:49:40 2010
@@ -66,7 +66,7 @@
private static final String USER_KEY_PREFIX = "user_";
private static final String GROUP_KEY_PREFIX = "group_";
-
+
private static final String STRING_TYPE = "string";
private static final String BYTES_TYPE = "bytes";
@@ -76,11 +76,11 @@
// Service dependencies injected by the OSGi framework
private volatile LogService m_logService;
private volatile CassandraPersistenceManager m_pm;
-
+
public void start() {
m_logService.log(LogService.LOG_INFO, "Cassandra storage provider
service started");
}
-
+
public void stop() {
m_logService.log(LogService.LOG_INFO, "Cassandra storage provider
service stopped");
}
@@ -158,10 +158,11 @@
// Try to load the role assuming it is a user
return internalLoadRole(userAdminFactory, USER_KEY_PREFIX +
roleName, null);
} catch (StorageException e) {
+ // Try to load the role assuming it is a group
try {
- // Try to load the role assuming it is a group
return internalLoadRole(userAdminFactory, GROUP_KEY_PREFIX +
roleName, null);
- } catch (StorageException e2) {
+ }
+ catch (StorageException e2) {
return null;
}
}
@@ -264,7 +265,7 @@
// Load all super columns of the Role ColumnFamily
Map<String, Map<String, byte[]>> superColumns =
m_pm.getSuperValues(CF_ROLE, key);
Map<String, byte[]> basic = superColumns.get(SUPER_COLUMN_BASIC);
-
+
Map<String, Object> properties =
toStringObjectMap(superColumns.get(SUPER_COLUMN_PROPERTIES),
superColumns.get(SUPER_COLUMN_PROPERTY_TYPES));
Map<String, byte[]> bCredentials =
superColumns.get(SUPER_COLUMN_CREDENTIALS);
Map<String, Object> credentials;
@@ -289,7 +290,7 @@
}
} else {
throw new StorageException("ColumnFamily " + CF_ROLE + " is
missing column '" + SUPER_COLUMN_BASIC
- + "'");
+ + "'");
}
} catch (UnsupportedEncodingException e) {
throw new StorageException(e.getMessage());
@@ -460,7 +461,7 @@
private Object fromJson(String object, Class<?> clazz) {
return new Gson().fromJson(object, clazz);
}
-
+
// Converts the Json string to an object
private Object fromJson(String object, Type type) {
return new Gson().fromJson(object, type);
Modified: trunk/pom.xml
==============================================================================
--- trunk/pom.xml (original)
+++ trunk/pom.xml Thu Oct 21 11:49:40 2010
@@ -173,7 +173,7 @@
<!-- Version numbers of platform bundles -->
<shindig.version>1.1-BETA5-incubating</shindig.version>
- <cassandra.version>0.6.4</cassandra.version>
+ <cassandra.version>0.7.0-beta2</cassandra.version>
<openrdf.version>2.3.1</openrdf.version>
</properties>
@@ -220,6 +220,14 @@
<name>repo1</name>
<url>http://repo1.maven.org/maven2</url>
</repository>
+ <repository>
+ <id>maven.scale7.org</id>
+ <name>Scale7 Maven Repo</name>
+ <url>http://github.com/s7/mvnrepo/raw/master</url>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
</repositories>
<dependencies>