Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java?rev=1437726&r1=1437725&r2=1437726&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java (original) +++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java Wed Jan 23 20:51:59 2013 @@ -21,29 +21,33 @@ import java.util.Map.Entry; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.format.DefaultFormatter; +import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; /** - * This class allows MapReduce jobs to use Accumulo as the source of data. This input format provides keys and values of type Key and Value to the Map() and - * Reduce() functions. + * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat} provides keys and values of type {@link Key} and + * {@link Value} to the Map function. 
* - * The user must specify the following via static methods: + * The user must specify the following via static configurator methods: * * <ul> - * <li>AccumuloInputFormat.setInputTableInfo(job, username, password, table, auths) - * <li>AccumuloInputFormat.setZooKeeperInstance(job, instanceName, hosts) + * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, byte[])} + * <li>{@link AccumuloInputFormat#setInputTableName(Job, String)} + * <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)} + * <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloInputFormat#setMockInstance(Job, String)} * </ul> * - * Other static methods are optional + * Other static methods are optional. */ - public class AccumuloInputFormat extends InputFormatBase<Key,Value> { @Override public RecordReader<Key,Value> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - log.setLevel(getLogLevel(context.getConfiguration())); + log.setLevel(getLogLevel(context)); return new RecordReaderBase<Key,Value>() { @Override public boolean nextKeyValue() throws IOException, InterruptedException {
Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1437726&r1=1437725&r2=1437726&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java (original) +++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java Wed Jan 23 20:51:59 2013 @@ -34,16 +34,18 @@ import org.apache.accumulo.core.client.M import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mapreduce.util.OutputConfigurator; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.data.ColumnUpdate; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.security.thrift.SecurityErrorCode; -import org.apache.accumulo.core.util.ArgumentChecker; -import org.apache.commons.codec.binary.Base64; +import org.apache.accumulo.core.security.tokens.AccumuloToken; +import org.apache.accumulo.core.security.tokens.UserPassToken; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; @@ -54,182 +56,245 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; /** - * This class allows MapReduce jobs to use Accumulo as the sink of data. 
This output format accepts keys and values of type Text (for a table name) and Mutation - * from the Map() and Reduce() functions. + * This class allows MapReduce jobs to use Accumulo as the sink for data. This {@link OutputFormat} accepts keys and values of type {@link Text} (for a table + * name) and {@link Mutation} from the Map and Reduce functions. * - * The user must specify the following via static methods: + * The user must specify the following via static configurator methods: * * <ul> - * <li>AccumuloOutputFormat.setOutputInfo(job, username, password, createTables, defaultTableName) - * <li>AccumuloOutputFormat.setZooKeeperInstance(job, instanceName, hosts) + * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, AccumuloToken)} + * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloOutputFormat#setMockInstance(Job, String)} * </ul> * - * Other static methods are optional + * Other static methods are optional. */ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { - private static final Logger log = Logger.getLogger(AccumuloOutputFormat.class); - private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName(); - private static final String OUTPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured"; - private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured"; - private static final String USERNAME = PREFIX + ".username"; - private static final String PASSWORD = PREFIX + ".password"; - private static final String DEFAULT_TABLE_NAME = PREFIX + ".defaulttable"; - - private static final String INSTANCE_NAME = PREFIX + ".instanceName"; - private static final String ZOOKEEPERS = PREFIX + ".zooKeepers"; - private static final String MOCK = ".useMockInstance"; - - private static final String CREATETABLES = PREFIX + ".createtables"; - private static final String LOGLEVEL = PREFIX + ".loglevel"; - private static final String SIMULATE = PREFIX + ".simulate"; - - // 
BatchWriter options - private static final String MAX_MUTATION_BUFFER_SIZE = PREFIX + ".maxmemory"; - private static final String MAX_LATENCY = PREFIX + ".maxlatency"; - private static final String NUM_WRITE_THREADS = PREFIX + ".writethreads"; - private static final String TIMEOUT = PREFIX + ".timeout"; - - private static final long DEFAULT_MAX_MUTATION_BUFFER_SIZE = 50 * 1024 * 1024; // 50MB - private static final int DEFAULT_MAX_LATENCY = 60 * 1000; // 1 minute - private static final int DEFAULT_NUM_WRITE_THREADS = 2; - - /** - * Configure the output format. - * - * @param conf - * the Map/Reduce job object - * @param user - * the username, which must have the Table.CREATE permission to create tables - * @param passwd - * the passwd for the username - * @param createTables - * the output format will create new tables as necessary. Table names can only be alpha-numeric and underscores. - * @param defaultTable - * the table to use when the tablename is null in the write call - */ - public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) { - if (conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false)) - throw new IllegalStateException("Output info can only be set once per job"); - conf.setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true); - - ArgumentChecker.notNull(user, passwd); - conf.set(USERNAME, user); - conf.set(PASSWORD, new String(Base64.encodeBase64(passwd))); - conf.setBoolean(CREATETABLES, createTables); - if (defaultTable != null) - conf.set(DEFAULT_TABLE_NAME, defaultTable); - } - - public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) { - if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) - throw new IllegalStateException("Instance info can only be set once per job"); - conf.setBoolean(INSTANCE_HAS_BEEN_SET, true); - ArgumentChecker.notNull(instanceName, zooKeepers); - conf.set(INSTANCE_NAME, instanceName); - conf.set(ZOOKEEPERS, zooKeepers); - 
System.out.println("instance set: " + conf.get(INSTANCE_HAS_BEEN_SET)); - } - - public static void setMockInstance(Configuration conf, String instanceName) { - conf.setBoolean(INSTANCE_HAS_BEEN_SET, true); - conf.setBoolean(MOCK, true); - conf.set(INSTANCE_NAME, instanceName); - } + private static final Class<?> CLASS = AccumuloOutputFormat.class; + protected static final Logger log = Logger.getLogger(CLASS); /** - * see {@link BatchWriterConfig#setMaxMemory(long)} + * Sets the connector information needed to communicate with Accumulo in this job. + * + * @param job + * the Hadoop job instance to be configured + * @param token + * a valid AccumuloToken (principal must have Table.CREATE permission if {@link #setCreateTables(Job, boolean)} is set to true) + * @since 1.5.0 */ - - public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) { - conf.setLong(MAX_MUTATION_BUFFER_SIZE, numberOfBytes); + public static void setConnectorInfo(Job job, AccumuloToken<?,?> token) { + OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), token); } /** - * see {@link BatchWriterConfig#setMaxLatency(long, TimeUnit)} + * Determines if the connector has been configured. + * + * @param context + * the Hadoop context for the configured job + * @return true if the connector has been configured, false otherwise + * @since 1.5.0 + * @see #setConnectorInfo(Job, AccumuloToken) */ - - public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) { - conf.setInt(MAX_LATENCY, numberOfMilliseconds); + protected static Boolean isConnectorInfoSet(JobContext context) { + return OutputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration()); } /** - * see {@link BatchWriterConfig#setMaxWriteThreads(int)} + * Gets the authentication token from the configuration.
+ * + * @param context + * the Hadoop context for the configured job + * @return the AccumuloToken + * @since 1.5.0 + * @see #setConnectorInfo(Job, AccumuloToken) */ - - public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) { - conf.setInt(NUM_WRITE_THREADS, numberOfThreads); + protected static AccumuloToken<?,?> getToken(JobContext context) { + return OutputConfigurator.getToken(CLASS, context.getConfiguration()); } - + /** - * see {@link BatchWriterConfig#setTimeout(long, TimeUnit)} + * Configures a {@link ZooKeeperInstance} for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param instanceName + * the Accumulo instance name + * @param zooKeepers + * a comma-separated list of zookeeper servers + * @since 1.5.0 */ - - public static void setTimeout(Configuration conf, long time, TimeUnit timeUnit) { - conf.setLong(TIMEOUT, timeUnit.toMillis(time)); - } - - public static void setLogLevel(Configuration conf, Level level) { - ArgumentChecker.notNull(level); - conf.setInt(LOGLEVEL, level.toInt()); + public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) { + OutputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), instanceName, zooKeepers); } - public static void setSimulationMode(Configuration conf) { - conf.setBoolean(SIMULATE, true); + /** + * Configures a {@link MockInstance} for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param instanceName + * the Accumulo instance name + * @since 1.5.0 + */ + public static void setMockInstance(Job job, String instanceName) { + OutputConfigurator.setMockInstance(CLASS, job.getConfiguration(), instanceName); } - protected static String getUsername(Configuration conf) { - return conf.get(USERNAME); + /** + * Initializes an Accumulo {@link Instance} based on the configuration. 
+ * + * @param context + * the Hadoop context for the configured job + * @return an Accumulo instance + * @since 1.5.0 + * @see #setZooKeeperInstance(Job, String, String) + * @see #setMockInstance(Job, String) + */ + protected static Instance getInstance(JobContext context) { + return OutputConfigurator.getInstance(CLASS, context.getConfiguration()); } /** - * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a - * string, and is not intended to be secure. + * Sets the log level for this job. + * + * @param job + * the Hadoop job instance to be configured + * @param level + * the logging level + * @since 1.5.0 */ - protected static byte[] getPassword(Configuration conf) { - return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes()); + public static void setLogLevel(Job job, Level level) { + OutputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level); } - protected static boolean canCreateTables(Configuration conf) { - return conf.getBoolean(CREATETABLES, false); + /** + * Gets the log level from this configuration. + * + * @param context + * the Hadoop context for the configured job + * @return the log level + * @since 1.5.0 + * @see #setLogLevel(Job, Level) + */ + protected static Level getLogLevel(JobContext context) { + return OutputConfigurator.getLogLevel(CLASS, context.getConfiguration()); } - protected static String getDefaultTableName(Configuration conf) { - return conf.get(DEFAULT_TABLE_NAME); + /** + * Sets the default table name to use if one emits a null in place of a table name for a given mutation. Table names can only be alpha-numeric and + * underscores. 
+ * + * @param job + * the Hadoop job instance to be configured + * @param tableName + * the table to use when the tablename is null in the write call + * @since 1.5.0 + */ + public static void setDefaultTableName(Job job, String tableName) { + OutputConfigurator.setDefaultTableName(CLASS, job.getConfiguration(), tableName); } - protected static Instance getInstance(Configuration conf) { - if (conf.getBoolean(MOCK, false)) - return new MockInstance(conf.get(INSTANCE_NAME)); - return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)); + /** + * Gets the default table name from the configuration. + * + * @param context + * the Hadoop context for the configured job + * @return the default table name + * @since 1.5.0 + * @see #setDefaultTableName(Job, String) + */ + protected static String getDefaultTableName(JobContext context) { + return OutputConfigurator.getDefaultTableName(CLASS, context.getConfiguration()); } - protected static long getMaxMutationBufferSize(Configuration conf) { - return conf.getLong(MAX_MUTATION_BUFFER_SIZE, DEFAULT_MAX_MUTATION_BUFFER_SIZE); + /** + * Sets the configuration for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig} with sensible built-in defaults is + * used. Setting the configuration multiple times overwrites any previous configuration. + * + * @param job + * the Hadoop job instance to be configured + * @param bwConfig + * the configuration for the {@link BatchWriter} + * @since 1.5.0 + */ + public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) { + OutputConfigurator.setBatchWriterOptions(CLASS, job.getConfiguration(), bwConfig); } - protected static int getMaxLatency(Configuration conf) { - return conf.getInt(MAX_LATENCY, DEFAULT_MAX_LATENCY); + /** + * Gets the {@link BatchWriterConfig} settings.
+ * + * @param context + * the Hadoop context for the configured job + * @return the configuration object + * @since 1.5.0 + * @see #setBatchWriterOptions(Job, BatchWriterConfig) + */ + protected static BatchWriterConfig getBatchWriterOptions(JobContext context) { + return OutputConfigurator.getBatchWriterOptions(CLASS, context.getConfiguration()); } - protected static int getMaxWriteThreads(Configuration conf) { - return conf.getInt(NUM_WRITE_THREADS, DEFAULT_NUM_WRITE_THREADS); + /** + * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric and underscores. + * + * <p> + * By default, this feature is <b>disabled</b>. + * + * @param job + * the Hadoop job instance to be configured + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.5.0 + */ + public static void setCreateTables(Job job, boolean enableFeature) { + OutputConfigurator.setCreateTables(CLASS, job.getConfiguration(), enableFeature); } - protected static long getTimeout(Configuration conf) { - return conf.getLong(TIMEOUT, Long.MAX_VALUE); + /** + * Determines whether tables are permitted to be created as needed. + * + * @param context + * the Hadoop context for the configured job + * @return true if the feature is enabled, false otherwise + * @since 1.5.0 + * @see #setCreateTables(Job, boolean) + */ + protected static Boolean canCreateTables(JobContext context) { + return OutputConfigurator.canCreateTables(CLASS, context.getConfiguration()); } - protected static Level getLogLevel(Configuration conf) { - if (conf.get(LOGLEVEL) != null) - return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt())); - return null; + /** + * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. This is useful for testing. + * + * <p> + * By default, this feature is <b>disabled</b>.
+ * + * @param job + * the Hadoop job instance to be configured + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.5.0 + */ + public static void setSimulationMode(Job job, boolean enableFeature) { + OutputConfigurator.setSimulationMode(CLASS, job.getConfiguration(), enableFeature); } - protected static boolean getSimulationMode(Configuration conf) { - return conf.getBoolean(SIMULATE, false); + /** + * Determines whether this feature is enabled. + * + * @param context + * the Hadoop context for the configured job + * @return true if the feature is enabled, false otherwise + * @since 1.5.0 + * @see #setSimulationMode(Job, boolean) + */ + protected static Boolean getSimulationMode(JobContext context) { + return OutputConfigurator.getSimulationMode(CLASS, context.getConfiguration()); } + /** + * A base class to be used to create {@link RecordWriter} instances that write to Accumulo. + */ protected static class AccumuloRecordWriter extends RecordWriter<Text,Mutation> { private MultiTableBatchWriter mtbw = null; private HashMap<Text,BatchWriter> bws = null; @@ -243,26 +308,24 @@ public class AccumuloOutputFormat extend private Connector conn; - protected AccumuloRecordWriter(Configuration conf) throws AccumuloException, AccumuloSecurityException, IOException { - Level l = getLogLevel(conf); + protected AccumuloRecordWriter(TaskAttemptContext context) throws AccumuloException, AccumuloSecurityException, IOException { + Level l = getLogLevel(context); if (l != null) - log.setLevel(getLogLevel(conf)); - this.simulate = getSimulationMode(conf); - this.createTables = canCreateTables(conf); + log.setLevel(getLogLevel(context)); + this.simulate = getSimulationMode(context); + this.createTables = canCreateTables(context); if (simulate) log.info("Simulating output only. 
No writes to tables will occur"); this.bws = new HashMap<Text,BatchWriter>(); - String tname = getDefaultTableName(conf); + String tname = getDefaultTableName(context); this.defaultTableName = (tname == null) ? null : new Text(tname); if (!simulate) { - this.conn = getInstance(conf).getConnector(getUsername(conf), getPassword(conf)); - mtbw = conn.createMultiTableBatchWriter(new BatchWriterConfig().setMaxMemory(getMaxMutationBufferSize(conf)) - .setMaxLatency(getMaxLatency(conf), TimeUnit.MILLISECONDS).setMaxWriteThreads(getMaxWriteThreads(conf)) - .setTimeout(getTimeout(conf), TimeUnit.MILLISECONDS)); + this.conn = getInstance(context).getConnector(getToken(context)); + mtbw = conn.createMultiTableBatchWriter(getBatchWriterOptions(context)); } } @@ -391,17 +454,12 @@ public class AccumuloOutputFormat extend @Override public void checkOutputSpecs(JobContext job) throws IOException { - checkOutputSpecs(job.getConfiguration()); - } - - public void checkOutputSpecs(Configuration conf) throws IOException { - if (!conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false)) - throw new IOException("Output info has not been set."); - if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) - throw new IOException("Instance info has not been set."); + if (!isConnectorInfoSet(job)) + throw new IOException("Connector info has not been set."); try { - Connector c = getInstance(conf).getConnector(getUsername(conf), getPassword(conf)); - if (!c.securityOperations().authenticateUser(getUsername(conf), getPassword(conf))) + // if the instance isn't configured, it will complain here + Connector c = getInstance(job).getConnector(getToken(job)); + if (!c.securityOperations().authenticateUser(getToken(job))) throw new IOException("Unable to authenticate user"); } catch (AccumuloException e) { throw new IOException(e); @@ -418,9 +476,172 @@ public class AccumuloOutputFormat extend @Override public RecordWriter<Text,Mutation> getRecordWriter(TaskAttemptContext attempt) throws IOException { try { - 
return new AccumuloRecordWriter(attempt.getConfiguration()); + return new AccumuloRecordWriter(attempt); } catch (Exception e) { throw new IOException(e); } } + + // ---------------------------------------------------------------------------------------------------- + // Everything below this line is deprecated and should go away in future versions + // ---------------------------------------------------------------------------------------------------- + + /** + * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, AccumuloToken)}, {@link #setCreateTables(Job, boolean)}, and + * {@link #setDefaultTableName(Job, String)} instead. + */ + @Deprecated + public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) { + OutputConfigurator.setConnectorInfo(CLASS, conf, new UserPassToken(user, passwd)); + OutputConfigurator.setCreateTables(CLASS, conf, createTables); + OutputConfigurator.setDefaultTableName(CLASS, conf, defaultTable); + } + + /** + * @deprecated since 1.5.0; Use {@link #setZooKeeperInstance(Job, String, String)} instead. + */ + @Deprecated + public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) { + OutputConfigurator.setZooKeeperInstance(CLASS, conf, instanceName, zooKeepers); + } + + /** + * @deprecated since 1.5.0; Use {@link #setMockInstance(Job, String)} instead. + */ + @Deprecated + public static void setMockInstance(Configuration conf, String instanceName) { + OutputConfigurator.setMockInstance(CLASS, conf, instanceName); + } + + /** + * @deprecated since 1.5.0; Use {@link #setBatchWriterOptions(Job, BatchWriterConfig)} instead.
+ */ + @Deprecated + public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) { + BatchWriterConfig bwConfig = OutputConfigurator.getBatchWriterOptions(CLASS, conf); + bwConfig.setMaxMemory(numberOfBytes); + OutputConfigurator.setBatchWriterOptions(CLASS, conf, bwConfig); + } + + /** + * @deprecated since 1.5.0; Use {@link #setBatchWriterOptions(Job, BatchWriterConfig)} instead. + */ + @Deprecated + public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) { + BatchWriterConfig bwConfig = OutputConfigurator.getBatchWriterOptions(CLASS, conf); + bwConfig.setMaxLatency(numberOfMilliseconds, TimeUnit.MILLISECONDS); + OutputConfigurator.setBatchWriterOptions(CLASS, conf, bwConfig); + } + + /** + * @deprecated since 1.5.0; Use {@link #setBatchWriterOptions(Job, BatchWriterConfig)} instead. + */ + @Deprecated + public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) { + BatchWriterConfig bwConfig = OutputConfigurator.getBatchWriterOptions(CLASS, conf); + bwConfig.setMaxWriteThreads(numberOfThreads); + OutputConfigurator.setBatchWriterOptions(CLASS, conf, bwConfig); + } + + /** + * @deprecated since 1.5.0; Use {@link #setLogLevel(Job, Level)} instead. + */ + @Deprecated + public static void setLogLevel(Configuration conf, Level level) { + OutputConfigurator.setLogLevel(CLASS, conf, level); + } + + /** + * @deprecated since 1.5.0; Use {@link #setSimulationMode(Job, boolean)} instead. + */ + @Deprecated + public static void setSimulationMode(Configuration conf) { + OutputConfigurator.setSimulationMode(CLASS, conf, true); + } + + /** + * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead. + */ + @Deprecated + protected static String getUsername(Configuration conf) { + return OutputConfigurator.getToken(CLASS, conf).getPrincipal(); + } + + /** + * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead.
+ */ + @Deprecated + protected static byte[] getPassword(Configuration conf) { + AccumuloToken<?,?> token = OutputConfigurator.getToken(CLASS, conf); + if (token instanceof UserPassToken) { + UserPassToken upt = (UserPassToken) token; + return upt.getPassword(); + } + throw new RuntimeException("Not applicable for non-UserPassTokens"); + } + + /** + * @deprecated since 1.5.0; Use {@link #canCreateTables(JobContext)} instead. + */ + @Deprecated + protected static boolean canCreateTables(Configuration conf) { + return OutputConfigurator.canCreateTables(CLASS, conf); + } + + /** + * @deprecated since 1.5.0; Use {@link #getDefaultTableName(JobContext)} instead. + */ + @Deprecated + protected static String getDefaultTableName(Configuration conf) { + return OutputConfigurator.getDefaultTableName(CLASS, conf); + } + + /** + * @deprecated since 1.5.0; Use {@link #getInstance(JobContext)} instead. + */ + @Deprecated + protected static Instance getInstance(Configuration conf) { + return OutputConfigurator.getInstance(CLASS, conf); + } + + /** + * @deprecated since 1.5.0; Use {@link #getBatchWriterOptions(JobContext)} instead. + */ + @Deprecated + protected static long getMaxMutationBufferSize(Configuration conf) { + return OutputConfigurator.getBatchWriterOptions(CLASS, conf).getMaxMemory(); + } + + /** + * @deprecated since 1.5.0; Use {@link #getBatchWriterOptions(JobContext)} instead. + */ + @Deprecated + protected static int getMaxLatency(Configuration conf) { + return (int) OutputConfigurator.getBatchWriterOptions(CLASS, conf).getMaxLatency(TimeUnit.MILLISECONDS); + } + + /** + * @deprecated since 1.5.0; Use {@link #getBatchWriterOptions(JobContext)} instead. + */ + @Deprecated + protected static int getMaxWriteThreads(Configuration conf) { + return OutputConfigurator.getBatchWriterOptions(CLASS, conf).getMaxWriteThreads(); + } + + /** + * @deprecated since 1.5.0; Use {@link #getLogLevel(JobContext)} instead. 
+ */ + @Deprecated + protected static Level getLogLevel(Configuration conf) { + return OutputConfigurator.getLogLevel(CLASS, conf); + } + + /** + * @deprecated since 1.5.0; Use {@link #getSimulationMode(JobContext)} instead. + */ + @Deprecated + protected static boolean getSimulationMode(Configuration conf) { + return OutputConfigurator.getSimulationMode(CLASS, conf); + } + } Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java?rev=1437726&r1=1437725&r2=1437726&view=diff ============================================================================== --- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java (original) +++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java Wed Jan 23 20:51:59 2013 @@ -22,12 +22,30 @@ import java.util.Map.Entry; import org.apache.accumulo.core.client.RowIterator; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.PeekingIterator; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +/** + * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat} provides row names as {@link Text} as keys, and a + * corresponding {@link PeekingIterator} as a value, which in turn makes the {@link Key}/{@link Value} pairs for that row available to the Map function. 
+ * + * The user must specify the following via static configurator methods: + * + * <ul> + * <li>{@link AccumuloRowInputFormat#setConnectorInfo(Job, String, byte[])} + * <li>{@link AccumuloRowInputFormat#setInputTableName(Job, String)} + * <li>{@link AccumuloRowInputFormat#setScanAuthorizations(Job, Authorizations)} + * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloRowInputFormat#setMockInstance(Job, String)} + * </ul> + * + * Other static methods are optional. + */ public class AccumuloRowInputFormat extends InputFormatBase<Text,PeekingIterator<Entry<Key,Value>>> { @Override public RecordReader<Text,PeekingIterator<Entry<Key,Value>>> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,
