Merge branch 'cassandra-1.2' into trunk
Conflicts:
src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44f178d1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44f178d1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44f178d1
Branch: refs/heads/trunk
Commit: 44f178d1e07eaa94f111ef7811c4de3b14484b85
Parents: 1d2c122 9d0eec2
Author: Brandon Williams <[email protected]>
Authored: Tue May 21 11:14:23 2013 -0500
Committer: Brandon Williams <[email protected]>
Committed: Tue May 21 11:14:23 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
examples/pig/test/test_storage.pig | 2 +-
.../cassandra/hadoop/pig/CassandraStorage.java | 23 ++++++--------
3 files changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f178d1/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f178d1/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index d4fb577,76feb5a..ba87c42
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@@ -146,9 -147,9 +146,9 @@@ public class CassandraStorage extends L
if (tuple.size() == 0) // lastRow is a new one
{
key = (ByteBuffer)reader.getCurrentKey();
- addKeyToTuple(tuple, key, cfDef,
parseType(cfDef.getKey_validation_class()));
+ tuple = addKeyToTuple(tuple, key, cfDef,
parseType(cfDef.getKey_validation_class()));
}
- for (Map.Entry<ByteBuffer, IColumn> entry :
lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, Column> entry :
lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef,
parseType(cfDef.getComparator_type())));
}
@@@ -182,22 -183,22 +182,22 @@@
key = (ByteBuffer)reader.getCurrentKey();
if (lastKey != null && !(key.equals(lastKey))) // last
key only had one value
{
- addKeyToTuple(tuple, lastKey, cfDef,
parseType(cfDef.getKey_validation_class()));
+ tuple = addKeyToTuple(tuple, lastKey, cfDef,
parseType(cfDef.getKey_validation_class()));
- for (Map.Entry<ByteBuffer, IColumn> entry :
lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, Column> entry :
lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef,
parseType(cfDef.getComparator_type())));
}
tuple.append(bag);
lastKey = key;
- lastRow =
(SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+ lastRow =
(SortedMap<ByteBuffer,Column>)reader.getCurrentValue();
return tuple;
}
- addKeyToTuple(tuple, lastKey, cfDef,
parseType(cfDef.getKey_validation_class()));
+ tuple = addKeyToTuple(tuple, lastKey, cfDef,
parseType(cfDef.getKey_validation_class()));
}
- SortedMap<ByteBuffer,IColumn> row =
(SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+ SortedMap<ByteBuffer,Column> row =
(SortedMap<ByteBuffer,Column>)reader.getCurrentValue();
if (lastRow != null) // prepend what was read last time
{
- for (Map.Entry<ByteBuffer, IColumn> entry :
lastRow.entrySet())
+ for (Map.Entry<ByteBuffer, Column> entry :
lastRow.entrySet())
{
bag.add(columnToTuple(entry.getValue(), cfDef,
parseType(cfDef.getComparator_type())));
}
@@@ -311,10 -309,10 +308,10 @@@
{
setTupleValue(tuple, 0,
getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR).compose(key));
}
-
+ return tuple;
}
- private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType
comparator) throws IOException
+ private Tuple columnToTuple(Column col, CfDef cfDef, AbstractType
comparator) throws IOException
{
Tuple pair = TupleFactory.getInstance().newTuple(2);