Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/InputConfigurator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/InputConfigurator.java?rev=1437073&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/InputConfigurator.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/InputConfigurator.java Tue Jan 22 18:07:17 2013
@@ -0,0 +1,532 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientSideIteratorScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.mock.MockTabletLocator;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * @since 1.5.0
+ */
+public class InputConfigurator extends ConfiguratorBase {
+  
+  /**
+   * Configuration keys for {@link Scanner}.
+   * 
+   * @since 1.5.0
+   */
+  public static enum ScanOpts {
+    TABLE_NAME, AUTHORIZATIONS, RANGES, COLUMNS, ITERATORS
+  }
+  
+  /**
+   * Configuration keys for various features.
+   * 
+   * @since 1.5.0
+   */
+  public static enum Features {
+    AUTO_ADJUST_RANGES, SCAN_ISOLATION, USE_LOCAL_ITERATORS, SCAN_OFFLINE
+  }
+  
+  /**
+   * Sets the name of the input table over which this job will scan.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param tableName
+   *          the table to use as the input for this job
+   * @since 1.5.0
+   */
+  public static void setInputTableName(Class<?> implementingClass, Configuration conf, String tableName) {
+    ArgumentChecker.notNull(tableName);
+    conf.set(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME), tableName);
+  }
+  
+  /**
+   * Gets the table name from the configuration.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return the table name
+   * @since 1.5.0
+   * @see #setInputTableName(Class, Configuration, String)
+   */
+  public static String getInputTableName(Class<?> implementingClass, Configuration conf) {
+    return conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME));
+  }
+  
+  /**
+   * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorizations. Defaults to the empty set.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param auths
+   *          the user's authorizations
+   * @since 1.5.0
+   */
+  public static void setScanAuthorizations(Class<?> implementingClass, Configuration conf, Authorizations auths) {
+    if (auths != null && !auths.isEmpty())
+      conf.set(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS), auths.serialize());
+  }
+  
+  /**
+   * Gets the authorizations to set for the scans from the configuration.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return the Accumulo scan authorizations
+   * @since 1.5.0
+   * @see #setScanAuthorizations(Class, Configuration, Authorizations)
+   */
+  public static Authorizations getScanAuthorizations(Class<?> implementingClass, Configuration conf) {
+    String authString = conf.get(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS));
+    return authString == null ? Constants.NO_AUTHS : new Authorizations(authString.getBytes(Charset.forName("UTF-8")));
+  }
+  
+  /**
+   * Sets the input ranges to scan for this job. If not set, the entire table will be scanned.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param ranges
+   *          the ranges that will be mapped over
+   * @since 1.5.0
+   */
+  public static void setRanges(Class<?> implementingClass, Configuration conf, Collection<Range> ranges) {
+    ArgumentChecker.notNull(ranges);
+    ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
+    try {
+      for (Range r : ranges) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        r.write(new DataOutputStream(baos));
+        rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray()), Charset.forName("UTF-8")));
+      }
+    } catch (IOException ex) {
+      throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
+    }
+    conf.setStrings(enumToConfKey(implementingClass, ScanOpts.RANGES), rangeStrings.toArray(new String[0]));
+  }
+  
+  /**
+   * Gets the ranges to scan over from the configuration.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return the ranges
+   * @throws IOException
+   *           if the ranges have been encoded improperly
+   * @since 1.5.0
+   * @see #setRanges(Class, Configuration, Collection)
+   */
+  public static List<Range> getRanges(Class<?> implementingClass, Configuration conf) throws IOException {
+    ArrayList<Range> ranges = new ArrayList<Range>();
+    for (String rangeString : conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.RANGES))) {
+      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes(Charset.forName("UTF-8"))));
+      Range range = new Range();
+      range.readFields(new DataInputStream(bais));
+      ranges.add(range);
+    }
+    return ranges;
+  }
+  
+  /**
+   * Restricts the columns that will be mapped over for this job.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param columnFamilyColumnQualifierPairs
+   *          a collection of pairs of {@link Text} objects corresponding to column family and column qualifier. If the column qualifier is null, the entire column family is
+   *          selected. An empty set is the default and is equivalent to scanning all columns.
+   * @since 1.5.0
+   */
+  public static void fetchColumns(Class<?> implementingClass, Configuration conf, Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
+    ArgumentChecker.notNull(columnFamilyColumnQualifierPairs);
+    ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size());
+    for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
+      if (column.getFirst() == null)
+        throw new IllegalArgumentException("Column family can not be null");
+      
+      String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())), Charset.forName("UTF-8"));
+      if (column.getSecond() != null)
+        col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())), Charset.forName("UTF-8"));
+      columnStrings.add(col);
+    }
+    conf.setStrings(enumToConfKey(implementingClass, ScanOpts.COLUMNS), columnStrings.toArray(new String[0]));
+  }
+  
+  /**
+   * Gets the columns to be mapped over from this job.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return a set of columns
+   * @since 1.5.0
+   * @see #fetchColumns(Class, Configuration, Collection)
+   */
+  public static Set<Pair<Text,Text>> getFetchedColumns(Class<?> implementingClass, Configuration conf) {
+    Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>();
+    for (String col : conf.getStringCollection(enumToConfKey(implementingClass, ScanOpts.COLUMNS))) {
+      int idx = col.indexOf(":");
+      Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes(Charset.forName("UTF-8"))) : Base64.decodeBase64(col.substring(0, idx).getBytes(
+          Charset.forName("UTF-8"))));
+      Text cq = idx < 0 ? null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes(Charset.forName("UTF-8"))));
+      columns.add(new Pair<Text,Text>(cf, cq));
+    }
+    return columns;
+  }
+  
+  /**
+   * Encode an iterator on the input for this job.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param cfg
+   *          the configuration of the iterator
+   * @since 1.5.0
+   */
+  public static void addIterator(Class<?> implementingClass, Configuration conf, IteratorSetting cfg) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    String newIter;
+    try {
+      cfg.write(new DataOutputStream(baos));
+      newIter = new String(Base64.encodeBase64(baos.toByteArray()), Charset.forName("UTF-8"));
+      baos.close();
+    } catch (IOException e) {
+      throw new IllegalArgumentException("unable to serialize IteratorSetting", e);
+    }
+    
+    String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS));
+    // No iterators specified yet, create a new string
+    if (iterators == null || iterators.isEmpty()) {
+      iterators = newIter;
+    } else {
+      // append the next iterator & reset
+      iterators = iterators.concat(StringUtils.COMMA_STR + newIter);
+    }
+    // Store the iterators w/ the job
+    conf.set(enumToConfKey(implementingClass, ScanOpts.ITERATORS), iterators);
+  }
+  
+  /**
+   * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return a list of iterators
+   * @since 1.5.0
+   * @see #addIterator(Class, Configuration, IteratorSetting)
+   */
+  public static List<IteratorSetting> getIterators(Class<?> implementingClass, Configuration conf) {
+    String iterators = conf.get(enumToConfKey(implementingClass, ScanOpts.ITERATORS));
+    
+    // If no iterators are present, return an empty list
+    if (iterators == null || iterators.isEmpty())
+      return new ArrayList<IteratorSetting>();
+    
+    // Compose the set of iterators encoded in the job configuration
+    StringTokenizer tokens = new StringTokenizer(iterators, StringUtils.COMMA_STR);
+    List<IteratorSetting> list = new ArrayList<IteratorSetting>();
+    try {
+      while (tokens.hasMoreTokens()) {
+        String itstring = tokens.nextToken();
+        ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(itstring.getBytes(Charset.forName("UTF-8"))));
+        list.add(new IteratorSetting(new DataInputStream(bais)));
+        bais.close();
+      }
+    } catch (IOException e) {
+      throw new IllegalArgumentException("couldn't decode iterator settings", e);
+    }
+    return list;
+  }
+  
+  /**
+   * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries.
+   * Disabling this feature will cause exactly one Map task to be created for each specified range.
+   * 
+   * <p>
+   * By default, this feature is <b>enabled</b>.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @see #setRanges(Class, Configuration, Collection)
+   * @since 1.5.0
+   */
+  public static void setAutoAdjustRanges(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
+    conf.setBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), enableFeature);
+  }
+  
+  /**
+   * Determines whether a configuration has auto-adjust ranges enabled.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.5.0
+   * @see #setAutoAdjustRanges(Class, Configuration, boolean)
+   */
+  public static Boolean getAutoAdjustRanges(Class<?> implementingClass, Configuration conf) {
+    return conf.getBoolean(enumToConfKey(implementingClass, Features.AUTO_ADJUST_RANGES), true);
+  }
+  
+  /**
+   * Controls the use of the {@link IsolatedScanner} in this job.
+   * 
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.5.0
+   */
+  public static void setScanIsolation(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
+    conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), enableFeature);
+  }
+  
+  /**
+   * Determines whether a configuration has isolation enabled.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.5.0
+   * @see #setScanIsolation(Class, Configuration, boolean)
+   */
+  public static Boolean isIsolated(Class<?> implementingClass, Configuration conf) {
+    return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_ISOLATION), false);
+  }
+  
+  /**
+   * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map
+   * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task.
+   * 
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.5.0
+   */
+  public static void setLocalIterators(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
+    conf.setBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), enableFeature);
+  }
+  
+  /**
+   * Determines whether a configuration uses local iterators.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.5.0
+   * @see #setLocalIterators(Class, Configuration, boolean)
+   */
+  public static Boolean usesLocalIterators(Class<?> implementingClass, Configuration conf) {
+    return conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false);
+  }
+  
+  /**
+   * <p>
+   * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the
+   * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will
+   * fail.
+   * 
+   * <p>
+   * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS.
+   * 
+   * <p>
+   * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be
+   * on the mapper's classpath. The accumulo-site.xml may need to be on the mapper's classpath if HDFS or the Accumulo directory in HDFS are non-standard.
+   * 
+   * <p>
+   * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map
+   * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The
+   * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file.
+   * 
+   * <p>
+   * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support
+   * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server.
+   * 
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.5.0
+   */
+  public static void setOfflineTableScan(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
+    conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), enableFeature);
+  }
+  
+  /**
+   * Determines whether a configuration has the offline table scan feature enabled.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.5.0
+   * @see #setOfflineTableScan(Class, Configuration, boolean)
+   */
+  public static Boolean isOfflineScan(Class<?> implementingClass, Configuration conf) {
+    return conf.getBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), false);
+  }
+  
+  /**
+   * Initializes an Accumulo {@link TabletLocator} based on the configuration.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return an Accumulo tablet locator
+   * @throws TableNotFoundException
+   *           if the table name set on the configuration doesn't exist
+   * @since 1.5.0
+   */
+  public static TabletLocator getTabletLocator(Class<?> implementingClass, Configuration conf) throws TableNotFoundException {
+    String instanceType = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
+    if ("MockInstance".equals(instanceType))
+      return new MockTabletLocator();
+    Instance instance = getInstance(implementingClass, conf);
+    String username = getUsername(implementingClass, conf);
+    byte[] password = getPassword(implementingClass, conf);
+    String tableName = getInputTableName(implementingClass, conf);
+    return TabletLocator.getInstance(instance, new AuthInfo(username, ByteBuffer.wrap(password), instance.getInstanceID()),
+        new Text(Tables.getTableId(instance, tableName)));
+  }
+  
+  // InputFormat doesn't have the equivalent of OutputFormat's checkOutputSpecs(JobContext job)
+  /**
+   * Check whether a configuration is fully configured to be used with an Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @throws IOException
+   *           if the context is improperly configured
+   * @since 1.5.0
+   */
+  public static void validateOptions(Class<?> implementingClass, Configuration conf) throws IOException {
+    if (!isConnectorInfoSet(implementingClass, conf))
+      throw new IOException("Input info has not been set.");
+    String instanceKey = conf.get(enumToConfKey(implementingClass, InstanceOpts.TYPE));
+    if (!"MockInstance".equals(instanceKey) && !"ZooKeeperInstance".equals(instanceKey))
+      throw new IOException("Instance info has not been set.");
+    // validate that we can connect as configured
+    try {
+      Connector c = getInstance(implementingClass, conf).getConnector(getUsername(implementingClass, conf), getPassword(implementingClass, conf));
+      if (!c.securityOperations().authenticateUser(getUsername(implementingClass, conf), getPassword(implementingClass, conf)))
+        throw new IOException("Unable to authenticate user");
+      if (!c.securityOperations().hasTablePermission(getUsername(implementingClass, conf), getInputTableName(implementingClass, conf), TablePermission.READ))
+        throw new IOException("Unable to access table");
+      
+      if (!conf.getBoolean(enumToConfKey(implementingClass, Features.USE_LOCAL_ITERATORS), false)) {
+        // validate that any scan-time iterators can be loaded by the tablet servers
+        for (IteratorSetting iter : getIterators(implementingClass, conf)) {
+          if (!c.instanceOperations().testClassLoad(iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
+            throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass() + " as a " + SortedKeyValueIterator.class.getName());
+        }
+      }
+      
+    } catch (AccumuloException e) {
+      throw new IOException(e);
+    } catch (AccumuloSecurityException e) {
+      throw new IOException(e);
+    }
+  }
+  
+}

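For context, here is a minimal sketch of how a MapReduce job would drive the input-side configurators added above. The user, password, table, instance, and iterator names are invented for illustration; only the AccumuloInputFormat method names come from this change set.

    import java.nio.charset.Charset;
    import java.util.Collections;
    
    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.iterators.user.VersioningIterator;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    
    public class InputSetupSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "input-setup-sketch");
        job.setInputFormatClass(AccumuloInputFormat.class);
        // The old monolithic setInputInfo(job, user, pass, table, auths) is now split apart.
        AccumuloInputFormat.setConnectorInfo(job, "reader", "secret".getBytes(Charset.forName("UTF-8")));
        AccumuloInputFormat.setInputTableName(job, "mytable");
        AccumuloInputFormat.setScanAuthorizations(job, new Authorizations("public"));
        AccumuloInputFormat.setZooKeeperInstance(job, "myinstance", "zkhost:2181");
        // Ranges and iterator settings are Base64-serialized into class-prefixed configuration keys.
        AccumuloInputFormat.setRanges(job, Collections.singleton(new Range("a", "m")));
        AccumuloInputFormat.addIterator(job, new IteratorSetting(50, "vers", VersioningIterator.class));
      }
    }
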
Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/OutputConfigurator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/OutputConfigurator.java?rev=1437073&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/OutputConfigurator.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/OutputConfigurator.java Tue Jan 22 18:07:17 2013
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * @since 1.5.0
+ */
+public class OutputConfigurator extends ConfiguratorBase {
+  
+  /**
+   * Configuration keys for {@link BatchWriter}.
+   * 
+   * @since 1.5.0
+   */
+  public static enum WriteOpts {
+    DEFAULT_TABLE_NAME, BATCH_WRITER_CONFIG
+  }
+  
+  /**
+   * Configuration keys for various features.
+   * 
+   * @since 1.5.0
+   */
+  public static enum Features {
+    CAN_CREATE_TABLES, SIMULATION_MODE
+  }
+  
+  /**
+   * Sets the default table name to use if one emits a null in place of a table name for a given mutation. Table names can only contain alphanumeric characters
+   * and underscores.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param tableName
+   *          the table to use when the tablename is null in the write call
+   * @since 1.5.0
+   */
+  public static void setDefaultTableName(Class<?> implementingClass, Configuration conf, String tableName) {
+    if (tableName != null)
+      conf.set(enumToConfKey(implementingClass, WriteOpts.DEFAULT_TABLE_NAME), tableName);
+  }
+  
+  /**
+   * Gets the default table name from the configuration.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return the default table name
+   * @since 1.5.0
+   * @see #setDefaultTableName(Class, Configuration, String)
+   */
+  public static String getDefaultTableName(Class<?> implementingClass, Configuration conf) {
+    return conf.get(enumToConfKey(implementingClass, WriteOpts.DEFAULT_TABLE_NAME));
+  }
+  
+  /**
+   * Sets the configuration for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig} with sensible built-in defaults is
+   * used. Setting the configuration multiple times overwrites any previous configuration.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param bwConfig
+   *          the configuration for the {@link BatchWriter}
+   * @since 1.5.0
+   */
+  public static void setBatchWriterOptions(Class<?> implementingClass, Configuration conf, BatchWriterConfig bwConfig) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    String serialized;
+    try {
+      bwConfig.write(new DataOutputStream(baos));
+      serialized = new String(baos.toByteArray(), Charset.forName("UTF-8"));
+      baos.close();
+    } catch (IOException e) {
+      throw new IllegalArgumentException("unable to serialize " + BatchWriterConfig.class.getName(), e);
+    }
+    conf.set(enumToConfKey(implementingClass, WriteOpts.BATCH_WRITER_CONFIG), serialized);
+  }
+  
+  /**
+   * Gets the {@link BatchWriterConfig} settings.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return the configuration object
+   * @since 1.5.0
+   * @see #setBatchWriterOptions(Class, Configuration, BatchWriterConfig)
+   */
+  public static BatchWriterConfig getBatchWriterOptions(Class<?> implementingClass, Configuration conf) {
+    String serialized = conf.get(enumToConfKey(implementingClass, WriteOpts.BATCH_WRITER_CONFIG));
+    BatchWriterConfig bwConfig = new BatchWriterConfig();
+    if (serialized == null || serialized.isEmpty()) {
+      return bwConfig;
+    } else {
+      try {
+        ByteArrayInputStream bais = new ByteArrayInputStream(serialized.getBytes(Charset.forName("UTF-8")));
+        bwConfig.readFields(new DataInputStream(bais));
+        bais.close();
+        return bwConfig;
+      } catch (IOException e) {
+        throw new IllegalArgumentException("unable to deserialize " + BatchWriterConfig.class.getName(), e);
+      }
+    }
+  }
+  
+  /**
+   * Sets the directive to create new tables, as necessary. Table names can only contain alphanumeric characters and underscores.
+   * 
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.5.0
+   */
+  public static void setCreateTables(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
+    conf.setBoolean(enumToConfKey(implementingClass, Features.CAN_CREATE_TABLES), enableFeature);
+  }
+  
+  /**
+   * Determines whether tables are permitted to be created as needed.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.5.0
+   * @see #setCreateTables(Class, Configuration, boolean)
+   */
+  public static Boolean canCreateTables(Class<?> implementingClass, Configuration conf) {
+    return conf.getBoolean(enumToConfKey(implementingClass, Features.CAN_CREATE_TABLES), false);
+  }
+  
+  /**
+   * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. This is useful for testing.
+   * 
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.5.0
+   */
+  public static void setSimulationMode(Class<?> implementingClass, Configuration conf, boolean enableFeature) {
+    conf.setBoolean(enumToConfKey(implementingClass, Features.SIMULATION_MODE), enableFeature);
+  }
+  
+  /**
+   * Determines whether simulation mode is enabled.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.5.0
+   * @see #setSimulationMode(Class, Configuration, boolean)
+   */
+  public static Boolean getSimulationMode(Class<?> implementingClass, Configuration conf) {
+    return conf.getBoolean(enumToConfKey(implementingClass, Features.SIMULATION_MODE), false);
+  }
+  
+}

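Similarly, a hedged sketch of the output-side counterpart. The user, instance, and table names are invented; the AccumuloOutputFormat methods mirror the OutputConfigurator calls introduced above.

    import java.nio.charset.Charset;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    
    public class OutputSetupSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "output-setup-sketch");
        job.setOutputFormatClass(AccumuloOutputFormat.class);
        // The old setOutputInfo(job, user, pass, createTables, defaultTable) becomes three calls.
        AccumuloOutputFormat.setConnectorInfo(job, "writer", "secret".getBytes(Charset.forName("UTF-8")));
        AccumuloOutputFormat.setCreateTables(job, true);
        AccumuloOutputFormat.setDefaultTableName(job, "output_table");
        AccumuloOutputFormat.setZooKeeperInstance(job, "myinstance", "zkhost:2181");
        // BatchWriter tuning now travels through the job configuration as a serialized BatchWriterConfig.
        BatchWriterConfig bwConfig = new BatchWriterConfig();
        bwConfig.setMaxMemory(50 * 1024 * 1024);
        bwConfig.setMaxLatency(2, TimeUnit.MINUTES);
        AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
      }
    }
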
Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/package-info.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/package-info.java?rev=1437073&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/package-info.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/package-info.java Tue Jan 22 18:07:17 2013
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package exists to store common helpers for configuring MapReduce jobs in a single location. It contains static configurator methods, stored in classes
+ * separate from the things they configure (typically, {@link org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat}/
+ * {@link org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat} and related classes in compatible frameworks), rather than storing them in those
+ * InputFormats/OutputFormats, so as not to clutter their API with methods that don't match the conventions for that framework. These classes may be useful to
+ * input/output plugins for other frameworks, so they can reuse the same configuration options and/or serialize them into a
+ * {@link org.apache.hadoop.conf.Configuration} instance in a standard way.
+ * 
+ * <p>
+ * It is not expected these will change much (except when new features are added), but end users should not use these classes. They should use the static
+ * configurators on the {@link org.apache.hadoop.mapreduce.InputFormat} or {@link org.apache.hadoop.mapreduce.OutputFormat} they are configuring, which in turn
+ * may use these classes to implement their own static configurators. Once again, these classes are intended for internal use, but may be useful to developers
+ * of plugins for other frameworks that read/write to Accumulo.
+ * 
+ * @since 1.5.0
+ */
+package org.apache.accumulo.core.client.mapreduce.util;
+

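As the package documentation suggests, plugins for other frameworks can call the configurators directly against a plain Configuration. A hedged sketch follows, in which the plugin class and its hooks are invented and only the InputConfigurator calls come from this commit:

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    
    import org.apache.accumulo.core.client.mapreduce.util.InputConfigurator;
    import org.apache.accumulo.core.data.Range;
    import org.apache.hadoop.conf.Configuration;
    
    // Hypothetical input plugin for a non-MapReduce framework.
    public class OtherFrameworkAccumuloInput {
      private final Configuration conf = new Configuration();
    
      public void configure(String table, Range range) {
        // Keys are prefixed with the implementing class's name, so two plugins can share one Configuration.
        InputConfigurator.setInputTableName(OtherFrameworkAccumuloInput.class, conf, table);
        InputConfigurator.setRanges(OtherFrameworkAccumuloInput.class, conf, Collections.singleton(range));
      }
    
      public List<Range> plannedRanges() throws IOException {
        return InputConfigurator.getRanges(OtherFrameworkAccumuloInput.class, conf);
      }
    }
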
Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java Tue Jan 22 18:07:17 2013
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTru
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
+import java.nio.charset.Charset;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchWriter;
@@ -135,10 +136,9 @@ public class AccumuloFileOutputFormatTes
       job.setJarByClass(this.getClass());
       
       job.setInputFormatClass(AccumuloInputFormat.class);
-      Authorizations authorizations;
-      authorizations = Constants.NO_AUTHS;
       
-      AccumuloInputFormat.setInputInfo(job, user, pass.getBytes(), table, authorizations);
+      AccumuloInputFormat.setConnectorInfo(job, user, pass.getBytes(Charset.forName("UTF-8")));
+      AccumuloInputFormat.setInputTableName(job, table);
       AccumuloInputFormat.setMockInstance(job, "testinstance");
       AccumuloFileOutputFormat.setOutputPath(job, new Path(args[3]));
       

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java Tue Jan 22 18:07:17 2013
@@ -23,9 +23,9 @@ import static org.junit.Assert.assertTru
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.List;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
@@ -64,7 +64,7 @@ public class AccumuloInputFormatTest {
     Configuration conf = job.getConfiguration();
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     is.write(new DataOutputStream(baos));
-    String iterators = conf.get("AccumuloInputFormat.iterators");
+    String iterators = conf.get("AccumuloInputFormat.ScanOpts.Iterators");
     assertEquals(new String(Base64.encodeBase64(baos.toByteArray())), iterators);
   }
   
@@ -236,7 +236,8 @@ public class AccumuloInputFormatTest {
       
       job.setInputFormatClass(AccumuloInputFormat.class);
       
-      AccumuloInputFormat.setInputInfo(job, user, pass.getBytes(), table, Constants.NO_AUTHS);
+      AccumuloInputFormat.setConnectorInfo(job, user, pass.getBytes(Charset.forName("UTF-8")));
+      AccumuloInputFormat.setInputTableName(job, table);
       AccumuloInputFormat.setMockInstance(job, "testmapinstance");
       
       job.setMapperClass(TestMapper.class);

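The renamed key asserted in the test above ("AccumuloInputFormat.iterators" becoming "AccumuloInputFormat.ScanOpts.Iterators") reflects the class-plus-enum naming scheme of enumToConfKey, which lives in ConfiguratorBase and is not part of this diff. A plausible reconstruction of that scheme, inferred from the key the test expects:

    // Hedged reconstruction; the real method is in ConfiguratorBase, which is not shown in this commit.
    public static String enumToConfKey(Class<?> implementingClass, Enum<?> e) {
      // e.g. (AccumuloInputFormat.class, ScanOpts.ITERATORS) -> "AccumuloInputFormat.ScanOpts.Iterators"
      return implementingClass.getSimpleName() + "." + e.getDeclaringClass().getSimpleName() + "."
          + org.apache.hadoop.util.StringUtils.camelize(e.name().toLowerCase());
    }
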
Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java Tue Jan 22 18:07:17 2013
@@ -23,11 +23,11 @@ import static org.junit.Assert.assertNul
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
@@ -97,7 +97,8 @@ public class AccumuloOutputFormatTest {
       
       job.setInputFormatClass(AccumuloInputFormat.class);
       
-      AccumuloInputFormat.setInputInfo(job, user, pass.getBytes(), table1, Constants.NO_AUTHS);
+      AccumuloInputFormat.setConnectorInfo(job, user, pass.getBytes(Charset.forName("UTF-8")));
+      AccumuloInputFormat.setInputTableName(job, table1);
       AccumuloInputFormat.setMockInstance(job, "testmrinstance");
       
       job.setMapperClass(TestMapper.class);
@@ -107,7 +108,9 @@ public class AccumuloOutputFormatTest {
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(Mutation.class);
       
-      AccumuloOutputFormat.setOutputInfo(job, user, pass.getBytes(), false, table2);
+      AccumuloOutputFormat.setConnectorInfo(job, user, pass.getBytes(Charset.forName("UTF-8")));
+      AccumuloOutputFormat.setCreateTables(job, false);
+      AccumuloOutputFormat.setDefaultTableName(job, table2);
       AccumuloOutputFormat.setMockInstance(job, "testmrinstance");
       
       job.setNumReduceTasks(0);

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java Tue Jan 22 18:07:17 2013
@@ -21,12 +21,12 @@ import static org.junit.Assert.assertNul
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
@@ -153,7 +153,8 @@ public class AccumuloRowInputFormatTest 
       
       job.setInputFormatClass(AccumuloRowInputFormat.class);
       
-      AccumuloRowInputFormat.setInputInfo(job, user, pass.getBytes(), table, Constants.NO_AUTHS);
+      AccumuloInputFormat.setConnectorInfo(job, user, pass.getBytes(Charset.forName("UTF-8")));
+      AccumuloInputFormat.setInputTableName(job, table);
       AccumuloRowInputFormat.setMockInstance(job, "instance1");
       
       job.setMapperClass(TestMapper.class);

Modified: accumulo/trunk/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java (original)
+++ accumulo/trunk/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java Tue Jan 22 18:07:17 2013
@@ -18,6 +18,7 @@ package org.apache.accumulo.examples.sim
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map.Entry;
@@ -197,7 +198,9 @@ public class ChunkInputFormatTest extend
       
       job.setInputFormatClass(ChunkInputFormat.class);
       
-      ChunkInputFormat.setInputInfo(job, user, pass.getBytes(), table, AUTHS);
+      ChunkInputFormat.setConnectorInfo(job, user, pass.getBytes(Charset.forName("UTF-8")));
+      ChunkInputFormat.setInputTableName(job, table);
+      ChunkInputFormat.setScanAuthorizations(job, AUTHS);
       ChunkInputFormat.setMockInstance(job, instance);
       
       @SuppressWarnings("unchecked")

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java Tue Jan 22 18:07:17 2013
@@ -145,7 +145,8 @@ public class IndexMeta extends Configure
     
     job.setOutputFormatClass(AccumuloOutputFormat.class);
     AccumuloOutputFormat.setZooKeeperInstance(job, opts.instance, opts.zookeepers);
-    AccumuloOutputFormat.setOutputInfo(job, opts.user, opts.getPassword(), false, null);
+    AccumuloOutputFormat.setConnectorInfo(job, opts.user, opts.getPassword());
+    AccumuloOutputFormat.setCreateTables(job, false);
     
     job.setMapperClass(IndexMapper.class);
     

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/multitable/CopyTool.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/multitable/CopyTool.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/multitable/CopyTool.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/multitable/CopyTool.java Tue Jan 22 18:07:17 2013
@@ -17,13 +17,14 @@
 package org.apache.accumulo.server.test.randomwalk.multitable;
 
 import java.io.IOException;
+import java.nio.charset.Charset;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -54,7 +55,9 @@ public class CopyTool extends Configured
     }
     
     job.setInputFormatClass(AccumuloInputFormat.class);
-    AccumuloInputFormat.setInputInfo(job, args[0], args[1].getBytes(), args[2], new Authorizations());
+    AccumuloInputFormat.setConnectorInfo(job, args[0], args[1].getBytes(Charset.forName("UTF-8")));
+    AccumuloInputFormat.setInputTableName(job, args[2]);
+    AccumuloInputFormat.setScanAuthorizations(job, Constants.NO_AUTHS);
     AccumuloInputFormat.setZooKeeperInstance(job, args[3], args[4]);
     
     job.setMapperClass(SeqMapClass.class);
@@ -64,7 +67,9 @@ public class CopyTool extends Configured
     job.setNumReduceTasks(0);
     
     job.setOutputFormatClass(AccumuloOutputFormat.class);
-    AccumuloOutputFormat.setOutputInfo(job, args[0], args[1].getBytes(), true, args[5]);
+    AccumuloOutputFormat.setConnectorInfo(job, args[0], args[1].getBytes(Charset.forName("UTF-8")));
+    AccumuloOutputFormat.setCreateTables(job, true);
+    AccumuloOutputFormat.setDefaultTableName(job, args[5]);
     AccumuloOutputFormat.setZooKeeperInstance(job, args[3], args[4]);
     
     job.waitForCompletion(true);

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/sequential/MapRedVerifyTool.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/sequential/MapRedVerifyTool.java?rev=1437073&r1=1437072&r2=1437073&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/sequential/MapRedVerifyTool.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/sequential/MapRedVerifyTool.java Tue Jan 22 18:07:17 2013
@@ -17,6 +17,7 @@
 package org.apache.accumulo.server.test.randomwalk.sequential;
 
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.Iterator;
 
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
@@ -24,7 +25,6 @@ import org.apache.accumulo.core.client.m
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -86,7 +86,8 @@ public class MapRedVerifyTool extends Co
     }
     
     job.setInputFormatClass(AccumuloInputFormat.class);
-    AccumuloInputFormat.setInputInfo(job, args[0], args[1].getBytes(), args[2], new Authorizations());
+    AccumuloInputFormat.setConnectorInfo(job, args[0], args[1].getBytes(Charset.forName("UTF-8")));
+    AccumuloInputFormat.setInputTableName(job, args[2]);
     AccumuloInputFormat.setZooKeeperInstance(job, args[3], args[4]);
     
     job.setMapperClass(SeqMapClass.class);
@@ -97,7 +98,9 @@ public class MapRedVerifyTool extends Co
     job.setNumReduceTasks(1);
     
     job.setOutputFormatClass(AccumuloOutputFormat.class);
-    AccumuloOutputFormat.setOutputInfo(job, args[0], args[1].getBytes(), true, args[5]);
+    AccumuloOutputFormat.setConnectorInfo(job, args[0], args[1].getBytes(Charset.forName("UTF-8")));
+    AccumuloOutputFormat.setCreateTables(job, true);
+    AccumuloOutputFormat.setDefaultTableName(job, args[5]);
     AccumuloOutputFormat.setZooKeeperInstance(job, args[3], args[4]);
     
     job.waitForCompletion(true);

