Updated Branches:
  refs/heads/ACCUMULO-1783 [created] 30fd9aa6c

ACCUMULO-1783 Clean up the "typed" variant from before and supersede the
original AccumuloStorage.

Going off of what HBaseStorage provides, the typical use case is
treating each tuple as a "row" and inserting the multiple columns from a
bag or similar structure as columns in that row. To move towards this, I
renamed AccumuloStorage to AccumuloKVStorage to make it obvious
that this storage engine is purely providing a Key/Value storage
mechanism and nothing more.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/294f9ce8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/294f9ce8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/294f9ce8

Branch: refs/heads/ACCUMULO-1783
Commit: 294f9ce8498db614dac198aaf62d3023c7d9b02d
Parents: c25f26c
Author: Josh Elser <els...@apache.org>
Authored: Wed Oct 23 16:48:37 2013 -0700
Committer: Josh Elser <els...@apache.org>
Committed: Wed Oct 23 16:48:37 2013 -0700

----------------------------------------------------------------------
 .../accumulo/pig/AbstractAccumuloStorage.java   |  13 +-
 .../apache/accumulo/pig/AccumuloKVStorage.java  | 270 +++++++++++++++++++
 .../apache/accumulo/pig/AccumuloStorage.java    |  83 ------
 .../accumulo/pig/TypedAccumuloStorage.java      | 207 --------------
 .../accumulo/pig/AccumuloKVStorageTest.java     | 134 +++++++++
 .../accumulo/pig/AccumuloStorageTest.java       | 104 -------
 6 files changed, 416 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java 
b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index d26cf40..0424b8a 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
@@ -46,6 +47,7 @@ import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
 
 /**
  * A LoadStoreFunc for retrieving data from and storing data to Accumulo
@@ -80,6 +82,8 @@ public abstract class AbstractAccumuloStorage extends 
LoadFunc implements StoreF
   long maxMutationBufferSize = 10 * 1000 * 1000;
   int maxLatency = 10 * 1000;
   
+  protected String contextSignature = null;
+  
   public AbstractAccumuloStorage() {}
   
   @Override
@@ -213,12 +217,19 @@ public abstract class AbstractAccumuloStorage extends 
LoadFunc implements StoreF
   
   @Override
   public void setUDFContextSignature(String signature) {
-    
+    this.contextSignature = signature;
   }
   
   /* StoreFunc methods */
   public void setStoreFuncUDFContextSignature(String signature) {
+    this.contextSignature = signature;
     
+  }  
+  /**
+   * Returns UDFProperties based on <code>contextSignature</code>.
+   */
+  protected Properties getUDFProperties() {
+    return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new 
String[] {contextSignature});
   }
   
   public String relToAbsPathForStoreLocation(String location, Path curDir) 
throws IOException {

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java 
b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
new file mode 100644
index 0000000..8a17e8b
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.pig.LoadStoreCaster;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.joda.time.DateTime;
+
+/**
+ * A LoadStoreFunc for retrieving data from and storing data to Accumulo.
+ * 
+ * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, 
timestamp, value). All fields except timestamp are DataByteArray, timestamp is a
+ * long.
+ * 
+ * <p>Tuples require at least key, column family, column qualifier and value; 
however column visibility or column visibility and timestamp may also be
+ * provided:</p>
+ * 
+ * <ul>
+ * <li>(key, colfam, colqual, value)</li>
+ * <li>(key, colfam, colqual, colvis, value)</li>
+ * <li>(key, colfam, colqual, colvis, timestamp, value)</li> 
+ * </ul>
+ */
+public class AccumuloKVStorage extends AbstractAccumuloStorage {
+  private static final Log LOG = LogFactory.getLog(AccumuloKVStorage.class);
+  protected LoadStoreCaster caster;
+  
+  private ResourceSchema schema;
+  
+  public AccumuloKVStorage() {
+    this.caster = new Utf8StorageConverter();
+  }
+  
+  @Override
+  protected Tuple getTuple(Key key, Value value) throws IOException {
+    // and wrap it in a tuple
+    Tuple tuple = TupleFactory.getInstance().newTuple(6);
+    tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+    tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
+    tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
+    tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
+    tuple.set(4, new Long(key.getTimestamp()));
+    tuple.set(5, new DataByteArray(value.get()));
+    return tuple;
+  }
+  
+  @Override
+  public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, 
IOException {
+    ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : 
schema.getFields();
+    
+    Text t = tupleToText(tuple, 0, fieldSchemas);
+    
+    Mutation mut = new Mutation(t);
+    Text cf = tupleToText(tuple, 1, fieldSchemas);
+    Text cq = tupleToText(tuple, 2, fieldSchemas);
+    
+    if (4 == tuple.size()) {
+      byte[] valueBytes = tupleToBytes(tuple, 3, fieldSchemas);
+      Value val = new Value(valueBytes);
+      
+      mut.put(cf, cq, val);
+    } else if (5 == tuple.size()) {
+      Text cv = tupleToText(tuple, 3, fieldSchemas);
+      
+      byte[] valueBytes = tupleToBytes(tuple, 4, fieldSchemas);
+      
+      Value val = new Value(valueBytes);
+      if (cv.getLength() == 0) {
+        mut.put(cf, cq, val);
+      } else {
+        mut.put(cf, cq, new ColumnVisibility(cv), val);
+      }
+    } else {
+      if (6 < tuple.size()) {
+        LOG.debug("Ignoring additional entries in tuple of length " + 
tuple.size());
+      }
+      
+      Text cv = tupleToText(tuple, 3, fieldSchemas);
+      
+      long ts = objToLong(tuple, 4, fieldSchemas);
+      
+      byte[] valueBytes = tupleToBytes(tuple, 5, fieldSchemas);
+      
+      Value val = new Value(valueBytes);
+      if (cv.getLength() == 0) {
+        mut.put(cf, cq, val);
+      } else {
+        mut.put(cf, cq, new ColumnVisibility(cv), ts, val);
+      }
+    }
+    
+    return Collections.singleton(mut);
+  }
+  
+  @Override
+  public void checkSchema(ResourceSchema s) throws IOException {
+    if (!(caster instanceof LoadStoreCaster)) {
+      LOG.error("Caster must implement LoadStoreCaster for writing to 
Accumulo.");
+      throw new IOException("Bad Caster " + caster.getClass());
+    }
+    schema = s;
+    getUDFProperties().setProperty(contextSignature + "_schema", 
ObjectSerializer.serialize(schema));
+  }
+  
+  private Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] 
fieldSchemas) throws IOException {
+    Object o = tuple.get(i);
+    byte type = schemaToType(o, i, fieldSchemas);
+    
+    return objToText(o, type);
+  }
+  
+  private byte schemaToType(Object o, int i, ResourceFieldSchema[] 
fieldSchemas) {
+    return (fieldSchemas == null) ? DataType.findType(o) : 
fieldSchemas[i].getType();
+  }
+  
+  private byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] 
fieldSchemas) throws IOException {
+    Object o = tuple.get(i);
+    byte type = schemaToType(o, i, fieldSchemas);
+    
+    return objToBytes(o, type);
+    
+  }
+  
+  private long objToLong(Tuple tuple, int i, ResourceFieldSchema[] 
fieldSchemas) throws IOException { 
+    Object o = tuple.get(i);
+    byte type = schemaToType(o, i, fieldSchemas);
+    
+    switch (type) {
+      case DataType.LONG:
+        return (Long) o;
+      case DataType.CHARARRAY:
+        String timestampString = (String) o;
+        try {
+          return Long.parseLong(timestampString);
+        } catch (NumberFormatException e) {
+          final String msg = "Could not cast chararray into long: " + 
timestampString;
+          LOG.error(msg);
+          throw new IOException(msg, e);
+        }
+      case DataType.DOUBLE:
+        Double doubleTimestamp = (Double) o;
+        return doubleTimestamp.longValue();
+      case DataType.FLOAT:
+        Float floatTimestamp = (Float) o;
+        return floatTimestamp.longValue();
+      case DataType.INTEGER:
+        Integer intTimestamp = (Integer) o;
+        return intTimestamp.longValue();
+      case DataType.BIGINTEGER:
+        BigInteger bigintTimestamp = (BigInteger) o;
+        long longTimestamp = bigintTimestamp.longValue();
+        
+        BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp);
+        
+        if (!recreatedTimestamp.equals(bigintTimestamp)) {
+          LOG.warn("Downcasting BigInteger into Long results in a change of 
the original value. Was " + bigintTimestamp + " but is now " + longTimestamp);
+        }
+        
+        return longTimestamp;
+      case DataType.BIGDECIMAL:
+        BigDecimal bigdecimalTimestamp = (BigDecimal) o;
+        try {
+          return bigdecimalTimestamp.longValueExact();
+        } catch (ArithmeticException e) {
+          long convertedLong = bigdecimalTimestamp.longValue();
+          LOG.warn("Downcasting BigDecimal into Long results in a loss of 
information. Was " + bigdecimalTimestamp + " but is now " + convertedLong);
+          return convertedLong;
+        }
+      case DataType.BYTEARRAY:
+        DataByteArray bytes = (DataByteArray) o;
+        try {
+          return Long.parseLong(bytes.toString());
+        } catch (NumberFormatException e) {
+          final String msg = "Could not cast bytes into long: " + 
bytes.toString();
+          LOG.error(msg);
+          throw new IOException(msg, e);
+        }
+      default:
+        LOG.error("Could not convert " + o + " of class " + o.getClass() + " 
into long.");
+        throw new IOException("Could not convert " + o.getClass() + " into 
long");
+        
+    }
+  }
+  
+  private Text objToText(Object o, byte type) throws IOException {
+    return new Text(objToBytes(o, type));
+  }
+  
+  @SuppressWarnings("unchecked")
+  private byte[] objToBytes(Object o, byte type) throws IOException {
+    if (o == null)
+      return null;
+    switch (type) {
+      case DataType.BYTEARRAY:
+        return ((DataByteArray) o).get();
+      case DataType.BAG:
+        return caster.toBytes((DataBag) o);
+      case DataType.CHARARRAY:
+        return caster.toBytes((String) o);
+      case DataType.DOUBLE:
+        return caster.toBytes((Double) o);
+      case DataType.FLOAT:
+        return caster.toBytes((Float) o);
+      case DataType.INTEGER:
+        return caster.toBytes((Integer) o);
+      case DataType.LONG:
+        return caster.toBytes((Long) o);
+      case DataType.BIGINTEGER:
+        return caster.toBytes((BigInteger) o);
+      case DataType.BIGDECIMAL:
+        return caster.toBytes((BigDecimal) o);
+      case DataType.BOOLEAN:
+        return caster.toBytes((Boolean) o);
+      case DataType.DATETIME:
+        return caster.toBytes((DateTime) o);
+        
+        // The type conversion here is unchecked.
+        // Relying on DataType.findType to do the right thing.
+      case DataType.MAP:
+        return caster.toBytes((Map<String,Object>) o);
+        
+      case DataType.NULL:
+        return null;
+      case DataType.TUPLE:
+        return caster.toBytes((Tuple) o);
+      case DataType.ERROR:
+        throw new IOException("Unable to determine type of " + o.getClass());
+      default:
+        throw new IOException("Unable to find a converter for tuple field " + 
o);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java 
b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
deleted file mode 100644
index 15b1c47..0000000
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.accumulo.pig;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
-/**
- * A LoadStoreFunc for retrieving data from and storing data to Accumulo
- * 
- * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, 
timestamp, value). All fields except timestamp are DataByteArray, timestamp is a
- * long.
- * 
- * Tuples can be written in 2 forms: (key, colfam, colqual, colvis, value) OR 
(key, colfam, colqual, value)
- * 
- */
-public class AccumuloStorage extends AbstractAccumuloStorage {
-  private static final Log LOG = LogFactory.getLog(AccumuloStorage.class);
-  
-  public AccumuloStorage() {}
-  
-  @Override
-  protected Tuple getTuple(Key key, Value value) throws IOException {
-    // and wrap it in a tuple
-    Tuple tuple = TupleFactory.getInstance().newTuple(6);
-    tuple.set(0, new DataByteArray(key.getRow().getBytes()));
-    tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
-    tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
-    tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
-    tuple.set(4, new Long(key.getTimestamp()));
-    tuple.set(5, new DataByteArray(value.get()));
-    return tuple;
-  }
-  
-  @Override
-  public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, 
IOException {
-    Mutation mut = new Mutation(Utils.objToText(tuple.get(0)));
-    Text cf = Utils.objToText(tuple.get(1));
-    Text cq = Utils.objToText(tuple.get(2));
-    
-    if (tuple.size() > 4) {
-      Text cv = Utils.objToText(tuple.get(3));
-      Value val = new Value(Utils.objToBytes(tuple.get(4)));
-      if (cv.getLength() == 0) {
-        mut.put(cf, cq, val);
-      } else {
-        mut.put(cf, cq, new ColumnVisibility(cv), val);
-      }
-    } else {
-      Value val = new Value(Utils.objToBytes(tuple.get(3)));
-      mut.put(cf, cq, val);
-    }
-    
-    return Collections.singleton(mut);
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java 
b/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java
deleted file mode 100644
index 30c39c9..0000000
--- a/src/main/java/org/apache/accumulo/pig/TypedAccumuloStorage.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.accumulo.pig;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
-import org.apache.pig.LoadStoreCaster;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.LoadPushDown.RequiredFieldList;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.builtin.Utf8StorageConverter;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.impl.util.UDFContext;
-import org.joda.time.DateTime;
-
-/**
- * A LoadStoreFunc for retrieving data from and storing data to Accumulo
- * 
- * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, 
timestamp, value). All fields except timestamp are DataByteArray, timestamp is a
- * long.
- * 
- * Tuples can be written in 2 forms: (key, colfam, colqual, colvis, value) OR 
(key, colfam, colqual, value)
- * 
- */
-public class TypedAccumuloStorage extends AbstractAccumuloStorage {
-  private static final Log LOG = LogFactory.getLog(TypedAccumuloStorage.class);
-  protected LoadStoreCaster caster;
-  protected String contextSignature = null;
-  
-  private ResourceSchema schema_;
-  private RequiredFieldList requiredFieldList;
-  
-  public TypedAccumuloStorage() {
-    this.caster = new Utf8StorageConverter();
-  }
-  
-  @Override
-  protected Tuple getTuple(Key key, Value value) throws IOException {
-    // and wrap it in a tuple
-    Tuple tuple = TupleFactory.getInstance().newTuple(6);
-    tuple.set(0, new DataByteArray(key.getRow().getBytes()));
-    tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes()));
-    tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes()));
-    tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes()));
-    tuple.set(4, new Long(key.getTimestamp()));
-    tuple.set(5, new DataByteArray(value.get()));
-    return tuple;
-  }
-  
-  @Override
-  public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, 
IOException {
-    ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : 
schema_.getFields();
-    
-    Text t = tupleToText(tuple, 0, fieldSchemas);
-    
-    Mutation mut = new Mutation(t);
-    Text cf = tupleToText(tuple, 1, fieldSchemas);
-    Text cq = tupleToText(tuple, 2, fieldSchemas);
-    
-    if (tuple.size() > 4) {
-      Text cv = tupleToText(tuple, 3, fieldSchemas);
-      
-      byte[] valueBytes = tupleToBytes(tuple, 4, fieldSchemas);
-      
-      Value val = new Value(valueBytes);
-      if (cv.getLength() == 0) {
-        mut.put(cf, cq, val);
-      } else {
-        mut.put(cf, cq, new ColumnVisibility(cv), val);
-      }
-    } else {
-      byte[] valueBytes = tupleToBytes(tuple, 3, fieldSchemas);
-      Value val = new Value(valueBytes);
-      mut.put(cf, cq, val);
-    }
-    
-    return Collections.singleton(mut);
-  }
-  
-  @Override
-  public void setUDFContextSignature(String signature) {
-    this.contextSignature = signature;
-  }
-  
-  @Override
-  public void setStoreFuncUDFContextSignature(String signature) {
-    this.contextSignature = signature;
-  }
-  
-  /**
-   * Returns UDFProperties based on <code>contextSignature</code>.
-   */
-  private Properties getUDFProperties() {
-    return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new 
String[] {contextSignature});
-  }
-  
-  @Override
-  public void checkSchema(ResourceSchema s) throws IOException {
-    if (!(caster instanceof LoadStoreCaster)) {
-      LOG.error("Caster must implement LoadStoreCaster for writing to HBase.");
-      throw new IOException("Bad Caster " + caster.getClass());
-    }
-    schema_ = s;
-    getUDFProperties().setProperty(contextSignature + "_schema", 
ObjectSerializer.serialize(schema_));
-  }
-  
-  private Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] 
fieldSchemas) throws IOException {
-    Object o = tuple.get(i);
-    byte type = schemaToType(o, i, fieldSchemas);
-    
-    return objToText(o, type);
-  }
-  
-  private byte schemaToType(Object o, int i, ResourceFieldSchema[] 
fieldSchemas) {
-    return (fieldSchemas == null) ? DataType.findType(o) : 
fieldSchemas[i].getType();
-  }
-  
-  private byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] 
fieldSchemas) throws IOException {
-    Object o = tuple.get(i);
-    byte type = schemaToType(o, i, fieldSchemas);
-    
-    return objToBytes(o, type);
-    
-  }
-  
-  private Text objToText(Object o, byte type) throws IOException {
-    return new Text(objToBytes(o, type));
-  }
-  
-  @SuppressWarnings("unchecked")
-  private byte[] objToBytes(Object o, byte type) throws IOException {
-    if (o == null)
-      return null;
-    switch (type) {
-      case DataType.BYTEARRAY:
-        return ((DataByteArray) o).get();
-      case DataType.BAG:
-        return caster.toBytes((DataBag) o);
-      case DataType.CHARARRAY:
-        return caster.toBytes((String) o);
-      case DataType.DOUBLE:
-        return caster.toBytes((Double) o);
-      case DataType.FLOAT:
-        return caster.toBytes((Float) o);
-      case DataType.INTEGER:
-        return caster.toBytes((Integer) o);
-      case DataType.LONG:
-        return caster.toBytes((Long) o);
-      case DataType.BIGINTEGER:
-        return caster.toBytes((BigInteger) o);
-      case DataType.BIGDECIMAL:
-        return caster.toBytes((BigDecimal) o);
-      case DataType.BOOLEAN:
-        return caster.toBytes((Boolean) o);
-      case DataType.DATETIME:
-        return caster.toBytes((DateTime) o);
-        
-        // The type conversion here is unchecked.
-        // Relying on DataType.findType to do the right thing.
-      case DataType.MAP:
-        return caster.toBytes((Map<String,Object>) o);
-        
-      case DataType.NULL:
-        return null;
-      case DataType.TUPLE:
-        return caster.toBytes((Tuple) o);
-      case DataType.ERROR:
-        throw new IOException("Unable to determine type of " + o.getClass());
-      default:
-        throw new IOException("Unable to find a converter for tuple field " + 
o);
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/test/java/org/apache/accumulo/pig/AccumuloKVStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloKVStorageTest.java 
b/src/test/java/org/apache/accumulo/pig/AccumuloKVStorageTest.java
new file mode 100644
index 0000000..8adbb52
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloKVStorageTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.accumulo.pig;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AccumuloKVStorageTest {
+  
+  @Test
+  public void testGetMutations4() throws Exception {
+    AccumuloKVStorage s = new AccumuloKVStorage();
+    
+    Tuple tuple = TupleFactory.getInstance().newTuple(4);
+    tuple.set(0, "row1");
+    tuple.set(1, "cf1");
+    tuple.set(2, "cq1");
+    tuple.set(3, "val1");
+    
+    Collection<Mutation> muts = s.getMutations(tuple);
+    
+    assertNotNull(muts);
+    assertEquals(1, muts.size());
+    Mutation mut = muts.iterator().next();
+    List<ColumnUpdate> updates = mut.getUpdates();
+    assertEquals(1, updates.size());
+    ColumnUpdate update = updates.get(0);
+    
+    assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), 
mut.getRow()));
+    assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), 
update.getColumnFamily()));
+    assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), 
update.getColumnQualifier()));
+    assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), 
update.getValue()));
+    assertTrue(Arrays.equals("".getBytes(), update.getColumnVisibility()));
+  }
+  
+  @Test
+  public void testGetMutations5() throws Exception {
+    AccumuloKVStorage s = new AccumuloKVStorage();
+    
+    Tuple tuple = TupleFactory.getInstance().newTuple(5);
+    tuple.set(0, "row1");
+    tuple.set(1, "cf1");
+    tuple.set(2, "cq1");
+    tuple.set(3, "cv1");
+    tuple.set(4, "val1");
+    
+    Collection<Mutation> muts = s.getMutations(tuple);
+    
+    assertNotNull(muts);
+    assertEquals(1, muts.size());
+    Mutation mut = muts.iterator().next();
+    List<ColumnUpdate> updates = mut.getUpdates();
+    assertEquals(1, updates.size());
+    ColumnUpdate update = updates.get(0);
+    
+    assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), 
mut.getRow()));
+    assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), 
update.getColumnFamily()));
+    assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), 
update.getColumnQualifier()));
+    assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), 
update.getColumnVisibility()));
+    assertTrue(Arrays.equals(((String) tuple.get(4)).getBytes(), 
update.getValue()));
+  }
+  
+  @Test
+  public void testGetMutations6() throws Exception {
+    AccumuloKVStorage s = new AccumuloKVStorage();
+    
+    Tuple tuple = TupleFactory.getInstance().newTuple(6);
+    tuple.set(0, "row");
+    tuple.set(1, "cf");
+    tuple.set(2, "cq");
+    tuple.set(3, "cv");
+    tuple.set(4, new Long(1));
+    tuple.set(5, "value");
+    
+    Collection<Mutation> mutations = s.getMutations(tuple);
+    Assert.assertNotNull(mutations);
+    Assert.assertEquals(1, mutations.size());
+    Mutation m = mutations.iterator().next();
+    
+    List<ColumnUpdate> updates = m.getUpdates();
+    Assert.assertEquals(1, updates.size());
+    ColumnUpdate update = updates.get(0);
+    
+    assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), m.getRow()));
+    assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), 
update.getColumnFamily()));
+    assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), 
update.getColumnQualifier()));
+    assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), 
update.getColumnVisibility()));
+    assertEquals(((Long) tuple.get(4)).longValue(), update.getTimestamp());
+    assertTrue(Arrays.equals(((String) tuple.get(5)).getBytes(), 
update.getValue()));
+  }
+  
+  @Test
+  public void testGetTuple() throws Exception {
+    AccumuloKVStorage s = new AccumuloKVStorage();
+    
+    Key key = new Key("row1", "cf1", "cq1", "cv1", 1024L);
+    Value value = new Value("val1".getBytes());
+    Tuple tuple = s.getTuple(key, value);
+    TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
+    
+    key = new Key("row1", "cf1", "cq1");
+    value = new Value("val1".getBytes());
+    tuple = s.getTuple(key, value);
+    TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/294f9ce8/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java 
b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
deleted file mode 100644
index fbd68c6..0000000
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.accumulo.pig;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.accumulo.core.data.ColumnUpdate;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.junit.Test;
-
-public class AccumuloStorageTest {
-  
-  @Test
-  public void testGetMutations4() throws Exception {
-    AccumuloStorage s = new AccumuloStorage();
-    
-    Tuple tuple = TupleFactory.getInstance().newTuple(4);
-    tuple.set(0, "row1");
-    tuple.set(1, "cf1");
-    tuple.set(2, "cq1");
-    tuple.set(3, "val1");
-    
-    Collection<Mutation> muts = s.getMutations(tuple);
-    
-    assertNotNull(muts);
-    assertEquals(1, muts.size());
-    Mutation mut = muts.iterator().next();
-    List<ColumnUpdate> updates = mut.getUpdates();
-    assertEquals(1, updates.size());
-    ColumnUpdate update = updates.get(0);
-    
-    assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), mut.getRow()));
-    assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), update.getColumnFamily()));
-    assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), update.getColumnQualifier()));
-    assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), update.getValue()));
-    assertTrue(Arrays.equals("".getBytes(), update.getColumnVisibility()));
-  }
-  
-  @Test
-  public void testGetMutations5() throws Exception {
-    AccumuloStorage s = new AccumuloStorage();
-    
-    Tuple tuple = TupleFactory.getInstance().newTuple(5);
-    tuple.set(0, "row1");
-    tuple.set(1, "cf1");
-    tuple.set(2, "cq1");
-    tuple.set(3, "cv1");
-    tuple.set(4, "val1");
-    
-    Collection<Mutation> muts = s.getMutations(tuple);
-    
-    assertNotNull(muts);
-    assertEquals(1, muts.size());
-    Mutation mut = muts.iterator().next();
-    List<ColumnUpdate> updates = mut.getUpdates();
-    assertEquals(1, updates.size());
-    ColumnUpdate update = updates.get(0);
-    
-    assertTrue(Arrays.equals(((String) tuple.get(0)).getBytes(), mut.getRow()));
-    assertTrue(Arrays.equals(((String) tuple.get(1)).getBytes(), update.getColumnFamily()));
-    assertTrue(Arrays.equals(((String) tuple.get(2)).getBytes(), update.getColumnQualifier()));
-    assertTrue(Arrays.equals(((String) tuple.get(3)).getBytes(), update.getColumnVisibility()));
-    assertTrue(Arrays.equals(((String) tuple.get(4)).getBytes(), update.getValue()));
-  }
-  
-  @Test
-  public void testGetTuple() throws Exception {
-    AccumuloStorage s = new AccumuloStorage();
-    
-    Key key = new Key("row1", "cf1", "cq1", "cv1", 1024L);
-    Value value = new Value("val1".getBytes());
-    Tuple tuple = s.getTuple(key, value);
-    TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
-    
-    key = new Key("row1", "cf1", "cq1");
-    value = new Value("val1".getBytes());
-    tuple = s.getTuple(key, value);
-    TestUtils.assertKeyValueEqualsTuple(key, value, tuple);
-  }
-}

Reply via email to