Merge branch 'cassandra-1.2' into trunk
Conflicts:
src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/314c8e85
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/314c8e85
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/314c8e85
Branch: refs/heads/trunk
Commit: 314c8e85de1ef65dd060bf113cc3c1a4be939f4c
Parents: 5891079 e771b07
Author: Sylvain Lebresne <[email protected]>
Authored: Sun May 26 13:19:16 2013 +0200
Committer: Sylvain Lebresne <[email protected]>
Committed: Sun May 26 13:19:16 2013 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/pig/CassandraStorage.java | 34 ++++++++++-----
2 files changed, 24 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/314c8e85/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/314c8e85/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 03e225b,0854758..b5c596e
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@@ -148,9 -150,9 +151,9 @@@ public class CassandraStorage extends L
if (tuple.size() == 0) // lastRow is a new one
{
key = (ByteBuffer)reader.getCurrentKey();
- tuple = addKeyToTuple(tuple, key, cfDef,
parseType(cfDef.getKey_validation_class()));
+ tuple = keyToTuple(key, cfDef,
parseType(cfDef.getKey_validation_class()));
}
- for (Map.Entry<ByteBuffer, IColumn> entry :
lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, Column> entry :
lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef,
parseType(cfDef.getComparator_type())));
}
@@@ -184,22 -186,28 +187,28 @@@
key = (ByteBuffer)reader.getCurrentKey();
if (lastKey != null && !(key.equals(lastKey))) // last
key only had one value
{
- tuple = addKeyToTuple(tuple, lastKey, cfDef,
parseType(cfDef.getKey_validation_class()));
+ if (tuple == null)
+ tuple = keyToTuple(lastKey, cfDef,
parseType(cfDef.getKey_validation_class()));
+ else
+ addKeyToTuple(tuple, lastKey, cfDef,
parseType(cfDef.getKey_validation_class()));
- for (Map.Entry<ByteBuffer, IColumn> entry :
lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, Column> entry :
lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef,
parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
lastKey = key;
- lastRow =
(SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+ lastRow =
(SortedMap<ByteBuffer,Column>)reader.getCurrentValue();
return tuple;
}
- tuple = addKeyToTuple(tuple, lastKey, cfDef,
parseType(cfDef.getKey_validation_class()));
+ if (tuple == null)
+ tuple = keyToTuple(key, cfDef,
parseType(cfDef.getKey_validation_class()));
+ else
+ addKeyToTuple(tuple, lastKey, cfDef,
parseType(cfDef.getKey_validation_class()));
}
- SortedMap<ByteBuffer,IColumn> row =
(SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+ SortedMap<ByteBuffer,Column> row =
(SortedMap<ByteBuffer,Column>)reader.getCurrentValue();
if (lastRow != null) // prepend what was read last time
{
- for (Map.Entry<ByteBuffer, IColumn> entry :
lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, Column> entry :
lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef,
parseType(cfDef.getComparator_type())));
}
@@@ -310,10 -321,10 +322,10 @@@
{
setTupleValue(tuple, 0,
getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR).compose(key));
}
- return tuple;
+
}
- private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType
comparator) throws IOException
+ private Tuple columnToTuple(Column col, CfDef cfDef, AbstractType
comparator) throws IOException
{
Tuple pair = TupleFactory.getInstance().newTuple(2);