Pig: make the inferred and actual schemas match; add smoke tests. Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3371
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e48b29a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e48b29a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e48b29a6

Branch: refs/heads/trunk
Commit: e48b29a6b0b15f9e1c8d8e462a573daa04d1ec5e
Parents: 9ca8478
Author: Brandon Williams <[email protected]>
Authored: Mon Feb 13 17:28:47 2012 -0600
Committer: Brandon Williams <[email protected]>
Committed: Mon Feb 13 17:28:47 2012 -0600

----------------------------------------------------------------------
 .../cassandra/hadoop/pig/CassandraStorage.java | 304 +++++++++------
 contrib/pig/test/populate-cli.txt              |  67 ++++
 contrib/pig/test/test_storage.pig              |  22 +
 3 files changed, 283 insertions(+), 110 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48b29a6/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 975d5ba..7e55ee0 100644
--- a/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Hex;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.commons.logging.Log;
@@ -33,7 +34,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.SuperColumn;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.hadoop.*;
 import org.apache.cassandra.thrift.Mutation;
@@ -50,15 +50,10 @@ import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
 
 /**
  * A LoadStoreFunc for retrieving data from and storing data to Cassandra
@@ -85,8 +80,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private String storeSignature;
 
     private Configuration conf;
-    private RecordReader reader;
-    private RecordWriter writer;
+    private RecordReader<ByteBuffer, Map<ByteBuffer, IColumn>> reader;
+    private RecordWriter<ByteBuffer, List<Mutation>> writer;
     private int limit;
 
     public CassandraStorage()
@@ -118,20 +113,37 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                 return null;
 
             CfDef cfDef = getCfDef(loadSignature);
-            ByteBuffer key = (ByteBuffer)reader.getCurrentKey();
-            SortedMap<ByteBuffer,IColumn> cf = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
+            ByteBuffer key = reader.getCurrentKey();
+            Map<ByteBuffer, IColumn> cf = reader.getCurrentValue();
             assert key != null && cf != null;
-
-            // and wrap it in a tuple
-            Tuple tuple = TupleFactory.getInstance().newTuple(2);
-            ArrayList<Tuple> columns = new ArrayList<Tuple>();
-            tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+
+            // output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
+            Tuple tuple = TupleFactory.getInstance().newTuple();
+            DefaultDataBag bag = new DefaultDataBag();
+            // set the key
+            tuple.append(new DataByteArray(ByteBufferUtil.getArray(key)));
+            // we must add all the indexed columns first to match the schema
+            Map<ByteBuffer, Boolean> added = new HashMap<ByteBuffer, Boolean>();
+            // take care to iterate these in the same order as the schema does
+            for (ColumnDef cdef : cfDef.column_metadata)
+            {
+                if (cf.containsKey(cdef.name))
+                {
+                    tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())));
+                }
+                else
+                {   // otherwise, we need to add an empty tuple to take its place
+                    tuple.append(TupleFactory.getInstance().newTuple());
+                }
+                added.put(cdef.name, true);
+            }
+            // now add all the other columns
             for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
            {
-                columns.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
+                if (!added.containsKey(entry.getKey()))
+                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
             }
-
-            tuple.set(1, new DefaultDataBag(columns));
+            tuple.append(bag);
             return tuple;
         }
         catch (InterruptedException e)
@@ -334,6 +346,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         }
         ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
+        if (loadSignature == null)
+            loadSignature = location;
         initSchema(loadSignature);
     }
@@ -344,6 +358,13 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         if (cfDef.column_type.equals("Super"))
             return null;
+        /*
+        Our returned schema should look like this:
+        (key, index1:(name, value), index2:(name, value), columns:{(name, value)})
+        Which is to say, columns that have metadata will be returned as named tuples, but unknown columns will go into a bag.
+        This way, wide rows can still be handled by the bag, but known columns can easily be referenced.
+        */
+
         // top-level schema, no type
         ResourceSchema schema = new ResourceSchema();
@@ -356,54 +377,59 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         keyFieldSchema.setName("key");
         keyFieldSchema.setType(getPigType(marshallers.get(2)));
-        // will become the bag of tuples
-        ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
-        bagFieldSchema.setName("columns");
-        bagFieldSchema.setType(DataType.BAG);
         ResourceSchema bagSchema = new ResourceSchema();
-
-        List<ResourceFieldSchema> tupleFields = new ArrayList<ResourceFieldSchema>();
-
-        // default comparator/validator
-        ResourceSchema innerTupleSchema = new ResourceSchema();
-        ResourceFieldSchema tupleField = new ResourceFieldSchema();
-        tupleField.setType(DataType.TUPLE);
-        tupleField.setSchema(innerTupleSchema);
-
-        ResourceFieldSchema colSchema = new ResourceFieldSchema();
-        colSchema.setName("name");
-        colSchema.setType(getPigType(marshallers.get(0)));
-        tupleFields.add(colSchema);
-
-        ResourceFieldSchema valSchema = new ResourceFieldSchema();
-        AbstractType validator = marshallers.get(1);
-        valSchema.setName("value");
-        valSchema.setType(getPigType(validator));
-        tupleFields.add(valSchema);
+        ResourceFieldSchema bagField = new ResourceFieldSchema();
+        bagField.setType(DataType.BAG);
+        bagField.setName("columns");
+        // inside the bag, place one tuple with the default comparator/validator schema
+        ResourceSchema bagTupleSchema = new ResourceSchema();
+        ResourceFieldSchema bagTupleField = new ResourceFieldSchema();
+        bagTupleField.setType(DataType.TUPLE);
+        ResourceFieldSchema bagcolSchema = new ResourceFieldSchema();
+        ResourceFieldSchema bagvalSchema = new ResourceFieldSchema();
+        bagcolSchema.setName("name");
+        bagvalSchema.setName("value");
+        bagcolSchema.setType(getPigType(marshallers.get(0)));
+        bagvalSchema.setType(getPigType(marshallers.get(1)));
+        bagTupleSchema.setFields(new ResourceFieldSchema[] { bagcolSchema, bagvalSchema });
+        bagTupleField.setSchema(bagTupleSchema);
+        bagSchema.setFields(new ResourceFieldSchema[] { bagTupleField });
+        bagField.setSchema(bagSchema);
+
+        // will contain all fields for this schema
+        List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
+        // add the key first, then the indexed columns, and finally the bag
+        allSchemaFields.add(keyFieldSchema);
         // defined validators/indexes
         for (ColumnDef cdef : cfDef.column_metadata)
         {
-            colSchema = new ResourceFieldSchema();
-            colSchema.setName(new String(cdef.getName()));
-            colSchema.setType(getPigType(marshallers.get(0)));
-            tupleFields.add(colSchema);
-
-            valSchema = new ResourceFieldSchema();
-            validator = validators.get(ByteBuffer.wrap(cdef.getName()));
+            // make a new tuple for each col/val pair
+            ResourceSchema innerTupleSchema = new ResourceSchema();
+            ResourceFieldSchema innerTupleField = new ResourceFieldSchema();
+            innerTupleField.setType(DataType.TUPLE);
+            innerTupleField.setSchema(innerTupleSchema);
+            innerTupleField.setName(new String(cdef.getName()));
+
+            ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
+            idxColSchema.setName("name");
+            idxColSchema.setType(getPigType(marshallers.get(0)));
+
+            ResourceFieldSchema valSchema = new ResourceFieldSchema();
+            AbstractType validator = validators.get(cdef.name);
             if (validator == null)
                 validator = marshallers.get(1);
             valSchema.setName("value");
             valSchema.setType(getPigType(validator));
-            tupleFields.add(valSchema);
+
+            innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema });
+            allSchemaFields.add(innerTupleField);
         }
-        innerTupleSchema.setFields(tupleFields.toArray(new ResourceFieldSchema[tupleFields.size()]));
+        // bag at the end for unknown columns
+        allSchemaFields.add(bagField);
-        // a bag can contain only one tuple, but that tuple can contain anything
-        bagSchema.setFields(new ResourceFieldSchema[] { tupleField });
-        bagFieldSchema.setSchema(bagSchema);
         // top level schema contains everything
-        schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, bagFieldSchema });
+        schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()]));
         return schema;
     }
@@ -502,79 +528,137 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             return DoubleType.instance.decompose((Double)o);
         if (o instanceof UUID)
             return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
+        return ByteBuffer.wrap(((DataByteArray) o).get());
     }
 
-    public void putNext(Tuple t) throws ExecException, IOException
+    public void putNext(Tuple t) throws IOException
     {
+        /*
+        We support two cases for output:
+        First, the original output:
+            (key, (name, value), (name,value), {(name,value)}) (tuples or bag is optional)
+        For supers, we only accept the original output.
+        */
+
+        if (t.size() < 1)
+        {
+            // simply nothing here, we can't even delete without a key
+            logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
+            return;
+        }
         ByteBuffer key = objToBB(t.get(0));
-        DefaultDataBag pairs = (DefaultDataBag) t.get(1);
+        if (t.getType(1) == DataType.TUPLE)
+            writeColumnsFromTuple(key, t, 1);
+        else if (t.getType(1) == DataType.BAG)
+        {
+            if (t.size() > 2)
+                throw new IOException("No arguments allowed after bag");
+            writeColumnsFromBag(key, (DefaultDataBag) t.get(1));
+        }
+        else
+            throw new IOException("Second argument in output must be a tuple or bag");
+    }
+
+    private void writeColumnsFromTuple(ByteBuffer key, Tuple t, int offset) throws IOException
+    {
         ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
-        CfDef cfDef = getCfDef(storeSignature);
-        try
+        for (int i = offset; i < t.size(); i++)
         {
-            for (Tuple pair : pairs)
+            if (t.getType(i) == DataType.BAG)
+                writeColumnsFromBag(key, (DefaultDataBag) t.get(i));
+            else if (t.getType(i) == DataType.TUPLE)
             {
-                Mutation mutation = new Mutation();
-                if (DataType.findType(pair.get(1)) == DataType.BAG) // supercolumn
-                {
-                    org.apache.cassandra.thrift.SuperColumn sc = new org.apache.cassandra.thrift.SuperColumn();
-                    sc.name = objToBB(pair.get(0));
-                    ArrayList<org.apache.cassandra.thrift.Column> columns = new ArrayList<org.apache.cassandra.thrift.Column>();
-                    for (Tuple subcol : (DefaultDataBag) pair.get(1))
-                    {
-                        org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
-                        column.name = objToBB(subcol.get(0));
-                        column.value = objToBB(subcol.get(1));
-                        column.setTimestamp(System.currentTimeMillis() * 1000);
-                        columns.add(column);
-                    }
-                    if (columns.isEmpty()) // a deletion
-                    {
-                        mutation.deletion = new Deletion();
-                        mutation.deletion.super_column = objToBB(pair.get(0));
-                        mutation.deletion.setTimestamp(System.currentTimeMillis() * 1000);
-                    }
-                    else
-                    {
-                        sc.columns = columns;
-                        mutation.column_or_supercolumn = new ColumnOrSuperColumn();
-                        mutation.column_or_supercolumn.super_column = sc;
-                    }
-                }
-                else // assume column since it couldn't be anything else
-                {
-                    if (pair.get(1) == null)
-                    {
-                        mutation.deletion = new Deletion();
-                        mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate();
-                        mutation.deletion.predicate.column_names = Arrays.asList(objToBB(pair.get(0)));
-                        mutation.deletion.setTimestamp(System.currentTimeMillis() * 1000);
-                    }
-                    else
-                    {
-                        org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
-                        column.name = objToBB(pair.get(0));
-                        column.value = objToBB(pair.get(1));
-                        column.setTimestamp(System.currentTimeMillis() * 1000);
-                        mutation.column_or_supercolumn = new ColumnOrSuperColumn();
-                        mutation.column_or_supercolumn.column = column;
-                    }
-                }
-                mutationList.add(mutation);
+                Tuple inner = (Tuple) t.get(i);
+                if (inner.size() > 0) // may be empty, for an indexed column that wasn't present
+                    mutationList.add(mutationFromTuple(inner));
             }
+            else
+                throw new IOException("Output type was not a bag or a tuple");
         }
-        catch (ClassCastException e)
+        if (mutationList.size() > 0)
+            writeMutations(key, mutationList);
+    }
+
+    private Mutation mutationFromTuple(Tuple t) throws IOException
+    {
+        Mutation mutation = new Mutation();
+        if (t.get(1) == null)
        {
-            throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily", e);
+            // TODO: optional deletion
+            mutation.deletion = new Deletion();
+            mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate();
+            mutation.deletion.predicate.column_names = Arrays.asList(objToBB(t.get(0)));
+            mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
         }
+        else
+        {
+            org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
+            column.setName(objToBB(t.get(0)));
+            column.setValue(objToBB(t.get(1)));
+            column.setTimestamp(FBUtilities.timestampMicros());
+            mutation.column_or_supercolumn = new ColumnOrSuperColumn();
+            mutation.column_or_supercolumn.column = column;
+        }
+        return mutation;
+    }
+
+    private void writeColumnsFromBag(ByteBuffer key, DefaultDataBag bag) throws IOException
+    {
+        List<Mutation> mutationList = new ArrayList<Mutation>();
+        for (Tuple pair : bag)
+        {
+            Mutation mutation = new Mutation();
+            if (DataType.findType(pair.get(1)) == DataType.BAG) // supercolumn
+            {
+                SuperColumn sc = new SuperColumn();
+                sc.setName(objToBB(pair.get(0)));
+                List<org.apache.cassandra.thrift.Column> columns = new ArrayList<org.apache.cassandra.thrift.Column>();
+                for (Tuple subcol : (DefaultDataBag) pair.get(1))
+                {
+                    org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
+                    column.setName(objToBB(subcol.get(0)));
+                    column.setValue(objToBB(subcol.get(1)));
+                    column.setTimestamp(FBUtilities.timestampMicros());
+                    columns.add(column);
+                }
+                if (columns.isEmpty()) // TODO: optional deletion
+                {
+                    mutation.deletion = new Deletion();
+                    mutation.deletion.super_column = objToBB(pair.get(0));
+                    mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
+                }
+                else
+                {
+                    sc.columns = columns;
+                    mutation.column_or_supercolumn = new ColumnOrSuperColumn();
+                    mutation.column_or_supercolumn.super_column = sc;
+                }
+            }
+            else
+                mutation = mutationFromTuple(pair);
+            mutationList.add(mutation);
+            // for wide rows, we need to limit the amount of mutations we write at once
+            if (mutationList.size() >= 10) // arbitrary, CFOF will re-batch this up, and BOF won't care
+            {
+                writeMutations(key, mutationList);
+                mutationList.clear();
+            }
+        }
+        // write the last batch
+        if (mutationList.size() > 0)
+            writeMutations(key, mutationList);
+    }
+
+    private void writeMutations(ByteBuffer key, List<Mutation> mutations) throws IOException
+    {
         try
         {
-            writer.write(key, mutationList);
+            writer.write(key, mutations);
         }
         catch (InterruptedException e)
         {
-           throw new IOException(e);
+            throw new IOException(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48b29a6/contrib/pig/test/populate-cli.txt
----------------------------------------------------------------------
diff --git a/contrib/pig/test/populate-cli.txt b/contrib/pig/test/populate-cli.txt
new file mode 100644
index 0000000..665fba4
--- /dev/null
+++ b/contrib/pig/test/populate-cli.txt
@@ -0,0 +1,67 @@
+create keyspace PigTest;
+use PigTest;
+create column family SomeApp with
+key_validation_class = UTF8Type and
+default_validation_class = LexicalUUIDType and
+comparator = UTF8Type and
+column_metadata =
+[
+  {column_name: name, validation_class: UTF8Type, index_type: KEYS},
+  {column_name: vote_type, validation_class: UTF8Type},
+  {column_name: rating, validation_class: IntegerType},
+  {column_name: score, validation_class: LongType},
+  {column_name: percent, validation_class: FloatType},
+  {column_name: atomic_weight, validation_class: DoubleType},
+];
+
+create column family CopyOfSomeApp with
+key_validation_class = UTF8Type and
+default_validation_class = LexicalUUIDType and
+comparator = UTF8Type and
+column_metadata =
+[
+  {column_name: name, validation_class: UTF8Type, index_type: KEYS},
+  {column_name: vote_type, validation_class: UTF8Type},
+  {column_name: rating, validation_class: IntegerType},
+  {column_name: score, validation_class: LongType},
+  {column_name: percent, validation_class: FloatType},
+  {column_name: atomic_weight, validation_class: DoubleType},
+];
+
+set SomeApp['foo']['name'] = 'User Foo';
+set SomeApp['foo']['vote_type'] = 'like';
+set SomeApp['foo']['rating'] = 8;
+set SomeApp['foo']['score'] = 125000;
+set SomeApp['foo']['percent'] = '85.0';
+set SomeApp['foo']['atomic_weight'] = '2.7182818284590451';
+
+set SomeApp['bar']['name'] = 'User Bar';
+set SomeApp['bar']['vote_type'] = 'like';
+set SomeApp['bar']['rating'] = 9;
+set SomeApp['bar']['score'] = 15000;
+set SomeApp['bar']['percent'] = '35.0';
+set SomeApp['bar']['atomic_weight'] = '3.1415926535897931';
+
+set SomeApp['baz']['name'] = 'User Baz';
+set SomeApp['baz']['vote_type'] = 'dislike';
+set SomeApp['baz']['rating'] = 3;
+set SomeApp['baz']['score'] = 512000;
+set SomeApp['baz']['percent'] = '95.3';
+set SomeApp['baz']['atomic_weight'] = '1.61803399';
+set SomeApp['baz']['extra1'] = lexicaluuid();
+set SomeApp['baz']['extra2'] = lexicaluuid();
+set SomeApp['baz']['extra3'] = lexicaluuid();
+
+set SomeApp['qux']['name'] = 'User Qux';
+set SomeApp['qux']['vote_type'] = 'dislike';
+set SomeApp['qux']['rating'] = 2;
+set SomeApp['qux']['score'] = 12000;
+set SomeApp['qux']['percent'] = '64.7';
+set SomeApp['qux']['atomic_weight'] = '0.660161815846869';
+set SomeApp['qux']['extra1'] = lexicaluuid();
+set SomeApp['qux']['extra2'] = lexicaluuid();
+set SomeApp['qux']['extra3'] = lexicaluuid();
+set SomeApp['qux']['extra4'] = lexicaluuid();
+set SomeApp['qux']['extra5'] = lexicaluuid();
+set SomeApp['qux']['extra6'] = lexicaluuid();
+set SomeApp['qux']['extra7'] = lexicaluuid();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48b29a6/contrib/pig/test/test_storage.pig
----------------------------------------------------------------------
diff --git a/contrib/pig/test/test_storage.pig b/contrib/pig/test/test_storage.pig
new file mode 100644
index 0000000..22143dc
--- /dev/null
+++ b/contrib/pig/test/test_storage.pig
@@ -0,0 +1,22 @@
+rows = LOAD 'cassandra://PigTest/SomeApp' USING CassandraStorage();
+-- full copy
+STORE rows INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
+-- single tuple
+onecol = FOREACH rows GENERATE key, percent;
+STORE onecol INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
+-- bag only
+other = FOREACH rows GENERATE key, columns;
+STORE other INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
+
+
+-- filter
+likes = FILTER rows by vote_type.value eq 'like' and rating.value > 5;
+dislikes_extras = FILTER rows by vote_type.value eq 'dislike' AND COUNT(columns) > 0;
+
+-- store these too
+STORE likes INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
+STORE dislikes_extras INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
+
+-- filter to fully visible rows (no uuid columns) and dump
+visible = FILTER rows BY COUNT(columns) == 0;
+dump visible;
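
For reference, a minimal usage sketch (not part of the patch) of how the schema produced by this change can be consumed from Pig. It assumes the PigTest keyspace from populate-cli.txt has been populated and that CassandraStorage is on the Pig classpath; the relation names (liked, extras) are illustrative.

-- indexed columns arrive as named (name, value) tuples, so their values can be referenced directly
rows = LOAD 'cassandra://PigTest/SomeApp' USING CassandraStorage();
liked = FOREACH rows GENERATE key, vote_type.value, rating.value;
-- columns without metadata (extra1, extra2, ...) land in the columns bag;
-- FLATTEN turns each (name, value) pair in the bag into its own record
extras = FOREACH rows GENERATE key, FLATTEN(columns);
dump liked;
dump extras;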
