Author: jbellis Date: Mon Feb 28 19:40:19 2011 New Revision: 1075502 URL: http://svn.apache.org/viewvc?rev=1075502&view=rev Log: add countercolumn support to SSTableExport patch by slebresne; reviewed by Pavel Yaskevich for CASSANDRA-2093
Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java cassandra/trunk/test/resources/SimpleCF.json cassandra/trunk/test/resources/SuperCF.json cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1075502&r1=1075501&r2=1075502&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Mon Feb 28 19:40:19 2011 @@ -1,7 +1,7 @@ 0.8-dev * avoid double RowMutation serialization on write path (CASSANDRA-1800) * adds support for columns that act as incr/decr counters - (CASSANDRA-1072, 1937, 1944, 1936, 2101) + (CASSANDRA-1072, 1937, 1944, 1936, 2101, 2093) * make NetworkTopologyStrategy the default (CASSANDRA-1960) * configurable internode encryption (CASSANDRA-1567) * human readable column names in sstable2json output (CASSANDRA-1933) Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java?rev=1075502&r1=1075501&r2=1075502&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java Mon Feb 28 19:40:19 2011 @@ -34,29 +34,15 @@ public class CounterColumnType extends A public int compare(ByteBuffer o1, ByteBuffer o2) { - if (o1.remaining() == 0) - { - return o2.remaining() == 0 ? 0 : -1; - } - if (o2.remaining() == 0) - { - return 1; - } + if (o1 == null) + return null == o2 ? 0 : -1; return ByteBufferUtil.compareUnsigned(o1, o2); } public String getString(ByteBuffer bytes) { - if (bytes.remaining() == 0) - { - return ""; - } - if (bytes.remaining() != 8) - { - throw new MarshalException("A long is exactly 8 bytes"); - } - return String.valueOf(bytes.getLong(bytes.position())); + return ByteBufferUtil.bytesToHex(bytes); } /** @@ -99,6 +85,11 @@ public class CounterColumnType extends A } } + public ByteBuffer fromString(String source) + { + return ByteBufferUtil.hexToBytes(source); + } + public void validate(ByteBuffer bytes) throws MarshalException { if (bytes.remaining() != 8 && bytes.remaining() != 0) Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1075502&r1=1075501&r2=1075502&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Mon Feb 28 19:40:19 2011 @@ -144,16 +144,28 @@ public class SSTableExport out.print(quote(validator.getString(value))); out.print(", "); out.print(column.timestamp()); - out.print(", "); - out.print(column.isMarkedForDelete()); - if (column instanceof ExpiringColumn) + if (column instanceof DeletedColumn) { out.print(", "); + out.print("\"d\""); + } + else if (column instanceof ExpiringColumn) + { + out.print(", "); + out.print("\"e\""); + out.print(", "); out.print(((ExpiringColumn) column).getTimeToLive()); out.print(", "); out.print(column.getLocalDeletionTime()); } + else if (column instanceof CounterColumn) + { + out.print(", "); + out.print("\"c\""); + out.print(", "); + out.print(((CounterColumn) column).timestampOfLastDelete()); + } out.print("]"); } Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java?rev=1075502&r1=1075501&r2=1075502&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java Mon Feb 28 19:40:19 2011 @@ -82,10 +82,15 @@ public class SSTableImport private ByteBuffer name; private ByteBuffer value; private long timestamp; - private boolean isDeleted; + + private String kind; + // Expiring columns private int ttl; private int localExpirationTime; + // Counter columns + private long timestampOfLastDelete; + public JsonColumn(T json, CFMetaData meta, boolean isSubColumn) { AbstractType comparator = (isSubColumn) ? meta.subcolumnComparator : meta.comparator; @@ -94,23 +99,61 @@ public class SSTableImport { List fields = (List<?>) json; - assert fields.size() == 4 || fields.size() == 6 : "Column definition should have 4 or 6 fields."; + assert fields.size() >= 3 : "Column definition should have at least 3"; name = stringAsType((String) fields.get(0), comparator); value = stringAsType((String) fields.get(1), meta.getValueValidator(name.duplicate())); - timestamp = (Long) fields.get(2); - isDeleted = (Boolean) fields.get(3); + kind = ""; - - if (fields.size() == 6) + if (fields.size() > 3) { - ttl = (Integer) fields.get(4); - localExpirationTime = (int) (long) ((Long) fields.get(5)); + if (fields.get(3) instanceof Boolean) + { + // old format, reading this for backward compatibility sake + if (fields.size() == 6) + { + kind = "e"; + ttl = (Integer) fields.get(4); + localExpirationTime = (int) (long) ((Long) fields.get(5)); + } + else + { + kind = ((Boolean) fields.get(3)) ? "d" : ""; + } + } + else + { + kind = (String) fields.get(3); + if (isExpiring()) + { + ttl = (Integer) fields.get(4); + localExpirationTime = (int) (long) ((Long) fields.get(5)); + } + else if (isCounter()) + { + timestampOfLastDelete = (long) ((Integer) fields.get(4)); + } + } } } } + public boolean isDeleted() + { + return kind.equals("d"); + } + + public boolean isExpiring() + { + return kind.equals("e"); + } + + public boolean isCounter() + { + return kind.equals("c"); + } + public ByteBuffer getName() { return name.duplicate(); @@ -144,11 +187,15 @@ public class SSTableImport JsonColumn col = new JsonColumn<List>((List) c, cfm, (superName != null)); QueryPath path = new QueryPath(cfm.cfName, superName, col.getName()); - if (col.ttl > 0) + if (col.isExpiring()) { cfamily.addColumn(null, new ExpiringColumn(col.getName(), col.getValue(), col.timestamp, col.ttl, col.localExpirationTime)); } - else if (col.isDeleted) + else if (col.isCounter()) + { + cfamily.addColumn(null, new CounterColumn(col.getName(), col.getValue(), col.timestamp, col.timestampOfLastDelete)); + } + else if (col.isDeleted()) { cfamily.addTombstone(path, col.getValue(), col.timestamp); } Modified: cassandra/trunk/test/resources/SimpleCF.json URL: http://svn.apache.org/viewvc/cassandra/trunk/test/resources/SimpleCF.json?rev=1075502&r1=1075501&r2=1075502&view=diff ============================================================================== --- cassandra/trunk/test/resources/SimpleCF.json (original) +++ cassandra/trunk/test/resources/SimpleCF.json Mon Feb 28 19:40:19 2011 @@ -1,4 +1,4 @@ { - "726f7741": [["636f6c4141", "76616c4141", 1294532915068, false], ["636f6c4142", "76616c4142", 1294532915069, false], ["636f6c4143", "76616c4143", 1294532915071, false, 42, 2000000000 ]], - "726f7742": [["636f6c4241", "76616c4241", 1294532915070, false], ["636f6c4242", "76616c4242", 1294532915073, false]] + "726f7741": [["636f6c4141", "76616c4141", 1294532915068], ["636f6c4142", "76616c4142", 1294532915069], ["636f6c4143", "76616c4143", 1294532915071, "e", 42, 2000000000]], + "726f7742": [["636f6c4241", "76616c4241", 1294532915070], ["636f6c4242", "76616c4242", 1294532915073]] } Modified: cassandra/trunk/test/resources/SuperCF.json URL: http://svn.apache.org/viewvc/cassandra/trunk/test/resources/SuperCF.json?rev=1075502&r1=1075501&r2=1075502&view=diff ============================================================================== --- cassandra/trunk/test/resources/SuperCF.json (original) +++ cassandra/trunk/test/resources/SuperCF.json Mon Feb 28 19:40:19 2011 @@ -1,4 +1,4 @@ { - "726f7741": {"737570657241": {"deletedAt": -9223372036854775808, "subColumns": [["636f6c4141", "76616c75654141", 1294532915069, false], ["636f6c4142", "76616c75654142", 1294532915069, false]]}}, - "726f7742": {"737570657242": {"deletedAt": -9223372036854775808, "subColumns": [["636f6c4241", "76616c75654241", 1294532915069, false], ["636f6c4242", "76616c75654242", 1294532915069, false]]}} + "726f7741": {"737570657241": {"deletedAt": -9223372036854775808, "subColumns": [["636f6c4141", "76616c75654141", 1294532915069], ["636f6c4142", "76616c75654142", 1294532915069]]}}, + "726f7742": {"737570657242": {"deletedAt": -9223372036854775808, "subColumns": [["636f6c4241", "76616c75654241", 1294532915069], ["636f6c4242", "76616c75654242", 1294532915069]]}} } Modified: cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java?rev=1075502&r1=1075501&r2=1075502&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java Mon Feb 28 19:40:19 2011 @@ -28,6 +28,7 @@ import java.util.Arrays; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.CounterColumn; import org.apache.cassandra.db.ExpiringColumn; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; @@ -132,7 +133,7 @@ public class SSTableExportTest extends S JSONArray rowB = (JSONArray)json.get(asHex("rowB")); JSONArray colB = (JSONArray)rowB.get(0); - assert !(Boolean)colB.get(3); + assert colB.size() == 3; JSONArray rowExclude = (JSONArray)json.get(asHex("rowExclude")); assert rowExclude == null; @@ -174,7 +175,7 @@ public class SSTableExportTest extends S JSONArray colA = (JSONArray)subColumns.get(0); JSONObject rowExclude = (JSONObject)json.get(asHex("rowExclude")); assert hexToBytes((String)colA.get(1)).equals(ByteBufferUtil.bytes("valA")); - assert !(Boolean)colA.get(3); + assert colA.size() == 3; assert rowExclude == null; } @@ -215,4 +216,31 @@ public class SSTableExportTest extends S cf = qf.getSSTableColumnIterator(reader).getColumnFamily(); assert cf == null; } + + @Test + public void testExportCounterCf() throws IOException + { + File tempSS = tempSSTableFile("Keyspace1", "Counter1"); + ColumnFamily cfamily = ColumnFamily.create("Keyspace1", "Counter1"); + SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2); + + // Add rowA + cfamily.addColumn(null, new CounterColumn(ByteBufferUtil.bytes("colA"), 42, System.currentTimeMillis())); + writer.append(Util.dk("rowA"), cfamily); + cfamily.clear(); + + SSTableReader reader = writer.closeAndOpenReader(); + + // Export to JSON and verify + File tempJson = File.createTempFile("Counter1", ".json"); + SSTableExport.export(reader, new PrintStream(tempJson.getPath()), new String[0]); + + JSONObject json = (JSONObject)JSONValue.parse(new FileReader(tempJson)); + + JSONArray rowA = (JSONArray)json.get(asHex("rowA")); + JSONArray colA = (JSONArray)rowA.get(0); + assert hexToBytes((String)colA.get(0)).equals(ByteBufferUtil.bytes("colA")); + assert ((String) colA.get(3)).equals("c"); + assert (Long) colA.get(4) == Long.MIN_VALUE; + } } Modified: cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java?rev=1075502&r1=1075501&r2=1075502&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java Mon Feb 28 19:40:19 2011 @@ -24,6 +24,7 @@ import java.io.IOException; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.CounterColumn; import org.apache.cassandra.db.DeletedColumn; import org.apache.cassandra.db.ExpiringColumn; import org.apache.cassandra.db.IColumn; @@ -72,6 +73,28 @@ public class SSTableImportTest extends S } @Test + public void testImportSimpleCfOldFormat() throws IOException + { + // Import JSON to temp SSTable file + String jsonUrl = getClass().getClassLoader().getResource("SimpleCF.oldformat.json").getPath(); + File tempSS = tempSSTableFile("Keyspace1", "Standard1"); + SSTableImport.importJson(jsonUrl, "Keyspace1", "Standard1", tempSS.getPath()); + + // Verify results + SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath())); + QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), new QueryPath("Standard1")); + IColumnIterator iter = qf.getSSTableColumnIterator(reader); + ColumnFamily cf = iter.getColumnFamily(); + while (iter.hasNext()) cf.addColumn(iter.next()); + assert cf.getColumn(ByteBufferUtil.bytes("colAA")).value().equals(hexToBytes("76616c4141")); + assert !(cf.getColumn(ByteBufferUtil.bytes("colAA")) instanceof DeletedColumn); + IColumn expCol = cf.getColumn(ByteBufferUtil.bytes("colAC")); + assert expCol.value().equals(hexToBytes("76616c4143")); + assert expCol instanceof ExpiringColumn; + assert ((ExpiringColumn)expCol).getTimeToLive() == 42 && expCol.getLocalDeletionTime() == 2000000000; + } + + @Test public void testImportSuperCf() throws IOException, ParseException { String jsonUrl = getClass().getClassLoader().getResource("SuperCF.json").getPath(); @@ -102,4 +125,23 @@ public class SSTableImportTest extends S int result = SSTableImport.importSorted(jsonUrl, columnFamily, tempSS.getPath(), partitioner); assert result == -1; } + + @Test + public void testImportCounterCf() throws IOException + { + // Import JSON to temp SSTable file + String jsonUrl = getClass().getClassLoader().getResource("CounterCF.json").getPath(); + File tempSS = tempSSTableFile("Keyspace1", "Counter1"); + SSTableImport.importJson(jsonUrl, "Keyspace1", "Counter1", tempSS.getPath()); + + // Verify results + SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath())); + QueryFilter qf = QueryFilter.getIdentityFilter(Util.dk("rowA"), new QueryPath("Counter1")); + IColumnIterator iter = qf.getSSTableColumnIterator(reader); + ColumnFamily cf = iter.getColumnFamily(); + while (iter.hasNext()) cf.addColumn(iter.next()); + IColumn c = cf.getColumn(ByteBufferUtil.bytes("colAA")); + assert c instanceof CounterColumn: c; + assert ((CounterColumn) c).total() == 42; + } }