Merge branch 'cassandra-1.2' into trunk

Conflicts:
        src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/314c8e85
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/314c8e85
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/314c8e85

Branch: refs/heads/trunk
Commit: 314c8e85de1ef65dd060bf113cc3c1a4be939f4c
Parents: 5891079 e771b07
Author: Sylvain Lebresne <[email protected]>
Authored: Sun May 26 13:19:16 2013 +0200
Committer: Sylvain Lebresne <[email protected]>
Committed: Sun May 26 13:19:16 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/hadoop/pig/CassandraStorage.java     |   34 ++++++++++-----
 2 files changed, 24 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/314c8e85/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/314c8e85/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 03e225b,0854758..b5c596e
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@@ -148,9 -150,9 +151,9 @@@ public class CassandraStorage extends L
                          if (tuple.size() == 0) // lastRow is a new one
                          {
                              key = (ByteBuffer)reader.getCurrentKey();
-                             tuple = addKeyToTuple(tuple, key, cfDef, parseType(cfDef.getKey_validation_class()));
+                             tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
                          }
 -                        for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
 +                        for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                          {
                              bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                          }
@@@ -184,22 -186,28 +187,28 @@@
                      key = (ByteBuffer)reader.getCurrentKey();
                      if (lastKey != null && !(key.equals(lastKey))) // last key only had one value
                      {
-                         tuple = addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
+                         if (tuple == null)
+                             tuple = keyToTuple(lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
+                         else
+                             addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
 -                        for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
 +                        for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                          {
                              bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                          }
                          tuple.append(bag);
                          lastKey = key;
 -                        lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
 +                        lastRow = (SortedMap<ByteBuffer,Column>)reader.getCurrentValue();
                          return tuple;
                      }
-                     tuple = addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
+                     if (tuple == null)
+                         tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
+                     else
+                         addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                  }
 -                SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
 +                SortedMap<ByteBuffer,Column> row = (SortedMap<ByteBuffer,Column>)reader.getCurrentValue();
                  if (lastRow != null) // prepend what was read last time
                  {
 -                    for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
 +                    for (Map.Entry<ByteBuffer, Column> entry : lastRow.entrySet())
                      {
                          bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                      }
@@@ -310,10 -321,10 +322,10 @@@
          {
              setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR).compose(key));
          }
-         return tuple;
+ 
      }
  
 -    private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
 +    private Tuple columnToTuple(Column col, CfDef cfDef, AbstractType comparator) throws IOException
      {
          Tuple pair = TupleFactory.getInstance().newTuple(2);
  

Reply via email to