Updated Branches:
  refs/heads/cassandra-1.1 c5dc0292e -> 7d2ce5f95
  refs/heads/cassandra-1.2 b69c1aa4c -> 9d0eec217
  refs/heads/trunk 1d2c12242 -> 44f178d1e

Fix NPE in Pig's widerow mode.
Patch by Sheetal Gorsani and Jeremy Hanna, reviewed by brandonwilliams
for CASSANDRA-5488

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7d2ce5f9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7d2ce5f9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7d2ce5f9

Branch: refs/heads/cassandra-1.1
Commit: 7d2ce5f957b1fb392617c1ff05a561571eccd593
Parents: c5dc029
Author: Brandon Williams <[email protected]>
Authored: Tue May 21 11:08:50 2013 -0500
Committer: Brandon Williams <[email protected]>
Committed: Tue May 21 11:08:50 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 examples/pig/test/test_storage.pig              |  2 +-
 .../cassandra/hadoop/pig/CassandraStorage.java  | 23 ++++++--------
 3 files changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d2ce5f9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7c89987..256e69a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@
    (CASSANDRA-5497)
  * fsync leveled manifest to avoid corruption (CASSANDRA-5535)
  * Fix Bound intersection computation (CASSANDRA-5551)
+ * Fix NPE in Pig's widerow mode (CASSANDRA-5488)
 
 1.1.11
  * Fix trying to load deleted row into row cache on startup (CASSANDRA-4463)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d2ce5f9/examples/pig/test/test_storage.pig
----------------------------------------------------------------------
diff --git a/examples/pig/test/test_storage.pig b/examples/pig/test/test_storage.pig
index 026cb02..93dd91f 100644
--- a/examples/pig/test/test_storage.pig
+++ b/examples/pig/test/test_storage.pig
@@ -1,4 +1,4 @@
-rows = LOAD 'cassandra://PigTest/SomeApp' USING CassandraStorage();
+rows = LOAD 'cassandra://PigTest/SomeApp?widerows=true' USING CassandraStorage();
 -- full copy
 STORE rows INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
 -- single tuple

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d2ce5f9/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 55ccbb9..b681ee3 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -144,7 +144,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                         if (tuple.size() == 0) // lastRow is a new one
                         {
                             key = (ByteBuffer)reader.getCurrentKey();
-                            addKeyToTuple(tuple, key, cfDef, parseType(cfDef.getKey_validation_class()));
+                            tuple = addKeyToTuple(tuple, key, cfDef, parseType(cfDef.getKey_validation_class()));
                         }
                         for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
                         {
@@ -180,7 +180,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                     key = (ByteBuffer)reader.getCurrentKey();
                     if (lastKey != null && !(key.equals(lastKey))) // last key only had one value
                     {
-                        addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
+                        tuple = addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                         for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
                         {
                             bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
@@ -190,7 +190,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                         lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
                         return tuple;
                     }
-                    addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
+                    tuple = addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                 }
                 SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
                 if (lastRow != null) // prepend what was read last time
@@ -233,7 +233,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
 
         // output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
         // NOTE: we're setting the tuple size here only for the key so we can use setTupleValue on it
-        Tuple tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
+        Tuple tuple = addKeyToTuple(null, key, cfDef, parseType(cfDef.getKey_validation_class()));
         DefaultDataBag bag = new DefaultDataBag();
 
         // we must add all the indexed columns first to match the schema
@@ -292,15 +292,12 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         return t;
     }
 
-    private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
-    {
-        Tuple tuple = TupleFactory.getInstance().newTuple(1);
-        addKeyToTuple(tuple, key, cfDef, comparator);
-        return tuple;
-    }
-
-    private void addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
+    private Tuple addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
     {
+        if( tuple == null )
+        {
+            tuple = TupleFactory.getInstance().newTuple(1);
+        }
         if( comparator instanceof AbstractCompositeType )
         {
             setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key));
@@ -309,7 +306,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         {
             setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR).compose(key));
         }
-
+        return tuple;
     }
 
     private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
