Wide row support for pig. Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3909
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a6302d6d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a6302d6d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a6302d6d

Branch: refs/heads/trunk
Commit: a6302d6d7dd35d6b59e9911850f6bd2e69da088b
Parents: ac21a55
Author: Brandon Williams <[email protected]>
Authored: Mon Apr 16 17:41:08 2012 -0500
Committer: Brandon Williams <[email protected]>
Committed: Mon Apr 16 17:41:08 2012 -0500

----------------------------------------------------------------------
 .../cassandra/hadoop/pig/CassandraStorage.java  |   87 ++++++++++++++-
 1 files changed, 86 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a6302d6d/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index bcc0c79..aaac4ef 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -78,9 +78,11 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
     public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
     public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
+    public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";

     private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
     private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
+    private final static boolean DEFAULT_WIDEROW_INPUT = false;

     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
@@ -100,6 +102,11 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private String inputFormatClass;
     private String outputFormatClass;
     private int limit;
+    private boolean widerows;
+    // wide row hacks
+    private Map<ByteBuffer,IColumn> lastRow;
+    private boolean hasNext = true;
+

     public CassandraStorage()
     {
@@ -119,10 +126,85 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     {
         return limit;
     }
+
+    public Tuple getNextWide() throws IOException
+    {
+        CfDef cfDef = getCfDef(loadSignature);
+        ByteBuffer key = null;
+        Tuple tuple = TupleFactory.getInstance().newTuple();
+        DefaultDataBag bag = new DefaultDataBag();
+        try
+        {
+            while(true)
+            {
+                hasNext = reader.nextKeyValue();
+                if (!hasNext)
+                {
+                    if (lastRow != null)
+                    {
+                        if (tuple.size() == 0) // lastRow is a new one
+                        {
+                            key = (ByteBuffer)reader.getCurrentKey();
+                            tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+                        }
+                        for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+                        {
+                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                        }
+                        lastRow = null;
+                        tuple.append(bag);
+                        return tuple;
+                    }
+                    else
+                    {
+                        if (tuple.size() == 1) // rare case of just one wide row, key already set
+                        {
+                            tuple.append(bag);
+                            return tuple;
+                        }
+                        else
+                            return null;
+                    }
+                }
+                if (key != null && !((ByteBuffer)reader.getCurrentKey()).equals(key)) // key changed
+                {
+                    // read too much, hold on to it for next time
+                    lastRow = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+                    // but return what we have so far
+                    tuple.append(bag);
+                    return tuple;
+                }
+                if (key == null) // only set the key on the first iteration
+                {
+                    key = (ByteBuffer)reader.getCurrentKey();
+                    tuple.append(new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+                }
+                SortedMap<ByteBuffer,IColumn> row = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+                if (lastRow != null) // prepend what was read last time
+                {
+                    for (Map.Entry<ByteBuffer, IColumn> entry : lastRow.entrySet())
+                    {
+                        bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                    }
+                    lastRow = null;
+                }
+                for (Map.Entry<ByteBuffer, IColumn> entry : row.entrySet())
+                {
+                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                }
+            }
+        }
+        catch (InterruptedException e)
+        {
+            throw new IOException(e.getMessage());
+        }
+    }

     @Override
     public Tuple getNext() throws IOException
     {
+        if (widerows)
+            return getNextWide();
         try
         {
             // load the next pair
@@ -424,7 +506,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
             ConfigHelper.setInputSlicePredicate(conf, predicate);
         }
-        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
+        widerows = DEFAULT_WIDEROW_INPUT;
+        if (System.getenv(PIG_WIDEROW_INPUT) != null)
+            widerows = Boolean.valueOf(System.getProperty(PIG_WIDEROW_INPUT));
+        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows);
         setConnectionInformation();

         if (ConfigHelper.getInputRpcPort(conf) == 0)
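
A rough usage sketch, not part of the commit: as written, the patch detects the flag with
System.getenv(PIG_WIDEROW_INPUT) but reads its value with System.getProperty(PIG_WIDEROW_INPUT),
so enabling wide-row input presumably takes both the environment variable and the JVM system
property (passed here via PIG_OPTS, which the stock pig launcher forwards to the JVM). The
keyspace and column family names (MyKeyspace, MyColumnFamily) are placeholders.

  $ export PIG_WIDEROW_INPUT=true
  $ export PIG_OPTS="$PIG_OPTS -DPIG_WIDEROW_INPUT=true"
  $ bin/pig
  grunt> rows = LOAD 'cassandra://MyKeyspace/MyColumnFamily'
                USING org.apache.cassandra.hadoop.pig.CassandraStorage();
  grunt> dump rows;

With widerows on, getNextWide() stitches successive reader values for the same key back into a
single bag, so each tuple handed to Pig should still look like (key, {columns}) even when the
underlying input format pages through a wide row in chunks.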
