Author: rohini
Date: Thu Jun 25 20:59:31 2015
New Revision: 1687646
URL: http://svn.apache.org/r1687646
Log:
PIG-4574: Eliminate identity vertex for order by and skewed join right after
LOAD (rohini)
Added:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jun 25 20:59:31 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4574: Eliminate identity vertex for order by and skewed join right after LOAD (rohini)
+
PIG-4365: TOP udf should implement Accumulator interface (eyal via rohini)
PIG-4570: Allow AvroStorage to use a class for the schema (pmazak via daijy)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
Thu Jun 25 20:59:31 2015
@@ -1485,10 +1485,19 @@ public class TezCompiler extends PhyPlan
POPoissonSample poSample = new POPoissonSample(new
OperatorKey(scope,nig.getNextNodeId(scope)),
-1, sampleRate, heapPerc, totalMemory);
- TezOperator prevOp = compiledInputs[0];
- prevOp.plan.addAsLeaf(lrTez);
- prevOp.plan.addAsLeaf(poSample);
- prevOp.markSampler();
+ TezOperator samplerOper = compiledInputs[0];
+ boolean writeDataForPartitioner =
shouldWriteDataForPartitioner(samplerOper);
+
+ PhysicalPlan partitionerPlan = null;
+ if (writeDataForPartitioner) {
+ samplerOper.plan.addAsLeaf(lrTez);
+ } else {
+ partitionerPlan = samplerOper.plan.clone();
+ partitionerPlan.addAsLeaf(lrTez);
+ }
+
+ samplerOper.plan.addAsLeaf(poSample);
+ samplerOper.markSampler();
MultiMap<PhysicalOperator, PhysicalPlan> joinPlans =
op.getJoinPlans();
List<PhysicalOperator> l = plan.getPredecessors(op);
@@ -1532,9 +1541,9 @@ public class TezCompiler extends PhyPlan
// This foreach will pick the sort key columns from the
POPoissonSample output
POForEach nfe1 = new POForEach(new
OperatorKey(scope,nig.getNextNodeId(scope)),
-1, eps1, flat1);
- prevOp.plan.addAsLeaf(nfe1);
- prevOp.plan.addAsLeaf(lrTezSample);
- prevOp.setClosed(true);
+ samplerOper.plan.addAsLeaf(nfe1);
+ samplerOper.plan.addAsLeaf(lrTezSample);
+ samplerOper.setClosed(true);
int rp = op.getRequestedParallelism();
if (rp == -1) {
@@ -1557,10 +1566,9 @@ public class TezCompiler extends PhyPlan
compiledInputs = new TezOperator[] {joinInputs[0]};
- blocking();
-
- // Add a POIdentityInOutTez to the joinJobs[0] which is a
partition vertex.
- // It just partitions the data from first vertex based on the
quantiles from sample vertex.
+ // Add a partitioner vertex that partitions the data based on the quantiles from the sample vertex.
+ curTezOp = getTezOp();
+ tezPlan.add(curTezOp);
joinJobs[0] = curTezOp;
try {
@@ -1578,15 +1586,38 @@ public class TezCompiler extends PhyPlan
}
lrTez.setKeyType(type);
lrTez.setPlans(groups);
- lrTez.setSkewedJoin(true);
lrTez.setResultType(DataType.TUPLE);
- POIdentityInOutTez identityInOutTez = new POIdentityInOutTez(
- OperatorKey.genOpKey(scope), lrTez);
- identityInOutTez.setInputKey(prevOp.getOperatorKey().toString());
- joinJobs[0].plan.addAsLeaf(identityInOutTez);
+ POLocalRearrangeTez partitionerLR = null;
+ if (!writeDataForPartitioner) {
+ // Read input from hdfs again
+ joinJobs[0].plan = partitionerPlan;
+ partitionerLR = lrTez;
+ lrTez.setSkewedJoin(true);
+ } else {
+ // Add a POIdentityInOutTez which just passes data through
from sampler vertex
+ partitionerLR = new POIdentityInOutTez(
+ OperatorKey.genOpKey(scope),
+ lrTez,
+ samplerOper.getOperatorKey().toString());
+ partitionerLR.setSkewedJoin(true);
+ joinJobs[0].plan.addAsLeaf(partitionerLR);
+
+ // Connect the sampler vertex to the partitioner vertex
+ lrTez.setOutputKey(joinJobs[0].getOperatorKey().toString());
+ TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan,
samplerOper, joinJobs[0]);
+ // TODO: PIG-3775 unsorted shuffle
+ edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+ edge.outputClassName = UnorderedKVOutput.class.getName();
+ edge.inputClassName = UnorderedKVInput.class.getName();
+ // If samplerOper.requestedParallelism changes based on the number of input splits
+ // it will reflect for joinJobs[0] so that 1-1 edge will work.
+ joinJobs[0].setRequestedParallelismByReference(samplerOper);
+ }
joinJobs[0].setClosed(true);
joinJobs[0].markSampleBasedPartitioner();
+ joinJobs[0].setUseMRMapSettings(samplerOper.isUseMRMapSettings());
+
rearrangeOutputs[0] = joinJobs[0];
compiledInputs = new TezOperator[] {joinInputs[1]};
@@ -1663,23 +1694,11 @@ public class TezCompiler extends PhyPlan
fe.visit(this);
// Connect vertices
- lrTez.setOutputKey(joinJobs[0].getOperatorKey().toString());
lrTezSample.setOutputKey(sampleJobPair.first.getOperatorKey().toString());
-
identityInOutTez.setOutputKey(joinJobs[2].getOperatorKey().toString());
+
partitionerLR.setOutputKey(joinJobs[2].getOperatorKey().toString());
pr.setOutputKey(joinJobs[2].getOperatorKey().toString());
- TezEdgeDescriptor edge =
joinJobs[0].inEdges.get(prevOp.getOperatorKey());
- joinJobs[0].setUseMRMapSettings(prevOp.isUseMRMapSettings());
- // TODO: Convert to unsorted shuffle after TEZ-661
- // Use 1-1 edge
- edge.dataMovementType = DataMovementType.ONE_TO_ONE;
- edge.outputClassName = UnorderedKVOutput.class.getName();
- edge.inputClassName = UnorderedKVInput.class.getName();
- // If prevOp.requestedParallelism changes based on no. of input
splits
- // it will reflect for joinJobs[0] so that 1-1 edge will work.
- joinJobs[0].setRequestedParallelismByReference(prevOp);
-
- TezCompilerUtil.connect(tezPlan, prevOp, sampleJobPair.first);
+ TezCompilerUtil.connect(tezPlan, samplerOper, sampleJobPair.first);
POValueOutputTez sampleOut = (POValueOutputTez)
sampleJobPair.first.plan.getLeaves().get(0);
for (int i = 0; i <= 2; i++) {
@@ -1687,16 +1706,16 @@ public class TezCompiler extends PhyPlan
// We need to send sample to left relation partitioner
vertex, right relation load vertex,
// and join vertex (IsFirstReduceOfKey in join vertex need
sample file as well)
joinJobs[i].setSampleOperator(sampleJobPair.first);
-
+
// Configure broadcast edges for distribution map
- edge = TezCompilerUtil.connect(tezPlan,
sampleJobPair.first, joinJobs[i]);
+ TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan,
sampleJobPair.first, joinJobs[i]);
TezCompilerUtil.configureValueOnlyTupleOutput(edge,
DataMovementType.BROADCAST);
sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
}
// Configure skewed partitioner for join
if (i != 2) {
- edge =
joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey());
+ TezEdgeDescriptor edge =
joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey());
edge.partitionerClass = SkewedPartitionerTez.class;
}
}
@@ -1726,55 +1745,71 @@ public class TezCompiler extends PhyPlan
new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
}
- /**
- * Force an end to the current vertex with store and sample
- * @return Tez operator that now is finished with a store.
- * @throws PlanException
- */
- private TezOperator endSingleInputWithStoreAndSample(
- POSort sort,
- POLocalRearrangeTez lr,
- POLocalRearrangeTez lrSample,
- byte keyType,
- Pair<POProject, Byte>[] fields) throws PlanException {
- if(compiledInputs.length>1) {
- int errCode = 2023;
- String msg = "Received a multi input plan when expecting only a
single input one.";
- throw new PlanException(msg, errCode, PigException.BUG);
+ private boolean shouldWriteDataForPartitioner(TezOperator samplerOper) {
+ // If there are operators other than load and foreach (like filter,
+ // split, etc.) in the plan, then process and write the data out
+ // to save the cost of reprocessing it in the partitioner vertex.
+ // Else read from hdfs again to save the IO cost of the extra write.
+ boolean writeDataForPartitioner = false;
+ if (samplerOper.plan.getRoots().get(0) instanceof POLoad) {
+ for (PhysicalOperator oper : samplerOper.plan) {
+ if (oper instanceof POForEach || oper instanceof POLoad) {
+ continue;
+ }
+ writeDataForPartitioner = true;
+ break;
+ }
+ } else {
+ writeDataForPartitioner = true;
}
- TezOperator oper = compiledInputs[0];
- if (!oper.isClosed()) {
+ return writeDataForPartitioner;
+ }
- List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
- if (fields == null) {
- // This is project *
- PhysicalPlan ep = new PhysicalPlan();
- POProject prj = new POProject(new
OperatorKey(scope,nig.getNextNodeId(scope)));
- prj.setStar(true);
- prj.setOverloaded(false);
- prj.setResultType(DataType.TUPLE);
- ep.add(prj);
- eps.add(ep);
- } else {
- // Attach the sort plans to the local rearrange to get the
- // projection.
- eps.addAll(sort.getSortPlans());
- }
+ /**
+ * Get LocalRearrange for POSort input
+ */
+ private POLocalRearrangeTez getLocalRearrangeForSortInput(POSort sort,
+ byte keyType, Pair<POProject, Byte>[] fields)
+ throws PlanException {
+ POLocalRearrangeTez lr = new
POLocalRearrangeTez(OperatorKey.genOpKey(scope));
+ List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+ if (fields == null) {
+ // This is project *
+ PhysicalPlan ep = new PhysicalPlan();
+ POProject prj = new POProject(new
OperatorKey(scope,nig.getNextNodeId(scope)));
+ prj.setStar(true);
+ prj.setOverloaded(false);
+ prj.setResultType(DataType.TUPLE);
+ ep.add(prj);
+ eps.add(ep);
+ } else {
+ // Attach the sort plans to the local rearrange to get the
+ // projection.
+ eps.addAll(sort.getSortPlans());
+ }
+
+ try {
+ lr.setIndex(0);
+ } catch (ExecException e) {
+ int errCode = 2058;
+ String msg = "Unable to set index on newly created
POLocalRearrange.";
+ throw new PlanException(msg, errCode, PigException.BUG, e);
+ }
+ lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
keyType);
+ lr.setPlans(eps);
+ lr.setResultType(DataType.TUPLE);
+ lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
+ return lr;
+ }
- try {
- lr.setIndex(0);
- } catch (ExecException e) {
- int errCode = 2058;
- String msg = "Unable to set index on newly created
POLocalRearrange.";
- throw new PlanException(msg, errCode, PigException.BUG, e);
- }
- lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE
: keyType);
- lr.setPlans(eps);
- lr.setResultType(DataType.TUPLE);
- lr.addOriginalLocation(sort.getAlias(),
sort.getOriginalLocations());
+ /**
+ * Add a sampler to the sort input
+ */
+ private POLocalRearrangeTez addSamplingToSortInput(POSort sort,
TezOperator oper,
+ byte keyType, Pair<POProject, Byte>[] fields) throws PlanException
{
- lr.setOutputKey(curTezOp.getOperatorKey().toString());
- oper.plan.addAsLeaf(lr);
+ POLocalRearrangeTez lrSample =
localRearrangeFactory.create(LocalRearrangeType.NULL);
+ if (!oper.isClosed()) {
List<Boolean> flat1 = new ArrayList<Boolean>();
List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
@@ -1875,7 +1910,8 @@ public class TezCompiler extends PhyPlan
String msg = "The current operator is closed. This is unexpected
while compiling.";
throw new PlanException(msg, errCode, PigException.BUG);
}
- return oper;
+ oper.markSampler();
+ return lrSample;
}
private Pair<TezOperator,Integer> getOrderbySamplingAggregationJob(
@@ -2112,32 +2148,42 @@ public class TezCompiler extends PhyPlan
private TezOperator[] getSortJobs(
TezOperator inputOper,
+ PhysicalPlan partitionerPlan,
POLocalRearrangeTez inputOperRearrange,
POSort sort,
byte keyType,
Pair<POProject, Byte>[] fields) throws PlanException{
+
TezOperator[] opers = new TezOperator[2];
+
+ // Partitioner Vertex
TezOperator oper1 = getTezOp();
tezPlan.add(oper1);
opers[0] = oper1;
-
- POIdentityInOutTez identityInOutTez = new POIdentityInOutTez(
- OperatorKey.genOpKey(scope),
- inputOperRearrange);
- identityInOutTez.setInputKey(inputOper.getOperatorKey().toString());
- oper1.plan.addAsLeaf(identityInOutTez);
+ POLocalRearrangeTez partitionerLR = null;
+ if (partitionerPlan != null) {
+ // Read from hdfs again
+ oper1.plan = partitionerPlan;
+ partitionerLR = inputOperRearrange;
+ } else {
+ partitionerLR = new POIdentityInOutTez(
+ OperatorKey.genOpKey(scope),
+ inputOperRearrange,
+ inputOper.getOperatorKey().toString());
+ oper1.plan.addAsLeaf(partitionerLR);
+ }
oper1.setClosed(true);
oper1.markSampleBasedPartitioner();
+ // Global Sort Vertex
TezOperator oper2 = getTezOp();
+ partitionerLR.setOutputKey(oper2.getOperatorKey().toString());
oper2.markGlobalSort();
opers[1] = oper2;
tezPlan.add(oper2);
long limit = sort.getLimit();
-
boolean[] sortOrder;
-
List<Boolean> sortOrderList = sort.getMAscCols();
if(sortOrderList != null) {
sortOrder = new boolean[sortOrderList.size()];
@@ -2147,8 +2193,6 @@ public class TezCompiler extends PhyPlan
oper2.setSortOrder(sortOrder);
}
- identityInOutTez.setOutputKey(oper2.getOperatorKey().toString());
-
if (limit!=-1) {
POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
pkg_c.setPkgr(new LitePackager());
@@ -2228,6 +2272,13 @@ public class TezCompiler extends PhyPlan
@Override
public void visitSort(POSort op) throws VisitorException {
try{
+
+ if (compiledInputs.length > 1) {
+ int errCode = 2023;
+ String msg = "Received a multi input plan when expecting only
a single input one.";
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+
Pair<POProject, Byte>[] fields = getSortCols(op.getSortPlans());
byte keyType = DataType.UNKNOWN;
@@ -2242,58 +2293,65 @@ public class TezCompiler extends PhyPlan
throw new PlanException(msg, errCode, PigException.BUG, ve);
}
- POLocalRearrangeTez lr = new
POLocalRearrangeTez(OperatorKey.genOpKey(scope));
- POLocalRearrangeTez lrSample =
localRearrangeFactory.create(LocalRearrangeType.NULL);
+ TezOperator samplerOper = compiledInputs[0];
+ boolean writeDataForPartitioner =
shouldWriteDataForPartitioner(samplerOper);
- TezOperator prevOper = endSingleInputWithStoreAndSample(op, lr,
lrSample, keyType, fields);
- prevOper.markSampler();
+ POLocalRearrangeTez lr = getLocalRearrangeForSortInput(op,
keyType, fields);
+ PhysicalPlan partitionerPlan = null;
+ if (writeDataForPartitioner) {
+ samplerOper.plan.addAsLeaf(lr);
+ } else {
+ partitionerPlan = samplerOper.plan.clone();
+ partitionerPlan.addAsLeaf(lr);
+ }
+ // if rp is still -1, let it be, TezParallelismEstimator will set
it to an estimated rp
int rp = op.getRequestedParallelism();
if (rp == -1) {
rp = pigContext.defaultParallel;
}
- // if rp is still -1, let it be, TezParallelismEstimator will set
it to an estimated rp
+ // Add sampling to the sort input, then create a sample aggregation operator and connect the two.
+ POLocalRearrangeTez lrSample = addSamplingToSortInput(op,
samplerOper, keyType, fields);
Pair<TezOperator, Integer> quantJobParallelismPair =
getOrderbySamplingAggregationJob(op, rp);
- TezOperator[] sortOpers = getSortJobs(prevOper, lr, op, keyType,
fields);
+ TezCompilerUtil.connect(tezPlan, samplerOper,
quantJobParallelismPair.first);
- TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan,
prevOper, sortOpers[0]);
- sortOpers[0].setUseMRMapSettings(prevOper.isUseMRMapSettings());
+ // Create the partitioner and the global sort vertices
+ TezOperator[] sortOpers = getSortJobs(samplerOper,
partitionerPlan, lr, op, keyType, fields);
+ sortOpers[0].setUseMRMapSettings(samplerOper.isUseMRMapSettings());
+
+ if (writeDataForPartitioner) {
+ // Connect the sampler and partitioner vertex
+ lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
+ TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan,
samplerOper, sortOpers[0]);
+
+ // Use 1-1 edge
+ edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+ edge.outputClassName = UnorderedKVOutput.class.getName();
+ edge.inputClassName = UnorderedKVInput.class.getName();
+ // If samplerOper.requestedParallelism changes based on the number of input splits
+ // it will reflect for sortOpers[0] so that 1-1 edge will work.
+ sortOpers[0].setRequestedParallelismByReference(samplerOper);
+ }
- // Use 1-1 edge
- edge.dataMovementType = DataMovementType.ONE_TO_ONE;
- edge.outputClassName = UnorderedKVOutput.class.getName();
- edge.inputClassName = UnorderedKVInput.class.getName();
- // If prevOper.requestedParallelism changes based on no. of input
splits
- // it will reflect for sortOpers[0] so that 1-1 edge will work.
- sortOpers[0].setRequestedParallelismByReference(prevOper);
- if (rp==-1) {
+ if (rp == -1) {
quantJobParallelismPair.first.setNeedEstimatedQuantile(true);
}
+ quantJobParallelismPair.first.setSortOperator(sortOpers[1]);
sortOpers[1].setRequestedParallelism(quantJobParallelismPair.second);
- /*
- // TODO: Convert to unsorted shuffle after TEZ-661
- // edge.outputClassName = UnorderedKVOutput.class.getName();
- // edge.inputClassName = UnorderedKVInput.class.getName();
- edge.partitionerClass = RoundRobinPartitioner.class;
-
sortOpers[0].setRequestedParallelism(quantJobParallelismPair.second);
-
sortOpers[1].setRequestedParallelism(quantJobParallelismPair.second);
- */
-
- TezCompilerUtil.connect(tezPlan, prevOper,
quantJobParallelismPair.first);
- lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
-
lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString());
-
- edge = TezCompilerUtil.connect(tezPlan,
quantJobParallelismPair.first, sortOpers[0]);
+ // Broadcast the sample to Partitioner vertex
+ TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan,
quantJobParallelismPair.first, sortOpers[0]);
TezCompilerUtil.configureValueOnlyTupleOutput(edge,
DataMovementType.BROADCAST);
POValueOutputTez sampleOut =
(POValueOutputTez)quantJobParallelismPair.first.plan.getLeaves().get(0);
sampleOut.addOutputKey(sortOpers[0].getOperatorKey().toString());
sortOpers[0].setSampleOperator(quantJobParallelismPair.first);
+
lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString());
+
+ // Connect the Partitioner and Global Sort vertex
edge = TezCompilerUtil.connect(tezPlan, sortOpers[0],
sortOpers[1]);
edge.partitionerClass = WeightedRangePartitionerTez.class;
-
curTezOp = sortOpers[1];
// TODO: Review sort udf
@@ -2301,7 +2359,6 @@ public class TezCompiler extends PhyPlan
//
curTezOp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
// curTezOp.isUDFComparatorUsed = true;
// }
- quantJobParallelismPair.first.setSortOperator(sortOpers[1]);
// If Order by followed by Limit and parallelism of order by is
not 1
// add a new vertex for Limit with parallelism 1.
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
Thu Jun 25 20:59:31 2015
@@ -58,9 +58,10 @@ public class POIdentityInOutTez extends
private transient boolean shuffleInput;
private transient boolean finished = false;
- public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange) {
+ public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange,
String inputKey) {
super(inputRearrange);
this.mKey = k;
+ this.inputKey = inputKey;
}
public void setInputKey(String inputKey) {
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
Thu Jun 25 20:59:31 2015
@@ -31,6 +31,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -144,6 +145,13 @@ public class POLocalRearrangeTez extends
// assign the tuple to its slot in the projection.
key.setIndex(index);
val.setIndex(index);
+ if (isSkewedJoin) {
+ // Wrap into a NullablePartitionWritable to match the key
+ // of the right table from POPartitionRearrangeTez for the skewed join
+ NullablePartitionWritable wrappedKey = new
NullablePartitionWritable(key);
+ wrappedKey.setPartition(-1);
+ key = wrappedKey;
+ }
writer.write(key, val);
} else {
illustratorMarkup(res.result, res.result, 0);
Modified:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld
(original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld
Thu Jun 25 20:59:31 2015
@@ -4,86 +4,92 @@
#--------------------------------------------------
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
-Tez vertex scope-12 -> Tez vertex scope-22,Tez vertex scope-33,
-Tez vertex scope-22 -> Tez vertex scope-33,
-Tez vertex scope-33 -> Tez vertex scope-35,
-Tez vertex scope-35 -> Tez vertex scope-46,
-Tez vertex scope-46
+Tez vertex scope-12 -> Tez vertex scope-28,
+Tez vertex scope-28 -> Tez vertex scope-39,
+Tez vertex scope-39 -> Tez vertex scope-40,
+Tez vertex scope-40 -> Tez vertex scope-51,
+Tez vertex scope-51
Tez vertex scope-12
# Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-15 -> scope-22
+Local Rearrange[tuple]{tuple}(false) - scope-21 -> scope-28
| |
-| Constant(DummyVal) - scope-14
+| Constant(DummyVal) - scope-20
|
-|---New For Each(false,false,true)[tuple] - scope-21
+|---New For Each(false,false,true)[tuple] - scope-27
| |
| Project[int][0] - scope-8
| |
| Project[int][1] - scope-9
| |
- | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-20
+ | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-26
| |
- | |---Project[tuple][*] - scope-19
+ | |---Project[tuple][*] - scope-25
|
- |---ReservoirSample - scope-18
+ |---ReservoirSample - scope-24
|
- |---b: Local Rearrange[tuple]{tuple}(false) - scope-13 ->
scope-33
+ |---a: New For Each(false,false)[bag] - scope-7
| |
- | Project[int][0] - scope-8
+ | Cast[int] - scope-2
| |
- | Project[int][1] - scope-9
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
|
- |---a: New For Each(false,false)[bag] - scope-7
- | |
- | Cast[int] - scope-2
- | |
- | |---Project[bytearray][0] - scope-1
- | |
- | Cast[int] - scope-5
- | |
- | |---Project[bytearray][1] - scope-4
- |
- |---a:
Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
-Tez vertex scope-22
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) -
scope-0
+Tez vertex scope-28
# Plan on vertex
-POValueOutputTez - scope-32 -> [scope-33]
+POValueOutputTez - scope-38 -> [scope-39]
|
-|---New For Each(false)[tuple] - scope-31
+|---New For Each(false)[tuple] - scope-37
| |
- |
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.FindQuantilesTez)[tuple]
- scope-30
+ |
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.FindQuantilesTez)[tuple]
- scope-36
| |
- | |---Project[tuple][*] - scope-29
+ | |---Project[tuple][*] - scope-35
|
- |---New For Each(false,false)[tuple] - scope-28
+ |---New For Each(false,false)[tuple] - scope-34
| |
- | Constant(-1) - scope-27
+ | Constant(-1) - scope-33
| |
- | Project[bag][1] - scope-24
+ | Project[bag][1] - scope-30
|
- |---Package(Packager)[tuple]{bytearray} - scope-23
-Tez vertex scope-33
+ |---Package(Packager)[tuple]{bytearray} - scope-29
+Tez vertex scope-39
# Plan on vertex
-POIdentityInOutTez - scope-34 <- scope-12 -> scope-35
+b: Local Rearrange[tuple]{tuple}(false) - scope-13 -> scope-40
| |
| Project[int][0] - scope-8
| |
| Project[int][1] - scope-9
-Tez vertex scope-35
+|
+|---a: New For Each(false,false)[bag] - scope-19
+ | |
+ | Cast[int] - scope-16
+ | |
+ | |---Project[bytearray][0] - scope-15
+ | |
+ | Cast[int] - scope-18
+ | |
+ | |---Project[bytearray][1] - scope-17
+ |
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-14
+Tez vertex scope-40
# Plan on vertex
-POValueOutputTez - scope-45 -> [scope-46]
+POValueOutputTez - scope-50 -> [scope-51]
|
-|---Limit - scope-44
+|---Limit - scope-49
|
- |---New For Each(true)[tuple] - scope-43
+ |---New For Each(true)[tuple] - scope-48
| |
- | Project[bag][1] - scope-42
+ | Project[bag][1] - scope-47
|
- |---Package(LitePackager)[tuple]{tuple} - scope-41
-Tez vertex scope-46
+ |---Package(LitePackager)[tuple]{tuple} - scope-46
+Tez vertex scope-51
# Plan on vertex
c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-11
|
-|---Limit - scope-48
+|---Limit - scope-53
|
- |---POValueInputTez - scope-47 <- scope-35
+ |---POValueInputTez - scope-52 <- scope-40
Modified:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld
(original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld
Thu Jun 25 20:59:31 2015
@@ -4,70 +4,78 @@
#--------------------------------------------------
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
-Tez vertex scope-11 -> Tez vertex scope-20,Tez vertex scope-30,
-Tez vertex scope-20 -> Tez vertex scope-30,
-Tez vertex scope-30 -> Tez vertex scope-32,
-Tez vertex scope-32
+Tez vertex scope-11 -> Tez vertex scope-26,
+Tez vertex scope-26 -> Tez vertex scope-36,
+Tez vertex scope-36 -> Tez vertex scope-37,
+Tez vertex scope-37
Tez vertex scope-11
# Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-14 -> scope-20
+Local Rearrange[tuple]{tuple}(false) - scope-20 -> scope-26
| |
-| Constant(DummyVal) - scope-13
+| Constant(DummyVal) - scope-19
|
-|---New For Each(false,true)[tuple] - scope-19
+|---New For Each(false,true)[tuple] - scope-25
| |
| Project[int][0] - scope-8
| |
- | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-18
+ | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-24
| |
- | |---Project[tuple][*] - scope-17
+ | |---Project[tuple][*] - scope-23
|
- |---ReservoirSample - scope-16
+ |---ReservoirSample - scope-22
|
- |---b: Local Rearrange[tuple]{int}(false) - scope-12 ->
scope-30
+ |---a: New For Each(false,false)[bag] - scope-7
| |
- | Project[int][0] - scope-8
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
|
- |---a: New For Each(false,false)[bag] - scope-7
- | |
- | Cast[int] - scope-2
- | |
- | |---Project[bytearray][0] - scope-1
- | |
- | Cast[int] - scope-5
- | |
- | |---Project[bytearray][1] - scope-4
- |
- |---a: Load(file:///tmp/input:PigStorage(',')) - scope-0
-Tez vertex scope-20
+ |---a: Load(file:///tmp/input:PigStorage(',')) - scope-0
+Tez vertex scope-26
# Plan on vertex
-POValueOutputTez - scope-29 -> [scope-30]
+POValueOutputTez - scope-35 -> [scope-36]
|
-|---New For Each(false)[tuple] - scope-28
+|---New For Each(false)[tuple] - scope-34
| |
- |
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.FindQuantilesTez)[tuple]
- scope-27
+ |
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.FindQuantilesTez)[tuple]
- scope-33
| |
- | |---Project[tuple][*] - scope-26
+ | |---Project[tuple][*] - scope-32
|
- |---New For Each(false,false)[tuple] - scope-25
+ |---New For Each(false,false)[tuple] - scope-31
| |
- | Constant(-1) - scope-24
+ | Constant(-1) - scope-30
| |
- | Project[bag][1] - scope-22
+ | Project[bag][1] - scope-28
|
- |---Package(Packager)[tuple]{bytearray} - scope-21
-Tez vertex scope-30
+ |---Package(Packager)[tuple]{bytearray} - scope-27
+Tez vertex scope-36
# Plan on vertex
-POIdentityInOutTez - scope-31 <- scope-11 -> scope-32
+b: Local Rearrange[tuple]{int}(false) - scope-12 -> scope-37
| |
| Project[int][0] - scope-8
-Tez vertex scope-32
+|
+|---a: New For Each(false,false)[bag] - scope-18
+ | |
+ | Cast[int] - scope-15
+ | |
+ | |---Project[bytearray][0] - scope-14
+ | |
+ | Cast[int] - scope-17
+ | |
+ | |---Project[bytearray][1] - scope-16
+ |
+ |---a: Load(file:///tmp/input:PigStorage(',')) - scope-13
+Tez vertex scope-37
# Plan on vertex
b: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-10
|
-|---New For Each(true)[tuple] - scope-35
+|---New For Each(true)[tuple] - scope-40
| |
- | Project[bag][1] - scope-34
+ | Project[bag][1] - scope-39
|
- |---Package(LitePackager)[tuple]{int} - scope-33
+ |---Package(LitePackager)[tuple]{int} - scope-38
Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld?rev=1687646&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld
(added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld
Thu Jun 25 20:59:31 2015
@@ -0,0 +1,81 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-15 -> Tez vertex scope-24,Tez vertex scope-34,
+Tez vertex scope-24 -> Tez vertex scope-34,
+Tez vertex scope-34 -> Tez vertex scope-36,
+Tez vertex scope-36
+
+Tez vertex scope-15
+# Plan on vertex
+Local Rearrange[tuple]{tuple}(false) - scope-18 -> scope-24
+| |
+| Constant(DummyVal) - scope-17
+|
+|---New For Each(false,true)[tuple] - scope-23
+ | |
+ | Project[int][0] - scope-12
+ | |
+ | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-22
+ | |
+ | |---Project[tuple][*] - scope-21
+ |
+ |---ReservoirSample - scope-20
+ |
+ |---c: Local Rearrange[tuple]{int}(false) - scope-16 ->
scope-34
+ | |
+ | Project[int][0] - scope-12
+ |
+ |---b: Filter[bag] - scope-8
+ | |
+ | Equal To[boolean] - scope-11
+ | |
+ | |---Project[int][0] - scope-9
+ | |
+ | |---Constant(1) - scope-10
+ |
+ |---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input:PigStorage(',')) - scope-0
+Tez vertex scope-24
+# Plan on vertex
+POValueOutputTez - scope-33 -> [scope-34]
+|
+|---New For Each(false)[tuple] - scope-32
+ | |
+ |
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.FindQuantilesTez)[tuple]
- scope-31
+ | |
+ | |---Project[tuple][*] - scope-30
+ |
+ |---New For Each(false,false)[tuple] - scope-29
+ | |
+ | Constant(-1) - scope-28
+ | |
+ | Project[bag][1] - scope-26
+ |
+ |---Package(Packager)[tuple]{bytearray} - scope-25
+Tez vertex scope-34
+# Plan on vertex
+POIdentityInOutTez - scope-35 <- scope-15 -> scope-36
+| |
+| Project[int][0] - scope-12
+Tez vertex scope-36
+# Plan on vertex
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-14
+|
+|---New For Each(true)[tuple] - scope-39
+ | |
+ | Project[bag][1] - scope-38
+ |
+ |---Package(LitePackager)[tuple]{int} - scope-37
Modified:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
(original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
Thu Jun 25 20:59:31 2015
@@ -4,68 +4,76 @@
#--------------------------------------------------
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
-Tez vertex scope-27 -> Tez vertex scope-36,Tez vertex scope-46,
-Tez vertex scope-36 -> Tez vertex scope-28,Tez vertex scope-46,
-Tez vertex scope-46 -> Tez vertex scope-50,
-Tez vertex scope-28 -> Tez vertex scope-50,
-Tez vertex scope-50
+Tez vertex scope-27 -> Tez vertex scope-42,
+Tez vertex scope-42 -> Tez vertex scope-28,Tez vertex scope-52,
+Tez vertex scope-52 -> Tez vertex scope-55,
+Tez vertex scope-28 -> Tez vertex scope-55,
+Tez vertex scope-55
Tez vertex scope-27
# Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-31 -> scope-36
+Local Rearrange[tuple]{tuple}(false) - scope-31 -> scope-42
| |
| Constant(DummyVal) - scope-30
|
-|---New For Each(true,true)[tuple] - scope-35
+|---New For Each(true,true)[tuple] - scope-41
| |
| Project[int][0] - scope-16
| |
- | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-34
+ | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-40
| |
- | |---Project[tuple][*] - scope-33
+ | |---Project[tuple][*] - scope-39
|
|---PoissonSample - scope-32
|
- |---Local Rearrange[tuple]{int}(false) - scope-29 ->
scope-46
+ |---a: New For Each(false,false)[bag] - scope-7
| |
- | Project[int][0] - scope-16
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
|
- |---a: New For Each(false,false)[bag] - scope-7
- | |
- | Cast[int] - scope-2
- | |
- | |---Project[bytearray][0] - scope-1
- | |
- | Cast[int] - scope-5
- | |
- | |---Project[bytearray][1] - scope-4
- |
- |---a:
Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
-Tez vertex scope-36
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage)
- scope-0
+Tez vertex scope-42
# Plan on vertex
-POValueOutputTez - scope-45 -> [scope-28, scope-46]
+POValueOutputTez - scope-51 -> [scope-28, scope-52]
|
-|---New For Each(false)[tuple] - scope-44
+|---New For Each(false)[tuple] - scope-50
| |
- |
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez)[tuple]
- scope-43
+ |
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez)[tuple]
- scope-49
| |
- | |---Project[tuple][*] - scope-42
+ | |---Project[tuple][*] - scope-48
|
- |---New For Each(false,false)[tuple] - scope-41
+ |---New For Each(false,false)[tuple] - scope-47
| |
- | Constant(-1) - scope-40
+ | Constant(-1) - scope-46
| |
- | Project[bag][1] - scope-38
+ | Project[bag][1] - scope-44
|
- |---Package(Packager)[tuple]{bytearray} - scope-37
-Tez vertex scope-46
+ |---Package(Packager)[tuple]{bytearray} - scope-43
+Tez vertex scope-52
# Plan on vertex
-POIdentityInOutTez - scope-47 <- scope-27 -> scope-50
+Local Rearrange[tuple]{int}(false) - scope-29 -> scope-55
| |
| Project[int][0] - scope-16
+|
+|---a: New For Each(false,false)[bag] - scope-38
+ | |
+ | Cast[int] - scope-35
+ | |
+ | |---Project[bytearray][0] - scope-34
+ | |
+ | Cast[int] - scope-37
+ | |
+ | |---Project[bytearray][1] - scope-36
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) -
scope-33
Tez vertex scope-28
# Plan on vertex
-Partition Rearrange[tuple]{int}(false) - scope-48 -> scope-50
+Partition Rearrange[tuple]{int}(false) - scope-53 -> scope-55
| |
| Project[int][0] - scope-17
|
@@ -80,7 +88,7 @@ Partition Rearrange[tuple]{int}(false) -
| |---Project[bytearray][1] - scope-12
|
|---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
-Tez vertex scope-50
+Tez vertex scope-55
# Plan on vertex
d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-26
|
@@ -92,10 +100,10 @@ d: Store(file:///tmp/output:org.apache.p
| |
| Project[int][3] - scope-23
|
- |---New For Each(true,true)[tuple] - scope-54
+ |---New For Each(true,true)[tuple] - scope-59
| |
- | Project[bag][1] - scope-52
+ | Project[bag][1] - scope-57
| |
- | Project[bag][2] - scope-53
+ | Project[bag][2] - scope-58
|
- |---Package(Packager)[tuple]{int} - scope-51
+ |---Package(Packager)[tuple]{int} - scope-56
Added:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld?rev=1687646&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld
(added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld
Thu Jun 25 20:59:31 2015
@@ -0,0 +1,109 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-31 -> Tez vertex scope-40,Tez vertex scope-50,
+Tez vertex scope-40 -> Tez vertex scope-32,Tez vertex scope-50,
+Tez vertex scope-50 -> Tez vertex scope-54,
+Tez vertex scope-32 -> Tez vertex scope-54,
+Tez vertex scope-54
+
+Tez vertex scope-31
+# Plan on vertex
+Local Rearrange[tuple]{tuple}(false) - scope-35 -> scope-40
+| |
+| Constant(DummyVal) - scope-34
+|
+|---New For Each(true,true)[tuple] - scope-39
+ | |
+ | Project[int][0] - scope-20
+ | |
+ | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-38
+ | |
+ | |---Project[tuple][*] - scope-37
+ |
+ |---PoissonSample - scope-36
+ |
+ |---Local Rearrange[tuple]{int}(false) - scope-33 ->
scope-50
+ | |
+ | Project[int][0] - scope-20
+ |
+ |---a: Filter[bag] - scope-8
+ | |
+ | Equal To[boolean] - scope-11
+ | |
+ | |---Project[int][0] - scope-9
+ | |
+ | |---Constant(1) - scope-10
+ |
+ |---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a:
Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-40
+# Plan on vertex
+POValueOutputTez - scope-49 -> [scope-32, scope-50]
+|
+|---New For Each(false)[tuple] - scope-48
+ | |
+ |
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez)[tuple]
- scope-47
+ | |
+ | |---Project[tuple][*] - scope-46
+ |
+ |---New For Each(false,false)[tuple] - scope-45
+ | |
+ | Constant(-1) - scope-44
+ | |
+ | Project[bag][1] - scope-42
+ |
+ |---Package(Packager)[tuple]{bytearray} - scope-41
+Tez vertex scope-50
+# Plan on vertex
+POIdentityInOutTez - scope-51 <- scope-31 -> scope-54
+| |
+| Project[int][0] - scope-20
+Tez vertex scope-32
+# Plan on vertex
+Partition Rearrange[tuple]{int}(false) - scope-52 -> scope-54
+| |
+| Project[int][0] - scope-21
+|
+|---b: New For Each(false,false)[bag] - scope-19
+ | |
+ | Cast[int] - scope-14
+ | |
+ | |---Project[bytearray][0] - scope-13
+ | |
+ | Cast[int] - scope-17
+ | |
+ | |---Project[bytearray][1] - scope-16
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) -
scope-12
+Tez vertex scope-54
+# Plan on vertex
+d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-30
+|
+|---d: New For Each(false,false,false)[bag] - scope-29
+ | |
+ | Project[int][0] - scope-23
+ | |
+ | Project[int][1] - scope-25
+ | |
+ | Project[int][3] - scope-27
+ |
+ |---New For Each(true,true)[tuple] - scope-58
+ | |
+ | Project[bag][1] - scope-56
+ | |
+ | Project[bag][2] - scope-57
+ |
+ |---Package(Packager)[tuple]{int} - scope-55
Modified:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
---
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
(original)
+++
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
Thu Jun 25 20:59:31 2015
@@ -4,70 +4,78 @@
#--------------------------------------------------
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
-Tez vertex scope-30 -> Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48 -> Tez vertex scope-37,Tez vertex scope-58,
-Tez vertex scope-58 -> Tez vertex scope-62,
+Tez vertex scope-30 -> Tez vertex scope-54,
+Tez vertex scope-54 -> Tez vertex scope-37,Tez vertex scope-64,
+Tez vertex scope-64 -> Tez vertex scope-67,
Tez vertex scope-31 -> Tez vertex scope-35,Tez vertex scope-37,
Tez vertex scope-35 -> Tez vertex scope-37,
-Tez vertex scope-37 -> Tez vertex scope-62,
-Tez vertex scope-62
+Tez vertex scope-37 -> Tez vertex scope-67,
+Tez vertex scope-67
Tez vertex scope-30
# Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-43 -> scope-48
+Local Rearrange[tuple]{tuple}(false) - scope-43 -> scope-54
| |
| Constant(DummyVal) - scope-42
|
-|---New For Each(true,true)[tuple] - scope-47
+|---New For Each(true,true)[tuple] - scope-53
| |
| Project[int][0] - scope-26
| |
- | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-46
+ | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-52
| |
- | |---Project[tuple][*] - scope-45
+ | |---Project[tuple][*] - scope-51
|
|---PoissonSample - scope-44
|
- |---Local Rearrange[tuple]{int}(false) - scope-41 ->
scope-58
+ |---d: New For Each(false,false)[bag] - scope-7
| |
- | Project[int][0] - scope-26
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[chararray] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
|
- |---d: New For Each(false,false)[bag] - scope-7
- | |
- | Cast[int] - scope-2
- | |
- | |---Project[bytearray][0] - scope-1
- | |
- | Cast[chararray] - scope-5
- | |
- | |---Project[bytearray][1] - scope-4
- |
- |---d:
Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
-Tez vertex scope-48
-# Plan on vertex
-POValueOutputTez - scope-57 -> [scope-37, scope-58]
+ |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage)
- scope-0
+Tez vertex scope-54
+# Plan on vertex
+POValueOutputTez - scope-63 -> [scope-37, scope-64]
|
-|---New For Each(false)[tuple] - scope-56
+|---New For Each(false)[tuple] - scope-62
| |
- |
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez)[tuple]
- scope-55
+ |
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez)[tuple]
- scope-61
| |
- | |---Project[tuple][*] - scope-54
+ | |---Project[tuple][*] - scope-60
|
- |---New For Each(false,false)[tuple] - scope-53
+ |---New For Each(false,false)[tuple] - scope-59
| |
- | Constant(-1) - scope-52
+ | Constant(-1) - scope-58
| |
- | Project[bag][1] - scope-50
+ | Project[bag][1] - scope-56
|
- |---Package(Packager)[tuple]{bytearray} - scope-49
-Tez vertex scope-58
+ |---Package(Packager)[tuple]{bytearray} - scope-55
+Tez vertex scope-64
# Plan on vertex
-POIdentityInOutTez - scope-59 <- scope-30 -> scope-62
+Local Rearrange[tuple]{int}(false) - scope-41 -> scope-67
| |
| Project[int][0] - scope-26
+|
+|---d: New For Each(false,false)[bag] - scope-50
+ | |
+ | Cast[int] - scope-47
+ | |
+ | |---Project[bytearray][0] - scope-46
+ | |
+ | Cast[chararray] - scope-49
+ | |
+ | |---Project[bytearray][1] - scope-48
+ |
+ |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) -
scope-45
Tez vertex scope-31
# Plan on vertex
-a: Split - scope-68
+a: Split - scope-73
| |
| POValueOutputTez - scope-39 -> [scope-37]
| |
@@ -99,19 +107,19 @@ POValueOutputTez - scope-40 -> [scope-3
|---POValueInputTez - scope-36 <- scope-31
Tez vertex scope-37
# Plan on vertex
-Partition Rearrange[tuple]{int}(false) - scope-60 -> scope-62
+Partition Rearrange[tuple]{int}(false) - scope-65 -> scope-67
| |
| Project[int][0] - scope-27
|
|---POShuffledValueInputTez - scope-38 <- [scope-31, scope-35]
-Tez vertex scope-62
+Tez vertex scope-67
# Plan on vertex
e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-29
|
-|---New For Each(true,true)[tuple] - scope-66
+|---New For Each(true,true)[tuple] - scope-71
| |
- | Project[bag][1] - scope-64
+ | Project[bag][1] - scope-69
| |
- | Project[bag][2] - scope-65
+ | Project[bag][2] - scope-70
|
- |---Package(Packager)[tuple]{int} - scope-63
+ |---Package(Packager)[tuple]{int} - scope-68
Modified:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
(original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
Thu Jun 25 20:59:31 2015
@@ -4,76 +4,84 @@
#--------------------------------------------------
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
-Tez vertex scope-30 -> Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48 -> Tez vertex scope-31,Tez vertex scope-58,
-Tez vertex scope-58 -> Tez vertex scope-62,
-Tez vertex scope-31 -> Tez vertex scope-62,
-Tez vertex scope-62
+Tez vertex scope-30 -> Tez vertex scope-54,
+Tez vertex scope-54 -> Tez vertex scope-31,Tez vertex scope-64,
+Tez vertex scope-64 -> Tez vertex scope-67,
+Tez vertex scope-31 -> Tez vertex scope-67,
+Tez vertex scope-67
Tez vertex scope-30
# Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-43 -> scope-48
+Local Rearrange[tuple]{tuple}(false) - scope-43 -> scope-54
| |
| Constant(DummyVal) - scope-42
|
-|---New For Each(true,true)[tuple] - scope-47
+|---New For Each(true,true)[tuple] - scope-53
| |
| Project[int][0] - scope-26
| |
- | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-46
+ | POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-52
| |
- | |---Project[tuple][*] - scope-45
+ | |---Project[tuple][*] - scope-51
|
|---PoissonSample - scope-44
|
- |---Local Rearrange[tuple]{int}(false) - scope-41 ->
scope-58
+ |---d: New For Each(false,false)[bag] - scope-7
| |
- | Project[int][0] - scope-26
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[chararray] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
|
- |---d: New For Each(false,false)[bag] - scope-7
- | |
- | Cast[int] - scope-2
- | |
- | |---Project[bytearray][0] - scope-1
- | |
- | Cast[chararray] - scope-5
- | |
- | |---Project[bytearray][1] - scope-4
- |
- |---d:
Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
-Tez vertex scope-48
+ |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage)
- scope-0
+Tez vertex scope-54
# Plan on vertex
-POValueOutputTez - scope-57 -> [scope-31, scope-58]
+POValueOutputTez - scope-63 -> [scope-31, scope-64]
|
-|---New For Each(false)[tuple] - scope-56
+|---New For Each(false)[tuple] - scope-62
| |
- |
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez)[tuple]
- scope-55
+ |
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez)[tuple]
- scope-61
| |
- | |---Project[tuple][*] - scope-54
+ | |---Project[tuple][*] - scope-60
|
- |---New For Each(false,false)[tuple] - scope-53
+ |---New For Each(false,false)[tuple] - scope-59
| |
- | Constant(-1) - scope-52
+ | Constant(-1) - scope-58
| |
- | Project[bag][1] - scope-50
+ | Project[bag][1] - scope-56
|
- |---Package(Packager)[tuple]{bytearray} - scope-49
-Tez vertex scope-58
+ |---Package(Packager)[tuple]{bytearray} - scope-55
+Tez vertex scope-64
# Plan on vertex
-POIdentityInOutTez - scope-59 <- scope-30 -> scope-62
+Local Rearrange[tuple]{int}(false) - scope-41 -> scope-67
| |
| Project[int][0] - scope-26
+|
+|---d: New For Each(false,false)[bag] - scope-50
+ | |
+ | Cast[int] - scope-47
+ | |
+ | |---Project[bytearray][0] - scope-46
+ | |
+ | Cast[chararray] - scope-49
+ | |
+ | |---Project[bytearray][1] - scope-48
+ |
+ |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) -
scope-45
Tez vertex scope-31
# Plan on vertex
-a: Split - scope-68
+a: Split - scope-73
| |
-| Partition Rearrange[tuple]{int}(false) - scope-69 -> scope-62
+| Partition Rearrange[tuple]{int}(false) - scope-74 -> scope-67
| | |
-| | Project[int][0] - scope-70
+| | Project[int][0] - scope-75
| |
-| Partition Rearrange[tuple]{int}(false) - scope-71 -> scope-62
+| Partition Rearrange[tuple]{int}(false) - scope-76 -> scope-67
| | |
-| | Project[int][0] - scope-72
+| | Project[int][0] - scope-77
| |
| |---b: Filter[bag] - scope-21
| | |
@@ -94,14 +102,14 @@ a: Split - scope-68
| |---Project[bytearray][1] - scope-12
|
|---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
-Tez vertex scope-62
+Tez vertex scope-67
# Plan on vertex
e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-29
|
-|---New For Each(true,true)[tuple] - scope-66
+|---New For Each(true,true)[tuple] - scope-71
| |
- | Project[bag][1] - scope-64
+ | Project[bag][1] - scope-69
| |
- | Project[bag][2] - scope-65
+ | Project[bag][2] - scope-70
|
- |---Package(Packager)[tuple]{int} - scope-63
+ |---Package(Packager)[tuple]{int} - scope-68
Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Thu Jun 25
20:59:31 2015
@@ -309,7 +309,7 @@ public class TestTezAutoParallelism {
+ "STORE E into '" + outputDir + "/finalout';";
String log = testIncreaseIntermediateParallelism(script, outputDir,
true);
// Parallelism of C should be increased
- assertTrue(log.contains("Increased requested parallelism of scope-54
to 4"));
+ assertTrue(log.contains("Increased requested parallelism of scope-59
to 4"));
assertEquals(1, StringUtils.countMatches(log, "Increased requested
parallelism"));
}
Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Thu Jun 25 20:59:31
2015
@@ -194,6 +194,19 @@ public class TestTezCompiler {
}
@Test
+ public void testSkewedJoinFilter() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "a = filter a by x == 1;" +
+ "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+ "c = join a by x, b by x using 'skewed';" +
+ "d = foreach c generate a::x as x, y, z;" +
+ "store d into 'file:///tmp/output';";
+
+ run(query,
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld");
+ }
+
+ @Test
public void testLimit() throws Exception {
String query =
"a = load 'file:///tmp/input' as (x:int, y:int);" +
@@ -312,6 +325,17 @@ public class TestTezCompiler {
run(query,
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld");
}
+ @Test
+ public void testOrderByWithFilter() throws Exception {
+ String query =
+ "a = load 'file:///tmp/input' using PigStorage(',') as (x:int,
y:int);" +
+ "b = filter a by x == 1;" +
+ "c = order b by x;" +
+ "STORE c INTO 'file:///tmp/output';";
+
+ run(query,
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld");
+ }
+
// PIG-3759, PIG-3781
// Combiner should not be added in case of co-group
@Test
@@ -545,6 +569,8 @@ public class TestTezCompiler {
@Test
public void testUnionSkewedJoin() throws Exception {
+ // TODO: PIG-4574 optimization needs to be done for this as well.
+ // Requires changes in UnionOptimizer
String query =
"a = load 'file:///tmp/input' as (x:int, y:chararray);" +
"b = load 'file:///tmp/input' as (y:chararray, x:int);" +
@@ -561,6 +587,8 @@ public class TestTezCompiler {
@Test
public void testUnionOrderby() throws Exception {
+ // TODO: PIG-4574 optimization needs to be done for this as well.
+ // Requires changes in UnionOptimizer
String query =
"a = load 'file:///tmp/input' as (x:int, y:chararray);" +
"b = load 'file:///tmp/input' as (y:chararray, x:int);" +