Author: jbellis
Date: Thu Apr 16 16:23:20 2009
New Revision: 765680
URL: http://svn.apache.org/viewvc?rev=765680&view=rev
Log:
Expose the reworked remove operation through the Thrift interface. Patch by jbellis; reviewed by Jun Rao for #83.
Modified:
incubator/cassandra/trunk/interface/cassandra.thrift
incubator/cassandra/trunk/src/org/apache/cassandra/service/Cassandra.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
Modified: incubator/cassandra/trunk/interface/cassandra.thrift
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/cassandra.thrift?rev=765680&r1=765679&r2=765680&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/cassandra.thrift (original)
+++ incubator/cassandra/trunk/interface/cassandra.thrift Thu Apr 16 16:23:20
2009
@@ -60,7 +60,7 @@
async void insert(string tablename,string key,string
columnFamily_column, string cellData,i64 timestamp),
async void batch_insert(batch_mutation_t batchMutation),
bool batch_insert_blocking(batch_mutation_t batchMutation) throws
(1:CassandraException e),
- async void remove(string tablename,string key,string
columnFamily_column),
+ bool remove(string tablename,string key,string
columnFamily_column, i64 timestamp, bool block),
list<column_t> get_columns_since(string tablename, string key, string
columnFamily_column, i64 timeStamp) throws (1:CassandraException e),
list<superColumn_t> get_slice_super(string tablename, string key, string
columnFamily_superColumnName, i32 start = -1 , i32 count = -1) throws
(1:CassandraException e),
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/service/Cassandra.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/Cassandra.java?rev=765680&r1=765679&r2=765680&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/Cassandra.java
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/Cassandra.java
Thu Apr 16 16:23:20 2009
@@ -36,7 +36,7 @@
public boolean batch_insert_blocking(batch_mutation_t batchMutation)
throws CassandraException, TException;
- public void remove(String tablename, String key, String
columnFamily_column) throws TException;
+ public boolean remove(String tablename, String key, String
columnFamily_column, long timestamp, boolean block) throws TException;
public List<column_t> get_columns_since(String tablename, String key,
String columnFamily_column, long timeStamp) throws CassandraException,
TException;
@@ -314,23 +314,43 @@
throw new TApplicationException(TApplicationException.MISSING_RESULT,
"batch_insert_blocking failed: unknown result");
}
- public void remove(String tablename, String key, String
columnFamily_column) throws TException
+ public boolean remove(String tablename, String key, String
columnFamily_column, long timestamp, boolean block) throws TException
{
- send_remove(tablename, key, columnFamily_column);
+ send_remove(tablename, key, columnFamily_column, timestamp, block);
+ return recv_remove();
}
- public void send_remove(String tablename, String key, String
columnFamily_column) throws TException
+ public void send_remove(String tablename, String key, String
columnFamily_column, long timestamp, boolean block) throws TException
{
oprot_.writeMessageBegin(new TMessage("remove", TMessageType.CALL,
seqid_));
remove_args args = new remove_args();
args.tablename = tablename;
args.key = key;
args.columnFamily_column = columnFamily_column;
+ args.timestamp = timestamp;
+ args.block = block;
args.write(oprot_);
oprot_.writeMessageEnd();
oprot_.getTransport().flush();
}
+ public boolean recv_remove() throws TException
+ {
+ TMessage msg = iprot_.readMessageBegin();
+ if (msg.type == TMessageType.EXCEPTION) {
+ TApplicationException x = TApplicationException.read(iprot_);
+ iprot_.readMessageEnd();
+ throw x;
+ }
+ remove_result result = new remove_result();
+ result.read(iprot_);
+ iprot_.readMessageEnd();
+ if (result.__isset.success) {
+ return result.success;
+ }
+ throw new TApplicationException(TApplicationException.MISSING_RESULT,
"remove failed: unknown result");
+ }
+
public List<column_t> get_columns_since(String tablename, String key,
String columnFamily_column, long timeStamp) throws CassandraException,
TException
{
send_get_columns_since(tablename, key, columnFamily_column, timeStamp);
@@ -875,9 +895,15 @@
remove_args args = new remove_args();
args.read(iprot);
iprot.readMessageEnd();
- iface_.remove(args.tablename, args.key, args.columnFamily_column);
- return;
+ remove_result result = new remove_result();
+ result.success = iface_.remove(args.tablename, args.key,
args.columnFamily_column, args.timestamp, args.block);
+ result.__isset.success = true;
+ oprot.writeMessageBegin(new TMessage("remove", TMessageType.REPLY,
seqid));
+ result.write(oprot);
+ oprot.writeMessageEnd();
+ oprot.getTransport().flush();
}
+
}
private class get_columns_since implements ProcessFunction {
@@ -4953,6 +4979,8 @@
private static final TField TABLENAME_FIELD_DESC = new TField("tablename",
TType.STRING, (short)-1);
private static final TField KEY_FIELD_DESC = new TField("key",
TType.STRING, (short)-2);
private static final TField COLUMN_FAMILY_COLUMN_FIELD_DESC = new
TField("columnFamily_column", TType.STRING, (short)-3);
+ private static final TField TIMESTAMP_FIELD_DESC = new TField("timestamp",
TType.I64, (short)-4);
+ private static final TField BLOCK_FIELD_DESC = new TField("block",
TType.BOOL, (short)-5);
public String tablename;
public static final int TABLENAME = -1;
@@ -4960,12 +4988,18 @@
public static final int KEY = -2;
public String columnFamily_column;
public static final int COLUMNFAMILY_COLUMN = -3;
+ public long timestamp;
+ public static final int TIMESTAMP = -4;
+ public boolean block;
+ public static final int BLOCK = -5;
private final Isset __isset = new Isset();
private static final class Isset implements java.io.Serializable {
public boolean tablename = false;
public boolean key = false;
public boolean columnFamily_column = false;
+ public boolean timestamp = false;
+ public boolean block = false;
}
public static final Map<Integer, FieldMetaData> metaDataMap =
Collections.unmodifiableMap(new HashMap<Integer, FieldMetaData>() {{
@@ -4975,6 +5009,10 @@
new FieldValueMetaData(TType.STRING)));
put(COLUMNFAMILY_COLUMN, new FieldMetaData("columnFamily_column",
TFieldRequirementType.DEFAULT,
new FieldValueMetaData(TType.STRING)));
+ put(TIMESTAMP, new FieldMetaData("timestamp",
TFieldRequirementType.DEFAULT,
+ new FieldValueMetaData(TType.I64)));
+ put(BLOCK, new FieldMetaData("block", TFieldRequirementType.DEFAULT,
+ new FieldValueMetaData(TType.BOOL)));
}});
static {
@@ -4987,7 +5025,9 @@
public remove_args(
String tablename,
String key,
- String columnFamily_column)
+ String columnFamily_column,
+ long timestamp,
+ boolean block)
{
this();
this.tablename = tablename;
@@ -4996,6 +5036,10 @@
this.__isset.key = (key != null);
this.columnFamily_column = columnFamily_column;
this.__isset.columnFamily_column = (columnFamily_column != null);
+ this.timestamp = timestamp;
+ this.__isset.timestamp = true;
+ this.block = block;
+ this.__isset.block = true;
}
/**
@@ -5014,6 +5058,10 @@
if (other.columnFamily_column != null) {
this.columnFamily_column = other.columnFamily_column;
}
+ __isset.timestamp = other.__isset.timestamp;
+ this.timestamp = other.timestamp;
+ __isset.block = other.__isset.block;
+ this.block = other.block;
}
@Override
@@ -5087,6 +5135,50 @@
this.__isset.columnFamily_column = value;
}
+ public long getTimestamp() {
+ return this.timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ this.__isset.timestamp = true;
+ }
+
+ public void unsetTimestamp() {
+ this.__isset.timestamp = false;
+ }
+
+ // Returns true if field timestamp is set (has been assigned a value) and
false otherwise
+ public boolean isSetTimestamp() {
+ return this.__isset.timestamp;
+ }
+
+ public void setTimestampIsSet(boolean value) {
+ this.__isset.timestamp = value;
+ }
+
+ public boolean isBlock() {
+ return this.block;
+ }
+
+ public void setBlock(boolean block) {
+ this.block = block;
+ this.__isset.block = true;
+ }
+
+ public void unsetBlock() {
+ this.__isset.block = false;
+ }
+
+ // Returns true if field block is set (has been assigned a value) and false
otherwise
+ public boolean isSetBlock() {
+ return this.__isset.block;
+ }
+
+ public void setBlockIsSet(boolean value) {
+ this.__isset.block = value;
+ }
+
public void setFieldValue(int fieldID, Object value) {
switch (fieldID) {
case TABLENAME:
@@ -5101,6 +5193,14 @@
setColumnFamily_column((String)value);
break;
+ case TIMESTAMP:
+ setTimestamp((Long)value);
+ break;
+
+ case BLOCK:
+ setBlock((Boolean)value);
+ break;
+
default:
throw new IllegalArgumentException("Field " + fieldID + " doesn't
exist!");
}
@@ -5117,6 +5217,12 @@
case COLUMNFAMILY_COLUMN:
return getColumnFamily_column();
+ case TIMESTAMP:
+ return new Long(getTimestamp());
+
+ case BLOCK:
+ return new Boolean(isBlock());
+
default:
throw new IllegalArgumentException("Field " + fieldID + " doesn't
exist!");
}
@@ -5131,6 +5237,10 @@
return this.__isset.key;
case COLUMNFAMILY_COLUMN:
return this.__isset.columnFamily_column;
+ case TIMESTAMP:
+ return this.__isset.timestamp;
+ case BLOCK:
+ return this.__isset.block;
default:
throw new IllegalArgumentException("Field " + fieldID + " doesn't
exist!");
}
@@ -5176,6 +5286,24 @@
return false;
}
+ boolean this_present_timestamp = true;
+ boolean that_present_timestamp = true;
+ if (this_present_timestamp || that_present_timestamp) {
+ if (!(this_present_timestamp && that_present_timestamp))
+ return false;
+ if (this.timestamp != that.timestamp)
+ return false;
+ }
+
+ boolean this_present_block = true;
+ boolean that_present_block = true;
+ if (this_present_block || that_present_block) {
+ if (!(this_present_block && that_present_block))
+ return false;
+ if (this.block != that.block)
+ return false;
+ }
+
return true;
}
@@ -5219,6 +5347,22 @@
TProtocolUtil.skip(iprot, field.type);
}
break;
+ case TIMESTAMP:
+ if (field.type == TType.I64) {
+ this.timestamp = iprot.readI64();
+ this.__isset.timestamp = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case BLOCK:
+ if (field.type == TType.BOOL) {
+ this.block = iprot.readBool();
+ this.__isset.block = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
default:
TProtocolUtil.skip(iprot, field.type);
break;
@@ -5251,6 +5395,12 @@
oprot.writeString(this.columnFamily_column);
oprot.writeFieldEnd();
}
+ oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC);
+ oprot.writeI64(this.timestamp);
+ oprot.writeFieldEnd();
+ oprot.writeFieldBegin(BLOCK_FIELD_DESC);
+ oprot.writeBool(this.block);
+ oprot.writeFieldEnd();
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -5272,6 +5422,206 @@
sb.append("columnFamily_column:");
sb.append(this.columnFamily_column);
first = false;
+ if (!first) sb.append(", ");
+ sb.append("timestamp:");
+ sb.append(this.timestamp);
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("block:");
+ sb.append(this.block);
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws TException {
+ // check for required fields
+ // check that fields of type enum have valid values
+ }
+
+ }
+
+ public static class remove_result implements TBase, java.io.Serializable,
Cloneable {
+ private static final TStruct STRUCT_DESC = new TStruct("remove_result");
+ private static final TField SUCCESS_FIELD_DESC = new TField("success",
TType.BOOL, (short)0);
+
+ public boolean success;
+ public static final int SUCCESS = 0;
+
+ private final Isset __isset = new Isset();
+ private static final class Isset implements java.io.Serializable {
+ public boolean success = false;
+ }
+
+ public static final Map<Integer, FieldMetaData> metaDataMap =
Collections.unmodifiableMap(new HashMap<Integer, FieldMetaData>() {{
+ put(SUCCESS, new FieldMetaData("success", TFieldRequirementType.DEFAULT,
+ new FieldValueMetaData(TType.BOOL)));
+ }});
+
+ static {
+ FieldMetaData.addStructMetaDataMap(remove_result.class, metaDataMap);
+ }
+
+ public remove_result() {
+ }
+
+ public remove_result(
+ boolean success)
+ {
+ this();
+ this.success = success;
+ this.__isset.success = true;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public remove_result(remove_result other) {
+ __isset.success = other.__isset.success;
+ this.success = other.success;
+ }
+
+ @Override
+ public remove_result clone() {
+ return new remove_result(this);
+ }
+
+ public boolean isSuccess() {
+ return this.success;
+ }
+
+ public void setSuccess(boolean success) {
+ this.success = success;
+ this.__isset.success = true;
+ }
+
+ public void unsetSuccess() {
+ this.__isset.success = false;
+ }
+
+ // Returns true if field success is set (has been assigned a value) and
false otherwise
+ public boolean isSetSuccess() {
+ return this.__isset.success;
+ }
+
+ public void setSuccessIsSet(boolean value) {
+ this.__isset.success = value;
+ }
+
+ public void setFieldValue(int fieldID, Object value) {
+ switch (fieldID) {
+ case SUCCESS:
+ setSuccess((Boolean)value);
+ break;
+
+ default:
+ throw new IllegalArgumentException("Field " + fieldID + " doesn't
exist!");
+ }
+ }
+
+ public Object getFieldValue(int fieldID) {
+ switch (fieldID) {
+ case SUCCESS:
+ return new Boolean(isSuccess());
+
+ default:
+ throw new IllegalArgumentException("Field " + fieldID + " doesn't
exist!");
+ }
+ }
+
+ // Returns true if field corresponding to fieldID is set (has been assigned
a value) and false otherwise
+ public boolean isSet(int fieldID) {
+ switch (fieldID) {
+ case SUCCESS:
+ return this.__isset.success;
+ default:
+ throw new IllegalArgumentException("Field " + fieldID + " doesn't
exist!");
+ }
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof remove_result)
+ return this.equals((remove_result)that);
+ return false;
+ }
+
+ public boolean equals(remove_result that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_success = true;
+ boolean that_present_success = true;
+ if (this_present_success || that_present_success) {
+ if (!(this_present_success && that_present_success))
+ return false;
+ if (this.success != that.success)
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public void read(TProtocol iprot) throws TException {
+ TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ switch (field.id)
+ {
+ case SUCCESS:
+ if (field.type == TType.BOOL) {
+ this.success = iprot.readBool();
+ this.__isset.success = true;
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ break;
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+
+ // check for required fields of primitive type, which can't be checked
in the validate method
+ validate();
+ }
+
+ public void write(TProtocol oprot) throws TException {
+ oprot.writeStructBegin(STRUCT_DESC);
+
+ if (this.__isset.success) {
+ oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+ oprot.writeBool(this.success);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("remove_result(");
+ boolean first = true;
+
+ if (!first) sb.append(", ");
+ sb.append("success:");
+ sb.append(this.success);
+ first = false;
sb.append(")");
return sb.toString();
}
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=765680&r1=765679&r2=765680&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
(original)
+++
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
Thu Apr 16 16:23:20 2009
@@ -491,17 +491,12 @@
StorageProxy.insert(rm);
}
- public void remove(String tablename, String key, String
columnFamily_column)
- {
- throw new UnsupportedOperationException("Remove is coming
soon");
- }
-
- public boolean remove(String tablename, String key, String
columnFamily_column, long timestamp, int block_for)
+ public boolean remove(String tablename, String key, String
columnFamily_column, long timestamp, boolean block)
{
logger_.debug("remove");
RowMutation rm = new RowMutation(tablename, key.trim());
rm.delete(columnFamily_column, timestamp);
- if (block_for > 0) {
+ if (block) {
return StorageProxy.insertBlocking(rm);
} else {
StorageProxy.insert(rm);
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java?rev=765680&r1=765679&r2=765680&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
Thu Apr 16 16:23:20 2009
@@ -1345,85 +1345,6 @@
}
-
-
- public void testRemove(String filepath) throws Throwable {
- BufferedReader bufReader = new BufferedReader(new
InputStreamReader(
- new FileInputStream(filepath)), 16 * 1024 *
1024);
- String line = null;
- String delimiter_ = new String(",");
- RowMutationMessage rmInbox = null;
- RowMutationMessage rmOutbox = null;
- ColumnFamily cfInbox = null;
- ColumnFamily cfOutbox = null;
- String firstuser = null ;
- String nextuser = null;
- while ((line = bufReader.readLine()) != null) {
- StringTokenizer st = new StringTokenizer(line,
delimiter_);
- int i = 0;
- String threadId = null;
- int lastUpdated = 0;
- int isDeleted = 0;
- int folder = 0;
- String user = null;
- while (st.hasMoreElements()) {
- switch (i) {
- case 0:
- user = (String) st.nextElement();//
sb.append((String)st.nextElement());
- if ( !isNumeric(user))
- continue;
- break;
-
- case 1:
- folder = Integer.parseInt((String)
st.nextElement());// sb.append((String)st.nextElement());
- break;
-
- case 2:
- threadId = (String) st.nextElement();
- break;
-
- case 3:
- lastUpdated = Integer.parseInt((String)
st.nextElement());
- break;
-
- case 4:
- isDeleted = Integer.parseInt((String)
st.nextElement());// (String)st.nextElement();
- break;
-
- default:
- break;
- }
- ++i;
- }
- String key = null;
- if (folder == 0) {
- key = user + ":0";
- } else {
- key = user + ":1";
- }
-
- nextuser = key;
- if(firstuser == null || firstuser.compareTo(nextuser)
!= 0)
- {
- ArrayList<column_t> columns = null;
- firstuser = key;
- try {
- Thread.sleep(1000/requestsPerSecond_,
1000%requestsPerSecond_);
- long t = System.currentTimeMillis();
-
-
peerstorageClient_.remove(tablename_,key,(columnFamilyHack_%divideby_)+":"+threadId);
- numReqs_++;
- totalTime_ = totalTime_ +
(System.currentTimeMillis() - t);
- logger_.debug("Numreqs:" + numReqs_ + "
Average: " + totalTime_/numReqs_+ " Time taken for thrift..."
- +
(System.currentTimeMillis() - t));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- }
-
public void run(String[] args) throws Throwable
{
if (args[0].compareTo("-testWriteMailbox") == 0 ||
@@ -1491,11 +1412,6 @@
testSuperReadThrift(args[2]
+
System.getProperty("file.separator") + fileName);
}
- if ( args[0].compareTo("-testRemove") == 0 )
- {
- testRemove(args[2]
- +
System.getProperty("file.separator") + fileName);
- }
if ( args[0].compareTo("-testPhp") == 0 )
{
testPhp(args[2]