Updated Branches: refs/heads/cassandra-0.8 9ecaa2a5c -> dbd8ced10
(Pig) fix CassandraStorage to use correct comparator in Super ColumnFamily case patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-3251 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dbd8ced1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dbd8ced1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dbd8ced1 Branch: refs/heads/cassandra-0.8 Commit: dbd8ced107fad720c416d7e4919cb33884378a02 Parents: 9ecaa2a Author: Pavel Yaskevich <[email protected]> Authored: Tue Jan 31 23:24:27 2012 +0200 Committer: Pavel Yaskevich <[email protected]> Committed: Wed Feb 1 00:39:37 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 3 +- .../cassandra/hadoop/pig/CassandraStorage.java | 45 ++++++++++----- 2 files changed, 33 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd8ced1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 64b2747..a6a724f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,7 +3,8 @@ * use correct list of replicas for LOCAL_QUORUM reads when read repair is disabled (CASSANDRA-3696) * block on flush before compacting hints (may prevent OOM) (CASSANDRA-3733) - + * (Pig) fix CassandraStorage to use correct comparator in Super ColumnFamily + case (CASSANDRA-3251) 0.8.9 * avoid logging (harmless) exception when GC takes < 1ms (CASSANDRA-3656) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbd8ced1/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index fd0959d..a0dec20 100644 --- a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -128,7 +128,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset())); for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet()) { - columns.add(columnToTuple(entry.getKey(), entry.getValue(), cfDef)); + columns.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); } tuple.set(1, new DefaultDataBag(columns)); @@ -140,29 +140,31 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo } } - private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException + private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException { Tuple pair = TupleFactory.getInstance().newTuple(2); List<AbstractType> marshallers = getDefaultMarshallers(cfDef); Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); - setTupleValue(pair, 0, marshallers.get(0).compose(name)); + setTupleValue(pair, 0, comparator.compose(col.name())); if (col instanceof Column) { // standard - if (validators.get(name) == null) + if (validators.get(col.name()) == null) setTupleValue(pair, 1, marshallers.get(1).compose(col.value())); else - setTupleValue(pair, 1, validators.get(name).compose(col.value())); + setTupleValue(pair, 1, validators.get(col.name()).compose(col.value())); return pair; } + else + { + // super + ArrayList<Tuple> subcols = new ArrayList<Tuple>(); + for (IColumn subcol : col.getSubColumns()) + subcols.add(columnToTuple(subcol, cfDef, parseType(cfDef.getSubcomparator_type()))); - // super - ArrayList<Tuple> subcols = new ArrayList<Tuple>(); - for (IColumn subcol : col.getSubColumns()) - subcols.add(columnToTuple(subcol.name(), subcol, cfDef)); - - pair.set(1, new DefaultDataBag(subcols)); + pair.set(1, new DefaultDataBag(subcols)); + } return pair; } @@ -188,12 +190,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException { ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>(); - AbstractType comparator = null; - AbstractType default_validator = null; - AbstractType key_validator = null; + AbstractType comparator; + AbstractType subcomparator; + AbstractType default_validator; + AbstractType key_validator; try { comparator = TypeParser.parse(cfDef.getComparator_type()); + subcomparator = TypeParser.parse(cfDef.getSubcomparator_type()); default_validator = TypeParser.parse(cfDef.getDefault_validation_class()); key_validator = TypeParser.parse(cfDef.getKey_validation_class()); } @@ -205,6 +209,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo marshallers.add(comparator); marshallers.add(default_validator); marshallers.add(key_validator); + marshallers.add(subcomparator); return marshallers; } @@ -230,6 +235,18 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo return validators; } + private AbstractType parseType(String type) throws IOException + { + try + { + return TypeParser.parse(type); + } + catch (ConfigurationException e) + { + throw new IOException(e); + } + } + @Override public InputFormat getInputFormat() {
