Author: gdusbabek
Date: Thu May 20 17:13:17 2010
New Revision: 946717
URL: http://svn.apache.org/viewvc?rev=946717&view=rev
Log:
service call to check for schema agreement. patch by gdusbabek, reviewed by
jbellis. CASSANDRA-1075
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
Modified:
cassandra/trunk/interface/cassandra.thrift
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Modified: cassandra/trunk/interface/cassandra.thrift
URL:
http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=946717&r1=946716&r2=946717&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.thrift (original)
+++ cassandra/trunk/interface/cassandra.thrift Thu May 20 17:13:17 2010
@@ -435,6 +435,14 @@ service Cassandra {
// Meta-APIs -- APIs to get information about the node or cluster,
// rather than user data. The nodeprobe program provides usage examples.
+
+ /**
+ * ask the cluster if they all are using the same migration id. returns a
map of version->hosts-on-that-version.
+ * hosts that did not respond will be under the key
DatabaseDescriptor.INITIAL_VERSION. agreement can be determined
+ * by checking if the size of the map is 1.
+ */
+ map<string, list<string>> check_schema_agreement()
+ throws (1: InvalidRequestException ire),
/** list the defined keyspaces in this cluster */
set<string> describe_keyspaces(),
Modified:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java?rev=946717&r1=946716&r2=946717&view=diff
==============================================================================
---
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(original)
+++
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
Thu May 20 17:13:17 2010
@@ -162,6 +162,13 @@ public class Cassandra {
public void truncate(String keyspace, String cfname) throws
InvalidRequestException, UnavailableException, TException;
/**
+ * ask the cluster if they all are using the same migration id. returns a
map of version->hosts-on-that-version.
+ * hosts that did not respond will be under the key
DatabaseDescriptor.INITIAL_VERSION. agreement can be determined
+ * by checking if the size of the map is 1.
+ */
+ public Map<String,List<String>> check_schema_agreement() throws
InvalidRequestException, TException;
+
+ /**
* list the defined keyspaces in this cluster
*/
public Set<String> describe_keyspaces() throws TException;
@@ -757,6 +764,41 @@ public class Cassandra {
return;
}
+ public Map<String,List<String>> check_schema_agreement() throws
InvalidRequestException, TException
+ {
+ send_check_schema_agreement();
+ return recv_check_schema_agreement();
+ }
+
+ public void send_check_schema_agreement() throws TException
+ {
+ oprot_.writeMessageBegin(new TMessage("check_schema_agreement",
TMessageType.CALL, seqid_));
+ check_schema_agreement_args args = new check_schema_agreement_args();
+ args.write(oprot_);
+ oprot_.writeMessageEnd();
+ oprot_.getTransport().flush();
+ }
+
+ public Map<String,List<String>> recv_check_schema_agreement() throws
InvalidRequestException, TException
+ {
+ TMessage msg = iprot_.readMessageBegin();
+ if (msg.type == TMessageType.EXCEPTION) {
+ TApplicationException x = TApplicationException.read(iprot_);
+ iprot_.readMessageEnd();
+ throw x;
+ }
+ check_schema_agreement_result result = new
check_schema_agreement_result();
+ result.read(iprot_);
+ iprot_.readMessageEnd();
+ if (result.isSetSuccess()) {
+ return result.success;
+ }
+ if (result.ire != null) {
+ throw result.ire;
+ }
+ throw new TApplicationException(TApplicationException.MISSING_RESULT,
"check_schema_agreement failed: unknown result");
+ }
+
public Set<String> describe_keyspaces() throws TException
{
send_describe_keyspaces();
@@ -1177,6 +1219,7 @@ public class Cassandra {
processMap_.put("remove", new remove());
processMap_.put("batch_mutate", new batch_mutate());
processMap_.put("truncate", new truncate());
+ processMap_.put("check_schema_agreement", new check_schema_agreement());
processMap_.put("describe_keyspaces", new describe_keyspaces());
processMap_.put("describe_cluster_name", new describe_cluster_name());
processMap_.put("describe_version", new describe_version());
@@ -1715,6 +1758,44 @@ public class Cassandra {
}
+ private class check_schema_agreement implements ProcessFunction {
+ public void process(int seqid, TProtocol iprot, TProtocol oprot) throws
TException
+ {
+ check_schema_agreement_args args = new check_schema_agreement_args();
+ try {
+ args.read(iprot);
+ } catch (TProtocolException e) {
+ iprot.readMessageEnd();
+ TApplicationException x = new
TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
+ oprot.writeMessageBegin(new TMessage("check_schema_agreement",
TMessageType.EXCEPTION, seqid));
+ x.write(oprot);
+ oprot.writeMessageEnd();
+ oprot.getTransport().flush();
+ return;
+ }
+ iprot.readMessageEnd();
+ check_schema_agreement_result result = new
check_schema_agreement_result();
+ try {
+ result.success = iface_.check_schema_agreement();
+ } catch (InvalidRequestException ire) {
+ result.ire = ire;
+ } catch (Throwable th) {
+ LOGGER.error("Internal error processing check_schema_agreement", th);
+ TApplicationException x = new
TApplicationException(TApplicationException.INTERNAL_ERROR, "Internal error
processing check_schema_agreement");
+ oprot.writeMessageBegin(new TMessage("check_schema_agreement",
TMessageType.EXCEPTION, seqid));
+ x.write(oprot);
+ oprot.writeMessageEnd();
+ oprot.getTransport().flush();
+ return;
+ }
+ oprot.writeMessageBegin(new TMessage("check_schema_agreement",
TMessageType.REPLY, seqid));
+ result.write(oprot);
+ oprot.writeMessageEnd();
+ oprot.getTransport().flush();
+ }
+
+ }
+
private class describe_keyspaces implements ProcessFunction {
public void process(int seqid, TProtocol iprot, TProtocol oprot) throws
TException
{
@@ -13999,6 +14080,598 @@ public class Cassandra {
}
+ public static class check_schema_agreement_args implements
TBase<check_schema_agreement_args._Fields>, java.io.Serializable, Cloneable,
Comparable<check_schema_agreement_args> {
+ private static final TStruct STRUCT_DESC = new
TStruct("check_schema_agreement_args");
+
+
+
+ /** The set of fields this struct contains, along with convenience methods
for finding and manipulating them. */
+ public enum _Fields implements TFieldIdEnum {
+;
+
+ private static final Map<Integer, _Fields> byId = new HashMap<Integer,
_Fields>();
+ private static final Map<String, _Fields> byName = new HashMap<String,
_Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byId.put((int)field._thriftId, field);
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not
found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ return byId.get(fieldId);
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " +
fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+ public static final Map<_Fields, FieldMetaData> metaDataMap =
Collections.unmodifiableMap(new EnumMap<_Fields, FieldMetaData>(_Fields.class)
{{
+ }});
+
+ static {
+ FieldMetaData.addStructMetaDataMap(check_schema_agreement_args.class,
metaDataMap);
+ }
+
+ public check_schema_agreement_args() {
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public check_schema_agreement_args(check_schema_agreement_args other) {
+ }
+
+ public check_schema_agreement_args deepCopy() {
+ return new check_schema_agreement_args(this);
+ }
+
+ @Deprecated
+ public check_schema_agreement_args clone() {
+ return new check_schema_agreement_args(this);
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ }
+ }
+
+ public void setFieldValue(int fieldID, Object value) {
+ setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ public Object getFieldValue(int fieldId) {
+ return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been
asigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ public boolean isSet(int fieldID) {
+ return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof check_schema_agreement_args)
+ return this.equals((check_schema_agreement_args)that);
+ return false;
+ }
+
+ public boolean equals(check_schema_agreement_args that) {
+ if (that == null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(check_schema_agreement_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ check_schema_agreement_args typedOther =
(check_schema_agreement_args)other;
+
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked
in the validate method
+ validate();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("check_schema_agreement_args(");
+ boolean first = true;
+
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws TException {
+ // check for required fields
+ }
+
+ }
+
+ public static class check_schema_agreement_result implements
TBase<check_schema_agreement_result._Fields>, java.io.Serializable, Cloneable
{
+ private static final TStruct STRUCT_DESC = new
TStruct("check_schema_agreement_result");
+
+ private static final TField SUCCESS_FIELD_DESC = new TField("success",
TType.MAP, (short)0);
+ private static final TField IRE_FIELD_DESC = new TField("ire",
TType.STRUCT, (short)1);
+
+ public Map<String,List<String>> success;
+ public InvalidRequestException ire;
+
+ /** The set of fields this struct contains, along with convenience methods
for finding and manipulating them. */
+ public enum _Fields implements TFieldIdEnum {
+ SUCCESS((short)0, "success"),
+ IRE((short)1, "ire");
+
+ private static final Map<Integer, _Fields> byId = new HashMap<Integer,
_Fields>();
+ private static final Map<String, _Fields> byName = new HashMap<String,
_Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byId.put((int)field._thriftId, field);
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not
found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ return byId.get(fieldId);
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " +
fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+
+ public static final Map<_Fields, FieldMetaData> metaDataMap =
Collections.unmodifiableMap(new EnumMap<_Fields, FieldMetaData>(_Fields.class)
{{
+ put(_Fields.SUCCESS, new FieldMetaData("success",
TFieldRequirementType.DEFAULT,
+ new MapMetaData(TType.MAP,
+ new FieldValueMetaData(TType.STRING),
+ new ListMetaData(TType.LIST,
+ new FieldValueMetaData(TType.STRING)))));
+ put(_Fields.IRE, new FieldMetaData("ire", TFieldRequirementType.DEFAULT,
+ new FieldValueMetaData(TType.STRUCT)));
+ }});
+
+ static {
+ FieldMetaData.addStructMetaDataMap(check_schema_agreement_result.class,
metaDataMap);
+ }
+
+ public check_schema_agreement_result() {
+ }
+
+ public check_schema_agreement_result(
+ Map<String,List<String>> success,
+ InvalidRequestException ire)
+ {
+ this();
+ this.success = success;
+ this.ire = ire;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public check_schema_agreement_result(check_schema_agreement_result other) {
+ if (other.isSetSuccess()) {
+ Map<String,List<String>> __this__success = new
HashMap<String,List<String>>();
+ for (Map.Entry<String, List<String>> other_element :
other.success.entrySet()) {
+
+ String other_element_key = other_element.getKey();
+ List<String> other_element_value = other_element.getValue();
+
+ String __this__success_copy_key = other_element_key;
+
+ List<String> __this__success_copy_value = new ArrayList<String>();
+ for (String other_element_value_element : other_element_value) {
+ __this__success_copy_value.add(other_element_value_element);
+ }
+
+ __this__success.put(__this__success_copy_key,
__this__success_copy_value);
+ }
+ this.success = __this__success;
+ }
+ if (other.isSetIre()) {
+ this.ire = new InvalidRequestException(other.ire);
+ }
+ }
+
+ public check_schema_agreement_result deepCopy() {
+ return new check_schema_agreement_result(this);
+ }
+
+ @Deprecated
+ public check_schema_agreement_result clone() {
+ return new check_schema_agreement_result(this);
+ }
+
+ public int getSuccessSize() {
+ return (this.success == null) ? 0 : this.success.size();
+ }
+
+ public void putToSuccess(String key, List<String> val) {
+ if (this.success == null) {
+ this.success = new HashMap<String,List<String>>();
+ }
+ this.success.put(key, val);
+ }
+
+ public Map<String,List<String>> getSuccess() {
+ return this.success;
+ }
+
+ public check_schema_agreement_result setSuccess(Map<String,List<String>>
success) {
+ this.success = success;
+ return this;
+ }
+
+ public void unsetSuccess() {
+ this.success = null;
+ }
+
+ /** Returns true if field success is set (has been asigned a value) and
false otherwise */
+ public boolean isSetSuccess() {
+ return this.success != null;
+ }
+
+ public void setSuccessIsSet(boolean value) {
+ if (!value) {
+ this.success = null;
+ }
+ }
+
+ public InvalidRequestException getIre() {
+ return this.ire;
+ }
+
+ public check_schema_agreement_result setIre(InvalidRequestException ire) {
+ this.ire = ire;
+ return this;
+ }
+
+ public void unsetIre() {
+ this.ire = null;
+ }
+
+ /** Returns true if field ire is set (has been asigned a value) and false
otherwise */
+ public boolean isSetIre() {
+ return this.ire != null;
+ }
+
+ public void setIreIsSet(boolean value) {
+ if (!value) {
+ this.ire = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case SUCCESS:
+ if (value == null) {
+ unsetSuccess();
+ } else {
+ setSuccess((Map<String,List<String>>)value);
+ }
+ break;
+
+ case IRE:
+ if (value == null) {
+ unsetIre();
+ } else {
+ setIre((InvalidRequestException)value);
+ }
+ break;
+
+ }
+ }
+
+ public void setFieldValue(int fieldID, Object value) {
+ setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case SUCCESS:
+ return getSuccess();
+
+ case IRE:
+ return getIre();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ public Object getFieldValue(int fieldId) {
+ return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been
asigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ switch (field) {
+ case SUCCESS:
+ return isSetSuccess();
+ case IRE:
+ return isSetIre();
+ }
+ throw new IllegalStateException();
+ }
+
+ public boolean isSet(int fieldID) {
+ return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof check_schema_agreement_result)
+ return this.equals((check_schema_agreement_result)that);
+ return false;
+ }
+
+ public boolean equals(check_schema_agreement_result that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_success = true && this.isSetSuccess();
+ boolean that_present_success = true && that.isSetSuccess();
+ if (this_present_success || that_present_success) {
+ if (!(this_present_success && that_present_success))
+ return false;
+ if (!this.success.equals(that.success))
+ return false;
+ }
+
+ boolean this_present_ire = true && this.isSetIre();
+ boolean that_present_ire = true && that.isSetIre();
+ if (this_present_ire || that_present_ire) {
+ if (!(this_present_ire && that_present_ire))
+ return false;
+ if (!this.ire.equals(that.ire))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ case 0: // SUCCESS
+ if (field.type == TType.MAP) {
+ {
+ TMap _map73 = iprot.readMapBegin();
+ this.success = new HashMap<String,List<String>>(2*_map73.size);
+ for (int _i74 = 0; _i74 < _map73.size; ++_i74)
+ {
+ String _key75;
+ List<String> _val76;
+ _key75 = iprot.readString();
+ {
+ TList _list77 = iprot.readListBegin();
+ _val76 = new ArrayList<String>(_list77.size);
+ for (int _i78 = 0; _i78 < _list77.size; ++_i78)
+ {
+ String _elem79;
+ _elem79 = iprot.readString();
+ _val76.add(_elem79);
+ }
+ iprot.readListEnd();
+ }
+ this.success.put(_key75, _val76);
+ }
+ iprot.readMapEnd();
+ }
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 1: // IRE
+ if (field.type == TType.STRUCT) {
+ this.ire = new InvalidRequestException();
+ this.ire.read(iprot);
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked
in the validate method
+ validate();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ oprot.writeStructBegin(STRUCT_DESC);
+
+ if (this.isSetSuccess()) {
+ oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+ {
+ oprot.writeMapBegin(new TMap(TType.STRING, TType.LIST,
this.success.size()));
+ for (Map.Entry<String, List<String>> _iter80 :
this.success.entrySet())
+ {
+ oprot.writeString(_iter80.getKey());
+ {
+ oprot.writeListBegin(new TList(TType.STRING,
_iter80.getValue().size()));
+ for (String _iter81 : _iter80.getValue())
+ {
+ oprot.writeString(_iter81);
+ }
+ oprot.writeListEnd();
+ }
+ }
+ oprot.writeMapEnd();
+ }
+ oprot.writeFieldEnd();
+ } else if (this.isSetIre()) {
+ oprot.writeFieldBegin(IRE_FIELD_DESC);
+ this.ire.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("check_schema_agreement_result(");
+ boolean first = true;
+
+ sb.append("success:");
+ if (this.success == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.success);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("ire:");
+ if (this.ire == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.ire);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws TException {
+ // check for required fields
+ }
+
+ }
+
public static class describe_keyspaces_args implements
TBase<describe_keyspaces_args._Fields>, java.io.Serializable, Cloneable,
Comparable<describe_keyspaces_args> {
private static final TStruct STRUCT_DESC = new
TStruct("describe_keyspaces_args");
@@ -14420,13 +15093,13 @@ public class Cassandra {
case 0: // SUCCESS
if (field.type == TType.SET) {
{
- TSet _set73 = iprot.readSetBegin();
- this.success = new HashSet<String>(2*_set73.size);
- for (int _i74 = 0; _i74 < _set73.size; ++_i74)
+ TSet _set82 = iprot.readSetBegin();
+ this.success = new HashSet<String>(2*_set82.size);
+ for (int _i83 = 0; _i83 < _set82.size; ++_i83)
{
- String _elem75;
- _elem75 = iprot.readString();
- this.success.add(_elem75);
+ String _elem84;
+ _elem84 = iprot.readString();
+ this.success.add(_elem84);
}
iprot.readSetEnd();
}
@@ -14452,9 +15125,9 @@ public class Cassandra {
oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
{
oprot.writeSetBegin(new TSet(TType.STRING, this.success.size()));
- for (String _iter76 : this.success)
+ for (String _iter85 : this.success)
{
- oprot.writeString(_iter76);
+ oprot.writeString(_iter85);
}
oprot.writeSetEnd();
}
@@ -15961,14 +16634,14 @@ public class Cassandra {
case 0: // SUCCESS
if (field.type == TType.LIST) {
{
- TList _list77 = iprot.readListBegin();
- this.success = new ArrayList<TokenRange>(_list77.size);
- for (int _i78 = 0; _i78 < _list77.size; ++_i78)
+ TList _list86 = iprot.readListBegin();
+ this.success = new ArrayList<TokenRange>(_list86.size);
+ for (int _i87 = 0; _i87 < _list86.size; ++_i87)
{
- TokenRange _elem79;
- _elem79 = new TokenRange();
- _elem79.read(iprot);
- this.success.add(_elem79);
+ TokenRange _elem88;
+ _elem88 = new TokenRange();
+ _elem88.read(iprot);
+ this.success.add(_elem88);
}
iprot.readListEnd();
}
@@ -15994,9 +16667,9 @@ public class Cassandra {
oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
{
oprot.writeListBegin(new TList(TType.STRUCT, this.success.size()));
- for (TokenRange _iter80 : this.success)
+ for (TokenRange _iter89 : this.success)
{
- _iter80.write(oprot);
+ _iter89.write(oprot);
}
oprot.writeListEnd();
}
@@ -16617,27 +17290,27 @@ public class Cassandra {
case 0: // SUCCESS
if (field.type == TType.MAP) {
{
- TMap _map81 = iprot.readMapBegin();
- this.success = new
HashMap<String,Map<String,String>>(2*_map81.size);
- for (int _i82 = 0; _i82 < _map81.size; ++_i82)
+ TMap _map90 = iprot.readMapBegin();
+ this.success = new
HashMap<String,Map<String,String>>(2*_map90.size);
+ for (int _i91 = 0; _i91 < _map90.size; ++_i91)
{
- String _key83;
- Map<String,String> _val84;
- _key83 = iprot.readString();
+ String _key92;
+ Map<String,String> _val93;
+ _key92 = iprot.readString();
{
- TMap _map85 = iprot.readMapBegin();
- _val84 = new HashMap<String,String>(2*_map85.size);
- for (int _i86 = 0; _i86 < _map85.size; ++_i86)
+ TMap _map94 = iprot.readMapBegin();
+ _val93 = new HashMap<String,String>(2*_map94.size);
+ for (int _i95 = 0; _i95 < _map94.size; ++_i95)
{
- String _key87;
- String _val88;
- _key87 = iprot.readString();
- _val88 = iprot.readString();
- _val84.put(_key87, _val88);
+ String _key96;
+ String _val97;
+ _key96 = iprot.readString();
+ _val97 = iprot.readString();
+ _val93.put(_key96, _val97);
}
iprot.readMapEnd();
}
- this.success.put(_key83, _val84);
+ this.success.put(_key92, _val93);
}
iprot.readMapEnd();
}
@@ -16671,15 +17344,15 @@ public class Cassandra {
oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
{
oprot.writeMapBegin(new TMap(TType.STRING, TType.MAP,
this.success.size()));
- for (Map.Entry<String, Map<String,String>> _iter89 :
this.success.entrySet())
+ for (Map.Entry<String, Map<String,String>> _iter98 :
this.success.entrySet())
{
- oprot.writeString(_iter89.getKey());
+ oprot.writeString(_iter98.getKey());
{
- oprot.writeMapBegin(new TMap(TType.STRING, TType.STRING,
_iter89.getValue().size()));
- for (Map.Entry<String, String> _iter90 :
_iter89.getValue().entrySet())
+ oprot.writeMapBegin(new TMap(TType.STRING, TType.STRING,
_iter98.getValue().size()));
+ for (Map.Entry<String, String> _iter99 :
_iter98.getValue().entrySet())
{
- oprot.writeString(_iter90.getKey());
- oprot.writeString(_iter90.getValue());
+ oprot.writeString(_iter99.getKey());
+ oprot.writeString(_iter99.getValue());
}
oprot.writeMapEnd();
}
@@ -17435,13 +18108,13 @@ public class Cassandra {
case 0: // SUCCESS
if (field.type == TType.LIST) {
{
- TList _list91 = iprot.readListBegin();
- this.success = new ArrayList<String>(_list91.size);
- for (int _i92 = 0; _i92 < _list91.size; ++_i92)
+ TList _list100 = iprot.readListBegin();
+ this.success = new ArrayList<String>(_list100.size);
+ for (int _i101 = 0; _i101 < _list100.size; ++_i101)
{
- String _elem93;
- _elem93 = iprot.readString();
- this.success.add(_elem93);
+ String _elem102;
+ _elem102 = iprot.readString();
+ this.success.add(_elem102);
}
iprot.readListEnd();
}
@@ -17467,9 +18140,9 @@ public class Cassandra {
oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
{
oprot.writeListBegin(new TList(TType.STRING, this.success.size()));
- for (String _iter94 : this.success)
+ for (String _iter103 : this.success)
{
- oprot.writeString(_iter94);
+ oprot.writeString(_iter103);
}
oprot.writeListEnd();
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=946717&r1=946716&r2=946717&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Thu
May 20 17:13:17 2010
@@ -51,6 +51,8 @@ import org.apache.cassandra.service.Stor
import static org.apache.cassandra.utils.FBUtilities.UTF8;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.*;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.cassandra.avro.AvroRecordFactory.*;
@@ -573,4 +575,10 @@ public class CassandraServer implements
{
return API_VERSION;
}
+
+ public Map<String, List<String>> check_schema_agreement()
+ {
+ logger.debug("checking schema agreement");
+ return StorageProxy.checkSchemaAgreement();
+ }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=946717&r1=946716&r2=946717&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Thu May 20 17:13:17 2010
@@ -84,7 +84,7 @@ public class DatabaseDescriptor
private final static String STORAGE_CONF_FILE = "cassandra.yaml";
- private static final UUID INITIAL_VERSION = new UUID(4096, 0); // has type
nibble set to 1, everything else to zero.
+ public static final UUID INITIAL_VERSION = new UUID(4096, 0); // has type
nibble set to 1, everything else to zero.
private static UUID defsVersion = INITIAL_VERSION;
/**
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java?rev=946717&view=auto
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
(added)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
Thu May 20 17:13:17 2010
@@ -0,0 +1,41 @@
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+public class SchemaCheckVerbHandler implements IVerbHandler
+{
+ private final Logger logger =
LoggerFactory.getLogger(SchemaCheckVerbHandler.class);
+
+ @Override
+ public void doVerb(Message message)
+ {
+ logger.debug("Received schema check request.");
+ Message response = message.getReply(FBUtilities.getLocalAddress(),
DatabaseDescriptor.getDefsVersion().toString().getBytes());
+ MessagingService.instance.sendOneWay(response, message.getFrom());
+ }
+}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=946717&r1=946716&r2=946717&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu
May 20 17:13:17 2010
@@ -26,12 +26,17 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -55,6 +60,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -565,6 +571,76 @@ public class StorageProxy implements Sto
}
/**
+ * initiate a request/response session with each live node to check
whether or not everybody is using the same
+ * migration id. This is useful for determining if a schema change has
propagated through the cluster. Disagreement
+ * is assumed if any node fails to respond.
+ */
+ public static Map<String, List<String>> checkSchemaAgreement()
+ {
+ final Map<String, List<String>> results = new HashMap<String,
List<String>>();
+
+ final String myVersion =
DatabaseDescriptor.getDefsVersion().toString();
+ final Map<InetAddress, UUID> versions = new
ConcurrentHashMap<InetAddress, UUID>();
+ final Set<InetAddress> liveHosts = Gossiper.instance.getLiveMembers();
+ final Message msg = new Message(FBUtilities.getLocalAddress(),
StageManager.MIGRATION_STAGE, StorageService.Verb.SCHEMA_CHECK,
ArrayUtils.EMPTY_BYTE_ARRAY);
+ final CountDownLatch latch = new CountDownLatch(liveHosts.size());
+ // an empty message acts as a request to the SchemaCheckVerbHandler.
+ MessagingService.instance.sendRR(msg, liveHosts.toArray(new
InetAddress[]{}), new IAsyncCallback()
+ {
+ @Override
+ public void response(Message msg)
+ {
+ // record the response from the remote node.
+ logger.debug("Received schema check response from " +
msg.getFrom().getHostAddress());
+ UUID theirVersion = UUID.fromString(new
String(msg.getMessageBody()));
+ versions.put(msg.getFrom(), theirVersion);
+ latch.countDown();
+ }
+ });
+
+ try
+ {
+ // wait for as long as possible. timeout-1s if possible.
+ latch.await(DatabaseDescriptor.getRpcTimeout(),
TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException ex)
+ {
+ throw new AssertionError("This latch shouldn't have been
interrupted.");
+ }
+
+ logger.debug("My version is " + myVersion);
+
+ // first, indicate any hosts that did not respond.
+ final Set<InetAddress> ackedHosts = versions.keySet();
+ if (ackedHosts.size() < liveHosts.size())
+ {
+ Set<InetAddress> missingHosts = new
HashSet<InetAddress>(liveHosts);
+ missingHosts.removeAll(ackedHosts);
+ assert missingHosts.size() > 0;
+ List<String> missingHostNames = new
ArrayList<String>(missingHosts.size());
+ for (InetAddress host : missingHosts)
+ missingHostNames.add(host.getHostAddress());
+ results.put(DatabaseDescriptor.INITIAL_VERSION.toString(),
missingHostNames);
+ logger.debug("Hosts not in agreement. Didn't get a response from
everybody: " + StringUtils.join(missingHostNames, ","));
+ }
+
+ // check for version disagreement. log the hosts that don't agree.
+ for (InetAddress host : ackedHosts)
+ {
+ String uuid = versions.get(host).toString();
+ if (!results.containsKey(uuid))
+ results.put(uuid, new ArrayList<String>());
+ results.get(uuid).add(host.getHostAddress());
+ if (!uuid.equals(myVersion))
+ logger.debug("%s disagrees (%s)", host.getHostAddress(), uuid);
+ }
+ if (results.size() == 1)
+ logger.debug("Schemas are in agreement.");
+
+ return results;
+ }
+
+ /**
* returns an iterator that will return ranges in ring order, starting
with the one that contains the start token
*/
private static Iterable<Pair<AbstractBounds, List<InetAddress>>>
getRangeIterator(final List<Pair<AbstractBounds, List<InetAddress>>> ranges,
Token start)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=946717&r1=946716&r2=946717&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Thu May 20 17:13:17 2010
@@ -111,6 +111,7 @@ public class StorageService implements I
DEFINITIONS_ANNOUNCE,
DEFINITIONS_UPDATE_RESPONSE,
TRUNCATE,
+ SCHEMA_CHECK,
;
// remember to add new verbs at the end, since we serialize by ordinal
}
@@ -236,6 +237,7 @@ public class StorageService implements I
MessagingService.instance.registerVerbHandlers(Verb.DEFINITIONS_ANNOUNCE, new
DefinitionsAnnounceVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.DEFINITIONS_UPDATE_RESPONSE,
new DefinitionsUpdateResponseVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.TRUNCATE, new
TruncateVerbHandler());
+ MessagingService.instance.registerVerbHandlers(Verb.SCHEMA_CHECK, new
SchemaCheckVerbHandler());
replicationStrategies = new HashMap<String,
AbstractReplicationStrategy>();
for (String table : DatabaseDescriptor.getNonSystemTables())
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=946717&r1=946716&r2=946717&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Thu May 20 17:13:17 2010
@@ -856,5 +856,12 @@ public class CassandraServer implements
keySpace.set(keyspace);
}
+ @Override
+ public Map<String, List<String>> check_schema_agreement() throws
TException, InvalidRequestException
+ {
+ logger.debug("checking schema agreement");
+ return StorageProxy.checkSchemaAgreement();
+ }
+
// main method moved to CassandraDaemon
}