Author: [email protected]
Date: Mon Dec 12 11:35:28 2011
New Revision: 1830
Log:
[AMDATUCASSANDRA-136] Added getColumnFamily and updateColumnFamily to the
CassandraDaemonService to support updating CFs.
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
trunk/amdatu-cassandra/cassandra-listener/src/test/java/org/amdatu/cassandra/test/unit/listener/ListenerTest.java
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
Mon Dec 12 11:35:28 2011
@@ -138,6 +138,18 @@
List<String> getColumnFamilies(String keyspaceName);
/**
+ * Returns the ColumnFamily definition of the ColumnFamily with the specified
name in the specified keyspace.
+ * The definition returned contains all properties of the ColumnFamily.
+ *
+ * @param keyspaceName
+ * Name of the keyspace to retrieve the ColumnFamily from
+ * @param columnFamilyName
+ * Name of the ColumnFamily to retrieve
+ * @return ColumnFamily definition of the specified ColumnFamily, or null if no ColumnFamily with that name exists in the keyspace
+ */
+ CfDef getColumnFamily(final String keyspaceName, final String
columnFamilyName);
+
+ /**
* Returns the ColumnType of the specified ColumnFamily in the specified
keyspace. Note that the type of
* a ColumnFamily is fixed; Cassandra does not support changing the column
family. If the specified
* Column Family does not exist, null is returned.
@@ -193,11 +205,30 @@
* Name of the keyspace to add the ColumnFamily to
* @param cfDef
* CfDef object holding the properties of the ColumnFamily to add
- * @return true if a new ColumnFamily was added, false if a ColumnFamily
with this name already existed.
+ * @return true if a new ColumnFamily was added, false if a ColumnFamily
with this name already existed.
*/
boolean addColumnFamily(String keyspace, CfDef cfDef);
/**
+ * Updates an existing ColumnFamily in the specified keyspace. Returns
true if the ColumnFamily was
+ * updated successfully. If a ColumnFamily with that name does not exist,
false is returned. Note that
+ * ColumnFamily names in Cassandra are case-sensitive.
+ * In case the changes in the specified CfDef are not compatible (e.g. a
change of type from standard to super
+ * ColumnFamily), an unchecked ThriftException is thrown.
+ * Note that the CfDef is supposed to be a 'complete' definition,
including the ColumnDefinitions that
+ * have not been changed for example. If the old CfDef contained a
ColumnDefinition and the passed cfDef
+ * does not, that ColumnDefinition will be removed. Therefore, it might be
useful to retrieve the full
+ * ColumnFamilyDefinition using getColumnFamily(String, String) first.
+ *
+ * @param keyspace
+ * Name of the keyspace to update the ColumnFamily in
+ * @param cfDef
+ * CfDef object holding the properties of the ColumnFamily to update
+ * @return true if a ColumnFamily was updated successfully, false
otherwise.
+ */
+ boolean updateColumnFamily(String keyspace, CfDef cfDef);
+
+ /**
* Verifies if the ColumnFamily specified by keyspace and ColumFamily name
already present in
* Cassandra has the same columnType, comparatorType and subComparatorType
as specified. If no
* ColumnFamily with the specified name exists, false is returned. Only in
case a ColumnFamily
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
Mon Dec 12 11:35:28 2011
@@ -347,17 +347,17 @@
throw new ThriftException(e).setKeyspace(keyspace);
}
}
-
- public boolean columnFamilyExists(final String keyspaceName, final String
columnFamilyName) {
+
+ public CfDef getColumnFamily(final String keyspaceName, final String
columnFamilyName) {
try {
KsDef ksDef = m_cassandraServer.describe_keyspace(keyspaceName);
List<CfDef> cfDefs = ksDef.getCf_defs();
for (CfDef cfDef : cfDefs) {
if (cfDef.getName().equals(columnFamilyName)) {
- return true;
+ return cfDef;
}
}
- return false;
+ return null;
}
catch (InvalidRequestException e) {
// Rethrow the checked exception as a new unchecked Thrift
exception
@@ -369,6 +369,11 @@
}
}
+ public boolean columnFamilyExists(final String keyspaceName, final String
columnFamilyName) {
+ CfDef cfDef = getColumnFamily(keyspaceName, columnFamilyName);
+ return cfDef != null;
+ }
+
public String getColumnType(final String keyspaceName, final String
columnFamilyName) {
Map<String, String> columnFamilyTypeMap;
if (m_keyspaceColumnFamilyTypes.containsKey(keyspaceName)) {
@@ -469,6 +474,43 @@
throw new
ThriftException(e).setKeyspace(keyspace).setColumnFamily(cfName);
}
}
+
+ @Override
+ public boolean updateColumnFamily(String keyspace, CfDef cfDef) {
+ // Before we update the ColumnFamily, the cluster must agree upon the
schema
+ waitForSchemaAgreement();
+
+ String cfName = cfDef.getName();
+ cfDef.setKeyspace(keyspace);
+ if (keyspace.equals(Table.SYSTEM_TABLE)) {
+ throw new ThriftException("ColumnFamily's cannot be updated in
Cassandra's system keyspace");
+ }
+ try {
+ CfDef oldCfDef = getColumnFamily(keyspace, cfName);
+ if (oldCfDef != null) {
+ m_cassandraServer.set_keyspace(keyspace);
+ cfDef.id = oldCfDef.id;
+ m_cassandraServer.system_update_column_family(cfDef);
+ m_logService.log(LogService.LOG_INFO, "ColumnFamily '" +
cfName + "' has been updated in the keyspace '"
+ + keyspace + "'");
+ return true;
+ }
+ else {
+ m_logService.log(LogService.LOG_DEBUG, "ColumnFamily '" +
cfName + "' was not updated in the keyspace '"
+ + keyspace + "' since it does not exist");
+ return false;
+ }
+ }
+ catch (InvalidRequestException e) {
+ throw new
ThriftException(e).setKeyspace(keyspace).setColumnFamily(cfName);
+ }
+ catch (TException e) {
+ throw new
ThriftException(e).setKeyspace(keyspace).setColumnFamily(cfName);
+ }
+ catch (SchemaDisagreementException e) {
+ throw new
ThriftException(e).setKeyspace(keyspace).setColumnFamily(cfName);
+ }
+ }
private boolean equalComparator(String cfDefCompType, String compType) {
String marshalPackage = "org.apache.cassandra.db.marshal.";
Modified:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
(original)
+++
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
Mon Dec 12 11:35:28 2011
@@ -96,23 +96,21 @@
final ColumnFamilyDefinition colDef) throws InvalidRequestException,
TException, NotFoundException {
CfDef cfDef = colDef.getCfDef();
-
- if (!daemonService.addColumnFamily(ksName, cfDef)) {
- // Since Cassandra does not (yet) support updating columnType,
comparatorType or subComparatorType
- // of existing ColumnFamily's, we throw an exception if one of
these has been changed by the provider.
- // If there are no changes, we do nothing
- final String cfName = cfDef.getName();
- String columnType = cfDef.getColumn_type();
- String comparatorType = cfDef.getComparator_type();
- String subComparatorType = cfDef.getSubcomparator_type();
- if (daemonService.isColumnFamilyChanged(ksName, cfName,
columnType, comparatorType, subComparatorType)) {
- throw new InvalidRequestException("Definition of ColumnFamily
'" + cfName
- + "' has been changed, but changes in columnType,
comparatorType "
- + "and subComparatorType are not supported by Cassandra");
+
+ if (!daemonService.columnFamilyExists(ksName, colDef.getName())) {
+ // CF did not yet exist, try to add it
+ if (!daemonService.addColumnFamily(ksName, cfDef)) {
+ m_logService.log(LogService.LOG_INFO, "ColumnFamily '" +
colDef.getName()
+ + "' could not be added to keyspace '" + ksName + "'");
+ }
+ }
+ else {
+ // Try to update the CF
+ if (!daemonService.updateColumnFamily(ksName, cfDef)) {
+ m_logService.log(LogService.LOG_INFO, "ColumnFamily '" +
colDef.getCfDef().getName()
+ + "' not changed in keyspace '"
+ + ksName + "'");
}
- m_logService.log(LogService.LOG_INFO, "ColumnFamily '" +
colDef.getCfDef().getName() + "' not changed in keyspace '"
- + ksName
- + "'");
}
}
}
Modified:
trunk/amdatu-cassandra/cassandra-listener/src/test/java/org/amdatu/cassandra/test/unit/listener/ListenerTest.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-listener/src/test/java/org/amdatu/cassandra/test/unit/listener/ListenerTest.java
(original)
+++
trunk/amdatu-cassandra/cassandra-listener/src/test/java/org/amdatu/cassandra/test/unit/listener/ListenerTest.java
Mon Dec 12 11:35:28 2011
@@ -15,6 +15,8 @@
*/
package org.amdatu.cassandra.test.unit.listener;
+import java.nio.ByteBuffer;
+
import org.amdatu.cassandra.application.CassandraDaemonService;
import org.amdatu.cassandra.listener.ColumnFamilyAvailable;
import org.amdatu.cassandra.listener.ColumnFamilyDefinition;
@@ -26,7 +28,11 @@
import org.amdatu.cassandra.test.unit.framework.UnitTestFramework;
import org.amdatu.cassandra.test.unit.listener.mock.BundleContextMock;
import org.amdatu.cassandra.test.unit.listener.mock.CPMFactoryMock;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnDef;
+import org.apache.cassandra.thrift.IndexType;
import org.apache.felix.dm.DependencyManager;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -44,7 +50,7 @@
private static final String TEST_CF_4 = "TestCF4";
// Timeout for service registrations to take place after schema changes
- private static final long TIMEOUT = 10000;
+ private static final long TIMEOUT = 15000;
protected CassandraDaemonService m_daemon;
@@ -90,6 +96,9 @@
// Test add global ColumnFamily
testAddGlobalColumnFamily();
+
+ // Test update ColumnFamily
+ testUpdateColumnFamily();
// Test removing the keyspace
testRemoveKeyspace();
@@ -164,6 +173,38 @@
Assert.assertTrue(m_daemon.getColumnFamilies(TEST_KS).contains(TEST_CF_3));
}
+ private void testUpdateColumnFamily() throws Exception {
+ // Update with the same properties, should succeed
+ CfDef cfDef = new CfDef(TEST_KS, TEST_CF);
+ cfDef.setColumn_type(ColumnType.STANDARD.getValue());
+ cfDef.setComparator_type(CompareType.BYTESTYPE.getValue());
+ Assert.assertTrue(m_daemon.updateColumnFamily(TEST_KS, cfDef));
+
+ // Update with change standard -> super, should throw an exception
+ cfDef.setColumn_type(ColumnType.SUPER.getValue());
+ try {
+ m_daemon.updateColumnFamily(TEST_KS, cfDef);
+ Assert.fail("Incompatible update of ColumnFamily '" + TEST_CF + "'
does not throw an exception");
+ } catch (Exception e) {
+ }
+
+ // Add a ColumnDefinition, should succeed
+ cfDef.setColumn_type(ColumnType.STANDARD.getValue());
+ ByteBuffer name = ByteBuffer.wrap("indexcol".getBytes("UTF-8"));
+ ColumnDef cDef = new ColumnDef(name, UTF8Type.class.getName());
+ cDef.setIndex_type(IndexType.KEYS);
+ cfDef.addToColumn_metadata(cDef);
+ Assert.assertTrue(m_daemon.updateColumnFamily(TEST_KS, cfDef));
+
+ // Update the ColumnFamily without the ColumnDef; the ColumnDef should
be removed
+ cfDef = new CfDef(TEST_KS, TEST_CF);
+ cfDef.setColumn_type(ColumnType.STANDARD.getValue());
+ cfDef.setComparator_type(CompareType.BYTESTYPE.getValue());
+ m_daemon.updateColumnFamily(TEST_KS, cfDef);
+ CfDef def = m_daemon.getColumnFamily(TEST_KS, TEST_CF);
+ Assert.assertTrue(def.getColumn_metadata().size() == 0);
+ }
+
private void testRemoveKeyspace() throws Exception {
// Drop the keyspace
m_daemon.dropKeyspace(TEST_KS);
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits