Author: ctubbsii
Date: Tue Jan 15 23:50:42 2013
New Revision: 1433745
URL: http://svn.apache.org/viewvc?rev=1433745&view=rev
Log:
ACCUMULO-769 Deprecated and replaced the 1.4.x MapReduce APIs, updated the Javadocs
for all MapReduce classes, and updated the classes that reference them.
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/VersioningIterator.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RegexExample.java
accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/RowHash.java
accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TableToFile.java
accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/UniqueColumns.java
accumulo/trunk/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/metanalysis/IndexMeta.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousVerify.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/multitable/CopyTool.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/randomwalk/sequential/MapRedVerifyTool.java
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java?rev=1433745&r1=1433744&r2=1433745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnDefaultTable.java Tue Jan 15 23:50:42 2013
@@ -29,7 +29,7 @@ public class ClientOnDefaultTable extend
this.defaultTable = table;
}
- @Parameter(names="--table", description="table to use")
+ @Parameter(names = "--table", description = "table to use")
String tableName;
public String getTableName() {
@@ -38,10 +38,11 @@ public class ClientOnDefaultTable extend
return tableName;
}
+ @Override
public void setAccumuloConfigs(Job job) {
super.setAccumuloConfigs(job);
- AccumuloInputFormat.setInputInfo(job.getConfiguration(), user,
getPassword(), getTableName(), auths);
- AccumuloOutputFormat.setOutputInfo(job.getConfiguration(), user,
getPassword(), true, getTableName());
+ AccumuloInputFormat.setInputInfo(job, user, getPassword(), getTableName(),
auths);
+ AccumuloOutputFormat.setOutputInfo(job, user, getPassword(), true,
getTableName());
}
-
+
}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java?rev=1433745&r1=1433744&r2=1433745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOnRequiredTable.java Tue Jan 15 23:50:42 2013
@@ -24,13 +24,13 @@ import com.beust.jcommander.Parameter;
public class ClientOnRequiredTable extends ClientOpts {
- @Parameter(names={"-t", "--table"}, required=true, description="table to
use")
+ @Parameter(names = {"-t", "--table"}, required = true, description = "table
to use")
public String tableName = null;
-
+
@Override
public void setAccumuloConfigs(Job job) {
super.setAccumuloConfigs(job);
- AccumuloInputFormat.setInputInfo(job.getConfiguration(), user,
getPassword(), tableName, auths);
- AccumuloOutputFormat.setOutputInfo(job.getConfiguration(), user,
getPassword(), true, tableName);
+ AccumuloInputFormat.setInputInfo(job, user, getPassword(), tableName,
auths);
+ AccumuloOutputFormat.setOutputInfo(job, user, getPassword(), true,
tableName);
}
}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java?rev=1433745&r1=1433744&r2=1433745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/cli/ClientOpts.java Tue Jan 15 23:50:42 2013
@@ -48,7 +48,7 @@ import com.beust.jcommander.IStringConve
import com.beust.jcommander.Parameter;
public class ClientOpts extends Help {
-
+
public static class TimeConverter implements IStringConverter<Long> {
@Override
public Long convert(String value) {
@@ -72,7 +72,12 @@ public class ClientOpts extends Help {
public static class Password {
public byte[] value;
- public Password(String dfault) { value = dfault.getBytes(); }
+
+ public Password(String dfault) {
+ value = dfault.getBytes();
+ }
+
+ @Override
public String toString() {
return new String(value);
}
@@ -92,13 +97,13 @@ public class ClientOpts extends Help {
}
}
- @Parameter(names={"-u", "--user"}, description = "Connection user")
+ @Parameter(names = {"-u", "--user"}, description = "Connection user")
public String user = System.getProperty("user.name");
- @Parameter(names="-p", converter=PasswordConverter.class, description =
"Connection password")
+ @Parameter(names = "-p", converter = PasswordConverter.class, description =
"Connection password")
public Password password = new Password("secret");
- @Parameter(names="--password", converter=PasswordConverter.class,
description = "Enter the connection password", password=true)
+ @Parameter(names = "--password", converter = PasswordConverter.class,
description = "Enter the connection password", password = true)
public Password securePassword = null;
public byte[] getPassword() {
@@ -108,22 +113,22 @@ public class ClientOpts extends Help {
return securePassword.value;
}
- @Parameter(names={"-z", "--keepers"}, description="Comma separated list of
zookeeper hosts (host:port,host:port)")
+ @Parameter(names = {"-z", "--keepers"}, description = "Comma separated list
of zookeeper hosts (host:port,host:port)")
public String zookeepers = "localhost:2181";
- @Parameter(names={"-i", "--instance"}, description="The name of the accumulo
instance")
+ @Parameter(names = {"-i", "--instance"}, description = "The name of the
accumulo instance")
public String instance = null;
- @Parameter(names={"-auths", "--auths"}, converter=AuthConverter.class,
description="the authorizations to use when reading or writing")
+ @Parameter(names = {"-auths", "--auths"}, converter = AuthConverter.class,
description = "the authorizations to use when reading or writing")
public Authorizations auths = Constants.NO_AUTHS;
- @Parameter(names="--debug", description="turn on TRACE-level log messages")
+ @Parameter(names = "--debug", description = "turn on TRACE-level log
messages")
public boolean debug = false;
- @Parameter(names={"-fake", "--mock"}, description="Use a mock Instance")
- public boolean mock=false;
+ @Parameter(names = {"-fake", "--mock"}, description = "Use a mock Instance")
+ public boolean mock = false;
- @Parameter(names="--site-file", description="Read the given accumulo site
file to find the accumulo instance")
+ @Parameter(names = "--site-file", description = "Read the given accumulo
site file to find the accumulo instance")
public String siteFile = null;
public void startDebugLogging() {
@@ -131,7 +136,7 @@ public class ClientOpts extends Help {
Logger.getLogger(Constants.CORE_PACKAGE_NAME).setLevel(Level.TRACE);
}
- @Parameter(names="--trace", description="turn on distributed tracing")
+ @Parameter(names = "--trace", description = "turn on distributed tracing")
public boolean trace = false;
public void startTracing(String applicationName) {
@@ -144,7 +149,8 @@ public class ClientOpts extends Help {
Trace.off();
}
- public void parseArgs(String programName, String[] args, Object ... others) {
+ @Override
+ public void parseArgs(String programName, String[] args, Object... others) {
super.parseArgs(programName, args, others);
startDebugLogging();
startTracing(programName);
@@ -167,10 +173,10 @@ public class ClientOpts extends Help {
@Override
public Iterator<Entry<String,String>> iterator() {
- TreeMap<String, String> map = new TreeMap<String, String>();
- for (Entry<String, String> props :
DefaultConfiguration.getInstance())
+ TreeMap<String,String> map = new TreeMap<String,String>();
+ for (Entry<String,String> props : DefaultConfiguration.getInstance())
map.put(props.getKey(), props.getValue());
- for (Entry<String, String> props : xml)
+ for (Entry<String,String> props : xml)
map.put(props.getKey(), props.getValue());
return map.entrySet().iterator();
}
@@ -199,8 +205,8 @@ public class ClientOpts extends Help {
}
public void setAccumuloConfigs(Job job) {
- AccumuloInputFormat.setZooKeeperInstance(job.getConfiguration(), instance,
zookeepers);
- AccumuloOutputFormat.setZooKeeperInstance(job.getConfiguration(),
instance, zookeepers);
+ AccumuloInputFormat.setZooKeeperInstance(job, instance, zookeepers);
+ AccumuloOutputFormat.setZooKeeperInstance(job, instance, zookeepers);
}
}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java?rev=1433745&r1=1433744&r2=1433745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java Tue Jan 15 23:50:42 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -57,9 +58,10 @@ public class AccumuloFileOutputFormat ex
/**
* This helper method provides an AccumuloConfiguration object constructed
from the Accumulo defaults, and overridden with Accumulo properties that have
been
- * stored in the Job's configuration
+ * stored in the Job's configuration.
*
* @since 1.5.0
+ * @see #setAccumuloProperty(Job, Property, Object)
*/
protected static AccumuloConfiguration getAccumuloConfiguration(JobContext
context) {
ConfigurationCopy acuConf = new
ConfigurationCopy(AccumuloConfiguration.getDefaultConfiguration());
@@ -105,12 +107,15 @@ public class AccumuloFileOutputFormat ex
}
/**
+ * Sets the compression type to use for data blocks. Specifying a
compression may require additional libraries to be available to your Job.
+ *
* @param compressionType
- * The type of compression to use. One of "none", "gz", "lzo", or
"snappy". Specifying a compression may require additional libraries to be
available
- * to your Job.
+ * one of "none", "gz", "lzo", or "snappy"
* @since 1.5.0
*/
public static void setCompressionType(Job job, String compressionType) {
+ if (compressionType == null || !Arrays.asList("none", "gz", "lzo",
"snappy").contains(compressionType))
+ throw new IllegalArgumentException("Compression type must be one of:
none, gz, lzo, snappy");
setAccumuloProperty(job, Property.TABLE_FILE_COMPRESSION_TYPE,
compressionType);
}
@@ -128,7 +133,7 @@ public class AccumuloFileOutputFormat ex
}
/**
- * Sets the size for file blocks in the file system; file blocks are
managed, and replicated, by the underlying file system
+ * Sets the size for file blocks in the file system; file blocks are
managed, and replicated, by the underlying file system.
*
* @since 1.5.0
*/
@@ -165,7 +170,7 @@ public class AccumuloFileOutputFormat ex
final Path file = this.getDefaultWorkFile(context, "." + extension);
final LRUMap validVisibilities = new LRUMap(1000);
-
+
return new RecordWriter<Key,Value>() {
FileSKVWriter out = null;
@@ -184,7 +189,7 @@ public class AccumuloFileOutputFormat ex
new ColumnVisibility(cv);
validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv,
cv.length)), Boolean.TRUE);
}
-
+
if (out == null) {
out = FileOperations.getInstance().openWriter(file.toString(),
file.getFileSystem(conf), conf, acuConf);
out.startDefaultLocalityGroup();
@@ -200,32 +205,23 @@ public class AccumuloFileOutputFormat ex
/**
* @deprecated since 1.5.0;
- */
- @SuppressWarnings("unused")
- @Deprecated
- private static final String FILE_TYPE = PREFIX + "file_type";
-
- /**
- * @deprecated since 1.5.0;
- */
- @SuppressWarnings("unused")
- @Deprecated
- private static final String BLOCK_SIZE = PREFIX + "block_size";
-
- /**
- * @deprecated since 1.5.0;
+ * @see #setZooKeeperInstance(Configuration, String, String)
*/
@Deprecated
private static final String INSTANCE_HAS_BEEN_SET = PREFIX +
"instanceConfigured";
/**
* @deprecated since 1.5.0;
+ * @see #setZooKeeperInstance(Configuration, String, String)
+ * @see #getInstance(Configuration)
*/
@Deprecated
private static final String INSTANCE_NAME = PREFIX + "instanceName";
/**
* @deprecated since 1.5.0;
+ * @see #setZooKeeperInstance(Configuration, String, String)
+ * @see #getInstance(Configuration)
*/
@Deprecated
private static final String ZOOKEEPERS = PREFIX + "zooKeepers";
@@ -254,7 +250,8 @@ public class AccumuloFileOutputFormat ex
}
/**
- * @deprecated since 1.5.0; This OutputFormat does not communicate with
Accumulo. If this is needed, subclasses must implement their own configuration.
+ * @deprecated since 1.5.0; This {@link OutputFormat} does not communicate
with Accumulo. If this is needed, subclasses must implement their own
+ * configuration.
*/
@Deprecated
public static void setZooKeeperInstance(Configuration conf, String
instanceName, String zooKeepers) {
@@ -268,7 +265,8 @@ public class AccumuloFileOutputFormat ex
}
/**
- * @deprecated since 1.5.0; This OutputFormat does not communicate with
Accumulo. If this is needed, subclasses must implement their own configuration.
+ * @deprecated since 1.5.0; This {@link OutputFormat} does not communicate
with Accumulo. If this is needed, subclasses must implement their own
+ * configuration.
*/
@Deprecated
protected static Instance getInstance(Configuration conf) {
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java?rev=1433745&r1=1433744&r2=1433745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java Tue Jan 15 23:50:42 2013
@@ -22,28 +22,28 @@ import java.util.Map.Entry;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.format.DefaultFormatter;
+import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
- * This class allows MapReduce jobs to use Accumulo as the source of data.
This input format provides keys and values of type Key and Value to the Map()
and
- * Reduce() functions.
+ * This class allows MapReduce jobs to use Accumulo as the source of data.
This {@link InputFormat} provides keys and values of type {@link Key} and
+ * {@link Value} to the Map function.
*
- * The user must specify the following via static methods:
+ * The user must specify the following via static configurator methods:
*
* <ul>
- * <li>AccumuloInputFormat.setInputTableInfo(job, username, password, table,
auths)
- * <li>AccumuloInputFormat.setZooKeeperInstance(job, instanceName, hosts)
+ * <li>{@link
AccumuloInputFormat#setInputInfo(org.apache.hadoop.mapreduce.Job, String,
byte[], String, org.apache.accumulo.core.security.Authorizations)}
+ * <li>{@link
AccumuloInputFormat#setZooKeeperInstance(org.apache.hadoop.mapreduce.Job,
String, String)}
* </ul>
*
- * Other static methods are optional
+ * Other static methods are optional.
*/
-
public class AccumuloInputFormat extends InputFormatBase<Key,Value> {
@Override
public RecordReader<Key,Value> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
- log.setLevel(getLogLevel(context.getConfiguration()));
+ log.setLevel(getLogLevel(context));
return new RecordReaderBase<Key,Value>() {
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1433745&r1=1433744&r2=1433745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java Tue Jan 15 23:50:42 2013
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.util.Arg
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -54,182 +55,396 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
/**
- * This class allows MapReduce jobs to use Accumulo as the sink of data. This
output format accepts keys and values of type Text (for a table name) and
Mutation
- * from the Map() and Reduce() functions.
+ * This class allows MapReduce jobs to use Accumulo as the sink for data. This
{@link OutputFormat} accepts keys and values of type {@link Text} (for a table
+ * name) and {@link Mutation} from the Map and Reduce functions.
*
- * The user must specify the following via static methods:
+ * The user must specify the following via static configurator methods:
*
* <ul>
- * <li>AccumuloOutputFormat.setOutputInfo(job, username, password,
createTables, defaultTableName)
- * <li>AccumuloOutputFormat.setZooKeeperInstance(job, instanceName, hosts)
+ * <li>{@link AccumuloOutputFormat#setOutputInfo(Job, String, byte[], boolean,
String)}
+ * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, String, String)}
* </ul>
*
- * Other static methods are optional
+ * Other static methods are optional.
*/
public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
private static final Logger log =
Logger.getLogger(AccumuloOutputFormat.class);
+ /**
+ * Prefix shared by all job configuration property names for this class
+ */
private static final String PREFIX =
AccumuloOutputFormat.class.getSimpleName();
+
+ /**
+ * Used to ensure a job is configured with output information at most once
+ *
+ * @see #setOutputInfo(Job, String, byte[], boolean, String)
+ * @see #getUsername(JobContext)
+ * @see #getPassword(JobContext)
+ * @see #canCreateTables(JobContext)
+ * @see #getDefaultTableName(JobContext)
+ */
private static final String OUTPUT_INFO_HAS_BEEN_SET = PREFIX +
".configured";
+
+ /**
+ * Used to limit the times a job can be configured with instance information
to 1
+ *
+ * @see #setZooKeeperInstance(Job, String, String)
+ * @see #setMockInstance(Job, String)
+ * @see #getInstance(JobContext)
+ */
private static final String INSTANCE_HAS_BEEN_SET = PREFIX +
".instanceConfigured";
+
+ /**
+ * Key for storing the Accumulo user's name
+ *
+ * @see #setOutputInfo(Job, String, byte[], boolean, String)
+ * @see #getUsername(JobContext)
+ */
private static final String USERNAME = PREFIX + ".username";
+
+ /**
+ * Key for storing the Accumulo user's password
+ *
+ * @see #setOutputInfo(Job, String, byte[], boolean, String)
+ * @see #getPassword(JobContext)
+ */
private static final String PASSWORD = PREFIX + ".password";
+
+ /**
+ * Key for storing the default table to use when the output key is null
+ *
+ * @see #getDefaultTableName(JobContext)
+ */
private static final String DEFAULT_TABLE_NAME = PREFIX + ".defaulttable";
+ /**
+ * Key for storing the Accumulo instance name to connect to
+ *
+ * @see #setZooKeeperInstance(Job, String, String)
+ * @see #setMockInstance(Job, String)
+ * @see #getInstance(JobContext)
+ */
private static final String INSTANCE_NAME = PREFIX + ".instanceName";
+
+ /**
+ * Key for storing the set of Accumulo zookeeper servers to communicate with
+ *
+ * @see #setZooKeeperInstance(Job, String, String)
+ * @see #getInstance(JobContext)
+ */
private static final String ZOOKEEPERS = PREFIX + ".zooKeepers";
+
+ /**
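+ * Key for storing the directive to use the mock instance type. NOTE(review): unlike the other keys, this value is not prefixed with PREFIX; confirm whether that is intended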
+ *
+ * @see #setMockInstance(Job, String)
+ * @see #getInstance(JobContext)
+ */
private static final String MOCK = ".useMockInstance";
+ /**
+ * Key for storing the directive to create tables that don't exist
+ *
+ * @see #setOutputInfo(Job, String, byte[], boolean, String)
+ * @see #canCreateTables(JobContext)
+ */
private static final String CREATETABLES = PREFIX + ".createtables";
- private static final String LOGLEVEL = PREFIX + ".loglevel";
- private static final String SIMULATE = PREFIX + ".simulate";
- // BatchWriter options
- private static final String MAX_MUTATION_BUFFER_SIZE = PREFIX + ".maxmemory";
- private static final String MAX_LATENCY = PREFIX + ".maxlatency";
- private static final String NUM_WRITE_THREADS = PREFIX + ".writethreads";
- private static final String TIMEOUT = PREFIX + ".timeout";
+ /**
+ * Key for storing the desired logging level
+ *
+ * @see #setLogLevel(Job, Level)
+ * @see #getLogLevel(JobContext)
+ */
+ private static final String LOGLEVEL = PREFIX + ".loglevel";
- private static final long DEFAULT_MAX_MUTATION_BUFFER_SIZE = 50 * 1024 *
1024; // 50MB
- private static final int DEFAULT_MAX_LATENCY = 60 * 1000; // 1 minute
- private static final int DEFAULT_NUM_WRITE_THREADS = 2;
+ /**
+ * Key for storing the directive to simulate output instead of actually writing to Accumulo
+ *
+ * @see #setSimulationMode(Job, boolean)
+ * @see #getSimulationMode(JobContext)
+ */
+ private static final String SIMULATE = PREFIX + ".simulate";
/**
- * Configure the output format.
+ * Sets the minimum information needed to write to Accumulo in this job.
*
- * @param conf
- * the Map/Reduce job object
+ * @param job
+ * the Hadoop job instance to be configured
* @param user
- * the username, which must have the Table.CREATE permission to
create tables
+ * a valid Accumulo user name (user must have Table.CREATE
permission)
* @param passwd
- * the passwd for the username
+ * the user's password
* @param createTables
- * the output format will create new tables as necessary. Table
names can only be alpha-numeric and underscores.
+ * if true, the output format will create new tables as necessary. Table names can only contain alphanumeric characters and underscores.
* @param defaultTable
* the table to use when the tablename is null in the write call
+ * @since 1.5.0
*/
- public static void setOutputInfo(Configuration conf, String user, byte[]
passwd, boolean createTables, String defaultTable) {
- if (conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
+ public static void setOutputInfo(Job job, String user, byte[] passwd,
boolean createTables, String defaultTable) {
+ if (job.getConfiguration().getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
throw new IllegalStateException("Output info can only be set once per
job");
- conf.setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true);
+ job.getConfiguration().setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true);
ArgumentChecker.notNull(user, passwd);
- conf.set(USERNAME, user);
- conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
- conf.setBoolean(CREATETABLES, createTables);
+ job.getConfiguration().set(USERNAME, user);
+ job.getConfiguration().set(PASSWORD, new
String(Base64.encodeBase64(passwd)));
+ job.getConfiguration().setBoolean(CREATETABLES, createTables);
if (defaultTable != null)
- conf.set(DEFAULT_TABLE_NAME, defaultTable);
+ job.getConfiguration().set(DEFAULT_TABLE_NAME, defaultTable);
}
- public static void setZooKeeperInstance(Configuration conf, String
instanceName, String zooKeepers) {
- if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
+ /**
+ * Configures a {@link ZooKeeperInstance} for this job.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param instanceName
+ * the Accumulo instance name
+ * @param zooKeepers
+ * a comma-separated list of zookeeper servers
+ * @since 1.5.0
+ */
+ public static void setZooKeeperInstance(Job job, String instanceName, String
zooKeepers) {
+ if (job.getConfiguration().getBoolean(INSTANCE_HAS_BEEN_SET, false))
throw new IllegalStateException("Instance info can only be set once per
job");
- conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
+ job.getConfiguration().setBoolean(INSTANCE_HAS_BEEN_SET, true);
ArgumentChecker.notNull(instanceName, zooKeepers);
- conf.set(INSTANCE_NAME, instanceName);
- conf.set(ZOOKEEPERS, zooKeepers);
- System.out.println("instance set: " + conf.get(INSTANCE_HAS_BEEN_SET));
+ job.getConfiguration().set(INSTANCE_NAME, instanceName);
+ job.getConfiguration().set(ZOOKEEPERS, zooKeepers);
+ System.out.println("instance set: " +
job.getConfiguration().get(INSTANCE_HAS_BEEN_SET));
}
- public static void setMockInstance(Configuration conf, String instanceName) {
- conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
- conf.setBoolean(MOCK, true);
- conf.set(INSTANCE_NAME, instanceName);
+ /**
+ * Configures a {@link MockInstance} for this job.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param instanceName
+ * the Accumulo instance name
+ * @since 1.5.0
+ */
+ public static void setMockInstance(Job job, String instanceName) {
+ job.getConfiguration().setBoolean(INSTANCE_HAS_BEEN_SET, true);
+ job.getConfiguration().setBoolean(MOCK, true);
+ job.getConfiguration().set(INSTANCE_NAME, instanceName);
}
/**
- * see {@link BatchWriterConfig#setMaxMemory(long)}
+ * @since 1.5.0
+ * @see BatchWriterConfig#setMaxMemory(long)
*/
-
- public static void setMaxMutationBufferSize(Configuration conf, long
numberOfBytes) {
- conf.setLong(MAX_MUTATION_BUFFER_SIZE, numberOfBytes);
+ public static void setMaxMutationBufferSize(Job job, long numberOfBytes) {
+ job.getConfiguration().setLong(MAX_MUTATION_BUFFER_SIZE, numberOfBytes);
}
/**
- * see {@link BatchWriterConfig#setMaxLatency(long, TimeUnit)}
+ * @since 1.5.0
+ * @see BatchWriterConfig#setMaxLatency(long, TimeUnit)
*/
-
- public static void setMaxLatency(Configuration conf, int
numberOfMilliseconds) {
- conf.setInt(MAX_LATENCY, numberOfMilliseconds);
+ public static void setMaxLatency(Job job, int numberOfMilliseconds) {
+ job.getConfiguration().setInt(MAX_LATENCY, numberOfMilliseconds);
}
/**
- * see {@link BatchWriterConfig#setMaxWriteThreads(int)}
+ * @since 1.5.0
+ * @see BatchWriterConfig#setMaxWriteThreads(int)
*/
-
- public static void setMaxWriteThreads(Configuration conf, int
numberOfThreads) {
- conf.setInt(NUM_WRITE_THREADS, numberOfThreads);
+ public static void setMaxWriteThreads(Job job, int numberOfThreads) {
+ job.getConfiguration().setInt(NUM_WRITE_THREADS, numberOfThreads);
}
/**
- * see {@link BatchWriterConfig#setTimeout(long, TimeUnit)}
+ * @since 1.5.0
+ * @see BatchWriterConfig#setTimeout(long, TimeUnit)
*/
-
- public static void setTimeout(Configuration conf, long time, TimeUnit
timeUnit) {
- conf.setLong(TIMEOUT, timeUnit.toMillis(time));
+ public static void setTimeout(Job job, long time, TimeUnit timeUnit) {
+ job.getConfiguration().setLong(TIMEOUT, timeUnit.toMillis(time));
}
- public static void setLogLevel(Configuration conf, Level level) {
+ /**
+ * Sets the log level for this job.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param level
+ * the logging level
+ * @since 1.5.0
+ */
+ public static void setLogLevel(Job job, Level level) {
ArgumentChecker.notNull(level);
- conf.setInt(LOGLEVEL, level.toInt());
+ job.getConfiguration().setInt(LOGLEVEL, level.toInt());
}
- public static void setSimulationMode(Configuration conf) {
- conf.setBoolean(SIMULATE, true);
+ /**
+ * Sets the directive to use simulation mode for this job. In simulation
mode, no output is produced. This is useful for testing.
+ *
+ * <p>
+ * By default, this feature is <b>disabled</b>.
+ *
+ * @param job
+ * the Hadoop job instance to be configured
+ * @param enableFeature
+ * the feature is enabled if true, disabled otherwise
+ * @since 1.5.0
+ */
+ public static void setSimulationMode(Job job, boolean enableFeature) {
+ job.getConfiguration().setBoolean(SIMULATE, enableFeature);
}
- protected static String getUsername(Configuration conf) {
- return conf.get(USERNAME);
+ /**
+ * Gets the user name from the configuration.
+ *
+ * @param context
+ * the Hadoop context for the configured job
+ * @return the user name
+ * @since 1.5.0
+ * @see #setOutputInfo(Job, String, byte[], boolean, String)
+ */
+ protected static String getUsername(JobContext context) {
+ return context.getConfiguration().get(USERNAME);
}
/**
- * WARNING: The password is stored in the Configuration and shared with all
MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a
- * string, and is not intended to be secure.
+ * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; it is Base64-encoded to
+ * provide a charset-safe conversion to a string, and is not intended to be secure.
+ *
+ * @param context
+ * the Hadoop context for the configured job
+ * @return the decoded user password
+ * @since 1.5.0
+ * @see #setOutputInfo(Job, String, byte[], boolean, String)
*/
- protected static byte[] getPassword(Configuration conf) {
- return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
+ protected static byte[] getPassword(JobContext context) {
+ return Base64.decodeBase64(context.getConfiguration().get(PASSWORD,
"").getBytes());
}
- protected static boolean canCreateTables(Configuration conf) {
- return conf.getBoolean(CREATETABLES, false);
+ /**
+ * Determines whether tables are permitted to be created as needed.
+ *
+ * @param context
+ * the Hadoop context for the configured job
+ * @return true if tables may be created as needed, false otherwise (the default when unset)
+ * @since 1.5.0
+ * @see #setOutputInfo(Job, String, byte[], boolean, String)
+ */
+ protected static boolean canCreateTables(JobContext context) {
+ return context.getConfiguration().getBoolean(CREATETABLES, false);
}
- protected static String getDefaultTableName(Configuration conf) {
- return conf.get(DEFAULT_TABLE_NAME);
+ /**
+ * Gets the default table name from the configuration.
+ *
+ * @param context
+ * the Hadoop context for the configured job
+ * @return the default table name, or {@code null} if none was configured
+ * @since 1.5.0
+ * @see #setOutputInfo(Job, String, byte[], boolean, String)
+ */
+ protected static String getDefaultTableName(JobContext context) {
+ return context.getConfiguration().get(DEFAULT_TABLE_NAME);
}
- protected static Instance getInstance(Configuration conf) {
- if (conf.getBoolean(MOCK, false))
- return new MockInstance(conf.get(INSTANCE_NAME));
- return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS));
+ /**
+ * Initializes an Accumulo {@link Instance} based on the configuration: a {@link MockInstance} if a mock instance was configured, otherwise a
+ * {@link ZooKeeperInstance} built from the configured instance name and ZooKeeper hosts.
+ *
+ * @param context
+ * the Hadoop context for the configured job
+ * @return an Accumulo instance
+ * @since 1.5.0
+ * @see #setZooKeeperInstance(Job, String, String)
+ * @see #setMockInstance(Job, String)
+ */
+ protected static Instance getInstance(JobContext context) {
+ if (context.getConfiguration().getBoolean(MOCK, false))
+ return new MockInstance(context.getConfiguration().get(INSTANCE_NAME));
+ return new ZooKeeperInstance(context.getConfiguration().get(INSTANCE_NAME), context.getConfiguration().get(ZOOKEEPERS));
}
- protected static long getMaxMutationBufferSize(Configuration conf) {
- return conf.getLong(MAX_MUTATION_BUFFER_SIZE, DEFAULT_MAX_MUTATION_BUFFER_SIZE);
+ /**
+ * Gets the corresponding {@link BatchWriterConfig} setting, falling back to the {@link BatchWriterConfig} default when unset.
+ *
+ * @param context
+ * the Hadoop context for the configured job
+ * @return the max memory to use (in bytes)
+ * @since 1.5.0
+ * @see #setMaxMutationBufferSize(Job, long)
+ */
+ protected static long getMaxMutationBufferSize(JobContext context) {
+ return context.getConfiguration().getLong(MAX_MUTATION_BUFFER_SIZE, new BatchWriterConfig().getMaxMemory());
}
- protected static int getMaxLatency(Configuration conf) {
- return conf.getInt(MAX_LATENCY, DEFAULT_MAX_LATENCY);
+ /**
+ * Gets the corresponding {@link BatchWriterConfig} setting, falling back to the {@link BatchWriterConfig} default when unset.
+ *
+ * @param context
+ * the Hadoop context for the configured job
+ * @return the max latency to use (in millis)
+ * @since 1.5.0
+ * @see #setMaxLatency(Job, int)
+ */
+ protected static long getMaxLatency(JobContext context) {
+ return context.getConfiguration().getLong(MAX_LATENCY, new BatchWriterConfig().getMaxLatency(TimeUnit.MILLISECONDS));
}
- protected static int getMaxWriteThreads(Configuration conf) {
- return conf.getInt(NUM_WRITE_THREADS, DEFAULT_NUM_WRITE_THREADS);
+ /**
+ * Gets the corresponding {@link BatchWriterConfig} setting, falling back to the {@link BatchWriterConfig} default when unset.
+ *
+ * @param context
+ * the Hadoop context for the configured job
+ * @return the max write threads to use
+ * @since 1.5.0
+ * @see #setMaxWriteThreads(Job, int)
+ */
+ protected static int getMaxWriteThreads(JobContext context) {
+ return context.getConfiguration().getInt(NUM_WRITE_THREADS, new BatchWriterConfig().getMaxWriteThreads());
}
- protected static long getTimeout(Configuration conf) {
- return conf.getLong(TIMEOUT, Long.MAX_VALUE);
+ /**
+ * Gets the corresponding {@link BatchWriterConfig} setting.
+ *
+ * @param context
+ * the Hadoop context for the configured job
+ * @return the timeout for write operations, in milliseconds ({@link Long#MAX_VALUE} if unset)
+ * @since 1.5.0
+ * @see #setTimeout(Job, long, TimeUnit)
+ */
+ protected static long getTimeout(JobContext context) {
+ return context.getConfiguration().getLong(TIMEOUT, Long.MAX_VALUE);
}
- protected static Level getLogLevel(Configuration conf) {
- if (conf.get(LOGLEVEL) != null)
- return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt()));
+ /**
+ * Gets the log level from this configuration.
+ *
+ * @param context
+ * the Hadoop context for the configured job
+ * @return the log level, or {@code null} if none has been set
+ * @since 1.5.0
+ * @see #setLogLevel(Job, Level)
+ */
+ protected static Level getLogLevel(JobContext context) {
+ if (context.getConfiguration().get(LOGLEVEL) != null)
+ return Level.toLevel(context.getConfiguration().getInt(LOGLEVEL, Level.INFO.toInt()));
return null;
}
- protected static boolean getSimulationMode(Configuration conf) {
- return conf.getBoolean(SIMULATE, false);
+ /**
+ * Determines whether simulation mode is enabled.
+ *
+ * @param context
+ * the Hadoop context for the configured job
+ * @return true if the feature is enabled, false otherwise (the default when unset)
+ * @since 1.5.0
+ * @see #setSimulationMode(Job, boolean)
+ */
+ protected static boolean getSimulationMode(JobContext context) {
+ return context.getConfiguration().getBoolean(SIMULATE, false);
}
+ /**
+ * A base class to be used to create {@link RecordWriter} instances that write to Accumulo.
+ */
protected static class AccumuloRecordWriter extends
RecordWriter<Text,Mutation> {
private MultiTableBatchWriter mtbw = null;
private HashMap<Text,BatchWriter> bws = null;
@@ -243,26 +458,26 @@ public class AccumuloOutputFormat extend
private Connector conn;
- protected AccumuloRecordWriter(Configuration conf) throws AccumuloException, AccumuloSecurityException, IOException {
- Level l = getLogLevel(conf);
+ /**
+ * Creates a record writer configured from the given task context. Unless simulation mode is enabled, this connects to the configured Accumulo instance
+ * and opens a {@link MultiTableBatchWriter} using the configured buffer size, latency, write threads and timeout.
+ */
+ protected AccumuloRecordWriter(TaskAttemptContext context) throws AccumuloException, AccumuloSecurityException, IOException {
+ Level l = getLogLevel(context);
if (l != null)
- log.setLevel(getLogLevel(conf));
- this.simulate = getSimulationMode(conf);
- this.createTables = canCreateTables(conf);
+ log.setLevel(l);
+ this.simulate = getSimulationMode(context);
+ this.createTables = canCreateTables(context);
if (simulate)
log.info("Simulating output only. No writes to tables will occur");
this.bws = new HashMap<Text,BatchWriter>();
- String tname = getDefaultTableName(conf);
+ String tname = getDefaultTableName(context);
this.defaultTableName = (tname == null) ? null : new Text(tname);
if (!simulate) {
- this.conn = getInstance(conf).getConnector(getUsername(conf), getPassword(conf));
- mtbw = conn.createMultiTableBatchWriter(new BatchWriterConfig().setMaxMemory(getMaxMutationBufferSize(conf))
- .setMaxLatency(getMaxLatency(conf), TimeUnit.MILLISECONDS).setMaxWriteThreads(getMaxWriteThreads(conf))
- .setTimeout(getTimeout(conf), TimeUnit.MILLISECONDS));
+ this.conn = getInstance(context).getConnector(getUsername(context), getPassword(context));
+ mtbw = conn.createMultiTableBatchWriter(new BatchWriterConfig().setMaxMemory(getMaxMutationBufferSize(context))
+ .setMaxLatency(getMaxLatency(context), TimeUnit.MILLISECONDS).setMaxWriteThreads(getMaxWriteThreads(context))
+ .setTimeout(getTimeout(context), TimeUnit.MILLISECONDS));
}
}
@@ -391,17 +606,13 @@ public class AccumuloOutputFormat extend
@Override
public void checkOutputSpecs(JobContext job) throws IOException {
- checkOutputSpecs(job.getConfiguration());
- }
-
- public void checkOutputSpecs(Configuration conf) throws IOException {
- if (!conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
+ if (!job.getConfiguration().getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
throw new IOException("Output info has not been set.");
- if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
+ if (!job.getConfiguration().getBoolean(INSTANCE_HAS_BEEN_SET, false))
throw new IOException("Instance info has not been set.");
try {
- Connector c = getInstance(conf).getConnector(getUsername(conf),
getPassword(conf));
- if (!c.securityOperations().authenticateUser(getUsername(conf),
getPassword(conf)))
+ Connector c = getInstance(job).getConnector(getUsername(job),
getPassword(job));
+ if (!c.securityOperations().authenticateUser(getUsername(job),
getPassword(job)))
throw new IOException("Unable to authenticate user");
} catch (AccumuloException e) {
throw new IOException(e);
@@ -418,9 +629,206 @@ public class AccumuloOutputFormat extend
@Override
public RecordWriter<Text,Mutation> getRecordWriter(TaskAttemptContext attempt) throws IOException {
try {
- return new AccumuloRecordWriter(attempt.getConfiguration());
+ return new AccumuloRecordWriter(attempt);
} catch (Exception e) {
+ // Accumulo's checked exceptions are not declared by this method, so surface them as IOException.
throw new IOException(e);
}
}
+
+ // ----------------------------------------------------------------------------------------------------
+ // Everything below this line is deprecated and should go away in future versions
+ // ----------------------------------------------------------------------------------------------------
+
+ /**
+ * Configuration key for the batch writer's maximum memory, in bytes.
+ *
+ * @deprecated since 1.5.0; note that the non-deprecated {@link #getMaxMutationBufferSize(JobContext)} still reads this key.
+ */
+ @Deprecated
+ private static final String MAX_MUTATION_BUFFER_SIZE = PREFIX + ".maxmemory";
+
+ /**
+ * Configuration key for the batch writer's maximum latency, in milliseconds.
+ *
+ * @deprecated since 1.5.0; note that the non-deprecated {@link #getMaxLatency(JobContext)} still reads this key.
+ */
+ @Deprecated
+ private static final String MAX_LATENCY = PREFIX + ".maxlatency";
+
+ /**
+ * Configuration key for the batch writer's maximum number of write threads.
+ *
+ * @deprecated since 1.5.0; note that the non-deprecated {@link #getMaxWriteThreads(JobContext)} still reads this key.
+ */
+ @Deprecated
+ private static final String NUM_WRITE_THREADS = PREFIX + ".writethreads";
+
+ /**
+ * Configuration key for the batch writer's timeout, in milliseconds.
+ *
+ * @deprecated since 1.5.0; note that the non-deprecated {@link #getTimeout(JobContext)} still reads this key.
+ */
+ @Deprecated
+ private static final String TIMEOUT = PREFIX + ".timeout";
+
+ /**
+ * Sets the connector information needed to communicate with Accumulo on a raw {@link Configuration}. May only be called once per configuration.
+ *
+ * @throws IllegalStateException
+ * if the output info has already been set on this configuration
+ * @deprecated since 1.5.0; Use {@link #setOutputInfo(Job, String, byte[], boolean, String)} instead.
+ */
+ @Deprecated
+ public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) {
+ if (conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
+ throw new IllegalStateException("Output info can only be set once per job");
+ // Check arguments before recording the set-once flag, so a call rejected by the check does not block a corrected retry.
+ ArgumentChecker.notNull(user, passwd);
+ conf.setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true);
+
+ conf.set(USERNAME, user);
+ conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
+ conf.setBoolean(CREATETABLES, createTables);
+ // The default table is optional; when absent, getDefaultTableName returns null.
+ if (defaultTable != null)
+ conf.set(DEFAULT_TABLE_NAME, defaultTable);
+ }
+
+ /**
+ * Configures a {@link ZooKeeperInstance} for this job on a raw {@link Configuration}. May only be called once per configuration (this includes
+ * {@link #setMockInstance(Configuration, String)}, which also marks the instance as set).
+ *
+ * @throws IllegalStateException
+ * if instance info has already been set on this configuration
+ * @deprecated since 1.5.0; Use {@link #setZooKeeperInstance(Job, String, String)} instead.
+ */
+ @Deprecated
+ public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
+ if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
+ throw new IllegalStateException("Instance info can only be set once per job");
+ // Check arguments before recording the set-once flag, so a call rejected by the check does not block a corrected retry.
+ ArgumentChecker.notNull(instanceName, zooKeepers);
+ conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
+
+ conf.set(INSTANCE_NAME, instanceName);
+ conf.set(ZOOKEEPERS, zooKeepers);
+ }
+
+ /**
+ * Configures a {@link MockInstance} with the given name on a raw {@link Configuration}. Unlike
+ * {@link #setZooKeeperInstance(Configuration, String, String)}, this does not reject repeated calls, but it does mark the instance as set, so a later
+ * setZooKeeperInstance call on the same configuration will throw. NOTE(review): instanceName is not null-checked here.
+ *
+ * @deprecated since 1.5.0; Use {@link #setMockInstance(Job, String)} instead.
+ */
+ @Deprecated
+ public static void setMockInstance(Configuration conf, String instanceName) {
+ conf.setBoolean(INSTANCE_HAS_BEEN_SET, true);
+ conf.setBoolean(MOCK, true);
+ conf.set(INSTANCE_NAME, instanceName);
+ }
+
+ /**
+ * Sets the batch writer's maximum memory, in bytes, on a raw {@link Configuration}.
+ *
+ * @deprecated since 1.5.0; Use {@link #setMaxMutationBufferSize(Job, long)} instead.
+ */
+ @Deprecated
+ public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) {
+ conf.setLong(MAX_MUTATION_BUFFER_SIZE, numberOfBytes);
+ }
+
+ /**
+ * Sets the batch writer's maximum latency, in milliseconds, on a raw {@link Configuration}.
+ *
+ * @deprecated since 1.5.0; Use {@link #setMaxLatency(Job, int)} instead.
+ */
+ @Deprecated
+ public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) {
+ conf.setInt(MAX_LATENCY, numberOfMilliseconds);
+ }
+
+ /**
+ * Sets the batch writer's maximum number of write threads on a raw {@link Configuration}.
+ *
+ * @deprecated since 1.5.0; Use {@link #setMaxWriteThreads(Job, int)} instead.
+ */
+ @Deprecated
+ public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) {
+ conf.setInt(NUM_WRITE_THREADS, numberOfThreads);
+ }
+
+ /**
+ * Sets the log level used by this job's record writers, on a raw {@link Configuration}. The level is stored as its log4j integer value.
+ *
+ * @deprecated since 1.5.0; Use {@link #setLogLevel(Job, Level)} instead.
+ */
+ @Deprecated
+ public static void setLogLevel(Configuration conf, Level level) {
+ ArgumentChecker.notNull(level);
+ conf.setInt(LOGLEVEL, level.toInt());
+ }
+
+ /**
+ * Enables simulation mode on a raw {@link Configuration}; this overload has no way to disable it.
+ *
+ * @deprecated since 1.5.0; Use {@link #setSimulationMode(Job, boolean)} instead.
+ */
+ @Deprecated
+ public static void setSimulationMode(Configuration conf) {
+ conf.setBoolean(SIMULATE, true);
+ }
+
+ /**
+ * Gets the user name from a raw {@link Configuration}, or {@code null} if it has not been set.
+ *
+ * @deprecated since 1.5.0; Use {@link #getUsername(JobContext)} instead.
+ */
+ @Deprecated
+ protected static String getUsername(Configuration conf) {
+ return conf.get(USERNAME);
+ }
+
+ /**
+ * Gets the Base64-decoded password from a raw {@link Configuration} (an empty array if none has been set). The password is only encoded, not secured.
+ *
+ * @deprecated since 1.5.0; Use {@link #getPassword(JobContext)} instead.
+ */
+ @Deprecated
+ protected static byte[] getPassword(Configuration conf) {
+ return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
+ }
+
+ /**
+ * Determines whether tables may be created as needed; false when unset.
+ *
+ * @deprecated since 1.5.0; Use {@link #canCreateTables(JobContext)} instead.
+ */
+ @Deprecated
+ protected static boolean canCreateTables(Configuration conf) {
+ return conf.getBoolean(CREATETABLES, false);
+ }
+
+ /**
+ * Gets the default table name from a raw {@link Configuration}, or {@code null} if none was configured.
+ *
+ * @deprecated since 1.5.0; Use {@link #getDefaultTableName(JobContext)} instead.
+ */
+ @Deprecated
+ protected static String getDefaultTableName(Configuration conf) {
+ return conf.get(DEFAULT_TABLE_NAME);
+ }
+
+ /**
+ * Builds a {@link MockInstance} if a mock instance was configured, otherwise a {@link ZooKeeperInstance}, from a raw {@link Configuration}.
+ *
+ * @deprecated since 1.5.0; Use {@link #getInstance(JobContext)} instead.
+ */
+ @Deprecated
+ protected static Instance getInstance(Configuration conf) {
+ if (conf.getBoolean(MOCK, false))
+ return new MockInstance(conf.get(INSTANCE_NAME));
+ return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS));
+ }
+
+ /**
+ * Gets the batch writer's maximum memory, in bytes, falling back to the {@link BatchWriterConfig} default when unset.
+ *
+ * @deprecated since 1.5.0; Use {@link #getMaxMutationBufferSize(JobContext)} instead.
+ */
+ @Deprecated
+ protected static long getMaxMutationBufferSize(Configuration conf) {
+ return conf.getLong(MAX_MUTATION_BUFFER_SIZE, new BatchWriterConfig().getMaxMemory());
+ }
+
+ /**
+ * Gets the batch writer's maximum latency, in milliseconds. When unset, falls back to the {@link BatchWriterConfig} default, clamped to
+ * {@link Integer#MAX_VALUE} because this legacy API returns an int.
+ *
+ * @deprecated since 1.5.0; Use {@link #getMaxLatency(JobContext)} instead.
+ */
+ @Deprecated
+ protected static int getMaxLatency(Configuration conf) {
+ long maxLatency = new BatchWriterConfig().getMaxLatency(TimeUnit.MILLISECONDS);
+ // Clamp first, then narrow; avoids boxing and the string round trip through Long.toString/Integer.parseInt.
+ int max = maxLatency >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) maxLatency;
+ return conf.getInt(MAX_LATENCY, max);
+ }
+
+ /**
+ * Gets the batch writer's maximum number of write threads, falling back to the {@link BatchWriterConfig} default when unset.
+ *
+ * @deprecated since 1.5.0; Use {@link #getMaxWriteThreads(JobContext)} instead.
+ */
+ @Deprecated
+ protected static int getMaxWriteThreads(Configuration conf) {
+ return conf.getInt(NUM_WRITE_THREADS, new BatchWriterConfig().getMaxWriteThreads());
+ }
+
+ /**
+ * Gets the log level from a raw {@link Configuration}, or {@code null} if none has been set.
+ *
+ * @deprecated since 1.5.0; Use {@link #getLogLevel(JobContext)} instead.
+ */
+ @Deprecated
+ protected static Level getLogLevel(Configuration conf) {
+ if (conf.get(LOGLEVEL) != null)
+ return Level.toLevel(conf.getInt(LOGLEVEL, Level.INFO.toInt()));
+ return null;
+ }
+
+ /**
+ * Determines whether simulation mode is enabled on a raw {@link Configuration}; false when unset.
+ *
+ * @deprecated since 1.5.0; Use {@link #getSimulationMode(JobContext)} instead.
+ */
+ @Deprecated
+ protected static boolean getSimulationMode(Configuration conf) {
+ return conf.getBoolean(SIMULATE, false);
+ }
+
}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java?rev=1433745&r1=1433744&r2=1433745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java Tue Jan 15 23:50:42 2013
@@ -24,10 +24,24 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+/**
+ * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat} provides row names as {@link Text} keys, and a
+ * corresponding {@link PeekingIterator} as the value, which in turn makes the {@link Key}/{@link Value} pairs for that row available to the Map function.
+ *
+ * <p>
+ * The user must specify the following via static configurator methods:
+ *
+ * <ul>
+ * <li>{@link AccumuloRowInputFormat#setInputInfo(org.apache.hadoop.mapreduce.Job, String, byte[], String, org.apache.accumulo.core.security.Authorizations)}
+ * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(org.apache.hadoop.mapreduce.Job, String, String)}
+ * </ul>
+ *
+ * <p>
+ * Other static methods are optional.
+ */
public class AccumuloRowInputFormat extends
InputFormatBase<Text,PeekingIterator<Entry<Key,Value>>> {
@Override
public RecordReader<Text,PeekingIterator<Entry<Key,Value>>>
createRecordReader(InputSplit split, TaskAttemptContext context) throws
IOException,