Author: rohini
Date: Sat Jan 31 00:24:45 2015
New Revision: 1656128

URL: http://svn.apache.org/r1656128
Log:
PIG-4404: LOAD with HBaseStorage on secure cluster is broken in Tez (rohini)

Modified:
    pig/branches/branch-0.14/CHANGES.txt
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java

Modified: pig/branches/branch-0.14/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.14/CHANGES.txt?rev=1656128&r1=1656127&r2=1656128&view=diff
==============================================================================
--- pig/branches/branch-0.14/CHANGES.txt (original)
+++ pig/branches/branch-0.14/CHANGES.txt Sat Jan 31 00:24:45 2015
@@ -26,6 +26,8 @@ IMPROVEMENTS
 
 BUG FIXES
 
+PIG-4404: LOAD with HBaseStorage on secure cluster is broken in Tez (rohini)
+
 PIG-4375: ObjectCache should use ProcessorContext.getObjectRegistry() (rohini)
 
 PIG-4334: PigProcessor does not set pig.datetime.default.tz (rohini)

Modified: 
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1656128&r1=1656127&r2=1656128&view=diff
==============================================================================
--- 
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
 (original)
+++ 
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
 Sat Jan 31 00:24:45 2015
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.pig.PigException;
@@ -58,7 +57,7 @@ public class TezJobCompiler {
     public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, 
LocalResource> localResources)
             throws IOException, YarnException {
         DAG tezDag = DAG.create(tezPlanNode.getOperatorKey().toString());
-        tezDag.setCredentials(new Credentials());
+        tezDag.setCredentials(tezPlanNode.getTezOperPlan().getCredentials());
         TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, 
tezPlanNode.getTezOperPlan(), tezDag, localResources);
         dagBuilder.visit();
         return tezDag;

Modified: 
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java?rev=1656128&r1=1656127&r2=1656128&view=diff
==============================================================================
--- 
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
 (original)
+++ 
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
 Sat Jan 31 00:24:45 2015
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -54,7 +55,14 @@ public class TezOperPlan extends Operato
 
     private int estimatedTotalParallelism = -1;
 
+    private Credentials creds;
+
     public TezOperPlan() {
+        creds = new Credentials();
+    }
+
+    public Credentials getCredentials() {
+        return creds;
     }
 
     public int getEstimatedTotalParallelism() {

Modified: 
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1656128&r1=1656127&r2=1656128&view=diff
==============================================================================
--- 
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
 (original)
+++ 
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
 Sat Jan 31 00:24:45 2015
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.LoadFunc;
@@ -47,13 +48,28 @@ import org.apache.pig.impl.util.UDFConte
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 
 public class LoaderProcessor extends TezOpPlanVisitor {
-    private Configuration conf;
+
+    private static final Log LOG = LogFactory.getLog(LoaderProcessor.class);
+
+    private TezOperPlan tezOperPlan;
+    private JobConf jobConf;
     private PigContext pc;
-    private static final Log log = LogFactory.getLog(LoaderProcessor.class);
-    public LoaderProcessor(TezOperPlan plan, PigContext pigContext) {
+
+    public LoaderProcessor(TezOperPlan plan, PigContext pigContext) throws 
VisitorException {
         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+        this.tezOperPlan = plan;
         this.pc = pigContext;
-        this.conf = ConfigurationUtil.toConfiguration(pc.getProperties());;
+        this.jobConf = new 
JobConf(ConfigurationUtil.toConfiguration(pc.getProperties()));
+        // This ensures that the same credentials object is used by reference 
everywhere
+        this.jobConf.setCredentials(tezOperPlan.getCredentials());
+        this.jobConf.setBoolean("mapred.mapper.new-api", true);
+        this.jobConf.setClass("mapreduce.inputformat.class",
+                PigInputFormat.class, InputFormat.class);
+        try {
+            this.jobConf.set("pig.pigContext", ObjectSerializer.serialize(pc));
+        } catch (IOException e) {
+            throw new VisitorException(e);
+        }
     }
 
     /**
@@ -76,22 +92,25 @@ public class LoaderProcessor extends Tez
         ArrayList<String> inpSignatureLists = new ArrayList<String>();
         ArrayList<Long> inpLimits = new ArrayList<Long>();
 
-        Job job = Job.getInstance(conf);
-        conf = job.getConfiguration();
-        conf.setBoolean("mapred.mapper.new-api", true);
-        conf.setClass("mapreduce.inputformat.class",
-                PigInputFormat.class, InputFormat.class);
-        conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
         List<POLoad> lds = PlanHelper.getPhysicalOperators(tezOp.plan,
                 POLoad.class);
 
+        Job job = Job.getInstance(jobConf);
+        Configuration conf = job.getConfiguration();
+
         if (lds != null && lds.size() > 0) {
-            for (POLoad ld : lds) {
-                LoadFunc lf = ld.getLoadFunc();
-                lf.setLocation(ld.getLFile().getFileName(), job);
+            if (lds.size() == 1) {
+                for (POLoad ld : lds) {
+                    LoadFunc lf = ld.getLoadFunc();
+                    lf.setLocation(ld.getLFile().getFileName(), job);
 
-                // Store the inp filespecs
-                inp.add(ld.getLFile());
+                    // Store the inp filespecs
+                    inp.add(ld.getLFile());
+                }
+            } else {
+                throw new VisitorException(
+                        "There is more than one load for TezOperator "
+                                + tezOp);
             }
         }
 
@@ -139,7 +158,7 @@ public class LoaderProcessor extends Tez
                 try {
                     maxCombinedSplitSize = Long.parseLong(tmp);
                 } catch (NumberFormatException e) {
-                    log.warn("Invalid numeric format for 
pig.maxCombinedSplitSize; use the default maximum combined split size");
+                    LOG.warn("Invalid numeric format for 
pig.maxCombinedSplitSize; use the default maximum combined split size");
                 }
             }
             if (maxCombinedSplitSize > 0)
@@ -159,6 +178,7 @@ public class LoaderProcessor extends Tez
         try {
             tezOp.getLoaderInfo().setLoads(processLoads(tezOp));
         } catch (Exception e) {
+            e.printStackTrace();
             throw new VisitorException(e);
         }
     }

Modified: 
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1656128&r1=1656127&r2=1656128&view=diff
==============================================================================
--- 
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
 (original)
+++ 
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
 Sat Jan 31 00:24:45 2015
@@ -16,8 +16,6 @@
  */
 package org.apache.pig.backend.hadoop.hbase;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -87,6 +85,7 @@ import org.apache.pig.ResourceSchema.Res
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.StoreResources;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import 
org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
 import org.apache.pig.builtin.FuncUtils;
 import org.apache.pig.builtin.Utf8StorageConverter;
@@ -788,7 +787,9 @@ public class HBaseStorage extends LoadFu
         List<Class> classList = new ArrayList<Class>();
         classList.add(org.apache.hadoop.hbase.client.HTable.class); // main 
hbase jar or hbase-client
         classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // 
main hbase jar or hbase-server
-        classList.add(com.google.common.collect.Lists.class); // guava
+        if (!HadoopShims.isHadoopYARN()) { //Avoid shipping duplicate. Hadoop 
0.23/2 itself has guava
+            classList.add(com.google.common.collect.Lists.class); // guava
+        }
         classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper
         // Additional jars that are specific to v0.95.0+
         addClassToList("org.cloudera.htrace.Trace", classList); // htrace
@@ -1132,9 +1133,10 @@ public class HBaseStorage extends LoadFu
         return new RequiredFieldResponse(true);
     }
 
+    @Override
     public void ensureAllKeyInstancesInSameSplit() throws IOException {
-        /** 
-         * no-op because hbase keys are unique 
+        /**
+         * no-op because hbase keys are unique
          * This will also work with things like 
DelimitedKeyPrefixRegionSplitPolicy
          * if you need a partial key match to be included in the split
          */
@@ -1149,7 +1151,7 @@ public class HBaseStorage extends LoadFu
             throw new RuntimeException("LoadFunc expected split of type 
TableSplit but was " + split.getClass().getName());
         }
     }
- 
+
 
     /**
      * Class to encapsulate logic around which column names were specified in 
each


Reply via email to