Add more data type mappings for Pig. Patch by Alex Liu; reviewed by Brandon Williams for CASSANDRA-6128.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/538039a7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/538039a7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/538039a7 Branch: refs/heads/trunk Commit: 538039a7001a4db0ff87dafbfe0be2877310b14f Parents: b966e1a Author: Brandon Williams <brandonwilli...@apache.org> Authored: Mon Oct 7 13:57:45 2013 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Mon Oct 7 14:06:29 2013 -0500 ---------------------------------------------------------------------- .../hadoop/pig/AbstractCassandraStorage.java | 31 +++++++++++++++----- .../cassandra/hadoop/pig/CassandraStorage.java | 2 +- .../apache/cassandra/hadoop/pig/CqlStorage.java | 7 ++--- 3 files changed, 26 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/538039a7/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java index 1e207b3..0766adf 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java @@ -110,7 +110,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store List<CompositeComponent> result = comparator.deconstruct(name); Tuple t = TupleFactory.getInstance().newTuple(result.size()); for (int i=0; i<result.size(); i++) - setTupleValue(t, i, result.get(i).comparator.compose(result.get(i).value)); + setTupleValue(t, i, cassandraToObj(result.get(i).comparator, result.get(i).value)); return t; } @@ -124,17 +124,16 @@ public abstract class AbstractCassandraStorage extends 
LoadFunc implements Store if(comparator instanceof AbstractCompositeType) setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,col.name())); else - setTupleValue(pair, 0, comparator.compose(col.name())); + setTupleValue(pair, 0, cassandraToObj(comparator, col.name())); // value - Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); if (validators.get(col.name()) == null) { Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef); - setTupleValue(pair, 1, marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value())); + setTupleValue(pair, 1, cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value())); } else - setTupleValue(pair, 1, validators.get(col.name()).compose(col.value())); + setTupleValue(pair, 1, cassandraToObj(validators.get(col.name()), col.value())); return pair; } @@ -313,9 +312,12 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store return DataType.LONG; else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger return DataType.INTEGER; - else if (type instanceof AsciiType) - return DataType.CHARARRAY; - else if (type instanceof UTF8Type) + else if (type instanceof AsciiType || + type instanceof UTF8Type || + type instanceof DecimalType || + type instanceof InetAddressType || + type instanceof LexicalUUIDType || + type instanceof UUIDType ) return DataType.CHARARRAY; else if (type instanceof FloatType) return DataType.FLOAT; @@ -758,5 +760,18 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store } return null; } + + protected Object cassandraToObj(AbstractType validator, ByteBuffer value) + { + if (validator instanceof DecimalType || + validator instanceof InetAddressType || + validator instanceof LexicalUUIDType || + validator instanceof UUIDType) + { + return validator.getString(value); + } + else + 
return validator.compose(value); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/538039a7/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index c9afff0..4083236 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -725,7 +725,7 @@ public class CassandraStorage extends AbstractCassandraStorage } else { - setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR).compose(key)); + setTupleValue(tuple, 0, cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR), key)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/538039a7/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java index b96d10e..50ee6b7 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java @@ -188,12 +188,9 @@ public class CqlStorage extends AbstractCassandraStorage // standard Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); if (validators.get(col.name()) == null) - { - Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef); - return marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value()); - } + return cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.DEFAULT_VALIDATOR), col.value()); else - return validators.get(col.name()).compose(col.value()); + return cassandraToObj(validators.get(col.name()), col.value()); } /** set read configuration settings */