http://git-wip-us.apache.org/repos/asf/gora/blob/4bbf52ee/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
----------------------------------------------------------------------
diff --cc 
gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
index c0cd026,a4cddce..bac354b
--- 
a/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
+++ 
b/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
@@@ -1,1042 -1,1064 +1,1044 @@@
--/**
-- * Licensed to the Apache Software Foundation (ASF) under one or more
-- * contributor license agreements.  See the NOTICE file distributed with
-- * this work for additional information regarding copyright ownership.
-- * The ASF licenses this file to You under the Apache License, Version 2.0
-- * (the "License"); you may not use this file except in compliance with
-- * the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing, software
-- * distributed under the License is distributed on an "AS IS" BASIS,
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- * See the License for the specific language governing permissions and
-- * limitations under the License.
-- */
--package org.apache.gora.accumulo.store;
--
--import java.io.ByteArrayOutputStream;
--import java.io.IOException;
--import java.net.InetAddress;
--import java.nio.ByteBuffer;
--import java.util.ArrayList;
--import java.util.Arrays;
--import java.util.Collections;
--import java.util.HashMap;
--import java.util.Iterator;
--import java.util.List;
--import java.util.Map;
--import java.util.Map.Entry;
--import java.util.Properties;
--import java.util.Set;
--import java.util.concurrent.TimeUnit;
--
--import javax.xml.parsers.DocumentBuilder;
--import javax.xml.parsers.DocumentBuilderFactory;
--
--import org.apache.accumulo.core.client.AccumuloException;
--import org.apache.accumulo.core.client.AccumuloSecurityException;
--import org.apache.accumulo.core.client.BatchWriter;
--import org.apache.accumulo.core.client.BatchWriterConfig;
--import org.apache.accumulo.core.client.Connector;
--import org.apache.accumulo.core.client.IsolatedScanner;
--import org.apache.accumulo.core.client.IteratorSetting;
--import org.apache.accumulo.core.client.MutationsRejectedException;
--import org.apache.accumulo.core.client.RowIterator;
--import org.apache.accumulo.core.client.Scanner;
--import org.apache.accumulo.core.client.TableDeletedException;
--import org.apache.accumulo.core.client.TableExistsException;
--import org.apache.accumulo.core.client.TableNotFoundException;
--import org.apache.accumulo.core.client.TableOfflineException;
--import org.apache.accumulo.core.client.ZooKeeperInstance;
 -import org.apache.accumulo.core.client.impl.ClientContext;
--import org.apache.accumulo.core.client.impl.Tables;
--import org.apache.accumulo.core.client.impl.TabletLocator;
--import org.apache.accumulo.core.client.mock.MockConnector;
--import org.apache.accumulo.core.client.mock.MockInstance;
--import org.apache.accumulo.core.client.mock.impl.MockTabletLocator;
--import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
--import org.apache.accumulo.core.client.security.tokens.PasswordToken;
--import org.apache.accumulo.core.conf.AccumuloConfiguration;
--import org.apache.accumulo.core.data.ByteSequence;
--import org.apache.accumulo.core.data.Key;
- import org.apache.accumulo.core.data.impl.KeyExtent;
--import org.apache.accumulo.core.data.Mutation;
--import org.apache.accumulo.core.data.Range;
--import org.apache.accumulo.core.data.Value;
 -import org.apache.accumulo.core.data.impl.KeyExtent;
--import org.apache.accumulo.core.iterators.SortedKeyIterator;
--import org.apache.accumulo.core.iterators.user.TimestampFilter;
--import org.apache.accumulo.core.master.state.tables.TableState;
--import org.apache.accumulo.core.security.Authorizations;
--import org.apache.accumulo.core.security.ColumnVisibility;
- import org.apache.accumulo.core.client.impl.ClientContext;
--import org.apache.accumulo.core.client.impl.Credentials;
--import org.apache.accumulo.core.util.Pair;
--import org.apache.accumulo.core.util.UtilWaitThread;
--import org.apache.avro.Schema;
--import org.apache.avro.Schema.Field;
--import org.apache.avro.Schema.Type;
--import org.apache.avro.generic.GenericData;
--import org.apache.avro.io.BinaryDecoder;
--import org.apache.avro.io.Decoder;
--import org.apache.avro.io.DecoderFactory;
--import org.apache.avro.io.EncoderFactory;
--import org.apache.avro.specific.SpecificDatumReader;
--import org.apache.avro.specific.SpecificDatumWriter;
--import org.apache.avro.util.Utf8;
--import org.apache.gora.accumulo.encoders.BinaryEncoder;
--import org.apache.gora.accumulo.encoders.Encoder;
--import org.apache.gora.accumulo.query.AccumuloQuery;
--import org.apache.gora.accumulo.query.AccumuloResult;
--import org.apache.gora.persistency.impl.DirtyListWrapper;
--import org.apache.gora.persistency.impl.DirtyMapWrapper;
--import org.apache.gora.persistency.impl.PersistentBase;
--import org.apache.gora.query.PartitionQuery;
--import org.apache.gora.query.Query;
--import org.apache.gora.query.Result;
--import org.apache.gora.query.impl.PartitionQueryImpl;
--import org.apache.gora.store.DataStoreFactory;
--import org.apache.gora.store.impl.DataStoreBase;
--import org.apache.gora.util.AvroUtils;
--import org.apache.gora.util.GoraException;
--import org.apache.gora.util.IOUtils;
--import org.apache.hadoop.io.Text;
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--import org.w3c.dom.Document;
--import org.w3c.dom.Element;
--import org.w3c.dom.NodeList;
--
--/**
-- * Implementation of a Accumulo data store to be used by gora.
-- *
-  * @param <K> class to be used for the key
-  * @param <T> class to be persisted within the store
 - * @param <K>
 - *            class to be used for the key
 - * @param <T>
 - *            class to be persisted within the store
-- */
- public class AccumuloStore<K,T extends PersistentBase> extends 
DataStoreBase<K,T> {
 -public class AccumuloStore<K, T extends PersistentBase> extends 
DataStoreBase<K, T> {
--
-   protected static final String MOCK_PROPERTY = "accumulo.mock";
-   protected static final String INSTANCE_NAME_PROPERTY = "accumulo.instance";
-   protected static final String ZOOKEEPERS_NAME_PROPERTY = 
"accumulo.zookeepers";
-   protected static final String USERNAME_PROPERTY = "accumulo.user";
-   protected static final String PASSWORD_PROPERTY = "accumulo.password";
-   protected static final String DEFAULT_MAPPING_FILE = 
"gora-accumulo-mapping.xml";
 -      protected static final String MOCK_PROPERTY = "accumulo.mock";
 -      protected static final String INSTANCE_NAME_PROPERTY = 
"accumulo.instance";
 -      protected static final String ZOOKEEPERS_NAME_PROPERTY = 
"accumulo.zookeepers";
 -      protected static final String USERNAME_PROPERTY = "accumulo.user";
 -      protected static final String PASSWORD_PROPERTY = "accumulo.password";
 -      protected static final String DEFAULT_MAPPING_FILE = 
"gora-accumulo-mapping.xml";
--
-   private final static String UNKOWN = "Unknown type ";
 -      private final static String UNKOWN = "Unknown type ";
--
-   private Connector conn;
-   private BatchWriter batchWriter;
-   private AccumuloMapping mapping;
-   private Credentials credentials;
-   private Encoder encoder;
 -      private Connector conn;
 -      private BatchWriter batchWriter;
 -      private AccumuloMapping mapping;
 -      private Credentials credentials;
 -      private Encoder encoder;
--
-   public static final Logger LOG = 
LoggerFactory.getLogger(AccumuloStore.class);
 -      public static final Logger LOG = 
LoggerFactory.getLogger(AccumuloStore.class);
--
-   public Object fromBytes(Schema schema, byte[] data) throws IOException {
-     Schema fromSchema = null;
-     if (schema.getType() == Type.UNION) {
-       try {
-         Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
-         int unionIndex = decoder.readIndex();
-         List<Schema> possibleTypes = schema.getTypes();
-         fromSchema = possibleTypes.get(unionIndex);
-         Schema effectiveSchema = possibleTypes.get(unionIndex);
-         if (effectiveSchema.getType() == Type.NULL) {
-           decoder.readNull();
-           return null;
-         } else {
-           data = decoder.readBytes(null).array();
-         }
-       } catch (IOException e) {
-         LOG.error(e.getMessage());
-         throw new GoraException("Error decoding union type: ", e);
-       }
-     } else {
-       fromSchema = schema;
-     }
-     return fromBytes(encoder, fromSchema, data);
-   }
 -      public Object fromBytes(Schema schema, byte[] data) throws IOException {
 -              Schema fromSchema = null;
 -              if (schema.getType() == Type.UNION) {
 -                      try {
 -                              Decoder decoder = 
DecoderFactory.get().binaryDecoder(data, null);
 -                              int unionIndex = decoder.readIndex();
 -                              List<Schema> possibleTypes = schema.getTypes();
 -                              fromSchema = possibleTypes.get(unionIndex);
 -                              Schema effectiveSchema = 
possibleTypes.get(unionIndex);
 -                              if (effectiveSchema.getType() == Type.NULL) {
 -                                      decoder.readNull();
 -                                      return null;
 -                              } else {
 -                                      data = decoder.readBytes(null).array();
 -                              }
 -                      } catch (IOException e) {
 -                              LOG.error(e.getMessage());
 -                              throw new GoraException("Error decoding union 
type: ", e);
 -                      }
 -              } else {
 -                      fromSchema = schema;
 -              }
 -              return fromBytes(encoder, fromSchema, data);
 -      }
--
-   public static Object fromBytes(Encoder encoder, Schema schema, byte data[]) 
throws IOException {
-     switch (schema.getType()) {
-     case BOOLEAN:
-       return encoder.decodeBoolean(data);
-     case DOUBLE:
-       return encoder.decodeDouble(data);
-     case FLOAT:
-       return encoder.decodeFloat(data);
-     case INT:
-       return encoder.decodeInt(data);
-     case LONG:
-       return encoder.decodeLong(data);
-     case STRING:
-       return new Utf8(data);
-     case BYTES:
-       return ByteBuffer.wrap(data);
-     case ENUM:
-       return AvroUtils.getEnumValue(schema, encoder.decodeInt(data));
-     case ARRAY:
-       break;
-     case FIXED:
-       break;
-     case MAP:
-       break;
-     case NULL:
-       break;
-     case RECORD:
-       break;
-     case UNION:
-       break;
-     default:
-       break;
-     }
-     throw new IllegalArgumentException(UNKOWN + schema.getType());
 -      public static Object fromBytes(Encoder encoder, Schema schema, byte 
data[]) throws IOException {
 -              switch (schema.getType()) {
 -              case BOOLEAN:
 -                      return encoder.decodeBoolean(data);
 -              case DOUBLE:
 -                      return encoder.decodeDouble(data);
 -              case FLOAT:
 -                      return encoder.decodeFloat(data);
 -              case INT:
 -                      return encoder.decodeInt(data);
 -              case LONG:
 -                      return encoder.decodeLong(data);
 -              case STRING:
 -                      return new Utf8(data);
 -              case BYTES:
 -                      return ByteBuffer.wrap(data);
 -              case ENUM:
 -                      return AvroUtils.getEnumValue(schema, 
encoder.decodeInt(data));
 -              case ARRAY:
 -                      break;
 -              case FIXED:
 -                      break;
 -              case MAP:
 -                      break;
 -              case NULL:
 -                      break;
 -              case RECORD:
 -                      break;
 -              case UNION:
 -                      break;
 -              default:
 -                      break;
 -              }
 -              throw new IllegalArgumentException(UNKOWN + schema.getType());
--
-   }
 -      }
--
-   private static byte[] getBytes(Text text) {
-     byte[] bytes = text.getBytes();
-     if (bytes.length != text.getLength()) {
-       bytes = new byte[text.getLength()];
-       System.arraycopy(text.getBytes(), 0, bytes, 0, bytes.length);
-     }
-     return bytes;
-   }
 -      private static byte[] getBytes(Text text) {
 -              byte[] bytes = text.getBytes();
 -              if (bytes.length != text.getLength()) {
 -                      bytes = new byte[text.getLength()];
 -                      System.arraycopy(text.getBytes(), 0, bytes, 0, 
bytes.length);
 -              }
 -              return bytes;
 -      }
--
-   public K fromBytes(Class<K> clazz, byte[] val) {
-     return fromBytes(encoder, clazz, val);
-   }
 -      public K fromBytes(Class<K> clazz, byte[] val) {
 -              return fromBytes(encoder, clazz, val);
 -      }
--
-   @SuppressWarnings("unchecked")
-   public static <K> K fromBytes(Encoder encoder, Class<K> clazz, byte[] val) {
-     try {
-       if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
-         return (K) Byte.valueOf(encoder.decodeByte(val));
-       } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
-         return (K) Boolean.valueOf(encoder.decodeBoolean(val));
-       } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
-         return (K) Short.valueOf(encoder.decodeShort(val));
-       } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
-         return (K) Integer.valueOf(encoder.decodeInt(val));
-       } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
-         return (K) Long.valueOf(encoder.decodeLong(val));
-       } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
-         return (K) Float.valueOf(encoder.decodeFloat(val));
-       } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
-         return (K) Double.valueOf(encoder.decodeDouble(val));
-       } else if (clazz.equals(String.class)) {
-         return (K) new String(val, "UTF-8");
-       } else if (clazz.equals(Utf8.class)) {
-         return (K) new Utf8(val);
-       }
 -      @SuppressWarnings("unchecked")
 -      public static <K> K fromBytes(Encoder encoder, Class<K> clazz, byte[] 
val) {
 -              try {
 -                      if (clazz.equals(Byte.TYPE) || 
clazz.equals(Byte.class)) {
 -                              return (K) 
Byte.valueOf(encoder.decodeByte(val));
 -                      } else if (clazz.equals(Boolean.TYPE) || 
clazz.equals(Boolean.class)) {
 -                              return (K) 
Boolean.valueOf(encoder.decodeBoolean(val));
 -                      } else if (clazz.equals(Short.TYPE) || 
clazz.equals(Short.class)) {
 -                              return (K) 
Short.valueOf(encoder.decodeShort(val));
 -                      } else if (clazz.equals(Integer.TYPE) || 
clazz.equals(Integer.class)) {
 -                              return (K) 
Integer.valueOf(encoder.decodeInt(val));
 -                      } else if (clazz.equals(Long.TYPE) || 
clazz.equals(Long.class)) {
 -                              return (K) 
Long.valueOf(encoder.decodeLong(val));
 -                      } else if (clazz.equals(Float.TYPE) || 
clazz.equals(Float.class)) {
 -                              return (K) 
Float.valueOf(encoder.decodeFloat(val));
 -                      } else if (clazz.equals(Double.TYPE) || 
clazz.equals(Double.class)) {
 -                              return (K) 
Double.valueOf(encoder.decodeDouble(val));
 -                      } else if (clazz.equals(String.class)) {
 -                              return (K) new String(val, "UTF-8");
 -                      } else if (clazz.equals(Utf8.class)) {
 -                              return (K) new Utf8(val);
 -                      }
--
-       throw new IllegalArgumentException(UNKOWN + clazz.getName());
-     } catch (IOException ioe) {
-       LOG.error(ioe.getMessage());
-       throw new RuntimeException(ioe);
-     }
-   }
 -                      throw new IllegalArgumentException(UNKOWN + 
clazz.getName());
 -              } catch (IOException ioe) {
 -                      LOG.error(ioe.getMessage());
 -                      throw new RuntimeException(ioe);
 -              }
 -      }
--
-   private static byte[] copyIfNeeded(byte b[], int offset, int len) {
-     if (len != b.length || offset != 0) {
-       byte[] copy = new byte[len];
-       System.arraycopy(b, offset, copy, 0, copy.length);
-       b = copy;
-     }
-     return b;
-   }
 -      private static byte[] copyIfNeeded(byte b[], int offset, int len) {
 -              if (len != b.length || offset != 0) {
 -                      byte[] copy = new byte[len];
 -                      System.arraycopy(b, offset, copy, 0, copy.length);
 -                      b = copy;
 -              }
 -              return b;
 -      }
--
-   public byte[] toBytes(Schema toSchema, Object o) {
-     if (toSchema != null && toSchema.getType() == Type.UNION) {
-       ByteArrayOutputStream baos = new ByteArrayOutputStream();
-       org.apache.avro.io.BinaryEncoder avroEncoder = 
EncoderFactory.get().binaryEncoder(baos, null);
-       int unionIndex = 0;
-       try {
-         if (o == null) {
-           unionIndex = firstNullSchemaTypeIndex(toSchema);
-           avroEncoder.writeIndex(unionIndex);
-           avroEncoder.writeNull();
-         } else {
-           unionIndex = firstNotNullSchemaTypeIndex(toSchema);
-           avroEncoder.writeIndex(unionIndex);
-           avroEncoder.writeBytes(toBytes(o));
-         }
-         avroEncoder.flush();
-         return baos.toByteArray();
-       } catch (IOException e) {
-         LOG.error(e.getMessage());
-         return toBytes(o);
-       }
-     } else {
-       return toBytes(o);
-     }
-   }
 -      public byte[] toBytes(Schema toSchema, Object o) {
 -              if (toSchema != null && toSchema.getType() == Type.UNION) {
 -                      ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
 -                      org.apache.avro.io.BinaryEncoder avroEncoder = 
EncoderFactory.get().binaryEncoder(baos, null);
 -                      int unionIndex = 0;
 -                      try {
 -                              if (o == null) {
 -                                      unionIndex = 
firstNullSchemaTypeIndex(toSchema);
 -                                      avroEncoder.writeIndex(unionIndex);
 -                                      avroEncoder.writeNull();
 -                              } else {
 -                                      unionIndex = 
firstNotNullSchemaTypeIndex(toSchema);
 -                                      avroEncoder.writeIndex(unionIndex);
 -                                      avroEncoder.writeBytes(toBytes(o));
 -                              }
 -                              avroEncoder.flush();
 -                              return baos.toByteArray();
 -                      } catch (IOException e) {
 -                              LOG.error(e.getMessage());
 -                              return toBytes(o);
 -                      }
 -              } else {
 -                      return toBytes(o);
 -              }
 -      }
--
-   private int firstNullSchemaTypeIndex(Schema toSchema) {
-     List<Schema> possibleTypes = toSchema.getTypes();
-     int unionIndex = 0;
-     for (int i = 0; i < possibleTypes.size(); i++ ) {
-       Type pType = possibleTypes.get(i).getType();
-       if (pType == Type.NULL) { // FIXME HUGE kludge to pass tests
-         unionIndex = i; break;
-       }
-     }
-     return unionIndex;
-   }
 -      private int firstNullSchemaTypeIndex(Schema toSchema) {
 -              List<Schema> possibleTypes = toSchema.getTypes();
 -              int unionIndex = 0;
 -              for (int i = 0; i < possibleTypes.size(); i++) {
 -                      Type pType = possibleTypes.get(i).getType();
 -                      if (pType == Type.NULL) { // FIXME HUGE kludge to pass 
tests
 -                              unionIndex = i;
 -                              break;
 -                      }
 -              }
 -              return unionIndex;
 -      }
--
-   private int firstNotNullSchemaTypeIndex(Schema toSchema) {
-     List<Schema> possibleTypes = toSchema.getTypes();
-     int unionIndex = 0;
-     for (int i = 0; i < possibleTypes.size(); i++ ) {
-       Type pType = possibleTypes.get(i).getType();
-       if (pType != Type.NULL) { // FIXME HUGE kludge to pass tests
-         unionIndex = i; break;
-       }
-     }
-     return unionIndex;
-   }
 -      private int firstNotNullSchemaTypeIndex(Schema toSchema) {
 -              List<Schema> possibleTypes = toSchema.getTypes();
 -              int unionIndex = 0;
 -              for (int i = 0; i < possibleTypes.size(); i++) {
 -                      Type pType = possibleTypes.get(i).getType();
 -                      if (pType != Type.NULL) { // FIXME HUGE kludge to pass 
tests
 -                              unionIndex = i;
 -                              break;
 -                      }
 -              }
 -              return unionIndex;
 -      }
--
-   public byte[] toBytes(Object o) {
-     return toBytes(encoder, o);
-   }
 -      public byte[] toBytes(Object o) {
 -              return toBytes(encoder, o);
 -      }
--
-   public static byte[] toBytes(Encoder encoder, Object o) {
 -      public static byte[] toBytes(Encoder encoder, Object o) {
--
-     try {
-       if (o instanceof String) {
-         return ((String) o).getBytes("UTF-8");
-       } else if (o instanceof Utf8) {
-         return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8) 
o).getByteLength());
-       } else if (o instanceof ByteBuffer) {
-         return copyIfNeeded(((ByteBuffer) o).array(), ((ByteBuffer) 
o).arrayOffset() + ((ByteBuffer) o).position(), ((ByteBuffer) o).remaining());
-       } else if (o instanceof Long) {
-         return encoder.encodeLong((Long) o);
-       } else if (o instanceof Integer) {
-         return encoder.encodeInt((Integer) o);
-       } else if (o instanceof Short) {
-         return encoder.encodeShort((Short) o);
-       } else if (o instanceof Byte) {
-         return encoder.encodeByte((Byte) o);
-       } else if (o instanceof Boolean) {
-         return encoder.encodeBoolean((Boolean) o);
-       } else if (o instanceof Float) {
-         return encoder.encodeFloat((Float) o);
-       } else if (o instanceof Double) {
-         return encoder.encodeDouble((Double) o);
-       } else if (o instanceof Enum) {
-         return encoder.encodeInt(((Enum<?>) o).ordinal());
-       }
-     } catch (IOException ioe) {
-       throw new RuntimeException(ioe);
-     }
 -              try {
 -                      if (o instanceof String) {
 -                              return ((String) o).getBytes("UTF-8");
 -                      } else if (o instanceof Utf8) {
 -                              return copyIfNeeded(((Utf8) o).getBytes(), 0, 
((Utf8) o).getByteLength());
 -                      } else if (o instanceof ByteBuffer) {
 -                              return copyIfNeeded(((ByteBuffer) o).array(),
 -                                              ((ByteBuffer) o).arrayOffset() 
+ ((ByteBuffer) o).position(), ((ByteBuffer) o).remaining());
 -                      } else if (o instanceof Long) {
 -                              return encoder.encodeLong((Long) o);
 -                      } else if (o instanceof Integer) {
 -                              return encoder.encodeInt((Integer) o);
 -                      } else if (o instanceof Short) {
 -                              return encoder.encodeShort((Short) o);
 -                      } else if (o instanceof Byte) {
 -                              return encoder.encodeByte((Byte) o);
 -                      } else if (o instanceof Boolean) {
 -                              return encoder.encodeBoolean((Boolean) o);
 -                      } else if (o instanceof Float) {
 -                              return encoder.encodeFloat((Float) o);
 -                      } else if (o instanceof Double) {
 -                              return encoder.encodeDouble((Double) o);
 -                      } else if (o instanceof Enum) {
 -                              return encoder.encodeInt(((Enum<?>) 
o).ordinal());
 -                      }
 -              } catch (IOException ioe) {
 -                      throw new RuntimeException(ioe);
 -              }
--
-     throw new IllegalArgumentException(UNKOWN + o.getClass().getName());
-   }
 -              throw new IllegalArgumentException(UNKOWN + 
o.getClass().getName());
 -      }
--
-   private BatchWriter getBatchWriter() throws IOException {
-     if (batchWriter == null)
-       try {
-         BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
-         batchWriterConfig.setMaxMemory(10000000);
-         batchWriterConfig.setMaxLatency(60000L, TimeUnit.MILLISECONDS);
-         batchWriterConfig.setMaxWriteThreads(4);
-         batchWriter = conn.createBatchWriter(mapping.tableName, 
batchWriterConfig);
-       } catch (TableNotFoundException e) {
-         throw new IOException(e);
-       }
-     return batchWriter;
-   }
 -      private BatchWriter getBatchWriter() throws IOException {
 -              if (batchWriter == null)
 -                      try {
 -                              BatchWriterConfig batchWriterConfig = new 
BatchWriterConfig();
 -                              batchWriterConfig.setMaxMemory(10000000);
 -                              batchWriterConfig.setMaxLatency(60000L, 
TimeUnit.MILLISECONDS);
 -                              batchWriterConfig.setMaxWriteThreads(4);
 -                              batchWriter = 
conn.createBatchWriter(mapping.tableName, batchWriterConfig);
 -                      } catch (TableNotFoundException e) {
 -                              throw new IOException(e);
 -                      }
 -              return batchWriter;
 -      }
--
-   /**
-    * Initialize the data store by reading the credentials, setting the 
client's properties up and
-    * reading the mapping file. Initialize is called when then the call to
-    * {@link org.apache.gora.store.DataStoreFactory#createDataStore} is made.
-    *
-    * @param keyClass
-    * @param persistentClass
-    * @param properties
-    */
-   @Override
-   public void initialize(Class<K> keyClass, Class<T> persistentClass, 
Properties properties) {
-     try{
-       super.initialize(keyClass, persistentClass, properties);
 -      /**
 -       * Initialize the data store by reading the credentials, setting the
 -       * client's properties up and reading the mapping file. Initialize is 
called
 -       * when then the call to
 -       * {@link org.apache.gora.store.DataStoreFactory#createDataStore} is 
made.
 -       *
 -       * @param keyClass
 -       * @param persistentClass
 -       * @param properties
 -       */
 -      @Override
 -      public void initialize(Class<K> keyClass, Class<T> persistentClass, 
Properties properties) {
 -              try {
 -                      super.initialize(keyClass, persistentClass, properties);
--
-       String mock = DataStoreFactory.findProperty(properties, this, 
MOCK_PROPERTY, null);
-       String mappingFile = DataStoreFactory.getMappingFile(properties, this, 
DEFAULT_MAPPING_FILE);
-       String user = DataStoreFactory.findProperty(properties, this, 
USERNAME_PROPERTY, null);
-       String password = DataStoreFactory.findProperty(properties, this, 
PASSWORD_PROPERTY, null);
 -                      String mock = DataStoreFactory.findProperty(properties, 
this, MOCK_PROPERTY, null);
 -                      String mappingFile = 
DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
 -                      String user = DataStoreFactory.findProperty(properties, 
this, USERNAME_PROPERTY, null);
 -                      String password = 
DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null);
--
-       mapping = readMapping(mappingFile);
 -                      mapping = readMapping(mappingFile);
--
-       if (mapping.encoder == null || "".equals(mapping.encoder)) {
-         encoder = new BinaryEncoder();
-       } else {
-         try {
-           encoder = (Encoder) 
getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
-         } catch (InstantiationException | IllegalAccessException | 
ClassNotFoundException e) {
-           throw new IOException(e);
-         }
-       }
 -                      if (mapping.encoder == null || 
"".equals(mapping.encoder)) {
 -                              encoder = new BinaryEncoder();
 -                      } else {
 -                              try {
 -                                      encoder = (Encoder) 
getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
 -                              } catch (InstantiationException | 
IllegalAccessException | ClassNotFoundException e) {
 -                                      throw new IOException(e);
 -                              }
 -                      }
--
-       try {
-         AuthenticationToken token = new PasswordToken(password);
-         if (mock == null || !mock.equals("true")) {
-           String instance = DataStoreFactory.findProperty(properties, this, 
INSTANCE_NAME_PROPERTY, null);
-           String zookeepers = DataStoreFactory.findProperty(properties, this, 
ZOOKEEPERS_NAME_PROPERTY, null);
-           conn = new ZooKeeperInstance(instance, 
zookeepers).getConnector(user, token);
-         } else {
-           conn = new MockInstance().getConnector(user, token);
-         }
-         credentials = new Credentials(user, token);
 -                      try {
 -                              AuthenticationToken token = new 
PasswordToken(password);
 -                              if (mock == null || !mock.equals("true")) {
 -                                      String instance = 
DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null);
 -                                      String zookeepers = 
DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null);
 -                                      conn = new ZooKeeperInstance(instance, 
zookeepers).getConnector(user, token);
 -                              } else {
 -                                      conn = new 
MockInstance().getConnector(user, token);
 -                              }
 -                              credentials = new Credentials(user, token);
--
-         if (autoCreateSchema && !schemaExists())
-           createSchema();
-       } catch (AccumuloException | AccumuloSecurityException e) {
-         throw new IOException(e);
-       }
-     } catch(IOException e){
-       LOG.error(e.getMessage(), e);
-     }
-   }
 -                              if (autoCreateSchema && !schemaExists())
 -                                      createSchema();
 -                      } catch (AccumuloException | AccumuloSecurityException 
e) {
 -                              throw new IOException(e);
 -                      }
 -              } catch (IOException e) {
 -                      LOG.error(e.getMessage(), e);
 -              }
 -      }
--
-   protected AccumuloMapping readMapping(String filename) throws IOException {
-     try {
 -      protected AccumuloMapping readMapping(String filename) throws 
IOException {
 -              try {
--
-       AccumuloMapping mapping = new AccumuloMapping();
 -                      AccumuloMapping mapping = new AccumuloMapping();
--
-       DocumentBuilder db = 
DocumentBuilderFactory.newInstance().newDocumentBuilder();
-       Document dom = 
db.parse(getClass().getClassLoader().getResourceAsStream(filename));
 -                      DocumentBuilder db = 
DocumentBuilderFactory.newInstance().newDocumentBuilder();
 -                      Document dom = 
db.parse(getClass().getClassLoader().getResourceAsStream(filename));
--
-       Element root = dom.getDocumentElement();
 -                      Element root = dom.getDocumentElement();
--
-       NodeList nl = root.getElementsByTagName("class");
-       for (int i = 0; i < nl.getLength(); i++) {
 -                      NodeList nl = root.getElementsByTagName("class");
 -                      for (int i = 0; i < nl.getLength(); i++) {
--
-         Element classElement = (Element) nl.item(i);
-         if 
(classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName())
-             && 
classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) {
 -                              Element classElement = (Element) nl.item(i);
 -                              if 
(classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName())
 -                                              && 
classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) {
--
-           mapping.tableName = 
getSchemaName(classElement.getAttribute("table"), persistentClass);
-           mapping.encoder = classElement.getAttribute("encoder");
 -                                      mapping.tableName = 
getSchemaName(classElement.getAttribute("table"), persistentClass);
 -                                      mapping.encoder = 
classElement.getAttribute("encoder");
--
-           NodeList fields = classElement.getElementsByTagName("field");
-           for (int j = 0; j < fields.getLength(); j++) {
-             Element fieldElement = (Element) fields.item(j);
 -                                      NodeList fields = 
classElement.getElementsByTagName("field");
 -                                      for (int j = 0; j < fields.getLength(); 
j++) {
 -                                              Element fieldElement = 
(Element) fields.item(j);
--
-             String name = fieldElement.getAttribute("name");
-             String family = fieldElement.getAttribute("family");
-             String qualifier = fieldElement.getAttribute("qualifier");
-             if ("".equals(qualifier))
-               qualifier = null;
 -                                              String name = 
fieldElement.getAttribute("name");
 -                                              String family = 
fieldElement.getAttribute("family");
 -                                              String qualifier = 
fieldElement.getAttribute("qualifier");
 -                                              if ("".equals(qualifier))
 -                                                      qualifier = null;
--
-             Pair<Text,Text> col = new Pair<>(new Text(family), qualifier == 
null ? null : new Text(qualifier));
-             mapping.fieldMap.put(name, col);
-             mapping.columnMap.put(col, name);
-           }
-         }
 -                                              Pair<Text, Text> col = new 
Pair<>(new Text(family),
 -                                                              qualifier == 
null ? null : new Text(qualifier));
 -                                              mapping.fieldMap.put(name, col);
 -                                              mapping.columnMap.put(col, 
name);
 -                                      }
 -                              }
--
-       }
 -                      }
--
-       if (mapping.tableName == null) {
-         throw new GoraException("Please define the accumulo 'table' name 
mapping in " + filename + " for " + persistentClass.getCanonicalName());
-       }
 -                      if (mapping.tableName == null) {
 -                              throw new GoraException("Please define the 
accumulo 'table' name mapping in " + filename + " for "
 -                                              + 
persistentClass.getCanonicalName());
 -                      }
--
-       nl = root.getElementsByTagName("table");
-       for (int i = 0; i < nl.getLength(); i++) {
-         Element tableElement = (Element) nl.item(i);
-         if (tableElement.getAttribute("name").equals(mapping.tableName)) {
-           NodeList configs = tableElement.getElementsByTagName("config");
-           for (int j = 0; j < configs.getLength(); j++) {
-             Element configElement = (Element) configs.item(j);
-             String key = configElement.getAttribute("key");
-             String val = configElement.getAttribute("value");
-             mapping.tableConfig.put(key, val);
-           }
-         }
-       }
 -                      nl = root.getElementsByTagName("table");
 -                      for (int i = 0; i < nl.getLength(); i++) {
 -                              Element tableElement = (Element) nl.item(i);
 -                              if 
(tableElement.getAttribute("name").equals(mapping.tableName)) {
 -                                      NodeList configs = 
tableElement.getElementsByTagName("config");
 -                                      for (int j = 0; j < 
configs.getLength(); j++) {
 -                                              Element configElement = 
(Element) configs.item(j);
 -                                              String key = 
configElement.getAttribute("key");
 -                                              String val = 
configElement.getAttribute("value");
 -                                              mapping.tableConfig.put(key, 
val);
 -                                      }
 -                              }
 -                      }
--
-       return mapping;
-     } catch (Exception ex) {
-       throw new IOException("Unable to read " + filename, ex);
-     }
 -                      return mapping;
 -              } catch (Exception ex) {
 -                      throw new IOException("Unable to read " + filename, ex);
 -              }
--
-   }
 -      }
--
-   @Override
-   public String getSchemaName() {
-     return mapping.tableName;
-   }
 -      @Override
 -      public String getSchemaName() {
 -              return mapping.tableName;
 -      }
--
-   @Override
-   public void createSchema() {
-     try {
-       conn.tableOperations().create(mapping.tableName);
-       Set<Entry<String,String>> es = mapping.tableConfig.entrySet();
-       for (Entry<String,String> entry : es) {
-         conn.tableOperations().setProperty(mapping.tableName, entry.getKey(), 
entry.getValue());
-       }
 -      @Override
 -      public void createSchema() {
 -              try {
 -                      conn.tableOperations().create(mapping.tableName);
 -                      Set<Entry<String, String>> es = 
mapping.tableConfig.entrySet();
 -                      for (Entry<String, String> entry : es) {
 -                              
conn.tableOperations().setProperty(mapping.tableName, entry.getKey(), 
entry.getValue());
 -                      }
--
-     } catch (AccumuloException | AccumuloSecurityException e) {
-       LOG.error(e.getMessage(), e);
-     } catch (TableExistsException e) {
-       LOG.debug(e.getMessage(), e);
-     }
-   }
 -              } catch (AccumuloException | AccumuloSecurityException e) {
 -                      LOG.error(e.getMessage(), e);
 -              } catch (TableExistsException e) {
 -                      LOG.debug(e.getMessage(), e);
 -              }
 -      }
--
-   @Override
-   public void deleteSchema() {
-     try {
-       if (batchWriter != null)
-         batchWriter.close();
-       batchWriter = null;
-       conn.tableOperations().delete(mapping.tableName);
-     } catch (AccumuloException | AccumuloSecurityException | 
TableNotFoundException e) {
-       LOG.error(e.getMessage(), e);
-     }
-   }
 -      @Override
 -      public void deleteSchema() {
 -              try {
 -                      if (batchWriter != null)
 -                              batchWriter.close();
 -                      batchWriter = null;
 -                      conn.tableOperations().delete(mapping.tableName);
 -              } catch (AccumuloException | AccumuloSecurityException | 
TableNotFoundException e) {
 -                      LOG.error(e.getMessage(), e);
 -              }
 -      }
--
-   @Override
-   public boolean schemaExists() {
-     return conn.tableOperations().exists(mapping.tableName);
-   }
 -      @Override
 -      public boolean schemaExists() {
 -              return conn.tableOperations().exists(mapping.tableName);
 -      }
--
-   public ByteSequence populate(Iterator<Entry<Key,Value>> iter, T persistent) 
throws IOException {
-     ByteSequence row = null;
 -      public ByteSequence populate(Iterator<Entry<Key, Value>> iter, T 
persistent) throws IOException {
 -              ByteSequence row = null;
--
-     Map<Utf8, Object> currentMap = null;
-     List currentArray = null;
-     Text currentFam = null;
-     int currentPos = 0;
-     Schema currentSchema = null;
-     Field currentField = null;
 -              Map<Utf8, Object> currentMap = null;
 -              List currentArray = null;
 -              Text currentFam = null;
 -              int currentPos = 0;
 -              Schema currentSchema = null;
 -              Field currentField = null;
--
-     BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new byte[0], 
null);
 -              BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new 
byte[0], null);
--
-     while (iter.hasNext()) {
-       Entry<Key,Value> entry = iter.next();
 -              while (iter.hasNext()) {
 -                      Entry<Key, Value> entry = iter.next();
--
-       if (row == null) {
-         row = entry.getKey().getRowData();
-       }
-       byte[] val = entry.getValue().get();
 -                      if (row == null) {
 -                              row = entry.getKey().getRowData();
 -                      }
 -                      byte[] val = entry.getValue().get();
--
-       Field field = fieldMap.get(getFieldName(entry));
 -                      Field field = fieldMap.get(getFieldName(entry));
--
-       if (currentMap != null) {
-         if (currentFam.equals(entry.getKey().getColumnFamily())) {
-           currentMap.put(new 
Utf8(entry.getKey().getColumnQualifierData().toArray()),
-               fromBytes(currentSchema, entry.getValue().get()));
-           continue;
-         } else {
-           persistent.put(currentPos, currentMap);
-           currentMap = null;
-         }
-       } else if (currentArray != null) {
-         if (currentFam.equals(entry.getKey().getColumnFamily())) {
-           currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
-           continue;
-         } else {
-           persistent.put(currentPos, new 
GenericData.Array<T>(currentField.schema(), currentArray));
-           currentArray = null;
-         }
-       }
 -                      if (currentMap != null) {
 -                              if 
(currentFam.equals(entry.getKey().getColumnFamily())) {
 -                                      currentMap.put(new 
Utf8(entry.getKey().getColumnQualifierData().toArray()),
 -                                                      
fromBytes(currentSchema, entry.getValue().get()));
 -                                      continue;
 -                              } else {
 -                                      persistent.put(currentPos, currentMap);
 -                                      currentMap = null;
 -                              }
 -                      } else if (currentArray != null) {
 -                              if 
(currentFam.equals(entry.getKey().getColumnFamily())) {
 -                                      
currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
 -                                      continue;
 -                              } else {
 -                                      persistent.put(currentPos, new 
GenericData.Array<T>(currentField.schema(), currentArray));
 -                                      currentArray = null;
 -                              }
 -                      }
--
-       switch (field.schema().getType()) {
-       case MAP:  // first entry only. Next are handled above on the next loop
-         currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>());
-         currentPos = field.pos();
-         currentFam = entry.getKey().getColumnFamily();
-         currentSchema = field.schema().getValueType();
 -                      switch (field.schema().getType()) {
 -                      case MAP: // first entry only. Next are handled above 
on the next
 -                                              // loop
 -                              currentMap = new DirtyMapWrapper<>(new 
HashMap<Utf8, Object>());
 -                              currentPos = field.pos();
 -                              currentFam = entry.getKey().getColumnFamily();
 -                              currentSchema = field.schema().getValueType();
--
-         currentMap.put(new 
Utf8(entry.getKey().getColumnQualifierData().toArray()),
-             fromBytes(currentSchema, entry.getValue().get()));
-         break;
-       case ARRAY:
-         currentArray = new DirtyListWrapper<>(new ArrayList<>());
-         currentPos = field.pos();
-         currentFam = entry.getKey().getColumnFamily();
-         currentSchema = field.schema().getElementType();
-         currentField = field;
 -                              currentMap.put(new 
Utf8(entry.getKey().getColumnQualifierData().toArray()),
 -                                              fromBytes(currentSchema, 
entry.getValue().get()));
 -                              break;
 -                      case ARRAY:
 -                              currentArray = new DirtyListWrapper<>(new 
ArrayList<>());
 -                              currentPos = field.pos();
 -                              currentFam = entry.getKey().getColumnFamily();
 -                              currentSchema = field.schema().getElementType();
 -                              currentField = field;
--
-         currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
 -                              currentArray.add(fromBytes(currentSchema, 
entry.getValue().get()));
--
-         break;
-       case UNION:// default value of null acts like union with null
-         Schema effectiveSchema = field.schema().getTypes()
-         .get(firstNotNullSchemaTypeIndex(field.schema()));
-         // map and array were coded without union index so need to be read 
the same way
-         if (effectiveSchema.getType() == Type.ARRAY) {
-           currentArray = new DirtyListWrapper<>(new ArrayList<>());
-           currentPos = field.pos();
-           currentFam = entry.getKey().getColumnFamily();
-           currentSchema = field.schema().getElementType();
-           currentField = field;
 -                              break;
 -                      case UNION:// default value of null acts like union 
with null
 -                              Schema effectiveSchema = 
field.schema().getTypes().get(firstNotNullSchemaTypeIndex(field.schema()));
 -                              // map and array were coded without union index 
so need to be
 -                              // read the same way
 -                              if (effectiveSchema.getType() == Type.ARRAY) {
 -                                      currentArray = new 
DirtyListWrapper<>(new ArrayList<>());
 -                                      currentPos = field.pos();
 -                                      currentFam = 
entry.getKey().getColumnFamily();
 -                                      currentSchema = 
field.schema().getElementType();
 -                                      currentField = field;
--
-           currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
-           break;
-         }
-         else if (effectiveSchema.getType() == Type.MAP) {
-           currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>());
-           currentPos = field.pos();
-           currentFam = entry.getKey().getColumnFamily();
-           currentSchema = effectiveSchema.getValueType();
 -                                      
currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
 -                                      break;
 -                              } else if (effectiveSchema.getType() == 
Type.MAP) {
 -                                      currentMap = new DirtyMapWrapper<>(new 
HashMap<Utf8, Object>());
 -                                      currentPos = field.pos();
 -                                      currentFam = 
entry.getKey().getColumnFamily();
 -                                      currentSchema = 
effectiveSchema.getValueType();
--
-           currentMap.put(new 
Utf8(entry.getKey().getColumnQualifierData().toArray()),
-               fromBytes(currentSchema, entry.getValue().get()));
-           break;
-         }
-         // continue like a regular top-level union
-       case RECORD:
-         SpecificDatumReader<?> reader = new 
SpecificDatumReader<Schema>(field.schema());
-         persistent.put(field.pos(), reader.read(null, 
DecoderFactory.get().binaryDecoder(val, decoder)));
-         break;
-       default:
-         persistent.put(field.pos(), fromBytes(field.schema(), 
entry.getValue().get()));
-       }
-     }
 -                                      currentMap.put(new 
Utf8(entry.getKey().getColumnQualifierData().toArray()),
 -                                                      
fromBytes(currentSchema, entry.getValue().get()));
 -                                      break;
 -                              }
 -                              // continue like a regular top-level union
 -                      case RECORD:
 -                              SpecificDatumReader<?> reader = new 
SpecificDatumReader<Schema>(field.schema());
 -                              persistent.put(field.pos(), reader.read(null, 
DecoderFactory.get().binaryDecoder(val, decoder)));
 -                              break;
 -                      default:
 -                              persistent.put(field.pos(), 
fromBytes(field.schema(), entry.getValue().get()));
 -                      }
 -              }
--
-     if (currentMap != null) {
-       persistent.put(currentPos, currentMap);
-     } else if (currentArray != null) {
-       persistent.put(currentPos, new 
GenericData.Array<T>(currentField.schema(), currentArray));
-     }
 -              if (currentMap != null) {
 -                      persistent.put(currentPos, currentMap);
 -              } else if (currentArray != null) {
 -                      persistent.put(currentPos, new 
GenericData.Array<T>(currentField.schema(), currentArray));
 -              }
--
-     persistent.clearDirty();
 -              persistent.clearDirty();
--
-     return row;
-   }
 -              return row;
 -      }
--
-   /**
-    * Retrieve field name from entry.
-    * @param entry The Key-Value entry
-    * @return String The field name
-    */
-   private String getFieldName(Entry<Key, Value> entry) {
-     String fieldName = mapping.columnMap.get(new 
Pair<>(entry.getKey().getColumnFamily(),
-         entry.getKey().getColumnQualifier()));
-     if (fieldName == null) {
-       fieldName = mapping.columnMap.get(new 
Pair<Text,Text>(entry.getKey().getColumnFamily(), null));
-     }
-     return fieldName;
-   }
 -      /**
 -       * Retrieve field name from entry.
 -       * 
 -       * @param entry
 -       *            The Key-Value entry
 -       * @return String The field name
 -       */
 -      private String getFieldName(Entry<Key, Value> entry) {
 -              String fieldName = mapping.columnMap
 -                              .get(new 
Pair<>(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()));
 -              if (fieldName == null) {
 -                      fieldName = mapping.columnMap.get(new Pair<Text, 
Text>(entry.getKey().getColumnFamily(), null));
 -              }
 -              return fieldName;
 -      }
--
-   private void setFetchColumns(Scanner scanner, String[] fields) {
-     fields = getFieldsToQuery(fields);
-     for (String field : fields) {
-       Pair<Text,Text> col = mapping.fieldMap.get(field);
-       if (col != null) {
-         if (col.getSecond() == null) {
-           scanner.fetchColumnFamily(col.getFirst());
-         } else {
-           scanner.fetchColumn(col.getFirst(), col.getSecond());
-         }
-       } else {
-         LOG.error("Mapping not found for field: {}", field);
-       }
-     }
-   }
 -      private void setFetchColumns(Scanner scanner, String[] fields) {
 -              fields = getFieldsToQuery(fields);
 -              for (String field : fields) {
 -                      Pair<Text, Text> col = mapping.fieldMap.get(field);
 -                      if (col != null) {
 -                              if (col.getSecond() == null) {
 -                                      
scanner.fetchColumnFamily(col.getFirst());
 -                              } else {
 -                                      scanner.fetchColumn(col.getFirst(), 
col.getSecond());
 -                              }
 -                      } else {
 -                              LOG.error("Mapping not found for field: {}", 
field);
 -                      }
 -              }
 -      }
--
-   @Override
-   public T get(K key, String[] fields) {
-     try {
-       // TODO make isolated scanner optional?
-       Scanner scanner = new 
IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY));
-       Range rowRange = new Range(new Text(toBytes(key)));
 -      @Override
 -      public T get(K key, String[] fields) {
 -              try {
 -                      // TODO make isolated scanner optional?
 -                      Scanner scanner = new 
IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY));
 -                      Range rowRange = new Range(new Text(toBytes(key)));
--
-       scanner.setRange(rowRange);
-       setFetchColumns(scanner, fields);
 -                      scanner.setRange(rowRange);
 -                      setFetchColumns(scanner, fields);
--
-       T persistent = newPersistent();
-       ByteSequence row = populate(scanner.iterator(), persistent);
-       if (row == null)
-         return null;
-       return persistent;
-     } catch (TableNotFoundException e) {
-       LOG.error(e.getMessage(), e);
-       return null;
-     } catch (IOException e) {
-       LOG.error(e.getMessage(), e);
-       return null;
-     }
-   }
 -                      T persistent = newPersistent();
 -                      ByteSequence row = populate(scanner.iterator(), 
persistent);
 -                      if (row == null)
 -                              return null;
 -                      return persistent;
 -              } catch (TableNotFoundException e) {
 -                      LOG.error(e.getMessage(), e);
 -                      return null;
 -              } catch (IOException e) {
 -                      LOG.error(e.getMessage(), e);
 -                      return null;
 -              }
 -      }
--
-   @Override
-   public void put(K key, T val) {
 -      @Override
 -      public void put(K key, T val) {
--
-     try{
-       Mutation m = new Mutation(new Text(toBytes(key)));
 -              try {
 -                      Mutation m = new Mutation(new Text(toBytes(key)));
--
-       Schema schema = val.getSchema();
-       List<Field> fields = schema.getFields();
-       int count = 0;
 -                      Schema schema = val.getSchema();
 -                      List<Field> fields = schema.getFields();
 -                      int count = 0;
--
-       for (int i = 0; i < fields.size(); i++) {
-         if (!val.isDirty(i)) {
-           continue;
-         }
-         Field field = fields.get(i);
 -                      for (int i = 0; i < fields.size(); i++) {
 -                              if (!val.isDirty(i)) {
 -                                      continue;
 -                              }
 -                              Field field = fields.get(i);
--
-         Object o = val.get(field.pos());
 -                              Object o = val.get(field.pos());
--
-         Pair<Text,Text> col = mapping.fieldMap.get(field.name());
 -                              Pair<Text, Text> col = 
mapping.fieldMap.get(field.name());
--
-         if (col == null) {
-           throw new GoraException("Please define the gora to accumulo mapping 
for field " + field.name());
-         }
 -                              if (col == null) {
 -                                      throw new GoraException("Please define 
the gora to accumulo mapping for field " + field.name());
 -                              }
--
-         switch (field.schema().getType()) {
-         case MAP:
-           count = putMap(m, count, field.schema().getValueType(), o, col, 
field.name());
-           break;
-         case ARRAY:
-           count = putArray(m, count, o, col, field.name());
-           break;
-         case UNION: // default value of null acts like union with null
-           Schema effectiveSchema = field.schema().getTypes()
-           .get(firstNotNullSchemaTypeIndex(field.schema()));
-           // map and array need to compute qualifier
-           if (effectiveSchema.getType() == Type.ARRAY) {
-             count = putArray(m, count, o, col, field.name());
-             break;
-           }
-           else if (effectiveSchema.getType() == Type.MAP) {
-             count = putMap(m, count, effectiveSchema.getValueType(), o, col, 
field.name());
-             break;
-           }
-           // continue like a regular top-level union
-         case RECORD:
-           final SpecificDatumWriter<Object> writer = new 
SpecificDatumWriter<>(field.schema());
-           final byte[] byteData = IOUtils.serialize(writer,o);
-           m.put(col.getFirst(), col.getSecond(), new Value(byteData));
-           count++;
-           break;
-         default:
-           m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o)));
-           count++;
-         }
 -                              switch (field.schema().getType()) {
 -                              case MAP:
 -                                      count = putMap(m, count, 
field.schema().getValueType(), o, col, field.name());
 -                                      break;
 -                              case ARRAY:
 -                                      count = putArray(m, count, o, col, 
field.name());
 -                                      break;
 -                              case UNION: // default value of null acts like 
union with null
 -                                      Schema effectiveSchema = 
field.schema().getTypes().get(firstNotNullSchemaTypeIndex(field.schema()));
 -                                      // map and array need to compute 
qualifier
 -                                      if (effectiveSchema.getType() == 
Type.ARRAY) {
 -                                              count = putArray(m, count, o, 
col, field.name());
 -                                              break;
 -                                      } else if (effectiveSchema.getType() == 
Type.MAP) {
 -                                              count = putMap(m, count, 
effectiveSchema.getValueType(), o, col, field.name());
 -                                              break;
 -                                      }
 -                                      // continue like a regular top-level 
union
 -                              case RECORD:
 -                                      final SpecificDatumWriter<Object> 
writer = new SpecificDatumWriter<>(field.schema());
 -                                      final byte[] byteData = 
IOUtils.serialize(writer, o);
 -                                      m.put(col.getFirst(), col.getSecond(), 
new Value(byteData));
 -                                      count++;
 -                                      break;
 -                              default:
 -                                      m.put(col.getFirst(), col.getSecond(), 
new Value(toBytes(o)));
 -                                      count++;
 -                              }
--
-       }
 -                      }
--
-       if (count > 0)
-         try {
-           getBatchWriter().addMutation(m);
-         } catch (MutationsRejectedException e) {
-           LOG.error(e.getMessage(), e);
-         }
-     } catch (IOException e) {
-       LOG.error(e.getMessage(), e);
-     }
-   }
 -                      if (count > 0)
 -                              try {
 -                                      getBatchWriter().addMutation(m);
 -                              } catch (MutationsRejectedException e) {
 -                                      LOG.error(e.getMessage(), e);
 -                              }
 -              } catch (IOException e) {
 -                      LOG.error(e.getMessage(), e);
 -              }
 -      }
--
-   private int putMap(Mutation m, int count, Schema valueType, Object o, 
Pair<Text, Text> col, String fieldName) throws GoraException {
 -      private int putMap(Mutation m, int count, Schema valueType, Object o, 
Pair<Text, Text> col, String fieldName)
 -                      throws GoraException {
--
-     // First of all we delete map field on accumulo store
-     Text rowKey = new Text(m.getRow());
-     Query<K, T> query = newQuery();
-     query.setFields(fieldName);
-     query.setStartKey((K)rowKey.toString());
-     query.setEndKey((K)rowKey.toString());
-     deleteByQuery(query);
-     flush();
-     if (o == null){
-       return 0;
-     }
 -              // First of all we delete map field on accumulo store
 -              Text rowKey = new Text(m.getRow());
 -              Query<K, T> query = newQuery();
 -              query.setFields(fieldName);
 -              query.setStartKey((K) rowKey.toString());
 -              query.setEndKey((K) rowKey.toString());
 -              deleteByQuery(query);
 -              flush();
 -              if (o == null) {
 -                      return 0;
 -              }
--
-     Set<?> es = ((Map<?, ?>)o).entrySet();
-     for (Object entry : es) {
-       Object mapKey = ((Entry<?, ?>) entry).getKey();
-       Object mapVal = ((Entry<?, ?>) entry).getValue();
-       if ((o instanceof DirtyMapWrapper && ((DirtyMapWrapper<?, 
?>)o).isDirty())
-           || !(o instanceof DirtyMapWrapper)) {
-         m.put(col.getFirst(), new Text(toBytes(mapKey)), new 
Value(toBytes(valueType, mapVal)));
-         count++;
-       }
-       // TODO map value deletion
-     }
-     return count;
-   }
 -              Set<?> es = ((Map<?, ?>) o).entrySet();
 -              for (Object entry : es) {
 -                      Object mapKey = ((Entry<?, ?>) entry).getKey();
 -                      Object mapVal = ((Entry<?, ?>) entry).getValue();
 -                      if ((o instanceof DirtyMapWrapper && 
((DirtyMapWrapper<?, ?>) o).isDirty())
 -                                      || !(o instanceof DirtyMapWrapper)) {
 -                              m.put(col.getFirst(), new 
Text(toBytes(mapKey)), new Value(toBytes(valueType, mapVal)));
 -                              count++;
 -                      }
 -                      // TODO map value deletion
 -              }
 -              return count;
 -      }
--
-   private int putArray(Mutation m, int count, Object o, Pair<Text, Text> col, 
String fieldName) {
 -      private int putArray(Mutation m, int count, Object o, Pair<Text, Text> 
col, String fieldName) {
--
-     // First of all we delete array field on accumulo store
-     Text rowKey = new Text(m.getRow());
-     Query<K, T> query = newQuery();
-     query.setFields(fieldName);
-     query.setStartKey((K)rowKey.toString());
-     query.setEndKey((K)rowKey.toString());
-     deleteByQuery(query);
-     flush();
-     if (o == null){
-       return 0;
-     }
 -              // First of all we delete array field on accumulo store
 -              Text rowKey = new Text(m.getRow());
 -              Query<K, T> query = newQuery();
 -              query.setFields(fieldName);
 -              query.setStartKey((K) rowKey.toString());
 -              query.setEndKey((K) rowKey.toString());
 -              deleteByQuery(query);
 -              flush();
 -              if (o == null) {
 -                      return 0;
 -              }
--
-     List<?> array = (List<?>) o;  // both GenericArray and DirtyListWrapper
-     int j = 0;
-     for (Object item : array) {
-       m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item)));
-       count++;
-     }
-     return count;
-   }
 -              List<?> array = (List<?>) o; // both GenericArray and 
DirtyListWrapper
 -              int j = 0;
 -              for (Object item : array) {
 -                      m.put(col.getFirst(), new Text(toBytes(j++)), new 
Value(toBytes(item)));
 -                      count++;
 -              }
 -              return count;
 -      }
--
-   @Override
-   public boolean delete(K key) {
-     Query<K,T> q = newQuery();
-     q.setKey(key);
-     return deleteByQuery(q) > 0;
-   }
 -      @Override
 -      public boolean delete(K key) {
 -              Query<K, T> q = newQuery();
 -              q.setKey(key);
 -              return deleteByQuery(q) > 0;
 -      }
--
-   @Override
-   public long deleteByQuery(Query<K,T> query) {
-     try {
-       Scanner scanner = createScanner(query);
-       // add iterator that drops values on the server side
-       scanner.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, 
SortedKeyIterator.class));
-       RowIterator iterator = new RowIterator(scanner.iterator());
 -      @Override
 -      public long deleteByQuery(Query<K, T> query) {
 -              try {
 -                      Scanner scanner = createScanner(query);
 -                      // add iterator that drops values on the server side
 -                      scanner.addScanIterator(new 
IteratorSetting(Integer.MAX_VALUE, SortedKeyIterator.class));
 -                      RowIterator iterator = new 
RowIterator(scanner.iterator());
--
-       long count = 0;
 -                      long count = 0;
--
-       while (iterator.hasNext()) {
-         Iterator<Entry<Key,Value>> row = iterator.next();
-         Mutation m = null;
-         while (row.hasNext()) {
-           Entry<Key,Value> entry = row.next();
-           Key key = entry.getKey();
-           if (m == null)
-             m = new Mutation(key.getRow());
-           // TODO optimize to avoid continually creating column vis? prob 
does not matter for empty
-           m.putDelete(key.getColumnFamily(), key.getColumnQualifier(), new 
ColumnVisibility(key.getColumnVisibility()), key.getTimestamp());
-         }
-         getBatchWriter().addMutation(m);
-         count++;
-       }
 -                      while (iterator.hasNext()) {
 -                              Iterator<Entry<Key, Value>> row = 
iterator.next();
 -                              Mutation m = null;
 -                              while (row.hasNext()) {
 -                                      Entry<Key, Value> entry = row.next();
 -                                      Key key = entry.getKey();
 -                                      if (m == null)
 -                                              m = new Mutation(key.getRow());
 -                                      // TODO optimize to avoid continually 
creating column vis?
 -                                      // prob does not matter for empty
 -                                      m.putDelete(key.getColumnFamily(), 
key.getColumnQualifier(),
 -                                                      new 
ColumnVisibility(key.getColumnVisibility()), key.getTimestamp());
 -                              }
 -                              getBatchWriter().addMutation(m);
 -                              count++;
 -                      }
--
-       return count;
-     } catch (TableNotFoundException e) {
-       // TODO return 0?
-       LOG.error(e.getMessage(), e);
-       return 0;
-     } catch (MutationsRejectedException e) {
-       LOG.error(e.getMessage(), e);
-       return 0;
-     } catch (IOException e){
-       LOG.error(e.getMessage(), e);
-       return 0;
-     }
-   }
 -                      return count;
 -              } catch (TableNotFoundException e) {
 -                      // TODO return 0?
 -                      LOG.error(e.getMessage(), e);
 -                      return 0;
 -              } catch (MutationsRejectedException e) {
 -                      LOG.error(e.getMessage(), e);
 -                      return 0;
 -              } catch (IOException e) {
 -                      LOG.error(e.getMessage(), e);
 -                      return 0;
 -              }
 -      }
--
-   private Range createRange(Query<K,T> query) {
-     Text startRow = null;
-     Text endRow = null;
 -      private Range createRange(Query<K, T> query) {
 -              Text startRow = null;
 -              Text endRow = null;
--
-     if (query.getStartKey() != null)
-       startRow = new Text(toBytes(query.getStartKey()));
 -              if (query.getStartKey() != null)
 -                      startRow = new Text(toBytes(query.getStartKey()));
--
-     if (query.getEndKey() != null)
-       endRow = new Text(toBytes(query.getEndKey()));
 -              if (query.getEndKey() != null)
 -                      endRow = new Text(toBytes(query.getEndKey()));
--
-     return new Range(startRow, true, endRow, true);
 -              return new Range(startRow, true, endRow, true);
--
-   }
 -      }
--
-   private Scanner createScanner(Query<K,T> query) throws 
TableNotFoundException {
-     // TODO make isolated scanner optional?
-     Scanner scanner = new 
IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY));
-     setFetchColumns(scanner, query.getFields());
 -      private Scanner createScanner(Query<K, T> query) throws 
TableNotFoundException {
 -              // TODO make isolated scanner optional?
 -              Scanner scanner = new 
IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY));
 -              setFetchColumns(scanner, query.getFields());
--
-     scanner.setRange(createRange(query));
 -              scanner.setRange(createRange(query));
--
-     if (query.getStartTime() != -1 || query.getEndTime() != -1) {
-       IteratorSetting is = new IteratorSetting(30, TimestampFilter.class);
-       if (query.getStartTime() != -1)
-         TimestampFilter.setStart(is, query.getStartTime(), true);
-       if (query.getEndTime() != -1)
-         TimestampFilter.setEnd(is, query.getEndTime(), true);
 -              if (query.getStartTime() != -1 || query.getEndTime() != -1) {
 -                      IteratorSetting is = new IteratorSetting(30, 
TimestampFilter.class);
 -                      if (query.getStartTime() != -1)
 -                              TimestampFilter.setStart(is, 
query.getStartTime(), true);
 -                      if (query.getEndTime() != -1)
 -                              TimestampFilter.setEnd(is, query.getEndTime(), 
true);
--
-       scanner.addScanIterator(is);
-     }
 -                      scanner.addScanIterator(is);
 -              }
--
-     return scanner;
-   }
 -              return scanner;
 -      }
--
-   /**
-    * Execute the query and return the result.
-    */
-   @Override
-   public Result<K,T> execute(Query<K,T> query) {
-     try {
-       Scanner scanner = createScanner(query);
-       return new AccumuloResult<>(this, query, scanner);
-     } catch (TableNotFoundException e) {
-       // TODO return empty result?
-       LOG.error(e.getMessage(), e);
-       return null;
-     }
-   }
 -      /**
 -       * Execute the query and return the result.
 -       */
 -      @Override
 -      public Result<K, T> execute(Query<K, T> query) {
 -              try {
 -                      Scanner scanner = createScanner(query);
 -                      return new AccumuloResult<>(this, query, scanner);
 -              } catch (TableNotFoundException e) {
 -                      // TODO return empty result?
 -                      LOG.error(e.getMessage(), e);
 -                      return null;
 -              }
 -      }
--
-   @Override
-   public Query<K,T> newQuery() {
-     return new AccumuloQuery<>(this);
-   }
 -      @Override
 -      public Query<K, T> newQuery() {
 -              return new AccumuloQuery<>(this);
 -      }
--
-   Text pad(Text key, int bytes) {
-     if (key.getLength() < bytes)
-       key = new Text(key);
 -      Text pad(Text key, int bytes) {
 -              if (key.getLength() < bytes)
 -                      key = new Text(key);
--
-     while (key.getLength() < bytes) {
-       key.append(new byte[] {0}, 0, 1);
-     }
 -              while (key.getLength() < bytes) {
 -                      key.append(new byte[] { 0 }, 0, 1);
 -              }
--
-     return key;
-   }
 -              return key;
 -      }
--
-   @Override
-   public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws 
IOException {
-     try {
-       TabletLocator tl;
-       if (conn instanceof MockConnector)
-         tl = new MockTabletLocator();
-       else
-         tl = TabletLocator.getLocator(new ClientContext(conn.getInstance(), 
credentials, AccumuloConfiguration.getTableConfiguration(conn, 
Tables.getTableId(conn.getInstance(), mapping.tableName))), new 
Text(Tables.getTableId(conn.getInstance(), mapping.tableName)));
 -      @Override
 -      public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) 
throws IOException {
 -              try {
 -                      TabletLocator tl;
 -                      if (conn instanceof MockConnector)
 -                              tl = new MockTabletLocator();
 -                      else
 -                              tl = TabletLocator.getLocator(
 -                                              new 
ClientContext(conn.getInstance(), credentials,
 -                                                              
AccumuloConfiguration.getTableConfiguration(conn,
 -                                                                              
Tables.getTableId(conn.getInstance(), mapping.tableName))),
 -                                              new 
Text(Tables.getTableId(conn.getInstance(), mapping.tableName)));
--
-       Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
 -                      Map<String, Map<KeyExtent, List<Range>>> binnedRanges = 
new HashMap<>();
--
-       tl.invalidateCache();
-       while (tl.binRanges(new ClientContext(conn.getInstance(), credentials, 
AccumuloConfiguration.getTableConfiguration(conn, 
Tables.getTableId(conn.getInstance(), mapping.tableName))), 
Collections.singletonList(createRange(query)), binnedRanges).size() > 0) {
-         // TODO log?
-         if (!Tables.exists(conn.getInstance(), 
Tables.getTableId(conn.getInstance(), mapping.tableName)))
-           throw new 
TableDeletedException(Tables.getTableId(conn.getInstance(), mapping.tableName));
-         else if (Tables.getTableState(conn.getInstance(), 
Tables.getTableId(conn.getInstance(), mapping.tableName)) == TableState.OFFLINE)
-           throw new TableOfflineException(conn.getInstance(), 
Tables.getTableId(conn.getInstance(), mapping.tableName));
-         UtilWaitThread.sleep(100);
-         tl.invalidateCache();
-       }
 -                      tl.invalidateCache();
 -                      while (tl.binRanges(
 -                                      new ClientContext(conn.getInstance(), 
credentials,
 -                                                      
AccumuloConfiguration.getTableConfiguration(conn,
 -                                                                      
Tables.getTableId(conn.getInstance(), mapping.tableName))),
 -                                      
Collections.singletonList(createRange(query)), binnedRanges).size() > 0) {
 -                              // TODO log?
 -                              if (!Tables.exists(conn.getInstance(), 
Tables.getTableId(conn.getInstance(), mapping.tableName)))
 -                                      throw new 
TableDeletedException(Tables.getTableId(conn.getInstance(), mapping.tableName));
 -                              else if 
(Tables.getTableState(conn.getInstance(),
 -                                              
Tables.getTableId(conn.getInstance(), mapping.tableName)) == TableState.OFFLINE)
 -                                      throw new 
TableOfflineException(conn.getInstance(),
 -                                                      
Tables.getTableId(conn.getInstance(), mapping.tableName));
 -                              UtilWaitThread.sleep(100);
 -                              tl.invalidateCache();
 -                      }
--
-       List<PartitionQuery<K,T>> ret = new ArrayList<>();
 -                      List<PartitionQuery<K, T>> ret = new ArrayList<>();
--
-       Text startRow = null;
-       Text endRow = null;
-       if (query.getStartKey() != null)
-         startRow = new Text(toBytes(query.getStartKey()));
-       if (query.getEndKey() != null)
-         endRow = new Text(toBytes(query.getEndKey()));
 -                      Text startRow = null;
 -                      Text endRow = null;
 -                      if (query.getStartKey() != null)
 -                              startRow = new 
Text(toBytes(query.getStartKey()));
 -                      if (query.getEndKey() != null)
 -                              endRow = new Text(toBytes(query.getEndKey()));
--
-       //hadoop expects hostnames, accumulo keeps track of IPs... so need to 
convert
-       HashMap<String,String> hostNameCache = new HashMap<>();
 -                      // hadoop expects hostnames, accumulo keeps track of 
IPs... so need
 -                      // to convert
 -                      HashMap<String, String> hostNameCache = new HashMap<>();
--
-       for (Entry<String,Map<KeyExtent,List<Range>>> entry : 
binnedRanges.entrySet()) {
-         String ip = entry.getKey().split(":", 2)[0];
-         String location = hostNameCache.get(ip);
-         if (location == null) {
-           InetAddress inetAddress = InetAddress.getByName(ip);
-           location = inetAddress.getHostName();
-           hostNameCache.put(ip, location);
-         }
 -                      for (Entry<String, Map<KeyExtent, List<Range>>> entry : 
binnedRanges.entrySet()) {
 -                              String ip = entry.getKey().split(":", 2)[0];
 -                              String location = hostNameCache.get(ip);
 -                              if (location == null) {
 -                                      InetAddress inetAddress = 
InetAddress.getByName(ip);
 -                                      location = inetAddress.getHostName();
 -                                      hostNameCache.put(ip, location);
 -                              }
--
-         Map<KeyExtent,List<Range>> tablets = entry.getValue();
-         for (KeyExtent ke : tablets.keySet()) {
 -                              Map<KeyExtent, List<Range>> tablets = 
entry.getValue();
 -                              for (KeyExtent ke : tablets.keySet()) {
--
-           K startKey = null;
-           if (startRow == null || !ke.contains(startRow)) {
-             if (ke.getPrevEndRow() != null) {
-               startKey = followingKey(encoder, getKeyClass(), 
getBytes(ke.getPrevEndRow()));
-             }
-           } else {
-             startKey = fromBytes(getKeyClass(), getBytes(startRow));
-           }
 -                                      K startKey = null;
 -                                      if (startRow == null || 
!ke.contains(startRow)) {
 -                                              if (ke.getPrevEndRow() != null) 
{
 -                                                      startKey = 
followingKey(encoder, getKeyClass(), getBytes(ke.getPrevEndRow()));
 -                                              }
 -                                      } else {
 -                                              startKey = 
fromBytes(getKeyClass(), getBytes(startRow));
 -                                      }
--
-           K endKey = null;
-           if (endRow == null || !ke.contains(endRow)) {
-             if (ke.getEndRow() != null)
-               endKey = lastPossibleKey(encoder, getKeyClass(), 
getBytes(ke.getEndRow()));
-           } else {
-             endKey = fromBytes(getKeyClass(), getBytes(endRow));
-           }
 -                                      K endKey = null;
 -                                      if (endRow == null || 
!ke.contains(endRow)) {
 -                                              if (ke.getEndRow() != null)
 -                                                      endKey = 
lastPossibleKey(encoder, getKeyClass(), getBytes(ke.getEndRow()));
 -                                      } else {
 -                                              endKey = 
fromBytes(getKeyClass(), getBytes(endRow));
 -                                      }
--
-           PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<>(query, 
startKey, endKey, location);
-           pqi.setConf(getConf());
-           ret.add(pqi);
-         }
-       }
 -                                      PartitionQueryImpl<K, T> pqi = new 
PartitionQueryImpl<>(query, startKey, endKey, location);
 -                                      pqi.setConf(getConf());
 -                                      ret.add(pqi);
 -                              }
 -                      }
--
-       return ret;
-     } catch (TableNotFoundException | AccumuloException | 
AccumuloSecurityException e) {
-       throw new IOException(e);
-     }
 -                      return ret;
 -              } catch (TableNotFoundException | AccumuloException | 
AccumuloSecurityException e) {
 -                      throw new IOException(e);
 -              }
--
-   }
 -      }
--
-   static <K> K lastPossibleKey(Encoder encoder, Class<K> clazz, byte[] er) {
 -      static <K> K lastPossibleKey(Encoder encoder, Class<K> clazz, byte[] 
er) {
--
-     if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
-       throw new UnsupportedOperationException();
-     } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
-       throw new UnsupportedOperationException();
-     } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
-       return fromBytes(encoder, clazz, encoder.lastPossibleKey(2, er));
-     } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
-       return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er));
-     } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
-       return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er));
-     } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
-       return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er));
-     } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
-       return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er));
-     } else if (clazz.equals(String.class)) {
-       throw new UnsupportedOperationException();
-     } else if (clazz.equals(Utf8.class)) {
-       return fromBytes(encoder, clazz, er);
-     }
 -              if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
 -                      throw new UnsupportedOperationException();
 -              } else if (clazz.equals(Boolean.TYPE) || 
clazz.equals(Boolean.class)) {
 -                      throw new UnsupportedOperationException();
 -              } else if (clazz.equals(Short.TYPE) || 
clazz.equals(Short.class)) {
 -                      return fromBytes(encoder, clazz, 
encoder.lastPossibleKey(2, er));
 -              } else if (clazz.equals(Integer.TYPE) || 
clazz.equals(Integer.class)) {
 -                      return fromBytes(encoder, clazz, 
encoder.lastPossibleKey(4, er));
 -              } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) 
{
 -                      return fromBytes(encoder, clazz, 
encoder.lastPossibleKey(8, er));
 -              } else if (clazz.equals(Float.TYPE) || 
clazz.equals(Float.class)) {
 -                      return fromBytes(encoder, clazz, 
encoder.lastPossibleKey(4, er));
 -              } else if (clazz.equals(Double.TYPE) || 
clazz.equals(Double.class)) {
 -                      return fromBytes(encoder, clazz, 
encoder.lastPossibleKey(8, er));
 -              } else if (clazz.equals(String.class)) {
 -                      throw new UnsupportedOperationException();
 -              } else if (clazz.equals(Utf8.class)) {
 -                      return fromBytes(encoder, clazz, er);
 -              }
--
-     throw new IllegalArgumentException(UNKOWN + clazz.getName());
-   }
 -              throw new IllegalArgumentException(UNKOWN + clazz.getName());
 -      }
--
-   @SuppressWarnings("unchecked")
-   static <K> K followingKey(Encoder encoder, Class<K> clazz, byte[] per) {
 -      @SuppressWarnings("unchecked")
 -      static <K> K followingKey(Encoder encoder, Class<K> clazz, byte[] per) {
--
-     if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
-       return (K) Byte.valueOf(encoder.followingKey(1, per)[0]);
-     } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
-       throw new UnsupportedOperationException();
-     } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
-       return fromBytes(encoder, clazz, encoder.followingKey(2, per));
-     } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
-       return fromBytes(encoder, clazz, encoder.followingKey(4, per));
-     } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
-       return fromBytes(encoder, clazz, encoder.followingKey(8, per));
-     } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
-       return fromBytes(encoder, clazz, encoder.followingKey(4, per));
-     } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
-       return fromBytes(encoder, clazz, encoder.followingKey(8, per));
-     } else if (clazz.equals(String.class)) {
-       throw new UnsupportedOperationException();
-     } else if (clazz.equals(Utf8.class)) {
-       return fromBytes(encoder, clazz, Arrays.copyOf(per, per.length + 1));
-     }
 -              if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
 -                      return (K) Byte.valueOf(encoder.followingKey(1, 
per)[0]);
 -              } else if (clazz.equals(Boolean.TYPE) || 
clazz.equals(Boolean.class)) {
 -                      throw new UnsupportedOperationException();
 -              } else if (clazz.equals(Short.TYPE) || 
clazz.equals(Short.class)) {
 -                      return fromBytes(encoder, clazz, 
encoder.followingKey(2, per));
 -              } else if (clazz.equals(Integer.TYPE) || 
clazz.equals(Integer.class)) {
 -                      return fromBytes(encoder, clazz, 
encoder.followingKey(4, per));
 -              } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) 
{
 -                      return fromBytes(encoder, clazz, 
encoder.followingKey(8, per));
 -              } else if (clazz.equals(Float.TYPE) || 
clazz.equals(Float.class)) {
 -                      return fromBytes(encoder, clazz, 
encoder.followingKey(4, per));
 -              } else if (clazz.equals(Double.TYPE) || 
clazz.equals(Double.class)) {
 -                      return fromBytes(encoder, clazz, 
encoder.followingKey(8, per));
 -              } else if (clazz.equals(String.class)) {
 -                      throw new UnsupportedOperationException();
 -              } else if (clazz.equals(Utf8.class)) {
 -                      return fromBytes(encoder, clazz, Arrays.copyOf(per, 
per.length + 1));
 -              }
--
-     throw new IllegalArgumentException(UNKOWN + clazz.getName());
-   }
 -              throw new IllegalArgumentException(UNKOWN + clazz.getName());
 -      }
--
-   @Override
-   public void flush() {
-     try {
-       if (batchWriter != null) {
-         batchWriter.flush();
-       }
-     } catch (MutationsRejectedException e) {
-       LOG.error(e.getMessage(), e);
-     }
-   }
 -      @Override
 -      public void flush() {
 -              try {
 -                      if (batchWriter != null) {
 -                              batchWriter.flush();
 -                      }
 -              } catch (MutationsRejectedException e) {
 -                      LOG.error(e.getMessage(), e);
 -              }
 -      }
--
-   @Override
-   public void close() {
-     try {
-       if (batchWriter != null) {
-         batchWriter.close();
-         batchWriter = null;
-       }
-     } catch (MutationsRejectedException e) {
-       LOG.error(e.getMessage(), e);
-     }
-   }
 -      @Override
 -      public void close() {
 -              try {
 -                      if (batchWriter != null) {
 -                              batchWriter.close();
 -                              batchWriter = null;
 -                      }
 -              } catch (MutationsRejectedException e) {
 -                      LOG.error(e.getMessage(), e);
 -              }
 -      }
--}
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.gora.accumulo.store;
++
++import java.io.ByteArrayOutputStream;
++import java.io.IOException;
++import java.net.InetAddress;
++import java.nio.ByteBuffer;
++import java.util.ArrayList;
++import java.util.Arrays;
++import java.util.Collections;
++import java.util.HashMap;
++import java.util.Iterator;
++import java.util.List;
++import java.util.Map;
++import java.util.Map.Entry;
++import java.util.Properties;
++import java.util.Set;
++import java.util.concurrent.TimeUnit;
++
++import javax.xml.parsers.DocumentBuilder;
++import javax.xml.parsers.DocumentBuilderFactory;
++
++import org.apache.accumulo.core.client.AccumuloException;
++import org.apache.accumulo.core.client.AccumuloSecurityException;
++import org.apache.accumulo.core.client.BatchWriter;
++import org.apache.accumulo.core.client.BatchWriterConfig;
++import org.apache.accumulo.core.client.Connector;
++import org.apache.accumulo.core.client.IsolatedScanner;
++import org.apache.accumulo.core.client.IteratorSetting;
++import org.apache.accumulo.core.client.MutationsRejectedException;
++import org.apache.accumulo.core.client.RowIterator;
++import org.apache.accumulo.core.client.Scanner;
++import org.apache.accumulo.core.client.TableDeletedException;
++import org.apache.accumulo.core.client.TableExistsException;
++import org.apache.accumulo.core.client.TableNotFoundException;
++import org.apache.accumulo.core.client.TableOfflineException;
++import org.apache.accumulo.core.client.ZooKeeperInstance;
++import org.apache.accumulo.core.client.impl.ClientContext;
++import org.apache.accumulo.core.client.impl.Tables;
++import org.apache.accumulo.core.client.impl.TabletLocator;
++import org.apache.accumulo.core.client.mock.MockConnector;
++import org.apache.accumulo.core.client.mock.MockInstance;
++import org.apache.accumulo.core.client.mock.impl.MockTabletLocator;
++import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
++import org.apache.accumulo.core.client.security.tokens.PasswordToken;
++import org.apache.accumulo.core.conf.AccumuloConfiguration;
++import org.apache.accumulo.core.data.ByteSequence;
++import org.apache.accumulo.core.data.Key;
++import org.apache.accumulo.core.data.impl.KeyExtent;
++import org.apache.accumulo.core.data.Mutation;
++import org.apache.accumulo.core.data.Range;
++import org.apache.accumulo.core.data.Value;
++import org.apache.accumulo.core.iterators.SortedKeyIterator;
++import org.apache.accumulo.core.iterators.user.TimestampFilter;
++import org.apache.accumulo.core.master.state.tables.TableState;
++import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.core.security.ColumnVisibility;
++import org.apache.accumulo.core.client.impl.Credentials;
++import org.apache.accumulo.core.util.Pair;
++import org.apache.accumulo.core.util.UtilWaitThread;
++import org.apache.avro.Schema;
++import org.apache.avro.Schema.Field;
++import org.apache.avro.Schema.Type;
++import org.apache.avro.generic.GenericData;
++import org.apache.avro.io.BinaryDecoder;
++import org.apache.avro.io.Decoder;
++import org.apache.avro.io.DecoderFactory;
++import org.apache.avro.io.EncoderFactory;
++import org.apache.avro.specific.SpecificDatumReader;
++import org.apache.avro.specific.SpecificDatumWriter;
++import org.apache.avro.util.Utf8;
++import org.apache.gora.accumulo.encoders.BinaryEncoder;
++import org.apache.gora.accumulo.encoders.Encoder;
++import org.apache.gora.accumulo.query.AccumuloQuery;
++import org.apache.gora.accumulo.query.AccumuloResult;
++import org.apache.gora.persistency.impl.DirtyListWrapper;
++import org.apache.gora.persistency.impl.DirtyMapWrapper;
++import org.apache.gora.persistency.impl.PersistentBase;
++import org.apache.gora.query.PartitionQuery;
++import org.apache.gora.query.Query;
++import org.apache.gora.query.Result;
++import org.apache.gora.query.impl.PartitionQueryImpl;
++import org.apache.gora.store.DataStoreFactory;
++import org.apache.gora.store.impl.DataStoreBase;
++import org.apache.gora.util.AvroUtils;
++import org.apache.gora.util.GoraException;
++import org.apache.gora.util.IOUtils;
++import org.apache.hadoop.io.Text;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++import org.w3c.dom.Document;
++import org.w3c.dom.Element;
++import org.w3c.dom.NodeList;
++
++/**
++ * Implementation of a Accumulo data store to be used by gora.
++ *
++ * @param <K>
++ *            class to be used for the key
++ * @param <T>
++ *            class to be persisted within the store
++ */
++public class AccumuloStore<K,T extends PersistentBase> extends 
DataStoreBase<K,T> {
++
++  protected static final String MOCK_PROPERTY = "accumulo.mock";
++  protected static final String INSTANCE_NAME_PROPERTY = "accumulo.instance";
++  protected static final String ZOOKEEPERS_NAME_PROPERTY = 
"accumulo.zookeepers";
++  protected static final String USERNAME_PROPERTY = "accumulo.user";
++  protected static final String PASSWORD_PROPERTY = "accumulo.password";
++  protected static final String DEFAULT_MAPPING_FILE = 
"gora-accumulo-mapping.xml";
++
++  private final static String UNKOWN = "Unknown type ";
++
++  private Connector conn;
++  private BatchWriter batchWriter;
++  private AccumuloMapping mapping;
++  private Credentials credentials;
++  private Encoder encoder;
++
++  public static final Logger LOG = 
LoggerFactory.getLogger(AccumuloStore.class);
++
++  public Object fromBytes(Schema schema, byte[] data) throws IOException {
++    Schema fromSchema = null;
++    if (schema.getType() == Type.UNION) {
++      try {
++        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
++        int unionIndex = decoder.readIndex();
++        List<Schema> possibleTypes = schema.getTypes();
++        fromSchema = possibleTypes.get(unionIndex);
++        Schema effectiveSchema = possibleTypes.get(unionIndex);
++        if (effectiveSchema.getType() == Type.NULL) {
++          decoder.readNull();
++          return null;
++        } else {
++          data = decoder.readBytes(null).array();
++        }
++      } catch (IOException e) {
++        LOG.error(e.getMessage());
++        throw new GoraException("Error decoding union type: ", e);
++      }
++    } else {
++      fromSchema = schema;
++    }
++    return fromBytes(encoder, fromSchema, data);
++  }
++
++  public static Object fromBytes(Encoder encoder, Schema schema, byte data[]) 
throws IOException {
++    switch (schema.getType()) {
++    case BOOLEAN:
++      return encoder.decodeBoolean(data);
++    case DOUBLE:
++      return encoder.decodeDouble(data);
++    case FLOAT:
++      return encoder.decodeFloat(data);
++    case INT:
++      return encoder.decodeInt(data);
++    case LONG:
++      return encoder.decodeLong(data);
++    case STRING:
++      return new Utf8(data);
++    case BYTES:
++      return ByteBuffer.wrap(data);
++    case ENUM:
++      return AvroUtils.getEnumValue(schema, encoder.decodeInt(data));
++    case ARRAY:
++      break;
++    case FIXED:
++      break;
++    case MAP:
++      break;
++    case NULL:
++      break;
++    case RECORD:
++      break;
++    case UNION:
++      break;
++    default:
++      break;
++    }
++    throw new IllegalArgumentException(UNKOWN + schema.getType());
++
++  }
++
++  private static byte[] getBytes(Text text) {
++    byte[] bytes = text.getBytes();
++    if (bytes.length != text.getLength()) {
++      bytes = new byte[text.getLength()];
++      System.arraycopy(text.getBytes(), 0, bytes, 0, bytes.length);
++    }
++    return bytes;
++  }
++
++  public K fromBytes(Class<K> clazz, byte[] val) {
++    return fromBytes(encoder, clazz, val);
++  }
++
++  @SuppressWarnings("unchecked")
++  public static <K> K fromBytes(Encoder encoder, Class<K> clazz, byte[] val) {
++    try {
++      if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
++        return (K) Byte.valueOf(encoder.decodeByte(val));
++      } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
++        return (K) Boolean.valueOf(encoder.decodeBoolean(val));
++      } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
++        return (K) Short.valueOf(encoder.decodeShort(val));
++      } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
++        return (K) Integer.valueOf(encoder.decodeInt(val));
++      } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
++        return (K) Long.valueOf(encoder.decodeLong(val));
++      } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
++        return (K) Float.valueOf(encoder.decodeFloat(val));
++      } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
++        return (K) Double.valueOf(encoder.decodeDouble(val));
++      } else if (clazz.equals(String.class)) {
++        return (K) new String(val, "UTF-8");
++      } else if (clazz.equals(Utf8.class)) {
++        return (K) new Utf8(val);
++      }
++
++      throw new IllegalArgumentException(UNKOWN + clazz.getName());
++    } catch (IOException ioe) {
++      LOG.error(ioe.getMessage());
++      throw new RuntimeException(ioe);
++    }
++  }
++
++  private static byte[] copyIfNeeded(byte b[], int offset, int len) {
++    if (len != b.length || offset != 0) {
++      byte[] copy = new byte[len];
++      System.arraycopy(b, offset, copy, 0, copy.length);
++      b = copy;
++    }
++    return b;
++  }
++
++  public byte[] toBytes(Schema toSchema, Object o) {
++    if (toSchema != null && toSchema.getType() == Type.UNION) {
++      ByteArrayOutputStream baos = new ByteArrayOutputStream();
++      org.apache.avro.io.BinaryEncoder avroEncoder = 
EncoderFactory.get().binaryEncoder(baos, null);
++      int unionIndex = 0;
++      try {
++        if (o == null) {
++          unionIndex = firstNullSchemaTypeIndex(toSchema);
++          avroEncoder.writeIndex(unionIndex);
++          avroEncoder.writeNull();
++        } else {
++          unionIndex = firstNotNullSchemaTypeIndex(toSchema);
++          avroEncoder.writeIndex(unionIndex);
++          avroEncoder.writeBytes(toBytes(o));
++        }
++        avroEncoder.flush();
++        return baos.toByteArray();
++      } catch (IOException e) {
++        LOG.error(e.getMessage());
++        return toBytes(o);
++      }
++    } else {
++      return toBytes(o);
++    }
++  }
++
++  private int firstNullSchemaTypeIndex(Schema toSchema) {
++    List<Schema> possibleTypes = toSchema.getTypes();
++    int unionIndex = 0;
++    for (int i = 0; i < possibleTypes.size(); i++ ) {
++      Type pType = possibleTypes.get(i).getType();
++      if (pType == Type.NULL) { // FIXME HUGE kludge to pass tests
++        unionIndex = i; break;
++      }
++    }
++    return unionIndex;
++  }
++
++  private int firstNotNullSchemaTypeIndex(Schema toSchema) {
++    List<Schema> possibleTypes = toSchema.getTypes();
++    int unionIndex = 0;
++    for (int i = 0; i < possibleTypes.size(); i++ ) {
++      Type pType = possibleTypes.get(i).getType();
++      if (pType != Type.NULL) { // FIXME HUGE kludge to pass tests
++        unionIndex = i; break;
++      }
++    }
++    return unionIndex;
++  }
++
++  public byte[] toBytes(Object o) {
++    return toBytes(encoder, o);
++  }
++
++  public static byte[] toBytes(Encoder encoder, Object o) {
++
++    try {
++      if (o instanceof String) {
++        return ((String) o).getBytes("UTF-8");
++      } else if (o instanceof Utf8) {
++        return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8) 
o).getByteLength());
++      } else if (o instanceof ByteBuffer) {
++        return copyIfNeeded(((ByteBuffer) o).array(), ((ByteBuffer) 
o).arrayOffset() + ((ByteBuffer) o).position(), ((ByteBuffer) o).remaining());
++      } else if (o instanceof Long) {
++        return encoder.encodeLong((Long) o);
++      } else if (o instanceof Integer) {
++        return encoder.encodeInt((Integer) o);
++      } else if (o instanceof Short) {
++        return encoder.encodeShort((Short) o);
++      } else if (o instanceof Byte) {
++        return encoder.encodeByte((Byte) o);
++      } else if (o instanceof Boolean) {
++        return encoder.encodeBoolean((Boolean) o);
++      } else if (o instanceof Float) {
++        return encoder.encodeFloat((Float) o);
++      } else if (o instanceof Double) {
++        return encoder.encodeDouble((Double) o);
++      } else if (o instanceof Enum) {
++        return encoder.encodeInt(((Enum<?>) o).ordinal());
++      }
++    } catch (IOException ioe) {
++      throw new RuntimeException(ioe);
++    }
++
++    throw new IllegalArgumentException(UNKOWN + o.getClass().getName());
++  }
++
++  private BatchWriter getBatchWriter() throws IOException {
++    if (batchWriter == null)
++      try {
++        BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
++        batchWriterConfig.setMaxMemory(10000000);
++        batchWriterConfig.setMaxLatency(60000L, TimeUnit.MILLISECONDS);
++        batchWriterConfig.setMaxWriteThreads(4);
++        batchWriter = conn.createBatchWriter(mapping.tableName, 
batchWriterConfig);
++      } catch (TableNotFoundException e) {
++        throw new IOException(e);
++      }
++    return batchWriter;
++  }
++
++  /**
++   * Initialize the data store by reading the credentials, setting the 
client's properties up and
++   * reading the mapping file. Initialize is called when then the call to
++   * {@link org.apache.gora.store.DataStoreFactory#createDataStore} is made.
++   *
++   * @param keyClass
++   * @param persistentClass
++   * @param properties
++   */
++  @Override
++  public void initialize(Class<K> keyClass, Class<T> persistentClass, 
Properties properties) {
++    try{
++      super.initialize(keyClass, persistentClass, properties);
++
++      String mock = DataStoreFactory.findProperty(properties, this, 
MOCK_PROPERTY, null);
++      String mappingFile = DataStoreFactory.getMappingFile(properties, this, 
DEFAULT_MAPPING_FILE);
++      String user = DataStoreFactory.findProperty(properties, this, 
USERNAME_PROPERTY, null);
++      String password = DataStoreFactory.findProperty(properties, this, 
PASSWORD_PROPERTY, null);
++
++      mapping = readMapping(mappingFile);
++
++      if (mapping.encoder == null || "".equals(mapping.encoder)) {
++        encoder = new BinaryEncoder();
++      } else {
++        try {
++          encoder = (Encoder) 
getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
++        } catch (InstantiationException | IllegalAccessException | 
ClassNotFoundException e) {
++          throw new IOException(e);
++        }
++      }
++
++      try {
++        AuthenticationToken token = new PasswordToken(password);

<TRUNCATED>

Reply via email to