Author: ctubbsii Date: Sat Jan 26 03:11:36 2013 New Revision: 1438827 URL: http://svn.apache.org/viewvc?rev=1438827&view=rev Log: ACCUMULO-829 Adds a new option to pass in the path to a file that gets added to the distributed cache, with the user's credentials.
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java?rev=1438827&r1=1438826&r2=1438827&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java Sat Jan 26 03:11:36 2013 @@ -22,7 +22,9 @@ import java.util.Map.Entry; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.tokens.AccumuloToken; import org.apache.accumulo.core.util.format.DefaultFormatter; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ 
-36,7 +38,7 @@ import org.apache.hadoop.mapred.Reporter * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, byte[])} + * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, AccumuloToken)} OR {@link AccumuloInputFormat#setConnectorInfo(JobConf, Path)} * <li>{@link AccumuloInputFormat#setInputTableName(JobConf, String)} * <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, Authorizations)} * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, String, String)} OR {@link AccumuloInputFormat#setMockInstance(JobConf, String)} Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java?rev=1438827&r1=1438826&r2=1438827&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java Sat Jan 26 03:11:36 2013 @@ -41,7 +41,10 @@ import org.apache.accumulo.core.data.Mut import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.security.thrift.SecurityErrorCode; import org.apache.accumulo.core.security.tokens.AccumuloToken; +import org.apache.accumulo.core.security.tokens.TokenHelper; +import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; @@ -58,7 +61,7 @@ import org.apache.log4j.Logger; * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link 
AccumuloOutputFormat#setConnectorInfo(JobConf, AccumuloToken)} + * <li>{@link AccumuloOutputFormat#setConnectorInfo(JobConf, AccumuloToken)} OR {@link AccumuloOutputFormat#setConnectorInfo(JobConf, Path)} * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(JobConf, String, String)} OR {@link AccumuloOutputFormat#setMockInstance(JobConf, String)} * </ul> * @@ -72,10 +75,14 @@ public class AccumuloOutputFormat implem /** * Sets the connector information needed to communicate with Accumulo in this job. * + * <p> + * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe + * conversion to a string, and is not intended to be secure. + * * @param job * the Hadoop job instance to be configured * @param token - * a valid AccumuloToken (user must have Table.CREATE permission if {@link #setCreateTables(JobConf, boolean)} is set to true) + * a valid AccumuloToken (principal must have Table.CREATE permission) * @since 1.5.0 */ public static void setConnectorInfo(JobConf job, AccumuloToken<?,?> token) { @@ -83,6 +90,22 @@ public class AccumuloOutputFormat implem } /** + * Sets the connector information needed to communicate with Accumulo in this job. The authentication information will be read from the specified file when + * the job runs. This prevents the user's token from being exposed on the Job Tracker web page. The specified path will be placed in the + * {@link DistributedCache}, for better performance during job execution. Users can create the contents of this file using + * {@link TokenHelper#asBase64String(AccumuloToken)}. 
+ * + * @param job + * the Hadoop job instance to be configured + * @param path + * the path to a file in the configured file system, containing the serialized, base-64 encoded {@link AccumuloToken} with the user's authentication + * @since 1.5.0 + */ + public static void setConnectorInfo(JobConf job, Path path) { + OutputConfigurator.setConnectorInfo(CLASS, job, path); + } + + /** * Determines if the connector has been configured. * * @param job @@ -90,6 +113,7 @@ public class AccumuloOutputFormat implem * @return true if the connector has been configured, false otherwise * @since 1.5.0 * @see #setConnectorInfo(JobConf, AccumuloToken) + * @see #setConnectorInfo(JobConf, Path) */ protected static Boolean isConnectorInfoSet(JobConf job) { return OutputConfigurator.isConnectorInfoSet(CLASS, job); @@ -104,6 +128,7 @@ public class AccumuloOutputFormat implem * @return the decoded user token * @since 1.5.0 * @see #setConnectorInfo(JobConf, AccumuloToken) + * @see #setConnectorInfo(JobConf, Path) */ protected static AccumuloToken<?,?> getToken(JobConf job) { return OutputConfigurator.getToken(CLASS, job); Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java?rev=1438827&r1=1438826&r2=1438827&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java Sat Jan 26 03:11:36 2013 @@ -23,7 +23,9 @@ import org.apache.accumulo.core.client.R import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.tokens.AccumuloToken; 
import org.apache.accumulo.core.util.PeekingIterator; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; @@ -38,7 +40,7 @@ import org.apache.hadoop.mapred.Reporter * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloRowInputFormat#setConnectorInfo(JobConf, String, byte[])} + * <li>{@link AccumuloRowInputFormat#setConnectorInfo(JobConf, AccumuloToken)} OR {@link AccumuloRowInputFormat#setConnectorInfo(JobConf, Path)} * <li>{@link AccumuloRowInputFormat#setInputTableName(JobConf, String)} * <li>{@link AccumuloRowInputFormat#setScanAuthorizations(JobConf, Authorizations)} * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(JobConf, String, String)} OR {@link AccumuloRowInputFormat#setMockInstance(JobConf, String)} Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java?rev=1438827&r1=1438826&r2=1438827&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java Sat Jan 26 03:11:36 2013 @@ -54,8 +54,11 @@ import org.apache.accumulo.core.data.Val import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.tokens.AccumuloToken; +import org.apache.accumulo.core.security.tokens.TokenHelper; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; @@ -83,10 +86,14 @@ public abstract class InputFormatBase<K, /** * Sets the connector information needed to communicate with Accumulo in this job. * + * <p> + * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe + * conversion to a string, and is not intended to be secure. + * * @param job * the Hadoop job instance to be configured * @param token - * a valid AccumuloToken (user must have Table.CREATE permission) + * a valid AccumuloToken (principal must have Table.CREATE permission) * @since 1.5.0 */ public static void setConnectorInfo(JobConf job, AccumuloToken<?,?> token) { @@ -94,6 +101,22 @@ public abstract class InputFormatBase<K, } /** + * Sets the connector information needed to communicate with Accumulo in this job. The authentication information will be read from the specified file when + * the job runs. This prevents the user's token from being exposed on the Job Tracker web page. The specified path will be placed in the + * {@link DistributedCache}, for better performance during job execution. Users can create the contents of this file using + * {@link TokenHelper#asBase64String(AccumuloToken)}. + * + * @param job + * the Hadoop job instance to be configured + * @param path + * the path to a file in the configured file system, containing the serialized, base-64 encoded {@link AccumuloToken} with the user's authentication + * @since 1.5.0 + */ + public static void setConnectorInfo(JobConf job, Path path) { + InputConfigurator.setConnectorInfo(CLASS, job, path); + } + + /** * Determines if the connector has been configured. 
* * @param job @@ -101,6 +124,7 @@ public abstract class InputFormatBase<K, * @return true if the connector has been configured, false otherwise * @since 1.5.0 * @see #setConnectorInfo(JobConf, AccumuloToken) + * @see #setConnectorInfo(JobConf, Path) */ protected static Boolean isConnectorInfoSet(JobConf job) { return InputConfigurator.isConnectorInfoSet(CLASS, job); @@ -115,6 +139,7 @@ public abstract class InputFormatBase<K, * @return the decoded user Token * @since 1.5.0 * @see #setConnectorInfo(JobConf, AccumuloToken) + * @see #setConnectorInfo(JobConf, Path) */ protected static AccumuloToken<?,?> getToken(JobConf job) { return InputConfigurator.getToken(CLASS, job); Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java?rev=1438827&r1=1438826&r2=1438827&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java Sat Jan 26 03:11:36 2013 @@ -22,7 +22,9 @@ import java.util.Map.Entry; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.tokens.AccumuloToken; import org.apache.accumulo.core.util.format.DefaultFormatter; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -36,7 +38,7 @@ import org.apache.hadoop.mapreduce.TaskA * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, byte[])} + * 
<li>{@link AccumuloInputFormat#setConnectorInfo(Job, AccumuloToken)} OR {@link AccumuloInputFormat#setConnectorInfo(Job, Path)} * <li>{@link AccumuloInputFormat#setInputTableName(Job, String)} * <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)} * <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloInputFormat#setMockInstance(Job, String)} Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1438827&r1=1438826&r2=1438827&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java Sat Jan 26 03:11:36 2013 @@ -42,8 +42,11 @@ import org.apache.accumulo.core.data.Mut import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.security.thrift.SecurityErrorCode; import org.apache.accumulo.core.security.tokens.AccumuloToken; +import org.apache.accumulo.core.security.tokens.TokenHelper; import org.apache.accumulo.core.security.tokens.UserPassToken; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; @@ -62,7 +65,7 @@ import org.apache.log4j.Logger; * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, String, byte[])} + * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, AccumuloToken)} OR {@link AccumuloOutputFormat#setConnectorInfo(Job, Path)} * 
<li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloOutputFormat#setMockInstance(Job, String)} * </ul> * @@ -76,10 +79,14 @@ public class AccumuloOutputFormat extend /** * Sets the connector information needed to communicate with Accumulo in this job. * + * <p> + * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe + * conversion to a string, and is not intended to be secure. + * * @param job * the Hadoop job instance to be configured * @param token - * a valid AccumuloToken (principal must have Table.CREATE permission if {@link #setCreateTables(Job, boolean)} is set to true) + * a valid AccumuloToken (principal must have Table.CREATE permission) * @since 1.5.0 */ public static void setConnectorInfo(Job job, AccumuloToken<?,?> token) { @@ -87,13 +94,30 @@ public class AccumuloOutputFormat extend } /** + * Sets the connector information needed to communicate with Accumulo in this job. The authentication information will be read from the specified file when + * the job runs. This prevents the user's token from being exposed on the Job Tracker web page. The specified path will be placed in the + * {@link DistributedCache}, for better performance during job execution. Users can create the contents of this file using + * {@link TokenHelper#asBase64String(AccumuloToken)}. + * + * @param job + * the Hadoop job instance to be configured + * @param path + * the path to a file in the configured file system, containing the serialized, base-64 encoded {@link AccumuloToken} with the user's authentication + * @since 1.5.0 + */ + public static void setConnectorInfo(Job job, Path path) { + OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), path); + } + + /** * Determines if the connector has been configured. 
* * @param context * the Hadoop context for the configured job * @return true if the connector has been configured, false otherwise * @since 1.5.0 - * @see #setConnectorInfo(Job, String, byte[]) + * @see #setConnectorInfo(Job, AccumuloToken) + * @see #setConnectorInfo(Job, Path) */ protected static Boolean isConnectorInfoSet(JobContext context) { return OutputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration()); @@ -107,11 +131,12 @@ public class AccumuloOutputFormat extend * @return the AccumuloToken * @since 1.5.0 * @see #setConnectorInfo(Job, AccumuloToken) + * @see #setConnectorInfo(Job, Path) */ protected static AccumuloToken<?,?> getToken(JobContext context) { return OutputConfigurator.getToken(CLASS, context.getConfiguration()); } - + /** * Configures a {@link ZooKeeperInstance} for this job. * @@ -487,8 +512,8 @@ public class AccumuloOutputFormat extend // ---------------------------------------------------------------------------------------------------- /** - * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, String, byte[])}, {@link #setCreateTables(Job, boolean)}, and - * {@link #setDefaultTableName(Job, String)} instead. + * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, AccumuloToken)}, {@link #setConnectorInfo(Job, Path)}, {@link #setCreateTables(Job, boolean)}, + * and {@link #setDefaultTableName(Job, String)} instead. */ @Deprecated public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) { @@ -560,7 +585,7 @@ public class AccumuloOutputFormat extend } /** - * @deprecated since 1.5.0; Use {@link #getUsername(JobContext)} instead. + * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead. */ @Deprecated protected static String getUsername(Configuration conf) { @@ -568,7 +593,7 @@ public class AccumuloOutputFormat extend } /** - * @deprecated since 1.5.0; Use {@link #getPassword(JobContext)} instead. 
+ * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead. */ @Deprecated protected static byte[] getPassword(Configuration conf) { Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java?rev=1438827&r1=1438826&r2=1438827&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java Sat Jan 26 03:11:36 2013 @@ -23,7 +23,9 @@ import org.apache.accumulo.core.client.R import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.tokens.AccumuloToken; import org.apache.accumulo.core.util.PeekingIterator; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -38,7 +40,7 @@ import org.apache.hadoop.mapreduce.TaskA * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloRowInputFormat#setConnectorInfo(Job, String, byte[])} + * <li>{@link AccumuloRowInputFormat#setConnectorInfo(Job, AccumuloToken)} OR {@link AccumuloRowInputFormat#setConnectorInfo(Job, Path)} * <li>{@link AccumuloRowInputFormat#setInputTableName(Job, String)} * <li>{@link AccumuloRowInputFormat#setScanAuthorizations(Job, Authorizations)} * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloRowInputFormat#setMockInstance(Job, String)} Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1438827&r1=1438826&r2=1438827&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Sat Jan 26 03:11:36 2013 @@ -63,10 +63,13 @@ import org.apache.accumulo.core.iterator import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.tokens.AccumuloToken; +import org.apache.accumulo.core.security.tokens.TokenHelper; import org.apache.accumulo.core.security.tokens.UserPassToken; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; @@ -96,6 +99,10 @@ public abstract class InputFormatBase<K, /** * Sets the connector information needed to communicate with Accumulo in this job. * + * <p> + * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe + * conversion to a string, and is not intended to be secure. + * * @param job * the Hadoop job instance to be configured * @param token @@ -107,6 +114,22 @@ public abstract class InputFormatBase<K, } /** + * Sets the connector information needed to communicate with Accumulo in this job. 
The authentication information will be read from the specified file when + * the job runs. This prevents the user's token from being exposed on the Job Tracker web page. The specified path will be placed in the + * {@link DistributedCache}, for better performance during job execution. Users can create the contents of this file using + * {@link TokenHelper#asBase64String(AccumuloToken)}. + * + * @param job + * the Hadoop job instance to be configured + * @param path + * the path to a file in the configured file system, containing the serialized, base-64 encoded {@link AccumuloToken} with the user's authentication + * @since 1.5.0 + */ + public static void setConnectorInfo(Job job, Path path) { + InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), path); + } + + /** * Determines if the connector has been configured. * * @param context @@ -114,6 +137,7 @@ public abstract class InputFormatBase<K, * @return true if the connector has been configured, false otherwise * @since 1.5.0 * @see #setConnectorInfo(Job, AccumuloToken) + * @see #setConnectorInfo(Job, Path) */ protected static Boolean isConnectorInfoSet(JobContext context) { return InputConfigurator.isConnectorInfoSet(CLASS, context.getConfiguration()); @@ -127,11 +151,12 @@ public abstract class InputFormatBase<K, * @return the user name * @since 1.5.0 * @see #setConnectorInfo(Job, AccumuloToken) + * @see #setConnectorInfo(Job, Path) */ protected static AccumuloToken<?,?> getToken(JobContext context) { return InputConfigurator.getToken(CLASS, context.getConfiguration()); } - + /** * Configures a {@link ZooKeeperInstance} for this job. 
* @@ -549,8 +574,7 @@ public abstract class InputFormatBase<K, log.debug("Creating scanner for table: " + getInputTableName(attempt)); log.debug("Authorizations are: " + authorizations); if (isOfflineScan(attempt)) { - scanner = new OfflineScanner(instance, token, Tables.getTableId(instance, getInputTableName(attempt)), - authorizations); + scanner = new OfflineScanner(instance, token, Tables.getTableId(instance, getInputTableName(attempt)), authorizations); } else { scanner = conn.createScanner(getInputTableName(attempt), authorizations); } @@ -946,7 +970,7 @@ public abstract class InputFormatBase<K, } /** - * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, AccumuloToken}, {@link #setInputTableName(Job, String)}, and + * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, AccumuloToken)}, {@link #setInputTableName(Job, String)}, and * {@link #setScanAuthorizations(Job, Authorizations)} instead. */ @Deprecated @@ -1047,7 +1071,7 @@ public abstract class InputFormatBase<K, } /** - * @deprecated since 1.5.0; Use {@link #getUsername(JobContext)} instead. + * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead. */ @Deprecated protected static String getUsername(Configuration conf) { @@ -1055,7 +1079,7 @@ public abstract class InputFormatBase<K, } /** - * @deprecated since 1.5.0; Use {@link #getPassword(JobContext)} instead. + * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead. 
*/ @Deprecated protected static byte[] getPassword(Configuration conf) { Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java?rev=1438827&r1=1438826&r2=1438827&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java Sat Jan 26 03:11:36 2013 @@ -16,6 +16,13 @@ */ package org.apache.accumulo.core.client.mapreduce.lib.util; +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.net.URI; +import java.util.Scanner; + import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.mock.MockInstance; @@ -23,6 +30,8 @@ import org.apache.accumulo.core.security import org.apache.accumulo.core.security.tokens.TokenHelper; import org.apache.accumulo.core.util.ArgumentChecker; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -38,7 +47,7 @@ public class ConfiguratorBase { * @since 1.5.0 */ public static enum ConnectorInfo { - IS_CONFIGURED, TOKEN + IS_CONFIGURED, TOKEN, TOKEN_IS_CACHE_FILE } /** @@ -76,6 +85,10 @@ public class ConfiguratorBase { /** * Sets the connector information needed to communicate with Accumulo in this job. * + * <p> + * <b>WARNING:</b> The serialized token is stored in the configuration and shared with all MapReduce tasks. 
It is BASE64 encoded to provide a charset safe + * conversion to a string, and is not intended to be secure. + * * @param implementingClass * the class whose name will be used as a prefix for the property configuration key * @param conf @@ -90,10 +103,37 @@ public class ConfiguratorBase { ArgumentChecker.notNull(token); conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true); + conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.TOKEN_IS_CACHE_FILE), false); conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), TokenHelper.asBase64String(token)); } /** + * Sets the connector information needed to communicate with Accumulo in this job. The authentication information will be read from the specified file when + * the job runs. This prevents the user's token from being exposed on the Job Tracker web page. The specified path will be placed in the + * {@link DistributedCache}, for better performance during job execution. Users can create the contents of this file using + * {@link TokenHelper#asBase64String(AccumuloToken)}. 
+ * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param path + * the path to a file in the configured file system, containing the serialized, base-64 encoded {@link AccumuloToken} with the user's authentication + * @since 1.5.0 + */ + public static void setConnectorInfo(Class<?> implementingClass, Configuration conf, Path path) { + if (isConnectorInfoSet(implementingClass, conf)) + throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName() + " can only be set once per job"); + + ArgumentChecker.notNull(path); + URI uri = path.toUri(); + conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true); + conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.TOKEN_IS_CACHE_FILE), true); + DistributedCache.addCacheFile(uri, conf); + conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), uri.getPath()); + } + + /** * Determines if the connector info has already been set for this instance. * * @param implementingClass @@ -102,14 +142,15 @@ public class ConfiguratorBase { * the Hadoop configuration object to configure * @return true if the connector info has already been set, false otherwise * @since 1.5.0 + * @see #setConnectorInfo(Class, Configuration, AccumuloToken) + * @see #setConnectorInfo(Class, Configuration, Path) */ public static Boolean isConnectorInfoSet(Class<?> implementingClass, Configuration conf) { return conf.getBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), false); } /** - * Gets the AccumuloToken from the configuration. WARNING: The serialized Token is stored in the Configuration and shared with all MapReduce tasks; It is - * BASE64 encoded to provide a charset safe conversion to a string, and is not intended to be secure. + * Gets the AccumuloToken from the configuration. 
* * @param implementingClass * the class whose name will be used as a prefix for the property configuration key @@ -118,9 +159,42 @@ public class ConfiguratorBase { * @return the AccumuloToken * @since 1.5.0 * @see #setConnectorInfo(Class, Configuration, AccumuloToken) + * @see #setConnectorInfo(Class, Configuration, Path) */ public static AccumuloToken<?,?> getToken(Class<?> implementingClass, Configuration conf) { - return TokenHelper.fromBase64String(conf.get(enumToConfKey(implementingClass, ConnectorInfo.TOKEN))); + if (!isConnectorInfoSet(implementingClass, conf)) + throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName() + " has not been set"); + + String token = conf.get(enumToConfKey(implementingClass, ConnectorInfo.TOKEN)); + + if (conf.getBoolean(enumToConfKey(implementingClass, ConnectorInfo.TOKEN_IS_CACHE_FILE), false)) { + String tokenFile = token; + token = null; + + try { + Path[] cf = DistributedCache.getLocalCacheFiles(conf); + if (cf != null) { + for (Path path : cf) { + if (path.toUri().getPath().endsWith(tokenFile.substring(tokenFile.lastIndexOf('/')))) { + StringBuilder fileContents = new StringBuilder(); + Scanner in = new Scanner(new BufferedReader(new FileReader(path.toString()))); + try { + while (in.hasNextLine()) + fileContents.append(in.nextLine()); + } finally { + in.close(); + } + token = fileContents.toString(); + break; + } + } + } + /* token is still null only when no local cache file matched the configured token file name */ if (token == null) throw new FileNotFoundException(tokenFile + " not found in distributed cache"); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return TokenHelper.fromBase64String(token); } /**