Author: rohini
Date: Tue Apr 7 23:21:01 2015
New Revision: 1671973
URL: http://svn.apache.org/r1671973
Log:
PIG-4495: Better multi-query planning in case of multiple edges (rohini)
Added:
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-6-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-6.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-7-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-7.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-3.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-12-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-12.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-14-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-14.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
pig/trunk/test/e2e/pig/tests/multiquery.conf
pig/trunk/test/e2e/pig/tests/nightly.conf
pig/trunk/test/org/apache/pig/test/TestFRJoin.java
pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld
pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Apr 7 23:21:01 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4495: Better multi-query planning in case of multiple edges (rohini)
+
PIG-3294: Allow Pig use Hive UDFs (daijy)
PIG-4476: Fix logging in AvroStorage* classes and SchemaTuple class (rdsr via
rohini)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
Tue Apr 7 23:21:01 2015
@@ -107,11 +107,7 @@ public class POPoissonSample extends Phy
if (res.returnStatus == POStatus.STATUS_NULL) {
continue;
} else if (res.returnStatus == POStatus.STATUS_EOP) {
- if (this.parentPlan.endOfAllInput) {
- return eop;
- } else {
- continue;
- }
+ return res;
} else if (res.returnStatus == POStatus.STATUS_ERR) {
return res;
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Tue Apr 7 23:21:01 2015
@@ -109,6 +109,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POIdentityInOutTez;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POShuffleTezLoad;
+import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
import
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PartitionerDefinedVertexManager;
@@ -739,6 +740,8 @@ public class TezDagBuilder extends TezOp
additionalLocalResources));
}
+ // Union within a split can have multiple stores writing to same output
+ Set<String> uniqueStoreOutputs = new HashSet<String>();
for (POStore store : stores) {
ArrayList<POStore> emptyList = new ArrayList<POStore>();
@@ -763,10 +766,13 @@ public class TezDagBuilder extends TezOp
continue;
}
}
- vertex.addDataSink(store.getOperatorKey().toString(),
- new DataSinkDescriptor(storeOutDescriptor,
-
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()),
- dag.getCredentials()));
+ String outputKey = ((POStoreTez) store).getOutputKey();
+ if (!uniqueStoreOutputs.contains(outputKey)) {
+ vertex.addDataSink(outputKey.toString(),
+ new DataSinkDescriptor(storeOutDescriptor,
+
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()),
+ dag.getCredentials()));
+ }
}
// LoadFunc and StoreFunc add delegation tokens to Job Credentials in
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
Tue Apr 7 23:21:01 2015
@@ -403,11 +403,12 @@ public class TezLauncher extends Launche
skOptimizer.visit();
}
+ boolean isUnionOpt =
conf.getBoolean(PigConfiguration.PIG_TEZ_OPT_UNION, true);
boolean isMultiQuery =
conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true);
if (isMultiQuery) {
// reduces the number of TezOpers in the Tez plan generated
// by multi-query (multi-store) script.
- MultiQueryOptimizerTez mqOptimizer = new
MultiQueryOptimizerTez(tezPlan);
+ MultiQueryOptimizerTez mqOptimizer = new
MultiQueryOptimizerTez(tezPlan, isUnionOpt);
mqOptimizer.visit();
}
@@ -419,7 +420,6 @@ public class TezLauncher extends Launche
}
// Use VertexGroup in Tez
- boolean isUnionOpt =
conf.getBoolean(PigConfiguration.PIG_TEZ_OPT_UNION, true);
if (isUnionOpt) {
UnionOptimizer uo = new UnionOptimizer(tezPlan);
uo.visit();
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
Tue Apr 7 23:21:01 2015
@@ -284,6 +284,11 @@ public class TezCompiler extends PhyPlan
FuncSpec newSpec = new
FuncSpec(ReadScalarsTez.class.getName(), from.getOperatorKey().toString());
userFunc.setFuncSpec(newSpec);
+ //Remove unused store filename
+ if (userFunc.getInputs().size() == 2) {
+ userFunc.getInputs().remove(1);
+ }
+
if (storeSeen.containsKey(store)) {
storeSeen.get(store).addOutputKey(tezOp.getOperatorKey().toString());
} else {
@@ -292,9 +297,6 @@ public class TezCompiler extends PhyPlan
from.plan.remove(from.plan.getOperator(store.getOperatorKey()));
from.plan.addAsLeaf(output);
storeSeen.put(store, output);
-
- //Remove unused store filename
- userFunc.getInputs().remove(1);
}
if (tezPlan.getPredecessors(tezOp)==null ||
!tezPlan.getPredecessors(tezOp).contains(from)) {
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
Tue Apr 7 23:21:01 2015
@@ -441,7 +441,7 @@ public class TezOperator extends Operato
this.useSecondaryKey = useSecondaryKey;
}
- public List<OperatorKey> getUnionPredecessors() {
+ public List<OperatorKey> getUnionMembers() {
return vertexGroupMembers;
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
Tue Apr 7 23:21:01 2015
@@ -182,8 +182,8 @@ public class TezPOPackageAnnotator exten
Integer index = Integer.valueOf(lrearrange.getIndex());
if(keyInfo.get(index) != null) {
- if (isPOSplit && predTezOpVertexGrp != null ) {
- // Case of POSplit having more than one member of the
vertex group
+ if (isPOSplit) {
+ // Case of POSplit having more than one input in case of self join or union
loRearrangeFound--;
} else {
// something is wrong - we should not be getting key info
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
Tue Apr 7 23:21:01 2015
@@ -53,11 +53,13 @@ public class POFRJoinTez extends POFRJoi
private static final Log log = LogFactory.getLog(POFRJoinTez.class);
private static final long serialVersionUID = 1L;
- // For replicated tables
- private List<LogicalInput> replInputs = Lists.newArrayList();
- private List<KeyValueReader> replReaders = Lists.newArrayList();
private List<String> inputKeys;
+
+ // For replicated tables
+ private transient List<LogicalInput> replInputs;
+ private transient List<KeyValueReader> replReaders;
private transient boolean isInputCached;
+ private transient String cacheKey;
public POFRJoinTez(POFRJoin copy, List<String> inputKeys) throws
ExecException {
super(copy);
@@ -78,7 +80,7 @@ public class POFRJoinTez extends POFRJoi
@Override
public void addInputsToSkip(Set<String> inputsToSkip) {
- String cacheKey = "replicatemap-" + getOperatorKey().toString();
+ cacheKey = "replicatemap-" + inputKeys.toString();
Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
if (cacheValue != null) {
isInputCached = true;
@@ -93,10 +95,14 @@ public class POFRJoinTez extends POFRJoi
return;
}
try {
+ this.replInputs = Lists.newArrayList();
+ this.replReaders = Lists.newArrayList();
for (String key : inputKeys) {
LogicalInput input = inputs.get(key);
- this.replInputs.add(input);
- this.replReaders.add((KeyValueReader) input.getReader());
+ if (!this.replInputs.contains(input)) {
+ this.replInputs.add(input);
+ this.replReaders.add((KeyValueReader) input.getReader());
+ }
}
} catch (Exception e) {
throw new ExecException(e);
@@ -110,10 +116,11 @@ public class POFRJoinTez extends POFRJoi
*/
@Override
protected void setUpHashMap() throws ExecException {
- String cacheKey = "replicatemap-" + getOperatorKey().toString();
- if (isInputCached) {
- Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+ // Re-check again in case of Split + union + replicate join
+ // where same POFRJoinTez occurs in different Split sub-plans
+ Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+ if (cacheValue != null) {
replicates = (TupleToMapKey[]) cacheValue;
log.info("Found " + (replicates.length - 1) + " replication hash
tables in Tez cache. cachekey=" + cacheKey);
return;
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
Tue Apr 7 23:21:01 2015
@@ -42,6 +42,7 @@ import org.apache.pig.impl.builtin.Parti
import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.Pair;
@@ -255,4 +256,16 @@ public class POPartitionRearrangeTez ext
cache.cache(reducerMapCacheKey, reducerMap);
inited = true;
}
+
+ @Override
+ public POPartitionRearrangeTez clone() throws CloneNotSupportedException {
+ POPartitionRearrangeTez clone = new POPartitionRearrangeTez(new
OperatorKey(
+ mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(
+ mKey.scope)), requestedParallelism);
+ deepCopyTo(clone);
+ clone.isSkewedJoin = isSkewedJoin;
+ clone.connectedToPackage = connectedToPackage;
+ clone.setOutputKey(outputKey);
+ return clone;
+ }
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
Tue Apr 7 23:21:01 2015
@@ -50,15 +50,14 @@ public class POShuffleTezLoad extends PO
private static final long serialVersionUID = 1L;
protected List<String> inputKeys = new ArrayList<String>();
- protected List<LogicalInput> inputs = new ArrayList<LogicalInput>();
- protected List<KeyValuesReader> readers = new ArrayList<KeyValuesReader>();
-
- private boolean[] finished;
- private boolean[] readOnce;
-
- private WritableComparator comparator = null;
private boolean isSkewedJoin = false;
+ private transient List<LogicalInput> inputs;
+ private transient List<KeyValuesReader> readers;
+ private transient int numTezInputs;
+ private transient boolean[] finished;
+ private transient boolean[] readOnce;
+ private transient WritableComparator comparator = null;
private transient Configuration conf;
private transient int accumulativeBatchSize;
@@ -86,33 +85,39 @@ public class POShuffleTezLoad extends PO
public void attachInputs(Map<String, LogicalInput> inputs, Configuration
conf)
throws ExecException {
this.conf = conf;
- comparator = (WritableComparator)
ConfigUtils.getInputKeySecondaryGroupingComparator(conf);
+ this.inputs = new ArrayList<LogicalInput>();
+ this.readers = new ArrayList<KeyValuesReader>();
+ this.comparator = (WritableComparator)
ConfigUtils.getInputKeySecondaryGroupingComparator(conf);
+ this.accumulativeBatchSize =
AccumulatorOptimizerUtil.getAccumulativeBatchSize();
+
try {
- for (String key : inputKeys) {
- LogicalInput input = inputs.get(key);
- this.inputs.add(input);
- this.readers.add((KeyValuesReader)input.getReader());
+ for (String inputKey : inputKeys) {
+ LogicalInput input = inputs.get(inputKey);
+ // 1) Case of self join/cogroup/cross with Split.
+ // - Same TezInput will contain multiple indexes in case of join
+ // 2) data unioned within Split
+ // - Input key will be repeated, but index would be same within a TezInput
+ if (!this.inputs.contains(input)) {
+ this.inputs.add(input);
+ this.readers.add((KeyValuesReader)input.getReader());
+ }
}
- // We need to adjust numInputs because it's possible for both
- // OrderedGroupedKVInput and non-OrderedGroupedKVInput to be
attached
- // to the same vertex. If so, we're only interested in
- // OrderedGroupedKVInputs. So we ignore the others.
- this.numInputs = this.inputs.size();
+ this.numInputs = this.pkgr.getKeyInfo().size();
+ this.numTezInputs = this.inputs.size();
readOnce = new boolean[numInputs];
for (int i = 0; i < numInputs; i++) {
readOnce[i] = false;
}
- finished = new boolean[numInputs];
- for (int i = 0; i < numInputs; i++) {
+ finished = new boolean[numTezInputs];
+ for (int i = 0; i < numTezInputs; i++) {
finished[i] = !readers.get(i).next();
}
} catch (Exception e) {
throw new ExecException(e);
}
- accumulativeBatchSize =
AccumulatorOptimizerUtil.getAccumulativeBatchSize();
}
@Override
@@ -131,7 +136,7 @@ public class POShuffleTezLoad extends PO
int minIndex = -1;
try {
- for (int i = 0; i < numInputs; i++) {
+ for (int i = 0; i < numTezInputs; i++) {
if (!finished[i]) {
hasData = true;
cur = readers.get(i).getCurrentKey();
@@ -172,8 +177,10 @@ public class POShuffleTezLoad extends PO
} else {
for (int i = 0; i < numInputs; i++) {
+ bags[i] = new InternalCachedBag(numInputs);
+ }
- DataBag bag = null;
+ for (int i = 0; i < numTezInputs; i++) {
if (!finished[i]) {
cur = readers.get(i).getCurrentKey();
@@ -181,14 +188,12 @@ public class POShuffleTezLoad extends PO
while (comparator.compare(min, cur) == 0
&& (!min.isNull() || (min.isNull() && i ==
minIndex))) {
Iterable<Object> vals =
readers.get(i).getCurrentValues();
- bag = bags[i] == null ? new
InternalCachedBag(numInputs) : bags[i];
for (Object val : vals) {
NullableTuple nTup = (NullableTuple) val;
int index = nTup.getIndex();
Tuple tup =
pkgr.getValueTuple(keyWritable, nTup, index);
- bag.add(tup);
+ bags[index].add(tup);
}
- bags[i] = bag;
finished[i] = !readers.get(i).next();
if (finished[i]) {
break;
@@ -196,10 +201,6 @@ public class POShuffleTezLoad extends PO
cur = readers.get(i).getCurrentKey();
}
}
-
- if (bag == null) {
- bags[i] = new InternalCachedBag(numInputs);
- }
}
}
@@ -269,7 +270,7 @@ public class POShuffleTezLoad extends PO
public boolean hasNextBatch() {
Object cur = null;
try {
- for (int i = 0; i < numInputs; i++) {
+ for (int i = 0; i < numTezInputs; i++) {
if (!finished[i]) {
cur = readers.get(i).getCurrentKey();
if (comparator.compare(min, cur) == 0
@@ -292,7 +293,7 @@ public class POShuffleTezLoad extends PO
bags[i].clear();
}
try {
- for (int i = 0; i < numInputs; i++) {
+ for (int i = 0; i < numTezInputs; i++) {
if (!finished[i]) {
cur = readers.get(i).getCurrentKey();
int batchCount = 0;
@@ -300,7 +301,9 @@ public class POShuffleTezLoad extends PO
min.isNull() && i==minIndex)) {
Iterator<Object> iter =
readers.get(i).getCurrentValues().iterator();
while (iter.hasNext() && batchCount < batchSize) {
- bags[i].add(pkgr.getValueTuple(keyWritable,
(NullableTuple) iter.next(), i));
+ NullableTuple nTup = (NullableTuple)
iter.next();
+ int index = nTup.getIndex();
+
bags[index].add(pkgr.getValueTuple(keyWritable, nTup, index));
batchCount++;
}
if (batchCount == batchSize) {
@@ -333,7 +336,7 @@ public class POShuffleTezLoad extends PO
// early termination of accumulator
Object cur = null;
try {
- for (int i = 0; i < numInputs; i++) {
+ for (int i = 0; i < numTezInputs; i++) {
if (!finished[i]) {
cur = readers.get(i).getCurrentKey();
while (comparator.compare(min, cur) == 0 &&
(!min.isNull() ||
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
Tue Apr 7 23:21:01 2015
@@ -43,10 +43,12 @@ import org.apache.tez.runtime.library.ap
public class POStoreTez extends POStore implements TezOutput,
TezTaskConfigurable {
private static final long serialVersionUID = 1L;
+
+ private String outputKey;
+
private transient MROutput output;
private transient KeyValueWriter writer;
- private String outputKey;
- private TezCounter outputRecordCounter;
+ private transient TezCounter outputRecordCounter;
public POStoreTez(OperatorKey k) {
super(k);
@@ -143,4 +145,9 @@ public class POStoreTez extends POStore
return res;
}
+ @Override
+ public String name() {
+ return super.name() + (getOperatorKey().toString().equals(outputKey) ?
"" : "\t->\t " +outputKey);
+ }
+
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
Tue Apr 7 23:21:01 2015
@@ -97,7 +97,7 @@ public class POValueOutputTez extends Ph
@Override
public void replaceOutput(String oldOutputKey, String newOutputKey) {
if (outputKeys.remove(oldOutputKey)) {
- outputKeys.add(oldOutputKey);
+ outputKeys.add(newOutputKey);
}
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
Tue Apr 7 23:21:01 2015
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
+import org.apache.commons.lang.ArrayUtils;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -35,6 +36,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
@@ -42,8 +44,21 @@ import org.apache.pig.impl.plan.ReverseD
import org.apache.pig.impl.plan.VisitorException;
public class MultiQueryOptimizerTez extends TezOpPlanVisitor {
- public MultiQueryOptimizerTez(TezOperPlan plan) {
+
+ private boolean unionOptimizerOn;
+
+ public MultiQueryOptimizerTez(TezOperPlan plan, boolean unionOptimizerOn) {
super(plan, new ReverseDependencyOrderWalker<TezOperator,
TezOperPlan>(plan));
+ this.unionOptimizerOn = unionOptimizerOn;
+ }
+
+ private void addAllPredecessors(TezOperator tezOp, List<TezOperator>
predsList) {
+ if (getPlan().getPredecessors(tezOp) != null) {
+ for (TezOperator pred : getPlan().getPredecessors(tezOp)) {
+ predsList.add(pred);
+ addAllPredecessors(pred, predsList);
+ }
+ }
}
@Override
@@ -57,19 +72,55 @@ public class MultiQueryOptimizerTez exte
List<TezOperator> successors = getPlan().getSuccessors(tezOp);
for (TezOperator successor : successors) {
+ List<TezOperator> predecessors = new
ArrayList<TezOperator>(getPlan().getPredecessors(successor));
+ predecessors.remove(tezOp);
+ if (!predecessors.isEmpty()) {
+ // If has other dependency that conflicts with other splittees, don't merge into split
+ // For eg: self replicate join/skewed join
+ // But if replicate input is from a different operator allow it, but ensure
+ // that we don't have more than one input coming from that operator into the split
+
+ // Check if other splittees or its predecessors (till the root) are not present in
+ // the predecessors (till the root) of this splittee.
+ // Need to check the whole predecessors hierarchy till root as the conflict
+ // could be multiple levels up
+ for (TezOperator predecessor :
getPlan().getPredecessors(successor)) {
+ if (predecessor != tezOp) {
+ predecessors.add(predecessor);
+ addAllPredecessors(predecessor, predecessors);
+ }
+ }
+ List<TezOperator> toMergeSuccPredecessors = new
ArrayList<TezOperator>(successors);
+ toMergeSuccPredecessors.remove(successor);
+ for (TezOperator splittee : splittees) {
+ for (TezOperator spliteePred :
getPlan().getPredecessors(splittee)) {
+ if (spliteePred != tezOp) {
+ toMergeSuccPredecessors.add(spliteePred);
+ addAllPredecessors(spliteePred,
toMergeSuccPredecessors);
+ }
+ }
+ }
+ if (predecessors.removeAll(toMergeSuccPredecessors)) {
+ continue;
+ }
+ }
- // If has other dependency, don't merge into split,
- if (getPlan().getPredecessors(successor).size()!=1) {
+ // Split contains right input of different skewed joins
+ if (successor.getSampleOperator() != null
+ && tezOp.getSampleOperator() != null
+ && !successor.getSampleOperator().equals(
+ tezOp.getSampleOperator())) {
continue;
}
- // Detect diamond shape, we cannot merge it into split, since
Tez
- // does not handle double edge between vertexes
- // TODO:
- // - Vertex groups handles double edges though. For the
case where the
- // double edges are unioned (successor is a union vertex),
- // try merge into split if union optimizer is turned on.
- // - PIG-3876 to handle this by writing to same edge
+ // Detect diamond shape into successor operator, we cannot merge it into split,
+ // since Tez does not handle double edge between vertexes
+ // Successor could be
+ // - union operator (if no union optimizer changing it to vertex group which supports multiple edges)
+ // - self replicate join
+ // - self skewed join
+ // Self hash joins can write to same output edge and is handled by POShuffleTezLoad
+ // TODO: PIG-3876 to handle this by writing to same edge
Set<TezOperator> mergedSuccessors = new HashSet<TezOperator>();
Set<TezOperator> toMergeSuccessors = new
HashSet<TezOperator>();
mergedSuccessors.addAll(successors);
@@ -79,15 +130,26 @@ public class MultiQueryOptimizerTez exte
}
}
if (getPlan().getSuccessors(successor) != null) {
-
toMergeSuccessors.addAll(getPlan().getSuccessors(successor));
+ for (TezOperator succSuccessor :
getPlan().getSuccessors(successor)) {
+ if (succSuccessor.isUnion()) {
+ if (!(unionOptimizerOn
+ &&
UnionOptimizer.isOptimizable(succSuccessor))) {
+ toMergeSuccessors.add(succSuccessor);
+ }
+ } else if (successors.contains(succSuccessor)) {
+ // Self replicate/skewed join
+ toMergeSuccessors.add(succSuccessor);
+ }
+ }
}
+
mergedSuccessors.retainAll(toMergeSuccessors);
if (mergedSuccessors.isEmpty()) { // no shared edge after merge
splittees.add(successor);
}
}
- if (splittees.size()==0) {
+ if (splittees.size() == 0) {
return;
}
@@ -140,31 +202,61 @@ public class MultiQueryOptimizerTez exte
}
}
- static public void removeSplittee(TezOperPlan plan, TezOperator splitter,
TezOperator splittee) throws PlanException {
- if (plan.getSuccessors(splittee)!=null) {
- List<TezOperator> succs = new ArrayList<TezOperator>();
- succs.addAll(plan.getSuccessors(splittee));
- plan.disconnect(splitter, splittee);
+ private void removeSplittee(TezOperPlan plan, TezOperator splitter,
+ TezOperator splittee) throws PlanException, VisitorException {
+
+ plan.disconnect(splitter, splittee);
+
+ String spliteeKey = splittee.getOperatorKey().toString();
+ String splitterKey = splitter.getOperatorKey().toString();
+
+ if (plan.getPredecessors(splittee) != null) {
+ for (TezOperator pred : new
ArrayList<TezOperator>(plan.getPredecessors(splittee))) {
+ List<TezOutput> tezOutputs =
PlanHelper.getPhysicalOperators(pred.plan,
+ TezOutput.class);
+ for (TezOutput tezOut : tezOutputs) {
+ if (ArrayUtils.contains(tezOut.getTezOutputs(),
spliteeKey)) {
+ tezOut.replaceOutput(spliteeKey, splitterKey);
+ }
+ }
+
+ TezEdgeDescriptor edge =
pred.outEdges.remove(splittee.getOperatorKey());
+ if (edge == null) {
+ throw new VisitorException("Edge description is empty");
+ }
+ pred.outEdges.put(splitter.getOperatorKey(), edge);
+ splitter.inEdges.put(pred.getOperatorKey(), edge);
+ plan.disconnect(pred, splittee);
+ plan.connect(pred, splitter);
+ }
+ }
+
+ if (plan.getSuccessors(splittee) != null) {
+ List<TezOperator> succs = new
ArrayList<TezOperator>(plan.getSuccessors(splittee));
+ List<TezOperator> splitterSuccs = plan.getSuccessors(splitter);
for (TezOperator succTezOperator : succs) {
TezEdgeDescriptor edge =
succTezOperator.inEdges.get(splittee.getOperatorKey());
-
splitter.outEdges.remove(splittee.getOperatorKey());
succTezOperator.inEdges.remove(splittee.getOperatorKey());
plan.disconnect(splittee, succTezOperator);
- TezCompilerUtil.connect(plan, splitter, succTezOperator, edge);
+
+ // Do not connect again in case of self join/cross/cogroup or union
+ if (splitterSuccs == null ||
!splitterSuccs.contains(succTezOperator)) {
+ TezCompilerUtil.connect(plan, splitter, succTezOperator,
edge);
+ }
try {
List<TezInput> inputs =
PlanHelper.getPhysicalOperators(succTezOperator.plan, TezInput.class);
for (TezInput input : inputs) {
-
input.replaceInput(splittee.getOperatorKey().toString(),
- splitter.getOperatorKey().toString());
+ input.replaceInput(spliteeKey,
+ splitterKey);
}
List<POUserFunc> userFuncs =
PlanHelper.getPhysicalOperators(succTezOperator.plan, POUserFunc.class);
for (POUserFunc userFunc : userFuncs) {
if (userFunc.getFunc() instanceof ReadScalarsTez) {
TezInput tezInput = (TezInput)userFunc.getFunc();
-
tezInput.replaceInput(splittee.getOperatorKey().toString(),
- splitter.getOperatorKey().toString());
+ tezInput.replaceInput(spliteeKey,
+ splitterKey);
userFunc.getFuncSpec().setCtorArgs(tezInput.getTezInputs());
}
}
@@ -173,9 +265,10 @@ public class MultiQueryOptimizerTez exte
}
if (succTezOperator.isUnion()) {
- int index =
succTezOperator.getUnionPredecessors().indexOf(splittee.getOperatorKey());
- if (index > -1) {
- succTezOperator.getUnionPredecessors().set(index,
splitter.getOperatorKey());
+ int index =
succTezOperator.getUnionMembers().indexOf(splittee.getOperatorKey());
+ while (index > -1) {
+ succTezOperator.getUnionMembers().set(index,
splitter.getOperatorKey());
+ index =
succTezOperator.getUnionMembers().indexOf(splittee.getOperatorKey());
}
}
}
@@ -183,7 +276,7 @@ public class MultiQueryOptimizerTez exte
plan.remove(splittee);
}
- static public void addSubPlanPropertiesToParent(TezOperator parentOper,
TezOperator subPlanOper) {
+ private void addSubPlanPropertiesToParent(TezOperator parentOper,
TezOperator subPlanOper) {
// Copy only map side properties. For eg: crossKeys.
// Do not copy reduce side specific properties. For eg:
useSecondaryKey, segmentBelow, sortOrder, etc
if (subPlanOper.getCrossKeys() != null) {
@@ -193,6 +286,11 @@ public class MultiQueryOptimizerTez exte
}
parentOper.copyFeatures(subPlanOper, null);
+ // For skewed join right input
+ if (subPlanOper.getSampleOperator() != null) {
+ parentOper.setSampleOperator(subPlanOper.getSampleOperator());
+ }
+
if (subPlanOper.getRequestedParallelism() >
parentOper.getRequestedParallelism()) {
parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
Tue Apr 7 23:21:01 2015
@@ -157,7 +157,7 @@ public class TezOperDependencyParallelis
List<TezOperator> preds = plan.getPredecessors(tezOper);
for (TezOperator pred : preds) {
if (pred.isVertexGroup()) {
- for (OperatorKey unionPred : pred.getUnionPredecessors()) {
+ for (OperatorKey unionPred : pred.getVertexGroupMembers()) {
if (unionPred.toString().equals(inputKey)) {
return plan.getOperator(unionPred);
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
Tue Apr 7 23:21:01 2015
@@ -19,11 +19,16 @@ package org.apache.pig.backend.hadoop.ex
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.commons.lang.ArrayUtils;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -35,10 +40,12 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator.VertexGroupInfo;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
+import
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
import org.apache.pig.builtin.RoundRobinPartitioner;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -63,8 +70,17 @@ import org.apache.tez.runtime.library.ou
*/
public class UnionOptimizer extends TezOpPlanVisitor {
+ private TezOperPlan tezPlan;
public UnionOptimizer(TezOperPlan plan) {
super(plan, new ReverseDependencyOrderWalker<TezOperator,
TezOperPlan>(plan));
+ tezPlan = plan;
+ }
+
+ public static boolean isOptimizable(TezOperator tezOp) {
+ if((tezOp.isLimit() || tezOp.isLimitAfterSort()) &&
tezOp.getRequestedParallelism() == 1) {
+ return false;
+ }
+ return true;
}
@Override
@@ -73,34 +89,66 @@ public class UnionOptimizer extends TezO
return;
}
- if((tezOp.isLimit() || tezOp.isLimitAfterSort()) &&
tezOp.getRequestedParallelism() == 1) {
+ if (!isOptimizable(tezOp)) {
return;
}
TezOperator unionOp = tezOp;
- String unionOpKey = unionOp.getOperatorKey().toString();
String scope = unionOp.getOperatorKey().scope;
- TezOperPlan tezPlan = getPlan();
+ PhysicalPlan unionOpPlan = unionOp.plan;
- //TODO: PIG-3856 Handle replicated join. Replicate join input that was
broadcast to union vertex
+ // TODO: PIG-3856 Handle replicated join and skewed join sample.
+ // Replicate join small table/skewed join sample that was broadcast to
union vertex
// now needs to be broadcast to all the union predecessors. How do we
do that??
// Wait for shared edge and do it or write multiple times??
- // For now don't optimize
- // Create a copy as disconnect while iterating modifies the original
list
+ // For now don't optimize except in the case of Split where we need to
write only once
+
+ Set<OperatorKey> uniqueUnionMembers = new
HashSet<OperatorKey>(unionOp.getUnionMembers());
List<TezOperator> predecessors = new
ArrayList<TezOperator>(tezPlan.getPredecessors(unionOp));
- if (predecessors.size() > unionOp.getVertexGroupMembers().size()) {
- return;
+ List<TezOperator> successors = tezPlan.getSuccessors(unionOp) == null
? null
+ : new ArrayList<TezOperator>(tezPlan.getSuccessors(unionOp));
+
+ if (predecessors.size() > unionOp.getUnionMembers().size()
+ && uniqueUnionMembers.size() != 1) {
+ return; // TODO: PIG-3856
}
+ if (uniqueUnionMembers.size() == 1) {
+ // We actually don't need VertexGroup in this case. The multiple
+ // sub-plans of Split can write to same MROutput or the Tez
LogicalOutput
+ OperatorKey splitPredKey = uniqueUnionMembers.iterator().next();
+ TezOperator splitPredOp = tezPlan.getOperator(splitPredKey);
+ PhysicalPlan splitPredPlan = splitPredOp.plan;
+ if (splitPredPlan.getLeaves().get(0) instanceof POSplit) { //It
has to be. But check anyways
+
+ try {
+ connectUnionNonMemberPredecessorsToSplit(unionOp,
splitPredOp, predecessors);
+
+ // Remove POShuffledValueInputTez from union plan root
+ unionOpPlan.remove(unionOpPlan.getRoots().get(0));
+ // Clone union plan into split subplans
+ for (int i=0; i <
Collections.frequency(unionOp.getUnionMembers(), splitPredKey); i++ ) {
+ cloneAndMergeUnionPlan(unionOp, splitPredOp);
+ }
+ copyOperatorProperties(splitPredOp, unionOp);
+ tezPlan.disconnect(splitPredOp, unionOp);
- PhysicalPlan unionOpPlan = unionOp.plan;
+ connectSplitOpToUnionSuccessors(unionOp, splitPredOp,
successors);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
- // Union followed by Split followed by Store could have multiple stores
+ //Remove union operator from the plan
+ tezPlan.remove(unionOp);
+ return;
+ } else {
+ throw new VisitorException("Expected POSplit but found " +
splitPredPlan.getLeaves().get(0));
+ }
+ }
+
+ // Create vertex group operator for each store. Union followed by Split
+ // followed by Store could have multiple stores
List<POStoreTez> unionStoreOutputs =
PlanHelper.getPhysicalOperators(unionOpPlan, POStoreTez.class);
TezOperator[] storeVertexGroupOps = new
TezOperator[unionStoreOutputs.size()];
- List<TezOperator> succs = tezPlan.getSuccessors(unionOp);
- // Create a copy as disconnect while iterating modifies the original
list
- List<TezOperator> successors = succs == null ? null : new
ArrayList<TezOperator>(succs);
-
for (int i=0; i < storeVertexGroupOps.length; i++) {
TezOperator existingVertexGroup = null;
if (successors != null) {
@@ -116,12 +164,13 @@ public class UnionOptimizer extends TezO
storeVertexGroupOps[i] = new
TezOperator(OperatorKey.genOpKey(scope));
storeVertexGroupOps[i].setVertexGroupInfo(new
VertexGroupInfo(unionStoreOutputs.get(i)));
storeVertexGroupOps[i].getVertexGroupInfo().setSFile(unionStoreOutputs.get(i).getSFile());
-
storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
+
storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getUnionMembers());
tezPlan.add(storeVertexGroupOps[i]);
}
}
- // Case of split, orderby, skewed join, rank, etc will have multiple
outputs
+ // Create vertex group operator for each output. Case of split,
orderby,
+ // skewed join, rank, etc will have multiple outputs
List<TezOutput> unionOutputs =
PlanHelper.getPhysicalOperators(unionOpPlan, TezOutput.class);
// One TezOutput can write to multiple LogicalOutputs (POCounterTez,
POValueOutputTez, etc)
List<String> unionOutputKeys = new ArrayList<String>();
@@ -133,133 +182,343 @@ public class UnionOptimizer extends TezO
unionOutputKeys.add(key);
}
}
-
- // Create vertex group operator for each output
TezOperator[] outputVertexGroupOps = new
TezOperator[unionOutputKeys.size()];
String[] newOutputKeys = new String[unionOutputKeys.size()];
for (int i=0; i < outputVertexGroupOps.length; i++) {
outputVertexGroupOps[i] = new
TezOperator(OperatorKey.genOpKey(scope));
outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo());
outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i));
-
outputVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
+
outputVertexGroupOps[i].setVertexGroupMembers(unionOp.getUnionMembers());
newOutputKeys[i] =
outputVertexGroupOps[i].getOperatorKey().toString();
tezPlan.add(outputVertexGroupOps[i]);
}
+ // Change plan from Predecessors -> Union -> Successor(s) to
+ // Predecessors -> Vertex Group(s) -> Successor(s)
try {
-
- // Clone plan of union and merge it into the predecessor operators
// Remove POShuffledValueInputTez from union plan root
unionOpPlan.remove(unionOpPlan.getRoots().get(0));
- for (OperatorKey predKey : unionOp.getVertexGroupMembers()) {
+
+ for (OperatorKey predKey : unionOp.getUnionMembers()) {
TezOperator pred = tezPlan.getOperator(predKey);
- PhysicalPlan predPlan = pred.plan;
- PhysicalOperator predLeaf = predPlan.getLeaves().get(0);
- // if predLeaf not POValueOutputTez
- if (predLeaf instanceof POSplit) {
- // Find the subPlan that connects to the union operator
- predPlan = getUnionPredPlanFromSplit(predPlan, unionOpKey);
- predLeaf = predPlan.getLeaves().get(0);
- }
+ PhysicalPlan clonePlan = cloneAndMergeUnionPlan(unionOp, pred);
+ connectPredecessorsToVertexGroups(unionOp, pred, clonePlan,
+ storeVertexGroupOps, outputVertexGroupOps);
+ }
- PhysicalPlan clonePlan = unionOpPlan.clone();
- //Clone changes the operator keys
- List<POStoreTez> clonedUnionStoreOutputs =
PlanHelper.getPhysicalOperators(clonePlan, POStoreTez.class);
-
- // Remove POValueOutputTez from predecessor leaf
- predPlan.remove(predLeaf);
- boolean isEmptyPlan = predPlan.isEmpty();
- if (!isEmptyPlan) {
- predLeaf = predPlan.getLeaves().get(0);
- }
- predPlan.merge(clonePlan);
- if (!isEmptyPlan) {
- predPlan.connect(predLeaf, clonePlan.getRoots().get(0));
- }
+ connectVertexGroupsToSuccessors(unionOp, successors,
+ unionOutputKeys, outputVertexGroupOps);
- // Connect predecessor to the storeVertexGroups
- int i = 0;
- for (TezOperator storeVertexGroup : storeVertexGroupOps) {
-
storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
- //Set the output key of cloned POStore to that of the
initial union POStore.
- clonedUnionStoreOutputs.get(i).setOutputKey(
- storeVertexGroup.getVertexGroupInfo().getStore()
- .getOperatorKey().toString());
-
pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(),
- storeVertexGroup.getOperatorKey());
- tezPlan.connect(pred, storeVertexGroup);
- }
+ replaceSuccessorInputsAndDisconnect(unionOp, successors,
unionOutputKeys, newOutputKeys);
- for (TezOperator outputVertexGroup : outputVertexGroupOps) {
-
outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
- tezPlan.connect(pred, outputVertexGroup);
+ //Remove union operator from the plan
+ tezPlan.remove(unionOp);
+ } catch (VisitorException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new VisitorException(e);
+ }
+
+ }
+
+ /**
+ * Connect the predecessors of the union which are not members of the union
+ * (usually FRJoin replicated table or SkewedJoin sample) to the Split op
+ * which is the only member of the union. Disconnect those predecessors
from the union.
+ *
+ * Replace the output keys of those predecessors with the split operator
+ * key instead of the union operator key.
+ *
+ * @param unionOp Union operator
+ * @param splitPredOp Split operator which is the only member of the union
and its predecessor
+ * @param unionPredecessors Predecessors of the union including the split
operator
+ * @throws PlanException
+ * @throws VisitorException
+ */
+ private void connectUnionNonMemberPredecessorsToSplit(TezOperator unionOp,
+ TezOperator splitPredOp,
+ List<TezOperator> unionPredecessors) throws PlanException,
VisitorException {
+ String unionOpKey = unionOp.getOperatorKey().toString();
+ OperatorKey splitPredKey = splitPredOp.getOperatorKey();
+ for (TezOperator pred : unionPredecessors) {
+
+ if (!pred.getOperatorKey().equals(splitPredKey)) { //Skip
splitPredOp which is also a predecessor
+ // Get actual predecessors if predecessor is a vertex group
+ TezOperator predVertexGroup = null;
+ List<TezOperator> actualPreds = new ArrayList<TezOperator>();
+ if (pred.isVertexGroup()) {
+ predVertexGroup = pred;
+ for (OperatorKey opKey : pred.getVertexGroupMembers()) {
+ // There should not be multiple levels of vertex
group. So no recursion required.
+ actualPreds.add(tezPlan.getOperator(opKey));
+ }
+ tezPlan.disconnect(predVertexGroup, unionOp);
+ tezPlan.connect(predVertexGroup, splitPredOp);
+ } else {
+ actualPreds.add(pred);
}
- copyOperatorProperties(pred, unionOp);
- tezPlan.disconnect(pred, unionOp);
- }
+ for (TezOperator actualPred : actualPreds) {
+ List<TezOutput> tezOutputs =
PlanHelper.getPhysicalOperators(actualPred.plan,
+ TezOutput.class);
- List<TezOutput> valueOnlyOutputs = new ArrayList<TezOutput>();
- for (TezOutput tezOutput : unionOutputs) {
- if (tezOutput instanceof POValueOutputTez) {
- valueOnlyOutputs.add(tezOutput);
+ for (TezOutput tezOut : tezOutputs) {
+ if (ArrayUtils.contains(tezOut.getTezOutputs(),
unionOpKey)) {
+ tezOut.replaceOutput(unionOpKey,
splitPredKey.toString());
+ }
+ }
+ TezEdgeDescriptor edge =
actualPred.outEdges.remove(unionOp.getOperatorKey());
+ if (edge == null) {
+ throw new VisitorException("Edge description is
empty");
+ }
+ actualPred.outEdges.put(splitPredKey, edge);
+ splitPredOp.inEdges.put(actualPred.getOperatorKey(), edge);
+ if (predVertexGroup == null) {
+ // Disconnect FRJoin table/SkewedJoin sample edge to
+ // union op and connect to POSplit
+ tezPlan.disconnect(actualPred, unionOp);
+ tezPlan.connect(actualPred, splitPredOp);
+ }
}
}
- // Connect to outputVertexGroupOps
- // Copy output edges of union -> successor to
predecessor->successor, vertexgroup -> successor
- // and connect vertexgroup -> successor in the plan.
- for (Entry<OperatorKey, TezEdgeDescriptor> entry :
unionOp.outEdges.entrySet()) {
- TezOperator succOp = tezPlan.getOperator(entry.getKey());
- // Case of union followed by union.
- // unionOp.outEdges will not point to vertex group, but to its
output.
- // So find the vertex group if there is one.
- TezOperator succOpVertexGroup = null;
- for (TezOperator succ : successors) {
- if (succ.isVertexGroup()
- && succ.getVertexGroupInfo().getOutput()
-
.equals(succOp.getOperatorKey().toString())) {
- succOpVertexGroup = succ;
- break;
+ }
+ }
+
+ /**
+ * Connect the split operator to the successors of the union operators and
update the edges.
+ * Also change the inputs of the successor from the union operator to the
split operator.
+ *
+ * @param unionOp Union operator
+ * @param splitPredOp Split operator which is the only member of the union
+ * @param successors Successors of the union operator
+ * @throws PlanException
+ * @throws VisitorException
+ */
+ private void connectSplitOpToUnionSuccessors(TezOperator unionOp,
+ TezOperator splitPredOp, List<TezOperator> successors)
+ throws PlanException, VisitorException {
+ String unionOpKey = unionOp.getOperatorKey().toString();
+ String splitPredOpKey = splitPredOp.getOperatorKey().toString();
+ if (successors != null) {
+ for (TezOperator succ : successors) {
+ TezOperator successorVertexGroup = null;
+ boolean removeSuccessorVertexGroup = false;
+ List<TezOperator> actualSuccs = new ArrayList<TezOperator>();
+ if (succ.isVertexGroup()) {
+ successorVertexGroup = succ;
+ if (tezPlan.getSuccessors(successorVertexGroup) != null) {
+ // There should not be multiple levels of vertex
group. So no recursion required.
+
actualSuccs.addAll(tezPlan.getSuccessors(successorVertexGroup));
}
- }
- TezEdgeDescriptor edge = entry.getValue();
- // Edge cannot be one to one as it will get input from two or
- // more union predecessors. Change it to SCATTER_GATHER
- if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
- edge.dataMovementType = DataMovementType.SCATTER_GATHER;
- edge.partitionerClass = RoundRobinPartitioner.class;
- edge.outputClassName =
UnorderedPartitionedKVOutput.class.getName();
- edge.inputClassName = UnorderedKVInput.class.getName();
- }
- TezOperator vertexGroupOp =
outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())];
- for (OperatorKey predKey :
vertexGroupOp.getVertexGroupMembers()) {
- TezOperator pred = tezPlan.getOperator(predKey);
- // Keep the output edge directly to successor
- // Don't need to keep output edge for vertexgroup
- pred.outEdges.put(entry.getKey(), edge);
- succOp.inEdges.put(predKey, edge);
- if (succOpVertexGroup != null) {
- succOpVertexGroup.getVertexGroupMembers().add(predKey);
-
succOpVertexGroup.getVertexGroupInfo().addInput(predKey);
- // Connect directly to the successor vertex group
- tezPlan.disconnect(pred, vertexGroupOp);
- tezPlan.connect(pred, succOpVertexGroup);
+ int index =
succ.getVertexGroupMembers().indexOf(unionOp.getOperatorKey());
+ while (index > -1) {
+ succ.getVertexGroupMembers().set(index,
splitPredOp.getOperatorKey());
+ index =
succ.getVertexGroupMembers().indexOf(unionOp.getOperatorKey());
+ }
+ tezPlan.disconnect(unionOp, successorVertexGroup);
+ Set<OperatorKey> uniqueVertexGroupMembers = new
HashSet<OperatorKey>(succ.getVertexGroupMembers());
+ if (uniqueVertexGroupMembers.size() == 1) {
+ //Only splitPredOp is member of the vertex group. Get
rid of the vertex group
+ removeSuccessorVertexGroup = true;
+ } else {
+ tezPlan.connect(splitPredOp, successorVertexGroup);
}
- }
- if (succOpVertexGroup != null) {
-
succOpVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
-
succOpVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
- //Discard the new vertex group created
- tezPlan.remove(vertexGroupOp);
} else {
- tezPlan.connect(vertexGroupOp, succOp);
+ actualSuccs.add(succ);
+ }
+
+ for (TezOperator actualSucc : actualSuccs) {
+ LinkedList<TezInput> inputs =
PlanHelper.getPhysicalOperators(succ.plan, TezInput.class);
+ for (TezInput tezInput : inputs) {
+ for (String inputKey : tezInput.getTezInputs()) {
+ if (inputKey.equals(unionOpKey)) {
+ tezInput.replaceInput(inputKey,
splitPredOpKey);
+ }
+ }
+ }
+
+ List<POUserFunc> userFuncs =
PlanHelper.getPhysicalOperators(succ.plan, POUserFunc.class);
+ for (POUserFunc userFunc : userFuncs) {
+ if (userFunc.getFunc() instanceof ReadScalarsTez) {
+ TezInput tezInput = (TezInput)userFunc.getFunc();
+ for (String inputKey : tezInput.getTezInputs()) {
+ if (inputKey.equals(unionOpKey)) {
+ tezInput.replaceInput(inputKey,
splitPredOpKey);
+
userFunc.getFuncSpec().setCtorArgs(tezInput.getTezInputs());
+ }
+ }
+ }
+ }
+
+ TezEdgeDescriptor edge =
actualSucc.inEdges.remove(unionOp.getOperatorKey());
+ if (edge == null) {
+ throw new VisitorException("Edge description is
empty");
+ }
+ actualSucc.inEdges.put(splitPredOp.getOperatorKey(), edge);
+ splitPredOp.outEdges.put(actualSucc.getOperatorKey(),
edge);
+ if (successorVertexGroup == null ||
removeSuccessorVertexGroup) {
+ if (removeSuccessorVertexGroup) {
+ // Changes plan from SplitOp -> Union ->
VertexGroup -> Successor
+ // to SplitOp -> Successor
+ tezPlan.disconnect(successorVertexGroup,
actualSucc);
+ tezPlan.remove(successorVertexGroup);
+ } else {
+ // Changes plan from SplitOp -> Union -> Successor
+ // to SplitOp -> Successor
+ tezPlan.disconnect(unionOp, actualSucc);
+ }
+ tezPlan.connect(splitPredOp, actualSucc);
+ }
}
}
+ }
+ }
+
+ /**
+ * Clone plan of union and merge it into the predecessor operator
+ *
+ * @param unionOp Union operator
+ * @param predOp Predecessor operator of union to which union plan should
be merged to
+ */
+ private PhysicalPlan cloneAndMergeUnionPlan(TezOperator unionOp,
TezOperator predOp) throws VisitorException {
+ try {
+ PhysicalPlan predPlan = predOp.plan;
+ PhysicalOperator predLeaf = predPlan.getLeaves().get(0);
+ // if predLeaf not POValueOutputTez
+ if (predLeaf instanceof POSplit) {
+ // Find the subPlan that connects to the union operator
+ predPlan = getUnionPredPlanFromSplit(predPlan,
unionOp.getOperatorKey().toString());
+ predLeaf = predPlan.getLeaves().get(0);
+ }
+ PhysicalPlan clonePlan = unionOp.plan.clone();
+
+ // Remove POValueOutputTez from predecessor leaf
+ predPlan.remove(predLeaf);
+ boolean isEmptyPlan = predPlan.isEmpty();
+ if (!isEmptyPlan) {
+ predLeaf = predPlan.getLeaves().get(0);
+ }
+ predPlan.merge(clonePlan);
+ if (!isEmptyPlan) {
+ predPlan.connect(predLeaf, clonePlan.getRoots().get(0));
+ }
+ return clonePlan;
} catch (Exception e) {
throw new VisitorException(e);
}
+ }
+
+ /**
+ * Connects the unionOp predecessor to the store vertex groups and the
output vertex groups
+ * and disconnects it from the unionOp.
+ *
+ * @param pred Predecessor of union which will be made part of the vertex
group
+ * @param unionOp Union operator
+ * @param predClonedUnionPlan Cloned plan of the union merged to the
predecessor
+ * @param storeVertexGroupOps Store vertex groups to connect to
+ * @param outputVertexGroupOps Tez LogicalOutput vertex groups to connect
to
+ */
+ public void connectPredecessorsToVertexGroups(TezOperator unionOp,
+ TezOperator pred, PhysicalPlan predClonedUnionPlan,
+ TezOperator[] storeVertexGroupOps,
+ TezOperator[] outputVertexGroupOps) throws
VisitorException,PlanException {
+
+ //Clone changes the operator keys
+ List<POStoreTez> clonedUnionStoreOutputs =
PlanHelper.getPhysicalOperators(predClonedUnionPlan, POStoreTez.class);
+
+ // Connect predecessor to the storeVertexGroups
+ int i = 0;
+ for (TezOperator storeVertexGroup : storeVertexGroupOps) {
+
storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
+
pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(),
+ storeVertexGroup.getOperatorKey());
+ tezPlan.connect(pred, storeVertexGroup);
+ }
+
+ for (TezOperator outputVertexGroup : outputVertexGroupOps) {
+
outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
+ tezPlan.connect(pred, outputVertexGroup);
+ }
+ copyOperatorProperties(pred, unionOp);
+ tezPlan.disconnect(pred, unionOp);
+ }
+
+ /**
+ * Connect vertexgroup operator to successor operator in the plan.
+ *
+ * Copy the output edge between union operator and successor to between
+ * predecessors and successor. Predecessor output key and output edge
points
+ * to successor so that we have all the edge configuration, but they are
+ * connected to the vertex group in the plan.
+ *
+ * @param unionOp Union operator
+ * @param successors Successors of the union operator
+ * @param unionOutputKeys Output keys of union
+ * @param outputVertexGroupOps Tez LogicalOutput vertex groups
corresponding to the output keys
+ *
+ * @throws PlanException
+ */
+ private void connectVertexGroupsToSuccessors(TezOperator unionOp,
+ List<TezOperator> successors, List<String> unionOutputKeys,
+ TezOperator[] outputVertexGroupOps) throws PlanException {
+ // Connect to outputVertexGroupOps
+ for (Entry<OperatorKey, TezEdgeDescriptor> entry :
unionOp.outEdges.entrySet()) {
+ TezOperator succOp = tezPlan.getOperator(entry.getKey());
+ // Case of union followed by union.
+ // unionOp.outEdges will not point to vertex group, but to its
output.
+ // So find the vertex group if there is one.
+ TezOperator succOpVertexGroup = null;
+ for (TezOperator succ : successors) {
+ if (succ.isVertexGroup()
+ && succ.getVertexGroupInfo().getOutput()
+ .equals(succOp.getOperatorKey().toString())) {
+ succOpVertexGroup = succ;
+ break;
+ }
+ }
+ TezEdgeDescriptor edge = entry.getValue();
+ // Edge cannot be one to one as it will get input from two or
+ // more union predecessors. Change it to SCATTER_GATHER
+ if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
+ edge.dataMovementType = DataMovementType.SCATTER_GATHER;
+ edge.partitionerClass = RoundRobinPartitioner.class;
+ edge.outputClassName =
UnorderedPartitionedKVOutput.class.getName();
+ edge.inputClassName = UnorderedKVInput.class.getName();
+ }
+ TezOperator vertexGroupOp =
outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())];
+ for (OperatorKey predKey : vertexGroupOp.getVertexGroupMembers()) {
+ TezOperator pred = tezPlan.getOperator(predKey);
+ // Keep the output edge directly to successor
+ // Don't need to keep output edge for vertexgroup
+ pred.outEdges.put(entry.getKey(), edge);
+ succOp.inEdges.put(predKey, edge);
+ if (succOpVertexGroup != null) {
+ succOpVertexGroup.getVertexGroupMembers().add(predKey);
+ succOpVertexGroup.getVertexGroupInfo().addInput(predKey);
+ // Connect directly to the successor vertex group
+ tezPlan.disconnect(pred, vertexGroupOp);
+ tezPlan.connect(pred, succOpVertexGroup);
+ }
+ }
+ if (succOpVertexGroup != null) {
+
succOpVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
+
succOpVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
+ //Discard the new vertex group created
+ tezPlan.remove(vertexGroupOp);
+ } else {
+ tezPlan.connect(vertexGroupOp, succOp);
+ }
+ }
+ }
+
+ private void replaceSuccessorInputsAndDisconnect(TezOperator unionOp,
+ List<TezOperator> successors,
+ List<String> unionOutputKeys,
+ String[] newOutputKeys)
+ throws VisitorException {
if (successors != null) {
+ String unionOpKey = unionOp.getOperatorKey().toString();
// Successor inputs should now point to the vertex groups.
for (TezOperator succ : successors) {
LinkedList<TezInput> inputs =
PlanHelper.getPhysicalOperators(succ.plan, TezInput.class);
@@ -271,16 +530,27 @@ public class UnionOptimizer extends TezO
}
}
}
+
+ List<POUserFunc> userFuncs =
PlanHelper.getPhysicalOperators(succ.plan, POUserFunc.class);
+ for (POUserFunc userFunc : userFuncs) {
+ if (userFunc.getFunc() instanceof ReadScalarsTez) {
+ TezInput tezInput = (TezInput)userFunc.getFunc();
+ for (String inputKey : tezInput.getTezInputs()) {
+ if (inputKey.equals(unionOpKey)) {
+ tezInput.replaceInput(inputKey,
+
newOutputKeys[unionOutputKeys.indexOf(succ.getOperatorKey().toString())]);
+
userFunc.getFuncSpec().setCtorArgs(tezInput.getTezInputs());
+ }
+ }
+ }
+ }
+
tezPlan.disconnect(unionOp, succ);
}
}
-
- //Remove union operator from the plan
- tezPlan.remove(unionOp);
-
}
- private void copyOperatorProperties(TezOperator pred, TezOperator unionOp)
{
+ private void copyOperatorProperties(TezOperator pred, TezOperator unionOp)
throws VisitorException {
pred.UDFs.addAll(unionOp.UDFs);
pred.scalars.addAll(unionOp.scalars);
// Copy only map side properties. For eg: crossKeys.
@@ -292,6 +562,17 @@ public class UnionOptimizer extends TezO
}
}
pred.copyFeatures(unionOp, Arrays.asList(new
OPER_FEATURE[]{OPER_FEATURE.UNION}));
+
+ // For skewed join right input
+ if (unionOp.getSampleOperator() != null) {
+ if (pred.getSampleOperator() == null) {
+ pred.setSampleOperator(unionOp.getSampleOperator());
+ } else if
(!pred.getSampleOperator().equals(unionOp.getSampleOperator())) {
+ throw new VisitorException("Conflicting sample operators "
+ + pred.getSampleOperator().toString() + " and "
+ + unionOp.getSampleOperator().toString());
+ }
+ }
}
public static PhysicalPlan getUnionPredPlanFromSplit(PhysicalPlan plan,
String unionOpKey) throws VisitorException {
Modified: pig/trunk/test/e2e/pig/tests/multiquery.conf
URL:
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/multiquery.conf?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/multiquery.conf (original)
+++ pig/trunk/test/e2e/pig/tests/multiquery.conf Tue Apr 7 23:21:01 2015
@@ -52,6 +52,21 @@
# - _TEST_ Streaming with multiple stores.
# - _TEST_ Streaming in demux.
# - _TEST_ Streaming in nested demux.
+# MultiQuery_Union (Also refer Union in nightly.conf)
+# - _TEST_ Multiple levels of union with join
+# - _TEST_ Union with replicate join left table part of split
+# - _TEST_ Union with replicate join right table part of split
+# - _TEST_ Union with skewed join left table part of split
+# - _TEST_ Union with skewed join right table part of split
+# - _TEST_ Union with group by + combiner
+# - _TEST_ Union with group by + secondary key partitioner
+# - _TEST_ Union with order by
+# MultiQuery_Self
+# - _TEST_ Self cross
+# - _TEST_ Self cogroup
+# - _TEST_ Three way join (two self)
+# - _TEST_ Self replicate join
+# - _TEST_ Self skewed join
$cfg = {
@@ -554,7 +569,168 @@ $cfg = {
},
] # end of tests
},
+
+ {
+ 'name' => 'MultiQuery_Union',
+ 'floatpostprocess' => 1,
+ 'delimiter' => ' ',
+ 'tests' => [
+ {
+ # Multiple levels of union + join
+ 'num' => 1,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa:float);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa:float);
+c = filter a by gpa >= 4;
+c1 = foreach c generate *;
+c2 = foreach c generate *;
+c3 = union c1, c2;
+d = filter a by gpa < 4;
+d1 = foreach d generate *;
+d2 = foreach d generate *;
+d3 = union d1, d2;
+a1 = union c3, d3;
+e = join a1 by name, b by name;
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Union + Replicate Join left
+ 'num' => 2,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+c = union a1, a2;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join c by name, d by name using 'replicated';
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Union + Replicate Join right
+ 'num' => 3,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+c = union a1, a2;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by name, c by name using 'replicated';
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Union + Skewed Join left
+ 'num' => 4,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+c = union a1, a2;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join c by name, d by name using 'skewed' PARALLEL 3;
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Union + Skewed Join right
+ 'num' => 5,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+c = union a1, a2;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by name, c by name using 'skewed' PARALLEL 3;
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Union + Groupby + Combiner
+ 'num' => 6,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+c = union a1, a2;
+d = group c by name;
+e = foreach d generate group, SUM(c.age);
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Union + Groupby + Secondary key partitioner
+ 'num' => 7,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+c = union a1, a2;
+d = group c by name;
+e = foreach d { f = order c by $1,$2; generate group, f; };
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Union + Orderby
+ 'num' => 8,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+c = union a1, a2;
+d = order c by name PARALLEL 3;
+store d into ':OUTPATH:';\,
+ 'sortArgs' => ['-t', ' ', '-k', '1,1'],
+ },
+ ] # end of tests
+ },
+
+ {
+ 'name' => 'MultiQuery_Self',
+ 'floatpostprocess' => 1,
+ 'delimiter' => ' ',
+ 'tests' => [
+ # Self cross
+ {
+ 'num' => 1,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa <= 0.5;
+d = filter a by gpa >= 3.5 and gpa < 3.9;
+e = filter a by gpa > 0.5 and gpa < 1;
+f = CROSS b, c PARALLEL 3;
+g = CROSS d, e PARALLEL 4;
+store f into ':OUTPATH:.1';
+store g into ':OUTPATH:.2';\,
+ },
+ {
+ # Self cogroup
+ 'num' => 2,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa < 2;
+d = cogroup c by name, b by name;
+e = foreach d generate flatten(c), flatten(b);
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Three way join (two self)
+ 'num' => 3,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa < 2;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join b by name, c by name, d by name PARALLEL 2;
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Self join replicated
+ 'num' => 4,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa < 2;
+d = join c by name, b by name using 'replicated';
+store d into ':OUTPATH:';\,
+ },
+ {
+ # Self join skewed
+ 'num' => 5,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa < 2;
+d = join c by name, b by name using 'skewed' PARALLEL 2;
+store d into ':OUTPATH:';\,
+ },
+ ] # end of tests
+ },
] # end of groups
}
-;
\ No newline at end of file
+;