Author: ctubbsii
Date: Tue Jan 22 18:07:17 2013
New Revision: 1437073

URL: http://svn.apache.org/viewvc?rev=1437073&view=rev
Log:
ACCUMULO-769 Modify mapreduce API to use the Hadoop static configurator 
conventions, but done in a way that allows us to standardize and reuse 
configurator code to support multiple frameworks.
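
For illustration, the new per-concern setters compose like this in a job driver (a minimal sketch; the class name, credentials, table names, and ZooKeeper string below are placeholders, not part of this commit):

    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class ConfiguratorSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "configurator-sketch");
        // Each Hadoop-style static setter stores one concern in the job
        // configuration, keyed by the format class it is called on.
        AccumuloInputFormat.setConnectorInfo(job, "user", "pass".getBytes());
        AccumuloInputFormat.setInputTableName(job, "input_table");
        AccumuloInputFormat.setScanAuthorizations(job, new Authorizations());
        AccumuloInputFormat.setZooKeeperInstance(job, "instance", "zk1:2181");
        AccumuloOutputFormat.setConnectorInfo(job, "user", "pass".getBytes());
        AccumuloOutputFormat.setCreateTables(job, true);
        AccumuloOutputFormat.setDefaultTableName(job, "output_table");
        AccumuloOutputFormat.setZooKeeperInstance(job, "instance", "zk1:2181");
      }
    }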

Added:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ConfiguratorBase.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/FileOutputConfigurator.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/InputConfigurator.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/OutputConfigurator.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/package-info.java
Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
    accumulo/trunk/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/multitable/CopyTool.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/sequential/MapRedVerifyTool.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java Tue Jan 22 18:07:17 2013
@@ -41,8 +41,12 @@ public class ClientOnDefaultTable extend
   @Override
   public void setAccumuloConfigs(Job job) {
     super.setAccumuloConfigs(job);
-    AccumuloInputFormat.setInputInfo(job, user, getPassword(), getTableName(), auths);
-    AccumuloOutputFormat.setOutputInfo(job, user, getPassword(), true, getTableName());
+    AccumuloInputFormat.setConnectorInfo(job, user, getPassword());
+    AccumuloInputFormat.setInputTableName(job, getTableName());
+    AccumuloInputFormat.setScanAuthorizations(job, auths);
+    AccumuloOutputFormat.setConnectorInfo(job, user, getPassword());
+    AccumuloOutputFormat.setCreateTables(job, true);
+    AccumuloOutputFormat.setDefaultTableName(job, getTableName());
   }
   
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java Tue Jan 22 18:07:17 2013
@@ -30,7 +30,11 @@ public class ClientOnRequiredTable exten
   @Override
   public void setAccumuloConfigs(Job job) {
     super.setAccumuloConfigs(job);
-    AccumuloInputFormat.setInputInfo(job, user, getPassword(), tableName, auths);
-    AccumuloOutputFormat.setOutputInfo(job, user, getPassword(), true, tableName);
+    AccumuloInputFormat.setConnectorInfo(job, user, getPassword());
+    AccumuloInputFormat.setInputTableName(job, tableName);
+    AccumuloInputFormat.setScanAuthorizations(job, auths);
+    AccumuloOutputFormat.setConnectorInfo(job, user, getPassword());
+    AccumuloOutputFormat.setCreateTables(job, true);
+    AccumuloOutputFormat.setDefaultTableName(job, tableName);
   }
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java Tue Jan 22 18:07:17 2013
@@ -18,20 +18,18 @@ package org.apache.accumulo.core.client.
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.util.FileOutputConfigurator;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.commons.collections.map.LRUMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -41,6 +39,7 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.log4j.Logger;
 
 /**
  * This class allows MapReduce jobs to write output in the Accumulo data file format.<br />
@@ -53,70 +52,33 @@ import org.apache.hadoop.mapreduce.lib.o
  * supported at this time.
  */
 public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
-  private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName() + ".";
-  private static final String ACCUMULO_PROPERTY_PREFIX = PREFIX + "accumuloProperties.";
+  
+  private static final Class<?> CLASS = AccumuloFileOutputFormat.class;
+  protected static final Logger log = Logger.getLogger(CLASS);
   
   /**
   * This helper method provides an AccumuloConfiguration object constructed from the Accumulo defaults, and overridden with Accumulo properties that have been
   * stored in the Job's configuration.
    * 
+   * @param context
+   *          the Hadoop context for the configured job
    * @since 1.5.0
-   * @see #setAccumuloProperty(Job, Property, Object)
    */
  protected static AccumuloConfiguration getAccumuloConfiguration(JobContext context) {
-    ConfigurationCopy acuConf = new ConfigurationCopy(AccumuloConfiguration.getDefaultConfiguration());
-    for (Entry<String,String> entry : context.getConfiguration())
-      if (entry.getKey().startsWith(ACCUMULO_PROPERTY_PREFIX))
-        acuConf.set(Property.getPropertyByKey(entry.getKey().substring(ACCUMULO_PROPERTY_PREFIX.length())), entry.getValue());
-    return acuConf;
-  }
-  
-  /**
-   * The supported Accumulo properties we set in this OutputFormat, that change the behavior of the RecordWriter.<br />
-   * These properties correspond to the supported public static setter methods available to this class.
-   * 
-   * @since 1.5.0
-   */
-  protected static boolean isSupportedAccumuloProperty(Property property) {
-    switch (property) {
-      case TABLE_FILE_COMPRESSION_TYPE:
-      case TABLE_FILE_COMPRESSED_BLOCK_SIZE:
-      case TABLE_FILE_BLOCK_SIZE:
-      case TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX:
-      case TABLE_FILE_REPLICATION:
-        return true;
-      default:
-        return false;
-    }
-  }
-  
-  /**
-   * Helper for transforming Accumulo configuration properties into something that can be stored safely inside the Hadoop Job configuration.
-   * 
-   * @since 1.5.0
-   */
-  protected static <T> void setAccumuloProperty(Job job, Property property, T value) {
-    if (isSupportedAccumuloProperty(property)) {
-      String val = String.valueOf(value);
-      if (property.getType().isValidFormat(val))
-        job.getConfiguration().set(ACCUMULO_PROPERTY_PREFIX + property.getKey(), val);
-      else
-        throw new IllegalArgumentException("Value is not appropriate for property type '" + property.getType() + "'");
-    } else
-      throw new IllegalArgumentException("Unsupported configuration property " + property.getKey());
+    return FileOutputConfigurator.getAccumuloConfiguration(CLASS, context.getConfiguration());
  }
   
   /**
   * Sets the compression type to use for data blocks. Specifying a compression may require additional libraries to be available to your Job.
    * 
+   * @param job
+   *          the Hadoop job instance to be configured
    * @param compressionType
    *          one of "none", "gz", "lzo", or "snappy"
    * @since 1.5.0
    */
   public static void setCompressionType(Job job, String compressionType) {
-    if (compressionType == null || !Arrays.asList("none", "gz", "lzo", "snappy").contains(compressionType))
-      throw new IllegalArgumentException("Compression type must be one of: none, gz, lzo, snappy");
-    setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSION_TYPE, compressionType);
+    FileOutputConfigurator.setCompressionType(CLASS, job.getConfiguration(), compressionType);
   }
   
   /**
@@ -126,38 +88,54 @@ public class AccumuloFileOutputFormat ex
    * <p>
   * Making this value smaller may increase seek performance, but at the cost of increasing the size of the indexes (which can also affect seek performance).
    * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param dataBlockSize
+   *          the block size, in bytes
    * @since 1.5.0
    */
   public static void setDataBlockSize(Job job, long dataBlockSize) {
-    setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE, dataBlockSize);
+    FileOutputConfigurator.setDataBlockSize(CLASS, job.getConfiguration(), dataBlockSize);
   }
   
   /**
   * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by the underlying file system.
    * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param fileBlockSize
+   *          the block size, in bytes
    * @since 1.5.0
    */
   public static void setFileBlockSize(Job job, long fileBlockSize) {
-    setAccumuloProperty(job, Property.TABLE_FILE_BLOCK_SIZE, fileBlockSize);
+    FileOutputConfigurator.setFileBlockSize(CLASS, job.getConfiguration(), fileBlockSize);
   }
   
   /**
   * Sets the size for index blocks within each file; smaller blocks means a deeper index hierarchy within the file, while larger blocks mean a more shallow
   * index hierarchy within the file. This can affect the performance of queries.
    * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param indexBlockSize
+   *          the block size, in bytes
    * @since 1.5.0
    */
   public static void setIndexBlockSize(Job job, long indexBlockSize) {
-    setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX, indexBlockSize);
+    FileOutputConfigurator.setIndexBlockSize(CLASS, job.getConfiguration(), indexBlockSize);
   }
   
   /**
   * Sets the file system replication factor for the resulting file, overriding the file system default.
    * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param replication
+   *          the number of replicas for produced files
    * @since 1.5.0
    */
   public static void setReplication(Job job, int replication) {
-    setAccumuloProperty(job, Property.TABLE_FILE_REPLICATION, replication);
+    FileOutputConfigurator.setReplication(CLASS, job.getConfiguration(), replication);
   }
   
   @Override
@@ -204,35 +182,13 @@ public class AccumuloFileOutputFormat ex
  // ----------------------------------------------------------------------------------------------------
   
   /**
-   * @deprecated since 1.5.0;
-   * @see #setZooKeeperInstance(Configuration, String, String)
-   */
-  @Deprecated
-  private static final String INSTANCE_HAS_BEEN_SET = PREFIX + "instanceConfigured";
-  
-  /**
-   * @deprecated since 1.5.0;
-   * @see #setZooKeeperInstance(Configuration, String, String)
-   * @see #getInstance(Configuration)
-   */
-  @Deprecated
-  private static final String INSTANCE_NAME = PREFIX + "instanceName";
-  
-  /**
-   * @deprecated since 1.5.0;
-   * @see #setZooKeeperInstance(Configuration, String, String)
-   * @see #getInstance(Configuration)
-   */
-  @Deprecated
-  private static final String ZOOKEEPERS = PREFIX + "zooKeepers";
-  
-  /**
-   * @deprecated since 1.5.0; Retrieve the relevant block size from {@link #getAccumuloConfiguration(JobContext)}
+   * @deprecated since 1.5.0; Retrieve the relevant block size from {@link #getAccumuloConfiguration(JobContext)} and configure hadoop's
+   *             io.seqfile.compress.blocksize with the same value. No longer needed, as {@link RFile} does not use this field.
    */
   @Deprecated
   protected static void handleBlockSize(Configuration conf) {
     conf.setInt("io.seqfile.compress.blocksize",
-        (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
+        (int) FileOutputConfigurator.getAccumuloConfiguration(CLASS, conf).getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
   }
   
   /**
@@ -246,7 +202,7 @@ public class AccumuloFileOutputFormat ex
    */
   @Deprecated
   public static void setBlockSize(Configuration conf, int blockSize) {
-    conf.set(ACCUMULO_PROPERTY_PREFIX + Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), String.valueOf(blockSize));
+    FileOutputConfigurator.setDataBlockSize(CLASS, conf, blockSize);
   }
   
   /**
@@ -255,13 +211,7 @@ public class AccumuloFileOutputFormat ex
    */
   @Deprecated
  public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
-    if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Instance info can only be set once per job");
-    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    
-    ArgumentChecker.notNull(instanceName, zooKeepers);
-    conf.set(INSTANCE_NAME, instanceName);
-    conf.set(ZOOKEEPERS, zooKeepers);
+    FileOutputConfigurator.setZooKeeperInstance(CLASS, conf, instanceName, zooKeepers);
   }
   
   /**
@@ -270,7 +220,7 @@ public class AccumuloFileOutputFormat ex
    */
   @Deprecated
   protected static Instance getInstance(Configuration conf) {
-    return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS));
+    return FileOutputConfigurator.getInstance(CLASS, conf);
   }
   
 }
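
With the property plumbing replaced above, RFile output tuning goes through the public setters, which now delegate to FileOutputConfigurator. A usage sketch (the output path and sizes are illustrative):

    import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class FileOutputSketch {
      static void configure(Job job) {
        job.setOutputFormatClass(AccumuloFileOutputFormat.class);
        AccumuloFileOutputFormat.setCompressionType(job, "gz");      // none, gz, lzo, or snappy
        AccumuloFileOutputFormat.setDataBlockSize(job, 100 * 1024);  // compressed data blocks, in bytes
        AccumuloFileOutputFormat.setIndexBlockSize(job, 128 * 1024); // index blocks, in bytes
        AccumuloFileOutputFormat.setReplication(job, 3);             // file system replicas per file
        FileOutputFormat.setOutputPath(job, new Path("/tmp/rfiles")); // placeholder path
      }
    }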

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java Tue Jan 22 18:07:17 2013
@@ -21,9 +21,11 @@ import java.util.Map.Entry;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.format.DefaultFormatter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -34,8 +36,10 @@ import org.apache.hadoop.mapreduce.TaskA
  * The user must specify the following via static configurator methods:
  * 
  * <ul>
- * <li>{@link AccumuloInputFormat#setInputInfo(org.apache.hadoop.mapreduce.Job, String, byte[], String, org.apache.accumulo.core.security.Authorizations)}
- * <li>{@link AccumuloInputFormat#setZooKeeperInstance(org.apache.hadoop.mapreduce.Job, String, String)}
+ * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, byte[])}
+ * <li>{@link AccumuloInputFormat#setInputTableName(Job, String)}
+ * <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)}
+ * <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloInputFormat#setMockInstance(Job, String)}
  * </ul>
  * 
  * Other static methods are optional.

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java Tue Jan 22 18:07:17 2013
@@ -16,12 +16,7 @@
  */
 package org.apache.accumulo.core.client.mapreduce;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map.Entry;
@@ -39,14 +34,13 @@ import org.apache.accumulo.core.client.M
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.util.OutputConfigurator;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.security.thrift.SecurityErrorCode;
-import org.apache.accumulo.core.util.ArgumentChecker;
-import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -66,146 +60,70 @@ import org.apache.log4j.Logger;
  * The user must specify the following via static configurator methods:
  * 
  * <ul>
- * <li>{@link AccumuloOutputFormat#setOutputInfo(Job, String, byte[], boolean, String)}
- * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, String, String)}
+ * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, String, byte[])}
+ * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloOutputFormat#setMockInstance(Job, String)}
  * </ul>
  * 
  * Other static methods are optional.
  */
 public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
-  private static final Logger log = Logger.getLogger(AccumuloOutputFormat.class);
   
-  /**
-   * Prefix shared by all job configuration property names for this class
-   */
-  private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName();
+  private static final Class<?> CLASS = AccumuloOutputFormat.class;
+  protected static final Logger log = Logger.getLogger(CLASS);
   
   /**
-   * Used to limit the times a job can be configured with output information to 1
+   * Sets the connector information needed to communicate with Accumulo in this job.
    * 
-   * @see #setOutputInfo(Job, String, byte[], boolean, String)
-   * @see #getUsername(JobContext)
-   * @see #getPassword(JobContext)
-   * @see #canCreateTables(JobContext)
-   * @see #getDefaultTableName(JobContext)
-   */
-  private static final String OUTPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
-  
-  /**
-   * Used to limit the times a job can be configured with instance information to 1
-   * 
-   * @see #setZooKeeperInstance(Job, String, String)
-   * @see #setMockInstance(Job, String)
-   * @see #getInstance(JobContext)
-   */
-  private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
-  
-  /**
-   * Key for storing the Accumulo user's name
-   * 
-   * @see #setOutputInfo(Job, String, byte[], boolean, String)
-   * @see #getUsername(JobContext)
-   */
-  private static final String USERNAME = PREFIX + ".username";
-  
-  /**
-   * Key for storing the Accumulo user's password
-   * 
-   * @see #setOutputInfo(Job, String, byte[], boolean, String)
-   * @see #getPassword(JobContext)
-   */
-  private static final String PASSWORD = PREFIX + ".password";
-  
-  /**
-   * Key for storing the default table to use when the output key is null
-   * 
-   * @see #getDefaultTableName(JobContext)
-   */
-  private static final String DEFAULT_TABLE_NAME = PREFIX + ".defaulttable";
-  
-  /**
-   * Key for storing the Accumulo instance name to connect to
-   * 
-   * @see #setZooKeeperInstance(Job, String, String)
-   * @see #setMockInstance(Job, String)
-   * @see #getInstance(JobContext)
-   */
-  private static final String INSTANCE_NAME = PREFIX + ".instanceName";
-  
-  /**
-   * Key for storing the set of Accumulo zookeeper servers to communicate with
-   * 
-   * @see #setZooKeeperInstance(Job, String, String)
-   * @see #getInstance(JobContext)
-   */
-  private static final String ZOOKEEPERS = PREFIX + ".zooKeepers";
-  
-  /**
-   * Key for storing the directive to use the mock instance type
-   * 
-   * @see #setMockInstance(Job, String)
-   * @see #getInstance(JobContext)
-   */
-  private static final String MOCK = PREFIX + ".useMockInstance";
-  
-  /**
-   * Key for storing the {@link BatchWriterConfig}.
-   * 
-   * @see #setBatchWriterOptions(Job, BatchWriterConfig)
-   * @see #getBatchWriterOptions(JobContext)
-   */
-  private static final String BATCH_WRITER_CONFIG = PREFIX + ".bwConfig";
-  
-  /**
-   * Key for storing the directive to create tables that don't exist
-   * 
-   * @see #setOutputInfo(Job, String, byte[], boolean, String)
-   * @see #canCreateTables(JobContext)
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param user
+   *          a valid Accumulo user name (user must have Table.CREATE permission if {@link #setCreateTables(Job, boolean)} is set to true)
+   * @param passwd
+   *          the user's password
+   * @since 1.5.0
    */
-  private static final String CREATETABLES = PREFIX + ".createtables";
+  public static void setConnectorInfo(Job job, String user, byte[] passwd) {
+    OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), user, passwd);
+  }
   
   /**
-   * Key for storing the desired logging level
+   * Determines if the connector has been configured.
    * 
-   * @see #setLogLevel(Job, Level)
-   * @see #getLogLevel(JobContext)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return true if the connector has been configured, false otherwise
+   * @since 1.5.0
+   * @see #setConnectorInfo(Job, String, byte[])
    */
-  private static final String LOGLEVEL = PREFIX + ".loglevel";
+  protected static Boolean isConnectorInfoSet(JobContext context) {
+    return OutputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration());
+  }
   
   /**
-   * Key for storing the directive to simulate output instead of actually writing to a file
+   * Gets the user name from the configuration.
    * 
-   * @see #setSimulationMode(Job, boolean)
-   * @see #getSimulationMode(JobContext)
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the user name
+   * @since 1.5.0
+   * @see #setConnectorInfo(Job, String, byte[])
    */
-  private static final String SIMULATE = PREFIX + ".simulate";
+  protected static String getUsername(JobContext context) {
+    return OutputConfigurator.getUsername(CLASS, context.getConfiguration());
+  }
   
   /**
-   * Sets the minimum information needed to write to Accumulo in this job.
+   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
+   * provide a charset safe conversion to a string, and is not intended to be secure.
    * 
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param user
-   *          a valid Accumulo user name (user must have Table.CREATE permission)
-   * @param passwd
-   *          the user's password
-   * @param createTables
-   *          if true, the output format will create new tables as necessary. Table names can only be alpha-numeric and underscores.
-   * @param defaultTable
-   *          the table to use when the tablename is null in the write call
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the decoded user password
    * @since 1.5.0
+   * @see #setConnectorInfo(Job, String, byte[])
    */
-  public static void setOutputInfo(Job job, String user, byte[] passwd, boolean createTables, String defaultTable) {
-    if (job.getConfiguration().getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Output info can only be set once per job");
-    job.getConfiguration().setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true);
-    
-    ArgumentChecker.notNull(user, passwd);
-    job.getConfiguration().set(USERNAME, user);
-    job.getConfiguration().set(PASSWORD, new String(Base64.encodeBase64(passwd)));
-    job.getConfiguration().setBoolean(CREATETABLES, createTables);
-    if (defaultTable != null)
-      job.getConfiguration().set(DEFAULT_TABLE_NAME, defaultTable);
+  protected static byte[] getPassword(JobContext context) {
+    return OutputConfigurator.getPassword(CLASS, context.getConfiguration());
   }
   
   /**
@@ -220,13 +138,7 @@ public class AccumuloOutputFormat extend
    * @since 1.5.0
    */
  public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) {
-    if (job.getConfiguration().getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Instance info can only be set once per job");
-    job.getConfiguration().setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    ArgumentChecker.notNull(instanceName, zooKeepers);
-    job.getConfiguration().set(INSTANCE_NAME, instanceName);
-    job.getConfiguration().set(ZOOKEEPERS, zooKeepers);
-    System.out.println("instance set: " + job.getConfiguration().get(INSTANCE_HAS_BEEN_SET));
+    OutputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), instanceName, zooKeepers);
   }
   
   /**
@@ -239,32 +151,21 @@ public class AccumuloOutputFormat extend
    * @since 1.5.0
    */
   public static void setMockInstance(Job job, String instanceName) {
-    job.getConfiguration().setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    job.getConfiguration().setBoolean(MOCK, true);
-    job.getConfiguration().set(INSTANCE_NAME, instanceName);
+    OutputConfigurator.setMockInstance(CLASS, job.getConfiguration(), instanceName);
   }
   
   /**
-   * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig}, with sensible built-in defaults is
-   * used. Setting the configuration multiple times overwrites any previous configuration.
+   * Initializes an Accumulo {@link Instance} based on the configuration.
    * 
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param bwConfig
-   *          the configuration for the {@link BatchWriter}
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return an Accumulo instance
    * @since 1.5.0
+   * @see #setZooKeeperInstance(Job, String, String)
+   * @see #setMockInstance(Job, String)
    */
-  public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    String serialized;
-    try {
-      bwConfig.write(new DataOutputStream(baos));
-      serialized = new String(baos.toByteArray(), Charset.forName("UTF-8"));
-      baos.close();
-    } catch (IOException e) {
-      throw new IllegalArgumentException("unable to serialize " + BatchWriterConfig.class.getName());
-    }
-    job.getConfiguration().set(BATCH_WRITER_CONFIG, serialized);
+  protected static Instance getInstance(JobContext context) {
+    return OutputConfigurator.getInstance(CLASS, context.getConfiguration());
   }
   
   /**
@@ -277,134 +178,119 @@ public class AccumuloOutputFormat extend
    * @since 1.5.0
    */
   public static void setLogLevel(Job job, Level level) {
-    ArgumentChecker.notNull(level);
-    job.getConfiguration().setInt(LOGLEVEL, level.toInt());
+    OutputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level);
   }
   
   /**
-   * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. This is useful for testing.
-   * 
-   * <p>
-   * By default, this feature is <b>disabled</b>.
+   * Gets the log level from this configuration.
    * 
-   * @param job
-   *          the Hadoop job instance to be configured
-   * @param enableFeature
-   *          the feature is enabled if true, disabled otherwise
+   * @param context
+   *          the Hadoop context for the configured job
+   * @return the log level
    * @since 1.5.0
+   * @see #setLogLevel(Job, Level)
    */
-  public static void setSimulationMode(Job job, boolean enableFeature) {
-    job.getConfiguration().setBoolean(SIMULATE, enableFeature);
+  protected static Level getLogLevel(JobContext context) {
+    return OutputConfigurator.getLogLevel(CLASS, context.getConfiguration());
   }
   
   /**
-   * Gets the user name from the configuration.
+   * Sets the default table name to use if one emits a null in place of a table name for a given mutation. Table names can only be alpha-numeric and
+   * underscores.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the user name
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param tableName
+   *          the table to use when the tablename is null in the write call
    * @since 1.5.0
-   * @see #setOutputInfo(Job, String, byte[], boolean, String)
    */
-  protected static String getUsername(JobContext context) {
-    return context.getConfiguration().get(USERNAME);
+  public static void setDefaultTableName(Job job, String tableName) {
+    OutputConfigurator.setDefaultTableName(CLASS, job.getConfiguration(), tableName);
   }
   
   /**
-   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
-   * provide a charset safe conversion to a string, and is not intended to be secure.
+   * Gets the default table name from the configuration.
    * 
    * @param context
    *          the Hadoop context for the configured job
-   * @return the decoded user password
+   * @return the default table name
    * @since 1.5.0
-   * @see #setOutputInfo(Job, String, byte[], boolean, String)
+   * @see #setDefaultTableName(Job, String)
    */
-  protected static byte[] getPassword(JobContext context) {
-    return Base64.decodeBase64(context.getConfiguration().get(PASSWORD, "").getBytes());
+  protected static String getDefaultTableName(JobContext context) {
+    return OutputConfigurator.getDefaultTableName(CLASS, context.getConfiguration());
   }
   
   /**
-   * Determines whether tables are permitted to be created as needed.
+   * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig}, with sensible built-in defaults is
+   * used. Setting the configuration multiple times overwrites any previous configuration.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return true if the feature is disabled, false otherwise
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param bwConfig
+   *          the configuration for the {@link BatchWriter}
    * @since 1.5.0
-   * @see #setOutputInfo(Job, String, byte[], boolean, String)
    */
-  protected static boolean canCreateTables(JobContext context) {
-    return context.getConfiguration().getBoolean(CREATETABLES, false);
+  public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) {
+    OutputConfigurator.setBatchWriterOptions(CLASS, job.getConfiguration(), bwConfig);
   }
   
   /**
-   * Gets the default table name from the configuration.
+   * Gets the {@link BatchWriterConfig} settings.
    * 
    * @param context
    *          the Hadoop context for the configured job
-   * @return the default table name
+   * @return the configuration object
    * @since 1.5.0
-   * @see #setOutputInfo(Job, String, byte[], boolean, String)
+   * @see #setBatchWriterOptions(Job, BatchWriterConfig)
    */
-  protected static String getDefaultTableName(JobContext context) {
-    return context.getConfiguration().get(DEFAULT_TABLE_NAME);
+  protected static BatchWriterConfig getBatchWriterOptions(JobContext context) {
+    return OutputConfigurator.getBatchWriterOptions(CLASS, context.getConfiguration());
   }
   
   /**
-   * Initializes an Accumulo {@link Instance} based on the configuration.
+   * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric and underscores.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return an Accumulo instance
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
-   * @see #setZooKeeperInstance(Job, String, String)
-   * @see #setMockInstance(Job, String)
    */
-  protected static Instance getInstance(JobContext context) {
-    if (context.getConfiguration().getBoolean(MOCK, false))
-      return new MockInstance(context.getConfiguration().get(INSTANCE_NAME));
-    return new ZooKeeperInstance(context.getConfiguration().get(INSTANCE_NAME), context.getConfiguration().get(ZOOKEEPERS));
+  public static void setCreateTables(Job job, boolean enableFeature) {
+    OutputConfigurator.setCreateTables(CLASS, job.getConfiguration(), enableFeature);
   }
   
   /**
-   * Gets the {@link BatchWriterConfig} settings.
+   * Determines whether tables are permitted to be created as needed.
    * 
    * @param context
    *          the Hadoop context for the configured job
-   * @return the configuration object
+   * @return true if the feature is disabled, false otherwise
    * @since 1.5.0
-   * @see #setBatchWriterOptions(Job, BatchWriterConfig)
+   * @see #setCreateTables(Job, boolean)
    */
-  protected static BatchWriterConfig getBatchWriterOptions(JobContext context) {
-    String serialized = context.getConfiguration().get(BATCH_WRITER_CONFIG);
-    BatchWriterConfig bwConfig = new BatchWriterConfig();
-    if (serialized == null || serialized.isEmpty()) {
-      return bwConfig;
-    } else {
-      try {
-        ByteArrayInputStream bais = new ByteArrayInputStream(serialized.getBytes(Charset.forName("UTF-8")));
-        bwConfig.readFields(new DataInputStream(bais));
-        bais.close();
-        return bwConfig;
-      } catch (IOException e) {
-        throw new IllegalArgumentException("unable to serialize " + BatchWriterConfig.class.getName());
-      }
-    }
+  protected static Boolean canCreateTables(JobContext context) {
+    return OutputConfigurator.canCreateTables(CLASS, context.getConfiguration());
   }
   
   /**
-   * Gets the log level from this configuration.
+   * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. This is useful for testing.
    * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the log level
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
    * @since 1.5.0
-   * @see #setLogLevel(Job, Level)
    */
-  protected static Level getLogLevel(JobContext context) {
-    if (context.getConfiguration().get(LOGLEVEL) != null)
-      return Level.toLevel(context.getConfiguration().getInt(LOGLEVEL, Level.INFO.toInt()));
-    return null;
+  public static void setSimulationMode(Job job, boolean enableFeature) {
+    OutputConfigurator.setSimulationMode(CLASS, job.getConfiguration(), enableFeature);
   }
   
   /**
@@ -416,8 +302,8 @@ public class AccumuloOutputFormat extend
    * @since 1.5.0
    * @see #setSimulationMode(Job, boolean)
    */
-  protected static boolean getSimulationMode(JobContext context) {
-    return context.getConfiguration().getBoolean(SIMULATE, false);
+  protected static Boolean getSimulationMode(JobContext context) {
+    return OutputConfigurator.getSimulationMode(CLASS, context.getConfiguration());
   }
   
   /**
@@ -582,11 +468,10 @@ public class AccumuloOutputFormat extend
   
   @Override
   public void checkOutputSpecs(JobContext job) throws IOException {
-    if (!job.getConfiguration().getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
-      throw new IOException("Output info has not been set.");
-    if (!job.getConfiguration().getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IOException("Instance info has not been set.");
+    if (!isConnectorInfoSet(job))
+      throw new IOException("Connector info has not been set.");
     try {
+      // if the instance isn't configured, it will complain here
      Connector c = getInstance(job).getConnector(getUsername(job), getPassword(job));
      if (!c.securityOperations().authenticateUser(getUsername(job), getPassword(job)))
         throw new IOException("Unable to authenticate user");
@@ -616,38 +501,14 @@ public class AccumuloOutputFormat extend
  // ----------------------------------------------------------------------------------------------------
   
   /**
-   * @deprecated since 1.5.0;
-   */
-  @Deprecated
-  private static final String MAX_MUTATION_BUFFER_SIZE = PREFIX + ".maxmemory";
-  
-  /**
-   * @deprecated since 1.5.0;
-   */
-  @Deprecated
-  private static final String MAX_LATENCY = PREFIX + ".maxlatency";
-  
-  /**
-   * @deprecated since 1.5.0;
-   */
-  @Deprecated
-  private static final String NUM_WRITE_THREADS = PREFIX + ".writethreads";
-  
-  /**
-   * @deprecated since 1.5.0; Use {@link #setOutputInfo(Job, String, byte[], boolean, String)} instead.
+   * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, String, byte[])}, {@link #setCreateTables(Job, boolean)}, and
+   *             {@link #setDefaultTableName(Job, String)} instead.
    */
   @Deprecated
  public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) {
-    if (conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Output info can only be set once per 
job");
-    conf.setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true);
-    
-    ArgumentChecker.notNull(user, passwd);
-    conf.set(USERNAME, user);
-    conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
-    conf.setBoolean(CREATETABLES, createTables);
-    if (defaultTable != null)
-      conf.set(DEFAULT_TABLE_NAME, defaultTable);
+    OutputConfigurator.setConnectorInfo(CLASS, conf, user, passwd);
+    OutputConfigurator.setCreateTables(CLASS, conf, createTables);
+    OutputConfigurator.setDefaultTableName(CLASS, conf, defaultTable);
   }
   
   /**
@@ -655,13 +516,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
  public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
-    if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IllegalStateException("Instance info can only be set once per job");
-    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    
-    ArgumentChecker.notNull(instanceName, zooKeepers);
-    conf.set(INSTANCE_NAME, instanceName);
-    conf.set(ZOOKEEPERS, zooKeepers);
+    OutputConfigurator.setZooKeeperInstance(CLASS, conf, instanceName, zooKeepers);
   }
   
   /**
@@ -669,9 +524,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   public static void setMockInstance(Configuration conf, String instanceName) {
-    conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
-    conf.setBoolean(MOCK, true);
-    conf.set(INSTANCE_NAME, instanceName);
+    OutputConfigurator.setMockInstance(CLASS, conf, instanceName);
   }
   
   /**
@@ -679,7 +532,9 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
  public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) {
-    conf.setLong(MAX_MUTATION_BUFFER_SIZE, numberOfBytes);
+    BatchWriterConfig bwConfig = OutputConfigurator.getBatchWriterOptions(CLASS, conf);
+    bwConfig.setMaxMemory(numberOfBytes);
+    OutputConfigurator.setBatchWriterOptions(CLASS, conf, bwConfig);
   }
   
   /**
@@ -687,7 +542,9 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
  public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) {
-    conf.setInt(MAX_LATENCY, numberOfMilliseconds);
+    BatchWriterConfig bwConfig = OutputConfigurator.getBatchWriterOptions(CLASS, conf);
+    bwConfig.setMaxLatency(numberOfMilliseconds, TimeUnit.MILLISECONDS);
+    OutputConfigurator.setBatchWriterOptions(CLASS, conf, bwConfig);
   }
   
   /**
@@ -695,7 +552,9 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
  public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) {
-    conf.setInt(NUM_WRITE_THREADS, numberOfThreads);
+    BatchWriterConfig bwConfig = OutputConfigurator.getBatchWriterOptions(CLASS, conf);
+    bwConfig.setMaxWriteThreads(numberOfThreads);
+    OutputConfigurator.setBatchWriterOptions(CLASS, conf, bwConfig);
   }
   
   /**
@@ -703,8 +562,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   public static void setLogLevel(Configuration conf, Level level) {
-    ArgumentChecker.notNull(level);
-    conf.setInt(LOGLEVEL, level.toInt());
+    OutputConfigurator.setLogLevel(CLASS, conf, level);
   }
   
   /**
@@ -712,7 +570,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   public static void setSimulationMode(Configuration conf) {
-    conf.setBoolean(SIMULATE, true);
+    OutputConfigurator.setSimulationMode(CLASS, conf, true);
   }
   
   /**
@@ -720,7 +578,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static String getUsername(Configuration conf) {
-    return conf.get(USERNAME);
+    return OutputConfigurator.getUsername(CLASS, conf);
   }
   
   /**
@@ -728,7 +586,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static byte[] getPassword(Configuration conf) {
-    return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
+    return OutputConfigurator.getPassword(CLASS, conf);
   }
   
   /**
@@ -736,7 +594,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static boolean canCreateTables(Configuration conf) {
-    return conf.getBoolean(CREATETABLES, false);
+    return OutputConfigurator.canCreateTables(CLASS, conf);
   }
   
   /**
@@ -744,7 +602,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static String getDefaultTableName(Configuration conf) {
-    return conf.get(DEFAULT_TABLE_NAME);
+    return OutputConfigurator.getDefaultTableName(CLASS, conf);
   }
   
   /**
@@ -752,9 +610,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static Instance getInstance(Configuration conf) {
-    if (conf.getBoolean(MOCK, false))
-      return new MockInstance(conf.get(INSTANCE_NAME));
-    return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS));
+    return OutputConfigurator.getInstance(CLASS, conf);
   }
   
   /**
@@ -762,7 +618,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static long getMaxMutationBufferSize(Configuration conf) {
-    return conf.getLong(MAX_MUTATION_BUFFER_SIZE, new BatchWriterConfig().getMaxMemory());
+    return OutputConfigurator.getBatchWriterOptions(CLASS, conf).getMaxMemory();
   }
   
   /**
@@ -770,9 +626,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static int getMaxLatency(Configuration conf) {
-    Long maxLatency = new BatchWriterConfig().getMaxLatency(TimeUnit.MILLISECONDS);
-    Integer max = maxLatency >= Integer.MAX_VALUE ? Integer.MAX_VALUE : Integer.parseInt(Long.toString(maxLatency));
-    return conf.getInt(MAX_LATENCY, max);
+    return (int) OutputConfigurator.getBatchWriterOptions(CLASS, conf).getMaxLatency(TimeUnit.MILLISECONDS);
   }
   
   /**
@@ -780,7 +634,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static int getMaxWriteThreads(Configuration conf) {
-    return conf.getInt(NUM_WRITE_THREADS, new BatchWriterConfig().getMaxWriteThreads());
+    return OutputConfigurator.getBatchWriterOptions(CLASS, conf).getMaxWriteThreads();
   }
   
   /**
@@ -788,9 +642,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static Level getLogLevel(Configuration conf) {
-    if (conf.get(LOGLEVEL) != null)
-      return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt()));
-    return null;
+    return OutputConfigurator.getLogLevel(CLASS, conf);
   }
   
   /**
@@ -798,7 +650,7 @@ public class AccumuloOutputFormat extend
    */
   @Deprecated
   protected static boolean getSimulationMode(Configuration conf) {
-    return conf.getBoolean(SIMULATE, false);
+    return OutputConfigurator.getSimulationMode(CLASS, conf);
   }
   
 }
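
Since the deprecated buffer, latency, and thread setters above now just rewrite a serialized BatchWriterConfig, new code can set all three in one object. A sketch with illustrative values:

    import java.util.concurrent.TimeUnit;
    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class WriterOptionsSketch {
      static void configure(Job job) {
        BatchWriterConfig bwConfig = new BatchWriterConfig();
        bwConfig.setMaxMemory(50 * 1024 * 1024);     // mutation buffer, in bytes
        bwConfig.setMaxLatency(2, TimeUnit.MINUTES); // flush buffered mutations at least this often
        bwConfig.setMaxWriteThreads(4);              // threads writing to tablet servers
        AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
      }
    }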

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java Tue Jan 22 18:07:17 2013
@@ -22,10 +22,12 @@ import java.util.Map.Entry;
 import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.PeekingIterator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -36,8 +38,10 @@ import org.apache.hadoop.mapreduce.TaskA
  * The user must specify the following via static configurator methods:
  * 
  * <ul>
- * <li>{@link AccumuloRowInputFormat#setInputInfo(org.apache.hadoop.mapreduce.Job, String, byte[], String, org.apache.accumulo.core.security.Authorizations)}
- * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(org.apache.hadoop.mapreduce.Job, String, String)}
+ * <li>{@link AccumuloRowInputFormat#setConnectorInfo(Job, String, byte[])}
+ * <li>{@link AccumuloRowInputFormat#setInputTableName(Job, String)}
+ * <li>{@link AccumuloRowInputFormat#setScanAuthorizations(Job, Authorizations)}
+ * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloRowInputFormat#setMockInstance(Job, String)}
  * </ul>
  * 
  * Other static methods are optional.
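
Because the required-setter lists now offer setMockInstance as an alternative to setZooKeeperInstance, an input job can be wired against an in-memory instance for tests (a sketch; the instance and table names are placeholders):

    import org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.hadoop.mapreduce.Job;

    public class MockInstanceSketch {
      static void configure(Job job) {
        // A MockInstance-backed job never contacts ZooKeeper or a live
        // cluster, which keeps unit tests self-contained.
        AccumuloRowInputFormat.setConnectorInfo(job, "root", new byte[0]);
        AccumuloRowInputFormat.setInputTableName(job, "test_table");
        AccumuloRowInputFormat.setScanAuthorizations(job, new Authorizations());
        AccumuloRowInputFormat.setMockInstance(job, "test-instance");
      }
    }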

