Author: ctubbsii Date: Tue Jan 22 18:07:17 2013 New Revision: 1437073 URL: http://svn.apache.org/viewvc?rev=1437073&view=rev Log: ACCUMULO-769 Modify mapreduce API to use the Hadoop static configurator conventions, but done in a way that allows us to standardize and reuse configurator code to support multiple frameworks.
Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/ConfiguratorBase.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/FileOutputConfigurator.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/InputConfigurator.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/OutputConfigurator.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/util/package-info.java Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java accumulo/trunk/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java 
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/multitable/CopyTool.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/sequential/MapRedVerifyTool.java Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java?rev=1437073&r1=1437072&r2=1437073&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java Tue Jan 22 18:07:17 2013 @@ -41,8 +41,12 @@ public class ClientOnDefaultTable extend @Override public void setAccumuloConfigs(Job job) { super.setAccumuloConfigs(job); - AccumuloInputFormat.setInputInfo(job, user, getPassword(), getTableName(), auths); - AccumuloOutputFormat.setOutputInfo(job, user, getPassword(), true, getTableName()); + AccumuloInputFormat.setConnectorInfo(job, user, getPassword()); + AccumuloInputFormat.setInputTableName(job, getTableName()); + AccumuloInputFormat.setScanAuthorizations(job, auths); + AccumuloOutputFormat.setConnectorInfo(job, user, getPassword()); + AccumuloOutputFormat.setCreateTables(job, true); + AccumuloOutputFormat.setDefaultTableName(job, getTableName()); } } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java?rev=1437073&r1=1437072&r2=1437073&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java (original) +++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java Tue Jan 22 18:07:17 2013 @@ -30,7 +30,11 @@ public class ClientOnRequiredTable exten @Override public void setAccumuloConfigs(Job job) { super.setAccumuloConfigs(job); - AccumuloInputFormat.setInputInfo(job, user, getPassword(), tableName, auths); - AccumuloOutputFormat.setOutputInfo(job, user, getPassword(), true, tableName); + AccumuloInputFormat.setConnectorInfo(job, user, getPassword()); + AccumuloInputFormat.setInputTableName(job, tableName); + AccumuloInputFormat.setScanAuthorizations(job, auths); + AccumuloOutputFormat.setConnectorInfo(job, user, getPassword()); + AccumuloOutputFormat.setCreateTables(job, true); + AccumuloOutputFormat.setDefaultTableName(job, tableName); } } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java?rev=1437073&r1=1437072&r2=1437073&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java Tue Jan 22 18:07:17 2013 @@ -18,20 +18,18 @@ package org.apache.accumulo.core.client. 
import java.io.IOException; import java.util.Arrays; -import java.util.Map.Entry; import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mapreduce.util.FileOutputConfigurator; import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.core.util.ArgumentChecker; import org.apache.commons.collections.map.LRUMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -41,6 +39,7 @@ import org.apache.hadoop.mapreduce.Outpu import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.log4j.Logger; /** * This class allows MapReduce jobs to write output in the Accumulo data file format.<br /> @@ -53,70 +52,33 @@ import org.apache.hadoop.mapreduce.lib.o * supported at this time. 
*/ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { - private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName() + "."; - private static final String ACCUMULO_PROPERTY_PREFIX = PREFIX + "accumuloProperties."; + + private static final Class<?> CLASS = AccumuloFileOutputFormat.class; + protected static final Logger log = Logger.getLogger(CLASS); /** * This helper method provides an AccumuloConfiguration object constructed from the Accumulo defaults, and overridden with Accumulo properties that have been * stored in the Job's configuration. * + * @param context + * the Hadoop context for the configured job * @since 1.5.0 - * @see #setAccumuloProperty(Job, Property, Object) */ protected static AccumuloConfiguration getAccumuloConfiguration(JobContext context) { - ConfigurationCopy acuConf = new ConfigurationCopy(AccumuloConfiguration.getDefaultConfiguration()); - for (Entry<String,String> entry : context.getConfiguration()) - if (entry.getKey().startsWith(ACCUMULO_PROPERTY_PREFIX)) - acuConf.set(Property.getPropertyByKey(entry.getKey().substring(ACCUMULO_PROPERTY_PREFIX.length())), entry.getValue()); - return acuConf; - } - - /** - * The supported Accumulo properties we set in this OutputFormat, that change the behavior of the RecordWriter.<br /> - * These properties correspond to the supported public static setter methods available to this class. - * - * @since 1.5.0 - */ - protected static boolean isSupportedAccumuloProperty(Property property) { - switch (property) { - case TABLE_FILE_COMPRESSION_TYPE: - case TABLE_FILE_COMPRESSED_BLOCK_SIZE: - case TABLE_FILE_BLOCK_SIZE: - case TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX: - case TABLE_FILE_REPLICATION: - return true; - default: - return false; - } - } - - /** - * Helper for transforming Accumulo configuration properties into something that can be stored safely inside the Hadoop Job configuration. 
- * - * @since 1.5.0 - */ - protected static <T> void setAccumuloProperty(Job job, Property property, T value) { - if (isSupportedAccumuloProperty(property)) { - String val = String.valueOf(value); - if (property.getType().isValidFormat(val)) - job.getConfiguration().set(ACCUMULO_PROPERTY_PREFIX + property.getKey(), val); - else - throw new IllegalArgumentException("Value is not appropriate for property type '" + property.getType() + "'"); - } else - throw new IllegalArgumentException("Unsupported configuration property " + property.getKey()); + return FileOutputConfigurator.getAccumuloConfiguration(CLASS, context.getConfiguration()); } /** * Sets the compression type to use for data blocks. Specifying a compression may require additional libraries to be available to your Job. * + * @param job + * the Hadoop job instance to be configured * @param compressionType * one of "none", "gz", "lzo", or "snappy" * @since 1.5.0 */ public static void setCompressionType(Job job, String compressionType) { - if (compressionType == null || !Arrays.asList("none", "gz", "lzo", "snappy").contains(compressionType)) - throw new IllegalArgumentException("Compression type must be one of: none, gz, lzo, snappy"); - setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSION_TYPE, compressionType); + FileOutputConfigurator.setCompressionType(CLASS, job.getConfiguration(), compressionType); } /** @@ -126,38 +88,54 @@ public class AccumuloFileOutputFormat ex * <p> * Making this value smaller may increase seek performance, but at the cost of increasing the size of the indexes (which can also affect seek performance). 
* + * @param job + * the Hadoop job instance to be configured + * @param dataBlockSize + * the block size, in bytes * @since 1.5.0 */ public static void setDataBlockSize(Job job, long dataBlockSize) { - setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE, dataBlockSize); + FileOutputConfigurator.setDataBlockSize(CLASS, job.getConfiguration(), dataBlockSize); } /** * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by the underlying file system. * + * @param job + * the Hadoop job instance to be configured + * @param fileBlockSize + * the block size, in bytes * @since 1.5.0 */ public static void setFileBlockSize(Job job, long fileBlockSize) { - setAccumuloProperty(job, Property.TABLE_FILE_BLOCK_SIZE, fileBlockSize); + FileOutputConfigurator.setFileBlockSize(CLASS, job.getConfiguration(), fileBlockSize); } /** * Sets the size for index blocks within each file; smaller blocks means a deeper index hierarchy within the file, while larger blocks mean a more shallow * index hierarchy within the file. This can affect the performance of queries. * + * @param job + * the Hadoop job instance to be configured + * @param indexBlockSize + * the block size, in bytes * @since 1.5.0 */ public static void setIndexBlockSize(Job job, long indexBlockSize) { - setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX, indexBlockSize); + FileOutputConfigurator.setIndexBlockSize(CLASS, job.getConfiguration(), indexBlockSize); } /** * Sets the file system replication factor for the resulting file, overriding the file system default. 
* + * @param job + * the Hadoop job instance to be configured + * @param replication + * the number of replicas for produced files * @since 1.5.0 */ public static void setReplication(Job job, int replication) { - setAccumuloProperty(job, Property.TABLE_FILE_REPLICATION, replication); + FileOutputConfigurator.setReplication(CLASS, job.getConfiguration(), replication); } @Override @@ -204,35 +182,13 @@ public class AccumuloFileOutputFormat ex // ---------------------------------------------------------------------------------------------------- /** - * @deprecated since 1.5.0; - * @see #setZooKeeperInstance(Configuration, String, String) - */ - @Deprecated - private static final String INSTANCE_HAS_BEEN_SET = PREFIX + "instanceConfigured"; - - /** - * @deprecated since 1.5.0; - * @see #setZooKeeperInstance(Configuration, String, String) - * @see #getInstance(Configuration) - */ - @Deprecated - private static final String INSTANCE_NAME = PREFIX + "instanceName"; - - /** - * @deprecated since 1.5.0; - * @see #setZooKeeperInstance(Configuration, String, String) - * @see #getInstance(Configuration) - */ - @Deprecated - private static final String ZOOKEEPERS = PREFIX + "zooKeepers"; - - /** - * @deprecated since 1.5.0; Retrieve the relevant block size from {@link #getAccumuloConfiguration(JobContext)} + * @deprecated since 1.5.0; Retrieve the relevant block size from {@link #getAccumuloConfiguration(JobContext)} and configure hadoop's + * io.seqfile.compress.blocksize with the same value. No longer needed, as {@link RFile} does not use this field. 
*/ @Deprecated protected static void handleBlockSize(Configuration conf) { conf.setInt("io.seqfile.compress.blocksize", - (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE)); + (int) FileOutputConfigurator.getAccumuloConfiguration(CLASS, conf).getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE)); } /** @@ -246,7 +202,7 @@ public class AccumuloFileOutputFormat ex */ @Deprecated public static void setBlockSize(Configuration conf, int blockSize) { - conf.set(ACCUMULO_PROPERTY_PREFIX + Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), String.valueOf(blockSize)); + FileOutputConfigurator.setDataBlockSize(CLASS, conf, blockSize); } /** @@ -255,13 +211,7 @@ public class AccumuloFileOutputFormat ex */ @Deprecated public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) { - if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) - throw new IllegalStateException("Instance info can only be set once per job"); - conf.setBoolean(INSTANCE_HAS_BEEN_SET, true); - - ArgumentChecker.notNull(instanceName, zooKeepers); - conf.set(INSTANCE_NAME, instanceName); - conf.set(ZOOKEEPERS, zooKeepers); + FileOutputConfigurator.setZooKeeperInstance(CLASS, conf, instanceName, zooKeepers); } /** @@ -270,7 +220,7 @@ public class AccumuloFileOutputFormat ex */ @Deprecated protected static Instance getInstance(Configuration conf) { - return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)); + return FileOutputConfigurator.getInstance(CLASS, conf); } } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java?rev=1437073&r1=1437072&r2=1437073&view=diff ============================================================================== --- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java Tue Jan 22 18:07:17 2013 @@ -21,9 +21,11 @@ import java.util.Map.Entry; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.format.DefaultFormatter; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -34,8 +36,10 @@ import org.apache.hadoop.mapreduce.TaskA * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloInputFormat#setInputInfo(org.apache.hadoop.mapreduce.Job, String, byte[], String, org.apache.accumulo.core.security.Authorizations)} - * <li>{@link AccumuloInputFormat#setZooKeeperInstance(org.apache.hadoop.mapreduce.Job, String, String)} + * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, byte[])} + * <li>{@link AccumuloInputFormat#setInputTableName(Job, String)} + * <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)} + * <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloInputFormat#setMockInstance(Job, String)} * </ul> * * Other static methods are optional. 
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1437073&r1=1437072&r2=1437073&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java Tue Jan 22 18:07:17 2013 @@ -16,12 +16,7 @@ */ package org.apache.accumulo.core.client.mapreduce; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; -import java.nio.charset.Charset; import java.util.HashMap; import java.util.HashSet; import java.util.Map.Entry; @@ -39,14 +34,13 @@ import org.apache.accumulo.core.client.M import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mapreduce.util.OutputConfigurator; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.data.ColumnUpdate; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.security.thrift.SecurityErrorCode; -import org.apache.accumulo.core.util.ArgumentChecker; -import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; @@ -66,146 +60,70 @@ import org.apache.log4j.Logger; * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link 
AccumuloOutputFormat#setOutputInfo(Job, String, byte[], boolean, String)} - * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, String, String)} + * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, String, byte[])} + * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloOutputFormat#setMockInstance(Job, String)} * </ul> * * Other static methods are optional. */ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { - private static final Logger log = Logger.getLogger(AccumuloOutputFormat.class); - /** - * Prefix shared by all job configuration property names for this class - */ - private static final String PREFIX = AccumuloOutputFormat.class.getSimpleName(); + private static final Class<?> CLASS = AccumuloOutputFormat.class; + protected static final Logger log = Logger.getLogger(CLASS); /** - * Used to limit the times a job can be configured with output information to 1 + * Sets the connector information needed to communicate with Accumulo in this job. 
* - * @see #setOutputInfo(Job, String, byte[], boolean, String) - * @see #getUsername(JobContext) - * @see #getPassword(JobContext) - * @see #canCreateTables(JobContext) - * @see #getDefaultTableName(JobContext) - */ - private static final String OUTPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured"; - - /** - * Used to limit the times a job can be configured with instance information to 1 - * - * @see #setZooKeeperInstance(Job, String, String) - * @see #setMockInstance(Job, String) - * @see #getInstance(JobContext) - */ - private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured"; - - /** - * Key for storing the Accumulo user's name - * - * @see #setOutputInfo(Job, String, byte[], boolean, String) - * @see #getUsername(JobContext) - */ - private static final String USERNAME = PREFIX + ".username"; - - /** - * Key for storing the Accumulo user's password - * - * @see #setOutputInfo(Job, String, byte[], boolean, String) - * @see #getPassword(JobContext) - */ - private static final String PASSWORD = PREFIX + ".password"; - - /** - * Key for storing the default table to use when the output key is null - * - * @see #getDefaultTableName(JobContext) - */ - private static final String DEFAULT_TABLE_NAME = PREFIX + ".defaulttable"; - - /** - * Key for storing the Accumulo instance name to connect to - * - * @see #setZooKeeperInstance(Job, String, String) - * @see #setMockInstance(Job, String) - * @see #getInstance(JobContext) - */ - private static final String INSTANCE_NAME = PREFIX + ".instanceName"; - - /** - * Key for storing the set of Accumulo zookeeper servers to communicate with - * - * @see #setZooKeeperInstance(Job, String, String) - * @see #getInstance(JobContext) - */ - private static final String ZOOKEEPERS = PREFIX + ".zooKeepers"; - - /** - * Key for storing the directive to use the mock instance type - * - * @see #setMockInstance(Job, String) - * @see #getInstance(JobContext) - */ - private static final String MOCK = PREFIX + 
".useMockInstance"; - - /** - * Key for storing the {@link BatchWriterConfig}. - * - * @see #setBatchWriterOptions(Job, BatchWriterConfig) - * @see #getBatchWriterOptions(JobContext) - */ - private static final String BATCH_WRITER_CONFIG = PREFIX + ".bwConfig"; - - /** - * Key for storing the directive to create tables that don't exist - * - * @see #setOutputInfo(Job, String, byte[], boolean, String) - * @see #canCreateTables(JobContext) + * @param job + * the Hadoop job instance to be configured + * @param user + * a valid Accumulo user name (user must have Table.CREATE permission if {@link #setCreateTables(Job, boolean)} is set to true) + * @param passwd + * the user's password + * @since 1.5.0 */ - private static final String CREATETABLES = PREFIX + ".createtables"; + public static void setConnectorInfo(Job job, String user, byte[] passwd) { + OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), user, passwd); + } /** - * Key for storing the desired logging level + * Determines if the connector has been configured. * - * @see #setLogLevel(Job, Level) - * @see #getLogLevel(JobContext) + * @param context + * the Hadoop context for the configured job + * @return true if the connector has been configured, false otherwise + * @since 1.5.0 + * @see #setConnectorInfo(Job, String, byte[]) */ - private static final String LOGLEVEL = PREFIX + ".loglevel"; + protected static Boolean isConnectorInfoSet(JobContext context) { + return OutputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration()); + } /** - * Key for storing the directive to simulate output instead of actually writing to a file + * Gets the user name from the configuration. 
* - * @see #setSimulationMode(Job, boolean) - * @see #getSimulationMode(JobContext) + * @param context + * the Hadoop context for the configured job + * @return the user name + * @since 1.5.0 + * @see #setConnectorInfo(Job, String, byte[]) */ - private static final String SIMULATE = PREFIX + ".simulate"; + protected static String getUsername(JobContext context) { + return OutputConfigurator.getUsername(CLASS, context.getConfiguration()); + } /** - * Sets the minimum information needed to write to Accumulo in this job. + * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to + * provide a charset safe conversion to a string, and is not intended to be secure. * - * @param job - * the Hadoop job instance to be configured - * @param user - * a valid Accumulo user name (user must have Table.CREATE permission) - * @param passwd - * the user's password - * @param createTables - * if true, the output format will create new tables as necessary. Table names can only be alpha-numeric and underscores. 
- * @param defaultTable - * the table to use when the tablename is null in the write call + * @param context + * the Hadoop context for the configured job + * @return the decoded user password * @since 1.5.0 + * @see #setConnectorInfo(Job, String, byte[]) */ - public static void setOutputInfo(Job job, String user, byte[] passwd, boolean createTables, String defaultTable) { - if (job.getConfiguration().getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false)) - throw new IllegalStateException("Output info can only be set once per job"); - job.getConfiguration().setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true); - - ArgumentChecker.notNull(user, passwd); - job.getConfiguration().set(USERNAME, user); - job.getConfiguration().set(PASSWORD, new String(Base64.encodeBase64(passwd))); - job.getConfiguration().setBoolean(CREATETABLES, createTables); - if (defaultTable != null) - job.getConfiguration().set(DEFAULT_TABLE_NAME, defaultTable); + protected static byte[] getPassword(JobContext context) { + return OutputConfigurator.getPassword(CLASS, context.getConfiguration()); } /** @@ -220,13 +138,7 @@ public class AccumuloOutputFormat extend * @since 1.5.0 */ public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) { - if (job.getConfiguration().getBoolean(INSTANCE_HAS_BEEN_SET, false)) - throw new IllegalStateException("Instance info can only be set once per job"); - job.getConfiguration().setBoolean(INSTANCE_HAS_BEEN_SET, true); - ArgumentChecker.notNull(instanceName, zooKeepers); - job.getConfiguration().set(INSTANCE_NAME, instanceName); - job.getConfiguration().set(ZOOKEEPERS, zooKeepers); - System.out.println("instance set: " + job.getConfiguration().get(INSTANCE_HAS_BEEN_SET)); + OutputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), instanceName, zooKeepers); } /** @@ -239,32 +151,21 @@ public class AccumuloOutputFormat extend * @since 1.5.0 */ public static void setMockInstance(Job job, String instanceName) { - 
job.getConfiguration().setBoolean(INSTANCE_HAS_BEEN_SET, true); - job.getConfiguration().setBoolean(MOCK, true); - job.getConfiguration().set(INSTANCE_NAME, instanceName); + OutputConfigurator.setMockInstance(CLASS, job.getConfiguration(), instanceName); } /** - * Sets the configuration for for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig}, with sensible built-in defaults is - * used. Setting the configuration multiple times overwrites any previous configuration. + * Initializes an Accumulo {@link Instance} based on the configuration. * - * @param job - * the Hadoop job instance to be configured - * @param bwConfig - * the configuration for the {@link BatchWriter} + * @param context + * the Hadoop context for the configured job + * @return an Accumulo instance * @since 1.5.0 + * @see #setZooKeeperInstance(Job, String, String) + * @see #setMockInstance(Job, String) */ - public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - String serialized; - try { - bwConfig.write(new DataOutputStream(baos)); - serialized = new String(baos.toByteArray(), Charset.forName("UTF-8")); - baos.close(); - } catch (IOException e) { - throw new IllegalArgumentException("unable to serialize " + BatchWriterConfig.class.getName()); - } - job.getConfiguration().set(BATCH_WRITER_CONFIG, serialized); + protected static Instance getInstance(JobContext context) { + return OutputConfigurator.getInstance(CLASS, context.getConfiguration()); } /** @@ -277,134 +178,119 @@ public class AccumuloOutputFormat extend * @since 1.5.0 */ public static void setLogLevel(Job job, Level level) { - ArgumentChecker.notNull(level); - job.getConfiguration().setInt(LOGLEVEL, level.toInt()); + OutputConfigurator.setLogLevel(CLASS, job.getConfiguration(), level); } /** - * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. 
This is useful for testing. - * - * <p> - * By default, this feature is <b>disabled</b>. + * Gets the log level from this configuration. * - * @param job - * the Hadoop job instance to be configured - * @param enableFeature - * the feature is enabled if true, disabled otherwise + * @param context + * the Hadoop context for the configured job + * @return the log level * @since 1.5.0 + * @see #setLogLevel(Job, Level) */ - public static void setSimulationMode(Job job, boolean enableFeature) { - job.getConfiguration().setBoolean(SIMULATE, enableFeature); + protected static Level getLogLevel(JobContext context) { + return OutputConfigurator.getLogLevel(CLASS, context.getConfiguration()); } /** - * Gets the user name from the configuration. + * Sets the default table name to use if one emits a null in place of a table name for a given mutation. Table names can only be alpha-numeric and + * underscores. * - * @param context - * the Hadoop context for the configured job - * @return the user name + * @param job + * the Hadoop job instance to be configured + * @param tableName + * the table to use when the tablename is null in the write call * @since 1.5.0 - * @see #setOutputInfo(Job, String, byte[], boolean, String) */ - protected static String getUsername(JobContext context) { - return context.getConfiguration().get(USERNAME); + public static void setDefaultTableName(Job job, String tableName) { + OutputConfigurator.setDefaultTableName(CLASS, job.getConfiguration(), tableName); } /** - * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to - * provide a charset safe conversion to a string, and is not intended to be secure. + * Gets the default table name from the configuration. 
 * * @param context * the Hadoop context for the configured job - * @return the decoded user password + * @return the default table name * @since 1.5.0 - * @see #setOutputInfo(Job, String, byte[], boolean, String) + * @see #setDefaultTableName(Job, String) */ - protected static byte[] getPassword(JobContext context) { - return Base64.decodeBase64(context.getConfiguration().get(PASSWORD, "").getBytes()); + protected static String getDefaultTableName(JobContext context) { + return OutputConfigurator.getDefaultTableName(CLASS, context.getConfiguration()); } /** - * Determines whether tables are permitted to be created as needed. + * Sets the configuration for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig}, with sensible built-in defaults is + * used. Setting the configuration multiple times overwrites any previous configuration. * - * @param context - * the Hadoop context for the configured job - * @return true if the feature is disabled, false otherwise + * @param job + * the Hadoop job instance to be configured + * @param bwConfig + * the configuration for the {@link BatchWriter} * @since 1.5.0 - * @see #setOutputInfo(Job, String, byte[], boolean, String) */ - protected static boolean canCreateTables(JobContext context) { - return context.getConfiguration().getBoolean(CREATETABLES, false) + public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) { + OutputConfigurator.setBatchWriterOptions(CLASS, job.getConfiguration(), bwConfig); } /** - * Gets the default table name from the configuration. + * Gets the {@link BatchWriterConfig} settings.
* * @param context * the Hadoop context for the configured job - * @return the default table name + * @return the configuration object * @since 1.5.0 - * @see #setOutputInfo(Job, String, byte[], boolean, String) + * @see #setBatchWriterOptions(Job, BatchWriterConfig) */ - protected static String getDefaultTableName(JobContext context) { - return context.getConfiguration().get(DEFAULT_TABLE_NAME); + protected static BatchWriterConfig getBatchWriterOptions(JobContext context) { + return OutputConfigurator.getBatchWriterOptions(CLASS, context.getConfiguration()); } /** - * Initializes an Accumulo {@link Instance} based on the configuration. + * Sets the directive to create new tables, as necessary. Table names can only be alpha-numeric and underscores. * - * @param context - * the Hadoop context for the configured job - * @return an Accumulo instance + * <p> + * By default, this feature is <b>disabled</b>. + * + * @param job + * the Hadoop job instance to be configured + * @param enableFeature + * the feature is enabled if true, disabled otherwise * @since 1.5.0 - * @see #setZooKeeperInstance(Job, String, String) - * @see #setMockInstance(Job, String) */ - protected static Instance getInstance(JobContext context) { - if (context.getConfiguration().getBoolean(MOCK, false)) - return new MockInstance(context.getConfiguration().get(INSTANCE_NAME)); - return new ZooKeeperInstance(context.getConfiguration().get(INSTANCE_NAME), context.getConfiguration().get(ZOOKEEPERS)); + public static void setCreateTables(Job job, boolean enableFeature) { + OutputConfigurator.setCreateTables(CLASS, job.getConfiguration(), enableFeature); } /** - * Gets the {@link BatchWriterConfig} settings. + * Determines whether tables are permitted to be created as needed. 
* * @param context * the Hadoop context for the configured job - * @return the configuration object + * @return true if the feature is disabled, false otherwise * @since 1.5.0 - * @see #setBatchWriterOptions(Job, BatchWriterConfig) + * @see #setCreateTables(Job, boolean) */ - protected static BatchWriterConfig getBatchWriterOptions(JobContext context) { - String serialized = context.getConfiguration().get(BATCH_WRITER_CONFIG); - BatchWriterConfig bwConfig = new BatchWriterConfig(); - if (serialized == null || serialized.isEmpty()) { - return bwConfig; - } else { - try { - ByteArrayInputStream bais = new ByteArrayInputStream(serialized.getBytes(Charset.forName("UTF-8"))); - bwConfig.readFields(new DataInputStream(bais)); - bais.close(); - return bwConfig; - } catch (IOException e) { - throw new IllegalArgumentException("unable to serialize " + BatchWriterConfig.class.getName()); - } - } + protected static Boolean canCreateTables(JobContext context) { + return OutputConfigurator.canCreateTables(CLASS, context.getConfiguration()); } /** - * Gets the log level from this configuration. + * Sets the directive to use simulation mode for this job. In simulation mode, no output is produced. This is useful for testing. * - * @param context - * the Hadoop context for the configured job - * @return the log level + * <p> + * By default, this feature is <b>disabled</b>. 
+ * + * @param job + * the Hadoop job instance to be configured + * @param enableFeature + * the feature is enabled if true, disabled otherwise * @since 1.5.0 - * @see #setLogLevel(Job, Level) */ - protected static Level getLogLevel(JobContext context) { - if (context.getConfiguration().get(LOGLEVEL) != null) - return Level.toLevel(context.getConfiguration().getInt(LOGLEVEL, Level.INFO.toInt())); - return null; + public static void setSimulationMode(Job job, boolean enableFeature) { + OutputConfigurator.setSimulationMode(CLASS, job.getConfiguration(), enableFeature); } /** @@ -416,8 +302,8 @@ public class AccumuloOutputFormat extend * @since 1.5.0 * @see #setSimulationMode(Job, boolean) */ - protected static boolean getSimulationMode(JobContext context) { - return context.getConfiguration().getBoolean(SIMULATE, false); + protected static Boolean getSimulationMode(JobContext context) { + return OutputConfigurator.getSimulationMode(CLASS, context.getConfiguration()); } /** @@ -582,11 +468,10 @@ public class AccumuloOutputFormat extend @Override public void checkOutputSpecs(JobContext job) throws IOException { - if (!job.getConfiguration().getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false)) - throw new IOException("Output info has not been set."); - if (!job.getConfiguration().getBoolean(INSTANCE_HAS_BEEN_SET, false)) - throw new IOException("Instance info has not been set."); + if (!isConnectorInfoSet(job)) + throw new IOException("Connector info has not been set."); try { + // if the instance isn't configured, it will complain here Connector c = getInstance(job).getConnector(getUsername(job), getPassword(job)); if (!c.securityOperations().authenticateUser(getUsername(job), getPassword(job))) throw new IOException("Unable to authenticate user"); @@ -616,38 +501,14 @@ public class AccumuloOutputFormat extend // ---------------------------------------------------------------------------------------------------- /** - * @deprecated since 1.5.0; - */ - @Deprecated - private 
static final String MAX_MUTATION_BUFFER_SIZE = PREFIX + ".maxmemory"; - - /** - * @deprecated since 1.5.0; - */ - @Deprecated - private static final String MAX_LATENCY = PREFIX + ".maxlatency"; - - /** - * @deprecated since 1.5.0; - */ - @Deprecated - private static final String NUM_WRITE_THREADS = PREFIX + ".writethreads"; - - /** - * @deprecated since 1.5.0; Use {@link #setOutputInfo(Job, String, byte[], boolean, String)} instead. + * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, String, byte[])}, {@link #setCreateTables(Job, boolean)}, and + * {@link #setDefaultTableName(Job, String)} instead. */ @Deprecated public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) { - if (conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false)) - throw new IllegalStateException("Output info can only be set once per job"); - conf.setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true); - - ArgumentChecker.notNull(user, passwd); - conf.set(USERNAME, user); - conf.set(PASSWORD, new String(Base64.encodeBase64(passwd))); - conf.setBoolean(CREATETABLES, createTables); - if (defaultTable != null) - conf.set(DEFAULT_TABLE_NAME, defaultTable); + OutputConfigurator.setConnectorInfo(CLASS, conf, user, passwd); + OutputConfigurator.setCreateTables(CLASS, conf, createTables); + OutputConfigurator.setDefaultTableName(CLASS, conf, defaultTable); } /** @@ -655,13 +516,7 @@ public class AccumuloOutputFormat extend */ @Deprecated public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) { - if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) - throw new IllegalStateException("Instance info can only be set once per job"); - conf.setBoolean(INSTANCE_HAS_BEEN_SET, true); - - ArgumentChecker.notNull(instanceName, zooKeepers); - conf.set(INSTANCE_NAME, instanceName); - conf.set(ZOOKEEPERS, zooKeepers); + OutputConfigurator.setZooKeeperInstance(CLASS, conf, instanceName, zooKeepers); } /** @@ -669,9 
+524,7 @@ public class AccumuloOutputFormat extend */ @Deprecated public static void setMockInstance(Configuration conf, String instanceName) { - conf.setBoolean(INSTANCE_HAS_BEEN_SET, true); - conf.setBoolean(MOCK, true); - conf.set(INSTANCE_NAME, instanceName); + OutputConfigurator.setMockInstance(CLASS, conf, instanceName); } /** @@ -679,7 +532,9 @@ public class AccumuloOutputFormat extend */ @Deprecated public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) { - conf.setLong(MAX_MUTATION_BUFFER_SIZE, numberOfBytes); + BatchWriterConfig bwConfig = OutputConfigurator.getBatchWriterOptions(CLASS, conf); + bwConfig.setMaxMemory(numberOfBytes); + OutputConfigurator.setBatchWriterOptions(CLASS, conf, bwConfig); } /** @@ -687,7 +542,9 @@ public class AccumuloOutputFormat extend */ @Deprecated public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) { - conf.setInt(MAX_LATENCY, numberOfMilliseconds); + BatchWriterConfig bwConfig = OutputConfigurator.getBatchWriterOptions(CLASS, conf); + bwConfig.setMaxLatency(numberOfMilliseconds, TimeUnit.MILLISECONDS); + OutputConfigurator.setBatchWriterOptions(CLASS, conf, bwConfig); } /** @@ -695,7 +552,9 @@ public class AccumuloOutputFormat extend */ @Deprecated public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) { - conf.setInt(NUM_WRITE_THREADS, numberOfThreads); + BatchWriterConfig bwConfig = OutputConfigurator.getBatchWriterOptions(CLASS, conf); + bwConfig.setMaxWriteThreads(numberOfThreads); + OutputConfigurator.setBatchWriterOptions(CLASS, conf, bwConfig); } /** @@ -703,8 +562,7 @@ public class AccumuloOutputFormat extend */ @Deprecated public static void setLogLevel(Configuration conf, Level level) { - ArgumentChecker.notNull(level); - conf.setInt(LOGLEVEL, level.toInt()); + OutputConfigurator.setLogLevel(CLASS, conf, level); } /** @@ -712,7 +570,7 @@ public class AccumuloOutputFormat extend */ @Deprecated public static void 
setSimulationMode(Configuration conf) { - conf.setBoolean(SIMULATE, true); + OutputConfigurator.setSimulationMode(CLASS, conf, true); } /** @@ -720,7 +578,7 @@ public class AccumuloOutputFormat extend */ @Deprecated protected static String getUsername(Configuration conf) { - return conf.get(USERNAME); + return OutputConfigurator.getUsername(CLASS, conf); } /** @@ -728,7 +586,7 @@ public class AccumuloOutputFormat extend */ @Deprecated protected static byte[] getPassword(Configuration conf) { - return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes()); + return OutputConfigurator.getPassword(CLASS, conf); } /** @@ -736,7 +594,7 @@ public class AccumuloOutputFormat extend */ @Deprecated protected static boolean canCreateTables(Configuration conf) { - return conf.getBoolean(CREATETABLES, false); + return OutputConfigurator.canCreateTables(CLASS, conf); } /** @@ -744,7 +602,7 @@ public class AccumuloOutputFormat extend */ @Deprecated protected static String getDefaultTableName(Configuration conf) { - return conf.get(DEFAULT_TABLE_NAME); + return OutputConfigurator.getDefaultTableName(CLASS, conf); } /** @@ -752,9 +610,7 @@ public class AccumuloOutputFormat extend */ @Deprecated protected static Instance getInstance(Configuration conf) { - if (conf.getBoolean(MOCK, false)) - return new MockInstance(conf.get(INSTANCE_NAME)); - return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)); + return OutputConfigurator.getInstance(CLASS, conf); } /** @@ -762,7 +618,7 @@ public class AccumuloOutputFormat extend */ @Deprecated protected static long getMaxMutationBufferSize(Configuration conf) { - return conf.getLong(MAX_MUTATION_BUFFER_SIZE, new BatchWriterConfig().getMaxMemory()); + return OutputConfigurator.getBatchWriterOptions(CLASS, conf).getMaxMemory(); } /** @@ -770,9 +626,7 @@ public class AccumuloOutputFormat extend */ @Deprecated protected static int getMaxLatency(Configuration conf) { - Long maxLatency = new 
BatchWriterConfig().getMaxLatency(TimeUnit.MILLISECONDS); - Integer max = maxLatency >= Integer.MAX_VALUE ? Integer.MAX_VALUE : Integer.parseInt(Long.toString(maxLatency)); - return conf.getInt(MAX_LATENCY, max); + return (int) OutputConfigurator.getBatchWriterOptions(CLASS, conf).getMaxLatency(TimeUnit.MILLISECONDS); } /** @@ -780,7 +634,7 @@ public class AccumuloOutputFormat extend */ @Deprecated protected static int getMaxWriteThreads(Configuration conf) { - return conf.getInt(NUM_WRITE_THREADS, new BatchWriterConfig().getMaxWriteThreads()); + return OutputConfigurator.getBatchWriterOptions(CLASS, conf).getMaxWriteThreads(); } /** @@ -788,9 +642,7 @@ public class AccumuloOutputFormat extend */ @Deprecated protected static Level getLogLevel(Configuration conf) { - if (conf.get(LOGLEVEL) != null) - return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt())); - return null; + return OutputConfigurator.getLogLevel(CLASS, conf); } /** @@ -798,7 +650,7 @@ public class AccumuloOutputFormat extend */ @Deprecated protected static boolean getSimulationMode(Configuration conf) { - return conf.getBoolean(SIMULATE, false); + return OutputConfigurator.getSimulationMode(CLASS, conf); } } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java?rev=1437073&r1=1437072&r2=1437073&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java Tue Jan 22 18:07:17 2013 @@ -22,10 +22,12 @@ import java.util.Map.Entry; import org.apache.accumulo.core.client.RowIterator; import org.apache.accumulo.core.data.Key; import 
org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.PeekingIterator; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -36,8 +38,10 @@ import org.apache.hadoop.mapreduce.TaskA * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloRowInputFormat#setInputInfo(org.apache.hadoop.mapreduce.Job, String, byte[], String, org.apache.accumulo.core.security.Authorizations)} - * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(org.apache.hadoop.mapreduce.Job, String, String)} + * <li>{@link AccumuloRowInputFormat#setConnectorInfo(Job, String, byte[])} + * <li>{@link AccumuloRowInputFormat#setInputTableName(Job, String)} + * <li>{@link AccumuloRowInputFormat#setScanAuthorizations(Job, Authorizations)} + * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloRowInputFormat#setMockInstance(Job, String)} * </ul> * * Other static methods are optional.