Author: daijy
Date: Wed Jan 21 21:53:42 2015
New Revision: 1653670
URL: http://svn.apache.org/r1653670
Log:
PIG-4352: Port local mode tests to Tez - TestUnionOnSchema
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java
pig/trunk/test/tez-local-tests
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1653670&r1=1653669&r2=1653670&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Jan 21 21:53:42 2015
@@ -44,6 +44,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4352: Port local mode tests to Tez - TestUnionOnSchema (daijy)
+
PIG-4359: Port local mode tests to Tez - part4 (daijy)
PIG-4340: PigStorage fails parsing empty map (daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1653670&r1=1653669&r2=1653670&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Wed Jan 21 21:53:42 2015
@@ -608,6 +608,7 @@ public class TezOperator extends Operato
private POStore store;
private OutputDescriptor storeOutDescriptor;
private VertexGroup vertexGroup;
+ private FileSpec sFile;
public VertexGroupInfo() {
}
@@ -659,6 +660,13 @@ public class TezOperator extends Operato
this.vertexGroup = vertexGroup;
}
+ public void setSFile(FileSpec sFile) {
+ this.sFile = sFile;
+ }
+
+ public FileSpec getSFile() {
+ return sFile;
+ }
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1653670&r1=1653669&r2=1653670&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Wed Jan 21 21:53:42 2015
@@ -97,11 +97,28 @@ public class UnionOptimizer extends TezO
// Union followed by Split followed by Store could have multiple stores
List<POStoreTez> unionStoreOutputs =
PlanHelper.getPhysicalOperators(unionOpPlan, POStoreTez.class);
TezOperator[] storeVertexGroupOps = new
TezOperator[unionStoreOutputs.size()];
+ List<TezOperator> succs = tezPlan.getSuccessors(unionOp);
+ // Create a copy as disconnect while iterating modifies the original
list
+ List<TezOperator> successors = succs == null ? null : new
ArrayList<TezOperator>(succs);
+
for (int i=0; i < storeVertexGroupOps.length; i++) {
- storeVertexGroupOps[i] = new
TezOperator(OperatorKey.genOpKey(scope));
- storeVertexGroupOps[i].setVertexGroupInfo(new
VertexGroupInfo(unionStoreOutputs.get(i)));
-
storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
- tezPlan.add(storeVertexGroupOps[i]);
+ TezOperator existingVertexGroup = null;
+ if (successors != null) {
+ for (TezOperator succ : successors) {
+ if (succ.isVertexGroup() &&
succ.getVertexGroupInfo().getSFile().equals(unionStoreOutputs.get(i).getSFile()))
{
+ existingVertexGroup = succ;
+ }
+ }
+ }
+ if (existingVertexGroup != null) {
+ storeVertexGroupOps[i] = existingVertexGroup;
+ } else {
+ storeVertexGroupOps[i] = new
TezOperator(OperatorKey.genOpKey(scope));
+ storeVertexGroupOps[i].setVertexGroupInfo(new
VertexGroupInfo(unionStoreOutputs.get(i)));
+
storeVertexGroupOps[i].getVertexGroupInfo().setSFile(unionStoreOutputs.get(i).getSFile());
+
storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
+ tezPlan.add(storeVertexGroupOps[i]);
+ }
}
// Case of split, orderby, skewed join, rank, etc will have multiple
outputs
@@ -182,7 +199,6 @@ public class UnionOptimizer extends TezO
tezPlan.disconnect(pred, unionOp);
}
- List<TezOperator> successors = tezPlan.getSuccessors(unionOp);
List<TezOutput> valueOnlyOutputs = new ArrayList<TezOutput>();
for (TezOutput tezOutput : unionOutputs) {
if (tezOutput instanceof POValueOutputTez) {
@@ -243,9 +259,6 @@ public class UnionOptimizer extends TezO
throw new VisitorException(e);
}
- List<TezOperator> succs = tezPlan.getSuccessors(unionOp);
- // Create a copy as disconnect while iterating modifies the original
list
- List<TezOperator> successors = succs == null ? null : new
ArrayList<TezOperator>(succs);
if (successors != null) {
// Successor inputs should now point to the vertex groups.
for (TezOperator succ : successors) {
Modified: pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java?rev=1653670&r1=1653669&r2=1653670&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java Wed Jan 21 21:53:42 2015
@@ -28,7 +28,6 @@ import java.util.List;
import junit.framework.Assert;
import org.apache.pig.EvalFunc;
-import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.mock.Storage;
@@ -101,8 +100,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaSameSchema() throws IOException,
ParserException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchemaSameSchema() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query =
" l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
+ "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
@@ -133,8 +132,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaFilter() throws IOException, ParserException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchemaFilter() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query =
" l1 = load '" + INP_FILE_2NUMS + "' as (i : int, x : int);"
+ "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
@@ -166,8 +165,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaSuccOps() throws IOException, ParserException
{
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchemaSuccOps() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query =
" l1 = load '" + INP_FILE_2NUMS + "' as (i : int);"
+ "l2 = load '" + INP_FILE_2NUMS + "' as (x : int, y : int);"
@@ -199,8 +198,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaCastOnByteArray() throws IOException,
ParserException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchemaCastOnByteArray() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query =
" l1 = load '" + INP_FILE_2NUMS + "' as (i, j);"
+ " f1 = foreach l1 generate (int)i, (int)j;"
@@ -228,8 +227,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaScopedColumnName() throws IOException,
ParserException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchemaScopedColumnName() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query_prefix =
" l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); "
+ "g = group l1 by i; "
@@ -271,8 +270,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaScopedColumnNameBothInp1() throws
IOException, ParserException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchemaScopedColumnNameBothInp1() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query =
" l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); "
+ "g1 = group l1 by i; "
@@ -307,8 +306,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaScopedColumnNameBothInp2() throws
IOException, ParserException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchemaScopedColumnNameBothInp2() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query =
" l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); "
+ " l2 = load '" + INP_FILE_2NUMS + "' as (i : int, x :
chararray); "
@@ -345,7 +344,7 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaScopedColumnNameNeg() throws IOException,
ParserException {
+ public void testUnionOnSchemaScopedColumnNameNeg() throws Exception {
String expectedErr = "Found more than one match: l1::i, l2::i";
String query_prefix =
@@ -371,8 +370,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaDiffNumType() throws IOException,
ParserException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchemaDiffNumType() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query =
" l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : double);"
+ "l2 = load '" + INP_FILE_2NUMS + "' as (i : long, j : float);"
@@ -401,8 +400,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaNoCommonCols() throws IOException,
ParserException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchemaNoCommonCols() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query =
" l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
+ "l2 = load '" + INP_FILE_2NUMS + "' as (x : long, y : float);"
@@ -429,8 +428,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaAdditionalColumn() throws IOException,
ParserException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchemaAdditionalColumn() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query =
" l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
+ "l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
@@ -458,8 +457,8 @@ public class TestUnionOnSchema {
}
@Test
- public void testUnionOnSchemaAdditionalColumnsWithImplicitSplit() throws
IOException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchemaAdditionalColumnsWithImplicitSplit() throws
Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
Data data = Storage.resetData(pig);
// Use batch to force multiple outputs from relation l3. This causes
@@ -503,8 +502,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchema3Inputs() throws IOException, ParserException
{
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchema3Inputs() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query =
" l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); "
+ "l2 = load '" + INP_FILE_2NUMS + "' as (i : double, x : int); "
@@ -538,8 +537,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaByteArrayConversions() throws IOException,
ParserException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchemaByteArrayConversions() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query =
" l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+ " (i : bytearray, x : bytearray, j : bytearray "
@@ -577,7 +576,7 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaNoSchema() throws IOException,
ParserException {
+ public void testUnionOnSchemaNoSchema() throws Exception {
String expectedErr = "UNION ONSCHEMA cannot be used with " +
"relations that have null schema";
String query =
@@ -602,7 +601,7 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaNullAliasInFieldSchema() throws IOException,
ParserException {
+ public void testUnionOnSchemaNullAliasInFieldSchema() throws Exception {
String expectedErr = "Schema of relation f has a null fieldschema for
" +
"column(s). Schema ::long,y:float";
String query =
@@ -615,8 +614,8 @@ public class TestUnionOnSchema {
}
- private void checkSchemaEx(String query, String expectedErr) throws
IOException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ private void checkSchemaEx(String query, String expectedErr) throws
Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
boolean foundEx = false;
try{
@@ -645,7 +644,7 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaIncompatibleTypes() throws IOException,
ParserException {
+ public void testUnionOnSchemaIncompatibleTypes() throws Exception {
String query =
" l1 = load '" + INP_FILE_2NUMS + "' as (x : long, y :
chararray);"
+ "l2 = load '" + INP_FILE_2NUMS + "' as (x : long, y : float);"
@@ -693,15 +692,15 @@ public class TestUnionOnSchema {
}
- private void checkSchemaEquals(String query, Schema expectedSch) throws
IOException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ private void checkSchemaEquals(String query, Schema expectedSch) throws
Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
Util.registerMultiLineQuery(pig, query);
Schema sch = pig.dumpSchema("u");
assertEquals("Checking expected schema", expectedSch, sch);
}
- private void checkSchemaEquals(String query, String schemaStr) throws
IOException, ParserException {
+ private void checkSchemaEquals(String query, String schemaStr) throws
Exception {
Schema expectedSch = Utils.getSchemaFromString(schemaStr);
checkSchemaEquals(query, expectedSch);
}
@@ -713,8 +712,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaInputUdfs() throws IOException,
ParserException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchemaInputUdfs() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query =
" l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : chararray);"
+ "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : chararray);"
@@ -750,8 +749,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaUdfTypeEvolution() throws IOException,
ParserException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchemaUdfTypeEvolution() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query_prefix =
" l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+ " (i : int, c : chararray, j : int "
@@ -802,8 +801,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaUdfTypeEvolution2() throws IOException,
ParserException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchemaUdfTypeEvolution2() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query_prefix =
" l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+ " (i : int, c : chararray, j : int "
@@ -874,8 +873,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testUnionOnSchemaScopeMulti() throws IOException,
ParserException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testUnionOnSchemaScopeMulti() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query_prefix =
" a = load '" + INP_FILE_2NUMS+ "' as (i:int, j:int); "
+ "b = group a by i; "
@@ -921,8 +920,8 @@ public class TestUnionOnSchema {
* @throws ParserException
*/
@Test
- public void testTwoUnions() throws IOException, ParserException {
- PigServer pig = new PigServer(ExecType.LOCAL);
+ public void testTwoUnions() throws Exception {
+ PigServer pig = new PigServer(Util.getLocalTestMode());
String query =
" l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
+ "l2 = load '" + INP_FILE_2NUMS + "' as (i : long, j : int);"
Modified: pig/trunk/test/tez-local-tests
URL: http://svn.apache.org/viewvc/pig/trunk/test/tez-local-tests?rev=1653670&r1=1653669&r2=1653670&view=diff
==============================================================================
--- pig/trunk/test/tez-local-tests (original)
+++ pig/trunk/test/tez-local-tests Wed Jan 21 21:53:42 2015
@@ -83,3 +83,4 @@
**/TestStoreLocal.java
**/TestPOPartialAggPlanTez.java
**/TestMultiQueryLocal.java
+**/TestUnionOnSchema.java