Updated Branches: refs/heads/cassandra-1.1.0 5ccdc7f15 -> 015dc3fca
Pig wide row support.
Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3909


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/015dc3fc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/015dc3fc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/015dc3fc

Branch: refs/heads/cassandra-1.1.0
Commit: 015dc3fca9c3117fb0268d6b8d201d7000568aab
Parents: 5ccdc7f
Author: Brandon Williams <[email protected]>
Authored: Fri Apr 20 11:19:48 2012 -0500
Committer: Brandon Williams <[email protected]>
Committed: Fri Apr 20 11:19:48 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/hadoop/pig/CassandraStorage.java     |   87 ++++++++++++++-
 2 files changed, 87 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/015dc3fc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b19d118..e30004a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@
  * (cql) Fix type in CQL3 ALTER TABLE preventing update (CASSANDRA-4170)
  * (cql) Throw invalid exception from CQL3 on obsolete options (CASSANDRA-4171)
  * (cqlsh) fix recognizing uppercase SELECT keyword (CASSANDRA-4161)
+ * Pig: wide row support (CASSANDRA-3909)
 Merged from 1.0:
  * avoid streaming empty files with bulk loader if sstablewriter errors out
    (CASSANDRA-3946)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/015dc3fc/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index f10dde5..0609d11 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -78,9 +78,11 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
     public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
     public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
+    public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
 
     private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
     private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
+    private final static boolean DEFAULT_WIDEROW_INPUT = false;
 
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
@@ -100,6 +102,11 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private String inputFormatClass;
     private String outputFormatClass;
     private int limit;
+    private boolean widerows;
+    // wide row hacks
+    private Map<ByteBuffer,IColumn> lastRow;
+    private boolean hasNext = true;
+
 
     public CassandraStorage()
     {
@@ -119,10 +126,85 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     {
         return limit;
     }
+
+    public Tuple getNextWide() throws IOException
+    {
+        CfDef cfDef = getCfDef(loadSignature);
+        ByteBuffer key = null;
+        Tuple tuple = TupleFactory.getInstance().newTuple();
+        DefaultDataBag bag = new DefaultDataBag();
+        try
+        {
+            while(true)
+            {
+                hasNext = reader.nextKeyValue();
+                if (!hasNext)
+                {
+                    if (lastRow != null)
+                    {
+                        if (tuple.size() == 0) // lastRow is a new one
+                        {
+                            key = (ByteBuffer)reader.getCurrentKey();
+                            tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+                        }
+                        for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+                        {
+                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                        }
+                        lastRow = null;
+                        tuple.append(bag);
+                        return tuple;
+                    }
+                    else
+                    {
+                        if (tuple.size() == 1) // rare case of just one wide row, key already set
+                        {
+                            tuple.append(bag);
+                            return tuple;
+                        }
+                        else
+                            return null;
+                    }
+                }
+                if (key != null && !((ByteBuffer)reader.getCurrentKey()).equals(key)) // key changed
+                {
+                    // read too much, hold on to it for next time
+                    lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+                    // but return what we have so far
+                    tuple.append(bag);
+                    return tuple;
+                }
+                if (key == null) // only set the key on the first iteration
+                {
+                    key = (ByteBuffer)reader.getCurrentKey();
+                    tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+                }
+                SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+                if (lastRow != null) // prepend what was read last time
+                {
+                    for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+                    {
+                        bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                    }
+                    lastRow = null;
+                }
+                for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet())
+                {
+                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                }
+            }
+        }
+        catch (InterruptedException e)
+        {
+            throw new IOException(e.getMessage());
+        }
+    }
 
     @Override
     public Tuple getNext() throws IOException
     {
+        if (widerows)
+            return getNextWide();
         try
         {
             // load the next pair
@@ -423,7 +505,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
             ConfigHelper.setInputSlicePredicate(conf, predicate);
         }
-        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
+        widerows = DEFAULT_WIDEROW_INPUT;
+        if (System.getenv(PIG_WIDEROW_INPUT) != null)
+            widerows = Boolean.valueOf(System.getProperty(PIG_WIDEROW_INPUT));
+        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows);
         setConnectionInformation();
 
         if (ConfigHelper.getInputRpcPort(conf) == 0)
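
Editor's note (not part of the commit): the change above resolves the new flag in setLocation() from the process environment and the JVM system properties rather than from the Hadoop Configuration. As a minimal, hypothetical standalone sketch of that resolution logic, with the class name and main() purely illustrative and the constant names mirroring the ones added in the diff:

    // Hypothetical illustration of the flag resolution added to
    // CassandraStorage.setLocation(): the environment variable only gates the
    // check, while the value itself is read from the JVM system properties.
    public class WideRowFlagCheck
    {
        // mirrors PIG_WIDEROW_INPUT / DEFAULT_WIDEROW_INPUT from the diff
        private static final String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
        private static final boolean DEFAULT_WIDEROW_INPUT = false;

        public static void main(String[] args)
        {
            boolean widerows = DEFAULT_WIDEROW_INPUT;
            if (System.getenv(PIG_WIDEROW_INPUT) != null)
                widerows = Boolean.valueOf(System.getProperty(PIG_WIDEROW_INPUT));
            System.out.println("wide row input enabled: " + widerows);
        }
    }

As committed, enabling wide-row input therefore appears to require both exporting PIG_WIDEROW_INPUT=true in the environment and passing -DPIG_WIDEROW_INPUT=true as a JVM system property to the Pig process, since System.getenv only gates the check while System.getProperty supplies the value.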
