Updated Branches: refs/heads/trunk 9d58b7158 -> 63a5e4ef8
Pig: fix widerow input with single column rows
Patch by Will Oberman, reviewed by brandonwilliams for CASSANDRA-4789

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/63a5e4ef
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/63a5e4ef
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/63a5e4ef

Branch: refs/heads/trunk
Commit: 63a5e4ef892d4a11edc0e273fc9440e7c43e944c
Parents: 9d58b71
Author: Brandon Williams <[email protected]>
Authored: Thu Oct 11 13:54:32 2012 -0500
Committer: Brandon Williams <[email protected]>
Committed: Thu Oct 11 13:57:39 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 .../cassandra/hadoop/pig/CassandraStorage.java | 17 ++++++++++++++-
 2 files changed, 17 insertions(+), 1 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a5e4ef/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ba146e5..d004f8f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -100,6 +100,7 @@
 1.1.6
  * fix commitlog replay for nanotime-infected sstables (CASSANDRA-4782)
  * preflight check ttl for maximum of 20 years (CASSANDRA-4771)
+ * (Pig) fix widerow input with single column rows (CASSANDRA-4789)
  * Fix HH to compact with correct gcBefore, which avoids wiping out undelivered hints (CASSANDRA-4772)
  * LCS will merge up to 32 L0 sstables as intended (CASSANDRA-4778)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a5e4ef/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 687af5f..1ea5382 100644
--- 
a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -111,10 +111,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo private boolean widerows; private boolean usePartitionFilter; // wide row hacks + private ByteBuffer lastKey; private Map<ByteBuffer,IColumn> lastRow; private boolean hasNext = true; - public CassandraStorage() { this(1024); @@ -158,6 +158,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo { bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); } + lastKey = null; lastRow = null; tuple.append(bag); return tuple; @@ -176,6 +177,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo if (key != null && !((ByteBuffer)reader.getCurrentKey()).equals(key)) // key changed { // read too much, hold on to it for next time + lastKey = (ByteBuffer)reader.getCurrentKey(); lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue(); // but return what we have so far tuple.append(bag); @@ -184,6 +186,18 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo if (key == null) // only set the key on the first iteration { key = (ByteBuffer)reader.getCurrentKey(); + if (lastKey != null && !(key.equals(lastKey))) // last key only had one value + { + tuple.append(new DataByteArray(lastKey.array(), lastKey.position()+lastKey.arrayOffset(), lastKey.limit()+lastKey.arrayOffset())); + for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet()) + { + bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); + } + tuple.append(bag); + lastKey = key; + lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue(); + return tuple; + } tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset())); } SortedMap<ByteBuffer,IColumn> row = 
(SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue(); @@ -193,6 +207,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo { bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); } + lastKey = null; lastRow = null; } for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet())
