Author: rohini
Date: Sat Jan 31 00:24:45 2015
New Revision: 1656128
URL: http://svn.apache.org/r1656128
Log:
PIG-4404: LOAD with HBaseStorage on secure cluster is broken in Tez (rohini)
Modified:
pig/branches/branch-0.14/CHANGES.txt
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
Modified: pig/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/CHANGES.txt?rev=1656128&r1=1656127&r2=1656128&view=diff
==============================================================================
--- pig/branches/branch-0.14/CHANGES.txt (original)
+++ pig/branches/branch-0.14/CHANGES.txt Sat Jan 31 00:24:45 2015
@@ -26,6 +26,8 @@ IMPROVEMENTS
BUG FIXES
+PIG-4404: LOAD with HBaseStorage on secure cluster is broken in Tez (rohini)
+
PIG-4375: ObjectCache should use ProcessorContext.getObjectRegistry() (rohini)
PIG-4334: PigProcessor does not set pig.datetime.default.tz (rohini)
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1656128&r1=1656127&r2=1656128&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Sat Jan 31 00:24:45 2015
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.pig.PigException;
@@ -58,7 +57,7 @@ public class TezJobCompiler {
public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String,
LocalResource> localResources)
throws IOException, YarnException {
DAG tezDag = DAG.create(tezPlanNode.getOperatorKey().toString());
- tezDag.setCredentials(new Credentials());
+ tezDag.setCredentials(tezPlanNode.getTezOperPlan().getCredentials());
TezDagBuilder dagBuilder = new TezDagBuilder(pigContext,
tezPlanNode.getTezOperPlan(), tezDag, localResources);
dagBuilder.visit();
return tezDag;
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java?rev=1656128&r1=1656127&r2=1656128&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java Sat Jan 31 00:24:45 2015
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager;
import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -54,7 +55,14 @@ public class TezOperPlan extends Operato
private int estimatedTotalParallelism = -1;
+ private Credentials creds;
+
public TezOperPlan() {
+ creds = new Credentials();
+ }
+
+ public Credentials getCredentials() {
+ return creds;
}
public int getEstimatedTotalParallelism() {
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1656128&r1=1656127&r2=1656128&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Sat Jan 31 00:24:45 2015
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.LoadFunc;
@@ -47,13 +48,28 @@ import org.apache.pig.impl.util.UDFConte
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
public class LoaderProcessor extends TezOpPlanVisitor {
- private Configuration conf;
+
+ private static final Log LOG = LogFactory.getLog(LoaderProcessor.class);
+
+ private TezOperPlan tezOperPlan;
+ private JobConf jobConf;
private PigContext pc;
- private static final Log log = LogFactory.getLog(LoaderProcessor.class);
- public LoaderProcessor(TezOperPlan plan, PigContext pigContext) {
+
+ public LoaderProcessor(TezOperPlan plan, PigContext pigContext) throws
VisitorException {
super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+ this.tezOperPlan = plan;
this.pc = pigContext;
- this.conf = ConfigurationUtil.toConfiguration(pc.getProperties());;
+ this.jobConf = new
JobConf(ConfigurationUtil.toConfiguration(pc.getProperties()));
+ // This ensures that the same credentials object is used by reference
everywhere
+ this.jobConf.setCredentials(tezOperPlan.getCredentials());
+ this.jobConf.setBoolean("mapred.mapper.new-api", true);
+ this.jobConf.setClass("mapreduce.inputformat.class",
+ PigInputFormat.class, InputFormat.class);
+ try {
+ this.jobConf.set("pig.pigContext", ObjectSerializer.serialize(pc));
+ } catch (IOException e) {
+ throw new VisitorException(e);
+ }
}
/**
@@ -76,22 +92,25 @@ public class LoaderProcessor extends Tez
ArrayList<String> inpSignatureLists = new ArrayList<String>();
ArrayList<Long> inpLimits = new ArrayList<Long>();
- Job job = Job.getInstance(conf);
- conf = job.getConfiguration();
- conf.setBoolean("mapred.mapper.new-api", true);
- conf.setClass("mapreduce.inputformat.class",
- PigInputFormat.class, InputFormat.class);
- conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
List<POLoad> lds = PlanHelper.getPhysicalOperators(tezOp.plan,
POLoad.class);
+ Job job = Job.getInstance(jobConf);
+ Configuration conf = job.getConfiguration();
+
if (lds != null && lds.size() > 0) {
- for (POLoad ld : lds) {
- LoadFunc lf = ld.getLoadFunc();
- lf.setLocation(ld.getLFile().getFileName(), job);
+ if (lds.size() == 1) {
+ for (POLoad ld : lds) {
+ LoadFunc lf = ld.getLoadFunc();
+ lf.setLocation(ld.getLFile().getFileName(), job);
- // Store the inp filespecs
- inp.add(ld.getLFile());
+ // Store the inp filespecs
+ inp.add(ld.getLFile());
+ }
+ } else {
+ throw new VisitorException(
+ "There is more than one load for TezOperator "
+ + tezOp);
}
}
@@ -139,7 +158,7 @@ public class LoaderProcessor extends Tez
try {
maxCombinedSplitSize = Long.parseLong(tmp);
} catch (NumberFormatException e) {
- log.warn("Invalid numeric format for
pig.maxCombinedSplitSize; use the default maximum combined split size");
+ LOG.warn("Invalid numeric format for
pig.maxCombinedSplitSize; use the default maximum combined split size");
}
}
if (maxCombinedSplitSize > 0)
@@ -159,6 +178,7 @@ public class LoaderProcessor extends Tez
try {
tezOp.getLoaderInfo().setLoads(processLoads(tezOp));
} catch (Exception e) {
+ e.printStackTrace();
throw new VisitorException(e);
}
}
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1656128&r1=1656127&r2=1656128&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Sat Jan 31 00:24:45 2015
@@ -16,8 +16,6 @@
*/
package org.apache.pig.backend.hadoop.hbase;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -87,6 +85,7 @@ import org.apache.pig.ResourceSchema.Res
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreResources;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import
org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
import org.apache.pig.builtin.FuncUtils;
import org.apache.pig.builtin.Utf8StorageConverter;
@@ -788,7 +787,9 @@ public class HBaseStorage extends LoadFu
List<Class> classList = new ArrayList<Class>();
classList.add(org.apache.hadoop.hbase.client.HTable.class); // main
hbase jar or hbase-client
classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); //
main hbase jar or hbase-server
- classList.add(com.google.common.collect.Lists.class); // guava
+ if (!HadoopShims.isHadoopYARN()) { //Avoid shipping duplicate. Hadoop
0.23/2 itself has guava
+ classList.add(com.google.common.collect.Lists.class); // guava
+ }
classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper
// Additional jars that are specific to v0.95.0+
addClassToList("org.cloudera.htrace.Trace", classList); // htrace
@@ -1132,9 +1133,10 @@ public class HBaseStorage extends LoadFu
return new RequiredFieldResponse(true);
}
+ @Override
public void ensureAllKeyInstancesInSameSplit() throws IOException {
- /**
- * no-op because hbase keys are unique
+ /**
+ * no-op because hbase keys are unique
* This will also work with things like
DelimitedKeyPrefixRegionSplitPolicy
* if you need a partial key match to be included in the split
*/
@@ -1149,7 +1151,7 @@ public class HBaseStorage extends LoadFu
throw new RuntimeException("LoadFunc expected split of type
TableSplit but was " + split.getClass().getName());
}
}
-
+
/**
* Class to encapsulate logic around which column names were specified in
each