Author: jbellis
Date: Thu Jul 15 03:29:50 2010
New Revision: 964293
URL: http://svn.apache.org/viewvc?rev=964293&view=rev
Log:
replace comparator, partitioner configuration variables with introspection of
Cassandra server. patch by jbellis; reviewed by Jeremy Hanna for CASSANDRA-1047
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java
cassandra/branches/cassandra-0.6/interface/cassandra.thrift
cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraServer.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=964293&r1=964292&r2=964293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Jul 15 03:29:50 2010
@@ -5,6 +5,9 @@
(CASSANDRA-1232)
* extend option to lower compaction priority to hinted handoff
as well (CASSANDRA-1260)
+ * added describe_partitioner Thrift method (CASSANDRA-1047)
+ * Hadoop jobs no longer require the Cassandra storage-conf.xml
+ (CASSANDRA-1280, CASSANDRA-1047)
* log thread pool stats when GC is excessive (CASSANDRA-1275)
Modified: cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java?rev=964293&r1=964292&r2=964293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java
(original)
+++ cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java Thu
Jul 15 03:29:50 2010
@@ -129,7 +129,7 @@ public class WordCount extends Configure
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX +
i));
ConfigHelper.setThriftContact(conf, "localhost", 9160);
- ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE,
COLUMN_FAMILY, "BytesType", "RandomPartitioner");
+ ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE,
COLUMN_FAMILY);
SlicePredicate predicate = new
SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
Modified: cassandra/branches/cassandra-0.6/interface/cassandra.thrift
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/interface/cassandra.thrift?rev=964293&r1=964292&r2=964293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/interface/cassandra.thrift (original)
+++ cassandra/branches/cassandra-0.6/interface/cassandra.thrift Thu Jul 15
03:29:50 2010
@@ -46,7 +46,7 @@ namespace rb CassandraThrift
# for every edit that doesn't result in a change to major/minor.
#
# See the Semantic Versioning Specification (SemVer) http://semver.org.
-const string VERSION = "2.1.0"
+const string VERSION = "2.2.0"
#
# data structures
@@ -446,6 +446,9 @@ service Cassandra {
list<TokenRange> describe_ring(1:required string keyspace)
throws (1:InvalidRequestException ire),
+ /** returns the partitioner used by this cluster */
+ string describe_partitioner(),
+
/** describe specified keyspace */
map<string, map<string, string>> describe_keyspace(1:required string
keyspace)
throws (1:NotFoundException nfe),
Modified:
cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java?rev=964293&r1=964292&r2=964293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(original)
+++
cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
Thu Jul 15 03:29:50 2010
@@ -228,6 +228,11 @@ public class Cassandra {
public List<TokenRange> describe_ring(String keyspace) throws
InvalidRequestException, TException;
/**
+ * returns the partitioner used by this cluster
+ */
+ public String describe_partitioner() throws TException;
+
+ /**
* describe specified keyspace
*
* @param keyspace
@@ -1005,6 +1010,38 @@ public class Cassandra {
throw new TApplicationException(TApplicationException.MISSING_RESULT,
"describe_ring failed: unknown result");
}
+ public String describe_partitioner() throws TException
+ {
+ send_describe_partitioner();
+ return recv_describe_partitioner();
+ }
+
+ public void send_describe_partitioner() throws TException
+ {
+ oprot_.writeMessageBegin(new TMessage("describe_partitioner",
TMessageType.CALL, seqid_));
+ describe_partitioner_args args = new describe_partitioner_args();
+ args.write(oprot_);
+ oprot_.writeMessageEnd();
+ oprot_.getTransport().flush();
+ }
+
+ public String recv_describe_partitioner() throws TException
+ {
+ TMessage msg = iprot_.readMessageBegin();
+ if (msg.type == TMessageType.EXCEPTION) {
+ TApplicationException x = TApplicationException.read(iprot_);
+ iprot_.readMessageEnd();
+ throw x;
+ }
+ describe_partitioner_result result = new describe_partitioner_result();
+ result.read(iprot_);
+ iprot_.readMessageEnd();
+ if (result.isSetSuccess()) {
+ return result.success;
+ }
+ throw new TApplicationException(TApplicationException.MISSING_RESULT,
"describe_partitioner failed: unknown result");
+ }
+
public Map<String,Map<String,String>> describe_keyspace(String keyspace)
throws NotFoundException, TException
{
send_describe_keyspace(keyspace);
@@ -1100,6 +1137,7 @@ public class Cassandra {
processMap_.put("describe_cluster_name", new describe_cluster_name());
processMap_.put("describe_version", new describe_version());
processMap_.put("describe_ring", new describe_ring());
+ processMap_.put("describe_partitioner", new describe_partitioner());
processMap_.put("describe_keyspace", new describe_keyspace());
processMap_.put("describe_splits", new describe_splits());
}
@@ -1802,6 +1840,32 @@ public class Cassandra {
}
+ private class describe_partitioner implements ProcessFunction {
+ public void process(int seqid, TProtocol iprot, TProtocol oprot) throws
TException
+ {
+ describe_partitioner_args args = new describe_partitioner_args();
+ try {
+ args.read(iprot);
+ } catch (TProtocolException e) {
+ iprot.readMessageEnd();
+ TApplicationException x = new
TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
+ oprot.writeMessageBegin(new TMessage("describe_partitioner",
TMessageType.EXCEPTION, seqid));
+ x.write(oprot);
+ oprot.writeMessageEnd();
+ oprot.getTransport().flush();
+ return;
+ }
+ iprot.readMessageEnd();
+ describe_partitioner_result result = new describe_partitioner_result();
+ result.success = iface_.describe_partitioner();
+ oprot.writeMessageBegin(new TMessage("describe_partitioner",
TMessageType.REPLY, seqid));
+ result.write(oprot);
+ oprot.writeMessageEnd();
+ oprot.getTransport().flush();
+ }
+
+ }
+
private class describe_keyspace implements ProcessFunction {
public void process(int seqid, TProtocol iprot, TProtocol oprot) throws
TException
{
@@ -18727,6 +18791,476 @@ public class Cassandra {
}
+ public static class describe_partitioner_args implements
TBase<describe_partitioner_args._Fields>, java.io.Serializable, Cloneable,
Comparable<describe_partitioner_args> {
+ private static final TStruct STRUCT_DESC = new
TStruct("describe_partitioner_args");
+
+
+
+ /** The set of fields this struct contains, along with convenience methods
for finding and manipulating them. */
+ public enum _Fields implements TFieldIdEnum {
+;
+
+ private static final Map<Integer, _Fields> byId = new HashMap<Integer,
_Fields>();
+ private static final Map<String, _Fields> byName = new HashMap<String,
_Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byId.put((int)field._thriftId, field);
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it's not
found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ return byId.get(fieldId);
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " +
fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it's not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+ public static final Map<_Fields, FieldMetaData> metaDataMap =
Collections.unmodifiableMap(new EnumMap<_Fields, FieldMetaData>(_Fields.class)
{{
+ }});
+
+ static {
+ FieldMetaData.addStructMetaDataMap(describe_partitioner_args.class,
metaDataMap);
+ }
+
+ public describe_partitioner_args() {
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public describe_partitioner_args(describe_partitioner_args other) {
+ }
+
+ public describe_partitioner_args deepCopy() {
+ return new describe_partitioner_args(this);
+ }
+
+ @Deprecated
+ public describe_partitioner_args clone() {
+ return new describe_partitioner_args(this);
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ }
+ }
+
+ public void setFieldValue(int fieldID, Object value) {
+ setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ public Object getFieldValue(int fieldId) {
+ return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been
assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ public boolean isSet(int fieldID) {
+ return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof describe_partitioner_args)
+ return this.equals((describe_partitioner_args)that);
+ return false;
+ }
+
+ public boolean equals(describe_partitioner_args that) {
+ if (that == null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(describe_partitioner_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ describe_partitioner_args typedOther = (describe_partitioner_args)other;
+
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked
in the validate method
+ validate();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("describe_partitioner_args(");
+ boolean first = true;
+
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws TException {
+ // check for required fields
+ }
+
+ }
+
+ public static class describe_partitioner_result implements
TBase<describe_partitioner_result._Fields>, java.io.Serializable, Cloneable,
Comparable<describe_partitioner_result> {
+ private static final TStruct STRUCT_DESC = new
TStruct("describe_partitioner_result");
+
+ private static final TField SUCCESS_FIELD_DESC = new TField("success",
TType.STRING, (short)0);
+
+ public String success;
+
+ /** The set of fields this struct contains, along with convenience methods
for finding and manipulating them. */
+ public enum _Fields implements TFieldIdEnum {
+ SUCCESS((short)0, "success");
+
+ private static final Map<Integer, _Fields> byId = new HashMap<Integer,
_Fields>();
+ private static final Map<String, _Fields> byName = new HashMap<String,
_Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byId.put((int)field._thriftId, field);
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it's not
found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ return byId.get(fieldId);
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " +
fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it's not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+
+ public static final Map<_Fields, FieldMetaData> metaDataMap =
Collections.unmodifiableMap(new EnumMap<_Fields, FieldMetaData>(_Fields.class)
{{
+ put(_Fields.SUCCESS, new FieldMetaData("success",
TFieldRequirementType.DEFAULT,
+ new FieldValueMetaData(TType.STRING)));
+ }});
+
+ static {
+ FieldMetaData.addStructMetaDataMap(describe_partitioner_result.class,
metaDataMap);
+ }
+
+ public describe_partitioner_result() {
+ }
+
+ public describe_partitioner_result(
+ String success)
+ {
+ this();
+ this.success = success;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public describe_partitioner_result(describe_partitioner_result other) {
+ if (other.isSetSuccess()) {
+ this.success = other.success;
+ }
+ }
+
+ public describe_partitioner_result deepCopy() {
+ return new describe_partitioner_result(this);
+ }
+
+ @Deprecated
+ public describe_partitioner_result clone() {
+ return new describe_partitioner_result(this);
+ }
+
+ public String getSuccess() {
+ return this.success;
+ }
+
+ public describe_partitioner_result setSuccess(String success) {
+ this.success = success;
+ return this;
+ }
+
+ public void unsetSuccess() {
+ this.success = null;
+ }
+
+ /** Returns true if field success is set (has been assigned a value) and
false otherwise */
+ public boolean isSetSuccess() {
+ return this.success != null;
+ }
+
+ public void setSuccessIsSet(boolean value) {
+ if (!value) {
+ this.success = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case SUCCESS:
+ if (value == null) {
+ unsetSuccess();
+ } else {
+ setSuccess((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public void setFieldValue(int fieldID, Object value) {
+ setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value);
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case SUCCESS:
+ return getSuccess();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ public Object getFieldValue(int fieldId) {
+ return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId));
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been
assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ switch (field) {
+ case SUCCESS:
+ return isSetSuccess();
+ }
+ throw new IllegalStateException();
+ }
+
+ public boolean isSet(int fieldID) {
+ return isSet(_Fields.findByThriftIdOrThrow(fieldID));
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof describe_partitioner_result)
+ return this.equals((describe_partitioner_result)that);
+ return false;
+ }
+
+ public boolean equals(describe_partitioner_result that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_success = true && this.isSetSuccess();
+ boolean that_present_success = true && that.isSetSuccess();
+ if (this_present_success || that_present_success) {
+ if (!(this_present_success && that_present_success))
+ return false;
+ if (!this.success.equals(that.success))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(describe_partitioner_result other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ describe_partitioner_result typedOther =
(describe_partitioner_result)other;
+
+ lastComparison =
Boolean.valueOf(isSetSuccess()).compareTo(typedOther.isSetSuccess());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetSuccess()) { lastComparison =
TBaseHelper.compareTo(success, typedOther.success);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ case 0: // SUCCESS
+ if (field.type == TType.STRING) {
+ this.success = iprot.readString();
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked
in the validate method
+ validate();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ oprot.writeStructBegin(STRUCT_DESC);
+
+ if (this.isSetSuccess()) {
+ oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+ oprot.writeString(this.success);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("describe_partitioner_result(");
+ boolean first = true;
+
+ sb.append("success:");
+ if (this.success == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.success);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws TException {
+ // check for required fields
+ }
+
+ }
+
public static class describe_keyspace_args implements
TBase<describe_keyspace_args._Fields>, java.io.Serializable, Cloneable,
Comparable<describe_keyspace_args> {
private static final TStruct STRUCT_DESC = new
TStruct("describe_keyspace_args");
Modified:
cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java?rev=964293&r1=964292&r2=964293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
(original)
+++
cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
Thu Jul 15 03:29:50 2010
@@ -42,6 +42,6 @@ import org.slf4j.LoggerFactory;
public class Constants {
- public static final String VERSION = "2.1.0";
+ public static final String VERSION = "2.2.0";
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=964293&r1=964292&r2=964293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
Thu Jul 15 03:29:50 2010
@@ -25,10 +25,13 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
+import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
@@ -40,9 +43,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransportException;
public class ColumnFamilyRecordReader extends RecordReader<String,
SortedMap<byte[], IColumn>>
{
@@ -104,34 +107,44 @@ public class ColumnFamilyRecordReader ex
private String startToken;
private int totalRead = 0;
private int i = 0;
- private AbstractType comparator = ConfigHelper.getComparator(conf);
- private AbstractType subComparator =
ConfigHelper.getSubComparator(conf);
- private IPartitioner partitioner = ConfigHelper.getPartitioner(conf);
+ private final AbstractType comparator;
+ private final AbstractType subComparator;
+ private final IPartitioner partitioner;
private TSocket socket;
+ private Cassandra.Client client;
- private void maybeInit()
+ private RowIterator()
{
- // check if we need another batch
- if (rows != null && i >= rows.size())
- rows = null;
-
- if (rows != null)
- return;
-
- // close previous connection if one is open
- close();
-
socket = new TSocket(getLocation(),
ConfigHelper.getThriftPort(conf));
TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket,
false, false);
- Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+ client = new Cassandra.Client(binaryProtocol);
+
try
{
socket.open();
+ partitioner =
DatabaseDescriptor.newPartitioner(client.describe_partitioner());
+ Map<String, String> info =
client.describe_keyspace(keyspace).get(cfName);
+ comparator =
DatabaseDescriptor.getComparator(info.get("CompareWith"));
+ subComparator =
DatabaseDescriptor.getComparator(info.get("CompareSubcolumnsWith"));
}
- catch (TTransportException e)
+ catch (TException e)
{
- throw new RuntimeException(e);
+ throw new RuntimeException("error communicating via Thrift",
e);
}
+ catch (NotFoundException e)
+ {
+ throw new RuntimeException("server reports no such keyspace "
+ keyspace, e);
+ }
+ }
+
+ private void maybeInit()
+ {
+ // check if we need another batch
+ if (rows != null && i >= rows.size())
+ rows = null;
+
+ if (rows != null)
+ return;
if (startToken == null)
{
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=964293&r1=964292&r2=964293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
Thu Jul 15 03:29:50 2010
@@ -51,22 +51,6 @@ public class ConfigHelper
private static final String PARTITIONER = "cassandra.partitioner";
/**
- * Set the keyspace, column family, column comparator, and row partitioner
for this job.
- *
- * @param conf Job configuration you are about to run
- * @param keyspace
- * @param columnFamily
- * @param comparator
- * @param partitioner
- */
- public static void setColumnFamily(Configuration conf, String keyspace,
String columnFamily, String comparator, String partitioner)
- {
- setColumnFamily(conf, keyspace, columnFamily);
- conf.set(COMPARATOR, comparator);
- conf.set(PARTITIONER, partitioner);
- }
-
- /**
* Set the keyspace and column family for this job.
* Comparator and Partitioner types will be read from storage-conf.xml.
*
@@ -97,18 +81,6 @@ public class ConfigHelper
}
/**
- * Set the subcomparator to use in the configured ColumnFamily [of
SuperColumns].
- * Optional when storage-conf.xml is provided.
- *
- * @param conf
- * @param subComparator
- */
- public static void setSubComparator(Configuration conf, String
subComparator)
- {
- conf.set(SUB_COMPARATOR, subComparator);
- }
-
- /**
* The address and port of a Cassandra node that Hadoop can contact over
Thrift
* to learn more about the Cassandra cluster. Optional when
storage-conf.xml
* is provided.
@@ -237,26 +209,4 @@ public class ConfigHelper
String v = conf.get(INITIAL_THRIFT_ADDRESS);
return v == null ?
DatabaseDescriptor.getSeeds().iterator().next().getHostAddress() : v;
}
-
- public static AbstractType getComparator(Configuration conf)
- {
- String v = conf.get(COMPARATOR);
- return v == null
- ? DatabaseDescriptor.getComparator(getKeyspace(conf),
getColumnFamily(conf))
- : DatabaseDescriptor.getComparator(v);
- }
-
- public static AbstractType getSubComparator(Configuration conf)
- {
- String v = conf.get(SUB_COMPARATOR);
- return v == null
- ? DatabaseDescriptor.getSubComparator(getKeyspace(conf),
getColumnFamily(conf))
- : DatabaseDescriptor.getComparator(v);
- }
-
- public static IPartitioner getPartitioner(Configuration conf)
- {
- String v = conf.get(PARTITIONER);
- return v == null ? DatabaseDescriptor.getPartitioner() :
DatabaseDescriptor.newPartitioner(v);
- }
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=964293&r1=964292&r2=964293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraServer.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraServer.java
Thu Jul 15 03:29:50 2010
@@ -633,6 +633,11 @@ public class CassandraServer implements
return ranges;
}
+ public String describe_partitioner() throws TException
+ {
+ return StorageService.getPartitioner().getClass().getName();
+ }
+
public List<String> describe_splits(String start_token, String end_token,
int keys_per_split) throws TException
{
Token.TokenFactory tf =
StorageService.getPartitioner().getTokenFactory();
@@ -668,6 +673,5 @@ public class CassandraServer implements
if (!loginDone.get()) throw new InvalidRequestException("Login is
required before any other API calls");
}
-
// main method moved to CassandraDaemon
}