Author: daijy
Date: Tue Jan 20 22:16:33 2015
New Revision: 1653391
URL: http://svn.apache.org/r1653391
Log:
PIG-4366: Port local mode tests to Tez - part5
Added:
pig/trunk/test/org/apache/pig/test/TestJoinBase.java
pig/trunk/test/org/apache/pig/test/TestJoinLocal.java
pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java
pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java
pig/trunk/test/org/apache/pig/test/TestScalarAliasesLocal.java
pig/trunk/test/org/apache/pig/tez/TestPigStatsTez.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
pig/trunk/test/excluded-tests-20
pig/trunk/test/org/apache/pig/test/TestCubeOperator.java
pig/trunk/test/org/apache/pig/test/TestJoin.java
pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java
pig/trunk/test/org/apache/pig/test/TestPigServer.java
pig/trunk/test/org/apache/pig/test/TestPigStats.java
pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
pig/trunk/test/org/apache/pig/test/TestRank3.java
pig/trunk/test/org/apache/pig/test/TestScalarAliases.java
pig/trunk/test/org/apache/pig/test/Util.java
pig/trunk/test/tez-local-tests
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1653391&r1=1653390&r2=1653391&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jan 20 22:16:33 2015
@@ -44,6 +44,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4366: Port local mode tests to Tez - part5 (daijy)
+
PIG-4381: PIG grunt shell DEFINE commands fails when it spans multiple lines
(daijy)
PIG-4384: TezLauncher thread should be deamon thread (zjffdu via daijy)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1653391&r1=1653390&r2=1653391&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
Tue Jan 20 22:16:33 2015
@@ -108,23 +108,24 @@ public class POReservoirSample extends P
}
}
- Random randGen = new Random();
+ if (res.returnStatus != POStatus.STATUS_EOP) {
+ Random randGen = new Random();
+ while (true) {
+ // pick this as sample
+ res = processInput();
+ if (res.returnStatus == POStatus.STATUS_NULL) {
+ continue;
+ } else if (res.returnStatus != POStatus.STATUS_OK) {
+ break;
+ }
- while (true) {
- // pick this as sample
- res = processInput();
- if (res.returnStatus == POStatus.STATUS_NULL) {
- continue;
- } else if (res.returnStatus != POStatus.STATUS_OK) {
- break;
+ // collect samples until input is exhausted
+ int rand = randGen.nextInt(rowProcessed);
+ if (rand < numSamples) {
+ samples[rand] = res;
+ }
+ rowProcessed++;
}
-
- // collect samples until input is exhausted
- int rand = randGen.nextInt(rowProcessed);
- if (rand < numSamples) {
- samples[rand] = res;
- }
- rowProcessed++;
}
if (this.parentPlan.endOfAllInput && res.returnStatus ==
POStatus.STATUS_EOP) {
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1653391&r1=1653390&r2=1653391&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
Tue Jan 20 22:16:33 2015
@@ -297,9 +297,11 @@ public class TezCompiler extends PhyPlan
userFunc.getInputs().remove(1);
}
- TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan,
from, tezOp);
- //TODO shared edge once support is available in Tez
- TezCompilerUtil.configureValueOnlyTupleOutput(edge,
DataMovementType.BROADCAST);
+ if (tezPlan.getPredecessors(tezOp)==null ||
!tezPlan.getPredecessors(tezOp).contains(from)) {
+ TezEdgeDescriptor edge =
TezCompilerUtil.connect(tezPlan, from, tezOp);
+ //TODO shared edge once support is available in Tez
+ TezCompilerUtil.configureValueOnlyTupleOutput(edge,
DataMovementType.BROADCAST);
+ }
}
}
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1653391&r1=1653390&r2=1653391&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
Tue Jan 20 22:16:33 2015
@@ -18,8 +18,10 @@
package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Set;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
@@ -54,7 +56,6 @@ public class MultiQueryOptimizerTez exte
List<TezOperator> splittees = new ArrayList<TezOperator>();
List<TezOperator> successors = getPlan().getSuccessors(tezOp);
- List<TezOperator> succ_successors = new ArrayList<TezOperator>();
for (TezOperator successor : successors) {
// If has other dependency, don't merge into split,
@@ -65,20 +66,21 @@ public class MultiQueryOptimizerTez exte
// Detect diamond shape, we cannot merge it into split, since
Tez
// does not handle double edge between vertexes
// TODO: PIG-3876 to handle this by writing to same edge
- boolean sharedSucc = false;
- if (getPlan().getSuccessors(successor)!=null) {
- for (TezOperator succ_successor :
getPlan().getSuccessors(successor)) {
- if (succ_successors.contains(succ_successor)) {
- sharedSucc = true;
- break;
- }
+ Set<TezOperator> mergedSuccessors = new HashSet<TezOperator>();
+ Set<TezOperator> toMergeSuccessors = new HashSet<TezOperator>();
+ mergedSuccessors.addAll(successors);
+ for (TezOperator splittee : splittees) {
+ if (getPlan().getSuccessors(splittee) != null) {
+ mergedSuccessors.addAll(getPlan().getSuccessors(splittee));
}
- succ_successors.addAll(getPlan().getSuccessors(successor));
}
- if (sharedSucc) {
- continue;
+ if (getPlan().getSuccessors(successor) != null) {
+ toMergeSuccessors.addAll(getPlan().getSuccessors(successor));
+ }
+ mergedSuccessors.retainAll(toMergeSuccessors);
+ if (mergedSuccessors.isEmpty()) { // no shared edge after merge
+ splittees.add(successor);
}
- splittees.add(successor);
}
if (splittees.size()==0) {
Modified: pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/mock/Storage.java?rev=1653391&r1=1653390&r2=1653391&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/mock/Storage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/mock/Storage.java Tue Jan 20 22:16:33
2015
@@ -459,7 +459,7 @@ private MockRecordWriter mockRecordWrite
@Override
public void putNext(Tuple t) throws IOException {
- mockRecordWriter.dataBeingWritten.add(t);
+ mockRecordWriter.dataBeingWritten.add(TF.newTuple(t.getAll()));
}
@Override
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java?rev=1653391&r1=1653390&r2=1653391&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
(original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java Tue Jan
20 22:16:33 2015
@@ -203,7 +203,7 @@ public class TezScriptState extends Scri
return dagScriptInfo.get(dagName);
}
- static class TezDAGScriptInfo {
+ public static class TezDAGScriptInfo {
private static final Log LOG =
LogFactory.getLog(TezDAGScriptInfo.class);
private TezOperPlan tezPlan;
Modified: pig/trunk/test/excluded-tests-20
URL:
http://svn.apache.org/viewvc/pig/trunk/test/excluded-tests-20?rev=1653391&r1=1653390&r2=1653391&view=diff
==============================================================================
--- pig/trunk/test/excluded-tests-20 (original)
+++ pig/trunk/test/excluded-tests-20 Tue Jan 20 22:16:33 2015
@@ -6,3 +6,4 @@
**/TestJobSubmissionTez.java
**/TestGroupConstParallelTez.java
**/TestLoaderStorerShipCacheFilesTez.java
+**/TestPigStatsTez.java
Modified: pig/trunk/test/org/apache/pig/test/TestCubeOperator.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCubeOperator.java?rev=1653391&r1=1653390&r2=1653391&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCubeOperator.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCubeOperator.java Tue Jan 20
22:16:33 2015
@@ -39,6 +39,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.newplan.Operator;
import org.junit.AfterClass;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -53,7 +54,7 @@ public class TestCubeOperator {
@BeforeClass
public static void oneTimeSetUp() throws Exception {
- pigServer = new PigServer("local");
+ pigServer = new PigServer(Util.getLocalTestMode());
}
@Before
@@ -622,6 +623,18 @@ public class TestCubeOperator {
}
@Test
+ public void testIllustrate() throws Exception {
+ // test for illustrate
+ Assume.assumeTrue("illustrate does not work in tez (PIG-3993)",
!Util.getLocalTestMode().toString().startsWith("TEZ"));
+ String query = "a = load 'input' USING mock.Storage() as
(a1:chararray,b1:chararray,c1:long); "
+ + "b = cube a by cube(a1,b1);";
+
+ Util.registerMultiLineQuery(pigServer, query);
+ Map<Operator, DataBag> examples = pigServer.getExamples("b");
+ assertTrue(examples != null);
+ }
+
+ @Test
public void testRollupHIIAfterCogroup() throws IOException {
// test for cubing on co-grouped relation
String query = "a = load 'input1' USING mock.Storage() as
(a1:chararray,b1,c1,d1); "
@@ -652,17 +665,6 @@ public class TestCubeOperator {
}
@Test
- public void testIllustrate() throws IOException {
- // test for illustrate
- String query = "a = load 'input' USING mock.Storage() as
(a1:chararray,b1:chararray,c1:long); "
- + "b = cube a by cube(a1,b1);";
-
- Util.registerMultiLineQuery(pigServer, query);
- Map<Operator, DataBag> examples = pigServer.getExamples("b");
- assertTrue(examples != null);
- }
-
- @Test
public void testExplainCube() throws IOException {
// test for explain
String query = "a = load 'input' USING mock.Storage() as
(a1:chararray,b1:chararray,c1:long); "
Modified: pig/trunk/test/org/apache/pig/test/TestJoin.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJoin.java?rev=1653391&r1=1653390&r2=1653391&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJoin.java Tue Jan 20 22:16:33 2015
@@ -15,95 +15,49 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.pig.test;
-import static org.apache.pig.builtin.mock.Storage.resetData;
-import static org.apache.pig.builtin.mock.Storage.tuple;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import java.io.File;
import java.io.IOException;
import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigException;
import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.builtin.mock.Storage.Data;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.impl.util.Utils;
-import org.apache.pig.newplan.Operator;
-import org.apache.pig.newplan.logical.relational.LOJoin;
-import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
-import org.apache.pig.newplan.logical.relational.LogicalPlan;
-import org.apache.pig.parser.ParserException;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.Test;
-import com.google.common.collect.Sets;
-
/**
* Test cases to test join statement
*/
-public class TestJoin {
-
+public class TestJoin extends TestJoinBase {
private static MiniGenericCluster cluster =
MiniGenericCluster.buildCluster();
- private PigServer pigServer;
-
- TupleFactory mTf = TupleFactory.getInstance();
- BagFactory mBf = BagFactory.getInstance();
- private static ExecType[] execTypes = new ExecType[] {ExecType.LOCAL,
cluster.getExecType()};
@AfterClass
public static void oneTimeTearDown() throws Exception {
if (cluster != null) cluster.shutDown();
}
- private void setUp(ExecType execType) throws ExecException {
- Util.resetStateForExecModeSwitch();
- if(execType == cluster.getExecType()) {
- pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
- } else if(execType == ExecType.LOCAL) {
- pigServer = new PigServer(ExecType.LOCAL);
- }
+ @Before
+ public void setUp() throws Exception {
+ pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
}
- private String createInputFile(ExecType execType, String fileNameHint,
String[] data) throws IOException {
+ protected String createInputFile(String fileNameHint, String[] data)
throws IOException {
String fileName = "";
- if(execType == cluster.getExecType()) {
- Util.createInputFile(cluster, fileNameHint, data);
- fileName = fileNameHint;
- } else if (execType == ExecType.LOCAL) {
- File f = Util.createInputFile("test", fileNameHint, data);
- fileName = Util.generateURI(f.getAbsolutePath(),
pigServer.getPigContext());
- }
+ Util.createInputFile(cluster, fileNameHint, data);
+ fileName = fileNameHint;
return fileName;
}
- private void deleteInputFile(ExecType execType, String fileName) throws
IOException {
- if(execType == cluster.getExecType()) {
- Util.deleteFile(cluster, fileName);
- } else if(execType == ExecType.LOCAL){
- fileName = fileName.replace("file://", "");
- new File(fileName).delete();
- }
+ protected void deleteInputFile(String fileName) throws Exception {
+ Util.deleteFile(cluster, fileName);
}
@Test
- public void testJoinWithMissingFieldsInTuples() throws IOException{
- setUp(cluster.getExecType());
+ public void testJoinWithMissingFieldsInTuples() throws Exception{
String[] input1 = {
"ff ff ff",
"",
@@ -122,615 +76,15 @@ public class TestJoin {
""
};
- String firstInput = createInputFile(cluster.getExecType(), "a.txt",
input1);
- String secondInput = createInputFile(cluster.getExecType(), "b.txt",
input2);
+ String firstInput = createInputFile("a.txt", input1);
+ String secondInput = createInputFile("b.txt", input2);
String script = "a = load 'a.txt' using PigStorage(' ');" +
"b = load 'b.txt' using PigStorage('\u0001');" +
"c = join a by $0, b by $0;";
Util.registerMultiLineQuery(pigServer, script);
Iterator<Tuple> it = pigServer.openIterator("c");
assertFalse(it.hasNext());
- deleteInputFile(cluster.getExecType(), firstInput);
- deleteInputFile(cluster.getExecType(), secondInput);
- }
-
- @Test
- public void testJoinUnkownSchema() throws Exception {
- // If any of the input schema is unknown, the resulting schema should
be unknown as well
- for (ExecType execType : execTypes) {
- setUp(execType);
- String script = "a = load 'a.txt';" +
- "b = load 'b.txt'; " +
- "c = join a by $0, b by $0;";
- Util.registerMultiLineQuery(pigServer, script);
- Schema schema = pigServer.dumpSchema("c");
- assertNull(schema);
- }
- }
-
- @Test
- public void testDefaultJoin() throws IOException, ParserException {
- for (ExecType execType : execTypes) {
- setUp(execType);
- String[] input1 = {
- "hello\t1",
- "bye\t2",
- "\t3"
- };
- String[] input2 = {
- "hello\tworld",
- "good\tmorning",
- "\tevening"
- };
-
- String firstInput = createInputFile(execType, "a.txt", input1);
- String secondInput = createInputFile(execType, "b.txt", input2);
- Tuple expectedResult =
(Tuple)Util.getPigConstant("('hello',1,'hello','world')");
-
- // with schema
- String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as
(n:chararray, a:int); " +
- "b = load '"+ Util.encodeEscape(secondInput) +"' as
(n:chararray, m:chararray); " +
- "c = join a by $0, b by $0;";
- Util.registerMultiLineQuery(pigServer, script);
- Iterator<Tuple> it = pigServer.openIterator("c");
- assertTrue(it.hasNext());
- assertEquals(expectedResult, it.next());
- assertFalse(it.hasNext());
-
- // without schema
- script = "a = load '"+ Util.encodeEscape(firstInput) + "'; " +
- "b = load '" + Util.encodeEscape(secondInput) + "'; " +
- "c = join a by $0, b by $0;";
- Util.registerMultiLineQuery(pigServer, script);
- it = pigServer.openIterator("c");
- assertTrue(it.hasNext());
- assertEquals(expectedResult.toString(), it.next().toString());
- assertFalse(it.hasNext());
- deleteInputFile(execType, firstInput);
- deleteInputFile(execType, secondInput);
- }
- }
-
-
- @Test
- public void testJoinSchema() throws Exception {
- for (ExecType execType : execTypes) {
- setUp(execType);
- String[] input1 = {
- "1\t2",
- "2\t3",
- "3\t4"
- };
- String[] input2 = {
- "1\thello",
- "4\tbye",
- };
-
- String firstInput = createInputFile(execType, "a.txt", input1);
- String secondInput = createInputFile(execType, "b.txt", input2);
- Tuple expectedResult =
(Tuple)Util.getPigConstant("(1,2,1,'hello',1,2,1,'hello')");
-
- // with schema
- String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as
(i:int, j:int); " +
- "b = load '"+ Util.encodeEscape(secondInput) +"' as
(k:int, l:chararray); " +
- "c = join a by $0, b by $0;" +
- "d = foreach c generate i,j,k,l,a::i as ai,a::j as aj,b::k
as bk,b::l as bl;";
- Util.registerMultiLineQuery(pigServer, script);
- Iterator<Tuple> it = pigServer.openIterator("d");
- assertTrue(it.hasNext());
- assertEquals(expectedResult, it.next());
- assertFalse(it.hasNext());
-
- // schema with duplicates
- script = "a = load '"+ Util.encodeEscape(firstInput) +"' as
(i:int, j:int); " +
- "b = load '"+ Util.encodeEscape(secondInput) +"' as (i:int,
l:chararray); " +
- "c = join a by $0, b by $0;" +
- "d = foreach c generate i,j,l,a::i as ai,a::j as aj,b::i as
bi,b::l as bl;";
- boolean exceptionThrown = false;
- try{
- Util.registerMultiLineQuery(pigServer, script);
- pigServer.openIterator("d");
- }catch (Exception e) {
- PigException pe = LogUtils.getPigException(e);
- assertEquals(1025, pe.getErrorCode());
- exceptionThrown = true;
- }
- assertTrue(exceptionThrown);
-
- // schema with duplicates with resolution
- script = "a = load '"+ Util.encodeEscape(firstInput) +"' as
(i:int, j:int); " +
- "b = load '"+ Util.encodeEscape(secondInput) +"' as (i:int,
l:chararray); " +
- "c = join a by $0, b by $0;" +
- "d = foreach c generate a::i as ai1,j,b::i as bi1,l,a::i as
ai2,a::j as aj2,b::i as bi3,b::l as bl3;";
- Util.registerMultiLineQuery(pigServer, script);
- it = pigServer.openIterator("d");
- assertTrue(it.hasNext());
- assertEquals(expectedResult, it.next());
- assertFalse(it.hasNext());
- deleteInputFile(execType, firstInput);
- deleteInputFile(execType, secondInput);
- }
- }
-
- @Test
- public void testJoinSchema2() throws Exception {
- // test join where one load does not have schema
- ExecType execType = ExecType.LOCAL;
- setUp(execType );
- String[] input1 = {
- "1\t2",
- "2\t3",
- "3\t4"
- };
- String[] input2 = {
- "1\thello",
- "4\tbye",
- };
-
- String firstInput = createInputFile(execType, "a.txt", input1);
- String secondInput = createInputFile(execType, "b.txt", input2);
- Tuple expectedResultCharArray =
- (Tuple)Util.getPigConstant("('1','2','1','hello','1','2','1','hello')");
-
- Tuple expectedResult = TupleFactory.getInstance().newTuple();
- for(Object field : expectedResultCharArray.getAll()){
- expectedResult.append(new DataByteArray(field.toString()));
- }
-
- // with schema
- String script = "a = load '"+ Util.encodeEscape(firstInput) +"' ; " +
- //re-using alias a for new operator below, doing this intentionally
- // because such use case has been seen
- "a = foreach a generate $0 as i, $1 as j ;" +
- "b = load '"+ Util.encodeEscape(secondInput) +"' as (k, l); " +
- "c = join a by $0, b by $0;" +
- "d = foreach c generate i,j,k,l,a::i as ai,a::j as aj,b::k as bk,b::l
as bl;";
- Util.registerMultiLineQuery(pigServer, script);
- Iterator<Tuple> it = pigServer.openIterator("d");
- assertTrue(it.hasNext());
- Tuple res = it.next();
- assertEquals(expectedResult, res);
- assertFalse(it.hasNext());
- deleteInputFile(execType, firstInput);
- deleteInputFile(execType, secondInput);
-
- }
-
- @Test
- public void testLeftOuterJoin() throws Exception {
- for (ExecType execType : execTypes) {
- setUp(execType);
- String[] input1 = {
- "hello\t1",
- "bye\t2",
- "\t3"
- };
- String[] input2 = {
- "hello\tworld",
- "good\tmorning",
- "\tevening"
-
- };
-
- String firstInput = createInputFile(execType, "a.txt", input1);
- String secondInput = createInputFile(execType, "b.txt", input2);
- List<Tuple> expectedResults =
Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "('hello',1,'hello','world')",
- "('bye',2,null,null)",
- "(null,3,null,null)"
- });
-
- // with and without optional outer
- for(int i = 0; i < 2; i++) {
- //with schema
- String script = "a = load '"+ Util.encodeEscape(firstInput)
+"' as (n:chararray, a:int); " +
- "b = load '"+ Util.encodeEscape(secondInput) +"' as
(n:chararray, m:chararray); ";
- if(i == 0) {
- script += "c = join a by $0 left outer, b by $0;" ;
- } else {
- script += "c = join a by $0 left, b by $0;" ;
- }
- script += "d = order c by $1;";
- // ensure we parse correctly
- Util.buildLp(pigServer, script);
-
- // run query and test results only once
- if(i == 0) {
- Util.registerMultiLineQuery(pigServer, script);
- Iterator<Tuple> it = pigServer.openIterator("d");
- int counter= 0;
- while(it.hasNext()) {
- assertEquals(expectedResults.get(counter++),
it.next());
- }
- assertEquals(expectedResults.size(), counter);
-
- // without schema
- script = "a = load '"+ Util.encodeEscape(firstInput) +"';
" +
- "b = load '"+ Util.encodeEscape(secondInput) +"'; ";
- if(i == 0) {
- script += "c = join a by $0 left outer, b by $0;" ;
- } else {
- script += "c = join a by $0 left, b by $0;" ;
- }
- try {
- Util.registerMultiLineQuery(pigServer, script);
- } catch (Exception e) {
- PigException pe = LogUtils.getPigException(e);
- assertEquals(1105, pe.getErrorCode());
- }
- }
- }
- deleteInputFile(execType, firstInput);
- deleteInputFile(execType, secondInput);
- }
- }
-
- @Test
- public void testRightOuterJoin() throws Exception {
- for (ExecType execType : execTypes) {
- setUp(execType);
- String[] input1 = {
- "hello\t1",
- "bye\t2",
- "\t3"
- };
- String[] input2 = {
- "hello\tworld",
- "good\tmorning",
- "\tevening"
-
- };
-
- String firstInput = createInputFile(execType, "a.txt", input1);
- String secondInput = createInputFile(execType, "b.txt", input2);
- List<Tuple> expectedResults =
Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "(null,null,null,'evening')",
- "(null,null,'good','morning')",
- "('hello',1,'hello','world')"
- });
- // with and without optional outer
- for(int i = 0; i < 2; i++) {
- // with schema
- String script = "a = load '"+ Util.encodeEscape(firstInput)
+"' as (n:chararray, a:int); " +
- "b = load '"+ Util.encodeEscape(secondInput) +"' as
(n:chararray, m:chararray); ";
- if(i == 0) {
- script += "c = join a by $0 right outer, b by $0;" ;
- } else {
- script += "c = join a by $0 right, b by $0;" ;
- }
- script += "d = order c by $3;";
- // ensure we parse correctly
- Util.buildLp(pigServer, script);
-
- // run query and test results only once
- if(i == 0) {
- Util.registerMultiLineQuery(pigServer, script);
- Iterator<Tuple> it = pigServer.openIterator("d");
- int counter= 0;
- while(it.hasNext()) {
- assertEquals(expectedResults.get(counter++),
it.next());
- }
- assertEquals(expectedResults.size(), counter);
-
- // without schema
- script = "a = load '"+ Util.encodeEscape(firstInput) +"';
" +
- "b = load '"+ Util.encodeEscape(secondInput) +"'; " ;
- if(i == 0) {
- script += "c = join a by $0 right outer, b by $0;" ;
- } else {
- script += "c = join a by $0 right, b by $0;" ;
- }
- try {
- Util.registerMultiLineQuery(pigServer, script);
- } catch (Exception e) {
- PigException pe = LogUtils.getPigException(e);
- assertEquals(1105, pe.getErrorCode());
- }
- }
- }
- deleteInputFile(execType, firstInput);
- deleteInputFile(execType, secondInput);
- }
- }
-
- @Test
- public void testFullOuterJoin() throws Exception {
- for (ExecType execType : execTypes) {
- setUp(execType);
- String[] input1 = {
- "hello\t1",
- "bye\t2",
- "\t3"
- };
- String[] input2 = {
- "hello\tworld",
- "good\tmorning",
- "\tevening"
-
- };
-
- String firstInput = createInputFile(execType, "a.txt", input1);
- String secondInput = createInputFile(execType, "b.txt", input2);
- List<Tuple> expectedResults =
Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "(null,null,null,'evening')" ,
- "(null,null,'good','morning')" ,
- "('hello',1,'hello','world')" ,
- "('bye',2,null,null)" ,
- "(null,3,null,null)"
- });
- // with and without optional outer
- for(int i = 0; i < 2; i++) {
- // with schema
- String script = "a = load '"+ Util.encodeEscape(firstInput)
+"' as (n:chararray, a:int); " +
- "b = load '"+ Util.encodeEscape(secondInput) +"' as
(n:chararray, m:chararray); ";
- if(i == 0) {
- script += "c = join a by $0 full outer, b by $0;" ;
- } else {
- script += "c = join a by $0 full, b by $0;" ;
- }
- script += "d = order c by $1, $3;";
- // ensure we parse correctly
- Util.buildLp(pigServer, script);
-
- // run query and test results only once
- if(i == 0) {
- Util.registerMultiLineQuery(pigServer, script);
- Iterator<Tuple> it = pigServer.openIterator("d");
- int counter= 0;
- while(it.hasNext()) {
- assertEquals(expectedResults.get(counter++),
it.next());
- }
- assertEquals(expectedResults.size(), counter);
-
- // without schema
- script = "a = load '"+ Util.encodeEscape(firstInput) +"';
" +
- "b = load '"+ Util.encodeEscape(secondInput) +"'; " ;
- if(i == 0) {
- script += "c = join a by $0 full outer, b by $0;" ;
- } else {
- script += "c = join a by $0 full, b by $0;" ;
- }
- try {
- Util.registerMultiLineQuery(pigServer, script);
- } catch (Exception e) {
- PigException pe = LogUtils.getPigException(e);
- assertEquals(1105, pe.getErrorCode());
- }
- }
- }
- deleteInputFile(execType, firstInput);
- deleteInputFile(execType, secondInput);
- }
- }
-
- @Test
- public void testMultiOuterJoinFailure() throws ExecException {
- setUp(ExecType.LOCAL);
- String[] types = new String[] { "left", "right", "full" };
- String query = "a = load 'a.txt' as (n:chararray, a:int);\n" +
- "b = load 'b.txt' as (n:chararray, m:chararray);\n"+
- "c = load 'c.txt' as (n:chararray, m:chararray);\n";
- for (int i = 0; i < types.length; i++) {
- boolean errCaught = false;
- try {
- String q = query +
- "d = join a by $0 " + types[i] + " outer, b by $0,
c by $0;" +
- "store d into 'output';";
- Util.buildLp(pigServer, q);
- } catch(Exception e) {
- errCaught = true;
- assertTrue(e.getMessage().contains("mismatched input ','
expecting SEMI_COLON"));
- }
- assertTrue(errCaught);
-
- }
-
- }
-
- @Test
- public void testNonRegularOuterJoinFailure() throws ExecException {
- setUp(ExecType.LOCAL);
- String query = "a = load 'a.txt' as (n:chararray, a:int); "+
- "b = load 'b.txt' as (n:chararray, m:chararray); ";
- String[] types = new String[] { "left", "right", "full" };
- String[] joinTypes = new String[] { "replicated", "repl"};
- for (int i = 0; i < types.length; i++) {
- for(int j = 0; j < joinTypes.length; j++) {
- boolean errCaught = false;
- try {
- String q = query + "d = join a by $0 " +
- types[i] + " outer, b by $0 using '" + joinTypes[j] +"';" +
- "store d into 'output';";
- Util.buildLp(pigServer, q);
-
- } catch(Exception e) {
- errCaught = true;
- // This after adding support of LeftOuter Join to
replicated Join
- assertTrue(e.getMessage().contains("does not support
(right|full) outer joins"));
- }
- assertEquals( i == 0 ? false : true, errCaught);
- }
- }
- }
-
- @Test
- public void testJoinTupleFieldKey() throws Exception{
- for (ExecType execType : execTypes) {
- setUp(execType);
- String[] input1 = {
- "(1,a)",
- "(2,aa)"
- };
- String[] input2 = {
- "(1,b)",
- "(2,bb)"
- };
-
- String firstInput = createInputFile(execType, "a.txt", input1);
- String secondInput = createInputFile(execType, "b.txt", input2);
-
- String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as
(a:tuple(a1:int, a2:chararray));" +
- "b = load '"+ Util.encodeEscape(secondInput) +"' as
(b:tuple(b1:int, b2:chararray));" +
- "c = join a by a.a1, b by b.b1;";
- Util.registerMultiLineQuery(pigServer, script);
- Iterator<Tuple> it = pigServer.openIterator("c");
-
- List<Tuple> expectedResults =
Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "((1,'a'),(1,'b'))",
- "((2,'aa'),(2,'bb'))"
- });
- Util.checkQueryOutputsAfterSort(it, expectedResults);
-
- deleteInputFile(execType, firstInput);
- deleteInputFile(execType, secondInput);
- }
- }
-
- @Test
- public void testJoinNullTupleFieldKey() throws Exception{
- for (ExecType execType : execTypes) {
- setUp(execType);
- String[] input1 = {
- "1\t",
- "2\taa"
- };
- String[] input2 = {
- "1\t",
- "2\taa"
- };
-
- String firstInput = createInputFile(execType, "a.txt", input1);
- String secondInput = createInputFile(execType, "b.txt", input2);
-
- String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as
(a1:int, a2:chararray);" +
- "b = load '"+ Util.encodeEscape(secondInput) +"' as
(b1:int, b2:chararray);" +
- "c = join a by (a1, a2), b by (b1, b2);";
- Util.registerMultiLineQuery(pigServer, script);
- Iterator<Tuple> it = pigServer.openIterator("c");
-
- List<Tuple> expectedResults = Util
- .getTuplesFromConstantTupleStrings(new String[] {
"(2,'aa',2,'aa')" });
- Util.checkQueryOutputs(it, expectedResults);
-
- deleteInputFile(execType, firstInput);
- deleteInputFile(execType, secondInput);
- }
- }
-
- @Test
- public void testLiteralsForJoinAlgoSpecification1() throws Exception {
- setUp(ExecType.LOCAL);
- String query = "a = load 'A'; " +
- "b = load 'B'; " +
- "c = Join a by $0, b by $0 using 'merge';" +
- "store c into 'output';";
- LogicalPlan lp = Util.buildLp(pigServer, query);
- Operator store = lp.getSinks().get(0);
- LOJoin join = (LOJoin)lp.getPredecessors( store ).get(0);
- assertEquals(JOINTYPE.MERGE, join.getJoinType());
- }
-
- @Test
- public void testLiteralsForJoinAlgoSpecification2() throws Exception {
- setUp(ExecType.LOCAL);
- String query = "a = load 'A'; " +
- "b = load 'B'; " +
- "c = Join a by $0, b by $0 using 'hash'; "+
- "store c into 'output';";
- LogicalPlan lp = Util.buildLp(pigServer, query);
- Operator store = lp.getSinks().get(0);
- LOJoin join = (LOJoin) lp.getPredecessors( store ).get(0);
- assertEquals(JOINTYPE.HASH, join.getJoinType());
- }
-
- @Test
- public void testLiteralsForJoinAlgoSpecification5() throws Exception {
- setUp(ExecType.LOCAL);
- String query = "a = load 'A'; " +
- "b = load 'B'; " +
- "c = Join a by $0, b by $0 using 'default'; "+
- "store c into 'output';";
- LogicalPlan lp = Util.buildLp(pigServer, query);
- Operator store = lp.getSinks().get(0);
- LOJoin join = (LOJoin) lp.getPredecessors( store ).get(0);
- assertEquals(JOINTYPE.HASH, join.getJoinType());
- }
-
- @Test
- public void testLiteralsForJoinAlgoSpecification3() throws Exception {
- setUp(ExecType.LOCAL);
- String query = "a = load 'A'; " +
- "b = load 'B'; " +
- "c = Join a by $0, b by $0 using 'repl'; "+
- "store c into 'output';";
- LogicalPlan lp = Util.buildLp(pigServer, query);
- Operator store = lp.getSinks().get(0);
- LOJoin join = (LOJoin) lp.getPredecessors( store ).get(0);
- assertEquals(JOINTYPE.REPLICATED, join.getJoinType());
- }
-
- @Test
- public void testLiteralsForJoinAlgoSpecification4() throws Exception {
- setUp(ExecType.LOCAL);
- String query = "a = load 'A'; " +
- "b = load 'B'; " +
- "c = Join a by $0, b by $0 using 'replicated'; "+
- "store c into 'output';";
- LogicalPlan lp = Util.buildLp(pigServer, query);
- Operator store = lp.getSinks().get(0);
- LOJoin join = (LOJoin) lp.getPredecessors( store ).get(0);
- assertEquals(JOINTYPE.REPLICATED, join.getJoinType());
- }
-
- // See: https://issues.apache.org/jira/browse/PIG-3093
- @Test
- public void testIndirectSelfJoinRealias() throws Exception {
- setUp(ExecType.LOCAL);
- Data data = resetData(pigServer);
-
- Set<Tuple> tuples = Sets.newHashSet(tuple("a"), tuple("b"),
tuple("c"));
- data.set("foo", Utils.getSchemaFromString("field1:chararray"), tuples);
- pigServer.registerQuery("A = load 'foo' using mock.Storage();");
- pigServer.registerQuery("B = foreach A generate *;");
- pigServer.registerQuery("C = join A by field1, B by field1;");
- assertEquals(Utils.getSchemaFromString("A::field1:chararray,
B::field1:chararray"), pigServer.dumpSchema("C"));
- pigServer.registerQuery("D = foreach C generate B::field1, A::field1
as field2;");
- assertEquals(Utils.getSchemaFromString("B::field1:chararray,
field2:chararray"), pigServer.dumpSchema("D"));
- pigServer.registerQuery("E = foreach D generate field1, field2;");
- assertEquals(Utils.getSchemaFromString("B::field1:chararray,
field2:chararray"), pigServer.dumpSchema("E"));
- pigServer.registerQuery("F = foreach E generate field2;");
- pigServer.registerQuery("store F into 'foo_out' using
mock.Storage();");
- List<Tuple> out = data.get("foo_out");
- assertEquals("Expected size was "+tuples.size()+" but was
"+out.size(), tuples.size(), out.size());
- for (Tuple t : out) {
- assertTrue("Should have found tuple "+t+" in expected: "+tuples,
tuples.remove(t));
- }
- assertTrue("All expected tuples should have been found, remaining:
"+tuples, tuples.isEmpty());
- }
-
- @Test
- public void testIndirectSelfJoinData() throws Exception {
- setUp(ExecType.LOCAL);
- Data data = resetData(pigServer);
-
- Set<Tuple> tuples = Sets.newHashSet(tuple("a", 1), tuple("b", 2),
tuple("c", 3));
- data.set("foo",
Utils.getSchemaFromString("field1:chararray,field2:int"), tuples);
- pigServer.registerQuery("A = load 'foo' using mock.Storage();");
- pigServer.registerQuery("B = foreach A generate field1, field2*2 as
field2;");
- pigServer.registerQuery("C = join A by field1, B by field1;");
- pigServer.registerQuery("D = foreach C generate A::field1 as field1_a,
B::field1 as field1_b, A::field2 as field2_a, B::field2 as field2_b;");
- pigServer.registerQuery("store D into 'foo_out' using
mock.Storage();");
-
- Set<Tuple> expected = Sets.newHashSet(tuple("a", "a", 1, 2),
tuple("b", "b", 2, 4), tuple("c", "c", 3, 6));
- List<Tuple> out = data.get("foo_out");
- assertEquals("Expected size was "+expected.size()+" but was
"+out.size(), expected.size(), out.size());
- for (Tuple t : out) {
- assertTrue("Should have found tuple "+t+" in expected: "+expected,
expected.remove(t));
- }
- assertTrue("All expected tuples should have been found, remaining:
"+expected, expected.isEmpty());
+ deleteInputFile(firstInput);
+ deleteInputFile(secondInput);
}
}
Added: pig/trunk/test/org/apache/pig/test/TestJoinBase.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJoinBase.java?rev=1653391&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJoinBase.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestJoinBase.java Tue Jan 20 22:16:33
2015
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.PigException;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.LogUtils;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore
+abstract public class TestJoinBase {
+ // Server the test scripts are registered against. It is not assigned in this
+ // class; TestJoinLocal assigns it in its @Before method.
+ protected PigServer pigServer;
+
+ // Factory singletons with package visibility; the test methods of this class do not reference them.
+ TupleFactory mTf = TupleFactory.getInstance();
+ BagFactory mBf = BagFactory.getInstance();
+
+    /**
+     * Creates an input file for a test.
+     *
+     * @param fileNameHint name hint for the file, e.g. {@code "a.txt"}
+     * @param data         content to write; the tests pass one tab-separated record per element
+     * @return the location string the tests embed (after {@code Util.encodeEscape}) in a
+     *         {@code load '...'} statement and later hand to {@link #deleteInputFile(String)}
+     * @throws IOException declared for implementations that fail to create the file
+     */
+    abstract protected String createInputFile(String fileNameHint, String[] data) throws IOException;
+
+    /**
+     * Removes a file previously returned by {@link #createInputFile(String, String[])}.
+     *
+     * @param fileName the location string returned by {@code createInputFile}
+     * @throws Exception declared for implementations whose cleanup can fail
+     */
+    abstract protected void deleteInputFile(String fileName) throws Exception;
+
+    /** Joining two relations that were loaded without a schema must report a null schema. */
+    @Test
+    public void testJoinUnkownSchema() throws Exception {
+        // If any of the input schema is unknown, the resulting schema should be unknown as well
+        String script = "a = load 'a.txt';" +
+                "b = load 'b.txt'; " +
+                "c = join a by $0, b by $0;";
+        Util.registerMultiLineQuery(pigServer, script);
+        Schema schema = pigServer.dumpSchema("c");
+        assertNull(schema);
+    }
+
+    /**
+     * Inner join on the first column, run once with declared schemas and once without.
+     * Each run must return exactly one tuple, the one for key {@code hello}; the rows
+     * with an empty first column are not part of the expected output.
+     */
+    @Test
+    public void testDefaultJoin() throws Exception {
+        String[] input1 = {
+                "hello\t1",
+                "bye\t2",
+                "\t3"
+        };
+        String[] input2 = {
+                "hello\tworld",
+                "good\tmorning",
+                "\tevening"
+        };
+
+        String firstInput = createInputFile("a.txt", input1);
+        String secondInput = createInputFile("b.txt", input2);
+        Tuple expectedResult = (Tuple)Util.getPigConstant("('hello',1,'hello','world')");
+
+        // with schema
+        String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (n:chararray, a:int); " +
+                "b = load '"+ Util.encodeEscape(secondInput) +"' as (n:chararray, m:chararray); " +
+                "c = join a by $0, b by $0;";
+        Util.registerMultiLineQuery(pigServer, script);
+        Iterator<Tuple> it = pigServer.openIterator("c");
+        assertTrue(it.hasNext());
+        assertEquals(expectedResult, it.next());
+        assertFalse(it.hasNext());
+
+        // without schema: the tuples are compared through toString() rather than equals()
+        script = "a = load '"+ Util.encodeEscape(firstInput) + "'; " +
+                "b = load '" + Util.encodeEscape(secondInput) + "'; " +
+                "c = join a by $0, b by $0;";
+        Util.registerMultiLineQuery(pigServer, script);
+        it = pigServer.openIterator("c");
+        assertTrue(it.hasNext());
+        assertEquals(expectedResult.toString(), it.next().toString());
+        assertFalse(it.hasNext());
+        deleteInputFile(firstInput);
+        deleteInputFile(secondInput);
+    }
+
+
+    /**
+     * Checks how column names resolve after a join, in three steps:
+     * unique names on both sides, a name shared by both sides referenced without
+     * a prefix (expected to fail with error code 1025), and the same shared name
+     * referenced through the {@code a::} / {@code b::} prefixes.
+     */
+    @Test
+    public void testJoinSchema() throws Exception {
+        String[] input1 = {
+                "1\t2",
+                "2\t3",
+                "3\t4"
+        };
+        String[] input2 = {
+                "1\thello",
+                "4\tbye",
+        };
+
+        String firstInput = createInputFile("a.txt", input1);
+        String secondInput = createInputFile("b.txt", input2);
+        Tuple expectedResult = (Tuple)Util.getPigConstant("(1,2,1,'hello',1,2,1,'hello')");
+
+        // with schema
+        String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (i:int, j:int); " +
+                "b = load '"+ Util.encodeEscape(secondInput) +"' as (k:int, l:chararray); " +
+                "c = join a by $0, b by $0;" +
+                "d = foreach c generate i,j,k,l,a::i as ai,a::j as aj,b::k as bk,b::l as bl;";
+        Util.registerMultiLineQuery(pigServer, script);
+        Iterator<Tuple> it = pigServer.openIterator("d");
+        assertTrue(it.hasNext());
+        assertEquals(expectedResult, it.next());
+        assertFalse(it.hasNext());
+
+        // schema with duplicates
+        script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (i:int, j:int); " +
+                "b = load '"+ Util.encodeEscape(secondInput) +"' as (i:int, l:chararray); " +
+                "c = join a by $0, b by $0;" +
+                "d = foreach c generate i,j,l,a::i as ai,a::j as aj,b::i as bi,b::l as bl;";
+        boolean exceptionThrown = false;
+        try {
+            Util.registerMultiLineQuery(pigServer, script);
+            pigServer.openIterator("d");
+        } catch (Exception e) {
+            PigException pe = LogUtils.getPigException(e);
+            assertEquals(1025, pe.getErrorCode());
+            exceptionThrown = true;
+        }
+        assertTrue(exceptionThrown);
+
+        // schema with duplicates with resolution
+        script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (i:int, j:int); " +
+                "b = load '"+ Util.encodeEscape(secondInput) +"' as (i:int, l:chararray); " +
+                "c = join a by $0, b by $0;" +
+                "d = foreach c generate a::i as ai1,j,b::i as bi1,l,a::i as ai2,a::j as aj2,b::i as bi3,b::l as bl3;";
+        Util.registerMultiLineQuery(pigServer, script);
+        it = pigServer.openIterator("d");
+        assertTrue(it.hasNext());
+        assertEquals(expectedResult, it.next());
+        assertFalse(it.hasNext());
+        deleteInputFile(firstInput);
+        deleteInputFile(secondInput);
+    }
+
+    /**
+     * Left outer join on the first column, ordered by the second column.
+     * Both spellings ({@code left outer} and {@code left}) are parsed; only the
+     * first one is executed and compared, in order, with the expected tuples.
+     */
+    @Test
+    public void testLeftOuterJoin() throws Exception {
+        String[] input1 = {
+                "hello\t1",
+                "bye\t2",
+                "\t3"
+        };
+        String[] input2 = {
+                "hello\tworld",
+                "good\tmorning",
+                "\tevening"
+        };
+
+        String firstInput = createInputFile("a.txt", input1);
+        String secondInput = createInputFile("b.txt", input2);
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                        "('hello',1,'hello','world')",
+                        "('bye',2,null,null)",
+                        "(null,3,null,null)"
+                });
+
+        // with and without optional outer
+        for (int i = 0; i < 2; i++) {
+            // with schema
+            String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (n:chararray, a:int); " +
+                    "b = load '"+ Util.encodeEscape(secondInput) +"' as (n:chararray, m:chararray); ";
+            if (i == 0) {
+                script += "c = join a by $0 left outer, b by $0;";
+            } else {
+                script += "c = join a by $0 left, b by $0;";
+            }
+            script += "d = order c by $1;";
+            // ensure we parse correctly
+            Util.buildLp(pigServer, script);
+
+            // run query and test results only once
+            if (i == 0) {
+                Util.registerMultiLineQuery(pigServer, script);
+                Iterator<Tuple> it = pigServer.openIterator("d");
+                int counter = 0;
+                while (it.hasNext()) {
+                    assertEquals(expectedResults.get(counter++), it.next());
+                }
+                assertEquals(expectedResults.size(), counter);
+
+                // without schema; i is always 0 in this branch, so only the
+                // "left outer" spelling is ever used here
+                script = "a = load '"+ Util.encodeEscape(firstInput) +"'; " +
+                        "b = load '"+ Util.encodeEscape(secondInput) +"'; " +
+                        "c = join a by $0 left outer, b by $0;";
+                try {
+                    Util.registerMultiLineQuery(pigServer, script);
+                } catch (Exception e) {
+                    // Registration may succeed; when it fails the code must be 1105.
+                    PigException pe = LogUtils.getPigException(e);
+                    assertEquals(1105, pe.getErrorCode());
+                }
+            }
+        }
+        deleteInputFile(firstInput);
+        deleteInputFile(secondInput);
+    }
+
+    /**
+     * Right outer join on the first column, ordered by the fourth column.
+     * Both spellings ({@code right outer} and {@code right}) are parsed; only the
+     * first one is executed and compared, in order, with the expected tuples.
+     */
+    @Test
+    public void testRightOuterJoin() throws Exception {
+        String[] input1 = {
+                "hello\t1",
+                "bye\t2",
+                "\t3"
+        };
+        String[] input2 = {
+                "hello\tworld",
+                "good\tmorning",
+                "\tevening"
+        };
+
+        String firstInput = createInputFile("a.txt", input1);
+        String secondInput = createInputFile("b.txt", input2);
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                        "(null,null,null,'evening')",
+                        "(null,null,'good','morning')",
+                        "('hello',1,'hello','world')"
+                });
+        // with and without optional outer
+        for (int i = 0; i < 2; i++) {
+            // with schema
+            String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (n:chararray, a:int); " +
+                    "b = load '"+ Util.encodeEscape(secondInput) +"' as (n:chararray, m:chararray); ";
+            if (i == 0) {
+                script += "c = join a by $0 right outer, b by $0;";
+            } else {
+                script += "c = join a by $0 right, b by $0;";
+            }
+            script += "d = order c by $3;";
+            // ensure we parse correctly
+            Util.buildLp(pigServer, script);
+
+            // run query and test results only once
+            if (i == 0) {
+                Util.registerMultiLineQuery(pigServer, script);
+                Iterator<Tuple> it = pigServer.openIterator("d");
+                int counter = 0;
+                while (it.hasNext()) {
+                    assertEquals(expectedResults.get(counter++), it.next());
+                }
+                assertEquals(expectedResults.size(), counter);
+
+                // without schema; i is always 0 in this branch, so only the
+                // "right outer" spelling is ever used here
+                script = "a = load '"+ Util.encodeEscape(firstInput) +"'; " +
+                        "b = load '"+ Util.encodeEscape(secondInput) +"'; " +
+                        "c = join a by $0 right outer, b by $0;";
+                try {
+                    Util.registerMultiLineQuery(pigServer, script);
+                } catch (Exception e) {
+                    // Registration may succeed; when it fails the code must be 1105.
+                    PigException pe = LogUtils.getPigException(e);
+                    assertEquals(1105, pe.getErrorCode());
+                }
+            }
+        }
+        deleteInputFile(firstInput);
+        deleteInputFile(secondInput);
+    }
+
+    /**
+     * Full outer join on the first column, ordered by the second and then the fourth column.
+     * Both spellings ({@code full outer} and {@code full}) are parsed; only the
+     * first one is executed and compared, in order, with the expected tuples.
+     */
+    @Test
+    public void testFullOuterJoin() throws Exception {
+        String[] input1 = {
+                "hello\t1",
+                "bye\t2",
+                "\t3"
+        };
+        String[] input2 = {
+                "hello\tworld",
+                "good\tmorning",
+                "\tevening"
+        };
+
+        String firstInput = createInputFile("a.txt", input1);
+        String secondInput = createInputFile("b.txt", input2);
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                        "(null,null,null,'evening')" ,
+                        "(null,null,'good','morning')" ,
+                        "('hello',1,'hello','world')" ,
+                        "('bye',2,null,null)" ,
+                        "(null,3,null,null)"
+                });
+        // with and without optional outer
+        for (int i = 0; i < 2; i++) {
+            // with schema
+            String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (n:chararray, a:int); " +
+                    "b = load '"+ Util.encodeEscape(secondInput) +"' as (n:chararray, m:chararray); ";
+            if (i == 0) {
+                script += "c = join a by $0 full outer, b by $0;";
+            } else {
+                script += "c = join a by $0 full, b by $0;";
+            }
+            script += "d = order c by $1, $3;";
+            // ensure we parse correctly
+            Util.buildLp(pigServer, script);
+
+            // run query and test results only once
+            if (i == 0) {
+                Util.registerMultiLineQuery(pigServer, script);
+                Iterator<Tuple> it = pigServer.openIterator("d");
+                int counter = 0;
+                while (it.hasNext()) {
+                    assertEquals(expectedResults.get(counter++), it.next());
+                }
+                assertEquals(expectedResults.size(), counter);
+
+                // without schema; i is always 0 in this branch, so only the
+                // "full outer" spelling is ever used here
+                script = "a = load '"+ Util.encodeEscape(firstInput) +"'; " +
+                        "b = load '"+ Util.encodeEscape(secondInput) +"'; " +
+                        "c = join a by $0 full outer, b by $0;";
+                try {
+                    Util.registerMultiLineQuery(pigServer, script);
+                } catch (Exception e) {
+                    // Registration may succeed; when it fails the code must be 1105.
+                    PigException pe = LogUtils.getPigException(e);
+                    assertEquals(1105, pe.getErrorCode());
+                }
+            }
+        }
+        deleteInputFile(firstInput);
+        deleteInputFile(secondInput);
+    }
+
+
+    /**
+     * Joins two relations whose single column is a tuple, using a field inside
+     * that tuple ({@code a.a1} / {@code b.b1}) as the join key. The output is
+     * compared after sorting, so row order does not matter.
+     */
+    @Test
+    public void testJoinTupleFieldKey() throws Exception{
+        String[] input1 = {
+                "(1,a)",
+                "(2,aa)"
+        };
+        String[] input2 = {
+                "(1,b)",
+                "(2,bb)"
+        };
+
+        String firstInput = createInputFile("a.txt", input1);
+        String secondInput = createInputFile("b.txt", input2);
+
+        String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (a:tuple(a1:int, a2:chararray));" +
+                "b = load '"+ Util.encodeEscape(secondInput) +"' as (b:tuple(b1:int, b2:chararray));" +
+                "c = join a by a.a1, b by b.b1;";
+        Util.registerMultiLineQuery(pigServer, script);
+        Iterator<Tuple> it = pigServer.openIterator("c");
+
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                        "((1,'a'),(1,'b'))",
+                        "((2,'aa'),(2,'bb'))"
+                });
+        Util.checkQueryOutputsAfterSort(it, expectedResults);
+
+        deleteInputFile(firstInput);
+        deleteInputFile(secondInput);
+    }
+
+    /**
+     * Joins on a two-column key where one input row on each side has an empty
+     * second column. Only the fully populated row {@code (2,'aa')} is expected
+     * in the output.
+     */
+    @Test
+    public void testJoinNullTupleFieldKey() throws Exception{
+        String[] input1 = {
+                "1\t",
+                "2\taa"
+        };
+        String[] input2 = {
+                "1\t",
+                "2\taa"
+        };
+
+        String firstInput = createInputFile("a.txt", input1);
+        String secondInput = createInputFile("b.txt", input2);
+
+        String script = "a = load '"+ Util.encodeEscape(firstInput) +"' as (a1:int, a2:chararray);" +
+                "b = load '"+ Util.encodeEscape(secondInput) +"' as (b1:int, b2:chararray);" +
+                "c = join a by (a1, a2), b by (b1, b2);";
+        Util.registerMultiLineQuery(pigServer, script);
+        Iterator<Tuple> it = pigServer.openIterator("c");
+
+        List<Tuple> expectedResults = Util
+                .getTuplesFromConstantTupleStrings(new String[] { "(2,'aa',2,'aa')" });
+        Util.checkQueryOutputs(it, expectedResults);
+
+        deleteInputFile(firstInput);
+        deleteInputFile(secondInput);
+    }
+
+}
Added: pig/trunk/test/org/apache/pig/test/TestJoinLocal.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJoinLocal.java?rev=1653391&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJoinLocal.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestJoinLocal.java Tue Jan 20 22:16:33
2015
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.junit.Before;
+import org.junit.Test;
+import org.python.google.common.collect.Sets;
+
+public class TestJoinLocal extends TestJoinBase {
+
+ // Runs before each test: builds a new PigServer for the exec type
+ // returned by Util.getLocalTestMode().
+ @Before
+ public void setUp() throws Exception {
+ pigServer = new PigServer(Util.getLocalTestMode());
+ }
+    /**
+     * Creates the file through {@code Util.createInputFile("test", fileNameHint, data)}
+     * and returns what {@code Util.generateURI} produces for its absolute path
+     * under this test's Pig context.
+     *
+     * @param fileNameHint name hint forwarded to {@code Util.createInputFile}
+     * @param data         content forwarded to {@code Util.createInputFile}
+     * @return the generated URI string for the new file
+     * @throws IOException if the file cannot be created
+     */
+    @Override
+    protected String createInputFile(String fileNameHint, String[] data)
+            throws IOException {
+        File f = Util.createInputFile("test", fileNameHint, data);
+        return Util.generateURI(f.getAbsolutePath(), pigServer.getPigContext());
+    }
+
+    /**
+     * Deletes the local file named by {@code fileName} after removing every
+     * {@code file://} occurrence from the name. The result of the delete call
+     * is not inspected.
+     */
+    @Override
+    protected void deleteInputFile(String fileName) throws Exception {
+        final File target = new File(fileName.replace("file://", ""));
+        target.delete();
+    }
+
+    /**
+     * Join where neither load declares column types: {@code a} is loaded without a
+     * schema and named afterwards by a foreach, {@code b} is loaded with untyped
+     * names. The expected tuple is therefore built from {@link DataByteArray} fields.
+     */
+    @Test
+    public void testJoinSchema2() throws Exception {
+        // test join where one load does not have schema
+        String[] input1 = {
+                "1\t2",
+                "2\t3",
+                "3\t4"
+        };
+        String[] input2 = {
+                "1\thello",
+                "4\tbye",
+        };
+
+        String firstInput = createInputFile("a.txt", input1);
+        String secondInput = createInputFile("b.txt", input2);
+        Tuple expectedResultCharArray =
+                (Tuple)Util.getPigConstant("('1','2','1','hello','1','2','1','hello')");
+
+        // Re-wrap every expected field as a DataByteArray built from its string form.
+        Tuple expectedResult = TupleFactory.getInstance().newTuple();
+        for (Object field : expectedResultCharArray.getAll()) {
+            expectedResult.append(new DataByteArray(field.toString()));
+        }
+
+        // load without schema, column names assigned by the foreach
+        String script = "a = load '"+ Util.encodeEscape(firstInput) +"' ; " +
+                //re-using alias a for new operator below, doing this intentionally
+                // because such use case has been seen
+                "a = foreach a generate $0 as i, $1 as j ;" +
+                "b = load '"+ Util.encodeEscape(secondInput) +"' as (k, l); " +
+                "c = join a by $0, b by $0;" +
+                "d = foreach c generate i,j,k,l,a::i as ai,a::j as aj,b::k as bk,b::l as bl;";
+        Util.registerMultiLineQuery(pigServer, script);
+        Iterator<Tuple> it = pigServer.openIterator("d");
+        assertTrue(it.hasNext());
+        Tuple res = it.next();
+        assertEquals(expectedResult, res);
+        assertFalse(it.hasNext());
+        deleteInputFile(firstInput);
+        deleteInputFile(secondInput);
+    }
+
+    /**
+     * An outer join over three relations must be rejected while building the
+     * logical plan, for each of {@code left}, {@code right} and {@code full}.
+     * The error message has to mention the unexpected comma.
+     */
+    @Test
+    public void testMultiOuterJoinFailure() throws Exception {
+        String[] types = new String[] { "left", "right", "full" };
+        String query = "a = load 'a.txt' as (n:chararray, a:int);\n" +
+                "b = load 'b.txt' as (n:chararray, m:chararray);\n"+
+                "c = load 'c.txt' as (n:chararray, m:chararray);\n";
+        for (int i = 0; i < types.length; i++) {
+            boolean errCaught = false;
+            try {
+                String q = query +
+                        "d = join a by $0 " + types[i] + " outer, b by $0, c by $0;" +
+                        "store d into 'output';";
+                Util.buildLp(pigServer, q);
+            } catch (Exception e) {
+                errCaught = true;
+                assertTrue(e.getMessage().contains("mismatched input ',' expecting SEMI_COLON"));
+            }
+            assertTrue(errCaught);
+        }
+    }
+
+    /**
+     * Combines each outer-join direction with the {@code 'replicated'} and
+     * {@code 'repl'} algorithm names. Building the plan must succeed for
+     * {@code left} and must fail for {@code right} and {@code full}.
+     */
+    @Test
+    public void testNonRegularOuterJoinFailure() throws Exception {
+        String query = "a = load 'a.txt' as (n:chararray, a:int); "+
+                "b = load 'b.txt' as (n:chararray, m:chararray); ";
+        String[] types = new String[] { "left", "right", "full" };
+        String[] joinTypes = new String[] { "replicated", "repl"};
+        for (int i = 0; i < types.length; i++) {
+            for (int j = 0; j < joinTypes.length; j++) {
+                boolean errCaught = false;
+                try {
+                    String q = query + "d = join a by $0 " +
+                            types[i] + " outer, b by $0 using '" + joinTypes[j] +"';" +
+                            "store d into 'output';";
+                    Util.buildLp(pigServer, q);
+                } catch (Exception e) {
+                    errCaught = true;
+                    // Replicated join gained left outer support, so only
+                    // right and full are reported as unsupported.
+                    assertTrue(e.getMessage().contains("does not support (right|full) outer joins"));
+                }
+                // types[0] is "left": the only direction expected to build without error
+                assertEquals(i != 0, errCaught);
+            }
+        }
+    }
+
+    /** A join written with {@code using 'merge'} must carry {@code JOINTYPE.MERGE} in the logical plan. */
+    @Test
+    public void testLiteralsForJoinAlgoSpecification1() throws Exception {
+        final String script = "a = load 'A'; " +
+                "b = load 'B'; " +
+                "c = Join a by $0, b by $0 using 'merge';" +
+                "store c into 'output';";
+        final LogicalPlan plan = Util.buildLp(pigServer, script);
+        // First sink of the plan, then its first predecessor, which must be the join.
+        final Operator sink = plan.getSinks().get(0);
+        final LOJoin joinOp = (LOJoin) plan.getPredecessors(sink).get(0);
+        assertEquals(JOINTYPE.MERGE, joinOp.getJoinType());
+    }
+
+    /** A join written with {@code using 'hash'} must carry {@code JOINTYPE.HASH} in the logical plan. */
+    @Test
+    public void testLiteralsForJoinAlgoSpecification2() throws Exception {
+        final String script = "a = load 'A'; " +
+                "b = load 'B'; " +
+                "c = Join a by $0, b by $0 using 'hash'; "+
+                "store c into 'output';";
+        final LogicalPlan plan = Util.buildLp(pigServer, script);
+        // First sink of the plan, then its first predecessor, which must be the join.
+        final Operator sink = plan.getSinks().get(0);
+        final LOJoin joinOp = (LOJoin) plan.getPredecessors(sink).get(0);
+        assertEquals(JOINTYPE.HASH, joinOp.getJoinType());
+    }
+
+    /** A join written with {@code using 'default'} must carry {@code JOINTYPE.HASH} in the logical plan. */
+    @Test
+    public void testLiteralsForJoinAlgoSpecification5() throws Exception {
+        final String script = "a = load 'A'; " +
+                "b = load 'B'; " +
+                "c = Join a by $0, b by $0 using 'default'; "+
+                "store c into 'output';";
+        final LogicalPlan plan = Util.buildLp(pigServer, script);
+        // First sink of the plan, then its first predecessor, which must be the join.
+        final Operator sink = plan.getSinks().get(0);
+        final LOJoin joinOp = (LOJoin) plan.getPredecessors(sink).get(0);
+        assertEquals(JOINTYPE.HASH, joinOp.getJoinType());
+    }
+
+    /** A join written with {@code using 'repl'} must carry {@code JOINTYPE.REPLICATED} in the logical plan. */
+    @Test
+    public void testLiteralsForJoinAlgoSpecification3() throws Exception {
+        final String script = "a = load 'A'; " +
+                "b = load 'B'; " +
+                "c = Join a by $0, b by $0 using 'repl'; "+
+                "store c into 'output';";
+        final LogicalPlan plan = Util.buildLp(pigServer, script);
+        // First sink of the plan, then its first predecessor, which must be the join.
+        final Operator sink = plan.getSinks().get(0);
+        final LOJoin joinOp = (LOJoin) plan.getPredecessors(sink).get(0);
+        assertEquals(JOINTYPE.REPLICATED, joinOp.getJoinType());
+    }
+
+    /** A join written with {@code using 'replicated'} must carry {@code JOINTYPE.REPLICATED} in the logical plan. */
+    @Test
+    public void testLiteralsForJoinAlgoSpecification4() throws Exception {
+        final String script = "a = load 'A'; " +
+                "b = load 'B'; " +
+                "c = Join a by $0, b by $0 using 'replicated'; "+
+                "store c into 'output';";
+        final LogicalPlan plan = Util.buildLp(pigServer, script);
+        // First sink of the plan, then its first predecessor, which must be the join.
+        final Operator sink = plan.getSinks().get(0);
+        final LOJoin joinOp = (LOJoin) plan.getPredecessors(sink).get(0);
+        assertEquals(JOINTYPE.REPLICATED, joinOp.getJoinType());
+    }
+
+    /**
+     * Joins a relation with a foreach-copy of itself and then re-aliases the
+     * joined columns, checking the reported schema after each step and that the
+     * stored output contains exactly the input tuples.
+     */
+    // See: https://issues.apache.org/jira/browse/PIG-3093
+    @Test
+    public void testIndirectSelfJoinRealias() throws Exception {
+        Data data = resetData(pigServer);
+
+        Set<Tuple> tuples = Sets.newHashSet(tuple("a"), tuple("b"), tuple("c"));
+        data.set("foo", Utils.getSchemaFromString("field1:chararray"), tuples);
+        pigServer.registerQuery("A = load 'foo' using mock.Storage();");
+        pigServer.registerQuery("B = foreach A generate *;");
+        pigServer.registerQuery("C = join A by field1, B by field1;");
+        assertEquals(Utils.getSchemaFromString("A::field1:chararray, B::field1:chararray"), pigServer.dumpSchema("C"));
+        pigServer.registerQuery("D = foreach C generate B::field1, A::field1 as field2;");
+        assertEquals(Utils.getSchemaFromString("B::field1:chararray, field2:chararray"), pigServer.dumpSchema("D"));
+        pigServer.registerQuery("E = foreach D generate field1, field2;");
+        assertEquals(Utils.getSchemaFromString("B::field1:chararray, field2:chararray"), pigServer.dumpSchema("E"));
+        pigServer.registerQuery("F = foreach E generate field2;");
+        pigServer.registerQuery("store F into 'foo_out' using mock.Storage();");
+        List<Tuple> out = data.get("foo_out");
+        assertEquals("Expected size was "+tuples.size()+" but was "+out.size(), tuples.size(), out.size());
+        // Tick off each output tuple; a tuple that cannot be removed was unexpected or duplicated.
+        for (Tuple t : out) {
+            assertTrue("Should have found tuple "+t+" in expected: "+tuples, tuples.remove(t));
+        }
+        assertTrue("All expected tuples should have been found, remaining: "+tuples, tuples.isEmpty());
+    }
+
+    /**
+     * Joins a relation with a derived copy of itself in which {@code field2} is
+     * doubled, then checks that each output row pairs the original value with
+     * the doubled one.
+     */
+    @Test
+    public void testIndirectSelfJoinData() throws Exception {
+        Data data = resetData(pigServer);
+
+        Set<Tuple> tuples = Sets.newHashSet(tuple("a", 1), tuple("b", 2), tuple("c", 3));
+        data.set("foo", Utils.getSchemaFromString("field1:chararray,field2:int"), tuples);
+        pigServer.registerQuery("A = load 'foo' using mock.Storage();");
+        pigServer.registerQuery("B = foreach A generate field1, field2*2 as field2;");
+        pigServer.registerQuery("C = join A by field1, B by field1;");
+        pigServer.registerQuery("D = foreach C generate A::field1 as field1_a, B::field1 as field1_b, A::field2 as field2_a, B::field2 as field2_b;");
+        pigServer.registerQuery("store D into 'foo_out' using mock.Storage();");
+
+        Set<Tuple> expected = Sets.newHashSet(tuple("a", "a", 1, 2), tuple("b", "b", 2, 4), tuple("c", "c", 3, 6));
+        List<Tuple> out = data.get("foo_out");
+        assertEquals("Expected size was "+expected.size()+" but was "+out.size(), expected.size(), out.size());
+        // Tick off each output tuple; a tuple that cannot be removed was unexpected or duplicated.
+        for (Tuple t : out) {
+            assertTrue("Should have found tuple "+t+" in expected: "+expected, expected.remove(t));
+        }
+        assertTrue("All expected tuples should have been found, remaining: "+expected, expected.isEmpty());
+    }
+}
Modified: pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=1653391&r1=1653390&r2=1653391&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Tue Jan 20 22:16:33
2015
@@ -25,7 +25,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
-import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
@@ -53,13 +52,13 @@ public class TestMultiQuery {
"test/org/apache/pig/test/data/passwd2", "passwd2");
Properties props = new Properties();
props.setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, ""+true);
- myPig = new PigServer(ExecType.LOCAL, props);
+ myPig = new PigServer(Util.getLocalTestMode(), props);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
- Util.deleteFile(new PigContext(ExecType.LOCAL, new Properties()),
"passwd");
- Util.deleteFile(new PigContext(ExecType.LOCAL, new Properties()),
"passwd2");
+ Util.deleteFile(new PigContext(Util.getLocalTestMode(), new
Properties()), "passwd");
+ Util.deleteFile(new PigContext(Util.getLocalTestMode(), new
Properties()), "passwd2");
deleteOutputFiles();
}
Modified: pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java?rev=1653391&r1=1653390&r2=1653391&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java Tue Jan 20
22:16:33 2015
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.impl.PigContext;
import org.apache.pig.newplan.Operator;
@@ -46,12 +45,17 @@ import org.apache.pig.newplan.logical.ru
import org.apache.pig.newplan.logical.rules.MapKeysPruneHelper;
import org.apache.pig.newplan.optimizer.PlanOptimizer;
import org.apache.pig.newplan.optimizer.Rule;
+import org.junit.Before;
import org.junit.Test;
public class TestNewPlanColumnPrune {
LogicalPlan plan = null;
- PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
+ PigContext pc;
+ @Before
+ public void setUp() throws Exception {
+ pc = new PigContext(Util.getLocalTestMode(), new Properties());
+ }
private LogicalPlan buildPlan(String query) throws Exception{
PigServer pigServer = new PigServer( pc );
return Util.buildLp(pigServer, query);