http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java deleted file mode 100644 index 7bf43ef..0000000 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ /dev/null @@ -1,1397 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.hadoop.pig; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; -import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; -import java.util.*; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.auth.PasswordAuthenticator; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.marshal.*; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.exceptions.SyntaxException; -import org.apache.cassandra.hadoop.ColumnFamilyRecordReader; -import org.apache.cassandra.hadoop.ConfigHelper; -import org.apache.cassandra.hadoop.HadoopCompat; -import org.apache.cassandra.schema.LegacySchemaTables; -import org.apache.cassandra.serializers.CollectionSerializer; -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Hex; -import org.apache.cassandra.utils.UUIDGen; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.*; -import org.apache.pig.Expression; -import org.apache.pig.LoadFunc; -import org.apache.pig.LoadMetadata; -import org.apache.pig.ResourceSchema; -import org.apache.pig.ResourceStatistics; -import org.apache.pig.StoreFuncInterface; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; -import org.apache.pig.data.*; -import org.apache.pig.impl.util.UDFContext; -import org.apache.pig.ResourceSchema.ResourceFieldSchema; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; -import org.apache.thrift.protocol.TBinaryProtocol; - -/** - * A LoadStoreFunc for retrieving data from and storing data to Cassandra - * - * A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))). 
- */ -@Deprecated -public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata -{ - public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES"; - public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT"; - public final static String PIG_USE_SECONDARY = "PIG_USE_SECONDARY"; - - private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER; - private static final Logger logger = LoggerFactory.getLogger(CassandraStorage.class); - - private ByteBuffer slice_start = BOUND; - private ByteBuffer slice_end = BOUND; - private boolean slice_reverse = false; - private boolean allow_deletes = false; - - private RecordReader<ByteBuffer, Map<ByteBuffer, ColumnFamilyRecordReader.Column>> reader; - private RecordWriter<ByteBuffer, List<Mutation>> writer; - - private boolean widerows = false; - private int limit; - - protected String DEFAULT_INPUT_FORMAT; - protected String DEFAULT_OUTPUT_FORMAT; - - protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR }; - - protected String username; - protected String password; - protected String keyspace; - protected String column_family; - protected String loadSignature; - protected String storeSignature; - - protected Configuration conf; - protected String inputFormatClass; - protected String outputFormatClass; - protected int splitSize = 64 * 1024; - protected String partitionerClass; - protected boolean usePartitionFilter = false; - protected String initHostAddress; - protected String rpcPort; - protected int nativeProtocolVersion = 1; - - // wide row hacks - private ByteBuffer lastKey; - private Map<ByteBuffer, ColumnFamilyRecordReader.Column> lastRow; - - public CassandraStorage() - { - this(1024); - } - - /**@param limit number of columns to fetch in a slice */ - public CassandraStorage(int limit) - { - super(); - this.limit = limit; - DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat"; - DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat"; - } - - public int getLimit() - { - return limit; - } - - @Override - public void prepareToRead(RecordReader reader, PigSplit split) - { - this.reader = reader; - } - - /** read wide row*/ - public Tuple getNextWide() throws IOException - { - CfDef cfDef = getCfDef(loadSignature); - ByteBuffer key = null; - Tuple tuple = null; - DefaultDataBag bag = new DefaultDataBag(); - try - { - while(true) - { - boolean hasNext = reader.nextKeyValue(); - if (!hasNext) - { - if (tuple == null) - tuple = TupleFactory.getInstance().newTuple(); - - if (lastRow != null) - { - if (tuple.size() == 0) // lastRow is a new one - { - key = reader.getCurrentKey(); - tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class())); - } - for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet()) - { - bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); - } - lastKey = null; - lastRow = null; - tuple.append(bag); - return tuple; - } - else - { - if (tuple.size() == 1) // rare case of just one wide row, key already set - { - tuple.append(bag); - return tuple; - } - else - return null; - } - } - if (key != null && !(reader.getCurrentKey()).equals(key)) // key changed - { - // read too much, hold on to it for next time - lastKey = reader.getCurrentKey(); - lastRow = reader.getCurrentValue(); - // but return what we have so far - tuple.append(bag); - return tuple; - } - if (key == null) // only set the key on the first iteration - { 
- key = reader.getCurrentKey(); - if (lastKey != null && !(key.equals(lastKey))) // last key only had one value - { - if (tuple == null) - tuple = keyToTuple(lastKey, cfDef, parseType(cfDef.getKey_validation_class())); - else - addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class())); - for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet()) - { - bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); - } - tuple.append(bag); - lastKey = key; - lastRow = reader.getCurrentValue(); - return tuple; - } - if (tuple == null) - tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class())); - else - addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class())); - } - SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> row = - (SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>)reader.getCurrentValue(); - if (lastRow != null) // prepend what was read last time - { - for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet()) - { - bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); - } - lastKey = null; - lastRow = null; - } - for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : row.entrySet()) - { - bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); - } - } - } - catch (InterruptedException e) - { - throw new IOException(e.getMessage()); - } - } - - /** read next row */ - public Tuple getNext() throws IOException - { - if (widerows) - return getNextWide(); - try - { - // load the next pair - if (!reader.nextKeyValue()) - return null; - - CfDef cfDef = getCfDef(loadSignature); - ByteBuffer key = reader.getCurrentKey(); - Map<ByteBuffer, ColumnFamilyRecordReader.Column> cf = reader.getCurrentValue(); - assert key != null && cf != null; - - // output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest - // NOTE: we're setting the tuple size here only for the key so we can use setTupleValue on it - - Tuple tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class())); - DefaultDataBag bag = new DefaultDataBag(); - // we must add all the indexed columns first to match the schema - Map<ByteBuffer, Boolean> added = new HashMap<ByteBuffer, Boolean>(cfDef.column_metadata.size()); - // take care to iterate these in the same order as the schema does - for (ColumnDef cdef : cfDef.column_metadata) - { - boolean hasColumn = false; - boolean cql3Table = false; - try - { - hasColumn = cf.containsKey(cdef.name); - } - catch (Exception e) - { - cql3Table = true; - } - if (hasColumn) - { - tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()))); - } - else if (!cql3Table) - { // otherwise, we need to add an empty tuple to take its place - tuple.append(TupleFactory.getInstance().newTuple()); - } - added.put(cdef.name, true); - } - // now add all the other columns - for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : cf.entrySet()) - { - if (!added.containsKey(entry.getKey())) - bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type()))); - } - tuple.append(bag); - // finally, special top-level indexes if needed - if (usePartitionFilter) - { - for (ColumnDef cdef : getIndexes()) - { - Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())); - tuple.append(throwaway.get(1)); - } - } - return tuple; - } - catch 
(InterruptedException e) - { - throw new IOException(e.getMessage()); - } - } - - /** write next row */ - public void putNext(Tuple t) throws IOException - { - /* - We support two cases for output: - First, the original output: - (key, (name, value), (name,value), {(name,value)}) (tuples or bag is optional) - For supers, we only accept the original output. - */ - - if (t.size() < 1) - { - // simply nothing here, we can't even delete without a key - logger.warn("Empty output skipped, filter empty tuples to suppress this warning"); - return; - } - ByteBuffer key = objToBB(t.get(0)); - if (t.getType(1) == DataType.TUPLE) - writeColumnsFromTuple(key, t, 1); - else if (t.getType(1) == DataType.BAG) - { - if (t.size() > 2) - throw new IOException("No arguments allowed after bag"); - writeColumnsFromBag(key, (DataBag) t.get(1)); - } - else - throw new IOException("Second argument in output must be a tuple or bag"); - } - - /** set hadoop cassandra connection settings */ - protected void setConnectionInformation() throws IOException - { - StorageHelper.setConnectionInformation(conf); - if (System.getenv(StorageHelper.PIG_INPUT_FORMAT) != null) - inputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_INPUT_FORMAT)); - else - inputFormatClass = DEFAULT_INPUT_FORMAT; - if (System.getenv(StorageHelper.PIG_OUTPUT_FORMAT) != null) - outputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_OUTPUT_FORMAT)); - else - outputFormatClass = DEFAULT_OUTPUT_FORMAT; - if (System.getenv(PIG_ALLOW_DELETES) != null) - allow_deletes = Boolean.parseBoolean(System.getenv(PIG_ALLOW_DELETES)); - } - - /** get the full class name */ - protected String getFullyQualifiedClassName(String classname) - { - return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." 
+ classname; - } - - /** set read configuration settings */ - public void setLocation(String location, Job job) throws IOException - { - conf = HadoopCompat.getConfiguration(job); - setLocationFromUri(location); - - if (ConfigHelper.getInputSlicePredicate(conf) == null) - { - SliceRange range = new SliceRange(slice_start, slice_end, slice_reverse, limit); - SlicePredicate predicate = new SlicePredicate().setSlice_range(range); - ConfigHelper.setInputSlicePredicate(conf, predicate); - } - if (System.getenv(PIG_WIDEROW_INPUT) != null) - widerows = Boolean.parseBoolean(System.getenv(PIG_WIDEROW_INPUT)); - if (System.getenv(PIG_USE_SECONDARY) != null) - usePartitionFilter = Boolean.parseBoolean(System.getenv(PIG_USE_SECONDARY)); - if (System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE) != null) - { - try - { - ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE))); - } - catch (NumberFormatException e) - { - throw new IOException("PIG_INPUT_SPLIT_SIZE is not a number", e); - } - } - - if (usePartitionFilter && getIndexExpressions() != null) - ConfigHelper.setInputRange(conf, getIndexExpressions()); - - if (username != null && password != null) - ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password); - - if (splitSize > 0) - ConfigHelper.setInputSplitSize(conf, splitSize); - if (partitionerClass!= null) - ConfigHelper.setInputPartitioner(conf, partitionerClass); - if (rpcPort != null) - ConfigHelper.setInputRpcPort(conf, rpcPort); - if (initHostAddress != null) - ConfigHelper.setInputInitialAddress(conf, initHostAddress); - - ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows); - setConnectionInformation(); - - if (ConfigHelper.getInputRpcPort(conf) == 0) - throw new IOException("PIG_INPUT_RPC_PORT or PIG_RPC_PORT environment variable not set"); - if (ConfigHelper.getInputInitialAddress(conf) == null) - throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set"); - if (ConfigHelper.getInputPartitioner(conf) == null) - throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set"); - if (loadSignature == null) - loadSignature = location; - initSchema(loadSignature); - } - - /** set store configuration settings */ - public void setStoreLocation(String location, Job job) throws IOException - { - conf = HadoopCompat.getConfiguration(job); - - // don't combine mappers to a single mapper per node - conf.setBoolean("pig.noSplitCombination", true); - setLocationFromUri(location); - - if (username != null && password != null) - ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password); - if (splitSize > 0) - ConfigHelper.setInputSplitSize(conf, splitSize); - if (partitionerClass!= null) - ConfigHelper.setOutputPartitioner(conf, partitionerClass); - if (rpcPort != null) - { - ConfigHelper.setOutputRpcPort(conf, rpcPort); - ConfigHelper.setInputRpcPort(conf, rpcPort); - } - if (initHostAddress != null) - { - ConfigHelper.setOutputInitialAddress(conf, initHostAddress); - ConfigHelper.setInputInitialAddress(conf, initHostAddress); - } - - ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family); - setConnectionInformation(); - - if (ConfigHelper.getOutputRpcPort(conf) == 0) - throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set"); - if (ConfigHelper.getOutputInitialAddress(conf) == null) - throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable 
not set"); - if (ConfigHelper.getOutputPartitioner(conf) == null) - throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set"); - - // we have to do this again here for the check in writeColumnsFromTuple - if (System.getenv(PIG_USE_SECONDARY) != null) - usePartitionFilter = Boolean.parseBoolean(System.getenv(PIG_USE_SECONDARY)); - - initSchema(storeSignature); - } - - /** Methods to get the column family schema from Cassandra */ - protected void initSchema(String signature) throws IOException - { - Properties properties = UDFContext.getUDFContext().getUDFProperties(CassandraStorage.class); - - // Only get the schema if we haven't already gotten it - if (!properties.containsKey(signature)) - { - try - { - Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf); - client.set_keyspace(keyspace); - - if (username != null && password != null) - { - Map<String, String> credentials = new HashMap<String, String>(2); - credentials.put(PasswordAuthenticator.USERNAME_KEY, username); - credentials.put(PasswordAuthenticator.PASSWORD_KEY, password); - - try - { - client.login(new AuthenticationRequest(credentials)); - } - catch (AuthenticationException e) - { - logger.error("Authentication exception: invalid username and/or password"); - throw new IOException(e); - } - } - - // compose the CfDef for the columfamily - CfDef cfDef = getCfDef(client); - - if (cfDef != null) - { - StringBuilder sb = new StringBuilder(); - sb.append(cfdefToString(cfDef)); - properties.setProperty(signature, sb.toString()); - } - else - throw new IOException(String.format("Table '%s' not found in keyspace '%s'", - column_family, - keyspace)); - } - catch (Exception e) - { - throw new IOException(e); - } - } - } - - public void checkSchema(ResourceSchema schema) throws IOException - { - // we don't care about types, they all get casted to ByteBuffers - } - - /** define the schema */ - public ResourceSchema getSchema(String location, Job job) throws IOException - { - setLocation(location, job); - CfDef cfDef = getCfDef(loadSignature); - if (cfDef.column_type.equals("Super")) - return null; - /* - Our returned schema should look like this: - (key, index1:(name, value), index2:(name, value), columns:{(name, value)}) - Which is to say, columns that have metadata will be returned as named tuples, but unknown columns will go into a bag. - This way, wide rows can still be handled by the bag, but known columns can easily be referenced. 
- */ - - // top-level schema, no type - ResourceSchema schema = new ResourceSchema(); - - // get default marshallers and validators - Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef); - Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); - - // add key - ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema(); - keyFieldSchema.setName("key"); - keyFieldSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.KEY_VALIDATOR))); - - ResourceSchema bagSchema = new ResourceSchema(); - ResourceFieldSchema bagField = new ResourceFieldSchema(); - bagField.setType(DataType.BAG); - bagField.setName("columns"); - // inside the bag, place one tuple with the default comparator/validator schema - ResourceSchema bagTupleSchema = new ResourceSchema(); - ResourceFieldSchema bagTupleField = new ResourceFieldSchema(); - bagTupleField.setType(DataType.TUPLE); - ResourceFieldSchema bagcolSchema = new ResourceFieldSchema(); - ResourceFieldSchema bagvalSchema = new ResourceFieldSchema(); - bagcolSchema.setName("name"); - bagvalSchema.setName("value"); - bagcolSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.COMPARATOR))); - bagvalSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.DEFAULT_VALIDATOR))); - bagTupleSchema.setFields(new ResourceFieldSchema[] { bagcolSchema, bagvalSchema }); - bagTupleField.setSchema(bagTupleSchema); - bagSchema.setFields(new ResourceFieldSchema[] { bagTupleField }); - bagField.setSchema(bagSchema); - - // will contain all fields for this schema - List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>(); - // add the key first, then the indexed columns, and finally the bag - allSchemaFields.add(keyFieldSchema); - - if (!widerows) - { - // defined validators/indexes - for (ColumnDef cdef : cfDef.column_metadata) - { - // make a new tuple for each col/val pair - ResourceSchema innerTupleSchema = new ResourceSchema(); - ResourceFieldSchema innerTupleField = new ResourceFieldSchema(); - innerTupleField.setType(DataType.TUPLE); - innerTupleField.setSchema(innerTupleSchema); - innerTupleField.setName(new String(cdef.getName())); - - ResourceFieldSchema idxColSchema = new ResourceFieldSchema(); - idxColSchema.setName("name"); - idxColSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.COMPARATOR))); - - ResourceFieldSchema valSchema = new ResourceFieldSchema(); - AbstractType validator = validators.get(cdef.name); - if (validator == null) - validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR); - valSchema.setName("value"); - valSchema.setType(StorageHelper.getPigType(validator)); - - innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema }); - allSchemaFields.add(innerTupleField); - } - } - - // bag at the end for unknown columns - allSchemaFields.add(bagField); - - // add top-level index elements if needed - if (usePartitionFilter) - { - for (ColumnDef cdef : getIndexes()) - { - ResourceFieldSchema idxSchema = new ResourceFieldSchema(); - idxSchema.setName("index_" + new String(cdef.getName())); - AbstractType validator = validators.get(cdef.name); - if (validator == null) - validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR); - idxSchema.setType(StorageHelper.getPigType(validator)); - allSchemaFields.add(idxSchema); - } - } - // top level schema contains everything - schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()])); - return schema; - } - - /** set partition 
filter */ - public void setPartitionFilter(Expression partitionFilter) throws IOException - { - UDFContext context = UDFContext.getUDFContext(); - Properties property = context.getUDFProperties(CassandraStorage.class); - property.setProperty(StorageHelper.PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter))); - } - - /** prepare writer */ - public void prepareToWrite(RecordWriter writer) - { - this.writer = writer; - } - - /** convert object to ByteBuffer */ - protected ByteBuffer objToBB(Object o) - { - if (o == null) - return nullToBB(); - if (o instanceof java.lang.String) - return ByteBuffer.wrap(new DataByteArray((String)o).get()); - if (o instanceof Integer) - return Int32Type.instance.decompose((Integer)o); - if (o instanceof Long) - return LongType.instance.decompose((Long)o); - if (o instanceof Float) - return FloatType.instance.decompose((Float)o); - if (o instanceof Double) - return DoubleType.instance.decompose((Double)o); - if (o instanceof UUID) - return ByteBuffer.wrap(UUIDGen.decompose((UUID) o)); - if(o instanceof Tuple) { - List<Object> objects = ((Tuple)o).getAll(); - //collections - if (objects.size() > 0 && objects.get(0) instanceof String) - { - String collectionType = (String) objects.get(0); - if ("set".equalsIgnoreCase(collectionType) || - "list".equalsIgnoreCase(collectionType)) - return objToListOrSetBB(objects.subList(1, objects.size())); - else if ("map".equalsIgnoreCase(collectionType)) - return objToMapBB(objects.subList(1, objects.size())); - - } - return objToCompositeBB(objects); - } - - return ByteBuffer.wrap(((DataByteArray) o).get()); - } - - private ByteBuffer objToListOrSetBB(List<Object> objects) - { - List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size()); - for(Object sub : objects) - { - ByteBuffer buffer = objToBB(sub); - serialized.add(buffer); - } - // NOTE: using protocol v1 serialization format for collections so as to not break - // compatibility. Not sure if that's the right thing. - return CollectionSerializer.pack(serialized, objects.size(), 1); - } - - private ByteBuffer objToMapBB(List<Object> objects) - { - List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2); - for(Object sub : objects) - { - List<Object> keyValue = ((Tuple)sub).getAll(); - for (Object entry: keyValue) - { - ByteBuffer buffer = objToBB(entry); - serialized.add(buffer); - } - } - // NOTE: using protocol v1 serialization format for collections so as to not break - // compatibility. Not sure if that's the right thing. 
- return CollectionSerializer.pack(serialized, objects.size(), 1); - } - - private ByteBuffer objToCompositeBB(List<Object> objects) - { - List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size()); - int totalLength = 0; - for(Object sub : objects) - { - ByteBuffer buffer = objToBB(sub); - serialized.add(buffer); - totalLength += 2 + buffer.remaining() + 1; - } - ByteBuffer out = ByteBuffer.allocate(totalLength); - for (ByteBuffer bb : serialized) - { - int length = bb.remaining(); - out.put((byte) ((length >> 8) & 0xFF)); - out.put((byte) (length & 0xFF)); - out.put(bb); - out.put((byte) 0); - } - out.flip(); - return out; - } - - /** write tuple data to cassandra */ - private void writeColumnsFromTuple(ByteBuffer key, Tuple t, int offset) throws IOException - { - ArrayList<Mutation> mutationList = new ArrayList<Mutation>(); - for (int i = offset; i < t.size(); i++) - { - if (t.getType(i) == DataType.BAG) - writeColumnsFromBag(key, (DataBag) t.get(i)); - else if (t.getType(i) == DataType.TUPLE) - { - Tuple inner = (Tuple) t.get(i); - if (inner.size() > 0) // may be empty, for an indexed column that wasn't present - mutationList.add(mutationFromTuple(inner)); - } - else if (!usePartitionFilter) - { - throw new IOException("Output type was not a bag or a tuple"); - } - } - if (mutationList.size() > 0) - writeMutations(key, mutationList); - } - - /** compose Cassandra mutation from tuple */ - private Mutation mutationFromTuple(Tuple t) throws IOException - { - Mutation mutation = new Mutation(); - if (t.get(1) == null) - { - if (allow_deletes) - { - mutation.deletion = new Deletion(); - mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate(); - mutation.deletion.predicate.column_names = Arrays.asList(objToBB(t.get(0))); - mutation.deletion.setTimestamp(FBUtilities.timestampMicros()); - } - else - throw new IOException("null found but deletes are disabled, set " + PIG_ALLOW_DELETES + - "=true in environment or allow_deletes=true in URL to enable"); - } - else - { - org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column(); - column.setName(objToBB(t.get(0))); - column.setValue(objToBB(t.get(1))); - column.setTimestamp(FBUtilities.timestampMicros()); - mutation.column_or_supercolumn = new ColumnOrSuperColumn(); - mutation.column_or_supercolumn.column = column; - } - return mutation; - } - - /** write bag data to Cassandra */ - private void writeColumnsFromBag(ByteBuffer key, DataBag bag) throws IOException - { - List<Mutation> mutationList = new ArrayList<Mutation>(); - for (Tuple pair : bag) - { - Mutation mutation = new Mutation(); - if (DataType.findType(pair.get(1)) == DataType.BAG) // supercolumn - { - SuperColumn sc = new SuperColumn(); - sc.setName(objToBB(pair.get(0))); - List<org.apache.cassandra.thrift.Column> columns = new ArrayList<org.apache.cassandra.thrift.Column>(); - for (Tuple subcol : (DataBag) pair.get(1)) - { - org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column(); - column.setName(objToBB(subcol.get(0))); - column.setValue(objToBB(subcol.get(1))); - column.setTimestamp(FBUtilities.timestampMicros()); - columns.add(column); - } - if (columns.isEmpty()) - { - if (allow_deletes) - { - mutation.deletion = new Deletion(); - mutation.deletion.super_column = objToBB(pair.get(0)); - mutation.deletion.setTimestamp(FBUtilities.timestampMicros()); - } - else - throw new IOException("SuperColumn deletion attempted with empty bag, but deletes are disabled, set " + - PIG_ALLOW_DELETES + 
"=true in environment or allow_deletes=true in URL to enable"); - } - else - { - sc.columns = columns; - mutation.column_or_supercolumn = new ColumnOrSuperColumn(); - mutation.column_or_supercolumn.super_column = sc; - } - } - else - mutation = mutationFromTuple(pair); - mutationList.add(mutation); - // for wide rows, we need to limit the amount of mutations we write at once - if (mutationList.size() >= 10) // arbitrary, CFOF will re-batch this up, and BOF won't care - { - writeMutations(key, mutationList); - mutationList.clear(); - } - } - // write the last batch - if (mutationList.size() > 0) - writeMutations(key, mutationList); - } - - /** write mutation to Cassandra */ - private void writeMutations(ByteBuffer key, List<Mutation> mutations) throws IOException - { - try - { - writer.write(key, mutations); - } - catch (InterruptedException e) - { - throw new IOException(e); - } - } - - /** get a list of columns with defined index*/ - protected List<ColumnDef> getIndexes() throws IOException - { - CfDef cfdef = getCfDef(loadSignature); - List<ColumnDef> indexes = new ArrayList<ColumnDef>(); - for (ColumnDef cdef : cfdef.column_metadata) - { - if (cdef.index_type != null) - indexes.add(cdef); - } - return indexes; - } - - /** get a list of Cassandra IndexExpression from Pig expression */ - private List<IndexExpression> filterToIndexExpressions(Expression expression) throws IOException - { - List<IndexExpression> indexExpressions = new ArrayList<IndexExpression>(); - Expression.BinaryExpression be = (Expression.BinaryExpression)expression; - ByteBuffer name = ByteBuffer.wrap(be.getLhs().toString().getBytes()); - ByteBuffer value = ByteBuffer.wrap(be.getRhs().toString().getBytes()); - switch (expression.getOpType()) - { - case OP_EQ: - indexExpressions.add(new IndexExpression(name, IndexOperator.EQ, value)); - break; - case OP_GE: - indexExpressions.add(new IndexExpression(name, IndexOperator.GTE, value)); - break; - case OP_GT: - indexExpressions.add(new IndexExpression(name, IndexOperator.GT, value)); - break; - case OP_LE: - indexExpressions.add(new IndexExpression(name, IndexOperator.LTE, value)); - break; - case OP_LT: - indexExpressions.add(new IndexExpression(name, IndexOperator.LT, value)); - break; - case OP_AND: - indexExpressions.addAll(filterToIndexExpressions(be.getLhs())); - indexExpressions.addAll(filterToIndexExpressions(be.getRhs())); - break; - default: - throw new IOException("Unsupported expression type: " + expression.getOpType().name()); - } - return indexExpressions; - } - - /** convert a list of index expression to string */ - private static String indexExpressionsToString(List<IndexExpression> indexExpressions) throws IOException - { - assert indexExpressions != null; - // oh, you thought cfdefToString was awful? 
- IndexClause indexClause = new IndexClause(); - indexClause.setExpressions(indexExpressions); - indexClause.setStart_key("".getBytes()); - TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory()); - try - { - return Hex.bytesToHex(serializer.serialize(indexClause)); - } - catch (TException e) - { - throw new IOException(e); - } - } - - /** convert string to a list of index expression */ - private static List<IndexExpression> indexExpressionsFromString(String ie) throws IOException - { - assert ie != null; - TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory()); - IndexClause indexClause = new IndexClause(); - try - { - deserializer.deserialize(indexClause, Hex.hexToBytes(ie)); - } - catch (TException e) - { - throw new IOException(e); - } - return indexClause.getExpressions(); - } - - public ResourceStatistics getStatistics(String location, Job job) - { - return null; - } - - public void cleanupOnFailure(String failure, Job job) - { - } - - public void cleanupOnSuccess(String location, Job job) throws IOException { - } - - - /** StoreFunc methods */ - public void setStoreFuncUDFContextSignature(String signature) - { - this.storeSignature = signature; - } - - public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException - { - return relativeToAbsolutePath(location, curDir); - } - - /** output format */ - public OutputFormat getOutputFormat() throws IOException - { - try - { - return FBUtilities.construct(outputFormatClass, "outputformat"); - } - catch (ConfigurationException e) - { - throw new IOException(e); - } - } - - - @Override - public InputFormat getInputFormat() throws IOException - { - try - { - return FBUtilities.construct(inputFormatClass, "inputformat"); - } - catch (ConfigurationException e) - { - throw new IOException(e); - } - } - - /** get a list of index expression */ - private List<IndexExpression> getIndexExpressions() throws IOException - { - UDFContext context = UDFContext.getUDFContext(); - Properties property = context.getUDFProperties(CassandraStorage.class); - if (property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE) != null) - return indexExpressionsFromString(property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE)); - else - return null; - } - - /** get a list of column for the column family */ - protected List<ColumnDef> getColumnMetadata(Cassandra.Client client) - throws TException, CharacterCodingException, InvalidRequestException, ConfigurationException - { - return getColumnMeta(client, true, true); - } - - - /** get column meta data */ - protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn) - throws TException, - CharacterCodingException, - ConfigurationException - { - String query = String.format("SELECT column_name, validator, index_type, type " + - "FROM %s.%s " + - "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'", - SystemKeyspace.NAME, - LegacySchemaTables.COLUMNS, - keyspace, - column_family); - - CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE); - - List<CqlRow> rows = result.rows; - List<ColumnDef> columnDefs = new ArrayList<ColumnDef>(); - if (rows == null || rows.isEmpty()) - { - // if CassandraStorage, just return the empty list - if (cassandraStorage) - return columnDefs; - - // otherwise for CqlNativeStorage, check metadata for classic thrift tables - CFMetaData cfm = getCFMetaData(keyspace, column_family, client); - 
for (ColumnDefinition def : cfm.regularAndStaticColumns()) - { - ColumnDef cDef = new ColumnDef(); - String columnName = def.name.toString(); - String type = def.type.toString(); - logger.debug("name: {}, type: {} ", columnName, type); - cDef.name = ByteBufferUtil.bytes(columnName); - cDef.validation_class = type; - columnDefs.add(cDef); - } - // we may not need to include the value column for compact tables as we - // could have already processed it as schema_columnfamilies.value_alias - if (columnDefs.size() == 0 && includeCompactValueColumn && cfm.compactValueColumn() != null) - { - ColumnDefinition def = cfm.compactValueColumn(); - if ("value".equals(def.name.toString())) - { - ColumnDef cDef = new ColumnDef(); - cDef.name = def.name.bytes; - cDef.validation_class = def.type.toString(); - columnDefs.add(cDef); - } - } - return columnDefs; - } - - Iterator<CqlRow> iterator = rows.iterator(); - while (iterator.hasNext()) - { - CqlRow row = iterator.next(); - ColumnDef cDef = new ColumnDef(); - String type = ByteBufferUtil.string(row.getColumns().get(3).value); - if (!type.equals("regular")) - continue; - cDef.setName(ByteBufferUtil.clone(row.getColumns().get(0).value)); - cDef.validation_class = ByteBufferUtil.string(row.getColumns().get(1).value); - ByteBuffer indexType = row.getColumns().get(2).value; - if (indexType != null) - cDef.index_type = getIndexType(ByteBufferUtil.string(indexType)); - columnDefs.add(cDef); - } - return columnDefs; - } - - - /** get CFMetaData of a column family */ - protected CFMetaData getCFMetaData(String ks, String cf, Cassandra.Client client) - throws TException, ConfigurationException - { - KsDef ksDef = client.describe_keyspace(ks); - for (CfDef cfDef : ksDef.cf_defs) - { - if (cfDef.name.equalsIgnoreCase(cf)) - return ThriftConversion.fromThrift(cfDef); - } - return null; - } - - /** get index type from string */ - protected IndexType getIndexType(String type) - { - type = type.toLowerCase(); - if ("keys".equals(type)) - return IndexType.KEYS; - else if("custom".equals(type)) - return IndexType.CUSTOM; - else if("composites".equals(type)) - return IndexType.COMPOSITES; - else - return null; - } - - /** return partition keys */ - public String[] getPartitionKeys(String location, Job job) throws IOException - { - if (!usePartitionFilter) - return null; - List<ColumnDef> indexes = getIndexes(); - String[] partitionKeys = new String[indexes.size()]; - for (int i = 0; i < indexes.size(); i++) - { - partitionKeys[i] = new String(indexes.get(i).getName()); - } - return partitionKeys; - } - - /** convert key to a tuple */ - private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException - { - Tuple tuple = TupleFactory.getInstance().newTuple(1); - addKeyToTuple(tuple, key, cfDef, comparator); - return tuple; - } - - /** add key to a tuple */ - private void addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException - { - if( comparator instanceof AbstractCompositeType ) - { - StorageHelper.setTupleValue(tuple, 0, composeComposite((AbstractCompositeType) comparator, key)); - } - else - { - StorageHelper.setTupleValue(tuple, 0, StorageHelper.cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR), key, nativeProtocolVersion)); - } - - } - - /** Deconstructs a composite type to a Tuple. 
*/ - protected Tuple composeComposite(AbstractCompositeType comparator, ByteBuffer name) throws IOException - { - List<AbstractCompositeType.CompositeComponent> result = comparator.deconstruct(name); - Tuple t = TupleFactory.getInstance().newTuple(result.size()); - for (int i=0; i<result.size(); i++) - StorageHelper.setTupleValue(t, i, StorageHelper.cassandraToObj(result.get(i).comparator, result.get(i).value, nativeProtocolVersion)); - - return t; - } - - /** cassandra://[username:password@]<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end> - * [&reversed=true][&limit=1][&allow_deletes=true][&widerows=true] - * [&use_secondary=true][&comparator=<comparator>][&partitioner=<partitioner>]]*/ - private void setLocationFromUri(String location) throws IOException - { - try - { - if (!location.startsWith("cassandra://")) - throw new Exception("Bad scheme." + location); - - String[] urlParts = location.split("\\?"); - if (urlParts.length > 1) - { - Map<String, String> urlQuery = getQueryMap(urlParts[1]); - AbstractType comparator = BytesType.instance; - if (urlQuery.containsKey("comparator")) - comparator = TypeParser.parse(urlQuery.get("comparator")); - if (urlQuery.containsKey("slice_start")) - slice_start = comparator.fromString(urlQuery.get("slice_start")); - if (urlQuery.containsKey("slice_end")) - slice_end = comparator.fromString(urlQuery.get("slice_end")); - if (urlQuery.containsKey("reversed")) - slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed")); - if (urlQuery.containsKey("limit")) - limit = Integer.parseInt(urlQuery.get("limit")); - if (urlQuery.containsKey("allow_deletes")) - allow_deletes = Boolean.parseBoolean(urlQuery.get("allow_deletes")); - if (urlQuery.containsKey("widerows")) - widerows = Boolean.parseBoolean(urlQuery.get("widerows")); - if (urlQuery.containsKey("use_secondary")) - usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary")); - if (urlQuery.containsKey("split_size")) - splitSize = Integer.parseInt(urlQuery.get("split_size")); - if (urlQuery.containsKey("partitioner")) - partitionerClass = urlQuery.get("partitioner"); - if (urlQuery.containsKey("init_address")) - initHostAddress = urlQuery.get("init_address"); - if (urlQuery.containsKey("rpc_port")) - rpcPort = urlQuery.get("rpc_port"); - } - String[] parts = urlParts[0].split("/+"); - String[] credentialsAndKeyspace = parts[1].split("@"); - if (credentialsAndKeyspace.length > 1) - { - String[] credentials = credentialsAndKeyspace[0].split(":"); - username = credentials[0]; - password = credentials[1]; - keyspace = credentialsAndKeyspace[1]; - } - else - { - keyspace = parts[1]; - } - column_family = parts[2]; - } - catch (Exception e) - { - throw new IOException("Expected 'cassandra://[username:password@]<keyspace>/<table>" + - "[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]" + - "[&allow_deletes=true][&widerows=true][&use_secondary=true]" + - "[&comparator=<comparator>][&split_size=<size>][&partitioner=<partitioner>]" + - "[&init_address=<host>][&rpc_port=<port>]]': " + e.getMessage()); - } - } - - - /** decompose the query to store the parameters in a map */ - public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException - { - String[] params = query.split("&"); - Map<String, String> map = new HashMap<String, String>(params.length); - for (String param : params) - { - String[] keyValue = param.split("="); - map.put(keyValue[0], URLDecoder.decode(keyValue[1], "UTF-8")); - } - return map; - } - - public ByteBuffer nullToBB() 
- { - return null; - } - - /** return the CfInfo for the column family */ - protected CfDef getCfDef(Cassandra.Client client) - throws TException, - ConfigurationException, - IOException - { - // get CF meta data - String query = String.format("SELECT type, comparator, subcomparator, default_validator, key_validator " + - "FROM %s.%s " + - "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'", - SystemKeyspace.NAME, - LegacySchemaTables.COLUMNFAMILIES, - keyspace, - column_family); - - CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE); - - if (result == null || result.rows == null || result.rows.isEmpty()) - return null; - - Iterator<CqlRow> iteraRow = result.rows.iterator(); - CfDef cfDef = new CfDef(); - cfDef.keyspace = keyspace; - cfDef.name = column_family; - if (iteraRow.hasNext()) - { - CqlRow cqlRow = iteraRow.next(); - - cfDef.column_type = ByteBufferUtil.string(cqlRow.columns.get(0).value); - cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(1).value); - ByteBuffer subComparator = cqlRow.columns.get(2).value; - if (subComparator != null) - cfDef.subcomparator_type = ByteBufferUtil.string(subComparator); - cfDef.default_validation_class = ByteBufferUtil.string(cqlRow.columns.get(3).value); - cfDef.key_validation_class = ByteBufferUtil.string(cqlRow.columns.get(4).value); - } - cfDef.column_metadata = getColumnMetadata(client); - return cfDef; - } - - /** get the columnfamily definition for the signature */ - protected CfDef getCfDef(String signature) throws IOException - { - UDFContext context = UDFContext.getUDFContext(); - Properties property = context.getUDFProperties(CassandraStorage.class); - String prop = property.getProperty(signature); - return cfdefFromString(prop); - } - - /** convert string back to CfDef */ - protected static CfDef cfdefFromString(String st) throws IOException - { - assert st != null; - TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory()); - CfDef cfDef = new CfDef(); - try - { - deserializer.deserialize(cfDef, Hex.hexToBytes(st)); - } - catch (TException e) - { - throw new IOException(e); - } - return cfDef; - } - - /** convert CfDef to string */ - protected static String cfdefToString(CfDef cfDef) throws IOException - { - assert cfDef != null; - // this is so awful it's kind of cool! 
- TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory()); - try - { - return Hex.bytesToHex(serializer.serialize(cfDef)); - } - catch (TException e) - { - throw new IOException(e); - } - } - - /** parse the string to a cassandra data type */ - protected AbstractType parseType(String type) throws IOException - { - try - { - // always treat counters like longs, specifically CCT.compose is not what we need - if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType")) - return LongType.instance; - return TypeParser.parse(type); - } - catch (ConfigurationException e) - { - throw new IOException(e); - } - catch (SyntaxException e) - { - throw new IOException(e); - } - } - - /** convert a column to a tuple */ - protected Tuple columnToTuple(ColumnFamilyRecordReader.Column column, CfDef cfDef, AbstractType comparator) throws IOException - { - Tuple pair = TupleFactory.getInstance().newTuple(2); - - // name - if(comparator instanceof AbstractCompositeType) - StorageHelper.setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, column.name)); - else - StorageHelper.setTupleValue(pair, 0, StorageHelper.cassandraToObj(comparator, column.name, nativeProtocolVersion)); - - // value - Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); - if (validators.get(column.name) == null) - { - Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef); - StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), column.value, nativeProtocolVersion)); - } - else - StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(validators.get(column.name), column.value, nativeProtocolVersion)); - return pair; - } - - /** construct a map to store the mashaller type to cassandra data type mapping */ - protected Map<MarshallerType, AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException - { - Map<MarshallerType, AbstractType> marshallers = new EnumMap<MarshallerType, AbstractType>(MarshallerType.class); - AbstractType comparator; - AbstractType subcomparator; - AbstractType default_validator; - AbstractType key_validator; - - comparator = parseType(cfDef.getComparator_type()); - subcomparator = parseType(cfDef.getSubcomparator_type()); - default_validator = parseType(cfDef.getDefault_validation_class()); - key_validator = parseType(cfDef.getKey_validation_class()); - - marshallers.put(MarshallerType.COMPARATOR, comparator); - marshallers.put(MarshallerType.DEFAULT_VALIDATOR, default_validator); - marshallers.put(MarshallerType.KEY_VALIDATOR, key_validator); - marshallers.put(MarshallerType.SUBCOMPARATOR, subcomparator); - return marshallers; - } - - /** get the validators */ - protected Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException - { - Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>(); - for (ColumnDef cd : cfDef.getColumn_metadata()) - { - if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty()) - { - AbstractType validator = null; - try - { - validator = TypeParser.parse(cd.getValidation_class()); - if (validator instanceof CounterColumnType) - validator = LongType.instance; - validators.put(cd.name, validator); - } - catch (ConfigurationException e) - { - throw new IOException(e); - } - catch (SyntaxException e) - { - throw new IOException(e); - } - } - } - return validators; - } -}
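For readers skimming the removal above: the trickiest piece of the deleted class is the hand-rolled composite encoding in objToCompositeBB. A minimal standalone sketch of that byte layout follows; the class name CompositePacker and the main() driver are illustrative additions, but the framing itself (two-byte big-endian length, component bytes, then a zero end-of-component byte) is exactly what the removed code wrote.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public final class CompositePacker
{
    /**
     * Packs pre-serialized components the same way the deleted objToCompositeBB did:
     * each component is framed by a 2-byte big-endian length and a trailing
     * end-of-component byte (0).
     */
    public static ByteBuffer pack(List<ByteBuffer> components)
    {
        int totalLength = 0;
        for (ByteBuffer bb : components)
            totalLength += 2 + bb.remaining() + 1;

        ByteBuffer out = ByteBuffer.allocate(totalLength);
        for (ByteBuffer bb : components)
        {
            int length = bb.remaining();
            out.put((byte) ((length >> 8) & 0xFF)); // length, high byte
            out.put((byte) (length & 0xFF));        // length, low byte
            out.put(bb.duplicate());                // component bytes (duplicate keeps the input readable)
            out.put((byte) 0);                      // end-of-component marker
        }
        out.flip();
        return out;
    }

    public static void main(String[] args)
    {
        List<ByteBuffer> parts = new ArrayList<>();
        parts.add(ByteBuffer.wrap("foo".getBytes()));
        parts.add(ByteBuffer.wrap("bar".getBytes()));
        // 3 + 3 payload bytes plus (2 + 1) framing bytes per component = 12
        System.out.println("packed length: " + pack(parts).remaining());
    }
}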
http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/test/pig/org/apache/cassandra/pig/PigTestBase.java ---------------------------------------------------------------------- diff --git a/test/pig/org/apache/cassandra/pig/PigTestBase.java b/test/pig/org/apache/cassandra/pig/PigTestBase.java index 8c27f6c..f556e66 100644 --- a/test/pig/org/apache/cassandra/pig/PigTestBase.java +++ b/test/pig/org/apache/cassandra/pig/PigTestBase.java @@ -20,6 +20,8 @@ package org.apache.cassandra.pig; import java.io.IOException; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.marshal.AbstractType; @@ -27,9 +29,6 @@ import org.apache.cassandra.db.marshal.TypeParser; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.service.EmbeddedCassandraService; -import org.apache.cassandra.thrift.Cassandra; -import org.apache.cassandra.thrift.Compression; -import org.apache.cassandra.thrift.ConsistencyLevel; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.hadoop.conf.Configuration; import org.apache.pig.ExecType; @@ -37,13 +36,6 @@ import org.apache.pig.PigServer; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.impl.PigContext; import org.apache.pig.test.MiniCluster; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -80,13 +72,10 @@ public class PigTestBase extends SchemaLoader pig.shutdown(); } - protected static Cassandra.Client getClient() throws TTransportException + protected static Session getClient() { - TTransport tr = new TFramedTransport(new TSocket("localhost", 9170)); - TProtocol proto = new TBinaryProtocol(tr); - Cassandra.Client client = new Cassandra.Client(proto); - tr.open(); - return client; + Cluster cluster = Cluster.builder().addContactPoints("localhost").withPort(9042).build(); + return cluster.connect(); } protected static void startCassandra() throws IOException @@ -114,14 +103,14 @@ public class PigTestBase extends SchemaLoader } } - protected static void executeCQLStatements(String[] statements) throws TException + protected static void executeCQLStatements(String[] statements) { - Cassandra.Client client = getClient(); + Session client = getClient(); for (String statement : statements) { System.out.println("Executing statement: " + statement); - client.execute_cql3_query(ByteBufferUtil.bytes(statement), Compression.NONE, ConsistencyLevel.ONE); + client.execute(statement); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java ---------------------------------------------------------------------- diff --git a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java index 3ddb94e..273cdff 100644 --- a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java +++ b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java @@ 
-24,10 +24,8 @@ import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.utils.Hex; -import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; -import org.apache.thrift.TException; import org.junit.BeforeClass; import org.junit.Test; @@ -69,7 +67,7 @@ public class ThriftColumnFamilyDataTypeTest extends PigTestBase }; @BeforeClass - public static void setup() throws IOException, ConfigurationException, TException + public static void setup() throws IOException, ConfigurationException { startCassandra(); executeCQLStatements(statements); @@ -79,76 +77,74 @@ public class ThriftColumnFamilyDataTypeTest extends PigTestBase @Test public void testCassandraStorageDataType() throws IOException { - pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + defaultParameters + "' USING CassandraStorage();"); + pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + defaultParameters + "' USING CqlNativeStorage();"); Tuple t = pig.openIterator("rows").next(); // key assertEquals("foo", t.get(0)); // col_ascii - Tuple column = (Tuple) t.get(1); - assertEquals("ascii", column.get(1)); + Object column = t.get(1); + assertEquals("ascii", column); // col_bigint - column = (Tuple) t.get(2); - assertEquals(12345678L, column.get(1)); + column = t.get(2); + assertEquals(12345678L, column); // col_blob - column = (Tuple) t.get(3); - assertEquals(new DataByteArray(Hex.hexToBytes("DEADBEEF")), column.get(1)); + column = t.get(3); + assertEquals(new DataByteArray(Hex.hexToBytes("DEADBEEF")), column); // col_boolean - column = (Tuple) t.get(4); - assertEquals(false, column.get(1)); + column = t.get(4); + assertEquals(false, column); // col_decimal - column = (Tuple) t.get(5); - assertEquals("23.345", column.get(1)); + column = t.get(5); + assertEquals("23.345", column); // col_double - column = (Tuple) t.get(6); - assertEquals(2.7182818284590451d, column.get(1)); + column = t.get(6); + assertEquals(2.7182818284590451d, column); // col_float - column = (Tuple) t.get(7); - assertEquals(23.45f, column.get(1)); + column = t.get(7); + assertEquals(23.45f, column); // col_inet - column = (Tuple) t.get(8); - assertEquals("127.0.0.1", column.get(1)); + column = t.get(8); + assertEquals("127.0.0.1", column); // col_int - column = (Tuple) t.get(9); - assertEquals(23, column.get(1)); + column = t.get(9); + assertEquals(23, column); // col_text - column = (Tuple) t.get(10); - assertEquals("hello", column.get(1)); + column = t.get(10); + assertEquals("hello", column); // col_timestamp - column = (Tuple) t.get(11); - assertEquals(1296705900000L, column.get(1)); + column = t.get(11); + assertEquals(1296705900000L, column); // col_timeuuid - column = (Tuple) t.get(12); - assertEquals(new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())), column.get(1)); + column = t.get(12); + assertEquals(new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())), column); // col_uuid - column = (Tuple) t.get(13); - assertEquals(new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())), column.get(1)); + column = t.get(13); + assertEquals(new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())), column); // col_varint - column = (Tuple) t.get(14); - assertEquals(12345, 
column.get(1)); + column = t.get(14); + assertEquals(12345, column); - pig.registerQuery("cc_rows = LOAD 'cassandra://thrift_ks/cc?" + defaultParameters + "' USING CassandraStorage();"); + pig.registerQuery("cc_rows = LOAD 'cql://thrift_ks/cc?" + defaultParameters + "' USING CqlNativeStorage();"); t = pig.openIterator("cc_rows").next(); assertEquals("chuck", t.get(0)); - DataBag columns = (DataBag) t.get(1); - column = columns.iterator().next(); - assertEquals("kick", column.get(0)); - assertEquals(3L, column.get(1)); + assertEquals("kick", t.get(1)); + assertEquals(3L, t.get(2)); } }
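The test updates above replace the raw Thrift Cassandra.Client with a DataStax Java driver Session and rewrite the Pig queries to use CqlNativeStorage over the cql:// scheme, moving from the Thrift RPC port (9170) to the native protocol port (9042). Below is a minimal sketch of the new connection pattern, assuming a 2.x-era DataStax Java driver (where Cluster.close() is available); the CQL statements are illustrative only, and the explicit shutdown is an addition for clarity, since the updated getClient() in PigTestBase builds a new Cluster per call and never closes it.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public final class CqlStatementRunner
{
    public static void main(String[] args)
    {
        // Same connection settings the updated getClient() uses:
        // contact point "localhost", native protocol port 9042.
        Cluster cluster = Cluster.builder()
                                 .addContactPoints("localhost")
                                 .withPort(9042)
                                 .build();
        try
        {
            Session session = cluster.connect();
            String[] statements = {
                // Illustrative statements; the real tests keep theirs in a static array.
                "CREATE KEYSPACE IF NOT EXISTS thrift_ks WITH replication = " +
                    "{'class': 'SimpleStrategy', 'replication_factor': 1}",
                "USE thrift_ks"
            };
            for (String statement : statements)
            {
                System.out.println("Executing statement: " + statement);
                session.execute(statement);
            }
        }
        finally
        {
            cluster.close(); // also closes any sessions created from this cluster
        }
    }
}

Reusing a single Cluster per test class and closing it in an @AfterClass hook would avoid leaking connections across tests; the sketch closes eagerly only to keep the example self-contained.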