Updated Branches: refs/heads/ACCUMULO-1783 [created] 30fd9aa6c
ACCUMULO-1783 Clean up the "typed" variant from before and supersede the original AccumuloStorage. Going off of what HBaseStorage provides, the typical use case is treating each tuple as a "row" and inserting multiple columns from a bag (or similar) as columns in that row. To move towards this, I renamed AccumuloStorage to AccumuloKVStorage to make it obvious that this storage engine purely provides a Key/Value storage mechanism and nothing more. Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/294f9ce8 Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/294f9ce8 Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/294f9ce8 Branch: refs/heads/ACCUMULO-1783 Commit: 294f9ce8498db614dac198aaf62d3023c7d9b02d Parents: c25f26c Author: Josh Elser <els...@apache.org> Authored: Wed Oct 23 16:48:37 2013 -0700 Committer: Josh Elser <els...@apache.org> Committed: Wed Oct 23 16:48:37 2013 -0700 ---------------------------------------------------------------------- .../accumulo/pig/AbstractAccumuloStorage.java | 13 +- .../apache/accumulo/pig/AccumuloKVStorage.java | 270 +++++++++++++++++++ .../apache/accumulo/pig/AccumuloStorage.java | 83 ------ .../accumulo/pig/TypedAccumuloStorage.java | 207 -------------- .../accumulo/pig/AccumuloKVStorageTest.java | 134 +++++++++ .../accumulo/pig/AccumuloStorageTest.java | 104 ------- 6 files changed, 416 insertions(+), 395 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java index d26cf40..0424b8a 100644 --- 
a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java +++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Properties; import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; @@ -46,6 +47,7 @@ import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.Tuple; +import org.apache.pig.impl.util.UDFContext; /** * A LoadStoreFunc for retrieving data from and storing data to Accumulo @@ -80,6 +82,8 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF long maxMutationBufferSize = 10 * 1000 * 1000; int maxLatency = 10 * 1000; + protected String contextSignature = null; + public AbstractAccumuloStorage() {} @Override @@ -213,12 +217,19 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF @Override public void setUDFContextSignature(String signature) { - + this.contextSignature = signature; } /* StoreFunc methods */ public void setStoreFuncUDFContextSignature(String signature) { + this.contextSignature = signature; + } + /** + * Returns UDFProperties based on <code>contextSignature</code>. 
+ */ + protected Properties getUDFProperties() { + return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] {contextSignature}); } public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException { http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java new file mode 100644 index 0000000..8a17e8b --- /dev/null +++ b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.accumulo.pig; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Text; +import org.apache.pig.LoadStoreCaster; +import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceSchema.ResourceFieldSchema; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.builtin.Utf8StorageConverter; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.util.ObjectSerializer; +import org.joda.time.DateTime; + +/** + * A LoadStoreFunc for retrieving data from and storing data to Accumulo. + * + * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a + * long. 
+ * + * <p>Tuples require at least key, column family, column qualifier and value; however column visibility or column visibility and timestamp may also be + * provided:</p> + * + * <ul> + * <li>(key, colfam, colqual, value)</li> + * <li>(key, colfam, colqual, colvis, value)</li> + * <li>(key, colfam, colqual, colvis, timestamp, value)</li> + * </ul> + */ +public class AccumuloKVStorage extends AbstractAccumuloStorage { + private static final Log LOG = LogFactory.getLog(AccumuloKVStorage.class); + protected LoadStoreCaster caster; + + private ResourceSchema schema; + + public AccumuloKVStorage() { + this.caster = new Utf8StorageConverter(); + } + + @Override + protected Tuple getTuple(Key key, Value value) throws IOException { + // and wrap it in a tuple + Tuple tuple = TupleFactory.getInstance().newTuple(6); + tuple.set(0, new DataByteArray(key.getRow().getBytes())); + tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes())); + tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes())); + tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes())); + tuple.set(4, new Long(key.getTimestamp())); + tuple.set(5, new DataByteArray(value.get())); + return tuple; + } + + @Override + public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException { + ResourceFieldSchema[] fieldSchemas = (schema == null) ? 
null : schema.getFields(); + + Text t = tupleToText(tuple, 0, fieldSchemas); + + Mutation mut = new Mutation(t); + Text cf = tupleToText(tuple, 1, fieldSchemas); + Text cq = tupleToText(tuple, 2, fieldSchemas); + + if (4 == tuple.size()) { + byte[] valueBytes = tupleToBytes(tuple, 3, fieldSchemas); + Value val = new Value(valueBytes); + + mut.put(cf, cq, val); + } else if (5 == tuple.size()) { + Text cv = tupleToText(tuple, 3, fieldSchemas); + + byte[] valueBytes = tupleToBytes(tuple, 4, fieldSchemas); + + Value val = new Value(valueBytes); + if (cv.getLength() == 0) { + mut.put(cf, cq, val); + } else { + mut.put(cf, cq, new ColumnVisibility(cv), val); + } + } else { + if (6 < tuple.size()) { + LOG.debug("Ignoring additional entries in tuple of length " + tuple.size()); + } + + Text cv = tupleToText(tuple, 3, fieldSchemas); + + long ts = objToLong(tuple, 4, fieldSchemas); + + byte[] valueBytes = tupleToBytes(tuple, 5, fieldSchemas); + + Value val = new Value(valueBytes); + if (cv.getLength() == 0) { + mut.put(cf, cq, val); + } else { + mut.put(cf, cq, new ColumnVisibility(cv), ts, val); + } + } + + return Collections.singleton(mut); + } + + @Override + public void checkSchema(ResourceSchema s) throws IOException { + if (!(caster instanceof LoadStoreCaster)) { + LOG.error("Caster must implement LoadStoreCaster for writing to Accumulo."); + throw new IOException("Bad Caster " + caster.getClass()); + } + schema = s; + getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema)); + } + + private Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException { + Object o = tuple.get(i); + byte type = schemaToType(o, i, fieldSchemas); + + return objToText(o, type); + } + + private byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) { + return (fieldSchemas == null) ? 
DataType.findType(o) : fieldSchemas[i].getType(); + } + + private byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException { + Object o = tuple.get(i); + byte type = schemaToType(o, i, fieldSchemas); + + return objToBytes(o, type); + + } + + private long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException { + Object o = tuple.get(i); + byte type = schemaToType(o, i, fieldSchemas); + + switch (type) { + case DataType.LONG: + return (Long) o; + case DataType.CHARARRAY: + String timestampString = (String) o; + try { + return Long.parseLong(timestampString); + } catch (NumberFormatException e) { + final String msg = "Could not cast chararray into long: " + timestampString; + LOG.error(msg); + throw new IOException(msg, e); + } + case DataType.DOUBLE: + Double doubleTimestamp = (Double) o; + return doubleTimestamp.longValue(); + case DataType.FLOAT: + Float floatTimestamp = (Float) o; + return floatTimestamp.longValue(); + case DataType.INTEGER: + Integer intTimestamp = (Integer) o; + return intTimestamp.longValue(); + case DataType.BIGINTEGER: + BigInteger bigintTimestamp = (BigInteger) o; + long longTimestamp = bigintTimestamp.longValue(); + + BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp); + + if (!recreatedTimestamp.equals(bigintTimestamp)) { + LOG.warn("Downcasting BigInteger into Long results in a change of the original value. Was " + bigintTimestamp + " but is now " + longTimestamp); + } + + return longTimestamp; + case DataType.BIGDECIMAL: + BigDecimal bigdecimalTimestamp = (BigDecimal) o; + try { + return bigdecimalTimestamp.longValueExact(); + } catch (ArithmeticException e) { + long convertedLong = bigdecimalTimestamp.longValue(); + LOG.warn("Downcasting BigDecimal into Long results in a loss of information. 
Was " + bigdecimalTimestamp + " but is now " + convertedLong); + return convertedLong; + } + case DataType.BYTEARRAY: + DataByteArray bytes = (DataByteArray) o; + try { + return Long.parseLong(bytes.toString()); + } catch (NumberFormatException e) { + final String msg = "Could not cast bytes into long: " + bytes.toString(); + LOG.error(msg); + throw new IOException(msg, e); + } + default: + LOG.error("Could not convert " + o + " of class " + o.getClass() + " into long."); + throw new IOException("Could not convert " + o.getClass() + " into long"); + + } + } + + private Text objToText(Object o, byte type) throws IOException { + return new Text(objToBytes(o, type)); + } + + @SuppressWarnings("unchecked") + private byte[] objToBytes(Object o, byte type) throws IOException { + if (o == null) + return null; + switch (type) { + case DataType.BYTEARRAY: + return ((DataByteArray) o).get(); + case DataType.BAG: + return caster.toBytes((DataBag) o); + case DataType.CHARARRAY: + return caster.toBytes((String) o); + case DataType.DOUBLE: + return caster.toBytes((Double) o); + case DataType.FLOAT: + return caster.toBytes((Float) o); + case DataType.INTEGER: + return caster.toBytes((Integer) o); + case DataType.LONG: + return caster.toBytes((Long) o); + case DataType.BIGINTEGER: + return caster.toBytes((BigInteger) o); + case DataType.BIGDECIMAL: + return caster.toBytes((BigDecimal) o); + case DataType.BOOLEAN: + return caster.toBytes((Boolean) o); + case DataType.DATETIME: + return caster.toBytes((DateTime) o); + + // The type conversion here is unchecked. + // Relying on DataType.findType to do the right thing. 
+ case DataType.MAP: + return caster.toBytes((Map<String,Object>) o); + + case DataType.NULL: + return null; + case DataType.TUPLE: + return caster.toBytes((Tuple) o); + case DataType.ERROR: + throw new IOException("Unable to determine type of " + o.getClass()); + default: + throw new IOException("Unable to find a converter for tuple field " + o); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java deleted file mode 100644 index 15b1c47..0000000 --- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.accumulo.pig; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Text; -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.data.DataByteArray; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; - -/** - * A LoadStoreFunc for retrieving data from and storing data to Accumulo - * - * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a - * long. - * - * Tuples can be written in 2 forms: (key, colfam, colqual, colvis, value) OR (key, colfam, colqual, value) - * - */ -public class AccumuloStorage extends AbstractAccumuloStorage { - private static final Log LOG = LogFactory.getLog(AccumuloStorage.class); - - public AccumuloStorage() {} - - @Override - protected Tuple getTuple(Key key, Value value) throws IOException { - // and wrap it in a tuple - Tuple tuple = TupleFactory.getInstance().newTuple(6); - tuple.set(0, new DataByteArray(key.getRow().getBytes())); - tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes())); - tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes())); - tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes())); - tuple.set(4, new Long(key.getTimestamp())); - tuple.set(5, new DataByteArray(value.get())); - return tuple; - } - - @Override - public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException { - Mutation mut = new Mutation(Utils.objToText(tuple.get(0))); - Text cf = Utils.objToText(tuple.get(1)); - Text cq = Utils.objToText(tuple.get(2)); - - if 
(tuple.size() > 4) { - Text cv = Utils.objToText(tuple.get(3)); - Value val = new Value(Utils.objToBytes(tuple.get(4))); - if (cv.getLength() == 0) { - mut.put(cf, cq, val); - } else { - mut.put(cf, cq, new ColumnVisibility(cv), val); - } - } else { - Value val = new Value(Utils.objToBytes(tuple.get(3))); - mut.put(cf, cq, val); - } - - return Collections.singleton(mut); - } -} http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java deleted file mode 100644 index 30c39c9..0000000 --- a/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.accumulo.pig; - -import java.io.IOException; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Properties; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Text; -import org.apache.pig.LoadStoreCaster; -import org.apache.pig.ResourceSchema; -import org.apache.pig.LoadPushDown.RequiredFieldList; -import org.apache.pig.ResourceSchema.ResourceFieldSchema; -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.builtin.Utf8StorageConverter; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataByteArray; -import org.apache.pig.data.DataType; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.apache.pig.impl.util.ObjectSerializer; -import org.apache.pig.impl.util.UDFContext; -import org.joda.time.DateTime; - -/** - * A LoadStoreFunc for retrieving data from and storing data to Accumulo - * - * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a - * long. 
- * - * Tuples can be written in 2 forms: (key, colfam, colqual, colvis, value) OR (key, colfam, colqual, value) - * - */ -public class TypedAccumuloStorage extends AbstractAccumuloStorage { - private static final Log LOG = LogFactory.getLog(TypedAccumuloStorage.class); - protected LoadStoreCaster caster; - protected String contextSignature = null; - - private ResourceSchema schema_; - private RequiredFieldList requiredFieldList; - - public TypedAccumuloStorage() { - this.caster = new Utf8StorageConverter(); - } - - @Override - protected Tuple getTuple(Key key, Value value) throws IOException { - // and wrap it in a tuple - Tuple tuple = TupleFactory.getInstance().newTuple(6); - tuple.set(0, new DataByteArray(key.getRow().getBytes())); - tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes())); - tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes())); - tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes())); - tuple.set(4, new Long(key.getTimestamp())); - tuple.set(5, new DataByteArray(value.get())); - return tuple; - } - - @Override - public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException { - ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? 
null : schema_.getFields(); - - Text t = tupleToText(tuple, 0, fieldSchemas); - - Mutation mut = new Mutation(t); - Text cf = tupleToText(tuple, 1, fieldSchemas); - Text cq = tupleToText(tuple, 2, fieldSchemas); - - if (tuple.size() > 4) { - Text cv = tupleToText(tuple, 3, fieldSchemas); - - byte[] valueBytes = tupleToBytes(tuple, 4, fieldSchemas); - - Value val = new Value(valueBytes); - if (cv.getLength() == 0) { - mut.put(cf, cq, val); - } else { - mut.put(cf, cq, new ColumnVisibility(cv), val); - } - } else { - byte[] valueBytes = tupleToBytes(tuple, 3, fieldSchemas); - Value val = new Value(valueBytes); - mut.put(cf, cq, val); - } - - return Collections.singleton(mut); - } - - @Override - public void setUDFContextSignature(String signature) { - this.contextSignature = signature; - } - - @Override - public void setStoreFuncUDFContextSignature(String signature) { - this.contextSignature = signature; - } - - /** - * Returns UDFProperties based on <code>contextSignature</code>. - */ - private Properties getUDFProperties() { - return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] {contextSignature}); - } - - @Override - public void checkSchema(ResourceSchema s) throws IOException { - if (!(caster instanceof LoadStoreCaster)) { - LOG.error("Caster must implement LoadStoreCaster for writing to HBase."); - throw new IOException("Bad Caster " + caster.getClass()); - } - schema_ = s; - getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema_)); - } - - private Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException { - Object o = tuple.get(i); - byte type = schemaToType(o, i, fieldSchemas); - - return objToText(o, type); - } - - private byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) { - return (fieldSchemas == null) ? 
DataType.findType(o) : fieldSchemas[i].getType(); - } - - private byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException { - Object o = tuple.get(i); - byte type = schemaToType(o, i, fieldSchemas); - - return objToBytes(o, type); - - } - - private Text objToText(Object o, byte type) throws IOException { - return new Text(objToBytes(o, type)); - } - - @SuppressWarnings("unchecked") - private byte[] objToBytes(Object o, byte type) throws IOException { - if (o == null) - return null; - switch (type) { - case DataType.BYTEARRAY: - return ((DataByteArray) o).get(); - case DataType.BAG: - return caster.toBytes((DataBag) o); - case DataType.CHARARRAY: - return caster.toBytes((String) o); - case DataType.DOUBLE: - return caster.toBytes((Double) o); - case DataType.FLOAT: - return caster.toBytes((Float) o); - case DataType.INTEGER: - return caster.toBytes((Integer) o); - case DataType.LONG: - return caster.toBytes((Long) o); - case DataType.BIGINTEGER: - return caster.toBytes((BigInteger) o); - case DataType.BIGDECIMAL: - return caster.toBytes((BigDecimal) o); - case DataType.BOOLEAN: - return caster.toBytes((Boolean) o); - case DataType.DATETIME: - return caster.toBytes((DateTime) o); - - // The type conversion here is unchecked. - // Relying on DataType.findType to do the right thing. 
- case DataType.MAP: - return caster.toBytes((Map<String,Object>) o); - - case DataType.NULL: - return null; - case DataType.TUPLE: - return caster.toBytes((Tuple) o); - case DataType.ERROR: - throw new IOException("Unable to determine type of " + o.getClass()); - default: - throw new IOException("Unable to find a converter for tuple field " + o); - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/test/java/org/apache/accumulo/pig/AccumuloKVStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloKVStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloKVStorageTest.java new file mode 100644 index 0000000..8adbb52 --- /dev/null +++ b/src/test/java/org/apache/accumulo/pig/AccumuloKVStorageTest.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.accumulo.pig; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import org.apache.accumulo.core.data.ColumnUpdate; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.junit.Assert; +import org.junit.Test; + +public class AccumuloKVStorageTest { + + @Test + public void testGetMutations4() throws Exception { + AccumuloKVStorage s = new AccumuloKVStorage(); + + Tuple tuple = TupleFactory.getInstance().newTuple(4); + tuple.set(0, "row1"); + tuple.set(1, "cf1"); + tuple.set(2, "cq1"); + tuple.set(3, "val1"); + + Collection<Mutation> muts = s.getMutations(tuple); + + assertNotNull(muts); + assertEquals(1, muts.size()); + Mutation mut = muts.iterator().next(); + List<ColumnUpdate> updates = mut.getUpdates(); + assertEquals(1, updates.size()); + ColumnUpdate update = updates.get(0); + + assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), mut.getRow())); + assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), update.getColumnFamily())); + assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), update.getColumnQualifier())); + assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), update.getValue())); + assertTrue(Arrays.equals("".getBytes(), update.getColumnVisibility())); + } + + @Test + public void testGetMutations5() throws Exception { + AccumuloKVStorage s = new AccumuloKVStorage(); + + Tuple tuple = TupleFactory.getInstance().newTuple(5); + tuple.set(0, "row1"); + tuple.set(1, "cf1"); + tuple.set(2, "cq1"); + tuple.set(3, "cv1"); + tuple.set(4, "val1"); + + Collection<Mutation> muts = s.getMutations(tuple); + + assertNotNull(muts); + assertEquals(1, muts.size()); + Mutation mut 
= muts.iterator().next(); + List<ColumnUpdate> updates = mut.getUpdates(); + assertEquals(1, updates.size()); + ColumnUpdate update = updates.get(0); + + assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), mut.getRow())); + assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), update.getColumnFamily())); + assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), update.getColumnQualifier())); + assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), update.getColumnVisibility())); + assertTrue(Arrays.equals(((String) tuple.get(4)).getBytes(), update.getValue())); + } + + @Test + public void testGetMutations6() throws Exception { + AccumuloKVStorage s = new AccumuloKVStorage(); + + Tuple tuple = TupleFactory.getInstance().newTuple(6); + tuple.set(0, "row"); + tuple.set(1, "cf"); + tuple.set(2, "cq"); + tuple.set(3, "cv"); + tuple.set(4, new Long(1)); + tuple.set(5, "value"); + + Collection<Mutation> mutations = s.getMutations(tuple); + Assert.assertNotNull(mutations); + Assert.assertEquals(1, mutations.size()); + Mutation m = mutations.iterator().next(); + + List<ColumnUpdate> updates = m.getUpdates(); + Assert.assertEquals(1, updates.size()); + ColumnUpdate update = updates.get(0); + + assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), m.getRow())); + assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), update.getColumnFamily())); + assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), update.getColumnQualifier())); + assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), update.getColumnVisibility())); + assertEquals(((Long) tuple.get(4)).longValue(), update.getTimestamp()); + assertTrue(Arrays.equals(((String) tuple.get(5)).getBytes(), update.getValue())); + } + + @Test + public void testGetTuple() throws Exception { + AccumuloKVStorage s = new AccumuloKVStorage(); + + Key key = new Key("row1", "cf1", "cq1", "cv1", 1024L); + Value value = new Value("val1".getBytes()); + Tuple tuple = s.getTuple(key, value); 
+ TestUtils.assertKeyValueEqualsTuple(key, value, tuple); + + key = new Key("row1", "cf1", "cq1"); + value = new Value("val1".getBytes()); + tuple = s.getTuple(key, value); + TestUtils.assertKeyValueEqualsTuple(key, value, tuple); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java deleted file mode 100644 index fbd68c6..0000000 --- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.accumulo.pig; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -import org.apache.accumulo.core.data.ColumnUpdate; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.junit.Test; - -public class AccumuloStorageTest { - - @Test - public void testGetMutations4() throws Exception { - AccumuloStorage s = new AccumuloStorage(); - - Tuple tuple = TupleFactory.getInstance().newTuple(4); - tuple.set(0, "row1"); - tuple.set(1, "cf1"); - tuple.set(2, "cq1"); - tuple.set(3, "val1"); - - Collection<Mutation> muts = s.getMutations(tuple); - - assertNotNull(muts); - assertEquals(1, muts.size()); - Mutation mut = muts.iterator().next(); - List<ColumnUpdate> updates = mut.getUpdates(); - assertEquals(1, updates.size()); - ColumnUpdate update = updates.get(0); - - assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), mut.getRow())); - assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), update.getColumnFamily())); - assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), update.getColumnQualifier())); - assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), update.getValue())); - assertTrue(Arrays.equals("".getBytes(), update.getColumnVisibility())); - } - - @Test - public void testGetMutations5() throws Exception { - AccumuloStorage s = new AccumuloStorage(); - - Tuple tuple = TupleFactory.getInstance().newTuple(5); - tuple.set(0, "row1"); - tuple.set(1, "cf1"); - tuple.set(2, "cq1"); - tuple.set(3, "cv1"); - tuple.set(4, "val1"); - - Collection<Mutation> muts = s.getMutations(tuple); - - assertNotNull(muts); - assertEquals(1, muts.size()); - Mutation mut = muts.iterator().next(); - 
List<ColumnUpdate> updates = mut.getUpdates(); - assertEquals(1, updates.size()); - ColumnUpdate update = updates.get(0); - - assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), mut.getRow())); - assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), update.getColumnFamily())); - assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), update.getColumnQualifier())); - assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), update.getColumnVisibility())); - assertTrue(Arrays.equals(((String) tuple.get(4)).getBytes(), update.getValue())); - } - - @Test - public void testGetTuple() throws Exception { - AccumuloStorage s = new AccumuloStorage(); - - Key key = new Key("row1", "cf1", "cq1", "cv1", 1024L); - Value value = new Value("val1".getBytes()); - Tuple tuple = s.getTuple(key, value); - TestUtils.assertKeyValueEqualsTuple(key, value, tuple); - - key = new Key("row1", "cf1", "cq1"); - value = new Value("val1".getBytes()); - tuple = s.getTuple(key, value); - TestUtils.assertKeyValueEqualsTuple(key, value, tuple); - } -}