Author: ctubbsii Date: Tue Jan 15 23:50:42 2013 New Revision: 1433745 URL: http://svn.apache.org/viewvc?rev=1433745&view=rev Log: ACCUMULO-769 Deprecated and replaced 1.4.x MapReduce APIs, updated javadocs for all MapReduce classes, and updated referencing classes.
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/VersioningIterator.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RegexExample.java accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RowHash.java accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TableToFile.java accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java accumulo/trunk/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java 
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/multitable/CopyTool.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/sequential/MapRedVerifyTool.java Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java?rev=1433745&r1=1433744&r2=1433745&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java Tue Jan 15 23:50:42 2013 @@ -29,7 +29,7 @@ public class ClientOnDefaultTable extend this.defaultTable = table; } - @Parameter(names="--table", description="table to use") + @Parameter(names = "--table", description = "table to use") String tableName; public String getTableName() { @@ -38,10 +38,11 @@ public class ClientOnDefaultTable extend return tableName; } + @Override public void setAccumuloConfigs(Job job) { super.setAccumuloConfigs(job); - AccumuloInputFormat.setInputInfo(job.getConfiguration(), user, getPassword(), getTableName(), auths); - AccumuloOutputFormat.setOutputInfo(job.getConfiguration(), user, getPassword(), true, getTableName()); + AccumuloInputFormat.setInputInfo(job, user, getPassword(), getTableName(), auths); + AccumuloOutputFormat.setOutputInfo(job, user, getPassword(), true, getTableName()); } - + } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java?rev=1433745&r1=1433744&r2=1433745&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java Tue Jan 15 23:50:42 2013 @@ -24,13 +24,13 @@ import com.beust.jcommander.Parameter; public class ClientOnRequiredTable extends ClientOpts { - @Parameter(names={"-t", "--table"}, required=true, description="table to use") + @Parameter(names = {"-t", "--table"}, required = true, description = "table to use") public String tableName = null; - + @Override public void setAccumuloConfigs(Job job) { super.setAccumuloConfigs(job); - AccumuloInputFormat.setInputInfo(job.getConfiguration(), user, getPassword(), tableName, auths); - AccumuloOutputFormat.setOutputInfo(job.getConfiguration(), user, getPassword(), true, tableName); + AccumuloInputFormat.setInputInfo(job, user, getPassword(), tableName, auths); + AccumuloOutputFormat.setOutputInfo(job, user, getPassword(), true, tableName); } } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java?rev=1433745&r1=1433744&r2=1433745&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java Tue Jan 15 23:50:42 2013 @@ -48,7 +48,7 @@ import com.beust.jcommander.IStringConve import com.beust.jcommander.Parameter; public class ClientOpts extends Help { - + public static class TimeConverter implements IStringConverter<Long> { @Override public Long 
convert(String value) { @@ -72,7 +72,12 @@ public class ClientOpts extends Help { public static class Password { public byte[] value; - public Password(String dfault) { value = dfault.getBytes(); } + + public Password(String dfault) { + value = dfault.getBytes(); + } + + @Override public String toString() { return new String(value); } @@ -92,13 +97,13 @@ public class ClientOpts extends Help { } } - @Parameter(names={"-u", "--user"}, description = "Connection user") + @Parameter(names = {"-u", "--user"}, description = "Connection user") public String user = System.getProperty("user.name"); - @Parameter(names="-p", converter=PasswordConverter.class, description = "Connection password") + @Parameter(names = "-p", converter = PasswordConverter.class, description = "Connection password") public Password password = new Password("secret"); - @Parameter(names="--password", converter=PasswordConverter.class, description = "Enter the connection password", password=true) + @Parameter(names = "--password", converter = PasswordConverter.class, description = "Enter the connection password", password = true) public Password securePassword = null; public byte[] getPassword() { @@ -108,22 +113,22 @@ public class ClientOpts extends Help { return securePassword.value; } - @Parameter(names={"-z", "--keepers"}, description="Comma separated list of zookeeper hosts (host:port,host:port)") + @Parameter(names = {"-z", "--keepers"}, description = "Comma separated list of zookeeper hosts (host:port,host:port)") public String zookeepers = "localhost:2181"; - @Parameter(names={"-i", "--instance"}, description="The name of the accumulo instance") + @Parameter(names = {"-i", "--instance"}, description = "The name of the accumulo instance") public String instance = null; - @Parameter(names={"-auths", "--auths"}, converter=AuthConverter.class, description="the authorizations to use when reading or writing") + @Parameter(names = {"-auths", "--auths"}, converter = AuthConverter.class, description = 
"the authorizations to use when reading or writing") public Authorizations auths = Constants.NO_AUTHS; - @Parameter(names="--debug", description="turn on TRACE-level log messages") + @Parameter(names = "--debug", description = "turn on TRACE-level log messages") public boolean debug = false; - @Parameter(names={"-fake", "--mock"}, description="Use a mock Instance") - public boolean mock=false; + @Parameter(names = {"-fake", "--mock"}, description = "Use a mock Instance") + public boolean mock = false; - @Parameter(names="--site-file", description="Read the given accumulo site file to find the accumulo instance") + @Parameter(names = "--site-file", description = "Read the given accumulo site file to find the accumulo instance") public String siteFile = null; public void startDebugLogging() { @@ -131,7 +136,7 @@ public class ClientOpts extends Help { Logger.getLogger(Constants.CORE_PACKAGE_NAME).setLevel(Level.TRACE); } - @Parameter(names="--trace", description="turn on distributed tracing") + @Parameter(names = "--trace", description = "turn on distributed tracing") public boolean trace = false; public void startTracing(String applicationName) { @@ -144,7 +149,8 @@ public class ClientOpts extends Help { Trace.off(); } - public void parseArgs(String programName, String[] args, Object ... others) { + @Override + public void parseArgs(String programName, String[] args, Object... 
others) { super.parseArgs(programName, args, others); startDebugLogging(); startTracing(programName); @@ -167,10 +173,10 @@ public class ClientOpts extends Help { @Override public Iterator<Entry<String,String>> iterator() { - TreeMap<String, String> map = new TreeMap<String, String>(); - for (Entry<String, String> props : DefaultConfiguration.getInstance()) + TreeMap<String,String> map = new TreeMap<String,String>(); + for (Entry<String,String> props : DefaultConfiguration.getInstance()) map.put(props.getKey(), props.getValue()); - for (Entry<String, String> props : xml) + for (Entry<String,String> props : xml) map.put(props.getKey(), props.getValue()); return map.entrySet().iterator(); } @@ -199,8 +205,8 @@ public class ClientOpts extends Help { } public void setAccumuloConfigs(Job job) { - AccumuloInputFormat.setZooKeeperInstance(job.getConfiguration(), instance, zookeepers); - AccumuloOutputFormat.setZooKeeperInstance(job.getConfiguration(), instance, zookeepers); + AccumuloInputFormat.setZooKeeperInstance(job, instance, zookeepers); + AccumuloOutputFormat.setZooKeeperInstance(job, instance, zookeepers); } } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java?rev=1433745&r1=1433744&r2=1433745&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java Tue Jan 15 23:50:42 2013 @@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputFormat; 
import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; @@ -57,9 +58,10 @@ public class AccumuloFileOutputFormat ex /** * This helper method provides an AccumuloConfiguration object constructed from the Accumulo defaults, and overridden with Accumulo properties that have been - * stored in the Job's configuration + * stored in the Job's configuration. * * @since 1.5.0 + * @see #setAccumuloProperty(Job, Property, Object) */ protected static AccumuloConfiguration getAccumuloConfiguration(JobContext context) { ConfigurationCopy acuConf = new ConfigurationCopy(AccumuloConfiguration.getDefaultConfiguration()); @@ -105,12 +107,15 @@ public class AccumuloFileOutputFormat ex } /** + * Sets the compression type to use for data blocks. Specifying a compression may require additional libraries to be available to your Job. + * * @param compressionType - * The type of compression to use. One of "none", "gz", "lzo", or "snappy". Specifying a compression may require additional libraries to be available - * to your Job. + * one of "none", "gz", "lzo", or "snappy" * @since 1.5.0 */ public static void setCompressionType(Job job, String compressionType) { + if (compressionType == null || !Arrays.asList("none", "gz", "lzo", "snappy").contains(compressionType)) + throw new IllegalArgumentException("Compression type must be one of: none, gz, lzo, snappy"); setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSION_TYPE, compressionType); } @@ -128,7 +133,7 @@ public class AccumuloFileOutputFormat ex } /** - * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by the underlying file system + * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by the underlying file system. 
* * @since 1.5.0 */ @@ -165,7 +170,7 @@ public class AccumuloFileOutputFormat ex final Path file = this.getDefaultWorkFile(context, "." + extension); final LRUMap validVisibilities = new LRUMap(1000); - + return new RecordWriter<Key,Value>() { FileSKVWriter out = null; @@ -184,7 +189,7 @@ public class AccumuloFileOutputFormat ex new ColumnVisibility(cv); validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE); } - + if (out == null) { out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, acuConf); out.startDefaultLocalityGroup(); @@ -200,32 +205,23 @@ public class AccumuloFileOutputFormat ex /** * @deprecated since 1.5.0; - */ - @SuppressWarnings("unused") - @Deprecated - private static final String FILE_TYPE = PREFIX + "file_type"; - - /** - * @deprecated since 1.5.0; - */ - @SuppressWarnings("unused") - @Deprecated - private static final String BLOCK_SIZE = PREFIX + "block_size"; - - /** - * @deprecated since 1.5.0; + * @see #setZooKeeperInstance(Configuration, String, String) */ @Deprecated private static final String INSTANCE_HAS_BEEN_SET = PREFIX + "instanceConfigured"; /** * @deprecated since 1.5.0; + * @see #setZooKeeperInstance(Configuration, String, String) + * @see #getInstance(Configuration) */ @Deprecated private static final String INSTANCE_NAME = PREFIX + "instanceName"; /** * @deprecated since 1.5.0; + * @see #setZooKeeperInstance(Configuration, String, String) + * @see #getInstance(Configuration) */ @Deprecated private static final String ZOOKEEPERS = PREFIX + "zooKeepers"; @@ -254,7 +250,8 @@ public class AccumuloFileOutputFormat ex } /** - * @deprecated since 1.5.0; This OutputFormat does not communicate with Accumulo. If this is needed, subclasses must implement their own configuration. + * @deprecated since 1.5.0; This {@link OutputFormat} does not communicate with Accumulo. If this is needed, subclasses must implement their own + * configuration. 
*/ @Deprecated public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) { @@ -268,7 +265,8 @@ public class AccumuloFileOutputFormat ex } /** - * @deprecated since 1.5.0; This OutputFormat does not communicate with Accumulo. If this is needed, subclasses must implement their own configuration. + * @deprecated since 1.5.0; This {@link OutputFormat} does not communicate with Accumulo. If this is needed, subclasses must implement their own + * configuration. */ @Deprecated protected static Instance getInstance(Configuration conf) { Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java?rev=1433745&r1=1433744&r2=1433745&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java Tue Jan 15 23:50:42 2013 @@ -22,28 +22,28 @@ import java.util.Map.Entry; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.util.format.DefaultFormatter; +import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; /** - * This class allows MapReduce jobs to use Accumulo as the source of data. This input format provides keys and values of type Key and Value to the Map() and - * Reduce() functions. + * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat} provides keys and values of type {@link Key} and + * {@link Value} to the Map function. 
* - * The user must specify the following via static methods: + * The user must specify the following via static configurator methods: * * <ul> - * <li>AccumuloInputFormat.setInputTableInfo(job, username, password, table, auths) - * <li>AccumuloInputFormat.setZooKeeperInstance(job, instanceName, hosts) + * <li>{@link AccumuloInputFormat#setInputInfo(org.apache.hadoop.mapreduce.Job, String, byte[], String, org.apache.accumulo.core.security.Authorizations)} + * <li>{@link AccumuloInputFormat#setZooKeeperInstance(org.apache.hadoop.mapreduce.Job, String, String)} * </ul> * - * Other static methods are optional + * Other static methods are optional. */ - public class AccumuloInputFormat extends InputFormatBase<Key,Value> { @Override public RecordReader<Key,Value> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - log.setLevel(getLogLevel(context.getConfiguration())); + log.setLevel(getLogLevel(context)); return new RecordReaderBase<Key,Value>() { @Override public boolean nextKeyValue() throws IOException, InterruptedException { Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1433745&r1=1433744&r2=1433745&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java Tue Jan 15 23:50:42 2013 @@ -44,6 +44,7 @@ import org.apache.accumulo.core.util.Arg import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import 
org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; @@ -54,182 +55,396 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; /** - * This class allows MapReduce jobs to use Accumulo as the sink of data. This output format accepts keys and values of type Text (for a table name) and Mutation - * from the Map() and Reduce() functions. + * This class allows MapReduce jobs to use Accumulo as the sink for data. This {@link OutputFormat} accepts keys and values of type {@link Text} (for a table + * name) and {@link Mutation} from the Map and Reduce functions. * - * The user must specify the following via static methods: + * The user must specify the following via static configurator methods: * * <ul> - * <li>AccumuloOutputFormat.setOutputInfo(job, username, password, createTables, defaultTableName) - * <li>AccumuloOutputFormat.setZooKeeperInstance(job, instanceName, hosts) + * <li>{@link AccumuloOutputFormat#setOutputInfo(Job, String, byte[], boolean, String)} + * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, String, String)} * </ul> * - * Other static methods are optional + * Other static methods are optional. 
*/ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { private static final Logger log = Logger.getLogger(AccumuloOutputFormat.class); + /** + * Prefix shared by all job configuration property names for this class + */ private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName(); + + /** + * Used to limit the times a job can be configured with output information to 1 + * + * @see #setOutputInfo(Job, String, byte[], boolean, String) + * @see #getUsername(JobContext) + * @see #getPassword(JobContext) + * @see #canCreateTables(JobContext) + * @see #getDefaultTableName(JobContext) + */ private static final String OUTPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured"; + + /** + * Used to limit the times a job can be configured with instance information to 1 + * + * @see #setZooKeeperInstance(Job, String, String) + * @see #setMockInstance(Job, String) + * @see #getInstance(JobContext) + */ private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured"; + + /** + * Key for storing the Accumulo user's name + * + * @see #setOutputInfo(Job, String, byte[], boolean, String) + * @see #getUsername(JobContext) + */ private static final String USERNAME = PREFIX + ".username"; + + /** + * Key for storing the Accumulo user's password + * + * @see #setOutputInfo(Job, String, byte[], boolean, String) + * @see #getPassword(JobContext) + */ private static final String PASSWORD = PREFIX + ".password"; + + /** + * Key for storing the default table to use when the output key is null + * + * @see #getDefaultTableName(JobContext) + */ private static final String DEFAULT_TABLE_NAME = PREFIX + ".defaulttable"; + /** + * Key for storing the Accumulo instance name to connect to + * + * @see #setZooKeeperInstance(Job, String, String) + * @see #setMockInstance(Job, String) + * @see #getInstance(JobContext) + */ private static final String INSTANCE_NAME = PREFIX + ".instanceName"; + + /** + * Key for storing the set of Accumulo zookeeper 
servers to communicate with + * + * @see #setZooKeeperInstance(Job, String, String) + * @see #getInstance(JobContext) + */ private static final String ZOOKEEPERS = PREFIX + ".zooKeepers"; + + /** + * Key for storing the directive to use the mock instance type + * + * @see #setMockInstance(Job, String) + * @see #getInstance(JobContext) + */ private static final String MOCK = ".useMockInstance"; + /** + * Key for storing the directive to create tables that don't exist + * + * @see #setOutputInfo(Job, String, byte[], boolean, String) + * @see #canCreateTables(JobContext) + */ private static final String CREATETABLES = PREFIX + ".createtables"; - private static final String LOGLEVEL = PREFIX + ".loglevel"; - private static final String SIMULATE = PREFIX + ".simulate"; - // BatchWriter options - private static final String MAX_MUTATION_BUFFER_SIZE = PREFIX + ".maxmemory"; - private static final String MAX_LATENCY = PREFIX + ".maxlatency"; - private static final String NUM_WRITE_THREADS = PREFIX + ".writethreads"; - private static final String TIMEOUT = PREFIX + ".timeout"; + /** + * Key for storing the desired logging level + * + * @see #setLogLevel(Job, Level) + * @see #getLogLevel(JobContext) + */ + private static final String LOGLEVEL = PREFIX + ".loglevel"; - private static final long DEFAULT_MAX_MUTATION_BUFFER_SIZE = 50 * 1024 * 1024; // 50MB - private static final int DEFAULT_MAX_LATENCY = 60 * 1000; // 1 minute - private static final int DEFAULT_NUM_WRITE_THREADS = 2; + /** + * Key for storing the directive to simulate output instead of actually writing to a file + * + * @see #setSimulationMode(Job, boolean) + * @see #getSimulationMode(JobContext) + */ + private static final String SIMULATE = PREFIX + ".simulate"; /** - * Configure the output format. + * Sets the minimum information needed to write to Accumulo in this job. 
* - * @param conf - * the Map/Reduce job object + * @param job + * the Hadoop job instance to be configured * @param user - * the username, which must have the Table.CREATE permission to create tables + * a valid Accumulo user name (user must have Table.CREATE permission) * @param passwd - * the passwd for the username + * the user's password * @param createTables - * the output format will create new tables as necessary. Table names can only be alpha-numeric and underscores. + * if true, the output format will create new tables as necessary. Table names can only be alpha-numeric and underscores. * @param defaultTable * the table to use when the tablename is null in the write call + * @since 1.5.0 */ - public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) { - if (conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false)) + public static void setOutputInfo(Job job, String user, byte[] passwd, boolean createTables, String defaultTable) { + if (job.getConfiguration().getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false)) throw new IllegalStateException("Output info can only be set once per job"); - conf.setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true); + job.getConfiguration().setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true); ArgumentChecker.notNull(user, passwd); - conf.set(USERNAME, user); - conf.set(PASSWORD, new String(Base64.encodeBase64(passwd))); - conf.setBoolean(CREATETABLES, createTables); + job.getConfiguration().set(USERNAME, user); + job.getConfiguration().set(PASSWORD, new String(Base64.encodeBase64(passwd))); + job.getConfiguration().setBoolean(CREATETABLES, createTables); if (defaultTable != null) - conf.set(DEFAULT_TABLE_NAME, defaultTable); + job.getConfiguration().set(DEFAULT_TABLE_NAME, defaultTable); } - public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) { - if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) + /** + * Configures a {@link ZooKeeperInstance} for 
this job. + * + * @param job + * the Hadoop job instance to be configured + * @param instanceName + * the Accumulo instance name + * @param zooKeepers + * a comma-separated list of zookeeper servers + * @since 1.5.0 + */ + public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) { + if (job.getConfiguration().getBoolean(INSTANCE_HAS_BEEN_SET, false)) throw new IllegalStateException("Instance info can only be set once per job"); - conf.setBoolean(INSTANCE_HAS_BEEN_SET, true); + job.getConfiguration().setBoolean(INSTANCE_HAS_BEEN_SET, true); ArgumentChecker.notNull(instanceName, zooKeepers); - conf.set(INSTANCE_NAME, instanceName); - conf.set(ZOOKEEPERS, zooKeepers); - System.out.println("instance set: " + conf.get(INSTANCE_HAS_BEEN_SET)); + job.getConfiguration().set(INSTANCE_NAME, instanceName); + job.getConfiguration().set(ZOOKEEPERS, zooKeepers); + System.out.println("instance set: " + job.getConfiguration().get(INSTANCE_HAS_BEEN_SET)); } - public static void setMockInstance(Configuration conf, String instanceName) { - conf.setBoolean(INSTANCE_HAS_BEEN_SET, true); - conf.setBoolean(MOCK, true); - conf.set(INSTANCE_NAME, instanceName); + /** + * Configures a {@link MockInstance} for this job. 
+ * + * @param job + * the Hadoop job instance to be configured + * @param instanceName + * the Accumulo instance name + * @since 1.5.0 + */ + public static void setMockInstance(Job job, String instanceName) { + job.getConfiguration().setBoolean(INSTANCE_HAS_BEEN_SET, true); + job.getConfiguration().setBoolean(MOCK, true); + job.getConfiguration().set(INSTANCE_NAME, instanceName); } /** - * see {@link BatchWriterConfig#setMaxMemory(long)} + * @since 1.5.0 + * @see BatchWriterConfig#setMaxMemory(long) */ - - public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) { - conf.setLong(MAX_MUTATION_BUFFER_SIZE, numberOfBytes); + public static void setMaxMutationBufferSize(Job job, long numberOfBytes) { + job.getConfiguration().setLong(MAX_MUTATION_BUFFER_SIZE, numberOfBytes); } /** - * see {@link BatchWriterConfig#setMaxLatency(long, TimeUnit)} + * @since 1.5.0 + * @see BatchWriterConfig#setMaxLatency(long, TimeUnit) */ - - public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) { - conf.setInt(MAX_LATENCY, numberOfMilliseconds); + public static void setMaxLatency(Job job, int numberOfMilliseconds) { + job.getConfiguration().setInt(MAX_LATENCY, numberOfMilliseconds); } /** - * see {@link BatchWriterConfig#setMaxWriteThreads(int)} + * @since 1.5.0 + * @see BatchWriterConfig#setMaxWriteThreads(int) */ - - public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) { - conf.setInt(NUM_WRITE_THREADS, numberOfThreads); + public static void setMaxWriteThreads(Job job, int numberOfThreads) { + job.getConfiguration().setInt(NUM_WRITE_THREADS, numberOfThreads); } /** - * see {@link BatchWriterConfig#setTimeout(long, TimeUnit)} + * @since 1.5.0 + * @see BatchWriterConfig#setTimeout(long, TimeUnit) */ - - public static void setTimeout(Configuration conf, long time, TimeUnit timeUnit) { - conf.setLong(TIMEOUT, timeUnit.toMillis(time)); + public static void setTimeout(Job job, long time, TimeUnit timeUnit) { + 
job.getConfiguration().setLong(TIMEOUT, timeUnit.toMillis(time)); } - public static void setLogLevel(Configuration conf, Level level) { + /** + * Sets the log level for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param level + * the logging level + * @since 1.5.0 + */ + public static void setLogLevel(Job job, Level level) { ArgumentChecker.notNull(level); - conf.setInt(LOGLEVEL, level.toInt()); + job.getConfiguration().setInt(LOGLEVEL, level.toInt()); } - public static void setSimulationMode(Configuration conf) { - conf.setBoolean(SIMULATE, true); + /** + * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. This is useful for testing. + * + * <p> + * By default, this feature is <b>disabled</b>. + * + * @param job + * the Hadoop job instance to be configured + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.5.0 + */ + public static void setSimulationMode(Job job, boolean enableFeature) { + job.getConfiguration().setBoolean(SIMULATE, enableFeature); } - protected static String getUsername(Configuration conf) { - return conf.get(USERNAME); + /** + * Gets the user name from the configuration. + * + * @param context + * the Hadoop context for the configured job + * @return the user name + * @since 1.5.0 + * @see #setOutputInfo(Job, String, byte[], boolean, String) + */ + protected static String getUsername(JobContext context) { + return context.getConfiguration().get(USERNAME); } /** - * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a - * string, and is not intended to be secure. + * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to + * provide a charset safe conversion to a string, and is not intended to be secure. 
+ * + * @param context + * the Hadoop context for the configured job + * @return the decoded user password + * @since 1.5.0 + * @see #setOutputInfo(Job, String, byte[], boolean, String) */ - protected static byte[] getPassword(Configuration conf) { - return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes()); + protected static byte[] getPassword(JobContext context) { + return Base64.decodeBase64(context.getConfiguration().get(PASSWORD, "").getBytes()); } - protected static boolean canCreateTables(Configuration conf) { - return conf.getBoolean(CREATETABLES, false); + /** + * Determines whether tables are permitted to be created as needed. + * + * @param context + * the Hadoop context for the configured job + * @return true if the feature is enabled, false otherwise + * @since 1.5.0 + * @see #setOutputInfo(Job, String, byte[], boolean, String) + */ + protected static boolean canCreateTables(JobContext context) { + return context.getConfiguration().getBoolean(CREATETABLES, false); } - protected static String getDefaultTableName(Configuration conf) { - return conf.get(DEFAULT_TABLE_NAME); + /** + * Gets the default table name from the configuration. + * + * @param context + * the Hadoop context for the configured job + * @return the default table name + * @since 1.5.0 + * @see #setOutputInfo(Job, String, byte[], boolean, String) + */ + protected static String getDefaultTableName(JobContext context) { + return context.getConfiguration().get(DEFAULT_TABLE_NAME); } - protected static Instance getInstance(Configuration conf) { - if (conf.getBoolean(MOCK, false)) - return new MockInstance(conf.get(INSTANCE_NAME)); - return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)); + /** + * Initializes an Accumulo {@link Instance} based on the configuration. 
+ * + * @param context + * the Hadoop context for the configured job + * @return an Accumulo instance + * @since 1.5.0 + * @see #setZooKeeperInstance(Job, String, String) + * @see #setMockInstance(Job, String) + */ + protected static Instance getInstance(JobContext context) { + if (context.getConfiguration().getBoolean(MOCK, false)) + return new MockInstance(context.getConfiguration().get(INSTANCE_NAME)); + return new ZooKeeperInstance(context.getConfiguration().get(INSTANCE_NAME), context.getConfiguration().get(ZOOKEEPERS)); } - protected static long getMaxMutationBufferSize(Configuration conf) { - return conf.getLong(MAX_MUTATION_BUFFER_SIZE, DEFAULT_MAX_MUTATION_BUFFER_SIZE); + /** + * Gets the corresponding {@link BatchWriterConfig} setting. + * + * @param context + * the Hadoop context for the configured job + * @return the max memory to use (in bytes) + * @since 1.5.0 + * @see #setMaxMutationBufferSize(Job, long) + */ + protected static long getMaxMutationBufferSize(JobContext context) { + return context.getConfiguration().getLong(MAX_MUTATION_BUFFER_SIZE, new BatchWriterConfig().getMaxMemory()); } - protected static int getMaxLatency(Configuration conf) { - return conf.getInt(MAX_LATENCY, DEFAULT_MAX_LATENCY); + /** + * Gets the corresponding {@link BatchWriterConfig} setting. + * + * @param context + * the Hadoop context for the configured job + * @return the max latency to use (in millis) + * @since 1.5.0 + * @see #setMaxLatency(Job, int) + */ + protected static long getMaxLatency(JobContext context) { + return context.getConfiguration().getLong(MAX_LATENCY, new BatchWriterConfig().getMaxLatency(TimeUnit.MILLISECONDS)); } - protected static int getMaxWriteThreads(Configuration conf) { - return conf.getInt(NUM_WRITE_THREADS, DEFAULT_NUM_WRITE_THREADS); + /** + * Gets the corresponding {@link BatchWriterConfig} setting. 
+ * + * @param context + * the Hadoop context for the configured job + * @return the max write threads to use + * @since 1.5.0 + * @see #setMaxWriteThreads(Job, int) + */ + protected static int getMaxWriteThreads(JobContext context) { + return context.getConfiguration().getInt(NUM_WRITE_THREADS, new BatchWriterConfig().getMaxWriteThreads()); } - protected static long getTimeout(Configuration conf) { - return conf.getLong(TIMEOUT, Long.MAX_VALUE); + /** + * Gets the corresponding {@link BatchWriterConfig} setting. + * + * @param context + * the Hadoop context for the configured job + * @return the timeout for write operations + * @since 1.5.0 + * @see #setTimeout(Job, long, TimeUnit) + */ + protected static long getTimeout(JobContext context) { + return context.getConfiguration().getLong(TIMEOUT, Long.MAX_VALUE); } - protected static Level getLogLevel(Configuration conf) { - if (conf.get(LOGLEVEL) != null) - return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt())); + /** + * Gets the log level from this configuration. + * + * @param context + * the Hadoop context for the configured job + * @return the log level + * @since 1.5.0 + * @see #setLogLevel(Job, Level) + */ + protected static Level getLogLevel(JobContext context) { + if (context.getConfiguration().get(LOGLEVEL) != null) + return Level.toLevel(context.getConfiguration().getInt(LOGLEVEL, Level.INFO.toInt())); return null; } - protected static boolean getSimulationMode(Configuration conf) { - return conf.getBoolean(SIMULATE, false); + /** + * Determines whether this feature is enabled. + * + * @param context + * the Hadoop context for the configured job + * @return true if the feature is enabled, false otherwise + * @since 1.5.0 + * @see #setSimulationMode(Job, boolean) + */ + protected static boolean getSimulationMode(JobContext context) { + return context.getConfiguration().getBoolean(SIMULATE, false); } + /** + * A base class to be used to create {@link RecordWriter} instances that write to Accumulo. 
+ */ protected static class AccumuloRecordWriter extends RecordWriter<Text,Mutation> { private MultiTableBatchWriter mtbw = null; private HashMap<Text,BatchWriter> bws = null; @@ -243,26 +458,26 @@ public class AccumuloOutputFormat extend private Connector conn; - protected AccumuloRecordWriter(Configuration conf) throws AccumuloException, AccumuloSecurityException, IOException { - Level l = getLogLevel(conf); + protected AccumuloRecordWriter(TaskAttemptContext context) throws AccumuloException, AccumuloSecurityException, IOException { + Level l = getLogLevel(context); if (l != null) - log.setLevel(getLogLevel(conf)); - this.simulate = getSimulationMode(conf); - this.createTables = canCreateTables(conf); + log.setLevel(getLogLevel(context)); + this.simulate = getSimulationMode(context); + this.createTables = canCreateTables(context); if (simulate) log.info("Simulating output only. No writes to tables will occur"); this.bws = new HashMap<Text,BatchWriter>(); - String tname = getDefaultTableName(conf); + String tname = getDefaultTableName(context); this.defaultTableName = (tname == null) ? 
null : new Text(tname); if (!simulate) { - this.conn = getInstance(conf).getConnector(getUsername(conf), getPassword(conf)); - mtbw = conn.createMultiTableBatchWriter(new BatchWriterConfig().setMaxMemory(getMaxMutationBufferSize(conf)) - .setMaxLatency(getMaxLatency(conf), TimeUnit.MILLISECONDS).setMaxWriteThreads(getMaxWriteThreads(conf)) - .setTimeout(getTimeout(conf), TimeUnit.MILLISECONDS)); + this.conn = getInstance(context).getConnector(getUsername(context), getPassword(context)); + mtbw = conn.createMultiTableBatchWriter(new BatchWriterConfig().setMaxMemory(getMaxMutationBufferSize(context)) + .setMaxLatency(getMaxLatency(context), TimeUnit.MILLISECONDS).setMaxWriteThreads(getMaxWriteThreads(context)) + .setTimeout(getTimeout(context), TimeUnit.MILLISECONDS)); } } @@ -391,17 +606,13 @@ public class AccumuloOutputFormat extend @Override public void checkOutputSpecs(JobContext job) throws IOException { - checkOutputSpecs(job.getConfiguration()); - } - - public void checkOutputSpecs(Configuration conf) throws IOException { - if (!conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false)) + if (!job.getConfiguration().getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false)) throw new IOException("Output info has not been set."); - if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) + if (!job.getConfiguration().getBoolean(INSTANCE_HAS_BEEN_SET, false)) throw new IOException("Instance info has not been set."); try { - Connector c = getInstance(conf).getConnector(getUsername(conf), getPassword(conf)); - if (!c.securityOperations().authenticateUser(getUsername(conf), getPassword(conf))) + Connector c = getInstance(job).getConnector(getUsername(job), getPassword(job)); + if (!c.securityOperations().authenticateUser(getUsername(job), getPassword(job))) throw new IOException("Unable to authenticate user"); } catch (AccumuloException e) { throw new IOException(e); @@ -418,9 +629,206 @@ public class AccumuloOutputFormat extend @Override public RecordWriter<Text,Mutation> 
getRecordWriter(TaskAttemptContext attempt) throws IOException { try { - return new AccumuloRecordWriter(attempt.getConfiguration()); + return new AccumuloRecordWriter(attempt); } catch (Exception e) { throw new IOException(e); } } + + // ---------------------------------------------------------------------------------------------------- + // Everything below this line is deprecated and should go away in future versions + // ---------------------------------------------------------------------------------------------------- + + /** + * @deprecated since 1.5.0; + */ + @Deprecated + private static final String MAX_MUTATION_BUFFER_SIZE = PREFIX + ".maxmemory"; + + /** + * @deprecated since 1.5.0; + */ + @Deprecated + private static final String MAX_LATENCY = PREFIX + ".maxlatency"; + + /** + * @deprecated since 1.5.0; + */ + @Deprecated + private static final String NUM_WRITE_THREADS = PREFIX + ".writethreads"; + + /** + * @deprecated since 1.5.0; + */ + @Deprecated + private static final String TIMEOUT = PREFIX + ".timeout"; + + /** + * @deprecated since 1.5.0; Use {@link #setOutputInfo(Job, String, byte[], boolean, String)} instead. + */ + @Deprecated + public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) { + if (conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false)) + throw new IllegalStateException("Output info can only be set once per job"); + conf.setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true); + + ArgumentChecker.notNull(user, passwd); + conf.set(USERNAME, user); + conf.set(PASSWORD, new String(Base64.encodeBase64(passwd))); + conf.setBoolean(CREATETABLES, createTables); + if (defaultTable != null) + conf.set(DEFAULT_TABLE_NAME, defaultTable); + } + + /** + * @deprecated since 1.5.0; Use {@link #setZooKeeperInstance(Job, String, String)} instead. 
+ */ + @Deprecated + public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) { + if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) + throw new IllegalStateException("Instance info can only be set once per job"); + conf.setBoolean(INSTANCE_HAS_BEEN_SET, true); + + ArgumentChecker.notNull(instanceName, zooKeepers); + conf.set(INSTANCE_NAME, instanceName); + conf.set(ZOOKEEPERS, zooKeepers); + } + + /** + * @deprecated since 1.5.0; Use {@link #setMockInstance(Job, String)} instead. + */ + @Deprecated + public static void setMockInstance(Configuration conf, String instanceName) { + conf.setBoolean(INSTANCE_HAS_BEEN_SET, true); + conf.setBoolean(MOCK, true); + conf.set(INSTANCE_NAME, instanceName); + } + + /** + * @deprecated since 1.5.0; Use {@link #setMaxMutationBufferSize(Job, long)} instead. + */ + @Deprecated + public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) { + conf.setLong(MAX_MUTATION_BUFFER_SIZE, numberOfBytes); + } + + /** + * @deprecated since 1.5.0; Use {@link #setMaxLatency(Job, int)} instead. + */ + @Deprecated + public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) { + conf.setInt(MAX_LATENCY, numberOfMilliseconds); + } + + /** + * @deprecated since 1.5.0; Use {@link #setMaxWriteThreads(Job, int)} instead. + */ + @Deprecated + public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) { + conf.setInt(NUM_WRITE_THREADS, numberOfThreads); + } + + /** + * @deprecated since 1.5.0; Use {@link #setLogLevel(Job, Level)} instead. + */ + @Deprecated + public static void setLogLevel(Configuration conf, Level level) { + ArgumentChecker.notNull(level); + conf.setInt(LOGLEVEL, level.toInt()); + } + + /** + * @deprecated since 1.5.0; Use {@link #setSimulationMode(Job, boolean)} instead. 
+ */ + @Deprecated + public static void setSimulationMode(Configuration conf) { + conf.setBoolean(SIMULATE, true); + } + + /** + * @deprecated since 1.5.0; Use {@link #getUsername(JobContext)} instead. + */ + @Deprecated + protected static String getUsername(Configuration conf) { + return conf.get(USERNAME); + } + + /** + * @deprecated since 1.5.0; Use {@link #getPassword(JobContext)} instead. + */ + @Deprecated + protected static byte[] getPassword(Configuration conf) { + return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes()); + } + + /** + * @deprecated since 1.5.0; Use {@link #canCreateTables(JobContext)} instead. + */ + @Deprecated + protected static boolean canCreateTables(Configuration conf) { + return conf.getBoolean(CREATETABLES, false); + } + + /** + * @deprecated since 1.5.0; Use {@link #getDefaultTableName(JobContext)} instead. + */ + @Deprecated + protected static String getDefaultTableName(Configuration conf) { + return conf.get(DEFAULT_TABLE_NAME); + } + + /** + * @deprecated since 1.5.0; Use {@link #getInstance(JobContext)} instead. + */ + @Deprecated + protected static Instance getInstance(Configuration conf) { + if (conf.getBoolean(MOCK, false)) + return new MockInstance(conf.get(INSTANCE_NAME)); + return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)); + } + + /** + * @deprecated since 1.5.0; Use {@link #getMaxMutationBufferSize(JobContext)} instead. + */ + @Deprecated + protected static long getMaxMutationBufferSize(Configuration conf) { + return conf.getLong(MAX_MUTATION_BUFFER_SIZE, new BatchWriterConfig().getMaxMemory()); + } + + /** + * @deprecated since 1.5.0; Use {@link #getMaxLatency(JobContext)} instead. + */ + @Deprecated + protected static int getMaxLatency(Configuration conf) { + Long maxLatency = new BatchWriterConfig().getMaxLatency(TimeUnit.MILLISECONDS); + Integer max = maxLatency >= Integer.MAX_VALUE ? 
Integer.MAX_VALUE : Integer.parseInt(Long.toString(maxLatency)); + return conf.getInt(MAX_LATENCY, max); + } + + /** + * @deprecated since 1.5.0; Use {@link #getMaxWriteThreads(JobContext)} instead. + */ + @Deprecated + protected static int getMaxWriteThreads(Configuration conf) { + return conf.getInt(NUM_WRITE_THREADS, new BatchWriterConfig().getMaxWriteThreads()); + } + + /** + * @deprecated since 1.5.0; Use {@link #getLogLevel(JobContext)} instead. + */ + @Deprecated + protected static Level getLogLevel(Configuration conf) { + if (conf.get(LOGLEVEL) != null) + return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt())); + return null; + } + + /** + * @deprecated since 1.5.0; Use {@link #getSimulationMode(JobContext)} instead. + */ + @Deprecated + protected static boolean getSimulationMode(Configuration conf) { + return conf.getBoolean(SIMULATE, false); + } + } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java?rev=1433745&r1=1433744&r2=1433745&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java Tue Jan 15 23:50:42 2013 @@ -24,10 +24,24 @@ import org.apache.accumulo.core.data.Key import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.util.PeekingIterator; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +/** + * This class allows MapReduce jobs to use Accumulo as the source of data. 
This {@link InputFormat} provides row names as {@link Text} as keys, and a + * corresponding {@link PeekingIterator} as a value, which in turn makes the {@link Key}/{@link Value} pairs for that row available to the Map function. + * + * The user must specify the following via static configurator methods: + * + * <ul> + * <li>{@link AccumuloRowInputFormat#setInputInfo(org.apache.hadoop.mapreduce.Job, String, byte[], String, org.apache.accumulo.core.security.Authorizations)} + * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(org.apache.hadoop.mapreduce.Job, String, String)} + * </ul> + * + * Other static methods are optional. + */ public class AccumuloRowInputFormat extends InputFormatBase<Text,PeekingIterator<Entry<Key,Value>>> { @Override public RecordReader<Text,PeekingIterator<Entry<Key,Value>>> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,