Cassandra's inferred schema and actual data don't match -----------------------------------------------------
Key: CASSANDRA-3371 URL: https://issues.apache.org/jira/browse/CASSANDRA-3371 Project: Cassandra Issue Type: Bug Components: Hadoop Affects Versions: 0.8.7 Reporter: Pete Warden It's looking like there may be a mismatch between the schema that's being reported by the latest CassandraStorage.java, and the data that's actually returned. Here's an example: rows = LOAD 'cassandra://Frap/PhotoVotes' USING CassandraStorage(); DESCRIBE rows; rows: {key: chararray,columns: {(name: chararray,value: bytearray,photo_owner: chararray,value_photo_owner: bytearray,pid: chararray,value_pid: bytearray,matched_string: chararray,value_matched_string: bytearray,src_big: chararray,value_src_big: bytearray,time: chararray,value_time: bytearray,vote_type: chararray,value_vote_type: bytearray,voter: chararray,value_voter: bytearray)}} DUMP rows; (691831038_1317937188.48955,{(photo_owner,1596090180),(pid,6855155124568798560),(matched_string,),(src_big,),(time,Thu Oct 06 14:39:48 -0700 2011),(vote_type,album_dislike),(voter,691831038)}) getSchema() is reporting the columns as an inner bag of tuples, each of which contains 16 values. In fact, getNext() seems to return an inner bag containing 7 tuples, each of which contains two values. 
It appears that things got out of sync with this change: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?r1=1177083&r2=1177082&pathrev=1177083 See more discussion at: http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/pig-cassandra-problem-quot-Incompatible-field-schema-quot-error-tc6882703.html Here's a patch I ended up creating for my own use, which gives the results I need (though it doesn't handle super-columns): DESCRIBE rows; rows: {cassandra_key: chararray,photo_owner: bytearray,pid: bytearray,place_matched_string: bytearray,src_big: bytearray,time: bytearray,vote_type: bytearray,voter: bytearray} Index: contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java =================================================================== --- contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (revision 1185044) +++ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (working copy) @@ -26,7 +26,7 @@ import org.apache.cassandra.db.marshal.IntegerType; import org.apache.cassandra.db.marshal.TypeParser; import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.Hex; +import org.apache.cassandra.utils.FBUtilities; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -122,15 +122,15 @@ assert key != null && cf != null; // and wrap it in a tuple - Tuple tuple = TupleFactory.getInstance().newTuple(2); + Tuple tuple = TupleFactory.getInstance().newTuple(cf.size()+1); ArrayList<Tuple> columns = new ArrayList<Tuple>(); - tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset())); + int tupleIndex = 0; + tuple.set(tupleIndex++, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset())); for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet()) { - 
columns.add(columnToTuple(entry.getKey(), entry.getValue(), cfDef)); + tuple.set(tupleIndex++, columnToTuple(entry.getKey(), entry.getValue(), cfDef)); } - tuple.set(1, new DefaultDataBag(columns)); return tuple; } catch (InterruptedException e) @@ -139,30 +139,22 @@ } } - private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException + private Object columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException { - Tuple pair = TupleFactory.getInstance().newTuple(2); List<AbstractType> marshallers = getDefaultMarshallers(cfDef); Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); - setTupleValue(pair, 0, marshallers.get(0).compose(name)); if (col instanceof Column) { // standard if (validators.get(name) == null) - setTupleValue(pair, 1, marshallers.get(1).compose(col.value())); + return marshallers.get(1).compose(col.value()); else - setTupleValue(pair, 1, validators.get(name).compose(col.value())); - return pair; + return validators.get(name).compose(col.value()); } - // super - ArrayList<Tuple> subcols = new ArrayList<Tuple>(); - for (IColumn subcol : col.getSubColumns()) - subcols.add(columnToTuple(subcol.name(), subcol, cfDef)); - - pair.set(1, new DefaultDataBag(subcols)); - return pair; + // super not currently handled + return null; } private void setTupleValue(Tuple pair, int position, Object value) throws ExecException @@ -312,62 +304,32 @@ // top-level schema, no type ResourceSchema schema = new ResourceSchema(); + ResourceFieldSchema[] tupleFields = new ResourceFieldSchema[cfDef.column_metadata.size()+1]; + int tupleIndex = 0; + // add key ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema(); - keyFieldSchema.setName("key"); + keyFieldSchema.setName("cassandra_key"); keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type + tupleFields[tupleIndex++] = keyFieldSchema; - // will become the bag of tuples - ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema(); - 
bagFieldSchema.setName("columns"); - bagFieldSchema.setType(DataType.BAG); - ResourceSchema bagSchema = new ResourceSchema(); - - List<AbstractType> marshallers = getDefaultMarshallers(cfDef); Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); - List<ResourceFieldSchema> tupleFields = new ArrayList<ResourceFieldSchema>(); - // default comparator/validator - ResourceSchema innerTupleSchema = new ResourceSchema(); - ResourceFieldSchema tupleField = new ResourceFieldSchema(); - tupleField.setType(DataType.TUPLE); - tupleField.setSchema(innerTupleSchema); - - ResourceFieldSchema colSchema = new ResourceFieldSchema(); - colSchema.setName("name"); - colSchema.setType(getPigType(marshallers.get(0))); - tupleFields.add(colSchema); - - ResourceFieldSchema valSchema = new ResourceFieldSchema(); - AbstractType validator = marshallers.get(1); - valSchema.setName("value"); - valSchema.setType(getPigType(validator)); - tupleFields.add(valSchema); - // defined validators/indexes for (ColumnDef cdef : cfDef.column_metadata) { - colSchema = new ResourceFieldSchema(); - colSchema.setName(new String(cdef.getName())); - colSchema.setType(getPigType(marshallers.get(0))); - tupleFields.add(colSchema); - - valSchema = new ResourceFieldSchema(); - validator = validators.get(cdef.getName()); + ResourceFieldSchema valSchema = new ResourceFieldSchema(); + AbstractType validator = validators.get(cdef.getName()); if (validator == null) validator = marshallers.get(1); - valSchema.setName("value"); + valSchema.setName(new String(cdef.getName())); valSchema.setType(getPigType(validator)); - tupleFields.add(valSchema); + tupleFields[tupleIndex++] = valSchema; } - innerTupleSchema.setFields(tupleFields.toArray(new ResourceFieldSchema[tupleFields.size()])); - // a bag can contain only one tuple, but that tuple can contain anything - bagSchema.setFields(new ResourceFieldSchema[] { tupleField }); - bagFieldSchema.setSchema(bagSchema); // top level schema contains everything - 
schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, bagFieldSchema }); + schema.setFields(tupleFields); return schema; } @@ -601,7 +563,7 @@ TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory()); try { - return Hex.bytesToHex(serializer.serialize(cfDef)); + return FBUtilities.bytesToHex(serializer.serialize(cfDef)); } catch (TException e) { @@ -616,7 +578,7 @@ CfDef cfDef = new CfDef(); try { - deserializer.deserialize(cfDef, Hex.hexToBytes(st)); + deserializer.deserialize(cfDef, FBUtilities.hexToBytes(st)); } catch (TException e) { -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira