Updated Branches: refs/heads/cassandra-1.0 f4064b5ff -> 6173fa9e3
(Pig) fix CassandraStorage to use correct comparator in Super ColumnFamily case patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-3251 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6173fa9e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6173fa9e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6173fa9e Branch: refs/heads/cassandra-1.0 Commit: 6173fa9e3443909474e14f5695e705df5b10dbcc Parents: f4064b5 Author: Pavel Yaskevich <xe...@apache.org> Authored: Tue Jan 31 23:24:27 2012 +0200 Committer: Pavel Yaskevich <xe...@apache.org> Committed: Wed Feb 1 00:48:26 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 3 +- .../cassandra/hadoop/pig/CassandraStorage.java | 45 ++++++++++----- 2 files changed, 33 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6173fa9e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d32eba8..468dbcf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -54,7 +54,8 @@ Merged from 0.8: * use correct list of replicas for LOCAL_QUORUM reads when read repair is disabled (CASSANDRA-3696) * block on flush before compacting hints (may prevent OOM) (CASSANDRA-3733) - + * (Pig) fix CassandraStorage to use correct comparator in Super ColumnFamily + case (CASSANDRA-3251) 1.0.6 * (CQL) fix cqlsh support for replicate_on_write (CASSANDRA-3596) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6173fa9e/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index 4513783..1a594ce 100644 --- a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -128,7 +128,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset())); for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet()) { - columns.add(columnToTuple(entry.getKey(), entry.getValue(), cfDef)); + columns.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); } tuple.set(1, new DefaultDataBag(columns)); @@ -140,29 +140,31 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo } } - private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException + private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException { Tuple pair = TupleFactory.getInstance().newTuple(2); List<AbstractType> marshallers = getDefaultMarshallers(cfDef); Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); - setTupleValue(pair, 0, marshallers.get(0).compose(name)); + setTupleValue(pair, 0, comparator.compose(col.name())); if (col instanceof Column) { // standard - if (validators.get(name) == null) + if (validators.get(col.name()) == null) setTupleValue(pair, 1, marshallers.get(1).compose(col.value())); else - setTupleValue(pair, 1, validators.get(name).compose(col.value())); + setTupleValue(pair, 1, validators.get(col.name()).compose(col.value())); return pair; } + else + { + // super + ArrayList<Tuple> subcols = new ArrayList<Tuple>(); + for (IColumn subcol : col.getSubColumns()) + subcols.add(columnToTuple(subcol, cfDef, parseType(cfDef.getSubcomparator_type()))); - // super - ArrayList<Tuple> subcols = new ArrayList<Tuple>(); - for (IColumn subcol : col.getSubColumns()) - subcols.add(columnToTuple(subcol.name(), subcol, cfDef)); - - pair.set(1, new DefaultDataBag(subcols)); + pair.set(1, new DefaultDataBag(subcols)); + } return pair; } @@ -188,12 +190,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException { ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>(); - AbstractType comparator = null; - AbstractType default_validator = null; - AbstractType key_validator = null; + AbstractType comparator; + AbstractType subcomparator; + AbstractType default_validator; + AbstractType key_validator; try { comparator = TypeParser.parse(cfDef.getComparator_type()); + subcomparator = TypeParser.parse(cfDef.getSubcomparator_type()); default_validator = TypeParser.parse(cfDef.getDefault_validation_class()); key_validator = TypeParser.parse(cfDef.getKey_validation_class()); } @@ -205,6 +209,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo marshallers.add(comparator); marshallers.add(default_validator); marshallers.add(key_validator); + marshallers.add(subcomparator); return marshallers; } @@ -230,6 +235,18 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo return validators; } + private AbstractType parseType(String type) throws IOException + { + try + { + return TypeParser.parse(type); + } + catch (ConfigurationException e) + { + throw new IOException(e); + } + } + @Override public InputFormat getInputFormat() {