Author: brandonwilliams Date: Mon Apr 4 21:53:09 2011 New Revision: 1088800
URL: http://svn.apache.org/viewvc?rev=1088800&view=rev Log: Pig uses schema information to cast to/from native types. Patch by Jeremy Hanna and brandonwilliams, reviewed by brandonwilliams for CASSANDRA-2387 Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AbstractType.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AsciiType.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/BytesType.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/IntegerType.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LongType.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/UTF8Type.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ByteBufferUtil.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1088800&r1=1088799&r2=1088800&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original) +++ cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Mon Apr 4 21:53:09 2011 @@ -20,6 +20,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.FBUtilities; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -27,9 +30,8 @@ import org.apache.commons.logging.LogFac import org.apache.cassandra.db.Column; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.SuperColumn; +import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.hadoop.*; -import org.apache.cassandra.thrift.SlicePredicate; -import org.apache.cassandra.thrift.SliceRange; import org.apache.cassandra.avro.Mutation; import org.apache.cassandra.avro.Deletion; import org.apache.cassandra.avro.ColumnOrSuperColumn; @@ -44,6 +46,14 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.*; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.util.UDFContext; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; /** * A LoadFunc wrapping ColumnFamilyInputFormat. @@ -58,6 +68,8 @@ public class CassandraStorage extends Lo public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS"; public final static String PIG_PARTITIONER = "PIG_PARTITIONER"; + private static String UDFCONTEXT_SCHEMA_KEY = "schema"; + private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER; private static final Log logger = LogFactory.getLog(CassandraStorage.class); @@ -72,8 +84,8 @@ public class CassandraStorage extends Lo private RecordWriter writer; private int limit; - public CassandraStorage() - { + public CassandraStorage() + { this(1024); } @@ -100,19 +112,20 @@ public class CassandraStorage extends Lo if (!reader.nextKeyValue()) return null; + CfDef cfDef = getCfDef(); ByteBuffer key = (ByteBuffer)reader.getCurrentKey(); SortedMap<ByteBuffer,IColumn> cf = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue(); assert key != null && cf != null; // and wrap it in a tuple - Tuple tuple = TupleFactory.getInstance().newTuple(2); + Tuple tuple = TupleFactory.getInstance().newTuple(2); ArrayList<Tuple> columns = new ArrayList<Tuple>(); tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset())); for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet()) - { - columns.add(columnToTuple(entry.getKey(), entry.getValue())); + { + columns.add(columnToTuple(entry.getKey(), entry.getValue(), cfDef)); } - + tuple.set(1, new DefaultDataBag(columns)); return tuple; } @@ -122,28 +135,85 @@ public class CassandraStorage extends Lo } } - private Tuple columnToTuple(ByteBuffer name, IColumn col) throws IOException + private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException { Tuple pair = TupleFactory.getInstance().newTuple(2); - pair.set(0, new DataByteArray(name.array(), name.position()+name.arrayOffset(), name.limit()+name.arrayOffset())); + List<AbstractType> marshallers = getDefaultMarshallers(cfDef); + Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); + if (col instanceof Column) { // standard - pair.set(1, new DataByteArray(col.value().array(), - col.value().position()+col.value().arrayOffset(), - col.value().limit()+col.value().arrayOffset())); + pair.set(0, marshallers.get(0).compose(name)); + if (validators.get(name) == null) + // Have to special case BytesType because compose returns a ByteBuffer + if (marshallers.get(1) instanceof BytesType) + pair.set(1, new DataByteArray(ByteBufferUtil.getArray(col.value()))); + else + pair.set(1, marshallers.get(1).compose(col.value())); + else + pair.set(1, validators.get(name).compose(col.value())); return pair; } // super ArrayList<Tuple> subcols = new ArrayList<Tuple>(); for (IColumn subcol : ((SuperColumn)col).getSubColumns()) - subcols.add(columnToTuple(subcol.name(), subcol)); + subcols.add(columnToTuple(subcol.name(), subcol, cfDef)); pair.set(1, new DefaultDataBag(subcols)); return pair; } + private CfDef getCfDef() + { + UDFContext context = UDFContext.getUDFContext(); + Properties property = context.getUDFProperties(ResourceSchema.class); + return cfdefFromString(property.getProperty(UDFCONTEXT_SCHEMA_KEY)); + } + + private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException + { + ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>(); + AbstractType comparator = null; + AbstractType default_validator = null; + try + { + comparator = FBUtilities.getInstance(cfDef.comparator_type, "comparator"); + default_validator = FBUtilities.getInstance(cfDef.default_validation_class, "validator"); + } + catch (ConfigurationException e) + { + throw new IOException(e); + } + + marshallers.add(comparator); + marshallers.add(default_validator); + return marshallers; + } + + private Map<ByteBuffer,AbstractType> getValidatorMap(CfDef cfDef) throws IOException + { + Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>(); + for (ColumnDef cd : cfDef.column_metadata) + { + if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty()) + { + AbstractType validator = null; + try + { + validator = FBUtilities.getInstance(cd.getValidation_class(), "validator"); + validators.put(cd.name, validator); + } + catch (ConfigurationException e) + { + throw new IOException(e); + } + } + } + return validators; + } + @Override public InputFormat getInputFormat() { @@ -156,7 +226,7 @@ public class CassandraStorage extends Lo this.reader = reader; } - private void setLocationFromUri(String location) throws IOException + private void setLocationFromUri(String location) throws IOException { // parse uri into keyspace and columnfamily String names[]; @@ -219,6 +289,7 @@ public class CassandraStorage extends Lo } ConfigHelper.setInputColumnFamily(conf, keyspace, column_family); setConnectionInformation(); + initSchema(); } @Override @@ -274,7 +345,9 @@ public class CassandraStorage extends Lo ByteBuffer key = objToBB(t.get(0)); DefaultDataBag pairs = (DefaultDataBag) t.get(1); ArrayList<Mutation> mutationList = new ArrayList<Mutation>(); - + CfDef cfDef = getCfDef(); + List<AbstractType> marshallers = getDefaultMarshallers(cfDef); + Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); try { for (Tuple pair : pairs) @@ -306,7 +379,7 @@ public class CassandraStorage extends Lo mutation.column_or_supercolumn.super_column = sc; } } - else // assume column since it could be anything else + else // assume column since it couldn't be anything else { if (pair.get(1) == null) { @@ -318,8 +391,15 @@ public class CassandraStorage extends Lo else { org.apache.cassandra.avro.Column column = new org.apache.cassandra.avro.Column(); - column.name = objToBB(pair.get(0)); - column.value = objToBB(pair.get(1)); + column.name = marshallers.get(0).decompose((pair.get(0))); + if (validators.get(column.name) == null) + // Have to special case BytesType to convert DataByteArray into ByteBuffer + if (marshallers.get(1) instanceof BytesType) + column.value = ByteBuffer.wrap(((DataByteArray) pair.get(1)).get()); + else + column.value = marshallers.get(1).decompose(pair.get(1)); + else + column.value = validators.get(column.name).decompose(pair.get(1)); column.timestamp = System.currentTimeMillis() * 1000; mutation.column_or_supercolumn = new ColumnOrSuperColumn(); mutation.column_or_supercolumn.column = column; @@ -358,4 +438,92 @@ public class CassandraStorage extends Lo return new RequiredFieldResponse(true); } + + /* Methods to get the column family schema from Cassandra */ + + private void initSchema() + { + Cassandra.Client client = null; + try + { + client = createConnection(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf), true); + CfDef cfDef = null; + client.set_keyspace(keyspace); + KsDef ksDef = client.describe_keyspace(keyspace); + List<CfDef> defs = ksDef.getCf_defs(); + for (CfDef def : defs) + { + if (column_family.equalsIgnoreCase(def.getName())) + { + cfDef = def; + break; + } + } + UDFContext context = UDFContext.getUDFContext(); + Properties property = context.getUDFProperties(ResourceSchema.class); + property.setProperty(UDFCONTEXT_SCHEMA_KEY, cfdefToString(cfDef)); + } + catch (TException e) + { + throw new RuntimeException(e); + } + catch (InvalidRequestException e) + { + throw new RuntimeException(e); + } + catch (NotFoundException e) + { + throw new RuntimeException(e); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException + { + TSocket socket = new TSocket(host, port); + TTransport trans = framed ? new TFramedTransport(socket) : socket; + try + { + trans.open(); + } + catch (TTransportException e) + { + throw new IOException("unable to connect to server", e); + } + return new Cassandra.Client(new TBinaryProtocol(trans)); + } + + private static String cfdefToString(CfDef cfDef) + { + assert cfDef != null; + // this is so awful it's kind of cool! + TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory()); + try + { + return FBUtilities.bytesToHex(serializer.serialize(cfDef)); + } + catch (TException e) + { + throw new RuntimeException(e); + } + } + + private static CfDef cfdefFromString(String st) + { + assert st != null; + TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory()); + CfDef cfDef = new CfDef(); + try + { + deserializer.deserialize(cfDef, FBUtilities.hexToBytes(st)); + } + catch (TException e) + { + throw new RuntimeException(e); + } + return cfDef; + } } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AbstractType.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AbstractType.java?rev=1088800&r1=1088799&r2=1088800&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AbstractType.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AbstractType.java Mon Apr 4 21:53:09 2011 @@ -36,7 +36,7 @@ import static org.apache.cassandra.io.ss * should always handle those values even if they normally do not * represent a valid ByteBuffer for the type being compared. */ -public abstract class AbstractType implements Comparator<ByteBuffer> +public abstract class AbstractType<T> implements Comparator<ByteBuffer> { public final Comparator<IndexInfo> indexComparator; public final Comparator<IndexInfo> indexReverseComparator; @@ -108,6 +108,10 @@ public abstract class AbstractType imple throw new UnsupportedOperationException(); } + public abstract T compose(ByteBuffer bytes); + + public abstract ByteBuffer decompose(T value); + /* validate that the byte array is a valid sequence for the type we are supposed to be comparing */ public abstract void validate(ByteBuffer bytes) throws MarshalException; Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AsciiType.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AsciiType.java?rev=1088800&r1=1088799&r2=1088800&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AsciiType.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/AsciiType.java Mon Apr 4 21:53:09 2011 @@ -28,12 +28,32 @@ import com.google.common.base.Charsets; import org.apache.cassandra.utils.ByteBufferUtil; -public class AsciiType extends BytesType +public class AsciiType extends AbstractType<String> { public static final AsciiType instance = new AsciiType(); + public static AsciiType getInstance() + { + return instance; + } + AsciiType() {} // singleton + public String compose(ByteBuffer bytes) + { + return getString(bytes); + } + + public ByteBuffer decompose(String value) + { + return ByteBufferUtil.bytes(value, Charsets.US_ASCII); + } + + public int compare(ByteBuffer o1, ByteBuffer o2) + { + return BytesType.bytesCompare(o1, o2); + } + @Override public String getString(ByteBuffer bytes) { @@ -49,7 +69,7 @@ public class AsciiType extends BytesType public ByteBuffer fromString(String source) { - return ByteBufferUtil.bytes(source, Charsets.US_ASCII); + return decompose(source); } public void validate(ByteBuffer bytes) throws MarshalException Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/BytesType.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/BytesType.java?rev=1088800&r1=1088799&r2=1088800&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/BytesType.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/BytesType.java Mon Apr 4 21:53:09 2011 @@ -25,19 +25,39 @@ import java.nio.ByteBuffer; import org.apache.cassandra.utils.ByteBufferUtil; -public class BytesType extends AbstractType +public class BytesType extends AbstractType<ByteBuffer> { public static final BytesType instance = new BytesType(); + public static BytesType getInstance() + { + return instance; + } + BytesType() {} // singleton - + + public ByteBuffer compose(ByteBuffer bytes) + { + return bytes.duplicate(); + } + + public ByteBuffer decompose(ByteBuffer value) + { + return value; + } + public int compare(ByteBuffer o1, ByteBuffer o2) { + return BytesType.bytesCompare(o1, o2); + } + + public static int bytesCompare(ByteBuffer o1, ByteBuffer o2) + { if(null == o1){ if(null == o2) return 0; else return -1; } - + return ByteBufferUtil.compareUnsigned(o1, o2); } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/IntegerType.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/IntegerType.java?rev=1088800&r1=1088799&r2=1088800&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/IntegerType.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/IntegerType.java Mon Apr 4 21:53:09 2011 @@ -22,12 +22,19 @@ package org.apache.cassandra.db.marshal; import java.math.BigInteger; import java.nio.ByteBuffer; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.thrift.TBaseHelper; -public final class IntegerType extends AbstractType +public final class IntegerType extends AbstractType<BigInteger> { public static final IntegerType instance = new IntegerType(); + public static IntegerType getInstance() + { + return instance; + } + + private static int findMostSignificantByte(ByteBuffer bytes) { int len = bytes.remaining() - 1; @@ -56,6 +63,16 @@ public final class IntegerType extends A IntegerType() {/* singleton */} + public BigInteger compose(ByteBuffer bytes) + { + return new BigInteger(ByteBufferUtil.getArray(bytes)); + } + + public ByteBuffer decompose(BigInteger value) + { + return ByteBuffer.wrap(value.toByteArray()); + } + public int compare(ByteBuffer lhs, ByteBuffer rhs) { int lhsLen = lhs.remaining(); @@ -138,7 +155,7 @@ public final class IntegerType extends A throw new RuntimeException("'" + source + "' could not be translated into an IntegerType."); } - return ByteBuffer.wrap(integerType.toByteArray()); + return decompose(integerType); } public void validate(ByteBuffer bytes) throws MarshalException Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java?rev=1088800&r1=1088799&r2=1088800&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java Mon Apr 4 21:53:09 2011 @@ -26,12 +26,27 @@ import java.util.UUID; import org.apache.cassandra.utils.UUIDGen; -public class LexicalUUIDType extends AbstractType +public class LexicalUUIDType extends AbstractType<UUID> { public static final LexicalUUIDType instance = new LexicalUUIDType(); + public static LexicalUUIDType getInstance() + { + return instance; + } + LexicalUUIDType() {} // singleton + public UUID compose(ByteBuffer bytes) + { + return UUIDGen.getUUID(bytes); + } + + public ByteBuffer decompose(UUID value) + { + return ByteBuffer.wrap(UUIDGen.decompose(value)); + } + public int compare(ByteBuffer o1, ByteBuffer o2) { if (o1.remaining() == 0) @@ -61,7 +76,7 @@ public class LexicalUUIDType extends Abs public ByteBuffer fromString(String source) { - return ByteBuffer.wrap(UUIDGen.decompose(UUID.fromString(source))); + return decompose(UUID.fromString(source)); } public void validate(ByteBuffer bytes) throws MarshalException Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java?rev=1088800&r1=1088799&r2=1088800&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java Mon Apr 4 21:53:09 2011 @@ -29,7 +29,7 @@ import org.apache.cassandra.utils.ByteBu /** for sorting columns representing row keys in the row ordering as determined by a partitioner. * Not intended for user-defined CFs, and will in fact error out if used with such. */ -public class LocalByPartionerType<T extends Token> extends AbstractType +public class LocalByPartionerType<T extends Token> extends AbstractType<ByteBuffer> { private final IPartitioner<T> partitioner; @@ -38,6 +38,16 @@ public class LocalByPartionerType<T exte this.partitioner = partitioner; } + public ByteBuffer compose(ByteBuffer bytes) + { + throw new UnsupportedOperationException("You can't do this with a local partitioner."); + } + + public ByteBuffer decompose(ByteBuffer bytes) + { + throw new UnsupportedOperationException("You can't do this with a local partitioner."); + } + public String getString(ByteBuffer bytes) { return ByteBufferUtil.bytesToHex(bytes); Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LongType.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LongType.java?rev=1088800&r1=1088799&r2=1088800&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LongType.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/LongType.java Mon Apr 4 21:53:09 2011 @@ -25,12 +25,27 @@ import java.nio.ByteBuffer; import org.apache.cassandra.utils.ByteBufferUtil; -public class LongType extends AbstractType +public class LongType extends AbstractType<Long> { public static final LongType instance = new LongType(); + public static LongType getInstance() + { + return instance; + } + LongType() {} // singleton + public Long compose(ByteBuffer bytes) + { + return ByteBufferUtil.toLong(bytes); + } + + public ByteBuffer decompose(Long value) + { + return ByteBufferUtil.bytes(value); + } + public int compare(ByteBuffer o1, ByteBuffer o2) { if (o1.remaining() == 0) @@ -78,7 +93,7 @@ public class LongType extends AbstractTy throw new RuntimeException("'" + source + "' could not be translated into a LongType."); } - return ByteBufferUtil.bytes(longType); + return decompose(longType); } public void validate(ByteBuffer bytes) throws MarshalException Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java?rev=1088800&r1=1088799&r2=1088800&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java Mon Apr 4 21:53:09 2011 @@ -26,12 +26,27 @@ import java.util.UUID; import org.apache.cassandra.utils.UUIDGen; -public class TimeUUIDType extends AbstractType +public class TimeUUIDType extends AbstractType<UUID> { public static final TimeUUIDType instance = new TimeUUIDType(); + public static TimeUUIDType getInstance() + { + return instance; + } + TimeUUIDType() {} // singleton + public UUID compose(ByteBuffer bytes) + { + return UUIDGen.getUUID(bytes); + } + + public ByteBuffer decompose(UUID value) + { + return ByteBuffer.wrap(UUIDGen.decompose(value)); + } + public int compare(ByteBuffer o1, ByteBuffer o2) { if (o1.remaining() == 0) @@ -102,7 +117,7 @@ public class TimeUUIDType extends Abstra if (uuid.version() != 1) throw new IllegalArgumentException("TimeUUID supports only version 1 UUIDs"); - return ByteBuffer.wrap(UUIDGen.decompose(uuid)); + return decompose(uuid); } public void validate(ByteBuffer bytes) throws MarshalException Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/UTF8Type.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/UTF8Type.java?rev=1088800&r1=1088799&r2=1088800&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/UTF8Type.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/marshal/UTF8Type.java Mon Apr 4 21:53:09 2011 @@ -23,14 +23,35 @@ package org.apache.cassandra.db.marshal; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; +import com.google.common.base.Charsets; import org.apache.cassandra.utils.ByteBufferUtil; -public class UTF8Type extends BytesType +public class UTF8Type extends AbstractType<String> { public static final UTF8Type instance = new UTF8Type(); + public static UTF8Type getInstance() + { + return instance; + } + UTF8Type() {} // singleton + public String compose(ByteBuffer bytes) + { + return getString(bytes); + } + + public ByteBuffer decompose(String value) + { + return ByteBufferUtil.bytes(value, Charsets.UTF_8); + } + + public int compare(ByteBuffer o1, ByteBuffer o2) + { + return BytesType.bytesCompare(o1, o2); + } + public String getString(ByteBuffer bytes) { try @@ -45,9 +66,9 @@ public class UTF8Type extends BytesType public ByteBuffer fromString(String source) { - return ByteBufferUtil.bytes(source); + return decompose(source); } - + public void validate(ByteBuffer bytes) throws MarshalException { if (!UTF8Validator.validate(bytes.slice())) Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ByteBufferUtil.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1088800&r1=1088799&r2=1088800&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ByteBufferUtil.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ByteBufferUtil.java Mon Apr 4 21:53:09 2011 @@ -377,6 +377,21 @@ public class ByteBufferUtil return bytes.getInt(bytes.position()); } + public static long toLong(ByteBuffer bytes) + { + return bytes.getLong(bytes.position()); + } + + public static float toFloat(ByteBuffer bytes) + { + return bytes.getFloat(bytes.position()); + } + + public static double toDouble(ByteBuffer bytes) + { + return bytes.getDouble(bytes.position()); + } + public static ByteBuffer bytes(int i) { return ByteBuffer.allocate(4).putInt(0, i); @@ -387,6 +402,17 @@ public class ByteBufferUtil return ByteBuffer.allocate(8).putLong(0, n); } + public static ByteBuffer bytes(float f) + { + return ByteBuffer.allocate(4).putFloat(0, f); + } + + public static ByteBuffer bytes(double d) + { + return ByteBuffer.allocate(8).putDouble(0, d); + } + + public static InputStream inputStream(ByteBuffer bytes) { final ByteBuffer copy = bytes.duplicate(); Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1088800&r1=1088799&r2=1088800&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java Mon Apr 4 21:53:09 2011 @@ -543,6 +543,31 @@ public class FBUtilities } } + public static <T> T getInstance(String classname, String readable) throws ConfigurationException + { + Class cls = classForName(classname, readable); + T rval = null; + try + { + rval = (T) cls.getDeclaredMethod("getInstance").invoke(null, (Object) null); + + } + catch (NoSuchMethodException e) + { + throw new ConfigurationException("Class does not have the getInstance method with no arguments"); + } + catch (InvocationTargetException e) + { + throw new ConfigurationException(String.format("Could not call method getInstance on %s class %s", readable, classname)); + } + catch (IllegalAccessException e) + { + throw new ConfigurationException(String.format("Could not call method getInstance on %s class %s", readable, classname)); + } + + return rval; + } + public static <T extends Comparable> SortedSet<T> singleton(T column) { return new TreeSet<T>(Arrays.asList(column));
