Author: ctubbsii
Date: Sat Jan 26 03:11:36 2013
New Revision: 1438827

URL: http://svn.apache.org/viewvc?rev=1438827&view=rev
Log:
ACCUMULO-829 Adds a new option to pass in the path to a file that gets added to 
the distributed cache, with the user's credentials.

Modified:
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java?rev=1438827&r1=1438826&r2=1438827&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java
 Sat Jan 26 03:11:36 2013
@@ -22,7 +22,9 @@ import java.util.Map.Entry;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.tokens.AccumuloToken;
 import org.apache.accumulo.core.util.format.DefaultFormatter;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -36,7 +38,7 @@ import org.apache.hadoop.mapred.Reporter
  * The user must specify the following via static configurator methods:
  * 
  * <ul>
- * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, byte[])}
+ * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, AccumuloToken)} OR 
{@link AccumuloInputFormat#setConnectorInfo(JobConf, Path)}
  * <li>{@link AccumuloInputFormat#setInputTableName(JobConf, String)}
  * <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, 
Authorizations)}
  * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, String, 
String)} OR {@link AccumuloInputFormat#setMockInstance(JobConf, String)}

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java?rev=1438827&r1=1438826&r2=1438827&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java
 Sat Jan 26 03:11:36 2013
@@ -41,7 +41,10 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.security.thrift.SecurityErrorCode;
 import org.apache.accumulo.core.security.tokens.AccumuloToken;
+import org.apache.accumulo.core.security.tokens.TokenHelper;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -58,7 +61,7 @@ import org.apache.log4j.Logger;
  * The user must specify the following via static configurator methods:
  * 
  * <ul>
- * <li>{@link AccumuloOutputFormat#setConnectorInfo(JobConf, AccumuloToken)}
+ * <li>{@link AccumuloOutputFormat#setConnectorInfo(JobConf, AccumuloToken)} 
OR {@link AccumuloOutputFormat#setConnectorInfo(JobConf, Path)}
  * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(JobConf, String, 
String)} OR {@link AccumuloOutputFormat#setMockInstance(JobConf, String)}
  * </ul>
  * 
@@ -72,10 +75,14 @@ public class AccumuloOutputFormat implem
   /**
    * Sets the connector information needed to communicate with Accumulo in 
this job.
    * 
+   * <p>
+   * <b>WARNING:</b> The serialized token is stored in the configuration and 
shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe
+   * conversion to a string, and is not intended to be secure.
+   * 
    * @param job
    *          the Hadoop job instance to be configured
    * @param token
-   *          a valid AccumuloToken (user must have Table.CREATE permission if 
{@link #setCreateTables(JobConf, boolean)} is set to true)
+   *          a valid AccumuloToken (principal must have Table.CREATE 
permission)
    * @since 1.5.0
    */
   public static void setConnectorInfo(JobConf job, AccumuloToken<?,?> token) {
@@ -83,6 +90,22 @@ public class AccumuloOutputFormat implem
   }
   
   /**
+   * Sets the connector information needed to communicate with Accumulo in 
this job. The authentication information will be read from the specified file 
when
+   * the job runs. This prevents the user's token from being exposed on the 
Job Tracker web page. The specified path will be placed in the
+   * {@link DistributedCache}, for better performance during job execution. 
Users can create the contents of this file using
+   * {@link TokenHelper#asBase64String(AccumuloToken)}.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param path
+   *          the path to a file in the configured file system, containing the 
serialized, base-64 encoded {@link AccumuloToken} with the user's authentication
+   * @since 1.5.0
+   */
+  public static void setConnectorInfo(JobConf job, Path path) {
+    OutputConfigurator.setConnectorInfo(CLASS, job, path);
+  }
+  
+  /**
    * Determines if the connector has been configured.
    * 
    * @param job
@@ -90,6 +113,7 @@ public class AccumuloOutputFormat implem
    * @return true if the connector has been configured, false otherwise
    * @since 1.5.0
    * @see #setConnectorInfo(JobConf, AccumuloToken)
+   * @see #setConnectorInfo(JobConf, Path)
    */
   protected static Boolean isConnectorInfoSet(JobConf job) {
     return OutputConfigurator.isConnectorInfoSet(CLASS, job);
@@ -104,6 +128,7 @@ public class AccumuloOutputFormat implem
    * @return the decoded user token
    * @since 1.5.0
    * @see #setConnectorInfo(JobConf, AccumuloToken)
+   * @see #setConnectorInfo(JobConf, Path)
    */
   protected static AccumuloToken<?,?> getToken(JobConf job) {
     return OutputConfigurator.getToken(CLASS, job);

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java?rev=1438827&r1=1438826&r2=1438827&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java
 Sat Jan 26 03:11:36 2013
@@ -23,7 +23,9 @@ import org.apache.accumulo.core.client.R
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.tokens.AccumuloToken;
 import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
@@ -38,7 +40,7 @@ import org.apache.hadoop.mapred.Reporter
  * The user must specify the following via static configurator methods:
  * 
  * <ul>
- * <li>{@link AccumuloRowInputFormat#setConnectorInfo(JobConf, String, byte[])}
+ * <li>{@link AccumuloRowInputFormat#setConnectorInfo(JobConf, AccumuloToken)} 
OR {@link AccumuloRowInputFormat#setConnectorInfo(JobConf, Path)}
  * <li>{@link AccumuloRowInputFormat#setInputTableName(JobConf, String)}
  * <li>{@link AccumuloRowInputFormat#setScanAuthorizations(JobConf, 
Authorizations)}
  * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(JobConf, String, 
String)} OR {@link AccumuloRowInputFormat#setMockInstance(JobConf, String)}

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java?rev=1438827&r1=1438826&r2=1438827&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java
 Sat Jan 26 03:11:36 2013
@@ -54,8 +54,11 @@ import org.apache.accumulo.core.data.Val
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.tokens.AccumuloToken;
+import org.apache.accumulo.core.security.tokens.TokenHelper;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
@@ -83,10 +86,14 @@ public abstract class InputFormatBase<K,
   /**
    * Sets the connector information needed to communicate with Accumulo in 
this job.
    * 
+   * <p>
+   * <b>WARNING:</b> The serialized token is stored in the configuration and 
shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe
+   * conversion to a string, and is not intended to be secure.
+   * 
    * @param job
    *          the Hadoop job instance to be configured
    * @param token
-   *          a valid AccumuloToken (user must have Table.CREATE permission)
+   *          a valid AccumuloToken (principal must have Table.CREATE 
permission)
    * @since 1.5.0
    */
   public static void setConnectorInfo(JobConf job, AccumuloToken<?,?> token) {
@@ -94,6 +101,22 @@ public abstract class InputFormatBase<K,
   }
   
   /**
+   * Sets the connector information needed to communicate with Accumulo in 
this job. The authentication information will be read from the specified file 
when
+   * the job runs. This prevents the user's token from being exposed on the 
Job Tracker web page. The specified path will be placed in the
+   * {@link DistributedCache}, for better performance during job execution. 
Users can create the contents of this file using
+   * {@link TokenHelper#asBase64String(AccumuloToken)}.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param path
+   *          the path to a file in the configured file system, containing the 
serialized, base-64 encoded {@link AccumuloToken} with the user's authentication
+   * @since 1.5.0
+   */
+  public static void setConnectorInfo(JobConf job, Path path) {
+    InputConfigurator.setConnectorInfo(CLASS, job, path);
+  }
+  
+  /**
    * Determines if the connector has been configured.
    * 
    * @param job
@@ -101,6 +124,7 @@ public abstract class InputFormatBase<K,
    * @return true if the connector has been configured, false otherwise
    * @since 1.5.0
    * @see #setConnectorInfo(JobConf, AccumuloToken)
+   * @see #setConnectorInfo(JobConf, Path)
    */
   protected static Boolean isConnectorInfoSet(JobConf job) {
     return InputConfigurator.isConnectorInfoSet(CLASS, job);
@@ -115,6 +139,7 @@ public abstract class InputFormatBase<K,
    * @return the decoded user Token
    * @since 1.5.0
    * @see #setConnectorInfo(JobConf, AccumuloToken)
+   * @see #setConnectorInfo(JobConf, Path)
    */
   protected static AccumuloToken<?,?> getToken(JobConf job) {
     return InputConfigurator.getToken(CLASS, job);

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java?rev=1438827&r1=1438826&r2=1438827&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java
 Sat Jan 26 03:11:36 2013
@@ -22,7 +22,9 @@ import java.util.Map.Entry;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.tokens.AccumuloToken;
 import org.apache.accumulo.core.util.format.DefaultFormatter;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -36,7 +38,7 @@ import org.apache.hadoop.mapreduce.TaskA
  * The user must specify the following via static configurator methods:
  * 
  * <ul>
- * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, byte[])}
+ * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, AccumuloToken)} OR 
{@link AccumuloInputFormat#setConnectorInfo(Job, Path)}
  * <li>{@link AccumuloInputFormat#setInputTableName(Job, String)}
  * <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)}
  * <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, String, String)} 
OR {@link AccumuloInputFormat#setMockInstance(Job, String)}

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1438827&r1=1438826&r2=1438827&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
 Sat Jan 26 03:11:36 2013
@@ -42,8 +42,11 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.security.thrift.SecurityErrorCode;
 import org.apache.accumulo.core.security.tokens.AccumuloToken;
+import org.apache.accumulo.core.security.tokens.TokenHelper;
 import org.apache.accumulo.core.security.tokens.UserPassToken;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -62,7 +65,7 @@ import org.apache.log4j.Logger;
  * The user must specify the following via static configurator methods:
  * 
  * <ul>
- * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, String, byte[])}
+ * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, AccumuloToken)} OR 
{@link AccumuloOutputFormat#setConnectorInfo(Job, Path)}
  * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, String, String)} 
OR {@link AccumuloOutputFormat#setMockInstance(Job, String)}
  * </ul>
  * 
@@ -76,10 +79,14 @@ public class AccumuloOutputFormat extend
   /**
    * Sets the connector information needed to communicate with Accumulo in 
this job.
    * 
+   * <p>
+   * <b>WARNING:</b> The serialized token is stored in the configuration and 
shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe
+   * conversion to a string, and is not intended to be secure.
+   * 
    * @param job
    *          the Hadoop job instance to be configured
    * @param token
-   *          a valid AccumuloToken (principal must have Table.CREATE 
permission if {@link #setCreateTables(Job, boolean)} is set to true)
+   *          a valid AccumuloToken (principal must have Table.CREATE 
permission)
    * @since 1.5.0
    */
   public static void setConnectorInfo(Job job, AccumuloToken<?,?> token) {
@@ -87,13 +94,30 @@ public class AccumuloOutputFormat extend
   }
   
   /**
+   * Sets the connector information needed to communicate with Accumulo in 
this job. The authentication information will be read from the specified file 
when
+   * the job runs. This prevents the user's token from being exposed on the 
Job Tracker web page. The specified path will be placed in the
+   * {@link DistributedCache}, for better performance during job execution. 
Users can create the contents of this file using
+   * {@link TokenHelper#asBase64String(AccumuloToken)}.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param path
+   *          the path to a file in the configured file system, containing the 
serialized, base-64 encoded {@link AccumuloToken} with the user's authentication
+   * @since 1.5.0
+   */
+  public static void setConnectorInfo(Job job, Path path) {
+    OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), path);
+  }
+  
+  /**
    * Determines if the connector has been configured.
    * 
    * @param context
    *          the Hadoop context for the configured job
    * @return true if the connector has been configured, false otherwise
    * @since 1.5.0
-   * @see #setConnectorInfo(Job, String, byte[])
+   * @see #setConnectorInfo(Job, AccumuloToken)
+   * @see #setConnectorInfo(Job, Path)
    */
   protected static Boolean isConnectorInfoSet(JobContext context) {
     return OutputConfigurator.isConnectorInfoSet(CLASS, 
context.getConfiguration());
@@ -107,11 +131,12 @@ public class AccumuloOutputFormat extend
    * @return the AccumuloToken
    * @since 1.5.0
    * @see #setConnectorInfo(Job, AccumuloToken)
+   * @see #setConnectorInfo(Job, Path)
    */
   protected static AccumuloToken<?,?> getToken(JobContext context) {
     return OutputConfigurator.getToken(CLASS, context.getConfiguration());
   }
-
+  
   /**
    * Configures a {@link ZooKeeperInstance} for this job.
    * 
@@ -487,8 +512,8 @@ public class AccumuloOutputFormat extend
   // 
----------------------------------------------------------------------------------------------------
   
   /**
-   * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, String, 
byte[])}, {@link #setCreateTables(Job, boolean)}, and
-   *             {@link #setDefaultTableName(Job, String)} instead.
+   * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, 
AccumuloToken)}, {@link #setConnectorInfo(Job, Path)}, {@link 
#setCreateTables(Job, boolean)},
+   *             and {@link #setDefaultTableName(Job, String)} instead.
    */
   @Deprecated
   public static void setOutputInfo(Configuration conf, String user, byte[] 
passwd, boolean createTables, String defaultTable) {
@@ -560,7 +585,7 @@ public class AccumuloOutputFormat extend
   }
   
   /**
-   * @deprecated since 1.5.0; Use {@link #getUsername(JobContext)} instead.
+   * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead.
    */
   @Deprecated
   protected static String getUsername(Configuration conf) {
@@ -568,7 +593,7 @@ public class AccumuloOutputFormat extend
   }
   
   /**
-   * @deprecated since 1.5.0; Use {@link #getPassword(JobContext)} instead.
+   * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead.
    */
   @Deprecated
   protected static byte[] getPassword(Configuration conf) {

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java?rev=1438827&r1=1438826&r2=1438827&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java
 Sat Jan 26 03:11:36 2013
@@ -23,7 +23,9 @@ import org.apache.accumulo.core.client.R
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.tokens.AccumuloToken;
 import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -38,7 +40,7 @@ import org.apache.hadoop.mapreduce.TaskA
  * The user must specify the following via static configurator methods:
  * 
  * <ul>
- * <li>{@link AccumuloRowInputFormat#setConnectorInfo(Job, String, byte[])}
+ * <li>{@link AccumuloRowInputFormat#setConnectorInfo(Job, AccumuloToken)} OR 
{@link AccumuloRowInputFormat#setConnectorInfo(Job, Path)}
  * <li>{@link AccumuloRowInputFormat#setInputTableName(Job, String)}
  * <li>{@link AccumuloRowInputFormat#setScanAuthorizations(Job, 
Authorizations)}
  * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(Job, String, 
String)} OR {@link AccumuloRowInputFormat#setMockInstance(Job, String)}

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1438827&r1=1438826&r2=1438827&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
 Sat Jan 26 03:11:36 2013
@@ -63,10 +63,13 @@ import org.apache.accumulo.core.iterator
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.tokens.AccumuloToken;
+import org.apache.accumulo.core.security.tokens.TokenHelper;
 import org.apache.accumulo.core.security.tokens.UserPassToken;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -96,6 +99,10 @@ public abstract class InputFormatBase<K,
   /**
    * Sets the connector information needed to communicate with Accumulo in 
this job.
    * 
+   * <p>
+   * <b>WARNING:</b> The serialized token is stored in the configuration and 
shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe
+   * conversion to a string, and is not intended to be secure.
+   * 
    * @param job
    *          the Hadoop job instance to be configured
    * @param token
@@ -107,6 +114,22 @@ public abstract class InputFormatBase<K,
   }
   
   /**
+   * Sets the connector information needed to communicate with Accumulo in 
this job. The authentication information will be read from the specified file 
when
+   * the job runs. This prevents the user's token from being exposed on the 
Job Tracker web page. The specified path will be placed in the
+   * {@link DistributedCache}, for better performance during job execution. 
Users can create the contents of this file using
+   * {@link TokenHelper#asBase64String(AccumuloToken)}.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param path
+   *          the path to a file in the configured file system, containing the 
serialized, base-64 encoded {@link AccumuloToken} with the user's authentication
+   * @since 1.5.0
+   */
+  public static void setConnectorInfo(Job job, Path path) {
+    InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), path);
+  }
+  
+  /**
    * Determines if the connector has been configured.
    * 
    * @param context
@@ -114,6 +137,7 @@ public abstract class InputFormatBase<K,
    * @return true if the connector has been configured, false otherwise
    * @since 1.5.0
    * @see #setConnectorInfo(Job, AccumuloToken)
+   * @see #setConnectorInfo(Job, Path)
    */
   protected static Boolean isConnectorInfoSet(JobContext context) {
     return InputConfigurator.isConnectorInfoSet(CLASS, 
context.getConfiguration());
@@ -127,11 +151,12 @@ public abstract class InputFormatBase<K,
    * @return the user name
    * @since 1.5.0
    * @see #setConnectorInfo(Job, AccumuloToken)
+   * @see #setConnectorInfo(Job, Path)
    */
   protected static AccumuloToken<?,?> getToken(JobContext context) {
     return InputConfigurator.getToken(CLASS, context.getConfiguration());
   }
-
+  
   /**
    * Configures a {@link ZooKeeperInstance} for this job.
    * 
@@ -549,8 +574,7 @@ public abstract class InputFormatBase<K,
         log.debug("Creating scanner for table: " + getInputTableName(attempt));
         log.debug("Authorizations are: " + authorizations);
         if (isOfflineScan(attempt)) {
-          scanner = new OfflineScanner(instance, token, 
Tables.getTableId(instance, getInputTableName(attempt)),
-              authorizations);
+          scanner = new OfflineScanner(instance, token, 
Tables.getTableId(instance, getInputTableName(attempt)), authorizations);
         } else {
           scanner = conn.createScanner(getInputTableName(attempt), 
authorizations);
         }
@@ -946,7 +970,7 @@ public abstract class InputFormatBase<K,
   }
   
   /**
-   * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, 
AccumuloToken}, {@link #setInputTableName(Job, String)}, and
+   * @deprecated since 1.5.0; Use {@link #setConnectorInfo(Job, 
AccumuloToken)}, {@link #setInputTableName(Job, String)}, and
    *             {@link #setScanAuthorizations(Job, Authorizations)} instead.
    */
   @Deprecated
@@ -1047,7 +1071,7 @@ public abstract class InputFormatBase<K,
   }
   
   /**
-   * @deprecated since 1.5.0; Use {@link #getUsername(JobContext)} instead.
+   * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead.
    */
   @Deprecated
   protected static String getUsername(Configuration conf) {
@@ -1055,7 +1079,7 @@ public abstract class InputFormatBase<K,
   }
   
   /**
-   * @deprecated since 1.5.0; Use {@link #getPassword(JobContext)} instead.
+   * @deprecated since 1.5.0; Use {@link #getToken(JobContext)} instead.
    */
   @Deprecated
   protected static byte[] getPassword(Configuration conf) {

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java?rev=1438827&r1=1438826&r2=1438827&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/util/ConfiguratorBase.java
 Sat Jan 26 03:11:36 2013
@@ -16,6 +16,13 @@
  */
 package org.apache.accumulo.core.client.mapreduce.lib.util;
 
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Scanner;
+
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.mock.MockInstance;
@@ -23,6 +30,8 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.security.tokens.TokenHelper;
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -38,7 +47,7 @@ public class ConfiguratorBase {
    * @since 1.5.0
    */
   public static enum ConnectorInfo {
-    IS_CONFIGURED, TOKEN
+    IS_CONFIGURED, TOKEN, TOKEN_IS_CACHE_FILE
   }
   
   /**
@@ -76,6 +85,10 @@ public class ConfiguratorBase {
   /**
    * Sets the connector information needed to communicate with Accumulo in 
this job.
    * 
+   * <p>
+   * <b>WARNING:</b> The serialized token is stored in the configuration and 
shared with all MapReduce tasks. It is BASE64 encoded to provide a charset safe
+   * conversion to a string, and is not intended to be secure.
+   * 
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property 
configuration key
    * @param conf
@@ -90,10 +103,37 @@ public class ConfiguratorBase {
     
     ArgumentChecker.notNull(token);
     conf.setBoolean(enumToConfKey(implementingClass, 
ConnectorInfo.IS_CONFIGURED), true);
+    conf.setBoolean(enumToConfKey(implementingClass, 
ConnectorInfo.TOKEN_IS_CACHE_FILE), false);
     conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), 
TokenHelper.asBase64String(token));
   }
   
   /**
+   * Sets the connector information needed to communicate with Accumulo in 
this job. The authentication information will be read from the specified file 
when
+   * the job runs. This prevents the user's token from being exposed on the 
Job Tracker web page. The specified path will be placed in the
+   * {@link DistributedCache}, for better performance during job execution. 
Users can create the contents of this file using
+   * {@link TokenHelper#asBase64String(AccumuloToken)}.
+   * 
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property 
configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param path
+   *          the path to a file in the configured file system, containing the 
serialized, base-64 encoded {@link AccumuloToken} with the user's authentication
+   * @since 1.5.0
+   */
+  public static void setConnectorInfo(Class<?> implementingClass, 
Configuration conf, Path path) {
+    if (isConnectorInfoSet(implementingClass, conf))
+      throw new IllegalStateException("Connector info for " + 
implementingClass.getSimpleName() + " can only be set once per job");
+    
+    ArgumentChecker.notNull(path);
+    URI uri = path.toUri();
+    conf.setBoolean(enumToConfKey(implementingClass, 
ConnectorInfo.IS_CONFIGURED), true);
+    conf.setBoolean(enumToConfKey(implementingClass, 
ConnectorInfo.TOKEN_IS_CACHE_FILE), true);
+    DistributedCache.addCacheFile(uri, conf);
+    conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), 
uri.getPath());
+  }
+  
+  /**
    * Determines if the connector info has already been set for this instance.
    * 
    * @param implementingClass
@@ -102,14 +142,15 @@ public class ConfiguratorBase {
    *          the Hadoop configuration object to configure
    * @return true if the connector info has already been set, false otherwise
    * @since 1.5.0
+   * @see #setConnectorInfo(Class, Configuration, AccumuloToken)
+   * @see #setConnectorInfo(Class, Configuration, Path)
    */
   public static Boolean isConnectorInfoSet(Class<?> implementingClass, 
Configuration conf) {
     return conf.getBoolean(enumToConfKey(implementingClass, 
ConnectorInfo.IS_CONFIGURED), false);
   }
   
   /**
-   * Gets the AccumuloToken from the configuration. WARNING: The serialized 
Token is stored in the Configuration and shared with all MapReduce tasks; It is
-   * BASE64 encoded to provide a charset safe conversion to a string, and is 
not intended to be secure.
+   * Gets the AccumuloToken from the configuration.
    * 
    * @param implementingClass
    *          the class whose name will be used as a prefix for the property 
configuration key
@@ -118,9 +159,42 @@ public class ConfiguratorBase {
    * @return the AccumuloToken
    * @since 1.5.0
    * @see #setConnectorInfo(Class, Configuration, AccumuloToken)
+   * @see #setConnectorInfo(Class, Configuration, Path)
    */
   public static AccumuloToken<?,?> getToken(Class<?> implementingClass, 
Configuration conf) {
-    return 
TokenHelper.fromBase64String(conf.get(enumToConfKey(implementingClass, 
ConnectorInfo.TOKEN)));
+    if (!isConnectorInfoSet(implementingClass, conf))
+      throw new IllegalStateException("Connector info for " + 
implementingClass.getSimpleName() + " has not been set");
+    
+    String token = conf.get(enumToConfKey(implementingClass, 
ConnectorInfo.TOKEN));
+    
+    if (conf.getBoolean(enumToConfKey(implementingClass, 
ConnectorInfo.TOKEN_IS_CACHE_FILE), false)) {
+      String tokenFile = token;
+      token = null;
+      
+      try {
+        Path[] cf = DistributedCache.getLocalCacheFiles(conf);
+        if (cf != null) {
+          for (Path path : cf) {
+            if 
(path.toUri().getPath().endsWith(tokenFile.substring(tokenFile.lastIndexOf('/'))))
 {
+              StringBuilder fileContents = new StringBuilder();
+              Scanner in = new Scanner(new BufferedReader(new 
FileReader(path.toString())));
+              try {
+                while (in.hasNextLine())
+                  fileContents.append(in.nextLine());
+              } finally {
+                in.close();
+              }
+              token = fileContents.toString();
+              break;
+            }
+          }
+        }
+        throw new FileNotFoundException(tokenFile + " not found in distributed 
cache");
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return TokenHelper.fromBase64String(token);
   }
   
   /**


Reply via email to