Pig: support for composite row keys, writing composites. Patch by Dirkjan Bussink, reviewed by brandonwilliams for CASSANDRA-4144.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f6cc5ef Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f6cc5ef Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f6cc5ef Branch: refs/heads/trunk Commit: 7f6cc5ef6cad831061dc5cb75f5623f949ab7ca2 Parents: 962b23b Author: Brandon Williams <[email protected]> Authored: Fri Jul 13 10:38:15 2012 -0500 Committer: Brandon Williams <[email protected]> Committed: Fri Jul 13 10:38:15 2012 -0500 ---------------------------------------------------------------------- examples/pig/test/populate-cli.txt | 20 +++++++ examples/pig/test/test_storage.pig | 17 +++++- .../cassandra/hadoop/pig/CassandraStorage.java | 44 +++++++++++++- 3 files changed, 76 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f6cc5ef/examples/pig/test/populate-cli.txt ---------------------------------------------------------------------- diff --git a/examples/pig/test/populate-cli.txt b/examples/pig/test/populate-cli.txt index 7ec6cd6..1f59642 100644 --- a/examples/pig/test/populate-cli.txt +++ b/examples/pig/test/populate-cli.txt @@ -112,3 +112,23 @@ set CompoInt['clock']['1:0'] = 'z'; set CompoInt['clock']['1:30'] = 'zzzz'; set CompoInt['clock']['2:30'] = 'daddy?'; set CompoInt['clock']['6:30'] = 'coffee...'; + +create column family CompoIntCopy + with key_validation_class = UTF8Type + and default_validation_class = UTF8Type + and comparator = 'CompositeType(LongType,LongType)'; + +create column family CompoKey + with key_validation_class = 'CompositeType(UTF8Type,LongType)' + and default_validation_class = UTF8Type + and comparator = LongType; + +set CompoKey['clock:10']['1'] = 'z'; +set CompoKey['clock:20']['1'] = 'zzzz'; +set CompoKey['clock:30']['2'] = 'daddy?'; +set CompoKey['clock:40']['6'] = 'coffee...'; + +create column family CompoKeyCopy + 
with key_validation_class = 'CompositeType(UTF8Type,LongType)' + and default_validation_class = UTF8Type + and comparator = LongType; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f6cc5ef/examples/pig/test/test_storage.pig ---------------------------------------------------------------------- diff --git a/examples/pig/test/test_storage.pig b/examples/pig/test/test_storage.pig index a0157f7..026cb02 100644 --- a/examples/pig/test/test_storage.pig +++ b/examples/pig/test/test_storage.pig @@ -67,4 +67,19 @@ night = foreach night generate (int)columns::name.$0+(double)columns::name.$1/60 -- What happens at the darkest hour? darkest = filter night by hour > 2 and hour < 5; -dump darkest; \ No newline at end of file +dump darkest; + +compo_int_rows = LOAD 'cassandra://PigTest/CompoInt' USING CassandraStorage(); +STORE compo_int_rows INTO 'cassandra://PigTest/CompoIntCopy' USING CassandraStorage(); + +-- +-- Test CompositeKey +-- + +compokeys = load 'cassandra://PigTest/CompoKey' using CassandraStorage(); +compokeys = filter compokeys by key.$1 == 40; + +dump compokeys; + +compo_key_rows = LOAD 'cassandra://PigTest/CompoKey' USING CassandraStorage(); +STORE compo_key_rows INTO 'cassandra://PigTest/CompoKeyCopy' USING CassandraStorage(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f6cc5ef/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index 5742cb9..454330c 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -131,7 +131,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo { return limit; } - + public Tuple getNextWide() throws IOException { CfDef cfDef = getCfDef(loadSignature); @@ -223,10 +223,10 @@ 
public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo // output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest // NOTE: we're setting the tuple size here only for the key so we can use setTupleValue on it - Tuple tuple = TupleFactory.getInstance().newTuple(1); + + Tuple tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class())); DefaultDataBag bag = new DefaultDataBag(); - // set the key - setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(2).compose(key)); + // we must add all the indexed columns first to match the schema Map<ByteBuffer, Boolean> added = new HashMap<ByteBuffer, Boolean>(); // take care to iterate these in the same order as the schema does @@ -283,6 +283,20 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo return t; } + private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException + { + Tuple tuple = TupleFactory.getInstance().newTuple(1); + if( comparator instanceof AbstractCompositeType ) + { + setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key)); + } + else + { + setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(2).compose(key)); + } + return tuple; + } + private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException { Tuple pair = TupleFactory.getInstance().newTuple(2); @@ -825,6 +839,28 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo return DoubleType.instance.decompose((Double)o); if (o instanceof UUID) return ByteBuffer.wrap(UUIDGen.decompose((UUID) o)); + if(o instanceof Tuple) { + List<Object> objects = ((Tuple)o).getAll(); + List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size()); + int totalLength = 0; + for(Object sub : objects) + { + ByteBuffer buffer = objToBB(sub); + serialized.add(buffer); + totalLength += 2 + buffer.remaining() + 1; + } + ByteBuffer out = 
ByteBuffer.allocate(totalLength); + for (ByteBuffer bb : serialized) + { + int length = bb.remaining(); + out.put((byte) ((length >> 8) & 0xFF)); + out.put((byte) (length & 0xFF)); + out.put(bb); + out.put((byte) 0); + } + out.flip(); + return out; + } return ByteBuffer.wrap(((DataByteArray) o).get()); }
