Updated Branches:
  refs/heads/trunk 9d58b7158 -> 63a5e4ef8

Pig: fix widerow input with single column rows
Patch by Will Oberman, reviewed by brandonwilliams for CASSANDRA-4789


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/63a5e4ef
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/63a5e4ef
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/63a5e4ef

Branch: refs/heads/trunk
Commit: 63a5e4ef892d4a11edc0e273fc9440e7c43e944c
Parents: 9d58b71
Author: Brandon Williams <[email protected]>
Authored: Thu Oct 11 13:54:32 2012 -0500
Committer: Brandon Williams <[email protected]>
Committed: Thu Oct 11 13:57:39 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/hadoop/pig/CassandraStorage.java     |   17 ++++++++++++++-
 2 files changed, 17 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a5e4ef/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ba146e5..d004f8f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -100,6 +100,7 @@
 1.1.6
  * fix commitlog replay for nanotime-infected sstables (CASSANDRA-4782)
  * preflight check ttl for maximum of 20 years (CASSANDRA-4771)
+ * (Pig) fix widerow input with single column rows (CASSANDRA-4789)
  * Fix HH to compact with correct gcBefore, which avoids wiping out
    undelivered hints (CASSANDRA-4772)
  * LCS will merge up to 32 L0 sstables as intended (CASSANDRA-4778)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/63a5e4ef/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 687af5f..1ea5382 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -111,10 +111,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private boolean widerows;
     private boolean usePartitionFilter;
     // wide row hacks
+    private ByteBuffer lastKey;
     private Map<ByteBuffer,IColumn> lastRow;
     private boolean hasNext = true;
 
-
     public CassandraStorage()
     {
         this(1024);
@@ -158,6 +158,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                         {
                             bag.add(columnToTuple(entry.getValue(), cfDef, 
parseType(cfDef.getComparator_type())));
                         }
+                        lastKey = null;
                         lastRow = null;
                         tuple.append(bag);
                         return tuple;
@@ -176,6 +177,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                 if (key != null && 
!((ByteBuffer)reader.getCurrentKey()).equals(key)) // key changed
                 {
                     // read too much, hold on to it for next time
+                    lastKey = (ByteBuffer)reader.getCurrentKey();
                     lastRow = 
(SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
                     // but return what we have so far
                     tuple.append(bag);
@@ -184,6 +186,18 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                 if (key == null) // only set the key on the first iteration
                 {
                     key = (ByteBuffer)reader.getCurrentKey();
+                    if (lastKey != null && !(key.equals(lastKey))) // last key 
only had one value
+                    {
+                        tuple.append(new DataByteArray(lastKey.array(), 
lastKey.position()+lastKey.arrayOffset(), 
lastKey.limit()+lastKey.arrayOffset()));
+                        for (Map.Entry<ByteBuffer, IColumn> entry : 
lastRow.entrySet())
+                        {
+                            bag.add(columnToTuple(entry.getValue(), cfDef, 
parseType(cfDef.getComparator_type())));
+                        }
+                        tuple.append(bag);
+                        lastKey = key;
+                        lastRow = 
(SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+                        return tuple;
+                    }
                     tuple.append(new DataByteArray(key.array(), 
key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
                 }
                 SortedMap<ByteBuffer,IColumn> row = 
(SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
@@ -193,6 +207,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                     {
                         bag.add(columnToTuple(entry.getValue(), cfDef, 
parseType(cfDef.getComparator_type())));
                     }
+                    lastKey = null;
                     lastRow = null;
                 }
                 for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet())

Reply via email to