Author: [email protected]
Date: Tue Nov 1 11:59:42 2011
New Revision: 1698
Log:
[AMDATUCASSANDRA-127] Merged fix from 0.2.1 branch
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
trunk/amdatu-cassandra/cassandra-persistencemanager-hector/src/test/java/org/amdatu/cassandra/persistencemanager/hector/mock/CassandraConfigurationServiceMock.java
trunk/amdatu-cassandra/config/src/main/resources/org.amdatu.core.cassandra.application.cfg
trunk/amdatu-cassandra/test-unit/framework/src/main/java/org/amdatu/cassandra/test/unit/framework/mock/CassandraConfigurationServiceMock.java
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
Tue Nov 1 11:59:42 2011
@@ -113,6 +113,11 @@
* Configuration key that stores the name of the cluster that this node is
part of.
*/
String CLUSTER_NAME = "clustername";
+
+ /**
+ * Configuration key that stores the schema agreement timeout (in seconds).
+ */
+ String SCHEMA_AGREEMENT_TIMEOUT = "schema_agreement_timeout";
/**
* Returns the default replication factor for new keyspaces. The
replication factor determines
@@ -186,4 +191,13 @@
* @return the name of the cluster that this node is part of.
*/
String getClustername();
+
+ /**
+ * Returns the timeout (in seconds) to wait for schema agreement. When a
Keyspace or ColumnFamily is added or dropped,
+ * the thread is blocked until the while cluster agrees upon the schema
version (neglecting unreachable nodes). The thread
+ * is however no longer blocked then the schema agreement timeout. If the
timeout kicks in, the Column Family or Keyspace
+ * is created or dropped, resulting in a possible
SchemaDisagreementException internally thrown by Cassandra.
+ * @return the timeout (in seconds) to wait for schema agreement
+ */
+ int getSchemaAgreementTimeout();
}
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
Tue Nov 1 11:59:42 2011
@@ -40,6 +40,8 @@
// Statics
private static final String STORAGE_CONF_SOURCE = "conf/cassandra.yaml";
private static final String LOG4J_CONF_SOURCE = "conf/log4j.properties";
+ private static final int DEFAULT_SCHEMA_AGREEMENT_TIMEOUT = 30;
+
// Reference to the LogService
private volatile LogService m_logService;
@@ -56,6 +58,7 @@
private int m_rpcPort;
private int m_storagePort;
private String m_clusterName;
+ private int m_schemaAgreementTimeout;
/**
* The init() method is invoked by the Felix dependency manager. It allows
us to initialize our service. In this
@@ -130,6 +133,12 @@
m_rpcPort = toInt(dictionary.get(RPC_PORT));
m_storagePort = toInt(dictionary.get(STORAGE_PORT));
m_clusterName = dictionary.get(CLUSTER_NAME).toString();
+ if (dictionary.get(SCHEMA_AGREEMENT_TIMEOUT) != null) {
+ m_schemaAgreementTimeout =
toInt(dictionary.get(SCHEMA_AGREEMENT_TIMEOUT));
+ } else {
+ // For backwards compatibility
+ m_schemaAgreementTimeout = DEFAULT_SCHEMA_AGREEMENT_TIMEOUT;
+ }
}
}
@@ -206,4 +215,8 @@
public String getClustername() {
return m_clusterName;
}
+
+ public int getSchemaAgreementTimeout() {
+ return m_schemaAgreementTimeout;
+ }
}
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
Tue Nov 1 11:59:42 2011
@@ -24,6 +24,7 @@
import org.amdatu.cassandra.application.CassandraDaemonService;
import org.amdatu.cassandra.application.ThriftException;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.CassandraServer;
import org.apache.cassandra.thrift.CfDef;
@@ -67,7 +68,7 @@
m_logService.log(LogService.LOG_INFO, getClass().getName() + " service
initialized");
}
- public void start() {
+ public void start() throws TException, InvalidRequestException {
m_cassandraServer = new CassandraServer();
}
@@ -113,12 +114,96 @@
}
}
+ private void waitForSchemaAgreement() {
+ String agreed = checkSchemaAgreement();
+ long expires = System.currentTimeMillis() + 1000 *
m_configuration.getSchemaAgreementTimeout();
+ boolean pastAgreement = (agreed != null);
+ while (agreed == null && System.currentTimeMillis() < expires) {
+ m_logService.log(LogService.LOG_INFO,
+ "Schema definitions are not yet promulgated throughout the
cluster, waiting for schema agreement. Cluster schema's:"
+ + getSchemaVersions());
+ try {
+ // Wait for 2 seconds, then try again
+ Thread.sleep(2000);
+ }
+ catch (InterruptedException e) {}
+ agreed = checkSchemaAgreement();
+ }
+ if (System.currentTimeMillis() >= expires) {
+ m_logService.log(LogService.LOG_WARNING,
+ "Schema agreement timeout reached (which is " +
m_configuration.getSchemaAgreementTimeout()
+ + " seconds. This may cause ScmeDisagreementExceptions to
be thrown.");
+ } else if (pastAgreement) {
+ // Schema agreement was not an issue, so log on debug level
+ m_logService.log(LogService.LOG_DEBUG, "Schema agreed between all
nodes in the cluster, agreed version='"
+ + agreed + "'");
+ } else {
+ // Schema agreement was an issue, so log on info level
+ m_logService.log(LogService.LOG_INFO, "Schema agreed between all
nodes in the cluster, agreed version='"
+ + agreed + "'");
+ }
+ }
+
+ private String checkSchemaAgreement() {
+ try {
+ Map<String, List<String>> schemas =
m_cassandraServer.describe_schema_versions();
+ String version = null;
+ for (String key : schemas.keySet()) {
+ if (StorageProxy.UNREACHABLE.equals(key)) {
+ m_logService.log(LogService.LOG_DEBUG, "Ignoring version
of unreacheable node(s): " + key + "="
+ + schemas.get(key));
+ }
+ else {
+ if (version == null) {
+ version = key;
+ }
+ else {
+ if (!version.equals(key)) {
+ return null;
+ }
+ }
+ }
+ }
+ return version;
+ }
+ catch (TException e) {
+ return null;
+ }
+ catch (InvalidRequestException e) {
+ return null;
+ }
+ }
+
+ private String getSchemaVersions() {
+ try {
+ String result = "";
+ Map<String, List<String>> schemas =
m_cassandraServer.describe_schema_versions();
+ for (String key : schemas.keySet()) {
+ result += "version '" + key + "' : ";
+ for (String entry : schemas.get(key)) {
+ result += entry + " ";
+ }
+ result += System.getProperty("line.separator");
+ }
+ return result;
+ }
+ catch (TException e) {
+ return null;
+ }
+ catch (InvalidRequestException e) {
+ return null;
+ }
+ }
+
public synchronized boolean addKeyspace(final String name) {
return addKeyspace(name,
m_configuration.getDefaultReplicationFactor());
}
public synchronized boolean addKeyspace(final String name, final int
replicationFactor) {
try {
+ // Before we create the keyspace, the cluster must agree upon the
schema
+ waitForSchemaAgreement();
+
if (!keyspaceExists(name)) {
List<CfDef> empty = new ArrayList<CfDef>();
KsDef ksDef = new KsDef(name, DEFAULT_PLACEMENT_STRATEGY,
empty);
@@ -163,7 +248,7 @@
private boolean compareReplicationFactor(KsDef keyspace, int rf) {
if (keyspace.getStrategy_optionsSize() > 0
- &&
keyspace.getStrategy_options().containsKey(RF_STRATEGY_OPTION)) {
+ && keyspace.getStrategy_options().containsKey(RF_STRATEGY_OPTION))
{
int currentRF =
Integer.parseInt(keyspace.getStrategy_options().get(RF_STRATEGY_OPTION));
return currentRF == rf;
}
@@ -224,6 +309,9 @@
public synchronized boolean dropKeyspace(final String keyspace) {
try {
+ // Before we drop the keyspace, the cluster must agree upon the
schema
+ waitForSchemaAgreement();
+
if (keyspaceExists(keyspace)) {
m_cassandraServer.system_drop_keyspace(keyspace);
@@ -326,6 +414,9 @@
public synchronized boolean addColumnFamily(final String keyspace, final
String cfName, final String columnType,
final String comparatorType, final String subComparatorType) {
+ // Before we create the columnFamily, the cluster must agree upon the
schema
+ waitForSchemaAgreement();
+
if (keyspace.equals(Table.SYSTEM_TABLE)) {
throw new ThriftException("ColumnFamily's cannot be added to
Cassandra's system keyspace");
}
Modified:
trunk/amdatu-cassandra/cassandra-persistencemanager-hector/src/test/java/org/amdatu/cassandra/persistencemanager/hector/mock/CassandraConfigurationServiceMock.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-persistencemanager-hector/src/test/java/org/amdatu/cassandra/persistencemanager/hector/mock/CassandraConfigurationServiceMock.java
(original)
+++
trunk/amdatu-cassandra/cassandra-persistencemanager-hector/src/test/java/org/amdatu/cassandra/persistencemanager/hector/mock/CassandraConfigurationServiceMock.java
Tue Nov 1 11:59:42 2011
@@ -55,5 +55,9 @@
public String getClustername() {
return "Amdatu Unit Test Cluster";
+ }
+
+ public int getSchemaAgreementTimeout() {
+ return 30;
}
}
Modified:
trunk/amdatu-cassandra/config/src/main/resources/org.amdatu.core.cassandra.application.cfg
==============================================================================
---
trunk/amdatu-cassandra/config/src/main/resources/org.amdatu.core.cassandra.application.cfg
(original)
+++
trunk/amdatu-cassandra/config/src/main/resources/org.amdatu.core.cassandra.application.cfg
Tue Nov 1 11:59:42 2011
@@ -96,4 +96,11 @@
rpc_port=${cassandra.rpc_port}
# storage port
-storage_port=${cassandra.storage_port}
\ No newline at end of file
+storage_port=${cassandra.storage_port}
+
+# The timeout (in seconds) to wait for schema agreement. When a Keyspace or
ColumnFamily is added or dropped,
+# the thread is blocked until the while cluster agrees upon the schema version
(neglecting unreachable nodes). The thread
+# is however no longer blocked then the schema agreement timeout. If the
timeout kicks in, the Column Family or Keyspace
+# is created or dropped, resulting in a possible SchemaDisagreementException
internally thrown by Cassandra.
+# The default is 30 seconds, schema agreement usually takes place within 5-10
seconds.
+schema_agreement_timeout=30
\ No newline at end of file
Modified:
trunk/amdatu-cassandra/test-unit/framework/src/main/java/org/amdatu/cassandra/test/unit/framework/mock/CassandraConfigurationServiceMock.java
==============================================================================
---
trunk/amdatu-cassandra/test-unit/framework/src/main/java/org/amdatu/cassandra/test/unit/framework/mock/CassandraConfigurationServiceMock.java
(original)
+++
trunk/amdatu-cassandra/test-unit/framework/src/main/java/org/amdatu/cassandra/test/unit/framework/mock/CassandraConfigurationServiceMock.java
Tue Nov 1 11:59:42 2011
@@ -57,4 +57,8 @@
public String getClustername() {
return "Amdatu Unit Test Cluster";
}
+
+ public int getSchemaAgreementTimeout() {
+ return 30;
+ }
}
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits