Repository: cassandra
Updated Branches:
  refs/heads/cassandra-1.2 87c4efe81 -> c9cef44a1
Support Thrift tables clustering columns on CqlPagingInputFormat

patch by pauloricardomg; reviewed by alexliu68 for CASSANDRA-7445

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c9cef44a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c9cef44a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c9cef44a

Branch: refs/heads/cassandra-1.2
Commit: c9cef44a1a6c10036200f410c8a26942e64c8f12
Parents: 87c4efe
Author: Sylvain Lebresne <[email protected]>
Authored: Thu Jun 26 10:40:22 2014 +0200
Committer: Sylvain Lebresne <[email protected]>
Committed: Thu Jun 26 10:40:22 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../hadoop/cql3/CqlPagingRecordReader.java      | 16 ++---
 .../cassandra/pig/ThriftColumnFamilyTest.java   | 61 +++++++++++++++++---
 3 files changed, 65 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9cef44a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6adef97..9fbcd9a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,6 @@
+1.2.18
+ * Support Thrift tables clustering columns on CqlPagingInputFormat (CASSANDRA-7445)
+
 1.2.17
  * cqlsh: Fix CompositeType columns in DESCRIBE TABLE output (CASSANDRA-7399)
  * Expose global ColmunFamily metrics (CASSANDRA-7273)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9cef44a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
index b6e793c..0542f7e 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java
+++ 
b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java @@ -261,7 +261,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>, { value.clear(); value.putAll(getCurrentValue()); - + keys.clear(); keys.putAll(getCurrentKey()); @@ -703,7 +703,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>, clusterColumns.add(new BoundColumn(key)); parseKeyValidators(ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()))); - + Column rawComparator = cqlRow.columns.get(3); String comparator = ByteBufferUtil.string(ByteBuffer.wrap(rawComparator.getValue())); logger.debug("comparator: {}", comparator); @@ -719,8 +719,8 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>, } } - /** - * retrieve the fake partition keys and cluster keys for classic thrift table + /** + * retrieve the fake partition keys and cluster keys for classic thrift table * use CFDefinition to get keys and columns * */ private void retrieveKeysForThriftTables() throws Exception @@ -732,8 +732,10 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>, { CFMetaData cfMeta = CFMetaData.fromThrift(cfDef); CFDefinition cfDefinition = new CFDefinition(cfMeta); - for (ColumnIdentifier columnIdentifier : cfDefinition.keys.keySet()) - partitionBoundColumns.add(new BoundColumn(columnIdentifier.toString())); + for (ColumnIdentifier key : cfDefinition.keys.keySet()) + partitionBoundColumns.add(new BoundColumn(key.toString())); + for (ColumnIdentifier column : cfDefinition.columns.keySet()) + clusterColumns.add(new BoundColumn(column.toString())); parseKeyValidators(cfDef.key_validation_class); return; } @@ -814,7 +816,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>, this.name = name; } } - + /** get string from a ByteBuffer, catch the exception and throw it as runtime exception*/ private static String stringValue(ByteBuffer value) { 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9cef44a/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyTest.java b/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyTest.java index 223cbf4..6f6aa0b 100644 --- a/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyTest.java +++ b/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyTest.java @@ -47,7 +47,7 @@ import org.junit.BeforeClass; import org.junit.Test; public class ThriftColumnFamilyTest extends PigTestBase -{ +{ private static String[] statements = { "create keyspace thriftKs with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and" + " strategy_options={replication_factor:1};", @@ -125,7 +125,7 @@ public class ThriftColumnFamilyTest extends PigTestBase "create column family U8 with " + "key_validation_class = UTF8Type and " + "comparator = UTF8Type;", - + "create column family Bytes with " + "key_validation_class = BytesType and " + "comparator = UTF8Type;", @@ -181,7 +181,22 @@ public class ThriftColumnFamilyTest extends PigTestBase "create column family CompoKeyCopy " + "with key_validation_class = 'CompositeType(UTF8Type,LongType)' " + "and default_validation_class = UTF8Type " + - "and comparator = LongType;" + "and comparator = LongType;", + + + "create column family WideCf " + + " with comparator = UTF8Type " + + " and default_validation_class = UTF8Type " + + " and key_validation_class = UTF8Type " + + " and comparator = UTF8Type;", + + "set WideCf['2014-06-06']['1'] = 'event1';", + "set WideCf['2014-06-06']['2'] = 'event2';", + + "set WideCf['2014-06-07']['3'] = 'event3';", + "set WideCf['2014-06-07']['4'] = 'event4';", + "set WideCf['2014-06-07']['5'] = 'event5';", + "set WideCf['2014-06-07']['6'] = 'event6';", }; @BeforeClass @@ -326,6 +341,36 @@ public class ThriftColumnFamilyTest extends PigTestBase } @Test + 
public void testCqlStorageWithThriftWideRowCf() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException + { + //regular thrift wide row column family with page size set to 1 to cause CASSANDRA-7445 + pig.registerQuery("rows = load 'cql://thriftKs/WideCf?" + defaultParameters + "&page_size=1' using CqlStorage();"); + + /** + "set WideCf['2014-06-06']['1'] = 'event1';", + "set WideCf['2014-06-06']['2'] = 'event2';", + --------------------------------------------- + "set WideCf['2014-06-07']['3'] = 'event3';", + "set WideCf['2014-06-07']['4'] = 'event4';", + "set WideCf['2014-06-07']['5'] = 'event5';", + "set WideCf['2014-06-07']['6'] = 'event6';", + */ + + Iterator<Tuple> it = pig.openIterator("rows"); + for (Integer i = 1; i <= 6; i++) { + Assert.assertTrue(it.hasNext()); + Tuple t = it.next(); + if (i < 3) { + Assert.assertEquals(t.get(0).toString(), "2014-06-06"); + } else { + Assert.assertEquals(t.get(0).toString(), "2014-06-07"); + } + Assert.assertEquals(t.get(1).toString(), i.toString()); + Assert.assertEquals(t.get(2).toString(), "event" + i); + } + } + + @Test public void testCassandraStorageSchema() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException { //results: (qux,(atomic_weight,0.660161815846869),(created,1335890877),(name,User Qux),(percent,64.7), @@ -707,7 +752,7 @@ public class ThriftColumnFamilyTest extends PigTestBase Iterator<Tuple> it = pig.openIterator("compokeys"); if (it.hasNext()) { Tuple t = it.next(); - Tuple key = (Tuple) t.get(0); + Tuple key = (Tuple) t.get(0); Assert.assertEquals(key.get(0), "clock"); Assert.assertEquals(key.get(1), 40L); DataBag columns = (DataBag) t.get(1); @@ 
-735,7 +780,7 @@ public class ThriftColumnFamilyTest extends PigTestBase count ++; if (count == 1) { - Tuple key = (Tuple) t.get(0); + Tuple key = (Tuple) t.get(0); Assert.assertEquals(key.get(0), "clock"); Assert.assertEquals(key.get(1), 10L); DataBag columns = (DataBag) t.get(1); @@ -749,7 +794,7 @@ public class ThriftColumnFamilyTest extends PigTestBase } else if (count == 2) { - Tuple key = (Tuple) t.get(0); + Tuple key = (Tuple) t.get(0); Assert.assertEquals(key.get(0), "clock"); Assert.assertEquals(key.get(1), 20L); DataBag columns = (DataBag) t.get(1); @@ -763,7 +808,7 @@ public class ThriftColumnFamilyTest extends PigTestBase } else if (count == 3) { - Tuple key = (Tuple) t.get(0); + Tuple key = (Tuple) t.get(0); Assert.assertEquals(key.get(0), "clock"); Assert.assertEquals(key.get(1), 30L); DataBag columns = (DataBag) t.get(1); @@ -777,7 +822,7 @@ public class ThriftColumnFamilyTest extends PigTestBase } else if (count == 4) { - Tuple key = (Tuple) t.get(0); + Tuple key = (Tuple) t.get(0); Assert.assertEquals(key.get(0), "clock"); Assert.assertEquals(key.get(1), 40L); DataBag columns = (DataBag) t.get(1);
