Author: jbellis
Date: Fri Feb 12 22:02:44 2010
New Revision: 909631
URL: http://svn.apache.org/viewvc?rev=909631&view=rev
Log:
sub splits
patch by jbellis; reviewed by Stu Hood for CASSANDRA-342
Modified:
incubator/cassandra/trunk/interface/cassandra.thrift
incubator/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
Modified: incubator/cassandra/trunk/interface/cassandra.thrift
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/cassandra.thrift?rev=909631&r1=909630&r2=909631&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/cassandra.thrift (original)
+++ incubator/cassandra/trunk/interface/cassandra.thrift Fri Feb 12 22:02:44
2010
@@ -434,6 +434,7 @@
to list of endpoints, because you can't use Thrift structs as
map keys:
https://issues.apache.org/jira/browse/THRIFT-162
+
for the same reason, we can't return a set here, even though
order is neither important nor predictable. */
list<TokenRange> describe_ring(1:required string keyspace),
@@ -441,4 +442,13 @@
/** describe specified keyspace */
map<string, map<string, string>> describe_keyspace(1:required string
keyspace)
throws (1:NotFoundException nfe),
+
+ /** experimental API for hadoop/parallel query support.
+ may change violently and without warning.
+
+ returns list of token strings such that first subrange is (list[0],
list[1]],
+ next is (list[1], list[2]], etc. */
+ list<string> describe_splits(1:required string start_token,
+ 2:required string end_token,
+ 3:required i32 keys_per_split),
}
Modified:
incubator/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java?rev=909631&r1=909630&r2=909631&view=diff
==============================================================================
---
incubator/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(original)
+++
incubator/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
Fri Feb 12 22:02:44 2010
@@ -187,6 +187,7 @@
* to list of endpoints, because you can't use Thrift structs as
* map keys:
* https://issues.apache.org/jira/browse/THRIFT-162
+ *
* for the same reason, we can't return a set here, even though
* order is neither important nor predictable.
*
@@ -201,6 +202,19 @@
*/
public Map<String,Map<String,String>> describe_keyspace(String keyspace)
throws NotFoundException, TException;
+ /**
+ * experimental API for hadoop/parallel query support.
+ * may change violently and without warning.
+ *
+ * returns list of token strings such that first subrange is (list[0],
list[1]],
+ * next is (list[1], list[2]], etc.
+ *
+ * @param start_token
+ * @param end_token
+ * @param keys_per_split
+ */
+ public List<String> describe_splits(String start_token, String end_token,
int keys_per_split) throws TException;
+
}
public static class Client implements Iface {
@@ -992,6 +1006,41 @@
throw new TApplicationException(TApplicationException.MISSING_RESULT,
"describe_keyspace failed: unknown result");
}
+ public List<String> describe_splits(String start_token, String end_token,
int keys_per_split) throws TException
+ {
+ send_describe_splits(start_token, end_token, keys_per_split);
+ return recv_describe_splits();
+ }
+
+ public void send_describe_splits(String start_token, String end_token, int
keys_per_split) throws TException
+ {
+ oprot_.writeMessageBegin(new TMessage("describe_splits",
TMessageType.CALL, seqid_));
+ describe_splits_args args = new describe_splits_args();
+ args.start_token = start_token;
+ args.end_token = end_token;
+ args.keys_per_split = keys_per_split;
+ args.write(oprot_);
+ oprot_.writeMessageEnd();
+ oprot_.getTransport().flush();
+ }
+
+ public List<String> recv_describe_splits() throws TException
+ {
+ TMessage msg = iprot_.readMessageBegin();
+ if (msg.type == TMessageType.EXCEPTION) {
+ TApplicationException x = TApplicationException.read(iprot_);
+ iprot_.readMessageEnd();
+ throw x;
+ }
+ describe_splits_result result = new describe_splits_result();
+ result.read(iprot_);
+ iprot_.readMessageEnd();
+ if (result.isSetSuccess()) {
+ return result.success;
+ }
+ throw new TApplicationException(TApplicationException.MISSING_RESULT,
"describe_splits failed: unknown result");
+ }
+
}
public static class Processor implements TProcessor {
private static final Logger LOGGER =
LoggerFactory.getLogger(Processor.class.getName());
@@ -1017,6 +1066,7 @@
processMap_.put("describe_version", new describe_version());
processMap_.put("describe_ring", new describe_ring());
processMap_.put("describe_keyspace", new describe_keyspace());
+ processMap_.put("describe_splits", new describe_splits());
}
protected static interface ProcessFunction {
@@ -1553,6 +1603,22 @@
}
+ private class describe_splits implements ProcessFunction {
+ public void process(int seqid, TProtocol iprot, TProtocol oprot) throws
TException
+ {
+ describe_splits_args args = new describe_splits_args();
+ args.read(iprot);
+ iprot.readMessageEnd();
+ describe_splits_result result = new describe_splits_result();
+ result.success = iface_.describe_splits(args.start_token,
args.end_token, args.keys_per_split);
+ oprot.writeMessageBegin(new TMessage("describe_splits",
TMessageType.REPLY, seqid));
+ result.write(oprot);
+ oprot.writeMessageEnd();
+ oprot.getTransport().flush();
+ }
+
+ }
+
}
public static class login_args implements TBase<login_args._Fields>,
java.io.Serializable, Cloneable {
@@ -19052,4 +19118,781 @@
}
+ public static class describe_splits_args implements
TBase<describe_splits_args._Fields>, java.io.Serializable, Cloneable,
Comparable<describe_splits_args> {
+ private static final TStruct STRUCT_DESC = new
TStruct("describe_splits_args");
+
+ private static final TField START_TOKEN_FIELD_DESC = new
TField("start_token", TType.STRING, (short)1);
+ private static final TField END_TOKEN_FIELD_DESC = new TField("end_token",
TType.STRING, (short)2);
+ private static final TField KEYS_PER_SPLIT_FIELD_DESC = new
TField("keys_per_split", TType.I32, (short)3);
+
+ public String start_token;
+ public String end_token;
+ public int keys_per_split;
+
+ /** The set of fields this struct contains, along with convenience methods
for finding and manipulating them. */
+ public enum _Fields implements TFieldIdEnum {
+ START_TOKEN((short)1, "start_token"),
+ END_TOKEN((short)2, "end_token"),
+ KEYS_PER_SPLIT((short)3, "keys_per_split");
+
+ private static final Map<Integer, _Fields> byId = new HashMap<Integer,
_Fields>();
+ private static final Map<String, _Fields> byName = new HashMap<String,
_Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byId.put((int)field._thriftId, field);
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it's not
found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ return byId.get(fieldId);
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " +
fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it's not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ private static final int __KEYS_PER_SPLIT_ISSET_ID = 0;
+ private BitSet __isset_bit_vector = new BitSet(1);
+
+ public static final Map<_Fields, FieldMetaData> metaDataMap =
Collections.unmodifiableMap(new EnumMap<_Fields, FieldMetaData>(_Fields.class)
{{
+ put(_Fields.START_TOKEN, new FieldMetaData("start_token",
TFieldRequirementType.REQUIRED,
+ new FieldValueMetaData(TType.STRING)));
+ put(_Fields.END_TOKEN, new FieldMetaData("end_token",
TFieldRequirementType.REQUIRED,
+ new FieldValueMetaData(TType.STRING)));
+ put(_Fields.KEYS_PER_SPLIT, new FieldMetaData("keys_per_split",
TFieldRequirementType.REQUIRED,
+ new FieldValueMetaData(TType.I32)));
+ }});
+
+ static {
+ FieldMetaData.addStructMetaDataMap(describe_splits_args.class,
metaDataMap);
+ }
+
+ public describe_splits_args() {
+ }
+
+ public describe_splits_args(
+ String start_token,
+ String end_token,
+ int keys_per_split)
+ {
+ this();
+ this.start_token = start_token;
+ this.end_token = end_token;
+ this.keys_per_split = keys_per_split;
+ setKeys_per_splitIsSet(true);
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public describe_splits_args(describe_splits_args other) {
+ __isset_bit_vector.clear();
+ __isset_bit_vector.or(other.__isset_bit_vector);
+ if (other.isSetStart_token()) {
+ this.start_token = other.start_token;
+ }
+ if (other.isSetEnd_token()) {
+ this.end_token = other.end_token;
+ }
+ this.keys_per_split = other.keys_per_split;
+ }
+
+ public describe_splits_args deepCopy() {
+ return new describe_splits_args(this);
+ }
+
+ @Deprecated
+ public describe_splits_args clone() {
+ return new describe_splits_args(this);
+ }
+
+ public String getStart_token() {
+ return this.start_token;
+ }
+
+ public describe_splits_args setStart_token(String start_token) {
+ this.start_token = start_token;
+ return this;
+ }
+
+ public void unsetStart_token() {
+ this.start_token = null;
+ }
+
+ /** Returns true if field start_token is set (has been assigned a value)
and false otherwise */
+ public boolean isSetStart_token() {
+ return this.start_token != null;
+ }
+
+ public void setStart_tokenIsSet(boolean value) {
+ if (!value) {
+ this.start_token = null;
+ }
+ }
+
+ public String getEnd_token() {
+ return this.end_token;
+ }
+
+ public describe_splits_args setEnd_token(String end_token) {
+ this.end_token = end_token;
+ return this;
+ }
+
+ public void unsetEnd_token() {
+ this.end_token = null;
+ }
+
+ /** Returns true if field end_token is set (has been assigned a value) and
false otherwise */
+ public boolean isSetEnd_token() {
+ return this.end_token != null;
+ }
+
+ public void setEnd_tokenIsSet(boolean value) {
+ if (!value) {
+ this.end_token = null;
+ }
+ }
+
+ public int getKeys_per_split() {
+ return this.keys_per_split;
+ }
+
+ public describe_splits_args setKeys_per_split(int keys_per_split) {
+ this.keys_per_split = keys_per_split;
+ setKeys_per_splitIsSet(true);
+ return this;
+ }
+
+ public void unsetKeys_per_split() {
+ __isset_bit_vector.clear(__KEYS_PER_SPLIT_ISSET_ID);
+ }
+
+ /** Returns true if field keys_per_split is set (has been assigned a value)
and false otherwise */
+ public boolean isSetKeys_per_split() {
+ return __isset_bit_vector.get(__KEYS_PER_SPLIT_ISSET_ID);
+ }
+
+ public void setKeys_per_splitIsSet(boolean value) {
+ __isset_bit_vector.set(__KEYS_PER_SPLIT_ISSET_ID, value);
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case START_TOKEN:
+ if (value == null) {
+ unsetStart_token();
+ } else {
+ setStart_token((String)value);
+ }
+ break;
+
+ case END_TOKEN:
+ if (value == null) {
+ unsetEnd_token();
+ } else {
+ setEnd_token((String)value);
+ }
+ break;
+
+ case KEYS_PER_SPLIT:
+ if (value == null) {
+ unsetKeys_per_split();
+ } else {
+ setKeys_per_split((Integer)value);
+ }
+ break;
+
+ }
+ }
+
+ public void setFieldValue(int fieldID, Object value) {
+ setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case START_TOKEN:
+ return getStart_token();
+
+ case END_TOKEN:
+ return getEnd_token();
+
+ case KEYS_PER_SPLIT:
+ return new Integer(getKeys_per_split());
+
+ }
+ throw new IllegalStateException();
+ }
+
+ public Object getFieldValue(int fieldId) {
+ return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been
assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ switch (field) {
+ case START_TOKEN:
+ return isSetStart_token();
+ case END_TOKEN:
+ return isSetEnd_token();
+ case KEYS_PER_SPLIT:
+ return isSetKeys_per_split();
+ }
+ throw new IllegalStateException();
+ }
+
+ public boolean isSet(int fieldID) {
+ return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof describe_splits_args)
+ return this.equals((describe_splits_args)that);
+ return false;
+ }
+
+ public boolean equals(describe_splits_args that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_start_token = true && this.isSetStart_token();
+ boolean that_present_start_token = true && that.isSetStart_token();
+ if (this_present_start_token || that_present_start_token) {
+ if (!(this_present_start_token && that_present_start_token))
+ return false;
+ if (!this.start_token.equals(that.start_token))
+ return false;
+ }
+
+ boolean this_present_end_token = true && this.isSetEnd_token();
+ boolean that_present_end_token = true && that.isSetEnd_token();
+ if (this_present_end_token || that_present_end_token) {
+ if (!(this_present_end_token && that_present_end_token))
+ return false;
+ if (!this.end_token.equals(that.end_token))
+ return false;
+ }
+
+ boolean this_present_keys_per_split = true;
+ boolean that_present_keys_per_split = true;
+ if (this_present_keys_per_split || that_present_keys_per_split) {
+ if (!(this_present_keys_per_split && that_present_keys_per_split))
+ return false;
+ if (this.keys_per_split != that.keys_per_split)
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(describe_splits_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ describe_splits_args typedOther = (describe_splits_args)other;
+
+ lastComparison =
Boolean.valueOf(isSetStart_token()).compareTo(isSetStart_token());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ lastComparison = TBaseHelper.compareTo(start_token,
typedOther.start_token);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ lastComparison =
Boolean.valueOf(isSetEnd_token()).compareTo(isSetEnd_token());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ lastComparison = TBaseHelper.compareTo(end_token, typedOther.end_token);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ lastComparison =
Boolean.valueOf(isSetKeys_per_split()).compareTo(isSetKeys_per_split());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ lastComparison = TBaseHelper.compareTo(keys_per_split,
typedOther.keys_per_split);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ _Fields fieldId = _Fields.findByThriftId(field.id);
+ if (fieldId == null) {
+ TProtocolUtil.skip(iprot, field.type);
+ } else {
+ switch (fieldId) {
+ case START_TOKEN:
+ if (field.type == TType.STRING) {
+ this.start_token = iprot.readString();
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case END_TOKEN:
+ if (field.type == TType.STRING) {
+ this.end_token = iprot.readString();
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case KEYS_PER_SPLIT:
+ if (field.type == TType.I32) {
+ this.keys_per_split = iprot.readI32();
+ setKeys_per_splitIsSet(true);
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ }
+ iprot.readFieldEnd();
+ }
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked
in the validate method
+ if (!isSetKeys_per_split()) {
+ throw new TProtocolException("Required field 'keys_per_split' was not
found in serialized data! Struct: " + toString());
+ }
+ validate();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (this.start_token != null) {
+ oprot.writeFieldBegin(START_TOKEN_FIELD_DESC);
+ oprot.writeString(this.start_token);
+ oprot.writeFieldEnd();
+ }
+ if (this.end_token != null) {
+ oprot.writeFieldBegin(END_TOKEN_FIELD_DESC);
+ oprot.writeString(this.end_token);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldBegin(KEYS_PER_SPLIT_FIELD_DESC);
+ oprot.writeI32(this.keys_per_split);
+ oprot.writeFieldEnd();
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("describe_splits_args(");
+ boolean first = true;
+
+ sb.append("start_token:");
+ if (this.start_token == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.start_token);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("end_token:");
+ if (this.end_token == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.end_token);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("keys_per_split:");
+ sb.append(this.keys_per_split);
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws TException {
+ // check for required fields
+ if (start_token == null) {
+ throw new TProtocolException("Required field 'start_token' was not
present! Struct: " + toString());
+ }
+ if (end_token == null) {
+ throw new TProtocolException("Required field 'end_token' was not
present! Struct: " + toString());
+ }
+ // alas, we cannot check 'keys_per_split' because it's a primitive and
you chose the non-beans generator.
+ }
+
+ }
+
+ public static class describe_splits_result implements
TBase<describe_splits_result._Fields>, java.io.Serializable, Cloneable,
Comparable<describe_splits_result> {
+ private static final TStruct STRUCT_DESC = new
TStruct("describe_splits_result");
+
+ private static final TField SUCCESS_FIELD_DESC = new TField("success",
TType.LIST, (short)0);
+
+ public List<String> success;
+
+ /** The set of fields this struct contains, along with convenience methods
for finding and manipulating them. */
+ public enum _Fields implements TFieldIdEnum {
+ SUCCESS((short)0, "success");
+
+ private static final Map<Integer, _Fields> byId = new HashMap<Integer,
_Fields>();
+ private static final Map<String, _Fields> byName = new HashMap<String,
_Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byId.put((int)field._thriftId, field);
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it's not
found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ return byId.get(fieldId);
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " +
fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it's not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+
+ public static final Map<_Fields, FieldMetaData> metaDataMap =
Collections.unmodifiableMap(new EnumMap<_Fields, FieldMetaData>(_Fields.class)
{{
+ put(_Fields.SUCCESS, new FieldMetaData("success",
TFieldRequirementType.DEFAULT,
+ new ListMetaData(TType.LIST,
+ new FieldValueMetaData(TType.STRING))));
+ }});
+
+ static {
+ FieldMetaData.addStructMetaDataMap(describe_splits_result.class,
metaDataMap);
+ }
+
+ public describe_splits_result() {
+ }
+
+ public describe_splits_result(
+ List<String> success)
+ {
+ this();
+ this.success = success;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public describe_splits_result(describe_splits_result other) {
+ if (other.isSetSuccess()) {
+ List<String> __this__success = new ArrayList<String>();
+ for (String other_element : other.success) {
+ __this__success.add(other_element);
+ }
+ this.success = __this__success;
+ }
+ }
+
+ public describe_splits_result deepCopy() {
+ return new describe_splits_result(this);
+ }
+
+ @Deprecated
+ public describe_splits_result clone() {
+ return new describe_splits_result(this);
+ }
+
+ public int getSuccessSize() {
+ return (this.success == null) ? 0 : this.success.size();
+ }
+
+ public java.util.Iterator<String> getSuccessIterator() {
+ return (this.success == null) ? null : this.success.iterator();
+ }
+
+ public void addToSuccess(String elem) {
+ if (this.success == null) {
+ this.success = new ArrayList<String>();
+ }
+ this.success.add(elem);
+ }
+
+ public List<String> getSuccess() {
+ return this.success;
+ }
+
+ public describe_splits_result setSuccess(List<String> success) {
+ this.success = success;
+ return this;
+ }
+
+ public void unsetSuccess() {
+ this.success = null;
+ }
+
+ /** Returns true if field success is set (has been assigned a value) and
false otherwise */
+ public boolean isSetSuccess() {
+ return this.success != null;
+ }
+
+ public void setSuccessIsSet(boolean value) {
+ if (!value) {
+ this.success = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case SUCCESS:
+ if (value == null) {
+ unsetSuccess();
+ } else {
+ setSuccess((List<String>)value);
+ }
+ break;
+
+ }
+ }
+
+ public void setFieldValue(int fieldID, Object value) {
+ setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case SUCCESS:
+ return getSuccess();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ public Object getFieldValue(int fieldId) {
+ return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been
assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ switch (field) {
+ case SUCCESS:
+ return isSetSuccess();
+ }
+ throw new IllegalStateException();
+ }
+
+ public boolean isSet(int fieldID) {
+ return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof describe_splits_result)
+ return this.equals((describe_splits_result)that);
+ return false;
+ }
+
+ public boolean equals(describe_splits_result that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_success = true && this.isSetSuccess();
+ boolean that_present_success = true && that.isSetSuccess();
+ if (this_present_success || that_present_success) {
+ if (!(this_present_success && that_present_success))
+ return false;
+ if (!this.success.equals(that.success))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(describe_splits_result other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ describe_splits_result typedOther = (describe_splits_result)other;
+
+ lastComparison =
Boolean.valueOf(isSetSuccess()).compareTo(isSetSuccess());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ lastComparison = TBaseHelper.compareTo(success, typedOther.success);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ _Fields fieldId = _Fields.findByThriftId(field.id);
+ if (fieldId == null) {
+ TProtocolUtil.skip(iprot, field.type);
+ } else {
+ switch (fieldId) {
+ case SUCCESS:
+ if (field.type == TType.LIST) {
+ {
+ TList _list100 = iprot.readListBegin();
+ this.success = new ArrayList<String>(_list100.size);
+ for (int _i101 = 0; _i101 < _list100.size; ++_i101)
+ {
+ String _elem102;
+ _elem102 = iprot.readString();
+ this.success.add(_elem102);
+ }
+ iprot.readListEnd();
+ }
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ }
+ iprot.readFieldEnd();
+ }
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked
in the validate method
+ validate();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ oprot.writeStructBegin(STRUCT_DESC);
+
+ if (this.isSetSuccess()) {
+ oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+ {
+ oprot.writeListBegin(new TList(TType.STRING, this.success.size()));
+ for (String _iter103 : this.success)
+ {
+ oprot.writeString(_iter103);
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("describe_splits_result(");
+ boolean first = true;
+
+ sb.append("success:");
+ if (this.success == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.success);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws TException {
+ // check for required fields
+ }
+
+ }
+
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=909631&r1=909630&r2=909631&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
Fri Feb 12 22:02:44 2010
@@ -205,12 +205,11 @@
public void doVerb(Message message)
{
StorageService ss = StorageService.instance;
- List<String> tokens = ss.getSplits(2);
- assert tokens.size() == 3 : tokens.size();
+ String tokenString = ss.getBootstrapToken().toString();
Message response;
try
{
- response = message.getReply(FBUtilities.getLocalAddress(),
tokens.get(1).getBytes("UTF-8"));
+ response = message.getReply(FBUtilities.getLocalAddress(),
tokenString.getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e)
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=909631&r1=909630&r2=909631&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
Fri Feb 12 22:02:44 2010
@@ -1,16 +1,22 @@
package org.apache.cassandra.hadoop;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.SortedMap;
+import java.net.InetAddress;
+import java.util.*;
import org.apache.log4j.Logger;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
import org.apache.thrift.TDeserializer;
@@ -113,16 +119,63 @@
predicate = predicateFromString(conf.get(PREDICATE_CONFIG));
validateConfiguration();
- List<TokenRange> map = getRangeMap();
+ // canonical ranges and nodes holding replicas
+ List<TokenRange> masterRangeNodes = getRangeMap();
+
+ // canonical ranges, split into pieces:
+ // for each range, pick a live owner and ask it to compute bite-sized
splits
+ // TODO parallelize this thread-per-range
+ Map<TokenRange, List<String>> splitRanges = new HashMap<TokenRange,
List<String>>();
+ for (TokenRange range : masterRangeNodes)
+ {
+ splitRanges.put(range, getSubSplits(range));
+ }
+
+ // turn the sub-ranges into InputSplits
ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
- for (TokenRange entry : map)
+ for (Map.Entry<TokenRange, List<String>> entry :
splitRanges.entrySet())
{
- if (logger.isDebugEnabled())
- logger.debug("split range is [" + entry.start_token + ", " +
entry.end_token + "]");
- String[] endpoints = entry.endpoints.toArray(new String[0]);
- splits.add(new ColumnFamilySplit(keyspace, columnFamily,
predicate, entry.start_token, entry.end_token, endpoints));
+ TokenRange range = entry.getKey();
+ List<String> tokens = entry.getValue();
+ String[] endpoints = range.endpoints.toArray(new
String[range.endpoints.size()]);
+
+ int i = 1;
+ for ( ; i < tokens.size(); i++)
+ {
+ ColumnFamilySplit split = new ColumnFamilySplit(keyspace,
columnFamily, predicate, tokens.get(i - 1), tokens.get(i), endpoints);
+ logger.info("adding " + split);
+ splits.add(split);
+ }
}
+ assert splits.size() > 0;
+
+ return splits;
+ }
+ private List<String> getSubSplits(TokenRange range) throws IOException
+ {
+ // TODO handle failure of range replicas & retry
+ TSocket socket = new TSocket(range.endpoints.get(0),
+ DatabaseDescriptor.getThriftPort());
+ TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false,
false);
+ Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+ try
+ {
+ socket.open();
+ }
+ catch (TTransportException e)
+ {
+ throw new IOException(e);
+ }
+ List<String> splits;
+ try
+ {
+ splits = client.describe_splits(range.start_token,
range.end_token, 128); // TODO make split size configurable
+ }
+ catch (TException e)
+ {
+ throw new RuntimeException(e);
+ }
return splits;
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java?rev=909631&r1=909630&r2=909631&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
Fri Feb 12 22:02:44 2010
@@ -3,6 +3,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Arrays;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.thrift.SlicePredicate;
@@ -111,7 +112,20 @@
dataNodes[i] = in.readUTF();
}
}
-
+
+ @Override
+ public String toString()
+ {
+ return "ColumnFamilySplit{" +
+ "startToken='" + startToken + '\'' +
+ ", endToken='" + endToken + '\'' +
+ ", table='" + table + '\'' +
+ ", columnFamily='" + columnFamily + '\'' +
+ ", dataNodes=" + (dataNodes == null ? null :
Arrays.asList(dataNodes)) +
+ ", predicate=" + predicate +
+ '}';
+ }
+
public static ColumnFamilySplit read(DataInput in) throws IOException
{
ColumnFamilySplit w = new ColumnFamilySplit();
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=909631&r1=909630&r2=909631&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Fri Feb 12 22:02:44 2010
@@ -40,6 +40,7 @@
import org.apache.cassandra.dht.*;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.locator.*;
import org.apache.cassandra.net.*;
import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
@@ -102,6 +103,7 @@
GOSSIP_DIGEST_SYN,
GOSSIP_DIGEST_ACK,
GOSSIP_DIGEST_ACK2,
+ ;
// remember to add new verbs at the end, since we serialize by ordinal
}
public static final Verb[] VERBS = Verb.values();
@@ -1219,20 +1221,13 @@
}
/**
- * @param splits: number of ranges to break into. Minimum 2.
- * @return list of Tokens (_not_ keys!) breaking up the data this node is
responsible for into `splits` pieces.
- * There will be 1 more token than splits requested. So for splits of 2,
tokens T1 T2 T3 will be returned,
- * where (T1, T2] is the first range and (T2, T3] is the second. The
first token will always be the left
- * Token of this node's primary range, and the last will always be the
Right token of that range.
- */
- public List<String> getSplits(int splits)
+ * @return list of Tokens (_not_ keys!) breaking up the data this node is
responsible for into pieces of roughly keysPerSplit
+ */
+ public List<Token> getSplits(Range range, int keysPerSplit)
{
- assert splits > 1;
+ List<Token> tokens = new ArrayList<Token>();
// we use the actual Range token for the first and last brackets of
the splits to ensure correctness
- // (we're only operating on 1/128 of the keys remember)
- Range range = getLocalPrimaryRange();
- List<String> tokens = new ArrayList<String>();
- tokens.add(range.left.toString());
+ tokens.add(range.left);
List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
@@ -1243,28 +1238,41 @@
keys.add(info.key);
}
}
- Collections.sort(keys);
+ FBUtilities.sortSampledKeys(keys, range);
+ int splits = keys.size() * SSTableReader.indexInterval() /
keysPerSplit;
- if (keys.size() < splits)
+ if (keys.size() >= splits)
{
- // not enough keys to generate good splits -- generate random ones
instead
- // (since this only happens when we don't have many keys, it
doesn't really matter that the splits are poor)
for (int i = 1; i < splits; i++)
{
- tokens.add(partitioner_.getRandomToken().toString());
+ int index = i * (keys.size() / splits);
+ tokens.add(keys.get(index).token);
}
}
- else
+
+ tokens.add(range.right);
+ return tokens;
+ }
+
+ /** return a token to which if a node bootstraps it will get about 1/2 of
this node's range */
+ public Token getBootstrapToken()
+ {
+ Range range = getLocalPrimaryRange();
+ List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
+ for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
- for (int i = 1; i < splits; i++)
+ for (SSTable.KeyPosition info: cfs.allIndexPositions())
{
- int index = i * (keys.size() / splits);
- tokens.add(keys.get(index).token.toString());
+ if (range.contains(info.key.token))
+ keys.add(info.key);
}
}
+ FBUtilities.sortSampledKeys(keys, range);
- tokens.add(range.right.toString());
- return tokens;
+ if (keys.size() < 3)
+ return partitioner_.getRandomToken();
+ else
+ return keys.get(keys.size() / 2).token;
}
/**
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=909631&r1=909630&r2=909631&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Fri Feb 12 22:02:44 2010
@@ -633,6 +633,18 @@
return ranges;
}
+ public List<String> describe_splits(String start_token, String end_token,
int keys_per_split) throws TException
+ {
+ Token.TokenFactory tf =
StorageService.getPartitioner().getTokenFactory();
+ List<Token> tokens = StorageService.instance.getSplits(new
Range(tf.fromString(start_token), tf.fromString(end_token)), keys_per_split);
+ List<String> splits = new ArrayList<String>(tokens.size());
+ for (Token token : tokens)
+ {
+ splits.add(token.toString());
+ }
+ return splits;
+ }
+
public void login(String keyspace, AuthenticationRequest auth_request)
throws AuthenticationException, AuthorizationException, TException
{
DatabaseDescriptor.getAuthenticator().login(keyspace, auth_request);
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=909631&r1=909630&r2=909631&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
Fri Feb 12 22:02:44 2010
@@ -35,6 +35,9 @@
import org.apache.commons.collections.iterators.CollatingIterator;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
@@ -384,4 +387,32 @@
throw new IOException(ex);
}
}
+
+ public static void sortSampledKeys(List<DecoratedKey> keys, Range range)
+ {
+ if (range.left.compareTo(range.right) >= 0)
+ {
+ // range wraps. have to be careful that we sort in the same order
as the range to find the right midpoint.
+ final Token right = range.right;
+ Comparator<DecoratedKey> comparator = new
Comparator<DecoratedKey>()
+ {
+ public int compare(DecoratedKey o1, DecoratedKey o2)
+ {
+ if ((right.compareTo(o1.token) < 0 &&
right.compareTo(o2.token) < 0)
+ || (right.compareTo(o1.token) > 0 &&
right.compareTo(o2.token) > 0))
+ {
+ // both tokens are on the same side of the wrap point
+ return o1.compareTo(o2);
+ }
+ return -o1.compareTo(o2);
+ }
+ };
+ Collections.sort(keys, comparator);
+ }
+ else
+ {
+ // unwrapped range (left < right). standard sort is all we need.
+ Collections.sort(keys);
+ }
+ }
}