This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit db339e0bd2b519d5013eb0f85906e661a2cdafd4
Merge: c03e0fe 375cfbc
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Fri Oct 4 20:14:50 2019 -0400

    Merge branch '1.9' into 2.0

 .../mapreduce/lib/InputConfigurator.java           | 20 ++++-
 .../test/mapreduce/AccumuloInputFormatIT.java      | 86 ++++++++++++++++++++++
 2 files changed, 104 insertions(+), 2 deletions(-)
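
For readers skimming the diff below: the substantive change pulled in from 1.9 is in
InputConfigurator.validatePermissions(), which now also accepts a job when the submitting user
holds NamespacePermission.READ on the table's namespace, instead of strictly requiring
TablePermission.READ on the table itself. The following is a minimal standalone sketch of that
check, not code from the commit; the class name, method names, and parameters are illustrative
placeholders.

import java.io.IOException;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.security.NamespacePermission;
import org.apache.accumulo.core.security.TablePermission;

public class ReadPermissionCheckSketch {

  // Mirrors the new extractNamespace() helper: everything before the first '.',
  // or the default (empty) namespace when the table name is unqualified.
  static String namespaceOf(String tableName) {
    int dot = tableName.indexOf('.');
    return dot < 1 ? "" : tableName.substring(0, dot);
  }

  // A table is acceptable as MapReduce input if the principal has READ on the table
  // itself, or READ on the namespace that contains it.
  static void checkReadable(AccumuloClient client, String principal, String tableName)
      throws AccumuloException, AccumuloSecurityException, IOException {
    boolean tableRead =
        client.securityOperations().hasTablePermission(principal, tableName, TablePermission.READ);
    boolean namespaceRead = client.securityOperations().hasNamespacePermission(principal,
        namespaceOf(tableName), NamespacePermission.READ);
    if (!tableRead && !namespaceRead) {
      throw new IOException("Unable to access table");
    }
  }
}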

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/InputConfigurator.java
index 62be56f,0000000..dd9568a
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/InputConfigurator.java
@@@ -1,948 -1,0 +1,964 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.clientImpl.mapreduce.lib;
 +
 +import static com.google.common.base.Preconditions.checkArgument;
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +import static java.util.Objects.requireNonNull;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Base64;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.StringTokenizer;
 +
 +import org.apache.accumulo.core.client.AccumuloClient;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.ClientSideIteratorScanner;
 +import org.apache.accumulo.core.client.IsolatedScanner;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.RowIterator;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 +import org.apache.accumulo.core.clientImpl.ClientContext;
 +import org.apache.accumulo.core.clientImpl.Tables;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 +import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.core.security.NamespacePermission;
 +import org.apache.accumulo.core.security.TablePermission;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.TextUtil;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.io.MapWritable;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.Writable;
 +import org.apache.hadoop.util.StringUtils;
 +
 +import com.google.common.collect.Maps;
 +
 +@SuppressWarnings("deprecation")
 +public class InputConfigurator extends ConfiguratorBase {
 +
 +  /**
 +   * Configuration keys for {@link Scanner}.
 +   *
 +   * @since 1.6.0
 +   */
 +  public static enum ScanOpts {
 +    TABLE_NAME,
 +    AUTHORIZATIONS,
 +    RANGES,
 +    COLUMNS,
 +    ITERATORS,
 +    TABLE_CONFIGS,
 +    SAMPLER_CONFIG,
 +    CLASSLOADER_CONTEXT
 +  }
 +
 +  /**
 +   * Configuration keys for various features.
 +   *
 +   * @since 1.6.0
 +   */
 +  public static enum Features {
 +    AUTO_ADJUST_RANGES,
 +    SCAN_ISOLATION,
 +    USE_LOCAL_ITERATORS,
 +    SCAN_OFFLINE,
 +    BATCH_SCANNER,
 +    BATCH_SCANNER_THREADS
 +  }
 +
 +  /**
 +   * Sets the name of the context classloader to use for scans
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param context
 +   *          the name of the context classloader
 +   * @since 1.8.0
 +   */
 +  public static void setClassLoaderContext(Class<?> implementingClass, 
Configuration conf,
 +      String context) {
 +    checkArgument(context != null, "context is null");
 +    conf.set(enumToConfKey(implementingClass, ScanOpts.CLASSLOADER_CONTEXT), 
context);
 +  }
 +
 +  /**
 +   * Gets the name of the context classloader to use for scans
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return the classloader context name
 +   * @since 1.8.0
 +   */
 +  public static String getClassLoaderContext(Class<?> implementingClass, 
Configuration conf) {
 +    return conf.get(enumToConfKey(implementingClass, 
ScanOpts.CLASSLOADER_CONTEXT), null);
 +  }
 +
 +  /**
 +   * Sets the name of the input table, over which this job will scan.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param tableName
 +   *          the name of the input table to scan
 +   * @since 1.6.0
 +   */
 +  public static void setInputTableName(Class<?> implementingClass, Configuration conf,
 +      String tableName) {
 +    checkArgument(tableName != null, "tableName is null");
 +    conf.set(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME), tableName);
 +  }
 +
 +  /**
 +   * Gets the name of the input table, over which this job will scan, from the configuration.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return the table name
 +   * @since 1.6.0
 +   */
 +  public static String getInputTableName(Class<?> implementingClass, Configuration conf) {
 +    return conf.get(enumToConfKey(implementingClass, ScanOpts.TABLE_NAME));
 +  }
 +
 +  /**
 +   * Sets the {@link Authorizations} used to scan. Must be a subset of the user's authorizations.
 +   * Defaults to the empty set.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param auths
 +   *          the user's authorizations
 +   * @since 1.6.0
 +   */
 +  public static void setScanAuthorizations(Class<?> implementingClass, 
Configuration conf,
 +      Authorizations auths) {
 +    if (auths != null && !auths.isEmpty())
 +      conf.set(enumToConfKey(implementingClass, ScanOpts.AUTHORIZATIONS), 
auths.serialize());
 +  }
 +
 +  /**
 +   * Gets the authorizations to set for the scans from the configuration.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return the Accumulo scan authorizations
 +   * @since 1.6.0
 +   * @see #setScanAuthorizations(Class, Configuration, Authorizations)
 +   */
 +  public static Authorizations getScanAuthorizations(Class<?> 
implementingClass,
 +      Configuration conf) {
 +    String authString = conf.get(enumToConfKey(implementingClass, 
ScanOpts.AUTHORIZATIONS));
 +    return authString == null ? Authorizations.EMPTY
 +        : new Authorizations(authString.getBytes(UTF_8));
 +  }
 +
 +  /**
 +   * Sets the input ranges to scan on all input tables for this job. If not 
set, the entire table
 +   * will be scanned.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param ranges
 +   *          the ranges that will be mapped over
 +   * @throws IllegalArgumentException
 +   *           if the ranges cannot be encoded into base 64
 +   * @since 1.6.0
 +   */
 +  public static void setRanges(Class<?> implementingClass, Configuration conf,
 +      Collection<Range> ranges) {
 +    checkArgument(ranges != null, "ranges is null");
 +
 +    ArrayList<String> rangeStrings = new ArrayList<>(ranges.size());
 +    try {
 +      for (Range r : ranges) {
 +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +        r.write(new DataOutputStream(baos));
 +        
rangeStrings.add(Base64.getEncoder().encodeToString(baos.toByteArray()));
 +      }
 +      conf.setStrings(enumToConfKey(implementingClass, ScanOpts.RANGES),
 +          rangeStrings.toArray(new String[0]));
 +    } catch (IOException ex) {
 +      throw new IllegalArgumentException("Unable to encode ranges to Base64", 
ex);
 +    }
 +  }
 +
 +  /**
 +   * Gets the ranges to scan over from a job.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return the ranges
 +   * @throws IOException
 +   *           if the ranges have been encoded improperly
 +   * @since 1.6.0
 +   * @see #setRanges(Class, Configuration, Collection)
 +   */
 +  public static List<Range> getRanges(Class<?> implementingClass, 
Configuration conf)
 +      throws IOException {
 +
 +    Collection<String> encodedRanges =
 +        conf.getStringCollection(enumToConfKey(implementingClass, 
ScanOpts.RANGES));
 +    List<Range> ranges = new ArrayList<>();
 +    for (String rangeString : encodedRanges) {
 +      ByteArrayInputStream bais = new 
ByteArrayInputStream(Base64.getDecoder().decode(rangeString));
 +      Range range = new Range();
 +      range.readFields(new DataInputStream(bais));
 +      ranges.add(range);
 +    }
 +    return ranges;
 +  }
 +
 +  /**
 +   * Gets a list of the iterator settings (for iterators to apply to a 
scanner) from this
 +   * configuration.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return a list of iterators
 +   * @since 1.6.0
 +   * @see #addIterator(Class, Configuration, IteratorSetting)
 +   */
 +  public static List<IteratorSetting> getIterators(Class<?> 
implementingClass, Configuration conf) {
 +    String iterators = conf.get(enumToConfKey(implementingClass, 
ScanOpts.ITERATORS));
 +
 +    // If no iterators are present, return an empty list
 +    if (iterators == null || iterators.isEmpty())
 +      return new ArrayList<>();
 +
 +    // Compose the set of iterators encoded in the job configuration
 +    StringTokenizer tokens = new StringTokenizer(iterators, 
StringUtils.COMMA_STR);
 +    List<IteratorSetting> list = new ArrayList<>();
 +    try {
 +      while (tokens.hasMoreTokens()) {
 +        String itstring = tokens.nextToken();
 +        ByteArrayInputStream bais = new 
ByteArrayInputStream(Base64.getDecoder().decode(itstring));
 +        list.add(new IteratorSetting(new DataInputStream(bais)));
 +        bais.close();
 +      }
 +    } catch (IOException e) {
 +      throw new IllegalArgumentException("couldn't decode iterator settings");
 +    }
 +    return list;
 +  }
 +
 +  /**
 +   * Restricts the columns that will be mapped over for the single input 
table on this job.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param columnFamilyColumnQualifierPairs
 +   *          a pair of {@link Text} objects corresponding to column family and column qualifier.
 +   *          If the column qualifier is null, the entire column family is selected. An empty set
 +   *          is the default and is equivalent to scanning all columns.
 +   * @throws IllegalArgumentException
 +   *           if the column family is null
 +   * @since 1.6.0
 +   */
 +  public static void fetchColumns(Class<?> implementingClass, Configuration 
conf,
 +      Collection<Pair<Text,Text>> columnFamilyColumnQualifierPairs) {
 +    checkArgument(columnFamilyColumnQualifierPairs != null,
 +        "columnFamilyColumnQualifierPairs is null");
 +    String[] columnStrings = 
serializeColumns(columnFamilyColumnQualifierPairs);
 +    conf.setStrings(enumToConfKey(implementingClass, ScanOpts.COLUMNS), 
columnStrings);
 +  }
 +
 +  public static String[]
 +      serializeColumns(Collection<Pair<Text,Text>> 
columnFamilyColumnQualifierPairs) {
 +    checkArgument(columnFamilyColumnQualifierPairs != null,
 +        "columnFamilyColumnQualifierPairs is null");
 +    ArrayList<String> columnStrings = new 
ArrayList<>(columnFamilyColumnQualifierPairs.size());
 +    for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
 +
 +      if (column.getFirst() == null)
 +        throw new IllegalArgumentException("Column family can not be null");
 +
 +      String col = 
Base64.getEncoder().encodeToString(TextUtil.getBytes(column.getFirst()));
 +      if (column.getSecond() != null)
 +        col += ":" + 
Base64.getEncoder().encodeToString(TextUtil.getBytes(column.getSecond()));
 +      columnStrings.add(col);
 +    }
 +
 +    return columnStrings.toArray(new String[0]);
 +  }
 +
 +  /**
 +   * Gets the columns to be mapped over from this job.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return a set of columns
 +   * @since 1.6.0
 +   * @see #fetchColumns(Class, Configuration, Collection)
 +   */
 +  public static Set<Pair<Text,Text>> getFetchedColumns(Class<?> 
implementingClass,
 +      Configuration conf) {
 +    checkArgument(conf != null, "conf is null");
 +    String confValue = conf.get(enumToConfKey(implementingClass, 
ScanOpts.COLUMNS));
 +    List<String> serialized = new ArrayList<>();
 +    if (confValue != null) {
 +      // Split and include any trailing empty strings to allow empty column 
families
 +      for (String val : confValue.split(",", -1)) {
 +        serialized.add(val);
 +      }
 +    }
 +    return deserializeFetchedColumns(serialized);
 +  }
 +
 +  public static Set<Pair<Text,Text>> 
deserializeFetchedColumns(Collection<String> serialized) {
 +    Set<Pair<Text,Text>> columns = new HashSet<>();
 +
 +    if (serialized == null) {
 +      return columns;
 +    }
 +
 +    for (String col : serialized) {
 +      int idx = col.indexOf(":");
 +      Text cf = new Text(idx < 0 ? Base64.getDecoder().decode(col)
 +          : Base64.getDecoder().decode(col.substring(0, idx)));
 +      Text cq = idx < 0 ? null : new 
Text(Base64.getDecoder().decode(col.substring(idx + 1)));
 +      columns.add(new Pair<>(cf, cq));
 +    }
 +    return columns;
 +  }
 +
 +  /**
 +   * Encode an iterator on the input for the single input table associated 
with this job.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param cfg
 +   *          the configuration of the iterator
 +   * @throws IllegalArgumentException
 +   *           if the iterator can't be serialized into the configuration
 +   * @since 1.6.0
 +   */
 +  public static void addIterator(Class<?> implementingClass, Configuration 
conf,
 +      IteratorSetting cfg) {
 +    ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +    String newIter;
 +    try {
 +      cfg.write(new DataOutputStream(baos));
 +      newIter = Base64.getEncoder().encodeToString(baos.toByteArray());
 +      baos.close();
 +    } catch (IOException e) {
 +      throw new IllegalArgumentException("unable to serialize 
IteratorSetting");
 +    }
 +
 +    String confKey = enumToConfKey(implementingClass, ScanOpts.ITERATORS);
 +    String iterators = conf.get(confKey);
 +    // No iterators specified yet, create a new string
 +    if (iterators == null || iterators.isEmpty()) {
 +      iterators = newIter;
 +    } else {
 +      // append the next iterator & reset
 +      iterators = iterators.concat(StringUtils.COMMA_STR + newIter);
 +    }
 +    // Store the iterators w/ the job
 +    conf.set(confKey, iterators);
 +  }
 +
 +  /**
 +   * Controls the automatic adjustment of ranges for this job. This feature merges overlapping
 +   * ranges, then splits them to align with tablet boundaries. Disabling this feature will cause
 +   * exactly one Map task to be created for each specified range.
 +   *
 +   * <p>
 +   * By default, this feature is <b>enabled</b>.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param enableFeature
 +   *          the feature is enabled if true, disabled otherwise
 +   * @see #setRanges(Class, Configuration, Collection)
 +   * @since 1.6.0
 +   */
 +  public static void setAutoAdjustRanges(Class<?> implementingClass, 
Configuration conf,
 +      boolean enableFeature) {
 +    conf.setBoolean(enumToConfKey(implementingClass, 
Features.AUTO_ADJUST_RANGES), enableFeature);
 +  }
 +
 +  /**
 +   * Determines whether a configuration has auto-adjust ranges enabled.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return false if the feature is disabled, true otherwise
 +   * @since 1.6.0
 +   * @see #setAutoAdjustRanges(Class, Configuration, boolean)
 +   */
 +  public static Boolean getAutoAdjustRanges(Class<?> implementingClass, 
Configuration conf) {
 +    return conf.getBoolean(enumToConfKey(implementingClass, 
Features.AUTO_ADJUST_RANGES), true);
 +  }
 +
 +  /**
 +   * Controls the use of the {@link IsolatedScanner} in this job.
 +   *
 +   * <p>
 +   * By default, this feature is <b>disabled</b>.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param enableFeature
 +   *          the feature is enabled if true, disabled otherwise
 +   * @since 1.6.0
 +   */
 +  public static void setScanIsolation(Class<?> implementingClass, 
Configuration conf,
 +      boolean enableFeature) {
 +    conf.setBoolean(enumToConfKey(implementingClass, 
Features.SCAN_ISOLATION), enableFeature);
 +  }
 +
 +  /**
 +   * Determines whether a configuration has isolation enabled.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return true if the feature is enabled, false otherwise
 +   * @since 1.6.0
 +   * @see #setScanIsolation(Class, Configuration, boolean)
 +   */
 +  public static Boolean isIsolated(Class<?> implementingClass, Configuration 
conf) {
 +    return conf.getBoolean(enumToConfKey(implementingClass, 
Features.SCAN_ISOLATION), false);
 +  }
 +
 +  /**
 +   * Controls the use of the {@link ClientSideIteratorScanner} in this job. 
Enabling this feature
 +   * will cause the iterator stack to be constructed within the Map task, 
rather than within the
 +   * Accumulo TServer. To use this feature, all classes needed for those 
iterators must be available
 +   * on the classpath for the task.
 +   *
 +   * <p>
 +   * By default, this feature is <b>disabled</b>.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param enableFeature
 +   *          the feature is enabled if true, disabled otherwise
 +   * @since 1.6.0
 +   */
 +  public static void setLocalIterators(Class<?> implementingClass, 
Configuration conf,
 +      boolean enableFeature) {
 +    conf.setBoolean(enumToConfKey(implementingClass, 
Features.USE_LOCAL_ITERATORS), enableFeature);
 +  }
 +
 +  /**
 +   * Determines whether a configuration uses local iterators.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return true if the feature is enabled, false otherwise
 +   * @since 1.6.0
 +   * @see #setLocalIterators(Class, Configuration, boolean)
 +   */
 +  public static Boolean usesLocalIterators(Class<?> implementingClass, 
Configuration conf) {
 +    return conf.getBoolean(enumToConfKey(implementingClass, 
Features.USE_LOCAL_ITERATORS), false);
 +  }
 +
 +  /**
 +   * Enable reading offline tables. By default, this feature is disabled and 
only online tables are
 +   * scanned. This will make the map reduce job directly read the table's 
files. If the table is not
 +   * offline, then the job will fail. If the table comes online during the 
map reduce job, it is
 +   * likely that the job will fail.
 +   *
 +   * <p>
 +   * To use this option, the map reduce user will need access to read the 
Accumulo directory in
 +   * HDFS.
 +   *
 +   * <p>
 +   * Reading the offline table will create the scan time iterator stack in 
the map process. So any
 +   * iterators that are configured for the table will need to be on the 
mapper's classpath.
 +   *
 +   * <p>
 +   * One way to use this feature is to clone a table, take the clone offline, and use the clone
 +   * as the input table for a map reduce job. If you plan to map reduce over the data many times,
 +   * it may be better to compact the table, clone it, take it offline, and use the clone for all
 +   * map reduce jobs. The reason to do this is that compaction will reduce each tablet in the
 +   * table to one file, and it is faster to read from one file.
 +   *
 +   * <p>
 +   * There are two possible advantages to reading a table's files directly out of HDFS. First,
 +   * you may see better read performance. Second, it will support speculative execution better.
 +   * When reading an online table, speculative execution can put more load on an already slow
 +   * tablet server.
 +   *
 +   * <p>
 +   * By default, this feature is <b>disabled</b>.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param enableFeature
 +   *          the feature is enabled if true, disabled otherwise
 +   * @since 1.6.0
 +   */
 +  public static void setOfflineTableScan(Class<?> implementingClass, 
Configuration conf,
 +      boolean enableFeature) {
 +    conf.setBoolean(enumToConfKey(implementingClass, Features.SCAN_OFFLINE), 
enableFeature);
 +  }
 +
 +  /**
 +   * Determines whether a configuration has the offline table scan feature 
enabled.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return true if the feature is enabled, false otherwise
 +   * @since 1.6.0
 +   * @see #setOfflineTableScan(Class, Configuration, boolean)
 +   */
 +  public static Boolean isOfflineScan(Class<?> implementingClass, 
Configuration conf) {
 +    return conf.getBoolean(enumToConfKey(implementingClass, 
Features.SCAN_OFFLINE), false);
 +  }
 +
 +  /**
 +   * Controls the use of the {@link BatchScanner} in this job. Using this 
feature will group ranges
 +   * by their source tablet per InputSplit and use BatchScanner to read them.
 +   *
 +   * <p>
 +   * By default, this feature is <b>disabled</b>.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param enableFeature
 +   *          the feature is enabled if true, disabled otherwise
 +   * @since 1.7.0
 +   */
 +  public static void setBatchScan(Class<?> implementingClass, Configuration 
conf,
 +      boolean enableFeature) {
 +    conf.setBoolean(enumToConfKey(implementingClass, Features.BATCH_SCANNER), 
enableFeature);
 +  }
 +
 +  /**
 +   * Determines whether a configuration has the BatchScanner feature enabled.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return true if the feature is enabled, false otherwise
 +   * @since 1.7.0
 +   * @see #setBatchScan(Class, Configuration, boolean)
 +   */
 +  public static Boolean isBatchScan(Class<?> implementingClass, Configuration 
conf) {
 +    return conf.getBoolean(enumToConfKey(implementingClass, 
Features.BATCH_SCANNER), false);
 +  }
 +
 +  /**
 +   * Sets configurations for multiple tables at a time.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param configs
 +   *          an array of InputTableConfig objects to associate with the job
 +   * @since 1.6.0
 +   */
 +  public static void setInputTableConfigs(Class<?> implementingClass, 
Configuration conf,
 +      Map<String,org.apache.accumulo.core.client.mapreduce.InputTableConfig> 
configs) {
 +    MapWritable mapWritable = new MapWritable();
 +    for (Map.Entry<String,
 +        org.apache.accumulo.core.client.mapreduce.InputTableConfig> 
tableConfig : configs
 +            .entrySet())
 +      mapWritable.put(new Text(tableConfig.getKey()), tableConfig.getValue());
 +
 +    ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +    try {
 +      mapWritable.write(new DataOutputStream(baos));
 +    } catch (IOException e) {
 +      throw new IllegalStateException("Table configuration could not be 
serialized.");
 +    }
 +
 +    String confKey = enumToConfKey(implementingClass, ScanOpts.TABLE_CONFIGS);
 +    conf.set(confKey, Base64.getEncoder().encodeToString(baos.toByteArray()));
 +  }
 +
 +  /**
 +   * Returns all InputTableConfig objects associated with this job.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @return all of the table query configs for the job
 +   * @since 1.6.0
 +   */
 +  public static 
Map<String,org.apache.accumulo.core.client.mapreduce.InputTableConfig>
 +      getInputTableConfigs(Class<?> implementingClass, Configuration conf) {
 +    Map<String,org.apache.accumulo.core.client.mapreduce.InputTableConfig> 
configs =
 +        new HashMap<>();
 +    
Map.Entry<String,org.apache.accumulo.core.client.mapreduce.InputTableConfig> 
defaultConfig =
 +        getDefaultInputTableConfig(implementingClass, conf);
 +    if (defaultConfig != null)
 +      configs.put(defaultConfig.getKey(), defaultConfig.getValue());
 +    String configString = conf.get(enumToConfKey(implementingClass, 
ScanOpts.TABLE_CONFIGS));
 +    MapWritable mapWritable = new MapWritable();
 +    if (configString != null) {
 +      try {
 +        byte[] bytes = Base64.getDecoder().decode(configString);
 +        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
 +        mapWritable.readFields(new DataInputStream(bais));
 +        bais.close();
 +      } catch (IOException e) {
 +        throw new IllegalStateException("The table query configurations could 
not be deserialized"
 +            + " from the given configuration");
 +      }
 +    }
 +    for (Map.Entry<Writable,Writable> entry : mapWritable.entrySet())
 +      configs.put(entry.getKey().toString(),
 +          (org.apache.accumulo.core.client.mapreduce.InputTableConfig) 
entry.getValue());
 +
 +    return configs;
 +  }
 +
 +  /**
 +   * Returns the InputTableConfig for the given table
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @param tableName
 +   *          the table name for which to fetch the table query config
 +   * @return the table query config for the given table name (if it exists) 
and null if it does not
 +   * @since 1.6.0
 +   */
 +  public static org.apache.accumulo.core.client.mapreduce.InputTableConfig
 +      getInputTableConfig(Class<?> implementingClass, Configuration conf, 
String tableName) {
 +    Map<String,org.apache.accumulo.core.client.mapreduce.InputTableConfig> 
queryConfigs =
 +        getInputTableConfigs(implementingClass, conf);
 +    return queryConfigs.get(tableName);
 +  }
 +
++  private static String extractNamespace(final String tableName) {
++    final int delimiterPos = tableName.indexOf('.');
++    if (delimiterPos < 1) {
++      return ""; // default namespace
++    } else {
++      return tableName.substring(0, delimiterPos);
++    }
++  }
++
 +  /**
 +   * Validates that the user has permissions on the requested tables
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop configuration object to configure
 +   * @since 1.7.0
 +   */
 +  public static void validatePermissions(Class<?> implementingClass, Configuration conf)
 +      throws IOException {
 +    Map<String,org.apache.accumulo.core.client.mapreduce.InputTableConfig> inputTableConfigs =
 +        getInputTableConfigs(implementingClass, conf);
 +    try {
 +      AccumuloClient client = client(implementingClass, conf);
 +      if (getInputTableConfigs(implementingClass, conf).size() == 0)
 +        throw new IOException("No table set.");
 +
 +      String principal = getPrincipal(implementingClass, conf);
 +      if (principal == null) {
 +        principal = client.whoami();
 +      }
 +
 +      for (Map.Entry<String,
 +          org.apache.accumulo.core.client.mapreduce.InputTableConfig> tableConfig : inputTableConfigs
 +              .entrySet()) {
-         if (!client.securityOperations().hasTablePermission(principal, tableConfig.getKey(),
-             TablePermission.READ))
++        final String tableName = tableConfig.getKey();
++        final String namespace = extractNamespace(tableName);
++        final boolean hasTableRead = client.securityOperations().hasTablePermission(principal,
++            tableName, TablePermission.READ);
++        final boolean hasNamespaceRead = client.securityOperations()
++            .hasNamespacePermission(principal, namespace, NamespacePermission.READ);
++        if (!hasTableRead && !hasNamespaceRead) {
 +          throw new IOException("Unable to access table");
++        }
 +      }
 +      for (Map.Entry<String,
 +          org.apache.accumulo.core.client.mapreduce.InputTableConfig> tableConfigEntry : inputTableConfigs
 +              .entrySet()) {
 +        org.apache.accumulo.core.client.mapreduce.InputTableConfig tableConfig =
 +            tableConfigEntry.getValue();
 +        if (!tableConfig.shouldUseLocalIterators()) {
 +          if (tableConfig.getIterators() != null) {
 +            for (IteratorSetting iter : tableConfig.getIterators()) {
 +              if (!client.tableOperations().testClassLoad(tableConfigEntry.getKey(),
 +                  iter.getIteratorClass(), SortedKeyValueIterator.class.getName()))
 +                throw new AccumuloException("Servers are unable to load " + iter.getIteratorClass()
 +                    + " as a " + SortedKeyValueIterator.class.getName());
 +            }
 +          }
 +        }
 +      }
 +    } catch (AccumuloException | TableNotFoundException | AccumuloSecurityException e) {
 +      throw new IOException(e);
 +    }
 +  }
 +
 +  /**
 +   * Returns the InputTableConfig for the configuration based on the 
properties set using the
 +   * single-table input methods.
 +   *
 +   * @param implementingClass
 +   *          the class whose name will be used as a prefix for the property 
configuration key
 +   * @param conf
 +   *          the Hadoop instance for which to retrieve the configuration
 +   * @return the config object built from the single input table properties 
set on the job
 +   * @since 1.6.0
 +   */
 +  protected static 
Map.Entry<String,org.apache.accumulo.core.client.mapreduce.InputTableConfig>
 +      getDefaultInputTableConfig(Class<?> implementingClass, Configuration 
conf) {
 +    String tableName = getInputTableName(implementingClass, conf);
 +    if (tableName != null) {
 +      org.apache.accumulo.core.client.mapreduce.InputTableConfig queryConfig =
 +          new org.apache.accumulo.core.client.mapreduce.InputTableConfig();
 +      List<IteratorSetting> itrs = getIterators(implementingClass, conf);
 +      if (itrs != null)
 +        queryConfig.setIterators(itrs);
 +      Set<Pair<Text,Text>> columns = getFetchedColumns(implementingClass, 
conf);
 +      if (columns != null)
 +        queryConfig.fetchColumns(columns);
 +      List<Range> ranges = null;
 +      try {
 +        ranges = getRanges(implementingClass, conf);
 +      } catch (IOException e) {
 +        throw new RuntimeException(e);
 +      }
 +      if (ranges != null)
 +        queryConfig.setRanges(ranges);
 +
 +      SamplerConfiguration samplerConfig = 
getSamplerConfiguration(implementingClass, conf);
 +      if (samplerConfig != null) {
 +        queryConfig.setSamplerConfiguration(samplerConfig);
 +      }
 +
 +      queryConfig.setAutoAdjustRanges(getAutoAdjustRanges(implementingClass, 
conf))
 +          .setUseIsolatedScanners(isIsolated(implementingClass, conf))
 +          .setUseLocalIterators(usesLocalIterators(implementingClass, conf))
 +          .setOfflineScan(isOfflineScan(implementingClass, conf));
 +      return Maps.immutableEntry(tableName, queryConfig);
 +    }
 +    return null;
 +  }
 +
 +  public static Map<String,Map<KeyExtent,List<Range>>> binOffline(TableId 
tableId,
 +      List<Range> ranges, ClientContext context) throws AccumuloException, 
TableNotFoundException {
 +
 +    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
 +
 +    if (Tables.getTableState(context, tableId) != TableState.OFFLINE) {
 +      Tables.clearCache(context);
 +      if (Tables.getTableState(context, tableId) != TableState.OFFLINE) {
 +        throw new AccumuloException(
 +            "Table is online tableId:" + tableId + " cannot scan table in 
offline mode ");
 +      }
 +    }
 +
 +    for (Range range : ranges) {
 +      Text startRow;
 +
 +      if (range.getStartKey() != null)
 +        startRow = range.getStartKey().getRow();
 +      else
 +        startRow = new Text();
 +
 +      Range metadataRange =
 +          new Range(new KeyExtent(tableId, startRow, 
null).getMetadataEntry(), true, null, false);
 +      Scanner scanner = context.createScanner(MetadataTable.NAME, 
Authorizations.EMPTY);
 +      
MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
 +      
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME);
 +      
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
 +      
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME);
 +      scanner.setRange(metadataRange);
 +
 +      RowIterator rowIter = new RowIterator(scanner);
 +      KeyExtent lastExtent = null;
 +      while (rowIter.hasNext()) {
 +        Iterator<Map.Entry<Key,Value>> row = rowIter.next();
 +        String last = "";
 +        KeyExtent extent = null;
 +        String location = null;
 +
 +        while (row.hasNext()) {
 +          Map.Entry<Key,Value> entry = row.next();
 +          Key key = entry.getKey();
 +
 +          if (key.getColumnFamily()
 +              
.equals(MetadataSchema.TabletsSection.LastLocationColumnFamily.NAME)) {
 +            last = entry.getValue().toString();
 +          }
 +
 +          if (key.getColumnFamily()
 +              
.equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)
 +              || key.getColumnFamily()
 +                  
.equals(MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME)) {
 +            location = entry.getValue().toString();
 +          }
 +
 +          if 
(MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key))
 {
 +            extent = new KeyExtent(key.getRow(), entry.getValue());
 +          }
 +
 +        }
 +
 +        if (location != null)
 +          return null;
 +
 +        if (!extent.getTableId().equals(tableId)) {
 +          throw new AccumuloException("Saw unexpected table Id " + tableId + 
" " + extent);
 +        }
 +
 +        if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
 +          throw new AccumuloException(" " + lastExtent + " is not previous 
extent " + extent);
 +        }
 +
 +        Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
 +        if (tabletRanges == null) {
 +          tabletRanges = new HashMap<>();
 +          binnedRanges.put(last, tabletRanges);
 +        }
 +
 +        List<Range> rangeList = tabletRanges.get(extent);
 +        if (rangeList == null) {
 +          rangeList = new ArrayList<>();
 +          tabletRanges.put(extent, rangeList);
 +        }
 +
 +        rangeList.add(range);
 +
 +        if (extent.getEndRow() == null
 +            || range.afterEndKey(new 
Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
 +          break;
 +        }
 +
 +        lastExtent = extent;
 +      }
 +
 +    }
 +    return binnedRanges;
 +  }
 +
 +  private static String toBase64(Writable writable) {
 +    ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +    DataOutputStream dos = new DataOutputStream(baos);
 +    try {
 +      writable.write(dos);
 +      dos.close();
 +    } catch (IOException e) {
 +      throw new RuntimeException(e);
 +    }
 +
 +    return Base64.getEncoder().encodeToString(baos.toByteArray());
 +  }
 +
 +  private static <T extends Writable> T fromBase64(T writable, String enc) {
 +    ByteArrayInputStream bais = new 
ByteArrayInputStream(Base64.getDecoder().decode(enc));
 +    DataInputStream dis = new DataInputStream(bais);
 +    try {
 +      writable.readFields(dis);
 +    } catch (IOException e) {
 +      throw new RuntimeException(e);
 +    }
 +    return writable;
 +  }
 +
 +  public static void setSamplerConfiguration(Class<?> implementingClass, 
Configuration conf,
 +      SamplerConfiguration samplerConfig) {
 +    requireNonNull(samplerConfig);
 +
 +    String key = enumToConfKey(implementingClass, ScanOpts.SAMPLER_CONFIG);
 +    String val = toBase64(new SamplerConfigurationImpl(samplerConfig));
 +
 +    conf.set(key, val);
 +  }
 +
 +  private static SamplerConfiguration getSamplerConfiguration(Class<?> 
implementingClass,
 +      Configuration conf) {
 +    String key = enumToConfKey(implementingClass, ScanOpts.SAMPLER_CONFIG);
 +
 +    String encodedSC = conf.get(key);
 +    if (encodedSC == null)
 +      return null;
 +
 +    return fromBase64(new SamplerConfigurationImpl(), 
encodedSC).toSamplerConfiguration();
 +  }
 +
 +}
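
As an aside on the serialization idiom used throughout this class (see setRanges/getRanges above):
each Range is round-tripped through its Hadoop Writable form and Base64 before being stored in the
Configuration. A small sketch of that round trip, using only calls that appear in the diff; the
class and helper names here are illustrative and are not part of InputConfigurator.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Base64;

import org.apache.accumulo.core.data.Range;

public class RangeEncodingSketch {

  // Serialize a Range the same way setRanges() does before storing it in the Configuration.
  static String encode(Range r) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    r.write(new DataOutputStream(baos));
    return Base64.getEncoder().encodeToString(baos.toByteArray());
  }

  // Rebuild the Range the same way getRanges() does when reading the Configuration back.
  static Range decode(String encoded) throws IOException {
    ByteArrayInputStream bais = new ByteArrayInputStream(Base64.getDecoder().decode(encoded));
    Range range = new Range();
    range.readFields(new DataInputStream(bais));
    return range;
  }

  public static void main(String[] args) throws IOException {
    Range original = new Range("a", "z");
    Range copy = decode(encode(original));
    System.out.println(original.equals(copy)); // expected: true
  }
}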
diff --cc test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java
index 58bb333,afce3ee..3cb49df
--- a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java
@@@ -45,9 -55,10 +45,10 @@@ import org.apache.accumulo.core.data.Mu
  import org.apache.accumulo.core.data.Range;
  import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.security.Authorizations;
+ import org.apache.accumulo.core.security.TablePermission;
  import org.apache.accumulo.core.util.Pair;
  import org.apache.accumulo.harness.AccumuloClusterHarness;
 -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.io.Text;
@@@ -396,56 -428,172 +397,141 @@@ public class AccumuloInputFormatIT exte
      boolean isolated = true, localIters = true;
      Level level = Level.WARN;
  
 -    Connector connector = getConnector();
 -    connector.tableOperations().create(table);
 -
 -    AccumuloInputFormat.setZooKeeperInstance(job, cluster.getClientConfig());
 -    AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), 
getAdminToken());
 +    try (AccumuloClient accumuloClient = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      accumuloClient.tableOperations().create(table);
  
 -    AccumuloInputFormat.setInputTableName(job, table);
 -    AccumuloInputFormat.setScanAuthorizations(job, auths);
 -    AccumuloInputFormat.setScanIsolation(job, isolated);
 -    AccumuloInputFormat.setLocalIterators(job, localIters);
 -    AccumuloInputFormat.fetchColumns(job, fetchColumns);
 -    AccumuloInputFormat.setLogLevel(job, level);
 +      ClientInfo ci = getClientInfo();
 +      
org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setZooKeeperInstance(job,
 +          ci.getInstanceName(), ci.getZooKeepers());
 +      
org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setConnectorInfo(job,
 +          ci.getPrincipal(), ci.getAuthenticationToken());
 +      
org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setInputTableName(job,
 table);
 +      
org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setScanAuthorizations(job,
 +          auths);
 +      
org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setScanIsolation(job,
 isolated);
 +      
org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setLocalIterators(job,
 +          localIters);
 +      
org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.fetchColumns(job, 
fetchColumns);
 +      
org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setLogLevel(job, 
level);
  
 -    AccumuloInputFormat aif = new AccumuloInputFormat();
 +      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat aif =
 +          new org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat();
  
 -    List<InputSplit> splits = aif.getSplits(job);
 +      List<InputSplit> splits = aif.getSplits(job);
  
 -    assertEquals(1, splits.size());
 +      assertEquals(1, splits.size());
  
 -    InputSplit split = splits.get(0);
 +      InputSplit split = splits.get(0);
  
 -    assertEquals(RangeInputSplit.class, split.getClass());
 +      
assertEquals(org.apache.accumulo.core.client.mapreduce.RangeInputSplit.class,
 +          split.getClass());
  
 -    RangeInputSplit risplit = (RangeInputSplit) split;
 +      org.apache.accumulo.core.client.mapreduce.RangeInputSplit risplit =
 +          (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) split;
  
 -    assertEquals(getAdminPrincipal(), risplit.getPrincipal());
 -    assertEquals(table, risplit.getTableName());
 -    assertEquals(getAdminToken(), risplit.getToken());
 -    assertEquals(auths, risplit.getAuths());
 -    assertEquals(getConnector().getInstance().getInstanceName(), 
risplit.getInstanceName());
 -    assertEquals(isolated, risplit.isIsolatedScan());
 -    assertEquals(localIters, risplit.usesLocalIterators());
 -    assertEquals(fetchColumns, risplit.getFetchedColumns());
 -    assertEquals(level, risplit.getLogLevel());
 +      assertEquals(table, risplit.getTableName());
 +      assertEquals(isolated, risplit.isIsolatedScan());
 +      assertEquals(localIters, risplit.usesLocalIterators());
 +      assertEquals(fetchColumns, risplit.getFetchedColumns());
 +      assertEquals(level, risplit.getLogLevel());
 +    }
    }
  
+   @Test(expected = IOException.class)
+   public void testGetSplitsNoReadPermission() throws Exception {
+     Job job = Job.getInstance();
+ 
+     String table = getUniqueNames(1)[0];
+     Authorizations auths = new Authorizations("foo");
+     Collection<Pair<Text,Text>> fetchColumns =
+         Collections.singleton(new Pair<>(new Text("foo"), new Text("bar")));
+     boolean isolated = true, localIters = true;
+     Level level = Level.WARN;
+ 
 -    Connector connector = getConnector();
 -    connector.tableOperations().create(table);
 -    connector.securityOperations().revokeTablePermission(connector.whoami(), table,
 -        TablePermission.READ);
 -
 -    AccumuloInputFormat.setZooKeeperInstance(job, cluster.getClientConfig());
 -    AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
 -
 -    AccumuloInputFormat.setInputTableName(job, table);
 -    AccumuloInputFormat.setScanAuthorizations(job, auths);
 -    AccumuloInputFormat.setScanIsolation(job, isolated);
 -    AccumuloInputFormat.setLocalIterators(job, localIters);
 -    AccumuloInputFormat.fetchColumns(job, fetchColumns);
 -    AccumuloInputFormat.setLogLevel(job, level);
 -
 -    AccumuloInputFormat aif = new AccumuloInputFormat();
 -
 -    aif.getSplits(job);
++    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
++      client.tableOperations().create(table);
++      client.securityOperations().revokeTablePermission(client.whoami(), table,
++          TablePermission.READ);
++
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setZooKeeperInstance(job,
++          cluster.getClientConfig());
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setConnectorInfo(job,
++          getAdminPrincipal(), getAdminToken());
++
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setInputTableName(job, table);
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setScanAuthorizations(job,
++          auths);
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setScanIsolation(job, isolated);
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setLocalIterators(job,
++          localIters);
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.fetchColumns(job, fetchColumns);
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setLogLevel(job, level);
++
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat aif =
++          new org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat();
++
++      aif.getSplits(job);
++    }
+   }
+ 
+   /*
+    * This tests the case where we do not have Table.READ permission, but we do have Namespace.READ.
+    * See issue #1370.
+    */
+   @Test
+   public void testGetSplitsWithNamespaceReadPermission() throws Exception {
+     Job job = Job.getInstance();
+ 
+     final String[] namespaceAndTable = getUniqueNames(2);
+     final String namespace = namespaceAndTable[0];
+     final String tableSimpleName = namespaceAndTable[1];
+     final String table = namespace + "." + tableSimpleName;
+     Authorizations auths = new Authorizations("foo");
+     Collection<Pair<Text,Text>> fetchColumns =
+         Collections.singleton(new Pair<>(new Text("foo"), new Text("bar")));
+     final boolean isolated = true;
+     final boolean localIters = true;
+     Level level = Level.WARN;
+ 
 -    Connector connector = getConnector();
 -    connector.namespaceOperations().create(namespace); // creating namespace implies Namespace.READ
 -    connector.tableOperations().create(table);
 -    connector.securityOperations().revokeTablePermission(connector.whoami(), table,
 -        TablePermission.READ);
 -
 -    AccumuloInputFormat.setZooKeeperInstance(job, cluster.getClientConfig());
 -    AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
++    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
++      client.namespaceOperations().create(namespace); // creating namespace implies Namespace.READ
++      client.tableOperations().create(table);
++      client.securityOperations().revokeTablePermission(client.whoami(), table,
++          TablePermission.READ);
+ 
 -    AccumuloInputFormat.setInputTableName(job, table);
 -    AccumuloInputFormat.setScanAuthorizations(job, auths);
 -    AccumuloInputFormat.setScanIsolation(job, isolated);
 -    AccumuloInputFormat.setLocalIterators(job, localIters);
 -    AccumuloInputFormat.fetchColumns(job, fetchColumns);
 -    AccumuloInputFormat.setLogLevel(job, level);
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setZooKeeperInstance(job,
++          cluster.getClientConfig());
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setConnectorInfo(job,
++          getAdminPrincipal(), getAdminToken());
+ 
 -    AccumuloInputFormat aif = new AccumuloInputFormat();
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setInputTableName(job, table);
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setScanAuthorizations(job,
++          auths);
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setScanIsolation(job, isolated);
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setLocalIterators(job,
++          localIters);
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.fetchColumns(job, fetchColumns);
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.setLogLevel(job, level);
+ 
 -    List<InputSplit> splits = aif.getSplits(job);
++      org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat aif =
++          new org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat();
+ 
 -    assertEquals(1, splits.size());
 -  }
++      List<InputSplit> splits = aif.getSplits(job);
+ 
 -  @Test
 -  public void testPartialInputSplitDelegationToConfiguration() throws Exception {
 -    String table = getUniqueNames(1)[0];
 -    Connector c = getConnector();
 -    c.tableOperations().create(table);
 -    BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
 -    for (int i = 0; i < 100; i++) {
 -      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
 -      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
 -      bw.addMutation(m);
++      assertEquals(1, splits.size());
+     }
 -    bw.close();
 -
 -    assertEquals(0,
 -        MRTester.main(new String[] {table, EmptySplitsAccumuloInputFormat.class.getName()}));
 -    assertEquals(1, assertionErrors.get(table + "_map").size());
 -    assertEquals(1, assertionErrors.get(table + "_cleanup").size());
+   }
+ 
    @Test
 -  public void testPartialFailedInputSplitDelegationToConfiguration() throws 
Exception {
 +  public void testPartialInputSplitDelegationToConfiguration() throws 
Exception {
      String table = getUniqueNames(1)[0];
 -    Connector c = getConnector();
 -    c.tableOperations().create(table);
 -    BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
 -    for (int i = 0; i < 100; i++) {
 -      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
 -      m.put(new Text(), new Text(), new Value(String.format("%09x", 
i).getBytes()));
 -      bw.addMutation(m);
 -    }
 -    bw.close();
 -
 -    assertEquals(1,
 -        MRTester.main(new String[] {table, 
BadPasswordSplitsAccumuloInputFormat.class.getName()}));
 -    assertEquals(1, assertionErrors.get(table + "_map").size());
 -    // We should fail when the RecordReader fails to get the next key/value 
pair, because the record
 -    // reader is set up with a clientcontext, rather than a
 -    // connector, so it doesn't do fast-fail on bad credentials
 -    assertEquals(2, assertionErrors.get(table + "_cleanup").size());
 -  }
 -
 -  /**
 -   * AccumuloInputFormat which returns an "empty" RangeInputSplit
 -   */
 -  public static class BadPasswordSplitsAccumuloInputFormat extends 
AccumuloInputFormat {
 -
 -    @Override
 -    public List<InputSplit> getSplits(JobContext context) throws IOException {
 -      List<InputSplit> splits = super.getSplits(context);
 -
 -      for (InputSplit split : splits) {
 -        org.apache.accumulo.core.client.mapreduce.RangeInputSplit rangeSplit =
 -            (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) split;
 -        rangeSplit.setToken(new PasswordToken("anythingelse"));
 -      }
 -
 -      return splits;
 +    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
 +      c.tableOperations().create(table);
 +      AccumuloOutputFormatIT.insertData(c, table);
 +      assertEquals(0,
 +          MRTester.main(new String[] {table, EmptySplitsAccumuloInputFormat.class.getName()}));
 +      assertEquals(1, assertionErrors.get(table + "_map").size());
 +      assertEquals(1, assertionErrors.get(table + "_cleanup").size());
      }
    }
  

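The new testGetSplitsWithNamespaceReadPermission leans on the fact that creating a namespace
implies Namespace.READ for the creator. For a real job-submitting user, the equivalent setup would
be an explicit grant; a minimal sketch follows, where the properties file path, user name, and
namespace name are placeholders, not values from this commit.

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.security.NamespacePermission;

public class GrantNamespaceReadSketch {
  public static void main(String[] args) throws Exception {
    // Connect with credentials allowed to administer permissions (placeholder properties file).
    try (AccumuloClient client =
        Accumulo.newClient().from("/path/to/accumulo-client.properties").build()) {
      // Grant namespace-level READ; with this merge, MapReduce input validation accepts it
      // even when the user has no table-level READ on tables inside "myns".
      client.securityOperations().grantNamespacePermission("mapreduce_user", "myns",
          NamespacePermission.READ);
    }
  }
}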