Author: jbellis
Date: Mon Feb 28 19:40:19 2011
New Revision: 1075502
URL: http://svn.apache.org/viewvc?rev=1075502&view=rev
Log:
add countercolumn support to SSTableExport
patch by slebresne; reviewed by Pavel Yaskevich for CASSANDRA-2093
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java
cassandra/trunk/test/resources/SimpleCF.json
cassandra/trunk/test/resources/SuperCF.json
cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1075502&r1=1075501&r2=1075502&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Feb 28 19:40:19 2011
@@ -1,7 +1,7 @@
0.8-dev
* avoid double RowMutation serialization on write path (CASSANDRA-1800)
* adds support for columns that act as incr/decr counters
- (CASSANDRA-1072, 1937, 1944, 1936, 2101)
+ (CASSANDRA-1072, 1937, 1944, 1936, 2101, 2093)
* make NetworkTopologyStrategy the default (CASSANDRA-1960)
* configurable internode encryption (CASSANDRA-1567)
* human readable column names in sstable2json output (CASSANDRA-1933)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java?rev=1075502&r1=1075501&r2=1075502&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java Mon Feb 28 19:40:19 2011
@@ -34,29 +34,15 @@ public class CounterColumnType extends A
public int compare(ByteBuffer o1, ByteBuffer o2)
{
- if (o1.remaining() == 0)
- {
- return o2.remaining() == 0 ? 0 : -1;
- }
- if (o2.remaining() == 0)
- {
- return 1;
- }
+ if (o1 == null)
+ return null == o2 ? 0 : -1;
return ByteBufferUtil.compareUnsigned(o1, o2);
}
public String getString(ByteBuffer bytes)
{
- if (bytes.remaining() == 0)
- {
- return "";
- }
- if (bytes.remaining() != 8)
- {
- throw new MarshalException("A long is exactly 8 bytes");
- }
- return String.valueOf(bytes.getLong(bytes.position()));
+ return ByteBufferUtil.bytesToHex(bytes);
}
/**
@@ -99,6 +85,11 @@ public class CounterColumnType extends A
}
}
+ public ByteBuffer fromString(String source)
+ {
+ return ByteBufferUtil.hexToBytes(source);
+ }
+
public void validate(ByteBuffer bytes) throws MarshalException
{
if (bytes.remaining() != 8 && bytes.remaining() != 0)
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1075502&r1=1075501&r2=1075502&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Mon Feb 28 19:40:19 2011
@@ -144,16 +144,28 @@ public class SSTableExport
out.print(quote(validator.getString(value)));
out.print(", ");
out.print(column.timestamp());
- out.print(", ");
- out.print(column.isMarkedForDelete());
- if (column instanceof ExpiringColumn)
+ if (column instanceof DeletedColumn)
{
out.print(", ");
+ out.print("\"d\"");
+ }
+ else if (column instanceof ExpiringColumn)
+ {
+ out.print(", ");
+ out.print("\"e\"");
+ out.print(", ");
out.print(((ExpiringColumn) column).getTimeToLive());
out.print(", ");
out.print(column.getLocalDeletionTime());
}
+ else if (column instanceof CounterColumn)
+ {
+ out.print(", ");
+ out.print("\"c\"");
+ out.print(", ");
+ out.print(((CounterColumn) column).timestampOfLastDelete());
+ }
out.print("]");
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java?rev=1075502&r1=1075501&r2=1075502&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java Mon Feb 28 19:40:19 2011
@@ -82,10 +82,15 @@ public class SSTableImport
private ByteBuffer name;
private ByteBuffer value;
private long timestamp;
- private boolean isDeleted;
+
+ private String kind;
+ // Expiring columns
private int ttl;
private int localExpirationTime;
+ // Counter columns
+ private long timestampOfLastDelete;
+
public JsonColumn(T json, CFMetaData meta, boolean isSubColumn)
{
AbstractType comparator = (isSubColumn) ? meta.subcolumnComparator
: meta.comparator;
@@ -94,23 +99,61 @@ public class SSTableImport
{
List fields = (List<?>) json;
- assert fields.size() == 4 || fields.size() == 6 : "Column
definition should have 4 or 6 fields.";
+ assert fields.size() >= 3 : "Column definition should have at
least 3";
name = stringAsType((String) fields.get(0), comparator);
value = stringAsType((String) fields.get(1),
meta.getValueValidator(name.duplicate()));
-
timestamp = (Long) fields.get(2);
- isDeleted = (Boolean) fields.get(3);
+ kind = "";
-
- if (fields.size() == 6)
+ if (fields.size() > 3)
{
- ttl = (Integer) fields.get(4);
- localExpirationTime = (int) (long) ((Long) fields.get(5));
+ if (fields.get(3) instanceof Boolean)
+ {
+ // old format, reading this for backward compatibility
sake
+ if (fields.size() == 6)
+ {
+ kind = "e";
+ ttl = (Integer) fields.get(4);
+ localExpirationTime = (int) (long) ((Long)
fields.get(5));
+ }
+ else
+ {
+ kind = ((Boolean) fields.get(3)) ? "d" : "";
+ }
+ }
+ else
+ {
+ kind = (String) fields.get(3);
+ if (isExpiring())
+ {
+ ttl = (Integer) fields.get(4);
+ localExpirationTime = (int) (long) ((Long)
fields.get(5));
+ }
+ else if (isCounter())
+ {
+ timestampOfLastDelete = (long) ((Integer)
fields.get(4));
+ }
+ }
}
}
}
+ public boolean isDeleted()
+ {
+ return kind.equals("d");
+ }
+
+ public boolean isExpiring()
+ {
+ return kind.equals("e");
+ }
+
+ public boolean isCounter()
+ {
+ return kind.equals("c");
+ }
+
public ByteBuffer getName()
{
return name.duplicate();
@@ -144,11 +187,15 @@ public class SSTableImport
JsonColumn col = new JsonColumn<List>((List) c, cfm, (superName !=
null));
QueryPath path = new QueryPath(cfm.cfName, superName,
col.getName());
- if (col.ttl > 0)
+ if (col.isExpiring())
{
cfamily.addColumn(null, new ExpiringColumn(col.getName(),
col.getValue(), col.timestamp, col.ttl, col.localExpirationTime));
}
- else if (col.isDeleted)
+ else if (col.isCounter())
+ {
+ cfamily.addColumn(null, new CounterColumn(col.getName(),
col.getValue(), col.timestamp, col.timestampOfLastDelete));
+ }
+ else if (col.isDeleted())
{
cfamily.addTombstone(path, col.getValue(), col.timestamp);
}
Modified: cassandra/trunk/test/resources/SimpleCF.json
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/resources/SimpleCF.json?rev=1075502&r1=1075501&r2=1075502&view=diff
==============================================================================
--- cassandra/trunk/test/resources/SimpleCF.json (original)
+++ cassandra/trunk/test/resources/SimpleCF.json Mon Feb 28 19:40:19 2011
@@ -1,4 +1,4 @@
{
- "726f7741": [["636f6c4141", "76616c4141", 1294532915068, false],
["636f6c4142", "76616c4142", 1294532915069, false], ["636f6c4143",
"76616c4143", 1294532915071, false, 42, 2000000000 ]],
- "726f7742": [["636f6c4241", "76616c4241", 1294532915070, false],
["636f6c4242", "76616c4242", 1294532915073, false]]
+ "726f7741": [["636f6c4141", "76616c4141", 1294532915068], ["636f6c4142",
"76616c4142", 1294532915069], ["636f6c4143", "76616c4143", 1294532915071, "e",
42, 2000000000]],
+ "726f7742": [["636f6c4241", "76616c4241", 1294532915070], ["636f6c4242",
"76616c4242", 1294532915073]]
}
Modified: cassandra/trunk/test/resources/SuperCF.json
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/resources/SuperCF.json?rev=1075502&r1=1075501&r2=1075502&view=diff
==============================================================================
--- cassandra/trunk/test/resources/SuperCF.json (original)
+++ cassandra/trunk/test/resources/SuperCF.json Mon Feb 28 19:40:19 2011
@@ -1,4 +1,4 @@
{
- "726f7741": {"737570657241": {"deletedAt": -9223372036854775808,
"subColumns": [["636f6c4141", "76616c75654141", 1294532915069, false],
["636f6c4142", "76616c75654142", 1294532915069, false]]}},
- "726f7742": {"737570657242": {"deletedAt": -9223372036854775808,
"subColumns": [["636f6c4241", "76616c75654241", 1294532915069, false],
["636f6c4242", "76616c75654242", 1294532915069, false]]}}
+ "726f7741": {"737570657241": {"deletedAt": -9223372036854775808,
"subColumns": [["636f6c4141", "76616c75654141", 1294532915069], ["636f6c4142",
"76616c75654142", 1294532915069]]}},
+ "726f7742": {"737570657242": {"deletedAt": -9223372036854775808,
"subColumns": [["636f6c4241", "76616c75654241", 1294532915069], ["636f6c4242",
"76616c75654242", 1294532915069]]}}
}
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java?rev=1075502&r1=1075501&r2=1075502&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java Mon Feb 28 19:40:19 2011
@@ -28,6 +28,7 @@ import java.util.Arrays;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.CounterColumn;
import org.apache.cassandra.db.ExpiringColumn;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
@@ -132,7 +133,7 @@ public class SSTableExportTest extends S
JSONArray rowB = (JSONArray)json.get(asHex("rowB"));
JSONArray colB = (JSONArray)rowB.get(0);
- assert !(Boolean)colB.get(3);
+ assert colB.size() == 3;
JSONArray rowExclude = (JSONArray)json.get(asHex("rowExclude"));
assert rowExclude == null;
@@ -174,7 +175,7 @@ public class SSTableExportTest extends S
JSONArray colA = (JSONArray)subColumns.get(0);
JSONObject rowExclude = (JSONObject)json.get(asHex("rowExclude"));
assert
hexToBytes((String)colA.get(1)).equals(ByteBufferUtil.bytes("valA"));
- assert !(Boolean)colA.get(3);
+ assert colA.size() == 3;
assert rowExclude == null;
}
@@ -215,4 +216,31 @@ public class SSTableExportTest extends S
cf = qf.getSSTableColumnIterator(reader).getColumnFamily();
assert cf == null;
}
+
+ @Test
+ public void testExportCounterCf() throws IOException
+ {
+ File tempSS = tempSSTableFile("Keyspace1", "Counter1");
+ ColumnFamily cfamily = ColumnFamily.create("Keyspace1", "Counter1");
+ SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
+
+ // Add rowA
+ cfamily.addColumn(null, new
CounterColumn(ByteBufferUtil.bytes("colA"), 42, System.currentTimeMillis()));
+ writer.append(Util.dk("rowA"), cfamily);
+ cfamily.clear();
+
+ SSTableReader reader = writer.closeAndOpenReader();
+
+ // Export to JSON and verify
+ File tempJson = File.createTempFile("Counter1", ".json");
+ SSTableExport.export(reader, new PrintStream(tempJson.getPath()), new
String[0]);
+
+ JSONObject json = (JSONObject)JSONValue.parse(new
FileReader(tempJson));
+
+ JSONArray rowA = (JSONArray)json.get(asHex("rowA"));
+ JSONArray colA = (JSONArray)rowA.get(0);
+ assert
hexToBytes((String)colA.get(0)).equals(ByteBufferUtil.bytes("colA"));
+ assert ((String) colA.get(3)).equals("c");
+ assert (Long) colA.get(4) == Long.MIN_VALUE;
+ }
}
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java?rev=1075502&r1=1075501&r2=1075502&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java Mon Feb 28 19:40:19 2011
@@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.CounterColumn;
import org.apache.cassandra.db.DeletedColumn;
import org.apache.cassandra.db.ExpiringColumn;
import org.apache.cassandra.db.IColumn;
@@ -72,6 +73,28 @@ public class SSTableImportTest extends S
}
@Test
+ public void testImportSimpleCfOldFormat() throws IOException
+ {
+ // Import JSON to temp SSTable file
+ String jsonUrl =
getClass().getClassLoader().getResource("SimpleCF.oldformat.json").getPath();
+ File tempSS = tempSSTableFile("Keyspace1", "Standard1");
+ SSTableImport.importJson(jsonUrl, "Keyspace1", "Standard1",
tempSS.getPath());
+
+ // Verify results
+ SSTableReader reader =
SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
+ QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), new
QueryPath("Standard1"));
+ IColumnIterator iter = qf.getSSTableColumnIterator(reader);
+ ColumnFamily cf = iter.getColumnFamily();
+ while (iter.hasNext()) cf.addColumn(iter.next());
+ assert
cf.getColumn(ByteBufferUtil.bytes("colAA")).value().equals(hexToBytes("76616c4141"));
+ assert !(cf.getColumn(ByteBufferUtil.bytes("colAA")) instanceof
DeletedColumn);
+ IColumn expCol = cf.getColumn(ByteBufferUtil.bytes("colAC"));
+ assert expCol.value().equals(hexToBytes("76616c4143"));
+ assert expCol instanceof ExpiringColumn;
+ assert ((ExpiringColumn)expCol).getTimeToLive() == 42 &&
expCol.getLocalDeletionTime() == 2000000000;
+ }
+
+ @Test
public void testImportSuperCf() throws IOException, ParseException
{
String jsonUrl =
getClass().getClassLoader().getResource("SuperCF.json").getPath();
@@ -102,4 +125,23 @@ public class SSTableImportTest extends S
int result = SSTableImport.importSorted(jsonUrl, columnFamily,
tempSS.getPath(), partitioner);
assert result == -1;
}
+
+ @Test
+ public void testImportCounterCf() throws IOException
+ {
+ // Import JSON to temp SSTable file
+ String jsonUrl =
getClass().getClassLoader().getResource("CounterCF.json").getPath();
+ File tempSS = tempSSTableFile("Keyspace1", "Counter1");
+ SSTableImport.importJson(jsonUrl, "Keyspace1", "Counter1",
tempSS.getPath());
+
+ // Verify results
+ SSTableReader reader =
SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
+ QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), new
QueryPath("Counter1"));
+ IColumnIterator iter = qf.getSSTableColumnIterator(reader);
+ ColumnFamily cf = iter.getColumnFamily();
+ while (iter.hasNext()) cf.addColumn(iter.next());
+ IColumn c = cf.getColumn(ByteBufferUtil.bytes("colAA"));
+ assert c instanceof CounterColumn: c;
+ assert ((CounterColumn) c).total() == 42;
+ }
}