Author: [email protected]
Date: Fri Dec 16 14:40:32 2011
New Revision: 1852
Log:
[AMDATUCASSANDRA-138] Added support for concurrent schema modifications. The
internal Cassandra client in the CassandraDaemonService is replaced by a Thrift
client to the first node in the lexicographically sorted list of IP addresses of
live nodes. As a result, all schema modifications on all nodes in a cluster are
sent to the same node (which we refer to as the schema master).
Added:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CfDefComparator.java
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
trunk/amdatu-cassandra/cassandra-application/src/test/java/org/amdatu/cassandra/test/unit/application/CassandraUnitTestBase.java
trunk/amdatu-cassandra/cassandra-application/src/test/java/org/amdatu/cassandra/test/unit/application/mock/CassandraConfigurationServiceMock.java
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/ColumnFamilyDefinition.java
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
trunk/amdatu-cassandra/cassandra-listener/src/test/java/org/amdatu/cassandra/test/unit/listener/ListenerTest.java
trunk/amdatu-cassandra/config/src/main/resources/org.amdatu.core.cassandra.application.cfg
trunk/amdatu-cassandra/test-unit/framework/src/main/java/org/amdatu/cassandra/test/unit/framework/mock/CassandraConfigurationServiceMock.java
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
Fri Dec 16 14:40:32 2011
@@ -118,6 +118,11 @@
* Configuration key that stores the schema agreement timeout (in seconds).
*/
String SCHEMA_AGREEMENT_TIMEOUT = "schema_agreement_timeout";
+
+ /**
+ * Configuration key that stores the Thrift timeout (in seconds).
+ */
+ String THRIFT_TIMEOUT = "thrift_timeout";
/**
* Returns the default replication factor for new keyspaces. The
replication factor determines
@@ -200,4 +205,17 @@
* @return the timeout (in seconds) to wait for schema agreement
*/
int getSchemaAgreementTimeout();
+
+ /**
+ * The timeout (in seconds) for Thrift connections used by the Cassandra
daemon service. All schema changes
+ * are handled by this service, which opens a Thrift client to a central
'schema master' to send all changes to
+ * (schema modifications cannot be handled by different nodes
concurrently).
+ * This timeout determines how long the client will wait for a response.
When a timeout occurs, the schema update
+ * will fail.
+ * Note that if the central 'schema master' is down, this timeout
determines how long it will take for the daemon
+ * to detect that it is down and switch to the new schema master. It is
therefore recommended not to increase this
+ * timeout too much.
+ * @return the timeout (in seconds) to wait for a Thrift response
+ */
+ int getThriftTimeout();
}
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraDaemonService.java
Fri Dec 16 14:40:32 2011
@@ -17,7 +17,6 @@
import java.util.List;
-import org.apache.cassandra.thrift.CassandraServer;
import org.apache.cassandra.thrift.CfDef;
/**
@@ -46,14 +45,6 @@
String EVENT_ADMIN_KEYSPACE_DROPPED = "keyspace_dropped";
/**
- * Returns the Cassandra server which represents the Cassandra Thrift API.
- *
- * @see http://wiki.apache.org/cassandra/API
- * @return the Cassandra server.
- */
- CassandraServer getCassandraServer();
-
- /**
* Returns a list of all available keyspaces. Note that keyspace names are
case-sensitive
* in Cassandra.
*
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
Fri Dec 16 14:40:32 2011
@@ -41,7 +41,7 @@
private static final String STORAGE_CONF_SOURCE = "conf/cassandra.yaml";
private static final String LOG4J_CONF_SOURCE = "conf/log4j.properties";
private static final int DEFAULT_SCHEMA_AGREEMENT_TIMEOUT = 30;
-
+ private static final int DEFAULT_THRIFT_TIMEOUT = 10;
// Reference to the LogService
private volatile LogService m_logService;
@@ -59,6 +59,7 @@
private int m_storagePort;
private String m_clusterName;
private int m_schemaAgreementTimeout;
+ private int m_thriftTimeout;
/**
* The init() method is invoked by the Felix dependency manager. It allows
us to initialize our service. In this
@@ -139,6 +140,13 @@
// For backwards compatibility
m_schemaAgreementTimeout = DEFAULT_SCHEMA_AGREEMENT_TIMEOUT;
}
+
+ if (dictionary.get(THRIFT_TIMEOUT) != null) {
+ m_thriftTimeout = toInt(dictionary.get(THRIFT_TIMEOUT));
+ } else {
+ // For backwards compatibility
+ m_thriftTimeout = DEFAULT_THRIFT_TIMEOUT;
+ }
}
}
@@ -219,4 +227,8 @@
public int getSchemaAgreementTimeout() {
return m_schemaAgreementTimeout;
}
+
+ public int getThriftTimeout() {
+ return m_thriftTimeout;
+ }
}
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
Fri Dec 16 14:40:32 2011
@@ -15,7 +15,9 @@
*/
package org.amdatu.cassandra.application.service;
+import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -26,13 +28,19 @@
import org.apache.cassandra.db.Table;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.CassandraServer;
+import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SchemaDisagreementException;
import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.log.LogService;
@@ -50,7 +58,7 @@
// name for the replication_factor option in Cassandra (internally hard
coded "replication_factor" is used all over
// the place), so we define it here for internal use.
private static final String RF_STRATEGY_OPTION = "replication_factor";
-
+
// Service dependencies, injected by the framework
private volatile LogService m_logService;
private volatile EventAdmin m_eventAdmin;
@@ -59,7 +67,10 @@
// Local cache of ColumnTypes of all ColumnFamilies for each keyspace
private Map<String, Map<String, String>> m_keyspaceColumnFamilyTypes = new
HashMap<String, Map<String, String>>();
- private CassandraServer m_cassandraServer = null;
+ // Thrift connection to a Cassandra daemon (local or remote)
+ private String m_connectString = null;
+ private TTransport m_transport;
+ private Cassandra.Iface m_cassandraClient = null;
/**
* The init() method is invoked by the Felix dependency manager.
@@ -68,33 +79,80 @@
m_logService.log(LogService.LOG_INFO, getClass().getName() + " service
initialized");
}
- public void start() throws TException, InvalidRequestException {
- m_cassandraServer = new CassandraServer();
+ public void start() {
+ m_logService.log(LogService.LOG_INFO, getClass().getName() + "service
started");
}
public void stop() {
m_logService.log(LogService.LOG_INFO, getClass().getName() + "service
stopped");
}
- public void destroy() {
- m_cassandraServer = null;
+ public void destroy() throws Exception {
+ disconnect();
m_logService.log(LogService.LOG_INFO, getClass().getName() + " service
destroyed");
}
public void setConfiguration(CassandraConfigurationService config) {
m_configuration = config;
}
-
+
public void setLogService(LogService logService) {
m_logService = logService;
}
-
+
public void setEventAdmin(EventAdmin eventAdmin) {
m_eventAdmin = eventAdmin;
}
-
- public CassandraServer getCassandraServer() {
- return m_cassandraServer;
+
+ private synchronized Cassandra.Iface getCassandraClient() throws
TTransportException {
+ // This returns the IP addresses of all live nodes in the ring
+ List<String> liveNodes = StorageService.instance.getLiveNodes();
+
+ // Sort the list and pick the first record
+ Collections.sort(liveNodes);
+
+ // Verify if the connect string has changed (which may be the case if
nodes joined
+ // or left the cluster). If so, reconnect to the new Cassandra node.
+ String host = liveNodes.get(0);
+ int port = m_configuration.getRPCPort();
+ String connectString = host + ":" + port;
+ if (!connectString.equals(m_connectString)) {
+ connect(host, port);
+ }
+ return m_cassandraClient;
+ }
+
+ private void connect(String host, int port) throws TTransportException {
+ // First disconnect from the current daemon, if applicable
+ disconnect();
+
+ m_logService.log(LogService.LOG_INFO, "Establishing Thrift connection
to the Cassandra server on "
+ + host + ":" + port);
+ int timeout = m_configuration.getThriftTimeout()*1000;
+ TSocket socket = new TSocket(host, port, timeout);
+ m_transport = new TFramedTransport(socket);
+ try {
+ m_transport.open();
+ TProtocol proto = new TBinaryProtocol(m_transport);
+ m_cassandraClient = new Cassandra.Client(proto);
+ m_connectString = host + ":" + port;
+ }
+ catch (TTransportException e) {
+ m_logService.log(LogService.LOG_ERROR, "Could not establishing
Thrift connection to the " +
+ "Cassandra server on " + host + ":" + port, e);
+ }
+ }
+
+ private void disconnect() throws TTransportException {
+ if (m_transport != null && m_transport.isOpen()) {
+ m_logService.log(LogService.LOG_INFO, "Disconnecting Thrift
connection to the Cassandra server on "
+ + m_connectString);
+ try {
+ m_transport.flush();
+ } finally {
+ m_transport.close();
+ }
+ }
}
public synchronized boolean keyspaceExists(final String keyspaceName) {
@@ -110,7 +168,7 @@
public synchronized List<String> getKeyspaces() {
try {
List<String> keyspaceNames = new ArrayList<String>();
- List<KsDef> keyspaces = m_cassandraServer.describe_keyspaces();
+ List<KsDef> keyspaces = getCassandraClient().describe_keyspaces();
for (KsDef keyspace : keyspaces) {
keyspaceNames.add(keyspace.getName());
}
@@ -133,7 +191,7 @@
while (agreed == null && System.currentTimeMillis() < expires) {
m_logService.log(LogService.LOG_INFO,
"Schema definitions are not yet promulgated throughout the
cluster, waiting for schema agreement. Cluster schema's:"
- + getSchemaVersions());
+ + getSchemaVersions());
try {
// Wait for 2 seconds, then try again
Thread.sleep(2000);
@@ -144,12 +202,14 @@
if (System.currentTimeMillis() >= expires) {
m_logService.log(LogService.LOG_WARNING,
"Schema agreement timeout reached (which is " +
m_configuration.getSchemaAgreementTimeout()
- + " seconds. This may cause ScmeDisagreementExceptions to
be thrown.");
- } else if (pastAgreement) {
+ + " seconds. This may cause ScmeDisagreementExceptions to be
thrown.");
+ }
+ else if (pastAgreement) {
// Schema agreement was not an issue, so log on debug level
m_logService.log(LogService.LOG_DEBUG, "Schema agreed between all
nodes in the cluster, agreed version='"
- + agreed + "'");
- } else {
+ + agreed + "'");
+ }
+ else {
// Schema agreement was an issue, so log on info level
m_logService.log(LogService.LOG_INFO, "Schema agreed between all
nodes in the cluster, agreed version='"
+ agreed + "'");
@@ -158,12 +218,12 @@
private String checkSchemaAgreement() {
try {
- Map<String, List<String>> schemas =
m_cassandraServer.describe_schema_versions();
+ Map<String, List<String>> schemas =
getCassandraClient().describe_schema_versions();
String version = null;
for (String key : schemas.keySet()) {
if (StorageProxy.UNREACHABLE.equals(key)) {
m_logService.log(LogService.LOG_DEBUG, "Ignoring version
of unreacheable node(s): " + key + "="
- + schemas.get(key));
+ + schemas.get(key));
}
else {
if (version == null) {
@@ -189,7 +249,7 @@
private String getSchemaVersions() {
try {
String result = "";
- Map<String, List<String>> schemas =
m_cassandraServer.describe_schema_versions();
+ Map<String, List<String>> schemas =
getCassandraClient().describe_schema_versions();
for (String key : schemas.keySet()) {
result += "version '" + key + "' : ";
for (String entry : schemas.get(key)) {
@@ -221,7 +281,7 @@
KsDef ksDef = new KsDef(name, DEFAULT_PLACEMENT_STRATEGY,
empty);
try {
ksDef.putToStrategy_options(RF_STRATEGY_OPTION, new
Integer(replicationFactor).toString());
- m_cassandraServer.system_add_keyspace(ksDef);
+ getCassandraClient().system_add_keyspace(ksDef);
}
catch (InvalidRequestException e) {
// Now this error may appear if some other node in the
cluster added this keyspace
@@ -229,7 +289,7 @@
// keyspace exists after all, and if so, return without
throwing an exception
if (keyspaceExists(name)) {
m_logService.log(LogService.LOG_DEBUG, "Keyspace '" +
name + "' was not added since it "
- + "already existed");
+ + "already existed");
return false;
}
else {
@@ -246,11 +306,13 @@
}
else {
m_logService
- .log(LogService.LOG_DEBUG, "Keyspace '" + name + "' was
not added since it already existed");
+ .log(LogService.LOG_DEBUG, "Keyspace '" + name + "' was not
added since it already existed");
return false;
}
}
catch (SchemaDisagreementException e) {
+ m_logService.log(LogService.LOG_ERROR, "A
SchemaDisagreementException occurred while adding keyspace '" +
+ name + "'!", e);
throw new ThriftException(e);
}
catch (TException e) {
@@ -260,7 +322,7 @@
private boolean compareReplicationFactor(KsDef keyspace, int rf) {
if (keyspace.getStrategy_optionsSize() > 0
- && keyspace.getStrategy_options().containsKey(RF_STRATEGY_OPTION))
{
+ &&
keyspace.getStrategy_options().containsKey(RF_STRATEGY_OPTION)) {
int currentRF =
Integer.parseInt(keyspace.getStrategy_options().get(RF_STRATEGY_OPTION));
return currentRF == rf;
}
@@ -277,12 +339,13 @@
+ " may never exceed the amount of nodes "
+ "in the cluster, which currently is " + clusterSize);
}
- List<KsDef> keyspaces = m_cassandraServer.describe_keyspaces();
+ Cassandra.Iface client = getCassandraClient();
+ List<KsDef> keyspaces = client.describe_keyspaces();
for (KsDef keyspace : keyspaces) {
if (!compareReplicationFactor(keyspace, replicationFactor)) {
keyspace.putToStrategy_options(RF_STRATEGY_OPTION, new
Integer(replicationFactor).toString());
keyspace.setCf_defs(null);
- m_cassandraServer.system_update_keyspace(keyspace);
+ client.system_update_keyspace(keyspace);
}
}
}
@@ -293,16 +356,19 @@
throw new ThriftException(e);
}
catch (SchemaDisagreementException e) {
+ m_logService.log(LogService.LOG_ERROR, "A
SchemaDisagreementException occurred while setting RF to " +
+ replicationFactor + "!", e);
throw new ThriftException(e);
}
}
public synchronized void setReplicationFactor(final String keyspace, final
int replicationFactor) {
try {
- KsDef ksDef = m_cassandraServer.describe_keyspace(keyspace);
+ Cassandra.Iface client = getCassandraClient();
+ KsDef ksDef = client.describe_keyspace(keyspace);
if (!compareReplicationFactor(ksDef, replicationFactor)) {
ksDef.setCf_defs(null);
- m_cassandraServer.system_update_keyspace(ksDef);
+ client.system_update_keyspace(ksDef);
}
}
catch (TException e) {
@@ -315,6 +381,8 @@
throw new ThriftException(e).setKeyspace(keyspace);
}
catch (SchemaDisagreementException e) {
+ m_logService.log(LogService.LOG_ERROR, "A
SchemaDisagreementException occurred while setting RF to " +
+ replicationFactor + " for keyspace '" + keyspace +
"'!", e);
throw new ThriftException(e).setKeyspace(keyspace);
}
}
@@ -325,7 +393,7 @@
waitForSchemaAgreement();
if (keyspaceExists(keyspace)) {
- m_cassandraServer.system_drop_keyspace(keyspace);
+ getCassandraClient().system_drop_keyspace(keyspace);
// Publish an event that a keyspace has been dropped
Map<String, String> properties = new HashMap<String, String>();
@@ -344,13 +412,15 @@
throw new ThriftException(e).setKeyspace(keyspace);
}
catch (SchemaDisagreementException e) {
+ m_logService.log(LogService.LOG_ERROR, "A
SchemaDisagreementException occurred while dropping keyspace '" +
+ keyspace + "'!", e);
throw new ThriftException(e).setKeyspace(keyspace);
}
}
-
- public CfDef getColumnFamily(final String keyspaceName, final String
columnFamilyName) {
+
+ public synchronized CfDef getColumnFamily(final String keyspaceName, final
String columnFamilyName) {
try {
- KsDef ksDef = m_cassandraServer.describe_keyspace(keyspaceName);
+ KsDef ksDef = getCassandraClient().describe_keyspace(keyspaceName);
List<CfDef> cfDefs = ksDef.getCf_defs();
for (CfDef cfDef : cfDefs) {
if (cfDef.getName().equals(columnFamilyName)) {
@@ -367,14 +437,18 @@
// Rethrow the checked exception as a new unchecked Thrift
exception
throw new
ThriftException(e).setKeyspace(keyspaceName).setColumnFamily(columnFamilyName);
}
+ catch (TException e) {
+ // Rethrow the checked exception as a new unchecked Thrift
exception
+ throw new
ThriftException(e).setKeyspace(keyspaceName).setColumnFamily(columnFamilyName);
+ }
}
- public boolean columnFamilyExists(final String keyspaceName, final String
columnFamilyName) {
+ public synchronized boolean columnFamilyExists(final String keyspaceName,
final String columnFamilyName) {
CfDef cfDef = getColumnFamily(keyspaceName, columnFamilyName);
return cfDef != null;
}
- public String getColumnType(final String keyspaceName, final String
columnFamilyName) {
+ public synchronized String getColumnType(final String keyspaceName, final
String columnFamilyName) {
Map<String, String> columnFamilyTypeMap;
if (m_keyspaceColumnFamilyTypes.containsKey(keyspaceName)) {
columnFamilyTypeMap =
m_keyspaceColumnFamilyTypes.get(keyspaceName);
@@ -387,7 +461,7 @@
}
else {
try {
- KsDef ksDef =
m_cassandraServer.describe_keyspace(keyspaceName);
+ KsDef ksDef =
getCassandraClient().describe_keyspace(keyspaceName);
List<CfDef> cfDefs = ksDef.getCf_defs();
for (CfDef cfDef : cfDefs) {
if (cfDef.getName().equals(columnFamilyName)) {
@@ -406,13 +480,17 @@
// Rethrow the checked exception as a new unchecked Thrift
exception
throw new
ThriftException(e).setKeyspace(keyspaceName).setColumnFamily(columnFamilyName);
}
+ catch (TException e) {
+ // Rethrow the checked exception as a new unchecked Thrift
exception
+ throw new
ThriftException(e).setKeyspace(keyspaceName).setColumnFamily(columnFamilyName);
+ }
}
}
- public List<String> getColumnFamilies(final String keyspaceName) {
+ public synchronized List<String> getColumnFamilies(final String
keyspaceName) {
try {
List<String> cfNames = new ArrayList<String>();
- KsDef ksDef = m_cassandraServer.describe_keyspace(keyspaceName);
+ KsDef ksDef = getCassandraClient().describe_keyspace(keyspaceName);
List<CfDef> cfDefs = ksDef.getCf_defs();
for (CfDef cfDef : cfDefs) {
cfNames.add(cfDef.getName());
@@ -427,6 +505,10 @@
// Rethrow the checked exception as a new unchecked Thrift
exception
throw new ThriftException(e).setKeyspace(keyspaceName);
}
+ catch (TException e) {
+ // Rethrow the checked exception as a new unchecked Thrift
exception
+ throw new ThriftException(e).setKeyspace(keyspaceName);
+ }
}
public synchronized boolean addColumnFamily(final String keyspace, final
String cfName, final String columnType,
@@ -439,9 +521,9 @@
return addColumnFamily(keyspace, cfDef);
}
-
+
@Override
- public boolean addColumnFamily(String keyspace, CfDef cfDef) {
+ public synchronized boolean addColumnFamily(String keyspace, CfDef cfDef) {
// Before we create the columnFamily, the cluster must agree upon the
schema
waitForSchemaAgreement();
@@ -452,15 +534,16 @@
}
try {
if (!columnFamilyExists(keyspace, cfName)) {
- m_cassandraServer.set_keyspace(keyspace);
- m_cassandraServer.system_add_column_family(cfDef);
+ Cassandra.Iface client = getCassandraClient();
+ client.set_keyspace(keyspace);
+ client.system_add_column_family(cfDef);
m_logService.log(LogService.LOG_INFO, "ColumnFamily '" +
cfName + "' has been added to keyspace '"
- + keyspace + "'");
+ + keyspace + "'");
return true;
}
else {
m_logService.log(LogService.LOG_DEBUG, "ColumnFamily '" +
cfName + "' was not added to keyspace '"
- + keyspace + "' since it already existed");
+ + keyspace + "' since it already existed");
return false;
}
}
@@ -471,12 +554,14 @@
throw new
ThriftException(e).setKeyspace(keyspace).setColumnFamily(cfName);
}
catch (SchemaDisagreementException e) {
+ m_logService.log(LogService.LOG_ERROR, "A
SchemaDisagreementException occurred while adding '" +
+ cfName + "' to the keyspace '" + keyspace + "'!",
e);
throw new
ThriftException(e).setKeyspace(keyspace).setColumnFamily(cfName);
}
}
-
+
@Override
- public boolean updateColumnFamily(String keyspace, CfDef cfDef) {
+ public synchronized boolean updateColumnFamily(String keyspace, CfDef
cfDef) {
// Before we create the columnFamily, the cluster must agree upon the
schema
waitForSchemaAgreement();
@@ -488,15 +573,27 @@
try {
CfDef oldCfDef = getColumnFamily(keyspace, cfName);
if (oldCfDef != null) {
- m_cassandraServer.set_keyspace(keyspace);
- cfDef.setId(oldCfDef.getId());
- m_cassandraServer.system_update_column_family(cfDef);
- m_logService.log(LogService.LOG_INFO, "ColumnFamily '" +
cfName + "' has been updated in the keyspace '"
- + keyspace + "'");
- return true;
+ // First compare if there is really a difference.
+ if (!CfDefComparator.equal(oldCfDef, cfDef)) {
+ Cassandra.Iface client = getCassandraClient();
+ client.set_keyspace(keyspace);
+ cfDef.setId(oldCfDef.getId());
+ client.system_update_column_family(cfDef);
+ m_logService.log(LogService.LOG_INFO, "ColumnFamily '" +
cfName
+ + "' has been updated in the keyspace '"
+ + keyspace + "'");
+ return true;
+ }
+ else {
+ m_logService.log(LogService.LOG_DEBUG, "ColumnFamily '" +
cfName
+ + "' was not updated in the keyspace '"
+ + keyspace + "', no changes detected");
+ return false;
+ }
}
else {
- m_logService.log(LogService.LOG_DEBUG, "ColumnFamily '" +
cfName + "' was not updated in the keyspace '"
+ m_logService.log(LogService.LOG_DEBUG, "ColumnFamily '" +
cfName
+ + "' was not updated in the keyspace '"
+ keyspace + "' since it does not exist");
return false;
}
@@ -508,6 +605,11 @@
throw new
ThriftException(e).setKeyspace(keyspace).setColumnFamily(cfName);
}
catch (SchemaDisagreementException e) {
+ m_logService.log(LogService.LOG_ERROR, "A
SchemaDisagreementException occurred while updating CF '" +
+ cfName + "'!", e);
+ throw new
ThriftException(e).setKeyspace(keyspace).setColumnFamily(cfName);
+ }
+ catch (UnsupportedEncodingException e) {
throw new
ThriftException(e).setKeyspace(keyspace).setColumnFamily(cfName);
}
}
@@ -529,17 +631,19 @@
return true;
}
- public boolean isColumnFamilyChanged(final String keyspace, final String
cfName, final String columnType,
+ public synchronized boolean isColumnFamilyChanged(final String keyspace,
final String cfName,
+ final String columnType,
final String comparatorType, final String subComparatorType) {
try {
- m_cassandraServer.set_keyspace(keyspace);
- KsDef ksDef = m_cassandraServer.describe_keyspace(keyspace);
+ Cassandra.Iface client = getCassandraClient();
+ client.set_keyspace(keyspace);
+ KsDef ksDef = client.describe_keyspace(keyspace);
List<CfDef> cfDefs = ksDef.getCf_defs();
for (CfDef cfDef : cfDefs) {
if (cfDef.getName().equals(cfName)) {
if (!cfDef.column_type.equals(columnType)
- || !equalComparator(cfDef.comparator_type,
comparatorType)
- || !equalComparator(cfDef.subcomparator_type,
subComparatorType)) {
+ || !equalComparator(cfDef.comparator_type,
comparatorType)
+ ||
!equalComparator(cfDef.subcomparator_type, subComparatorType)) {
return true;
}
}
Added:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CfDefComparator.java
==============================================================================
--- (empty file)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CfDefComparator.java
Fri Dec 16 14:40:32 2011
@@ -0,0 +1,313 @@
+/*
+ * Copyright (c) 2010, 2011 The Amdatu Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.amdatu.cassandra.application.service;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnDef;
+
+/**
+ * This is a helper class to detect if there are any changes in a given
ColumnFamily definition
+ * with respect to an existing ColumnFamily definition. Note that the normal
compareTo method in
+ * CfDef also considers two CfDef's to be different in case the first def has
a columntype and the
+ * second one did not define it. If a column type is left undefined the
special flag 'isSetColumn_type'
+ * is set to false and when updating the CF, this property is ignored. As this
will not result in an
+ * update of the ColumnFamily, we do not consider this as a change.
+ * For that reason, this class implements an equal method that returns
false only if invoking
+ * updateColumnFamily would effectively change anything in that CF.
+ *
+ * @author <a href=mailto:[email protected]>Amdatu Project Team</a>
+ */
+public class CfDefComparator {
+ /**
+ * Returns whether none of the properties set in the new definition would
effectively change
+ * anything in the ColumnFamily (represented by the old ColumnFamily
definition) when passed
+ * to updateColumnFamily.
+ * @param oldDef The current ColumnFamily definition
+ * @param newDef The new ColumnFamily definition
+ * @return true if passing the newDef to the updateColumnFamily method
would not effectively change
+ * anything in the CF; false if it would
+ * @throws UnsupportedEncodingException
+ */
+ public static boolean equal(CfDef oldDef, CfDef newDef) throws
UnsupportedEncodingException {
+ int lastComparison = 0;
+
+ lastComparison =
Boolean.valueOf(oldDef.isSetKeyspace()).compareTo(newDef.isSetKeyspace());
+ if (lastComparison != 0) {
+ return false;
+ }
+ if (oldDef.isSetKeyspace()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(oldDef.keyspace, newDef.keyspace);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ lastComparison =
Boolean.valueOf(oldDef.isSetName()).compareTo(newDef.isSetName());
+ if (lastComparison != 0) {
+ return false;
+ }
+ if (oldDef.isSetName()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(oldDef.name, newDef.name);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetColumn_type()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(oldDef.column_type, newDef.column_type);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetComparator_type()) {
+ lastComparison =
+
org.apache.thrift.TBaseHelper.compareTo(oldDef.comparator_type,
newDef.comparator_type);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetSubcomparator_type()) {
+ lastComparison =
+
org.apache.thrift.TBaseHelper.compareTo(oldDef.subcomparator_type,
newDef.subcomparator_type);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetComment()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(oldDef.comment, newDef.comment);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetRow_cache_size()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(oldDef.row_cache_size,
newDef.row_cache_size);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetKey_cache_size()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(oldDef.key_cache_size,
newDef.key_cache_size);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetRead_repair_chance()) {
+ lastComparison =
+
org.apache.thrift.TBaseHelper.compareTo(oldDef.read_repair_chance,
newDef.read_repair_chance);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetGc_grace_seconds()) {
+ lastComparison =
+
org.apache.thrift.TBaseHelper.compareTo(oldDef.gc_grace_seconds,
newDef.gc_grace_seconds);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetDefault_validation_class()) {
+ lastComparison =
+
org.apache.thrift.TBaseHelper.compareTo(oldDef.default_validation_class,
+ newDef.default_validation_class);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetId()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(oldDef.id, newDef.id);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetMin_compaction_threshold()) {
+ lastComparison =
+
org.apache.thrift.TBaseHelper.compareTo(oldDef.min_compaction_threshold,
+ newDef.min_compaction_threshold);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetMax_compaction_threshold()) {
+ lastComparison =
+
org.apache.thrift.TBaseHelper.compareTo(oldDef.max_compaction_threshold,
+ newDef.max_compaction_threshold);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetRow_cache_save_period_in_seconds()) {
+ lastComparison =
+
org.apache.thrift.TBaseHelper.compareTo(oldDef.row_cache_save_period_in_seconds,
+ newDef.row_cache_save_period_in_seconds);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetKey_cache_save_period_in_seconds()) {
+ lastComparison =
+
org.apache.thrift.TBaseHelper.compareTo(oldDef.key_cache_save_period_in_seconds,
+ newDef.key_cache_save_period_in_seconds);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetMemtable_flush_after_mins()) {
+ lastComparison =
+
org.apache.thrift.TBaseHelper.compareTo(oldDef.memtable_flush_after_mins,
+ newDef.memtable_flush_after_mins);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetMemtable_throughput_in_mb()) {
+ lastComparison =
+
org.apache.thrift.TBaseHelper.compareTo(oldDef.memtable_throughput_in_mb,
+ newDef.memtable_throughput_in_mb);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetMemtable_operations_in_millions()) {
+ lastComparison =
+
org.apache.thrift.TBaseHelper.compareTo(oldDef.memtable_operations_in_millions,
+ newDef.memtable_operations_in_millions);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetReplicate_on_write()) {
+ lastComparison =
+
org.apache.thrift.TBaseHelper.compareTo(oldDef.replicate_on_write,
newDef.replicate_on_write);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetMerge_shards_chance()) {
+ lastComparison =
+
org.apache.thrift.TBaseHelper.compareTo(oldDef.merge_shards_chance,
newDef.merge_shards_chance);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetKey_validation_class()) {
+ lastComparison =
+
org.apache.thrift.TBaseHelper.compareTo(oldDef.key_validation_class,
newDef.key_validation_class);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetRow_cache_provider()) {
+ lastComparison =
+
org.apache.thrift.TBaseHelper.compareTo(oldDef.row_cache_provider,
newDef.row_cache_provider);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetKey_alias()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(oldDef.key_alias, newDef.key_alias);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ List<ColumnDef> oldColDefs = oldDef.getColumn_metadata();
+ List<ColumnDef> newColDefs = newDef.getColumn_metadata();
+ int newCols = newColDefs == null ? 0 : newColDefs.size();
+ int oldCols = oldColDefs == null ? 0 : oldColDefs.size();
+ if (newCols != oldCols) {
+ return false;
+ }
+ else if (newCols > 0) {
+ // Compare by name
+ int compared = 0;
+ for (ColumnDef oldColDef : oldColDefs) {
+ String oldName = new String(oldColDef.getName(), "UTF-8");
+ for (ColumnDef newColDef : newColDefs) {
+ String newName = new String(newColDef.getName(), "UTF-8");
+ if (oldName.equals(newName)) {
+ compared++;
+ if (!equal(oldColDef, newColDef)) {
+ return false;
+ }
+ }
+ }
+ }
+ return compared == newCols;
+ }
+ return true;
+ }
+
+ private static boolean equal(ColumnDef oldDef, ColumnDef newDef) {
+ int lastComparison = 0;
+
+ lastComparison =
Boolean.valueOf(oldDef.isSetName()).compareTo(newDef.isSetName());
+ if (lastComparison != 0) {
+ return false;
+ }
+ if (newDef.isSetName()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(oldDef.name, newDef.name);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetValidation_class()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(oldDef.validation_class,
newDef.validation_class);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetIndex_type()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(oldDef.index_type, newDef.index_type);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+
+ if (newDef.isSetIndex_name()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(oldDef.index_name, newDef.index_name);
+ if (lastComparison != 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
Modified:
trunk/amdatu-cassandra/cassandra-application/src/test/java/org/amdatu/cassandra/test/unit/application/CassandraUnitTestBase.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/test/java/org/amdatu/cassandra/test/unit/application/CassandraUnitTestBase.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/test/java/org/amdatu/cassandra/test/unit/application/CassandraUnitTestBase.java
Fri Dec 16 14:40:32 2011
@@ -79,10 +79,10 @@
startThread.join();
m_daemonService = new CassandraDaemonServiceImpl();
- m_daemonService.start();
m_daemonService.setConfiguration(new
CassandraConfigurationServiceMock());
m_daemonService.setLogService(new LogServiceMock());
m_daemonService.setEventAdmin(new EventAdminMock());
+ m_daemonService.start();
}
public class CassandraDaemonStartThread extends Thread {
Modified:
trunk/amdatu-cassandra/cassandra-application/src/test/java/org/amdatu/cassandra/test/unit/application/mock/CassandraConfigurationServiceMock.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/test/java/org/amdatu/cassandra/test/unit/application/mock/CassandraConfigurationServiceMock.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/test/java/org/amdatu/cassandra/test/unit/application/mock/CassandraConfigurationServiceMock.java
Fri Dec 16 14:40:32 2011
@@ -60,4 +60,8 @@
public int getSchemaAgreementTimeout() {
return 30;
}
+
+ public int getThriftTimeout() {
+ return 10;
+ }
}
Modified:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/ColumnFamilyDefinition.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/ColumnFamilyDefinition.java
(original)
+++
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/ColumnFamilyDefinition.java
Fri Dec 16 14:40:32 2011
@@ -15,6 +15,12 @@
*/
package org.amdatu.cassandra.listener;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.LexicalUUIDType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.thrift.CfDef;
/**
@@ -67,30 +73,32 @@
/**
* A column or super column name comparator of type bytes.
*/
- BYTESTYPE("BytesType"),
+ BYTESTYPE(BytesType.class.getName()),
/**
* A column or super column name comparator of type ASCII.
*/
- ASCIITYPE("AsciiType"),
+ ASCIITYPE(AsciiType.class.getName()),
/**
* A column or super column name comparator of type UTF-8 string.
*/
- UTF8TYPE("UTF8Type"),
+ UTF8TYPE(UTF8Type.class.getName()),
/**
* A column or super column name non-time based comparator. It is
compared lexically, by byte value.
*/
- LEXICALUUIDTYPE("LexicalUUIDType"),
+ LEXICALUUIDTYPE(LexicalUUIDType.class.getName()),
+
/**
* A column or super column name time-based comparator. It
uses a version 1 UUID.
*/
- TIMEUUIDTYPE("TimeUUIDType"),
+ TIMEUUIDTYPE(TimeUUIDType.class.getName()),
+
/**
* A column or super column name comparator of type Long.
*/
- LONGTYPE("LongType");
+ LONGTYPE(LongType.class.getName());
private String m_value;
@@ -103,22 +111,22 @@
}
public static CompareType get(String value) {
- if (value.equalsIgnoreCase(BYTESTYPE.getValue())) {
+ if (value.equalsIgnoreCase(BYTESTYPE.getValue()) ||
BYTESTYPE.getValue().endsWith("." + value)) {
return BYTESTYPE;
}
- else if (value.equalsIgnoreCase(ASCIITYPE.getValue())) {
+ else if (value.equalsIgnoreCase(ASCIITYPE.getValue()) ||
ASCIITYPE.getValue().endsWith("." + value)) {
return ASCIITYPE;
}
- else if (value.equalsIgnoreCase(UTF8TYPE.getValue())) {
+ else if (value.equalsIgnoreCase(UTF8TYPE.getValue()) ||
UTF8TYPE.getValue().endsWith("." + value)) {
return UTF8TYPE;
}
- else if (value.equalsIgnoreCase(LEXICALUUIDTYPE.getValue())) {
+ else if (value.equalsIgnoreCase(LEXICALUUIDTYPE.getValue()) ||
LEXICALUUIDTYPE.getValue().endsWith("." + value)) {
return LEXICALUUIDTYPE;
}
- else if (value.equalsIgnoreCase(TIMEUUIDTYPE.getValue())) {
+ else if (value.equalsIgnoreCase(TIMEUUIDTYPE.getValue()) ||
TIMEUUIDTYPE.getValue().endsWith("." + value)) {
return TIMEUUIDTYPE;
}
- else if (value.equalsIgnoreCase(LONGTYPE.getValue())) {
+ else if (value.equalsIgnoreCase(LONGTYPE.getValue()) ||
LONGTYPE.getValue().endsWith("." + value)) {
return LONGTYPE;
}
return null;
Modified:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
(original)
+++
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
Fri Dec 16 14:40:32 2011
@@ -23,7 +23,6 @@
import org.apache.cassandra.db.Table;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.thrift.TException;
import org.osgi.service.log.LogService;
@@ -40,9 +39,9 @@
private volatile LogService m_logService;
private volatile CassandraDaemonService m_daemonService;
- public void columnFamilyProviderAdded(final ColumnFamilyProvider provider)
{
+ public synchronized void columnFamilyProviderAdded(final
ColumnFamilyProvider provider) {
try {
- List<KsDef> keyspaceDefinitions =
m_daemonService.getCassandraServer().describe_keyspaces();
+ List<String> keyspaceDefinitions = m_daemonService.getKeyspaces();
ColumnFamilyDefinition[] columnFamilies =
provider.getColumnFamilies();
for (ColumnFamilyDefinition columnFamily : columnFamilies) {
@@ -66,9 +65,9 @@
"it is recommended not to use it anymore. The
feature may be removed in future versions. " +
"For more information, see
http://jira.amdatu.org/jira/browse/AMDATUCASSANDRA-118";
m_logService.log(LogService.LOG_WARNING, msg);
- for (KsDef keyspaceDef : keyspaceDefinitions) {
- if (!Table.SYSTEM_TABLE.equals(keyspaceDef.getName()))
{
- addOrUpdateColumnFamily(m_daemonService,
keyspaceDef.getName(), columnFamily);
+ for (String keyspaceDef : keyspaceDefinitions) {
+ if (!Table.SYSTEM_TABLE.equals(keyspaceDef)) {
+ addOrUpdateColumnFamily(m_daemonService,
keyspaceDef, columnFamily);
}
}
}
Modified:
trunk/amdatu-cassandra/cassandra-listener/src/test/java/org/amdatu/cassandra/test/unit/listener/ListenerTest.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-listener/src/test/java/org/amdatu/cassandra/test/unit/listener/ListenerTest.java
(original)
+++
trunk/amdatu-cassandra/cassandra-listener/src/test/java/org/amdatu/cassandra/test/unit/listener/ListenerTest.java
Fri Dec 16 14:40:32 2011
@@ -174,11 +174,11 @@
}
private void testUpdateColumnFamily() throws Exception {
- // Update with the same properties, should succeed
+ // Update with the same properties, should return false (no changes)
CfDef cfDef = new CfDef(TEST_KS, TEST_CF);
cfDef.setColumn_type(ColumnType.STANDARD.getValue());
cfDef.setComparator_type(CompareType.BYTESTYPE.getValue());
- Assert.assertTrue(m_daemon.updateColumnFamily(TEST_KS, cfDef));
+ Assert.assertFalse(m_daemon.updateColumnFamily(TEST_KS, cfDef));
// Update with change standard -> super, should throw an exception
cfDef.setColumn_type(ColumnType.SUPER.getValue());
@@ -196,6 +196,9 @@
cfDef.addToColumn_metadata(cDef);
Assert.assertTrue(m_daemon.updateColumnFamily(TEST_KS, cfDef));
+ // Invoke update without an effective change, should return false
+ Assert.assertFalse(m_daemon.updateColumnFamily(TEST_KS, cfDef));
+
// Update the ColumnDefinition without the ColDef, the ColDef should
be removed
cfDef = new CfDef(TEST_KS, TEST_CF);
cfDef.setColumn_type(ColumnType.STANDARD.getValue());
Modified:
trunk/amdatu-cassandra/config/src/main/resources/org.amdatu.core.cassandra.application.cfg
==============================================================================
---
trunk/amdatu-cassandra/config/src/main/resources/org.amdatu.core.cassandra.application.cfg
(original)
+++
trunk/amdatu-cassandra/config/src/main/resources/org.amdatu.core.cassandra.application.cfg
Fri Dec 16 14:40:32 2011
@@ -103,4 +103,14 @@
# is however no longer blocked than the schema agreement timeout. If the
timeout kicks in, the Column Family or Keyspace
# is created or dropped, resulting in a possible SchemaDisagreementException
internally thrown by Cassandra.
# The default is 30 seconds, schema agreement usually takes place within 5-10
seconds.
-schema_agreement_timeout=30
\ No newline at end of file
+schema_agreement_timeout=30
+
+# The timeout (in seconds) for Thrift connections used by the Cassandra daemon
service. All schema changes
+# are handled by this service, which opens a Thrift client to a central
'schema master' to send all changes to
+# (schema modifications cannot be handled by different nodes concurrently).
+# This timeout determines how long the client will wait for a response. When a
timeout occurs, the schema update
+# will fail.
+# Note that if the central 'schema master' is down, this timeout determines
how long it will take for the daemon
+# to detect that it is down and switch to the new schema master. It is
therefore recommended not to increase this
+# timeout too much.
+thrift_timeout=10
\ No newline at end of file
Modified:
trunk/amdatu-cassandra/test-unit/framework/src/main/java/org/amdatu/cassandra/test/unit/framework/mock/CassandraConfigurationServiceMock.java
==============================================================================
---
trunk/amdatu-cassandra/test-unit/framework/src/main/java/org/amdatu/cassandra/test/unit/framework/mock/CassandraConfigurationServiceMock.java
(original)
+++
trunk/amdatu-cassandra/test-unit/framework/src/main/java/org/amdatu/cassandra/test/unit/framework/mock/CassandraConfigurationServiceMock.java
Fri Dec 16 14:40:32 2011
@@ -61,4 +61,8 @@
public int getSchemaAgreementTimeout() {
return 30;
}
+
+ public int getThriftTimeout() {
+ return 10;
+ }
}
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits