Pig: fix widerow input with single column rows
Patch by Will Oberman, reviewed by brandonwilliams for CASSANDRA-4789
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4355aa87
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4355aa87
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4355aa87

Branch: refs/heads/cassandra-1.1
Commit: 4355aa877f29b653614cb801a2b57861f4cef428
Parents: 178c934
Author: Brandon Williams <[email protected]>
Authored: Thu Oct 11 13:54:32 2012 -0500
Committer: Brandon Williams <[email protected]>
Committed: Thu Oct 11 13:58:16 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 .../cassandra/hadoop/pig/CassandraStorage.java | 17 ++++++++++++++-
 2 files changed, 17 insertions(+), 1 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4355aa87/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ac3a157..40cd7b4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 1.1.6
  * fix commitlog replay for nanotime-infected sstables (CASSANDRA-4782)
  * preflight check ttl for maximum of 20 years (CASSANDRA-4771)
+ * (Pig) fix widerow input with single column rows (CASSANDRA-4789)
  * Fix HH to compact with correct gcBefore, which avoids wiping out
    undelivered hints (CASSANDRA-4772)
  * LCS will merge up to 32 L0 sstables as intended (CASSANDRA-4778)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4355aa87/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 434ca7f..49d8eac 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -109,10 +109,10 @@ 
public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo private boolean widerows; private boolean usePartitionFilter; // wide row hacks + private ByteBuffer lastKey; private Map<ByteBuffer,IColumn> lastRow; private boolean hasNext = true; - public CassandraStorage() { this(1024); @@ -156,6 +156,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo { bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); } + lastKey = null; lastRow = null; tuple.append(bag); return tuple; @@ -174,6 +175,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo if (key != null && !((ByteBuffer)reader.getCurrentKey()).equals(key)) // key changed { // read too much, hold on to it for next time + lastKey = (ByteBuffer)reader.getCurrentKey(); lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue(); // but return what we have so far tuple.append(bag); @@ -182,6 +184,18 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo if (key == null) // only set the key on the first iteration { key = (ByteBuffer)reader.getCurrentKey(); + if (lastKey != null && !(key.equals(lastKey))) // last key only had one value + { + tuple.append(new DataByteArray(lastKey.array(), lastKey.position()+lastKey.arrayOffset(), lastKey.limit()+lastKey.arrayOffset())); + for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet()) + { + bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); + } + tuple.append(bag); + lastKey = key; + lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue(); + return tuple; + } tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset())); } SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue(); @@ -191,6 +205,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo { 
bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); } + lastKey = null; lastRow = null; } for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet())
