Author: daijy
Date: Wed Jan 21 06:09:06 2015
New Revision: 1653445
URL: http://svn.apache.org/r1653445
Log:
PIG-4359: Port local mode tests to Tez - part4
Added:
pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlanMR.java
pig/trunk/test/org/apache/pig/test/TestStoreBase.java
pig/trunk/test/org/apache/pig/test/TestStoreLocal.java
pig/trunk/test/org/apache/pig/tez/TestPOPartialAggPlanTez.java
pig/trunk/test/org/apache/pig/tez/TezUtil.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/trunk/test/excluded-tests-20
pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java
pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java
pig/trunk/test/org/apache/pig/test/TestStore.java
pig/trunk/test/tez-local-tests
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Jan 21 06:09:06 2015
@@ -44,6 +44,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4359: Port local mode tests to Tez - part4 (daijy)
+
PIG-4340: PigStorage fails parsing empty map (daijy)
PIG-4366: Port local mode tests to Tez - part5 (daijy)
Modified: pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
URL:
http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
(original)
+++ pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java Wed Jan
21 06:09:06 2015
@@ -27,7 +27,9 @@ import org.apache.hadoop.hdfs.MiniDFSClu
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
/**
* This class builds a single instance of itself with the Singleton
@@ -128,4 +130,8 @@ public class MiniCluster extends MiniGen
if (m_mr != null) { m_mr.stop(); }
m_mr = null;
}
+
+ static public Launcher getLauncher() {
+ return new MapReduceLauncher();
+ }
}
Modified: pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL:
http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
(original)
+++ pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Wed
Jan 21 06:09:06 2015
@@ -33,7 +33,9 @@ import org.apache.hadoop.mapreduce.v2.Mi
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
import org.apache.pig.backend.hadoop.executionengine.tez.TezSessionManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -185,4 +187,8 @@ public class TezMiniCluster extends Mini
YARN_CONF_FILE.delete();
}
}
+
+ static public Launcher getLauncher() {
+ return new TezLauncher();
+ }
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
Wed Jan 21 06:09:06 2015
@@ -368,7 +368,7 @@ public class TezCompiler extends PhyPlan
storeOnlyPhyPlan.addAsLeaf(store);
storeOnlyTezOperator.plan = storeOnlyPhyPlan;
tezPlan.add(storeOnlyTezOperator);
- phyToTezOpMap.put(store, storeOnlyTezOperator);
+ phyToTezOpMap.put(p, storeOnlyTezOperator);
// Create new operator as second splittee
curTezOp = getTezOp();
Modified: pig/trunk/test/excluded-tests-20
URL:
http://svn.apache.org/viewvc/pig/trunk/test/excluded-tests-20?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/test/excluded-tests-20 (original)
+++ pig/trunk/test/excluded-tests-20 Wed Jan 21 06:09:06 2015
@@ -7,3 +7,4 @@
**/TestGroupConstParallelTez.java
**/TestLoaderStorerShipCacheFilesTez.java
**/TestPigStatsTez.java
+**/TestPOPartialAggPlanTez.java
Modified: pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java (original)
+++ pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java Wed Jan 21
06:09:06 2015
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.pig.ExecType;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
/**
* This class builds a single instance of itself with the Singleton
@@ -146,4 +147,18 @@ abstract public class MiniGenericCluster
String msg = "function called on MiniCluster that has been shutdown";
throw new RuntimeException(msg);
}
+
+ static public Launcher getLauncher() {
+ String execType = System.getProperty("test.exec.type");
+ if (execType == null) {
+ System.setProperty("test.exec.type", EXECTYPE_MR);
+ }
+ if (execType.equalsIgnoreCase(EXECTYPE_MR)) {
+ return MiniCluster.getLauncher();
+ } else if (execType.equalsIgnoreCase(EXECTYPE_TEZ)) {
+ return TezMiniCluster.getLauncher();
+ } else {
+ throw new RuntimeException("Unknown test.exec.type: " + execType);
+ }
+ }
}
Modified: pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java Wed Jan 21
06:09:06 2015
@@ -19,8 +19,8 @@ package org.apache.pig.test;
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
@@ -29,17 +29,19 @@ import java.util.Properties;
import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecutionEngine;
-import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.builtin.PigStorage;
@@ -56,17 +58,18 @@ import org.apache.pig.tools.pigscript.pa
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.junit.After;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
public class TestMultiQueryLocal {
- private PigServer myPig;
+ protected PigServer myPig;
private String TMP_DIR;
@Before
public void setUp() throws Exception {
- PigContext context = new PigContext(ExecType.LOCAL, new Properties());
+ PigContext context = new PigContext(Util.getLocalTestMode(), new
Properties());
context.getProperties().setProperty(PigConfiguration.PIG_OPT_MULTIQUERY,
""+true);
myPig = new PigServer(context);
myPig.getPigContext().getProperties().setProperty("pig.usenewlogicalplan",
"false");
@@ -351,22 +354,33 @@ public class TestMultiQueryLocal {
public static class PigStorageWithConfig extends PigStorage {
- private static final String key = "test.key";
+ private static final String key1 = "test.key1";
+ private static final String key2 = "test.key2";
private String suffix;
+ private String myKey;
- public PigStorageWithConfig(String s) {
+ public PigStorageWithConfig(String key, String s) {
this.suffix = s;
+ this.myKey = key;
}
@Override
public void setStoreLocation(String location, Job job) throws
IOException {
super.setStoreLocation(location, job);
- Assert.assertNull(job.getConfiguration().get(key));
+ if (myKey.equals(key1)) {
+ Assert.assertNull(job.getConfiguration().get(key2));
+ } else {
+ Assert.assertNull(job.getConfiguration().get(key1));
+ }
}
@Override
public OutputFormat getOutputFormat() {
- return new PigTextOutputFormatWithConfig();
+ if (myKey.equals(key1)) {
+ return new PigTextOutputFormatWithConfig1();
+ } else {
+ return new PigTextOutputFormatWithConfig2();
+ }
}
@Override
@@ -384,16 +398,30 @@ public class TestMultiQueryLocal {
}
}
- private static class PigTextOutputFormatWithConfig extends
PigTextOutputFormat {
+ private static class PigTextOutputFormatWithConfig1 extends
PigTextOutputFormat {
+
+ public PigTextOutputFormatWithConfig1() {
+ super((byte) '\t');
+ }
+
+ @Override
+ public synchronized OutputCommitter
getOutputCommitter(TaskAttemptContext context)
+ throws IOException {
+ context.getConfiguration().set(PigStorageWithConfig.key1,
MRConfiguration.WORK_OUPUT_DIR);
+ return super.getOutputCommitter(context);
+ }
+ }
+
+ private static class PigTextOutputFormatWithConfig2 extends
PigTextOutputFormat {
- public PigTextOutputFormatWithConfig() {
+ public PigTextOutputFormatWithConfig2() {
super((byte) '\t');
}
@Override
public synchronized OutputCommitter
getOutputCommitter(TaskAttemptContext context)
throws IOException {
- context.getConfiguration().set(PigStorageWithConfig.key,
MRConfiguration.WORK_OUPUT_DIR);
+ context.getConfiguration().set(PigStorageWithConfig.key2,
MRConfiguration.WORK_OUPUT_DIR);
return super.getOutputCommitter(context);
}
}
@@ -411,17 +439,20 @@ public class TestMultiQueryLocal {
"using PigStorage(':') as (uname:chararray,
passwd:chararray, uid:int,gid:int);");
myPig.registerQuery("b = filter a by uid < 5;");
myPig.registerQuery("c = filter a by uid > 5;");
- myPig.registerQuery("store b into '" + TMP_DIR +
"/Pig-TestMultiQueryLocal1' using " + PigStorageWithConfig.class.getName() +
"('a');");
- myPig.registerQuery("store c into '" + TMP_DIR +
"/Pig-TestMultiQueryLocal2' using " + PigStorageWithConfig.class.getName() +
"('b');");
+ myPig.registerQuery("store b into '" + TMP_DIR +
"/Pig-TestMultiQueryLocal1' using " + PigStorageWithConfig.class.getName() +
"('test.key1', 'a');");
+ myPig.registerQuery("store c into '" + TMP_DIR +
"/Pig-TestMultiQueryLocal2' using " + PigStorageWithConfig.class.getName() +
"('test.key2', 'b');");
myPig.executeBatch();
myPig.discardBatch();
- BufferedReader reader = new BufferedReader(new FileReader(TMP_DIR
+ "/Pig-TestMultiQueryLocal1/part-m-00000"));
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ BufferedReader reader = new BufferedReader(new InputStreamReader
+ (fs.open(Util.getFirstPartFile(new Path(TMP_DIR +
"/Pig-TestMultiQueryLocal1")))));
String line;
while ((line = reader.readLine())!=null) {
Assert.assertTrue(line.endsWith("a"));
}
- reader = new BufferedReader(new FileReader(TMP_DIR +
"/Pig-TestMultiQueryLocal2/part-m-00000"));
+ reader = new BufferedReader(new InputStreamReader
+ (fs.open(Util.getFirstPartFile(new Path(TMP_DIR +
"/Pig-TestMultiQueryLocal2")))));
while ((line = reader.readLine())!=null) {
Assert.assertTrue(line.endsWith("b"));
}
@@ -505,8 +536,9 @@ public class TestMultiQueryLocal {
}
@Test
- public void testMultiQueryWithIllustrate() {
+ public void testMultiQueryWithIllustrate() throws Exception {
+ Assume.assumeTrue("illustrate does not work in tez (PIG-3993)",
!Util.getLocalTestMode().toString().startsWith("TEZ"));
System.out.println("===== test multi-query with illustrate =====");
try {
@@ -626,7 +658,7 @@ public class TestMultiQueryLocal {
lp.optimize(myPig.getPigContext());
System.out.println("===== check physical plan =====");
- PhysicalPlan pp =
((MRExecutionEngine)myPig.getPigContext().getExecutionEngine()).compile(
+ PhysicalPlan pp =
((HExecutionEngine)myPig.getPigContext().getExecutionEngine()).compile(
lp, null);
Assert.assertEquals(expectedRoots, pp.getRoots().size());
@@ -638,9 +670,9 @@ public class TestMultiQueryLocal {
return pp;
}
- private boolean executePlan(PhysicalPlan pp) throws IOException {
+ protected boolean executePlan(PhysicalPlan pp) throws IOException {
boolean failed = true;
- MapReduceLauncher launcher = new MapReduceLauncher();
+ Launcher launcher = MiniGenericCluster.getLauncher();
PigStats stats = null;
try {
stats = launcher.launchPig(pp, "execute", myPig.getPigContext());
Modified: pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java Wed Jan 21
06:09:06 2015
@@ -23,28 +23,28 @@ import static org.junit.Assert.assertNul
import java.util.Iterator;
-import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.impl.PigContext;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
/**
* Test POPartialAgg runtime
*/
+@Ignore
public class TestPOPartialAggPlan {
- private static PigContext pc;
- private static PigServer ps;
+ protected static PigContext pc;
+ protected static PigServer ps;
@Before
- public void setUp() throws ExecException {
- ps = new PigServer(ExecType.LOCAL);
+ public void setUp() throws Exception {
+ ps = new PigServer(Util.getLocalTestMode());
pc = ps.getPigContext();
pc.connect();
}
@@ -89,7 +89,7 @@ public class TestPOPartialAggPlan {
return findPOPartialAgg(mapPlan);
}
- private String getGByQuery() {
+ protected String getGByQuery() {
return "l = load 'x' as (a,b,c);" +
"g = group l by a;" +
"f = foreach g generate group, COUNT(l.b);";
@@ -122,8 +122,8 @@ public class TestPOPartialAggPlan {
assertNull("POPartialAgg should be absent", findPOPartialAgg(mrp));
}
- private PhysicalOperator findPOPartialAgg(PhysicalPlan mapPlan) {
- Iterator<PhysicalOperator> it = mapPlan.iterator();
+ protected PhysicalOperator findPOPartialAgg(PhysicalPlan plan) {
+ Iterator<PhysicalOperator> it = plan.iterator();
while(it.hasNext()){
PhysicalOperator op = it.next();
if(op instanceof POPartialAgg){
@@ -132,7 +132,4 @@ public class TestPOPartialAggPlan {
}
return null;
}
-
-
-
}
Added: pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlanMR.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlanMR.java?rev=1653445&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlanMR.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlanMR.java Wed Jan 21
06:09:06 2015
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.pig.PigConfiguration;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.junit.Test;
+
+public class TestPOPartialAggPlanMR extends TestPOPartialAggPlan {
+ @Test
+ public void testNoMapAggProp() throws Exception{
+ //test with pig.exec.mapPartAgg not set
+ String query = getGByQuery();
+
+ MROperPlan mrp = Util.buildMRPlan(query, pc);
+ assertEquals(mrp.size(), 1);
+
+ assertNull("POPartialAgg should be absent",findPOPartialAgg(mrp));
+ }
+
+ @Test
+ public void testMapAggPropFalse() throws Exception{
+ //test with pig.exec.mapPartAgg set to false
+ String query = getGByQuery();
+ pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG,
"false");
+ MROperPlan mrp = Util.buildMRPlan(query, pc);
+ assertEquals(mrp.size(), 1);
+
+ assertNull("POPartialAgg should be absent", findPOPartialAgg(mrp));
+ }
+
+ @Test
+ public void testMapAggPropTrue() throws Exception{
+ //test with pig.exec.mapPartAgg to true
+ String query = getGByQuery();
+ pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG,
"true");
+ MROperPlan mrp = Util.buildMRPlan(query, pc);
+ assertEquals(mrp.size(), 1);
+
+ assertNotNull("POPartialAgg should be present",findPOPartialAgg(mrp));
+
+ }
+
+
+ private PhysicalOperator findPOPartialAgg(MROperPlan mrp) {
+ PhysicalPlan mapPlan = mrp.getRoots().get(0).mapPlan;
+ return findPOPartialAgg(mapPlan);
+ }
+
+ @Test
+ public void testMapAggNoAggFunc() throws Exception{
+ //no agg func, so there should not be a POPartial
+ String query = "l = load 'x' as (a,b,c);" +
+ "g = group l by a;" +
+ "f = foreach g generate group;";
+ pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG,
"true");
+ MROperPlan mrp = Util.buildMRPlan(query, pc);
+ assertEquals(mrp.size(), 1);
+
+ assertNull("POPartialAgg should be absent",findPOPartialAgg(mrp));
+ }
+
+ @Test
+ public void testMapAggNotCombinable() throws Exception{
+ //not combinable, so there should not be a POPartial
+ String query = "l = load 'x' as (a,b,c);" +
+ "g = group l by a;" +
+ "f = foreach g generate group, COUNT(l.b), l.b;";
+ pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG,
"true");
+ MROperPlan mrp = Util.buildMRPlan(query, pc);
+ assertEquals(mrp.size(), 1);
+
+ assertNull("POPartialAgg should be absent", findPOPartialAgg(mrp));
+ }
+}
Modified: pig/trunk/test/org/apache/pig/test/TestStore.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStore.java Wed Jan 21 06:09:06 2015
@@ -28,8 +28,6 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
@@ -45,7 +43,6 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.pig.EvalFunc;
-import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
@@ -56,12 +53,9 @@ import org.apache.pig.StoreMetadata;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
@@ -84,37 +78,22 @@ import org.apache.pig.test.utils.GenRand
import org.apache.pig.test.utils.TestHelper;
import org.junit.After;
import org.junit.AfterClass;
-import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
-public class TestStore {
+public class TestStore extends TestStoreBase {
POStore st;
DataBag inpDB;
static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
PigContext pc;
POProject proj;
- PigServer pig;
-
- String inputFileName;
- String outputFileName;
-
- private static final String DUMMY_STORE_CLASS_NAME
- = "org.apache.pig.test.TestStore\\$DummyStore";
-
- private static final String FAIL_UDF_NAME
- = "org.apache.pig.test.TestStore\\$FailUDF";
- private static final String MAP_MAX_ATTEMPTS =
MRConfiguration.MAP_MAX_ATTEMPTS;
- private static final String TESTDIR = "/tmp/" +
TestStore.class.getSimpleName();
- private static ExecType[] modes = new ExecType[] { ExecType.LOCAL,
cluster.getExecType() };
-
+
@Before
public void setUp() throws Exception {
- pig = new PigServer(cluster.getExecType(), cluster.getProperties());
- pc = pig.getPigContext();
- inputFileName = TESTDIR + "/TestStore-" + new Random().nextLong() +
".txt";
- outputFileName = TESTDIR + "/TestStore-output-" + new
Random().nextLong() + ".txt";
-
+ mode = cluster.getExecType();
+ setupPigServer();
+ pc = ps.getPigContext();
+ super.setUp();
}
@After
@@ -124,14 +103,20 @@ public class TestStore {
Util.deleteFile(cluster, TESTDIR);
}
+ @Override
+ protected void setupPigServer() throws Exception {
+ ps = new PigServer(cluster.getExecType(),
+ cluster.getProperties());
+ }
+
private void storeAndCopyLocally(DataBag inpDB) throws Exception {
setUpInputFileOnCluster(inpDB);
String script = "a = load '" + inputFileName + "'; " +
"store a into '" + outputFileName + "' using
PigStorage('\t');" +
"fs -ls " + TESTDIR;
- pig.setBatchOn();
- Util.registerMultiLineQuery(pig, script);
- pig.executeBatch();
+ ps.setBatchOn();
+ Util.registerMultiLineQuery(ps, script);
+ ps.executeBatch();
Path path = getFirstOutputFile(cluster.getConfiguration(),
new Path(outputFileName), cluster.getExecType(), true);
Util.copyFromClusterToLocal(
@@ -139,28 +124,6 @@ public class TestStore {
path.toString(), outputFileName);
}
- public static Path getFirstOutputFile(Configuration conf, Path outputDir,
- ExecType exectype, boolean isMapOutput) throws IOException {
- FileSystem fs = outputDir.getFileSystem(conf);
- FileStatus[] outputFiles = fs.listStatus(outputDir,
- Util.getSuccessMarkerPathFilter());
-
- boolean filefound = false;
- if (outputFiles != null && outputFiles.length != 0) {
- String name = outputFiles[0].getPath().getName();
- if (exectype == ExecType.LOCAL || exectype == ExecType.MAPREDUCE) {
- if (isMapOutput) {
- filefound = name.equals("part-m-00000");
- } else {
- filefound = name.equals("part-r-00000");
- }
- } else {
- filefound = name.startsWith("part-");
- }
- }
- return filefound ? outputFiles[0].getPath() : null;
- }
-
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
@@ -173,13 +136,13 @@ public class TestStore {
String query = "a = load '" + inputFileName + "' as (c:chararray,
" +
"i:int,d:double);" +
"store a into '" + outputFileName + "' using " +
"PigStorage();";
- org.apache.pig.newplan.logical.relational.LogicalPlan lp =
Util.buildLp( pig, query );
+ org.apache.pig.newplan.logical.relational.LogicalPlan lp =
Util.buildLp( ps, query );
} catch (PlanValidationException e){
// Since output file is not present, validation should pass
// and not throw this exception.
fail("Store validation test failed.");
} finally {
- Util.deleteFile(pig.getPigContext(), outputFileName);
+ Util.deleteFile(ps.getPigContext(), outputFileName);
}
}
@@ -189,11 +152,11 @@ public class TestStore {
String outputFileName = "test-output.txt";
boolean sawException = false;
try {
- Util.createInputFile(pig.getPigContext(),outputFileName, input);
+ Util.createInputFile(ps.getPigContext(),outputFileName, input);
String query = "a = load '" + inputFileName + "' as (c:chararray,
" +
"i:int,d:double);" +
"store a into '" + outputFileName + "' using
PigStorage();";
- Util.buildLp( pig, query );
+ Util.buildLp( ps, query );
} catch (InvocationTargetException e){
FrontendException pve = (FrontendException)e.getCause();
pve.printStackTrace();
@@ -205,7 +168,7 @@ public class TestStore {
sawException = true;
} finally {
assertTrue(sawException);
- Util.deleteFile(pig.getPigContext(), outputFileName);
+ Util.deleteFile(ps.getPigContext(), outputFileName);
}
}
@@ -363,24 +326,24 @@ public class TestStore {
String inputFileName = "testGetSchema-input.txt";
String outputFileName = "testGetSchema-output.txt";
try {
- Util.createInputFile(pig.getPigContext(),
+ Util.createInputFile(ps.getPigContext(),
inputFileName, input);
String query = "a = load '" + inputFileName + "' as (c:chararray,
" +
"i:int,d:double);store a into '" + outputFileName + "'
using " +
"BinStorage();";
- pig.setBatchOn();
- Util.registerMultiLineQuery(pig, query);
- pig.executeBatch();
+ ps.setBatchOn();
+ Util.registerMultiLineQuery(ps, query);
+ ps.executeBatch();
ResourceSchema rs = new BinStorage().getSchema(outputFileName,
- new
Job(ConfigurationUtil.toConfiguration(pig.getPigContext().
+ new
Job(ConfigurationUtil.toConfiguration(ps.getPigContext().
getProperties())));
Schema expectedSchema = Utils.getSchemaFromString(
"c:chararray,i:int,d:double");
assertTrue("Checking binstorage getSchema output", Schema.equals(
expectedSchema, Schema.getPigSchema(rs), true, true));
} finally {
- Util.deleteFile(pig.getPigContext(), inputFileName);
- Util.deleteFile(pig.getPigContext(), outputFileName);
+ Util.deleteFile(ps.getPigContext(), inputFileName);
+ Util.deleteFile(ps.getPigContext(), outputFileName);
}
}
@@ -414,391 +377,6 @@ public class TestStore {
checkStorePath("/tmp/foo/../././","/tmp/foo/.././.");
}
- @Test
- public void testSetStoreSchema() throws Exception {
- PigServer ps = null;
- Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
- filesToVerify.put(outputFileName + "_storeSchema_test", Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED,
Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED,
Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED,
Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED,
Boolean.FALSE);
- filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED,
Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED,
Boolean.FALSE);
- filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED,
Boolean.FALSE);
- String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
-
- String script = "a = load '"+ inputFileName + "' as (a0:chararray,
a1:chararray);" +
- "store a into '" + outputFileName + "' using " +
- DUMMY_STORE_CLASS_NAME + "();";
-
- for (ExecType execType : modes) {
- Util.resetStateForExecModeSwitch();
- if(execType == cluster.getExecType()) {
- ps = new PigServer(cluster.getExecType(),
- cluster.getProperties());
- filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED,
Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED,
Boolean.TRUE);
- } else {
- Properties props = new Properties();
- props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
- ps = new PigServer(ExecType.LOCAL, props);
- if (Util.isHadoop1_x()) {
- // MAPREDUCE-1447/3563 (LocalJobRunner does not call
methods of mapreduce
- // OutputCommitter) is fixed only in 0.23.1
-
filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.FALSE);
-
filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.FALSE);
- }
- }
- ps.setBatchOn();
- Util.deleteFile(ps.getPigContext(), TESTDIR);
- Util.createInputFile(ps.getPigContext(),
- inputFileName, inputData);
- Util.registerMultiLineQuery(ps, script);
- ps.executeBatch();
- for (Entry<String, Boolean> entry : filesToVerify.entrySet()) {
- String condition = entry.getValue() ? "" : "not";
- assertEquals("Checking if file " + entry.getKey() +
- " does " + condition + " exists in " + execType +
- " mode", (boolean) entry.getValue(),
- Util.exists(ps.getPigContext(), entry.getKey()));
- }
- }
- }
-
- @Test
- public void testCleanupOnFailure() throws Exception {
- PigServer ps = null;
- String cleanupSuccessFile = outputFileName +
"_cleanupOnFailure_succeeded";
- String cleanupFailFile = outputFileName + "_cleanupOnFailure_failed";
- String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
-
- String script = "a = load '"+ inputFileName + "';" +
- "store a into '" + outputFileName + "' using " +
- DUMMY_STORE_CLASS_NAME + "('true');";
-
- for (ExecType execType : modes) {
- Util.resetStateForExecModeSwitch();
- if(execType == cluster.getExecType()) {
- ps = new PigServer(cluster.getExecType(),
- cluster.getProperties());
- } else {
- Properties props = new Properties();
- props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
- ps = new PigServer(ExecType.LOCAL, props);
- }
- Util.deleteFile(ps.getPigContext(), TESTDIR);
- ps.setBatchOn();
- Util.createInputFile(ps.getPigContext(),
- inputFileName, inputData);
- Util.registerMultiLineQuery(ps, script);
- ps.executeBatch();
- assertEquals(
- "Checking if file indicating that cleanupOnFailure failed
" +
- " does not exists in " + execType + " mode", false,
- Util.exists(ps.getPigContext(), cleanupFailFile));
- assertEquals(
- "Checking if file indicating that cleanupOnFailure was " +
- "successfully called exists in " + execType + " mode",
true,
- Util.exists(ps.getPigContext(), cleanupSuccessFile));
- }
- }
-
-
- @Test
- public void testCleanupOnFailureMultiStore() throws Exception {
- PigServer ps = null;
- String outputFileName1 = TESTDIR + "/TestStore-output-" + new
Random().nextLong() + ".txt";
- String outputFileName2 = TESTDIR + "/TestStore-output-" + new
Random().nextLong() + ".txt";
-
- Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
- filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1",
Boolean.TRUE);
- filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2",
Boolean.TRUE);
- filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1",
Boolean.FALSE);
- filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2",
Boolean.FALSE);
- filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1",
Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2",
Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1",
Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2",
Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1",
Boolean.FALSE);
- filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2",
Boolean.FALSE);
- filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1",
Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2",
Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1",
Boolean.FALSE);
- filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2",
Boolean.FALSE);
- filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1",
Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2",
Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1",
Boolean.FALSE);
- filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2",
Boolean.FALSE);
-
- String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
-
- // though the second store should
- // not cause a failure, the first one does and the result should be
- // that both stores are considered to have failed
- String script = "a = load '"+ inputFileName + "';" +
- "store a into '" + outputFileName1 + "' using " +
- DUMMY_STORE_CLASS_NAME + "('true', '1');" +
- "store a into '" + outputFileName2 + "' using " +
- DUMMY_STORE_CLASS_NAME + "('false', '2');";
-
- for (ExecType execType : new ExecType[] {cluster.getExecType(),
ExecType.LOCAL}) {
- Util.resetStateForExecModeSwitch();
- if(execType == cluster.getExecType()) {
- ps = new PigServer(cluster.getExecType(),
- cluster.getProperties());
- } else {
- Properties props = new Properties();
- props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
- ps = new PigServer(ExecType.LOCAL, props);
- // LocalJobRunner does not call abortTask
- filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED +
"1", Boolean.FALSE);
- filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED +
"2", Boolean.FALSE);
- if (Util.isHadoop1_x()) {
- // MAPREDUCE-1447/3563 (LocalJobRunner does not call
methods of mapreduce
- // OutputCommitter) is fixed only in 0.23.1
-
filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1",
Boolean.FALSE);
-
filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2",
Boolean.FALSE);
-
filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1",
Boolean.FALSE);
-
filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2",
Boolean.FALSE);
- }
- }
- Util.deleteFile(ps.getPigContext(), TESTDIR);
- ps.setBatchOn();
- Util.createInputFile(ps.getPigContext(),
- inputFileName, inputData);
- Util.registerMultiLineQuery(ps, script);
- ps.executeBatch();
- for (Entry<String, Boolean> entry : filesToVerify.entrySet()) {
- String condition = entry.getValue() ? "" : "not";
- assertEquals("Checking if file " + entry.getKey() +
- " does " + condition + " exists in " + execType +
- " mode", (boolean) entry.getValue(),
- Util.exists(ps.getPigContext(), entry.getKey()));
- }
- }
- }
-
- // Test that "_SUCCESS" file is created when
"mapreduce.fileoutputcommitter.marksuccessfuljobs"
- // property is set to true
- // The test covers multi store and single store case in local and
mapreduce mode
- // The test also checks that "_SUCCESS" file is NOT created when the
property
- // is not set to true in all the modes.
- @Test
- public void testSuccessFileCreation1() throws Exception {
- PigServer ps = null;
-
- try {
- String[] inputData = new String[]{"hello\tworld", "hi\tworld",
"bye\tworld"};
-
- String multiStoreScript = "a = load '"+ inputFileName + "';" +
- "b = filter a by $0 == 'hello';" +
- "c = filter a by $0 == 'hi';" +
- "d = filter a by $0 == 'bye';" +
- "store b into '" + outputFileName + "_1';" +
- "store c into '" + outputFileName + "_2';" +
- "store d into '" + outputFileName + "_3';";
-
- String singleStoreScript = "a = load '"+ inputFileName + "';" +
- "store a into '" + outputFileName + "_1';" ;
-
- for (ExecType execType : modes) {
- for(boolean isPropertySet: new boolean[] { true, false}) {
- for(boolean isMultiStore: new boolean[] { true, false}) {
- String script = (isMultiStore ? multiStoreScript :
- singleStoreScript);
- Util.resetStateForExecModeSwitch();
- if(execType == cluster.getExecType()) {
- ps = new PigServer(cluster.getExecType(),
- cluster.getProperties());
- } else {
- Properties props = new Properties();
- props.setProperty(MapRedUtil.FILE_SYSTEM_NAME,
"file:///");
- ps = new PigServer(ExecType.LOCAL, props);
- }
- ps.getPigContext().getProperties().setProperty(
-
MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS,
- Boolean.toString(isPropertySet));
- Util.deleteFile(ps.getPigContext(), TESTDIR);
- ps.setBatchOn();
- Util.createInputFile(ps.getPigContext(),
- inputFileName, inputData);
- Util.registerMultiLineQuery(ps, script);
- ps.executeBatch();
- for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
- String sucFile = outputFileName + "_" + i + "/" +
-
MapReduceLauncher.SUCCEEDED_FILE_NAME;
- assertEquals("Checking if _SUCCESS file exists in
" +
- execType + " mode", isPropertySet,
- Util.exists(ps.getPigContext(), sucFile));
- }
- }
- }
- }
- } finally {
- Util.deleteFile(ps.getPigContext(), TESTDIR);
- }
- }
-
- // Test _SUCCESS file is NOT created when job fails and when
- // "mapreduce.fileoutputcommitter.marksuccessfuljobs" property is set to
true
- // The test covers multi store and single store case in local and
mapreduce mode
- // The test also checks that "_SUCCESS" file is NOT created when the
property
- // is not set to true in all the modes.
- @Test
- public void testSuccessFileCreation2() throws Exception {
- PigServer ps = null;
- try {
- String[] inputData = new String[]{"hello\tworld", "hi\tworld",
"bye\tworld"};
- System.err.println("XXX: " + TestStore.FailUDF.class.getName());
- String multiStoreScript = "a = load '"+ inputFileName + "';" +
- "b = filter a by $0 == 'hello';" +
- "b = foreach b generate " + FAIL_UDF_NAME + "($0);" +
- "c = filter a by $0 == 'hi';" +
- "d = filter a by $0 == 'bye';" +
- "store b into '" + outputFileName + "_1';" +
- "store c into '" + outputFileName + "_2';" +
- "store d into '" + outputFileName + "_3';";
-
- String singleStoreScript = "a = load '"+ inputFileName + "';" +
- "b = foreach a generate " + FAIL_UDF_NAME + "($0);" +
- "store b into '" + outputFileName + "_1';" ;
-
- for (ExecType execType : modes) {
- for(boolean isPropertySet: new boolean[] { true, false}) {
- for(boolean isMultiStore: new boolean[] { true, false}) {
- String script = (isMultiStore ? multiStoreScript :
- singleStoreScript);
- Util.resetStateForExecModeSwitch();
- if(execType == cluster.getExecType()) {
- // since the job is guaranteed to fail, let's set
- // number of retries to 1.
- Properties props = cluster.getProperties();
- props.setProperty(MAP_MAX_ATTEMPTS, "1");
- ps = new PigServer(cluster.getExecType(), props);
- } else {
- Properties props = new Properties();
- props.setProperty(MapRedUtil.FILE_SYSTEM_NAME,
"file:///");
- // since the job is guaranteed to fail, let's set
- // number of retries to 1.
- props.setProperty(MAP_MAX_ATTEMPTS, "1");
- ps = new PigServer(ExecType.LOCAL, props);
- }
- ps.getPigContext().getProperties().setProperty(
-
MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS,
- Boolean.toString(isPropertySet));
- Util.deleteFile(ps.getPigContext(), TESTDIR);
- ps.setBatchOn();
- Util.createInputFile(ps.getPigContext(),
- inputFileName, inputData);
- Util.registerMultiLineQuery(ps, script);
- try {
- ps.executeBatch();
- } catch(IOException ioe) {
- if(!ioe.getMessage().equals("FailUDFException")) {
- // an unexpected exception
- throw ioe;
- }
- }
- for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
- String sucFile = outputFileName + "_" + i + "/" +
-
MapReduceLauncher.SUCCEEDED_FILE_NAME;
- assertEquals("Checking if _SUCCESS file exists in
" +
- execType + " mode", false,
- Util.exists(ps.getPigContext(), sucFile));
- }
- }
- }
- }
- } finally {
- Util.deleteFile(ps.getPigContext(), TESTDIR);
- }
- }
-
- /**
- * Test whether "part-m-00000" file is created on empty output when
- * {@link PigConfiguration#PIG_OUTPUT_LAZY} is set and if LazyOutputFormat
is
- * supported by Hadoop.
- * The test covers multi store and single store case in local and
mapreduce mode
- *
- * @throws IOException
- */
- @Test
- public void testEmptyPartFileCreation() throws IOException {
-
- boolean isLazyOutputPresent = true;
- try {
- Class<?> clazz = PigContext
-
.resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
- clazz.getMethod("setOutputFormatClass", Job.class, Class.class);
- }
- catch (Exception e) {
- isLazyOutputPresent = false;
- }
-
- //skip test if LazyOutputFormat is not supported (<= Hadoop 1.0.0)
- Assume.assumeTrue("LazyOutputFormat couldn't be loaded, test is
skipped", isLazyOutputPresent);
-
- PigServer ps = null;
-
- try {
- String[] inputData = new String[]{"hello\tworld", "hi\tworld",
"bye\tworld"};
-
- String multiStoreScript = "a = load '"+ inputFileName + "';" +
- "b = filter a by $0 == 'hey';" +
- "c = filter a by $1 == 'globe';" +
- "d = limit a 2;" +
- "e = foreach d generate *, 'x';" +
- "f = filter e by $3 == 'y';" +
- "store b into '" + outputFileName + "_1';" +
- "store c into '" + outputFileName + "_2';" +
- "store f into '" + outputFileName + "_3';";
-
- String singleStoreScript = "a = load '"+ inputFileName + "';" +
- "b = filter a by $0 == 'hey';" +
- "store b into '" + outputFileName + "_1';" ;
-
- for (ExecType execType : modes) {
- for(boolean isMultiStore: new boolean[] { true, false}) {
- if (isMultiStore && (execType.equals(ExecType.LOCAL) ||
- execType.equals(ExecType.MAPREDUCE))) {
- // Skip this test for Mapreduce as MapReducePOStoreImpl
- // does not handle LazyOutputFormat
- continue;
- }
-
- String script = (isMultiStore ? multiStoreScript
- : singleStoreScript);
- Util.resetStateForExecModeSwitch();
- if(execType == cluster.getExecType()) {
- ps = new PigServer(cluster.getExecType(),
- cluster.getProperties());
- } else {
- Properties props = new Properties();
- props.setProperty(MapRedUtil.FILE_SYSTEM_NAME,
"file:///");
- ps = new PigServer(ExecType.LOCAL, props);
- }
- ps.getPigContext().getProperties().setProperty(
- PigConfiguration.PIG_OUTPUT_LAZY, "true");
- Util.deleteFile(ps.getPigContext(), TESTDIR);
- ps.setBatchOn();
- Util.createInputFile(ps.getPigContext(),
- inputFileName, inputData);
- Util.registerMultiLineQuery(ps, script);
- ps.executeBatch();
- Configuration conf =
ConfigurationUtil.toConfiguration(ps.getPigContext().getProperties());
- for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
- assertEquals("For an empty output part-m-00000 should
not exist in " + execType + " mode",
- null,
- getFirstOutputFile(conf, new
Path(outputFileName + "_" + i), execType, true));
- }
- }
- }
- } finally {
- Util.deleteFile(ps.getPigContext(), TESTDIR);
- }
- }
-
// A UDF which always throws an Exception so that the job can fail
public static class FailUDF extends EvalFunc<String> {
Added: pig/trunk/test/org/apache/pig/test/TestStoreBase.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStoreBase.java?rev=1653445&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStoreBase.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestStoreBase.java Wed Jan 21 06:09:06
2015
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.test.TestStore.DummyOutputCommitter;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore
+public abstract class TestStoreBase {
+ protected ExecType mode;
+ protected String inputFileName;
+ protected String outputFileName;
+
+ protected static final String TESTDIR = "/tmp/" +
TestStore.class.getSimpleName();
+
+ protected static final String DUMMY_STORE_CLASS_NAME
+ = "org.apache.pig.test.TestStore\\$DummyStore";
+
+ protected static final String FAIL_UDF_NAME
+ = "org.apache.pig.test.TestStore\\$FailUDF";
+ protected static final String MAP_MAX_ATTEMPTS =
MRConfiguration.MAP_MAX_ATTEMPTS;
+
+ protected PigServer ps = null;
+
+ @Before
+ public void setUp() throws Exception {
+ inputFileName = TESTDIR + "/TestStore-" + new Random().nextLong() +
".txt";
+ outputFileName = TESTDIR + "/TestStore-output-" + new
Random().nextLong() + ".txt";
+ setupPigServer();
+ }
+
+ abstract protected void setupPigServer() throws Exception;
+
+ @Test
+ public void testSetStoreSchema() throws Exception {
+ Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
+ filesToVerify.put(outputFileName + "_storeSchema_test", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED,
Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED,
Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED,
Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED,
Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED,
Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED,
Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED,
Boolean.FALSE);
+ String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
+
+ String script = "a = load '"+ inputFileName + "' as (a0:chararray,
a1:chararray);" +
+ "store a into '" + outputFileName + "' using " +
+ DUMMY_STORE_CLASS_NAME + "();";
+
+ if(!mode.isLocal()) {
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED,
Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED,
Boolean.TRUE);
+ } else {
+ if (Util.isHadoop1_x()) {
+ // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods
of mapreduce
+ // OutputCommitter) is fixed only in 0.23.1
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED,
Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED,
Boolean.FALSE);
+ }
+ }
+ ps.setBatchOn();
+ Util.deleteFile(ps.getPigContext(), TESTDIR);
+ Util.createInputFile(ps.getPigContext(),
+ inputFileName, inputData);
+ Util.registerMultiLineQuery(ps, script);
+ ps.executeBatch();
+ for (Entry<String, Boolean> entry : filesToVerify.entrySet()) {
+ String condition = entry.getValue() ? "" : "not";
+ assertEquals("Checking if file " + entry.getKey() +
+ " does " + condition + " exists in " + mode +
+ " mode", (boolean) entry.getValue(),
+ Util.exists(ps.getPigContext(), entry.getKey()));
+ }
+ }
+
+ @Test
+ public void testCleanupOnFailure() throws Exception {
+ String cleanupSuccessFile = outputFileName +
"_cleanupOnFailure_succeeded";
+ String cleanupFailFile = outputFileName + "_cleanupOnFailure_failed";
+ String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
+
+ String script = "a = load '"+ inputFileName + "';" +
+ "store a into '" + outputFileName + "' using " +
+ DUMMY_STORE_CLASS_NAME + "('true');";
+
+ Util.deleteFile(ps.getPigContext(), TESTDIR);
+ ps.setBatchOn();
+ Util.createInputFile(ps.getPigContext(),
+ inputFileName, inputData);
+ Util.registerMultiLineQuery(ps, script);
+ ps.executeBatch();
+ assertEquals(
+ "Checking if file indicating that cleanupOnFailure failed " +
+ " does not exists in " + mode + " mode", false,
+ Util.exists(ps.getPigContext(), cleanupFailFile));
+ assertEquals(
+ "Checking if file indicating that cleanupOnFailure was " +
+ "successfully called exists in " + mode + " mode", true,
+ Util.exists(ps.getPigContext(), cleanupSuccessFile));
+ }
+
+ @Test
+ public void testCleanupOnFailureMultiStore() throws Exception {
+ String outputFileName1 = TESTDIR + "/TestStore-output-" + new
Random().nextLong() + ".txt";
+ String outputFileName2 = TESTDIR + "/TestStore-output-" + new
Random().nextLong() + ".txt";
+
+ Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
+ filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1",
Boolean.TRUE);
+ filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2",
Boolean.TRUE);
+ filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1",
Boolean.FALSE);
+ filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2",
Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1",
Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2",
Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1",
Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2",
Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1",
Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2",
Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1",
Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2",
Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1",
Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2",
Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1",
Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2",
Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1",
Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2",
Boolean.FALSE);
+
+ String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
+
+ // though the second store should
+ // not cause a failure, the first one does and the result should be
+ // that both stores are considered to have failed
+ String script = "a = load '"+ inputFileName + "';" +
+ "store a into '" + outputFileName1 + "' using " +
+ DUMMY_STORE_CLASS_NAME + "('true', '1');" +
+ "store a into '" + outputFileName2 + "' using " +
+ DUMMY_STORE_CLASS_NAME + "('false', '2');";
+
+ if(mode.isLocal()) {
+ // MR LocalJobRunner does not call abortTask
+ if (!mode.toString().startsWith("TEZ")) {
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED +
"1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED +
"2", Boolean.FALSE);
+ }
+ if (Util.isHadoop1_x()) {
+ // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods
of mapreduce
+ // OutputCommitter) is fixed only in 0.23.1
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED +
"1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED +
"2", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED +
"1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED +
"2", Boolean.FALSE);
+ }
+ }
+ Util.deleteFile(ps.getPigContext(), TESTDIR);
+ ps.setBatchOn();
+ Util.createInputFile(ps.getPigContext(),
+ inputFileName, inputData);
+ Util.registerMultiLineQuery(ps, script);
+ ps.executeBatch();
+ for (Entry<String, Boolean> entry : filesToVerify.entrySet()) {
+ String condition = entry.getValue() ? "" : "not";
+ assertEquals("Checking if file " + entry.getKey() +
+ " does " + condition + " exists in " + mode +
+ " mode", (boolean) entry.getValue(),
+ Util.exists(ps.getPigContext(), entry.getKey()));
+ }
+ }
+
+ // Test that "_SUCCESS" file is created when
"mapreduce.fileoutputcommitter.marksuccessfuljobs"
+ // property is set to true
+ // The test covers the multi store and single store cases in local and
mapreduce modes
+ // The test also checks that "_SUCCESS" file is NOT created when the
property
+ // is not set to true in all the modes.
+ @Test
+ public void testSuccessFileCreation1() throws Exception {
+
+ try {
+ String[] inputData = new String[]{"hello\tworld", "hi\tworld",
"bye\tworld"};
+
+ String multiStoreScript = "a = load '"+ inputFileName + "';" +
+ "b = filter a by $0 == 'hello';" +
+ "c = filter a by $0 == 'hi';" +
+ "d = filter a by $0 == 'bye';" +
+ "store b into '" + outputFileName + "_1';" +
+ "store c into '" + outputFileName + "_2';" +
+ "store d into '" + outputFileName + "_3';";
+
+ String singleStoreScript = "a = load '"+ inputFileName + "';" +
+ "store a into '" + outputFileName + "_1';" ;
+
+ for(boolean isPropertySet: new boolean[] { true, false}) {
+ for(boolean isMultiStore: new boolean[] { true, false}) {
+ String script = (isMultiStore ? multiStoreScript :
+ singleStoreScript);
+ ps.getPigContext().getProperties().setProperty(
+
MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS,
+ Boolean.toString(isPropertySet));
+ Util.deleteFile(ps.getPigContext(), TESTDIR);
+ ps.setBatchOn();
+ Util.createInputFile(ps.getPigContext(),
+ inputFileName, inputData);
+ Util.registerMultiLineQuery(ps, script);
+ ps.executeBatch();
+ for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
+ String sucFile = outputFileName + "_" + i + "/" +
+
MapReduceLauncher.SUCCEEDED_FILE_NAME;
+ assertEquals("Checking if _SUCCESS file exists in " +
+ mode + " mode", isPropertySet,
+ Util.exists(ps.getPigContext(), sucFile));
+ }
+ }
+ }
+ } finally {
+ Util.deleteFile(ps.getPigContext(), TESTDIR);
+ }
+ }
+
+ // Test _SUCCESS file is NOT created when job fails and when
+ // "mapreduce.fileoutputcommitter.marksuccessfuljobs" property is set to
true
+ // The test covers the multi store and single store cases in local and
mapreduce modes
+ // The test also checks that "_SUCCESS" file is NOT created when the
property
+ // is not set to true in all the modes.
+ @Test
+ public void testSuccessFileCreation2() throws Exception {
+ try {
+ String[] inputData = new String[]{"hello\tworld", "hi\tworld",
"bye\tworld"};
+ System.err.println("XXX: " + TestStore.FailUDF.class.getName());
+ String multiStoreScript = "a = load '"+ inputFileName + "';" +
+ "b = filter a by $0 == 'hello';" +
+ "b = foreach b generate " + FAIL_UDF_NAME + "($0);" +
+ "c = filter a by $0 == 'hi';" +
+ "d = filter a by $0 == 'bye';" +
+ "store b into '" + outputFileName + "_1';" +
+ "store c into '" + outputFileName + "_2';" +
+ "store d into '" + outputFileName + "_3';";
+
+ String singleStoreScript = "a = load '"+ inputFileName + "';" +
+ "b = foreach a generate " + FAIL_UDF_NAME + "($0);" +
+ "store b into '" + outputFileName + "_1';" ;
+
+ for(boolean isPropertySet: new boolean[] { true, false}) {
+ for(boolean isMultiStore: new boolean[] { true, false}) {
+ String script = (isMultiStore ? multiStoreScript :
+ singleStoreScript);
+ if (mode.isLocal()) {
+ // since the job is guaranteed to fail, let's set
+ // number of retries to 1.
+
ps.getPigContext().getProperties().setProperty(MAP_MAX_ATTEMPTS, "1");
+ }
+ ps.getPigContext().getProperties().setProperty(
+
MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS,
+ Boolean.toString(isPropertySet));
+ Util.deleteFile(ps.getPigContext(), TESTDIR);
+ ps.setBatchOn();
+ Util.createInputFile(ps.getPigContext(),
+ inputFileName, inputData);
+ Util.registerMultiLineQuery(ps, script);
+ try {
+ ps.executeBatch();
+ } catch(IOException ioe) {
+ if(!ioe.getMessage().equals("FailUDFException")) {
+ // an unexpected exception
+ throw ioe;
+ }
+ }
+ for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
+ String sucFile = outputFileName + "_" + i + "/" +
+
MapReduceLauncher.SUCCEEDED_FILE_NAME;
+ assertEquals("Checking if _SUCCESS file exists in " +
+ mode + " mode", false,
+ Util.exists(ps.getPigContext(), sucFile));
+ }
+ }
+ }
+ } finally {
+ Util.deleteFile(ps.getPigContext(), TESTDIR);
+ }
+ }
+
+ /**
+ * Test whether "part-m-00000" file is created on empty output when
+ * {@link PigConfiguration#PIG_OUTPUT_LAZY} is set and if LazyOutputFormat
is
+ * supported by Hadoop.
+ * The test covers the multi store and single store cases in local and
mapreduce modes
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testEmptyPartFileCreation() throws Exception {
+
+ boolean isLazyOutputPresent = true;
+ try {
+ Class<?> clazz = PigContext
+
.resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
+ clazz.getMethod("setOutputFormatClass", Job.class, Class.class);
+ }
+ catch (Exception e) {
+ isLazyOutputPresent = false;
+ }
+
+ //skip test if LazyOutputFormat is not supported (<= Hadoop 1.0.0)
+ Assume.assumeTrue("LazyOutputFormat couldn't be loaded, test is
skipped", isLazyOutputPresent);
+
+ try {
+ String[] inputData = new String[]{"hello\tworld", "hi\tworld",
"bye\tworld"};
+
+ String multiStoreScript = "a = load '"+ inputFileName + "';" +
+ "b = filter a by $0 == 'hey';" +
+ "c = filter a by $1 == 'globe';" +
+ "d = limit a 2;" +
+ "e = foreach d generate *, 'x';" +
+ "f = filter e by $3 == 'y';" +
+ "store b into '" + outputFileName + "_1';" +
+ "store c into '" + outputFileName + "_2';" +
+ "store f into '" + outputFileName + "_3';";
+
+ String singleStoreScript = "a = load '"+ inputFileName + "';" +
+ "b = filter a by $0 == 'hey';" +
+ "store b into '" + outputFileName + "_1';" ;
+
+ for(boolean isMultiStore: new boolean[] { true, false}) {
+ if (isMultiStore && (mode.isLocal() ||
+ mode.equals(ExecType.MAPREDUCE))) {
+ // Skip this test for Mapreduce as MapReducePOStoreImpl
+ // does not handle LazyOutputFormat
+ continue;
+ }
+
+ String script = (isMultiStore ? multiStoreScript
+ : singleStoreScript);
+ ps.getPigContext().getProperties().setProperty(
+ PigConfiguration.PIG_OUTPUT_LAZY, "true");
+ Util.deleteFile(ps.getPigContext(), TESTDIR);
+ ps.setBatchOn();
+ Util.createInputFile(ps.getPigContext(),
+ inputFileName, inputData);
+ Util.registerMultiLineQuery(ps, script);
+ ps.executeBatch();
+ Configuration conf =
ConfigurationUtil.toConfiguration(ps.getPigContext().getProperties());
+ for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
+ assertEquals("For an empty output part-m-00000 should not
exist in " + mode + " mode",
+ null,
+ getFirstOutputFile(conf, new Path(outputFileName +
"_" + i), mode, true));
+ }
+ }
+ } finally {
+ Util.deleteFile(ps.getPigContext(), TESTDIR);
+ }
+ }
+
+ public static Path getFirstOutputFile(Configuration conf, Path outputDir,
+ ExecType exectype, boolean isMapOutput) throws Exception {
+ FileSystem fs = outputDir.getFileSystem(conf);
+ FileStatus[] outputFiles = fs.listStatus(outputDir,
+ Util.getSuccessMarkerPathFilter());
+
+ boolean filefound = false;
+ if (outputFiles != null && outputFiles.length != 0) {
+ String name = outputFiles[0].getPath().getName();
+ if (exectype == Util.getLocalTestMode() || exectype ==
ExecType.MAPREDUCE) {
+ if (isMapOutput) {
+ filefound = name.equals("part-m-00000");
+ } else {
+ filefound = name.equals("part-r-00000");
+ }
+ } else {
+ filefound = name.startsWith("part-");
+ }
+ }
+ return filefound ? outputFiles[0].getPath() : null;
+ }
+}
Added: pig/trunk/test/org/apache/pig/test/TestStoreLocal.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStoreLocal.java?rev=1653445&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStoreLocal.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestStoreLocal.java Wed Jan 21 06:09:06
2015
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.util.Properties;
+
+import org.apache.pig.PigServer;
+import org.junit.Before;
+
+/**
+ * Runs the shared {@link TestStoreBase} store tests in local execution mode.
+ * The concrete mode comes from Util.getLocalTestMode(), so the same class
+ * covers whichever local engine the build is configured for.
+ */
+public class TestStoreLocal extends TestStoreBase {
+ @Before
+ public void setUp() throws Exception {
+ // Set the mode before the base class setUp so it configures itself
+ // for local execution.
+ mode = Util.getLocalTestMode();
+ super.setUp();
+ }
+
+ @Override
+ protected void setupPigServer() throws Exception {
+ // Fresh, empty Properties: no cluster settings leak into local runs.
+ Properties props = new Properties();
+ ps = new PigServer(Util.getLocalTestMode(), props);
+ }
+}
Added: pig/trunk/test/org/apache/pig/tez/TestPOPartialAggPlanTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestPOPartialAggPlanTez.java?rev=1653445&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestPOPartialAggPlanTez.java (added)
+++ pig/trunk/test/org/apache/pig/tez/TestPOPartialAggPlanTez.java Wed Jan 21
06:09:06 2015
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tez;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.pig.PigConfiguration;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
+import
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.test.TestPOPartialAggPlan;
+import org.junit.Test;
+
+/**
+ * Tez counterpart of {@link TestPOPartialAggPlan}: verifies that the Tez
+ * compiler inserts a POPartialAgg operator exactly when
+ * pig.exec.mapPartAgg is enabled AND the group-by is combinable.
+ *
+ * NOTE(review): the assertEquals calls below pass (actual, expected) —
+ * JUnit's convention is (expected, actual). Harmless for pass/fail but
+ * failure messages will read backwards.
+ */
+public class TestPOPartialAggPlanTez extends TestPOPartialAggPlan {
+ @Test
+ public void testNoMapAggProp() throws Exception{
+ //test with pig.exec.mapPartAgg not set
+ String query = getGByQuery();
+
+ TezPlanContainer tezPlanContainer =
TezUtil.buildTezPlanContainer(query, pc);
+ // A simple group-by should compile into a single Tez plan.
+ assertEquals(tezPlanContainer.size(), 1);
+
+ assertNull("POPartialAgg should be
absent",findPOPartialAgg(tezPlanContainer));
+ }
+
+ @Test
+ public void testMapAggPropFalse() throws Exception{
+ //test with pig.exec.mapPartAgg set to false
+ String query = getGByQuery();
+ pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG,
"false");
+ TezPlanContainer tezPlanContainer =
TezUtil.buildTezPlanContainer(query, pc);
+ assertEquals(tezPlanContainer.size(), 1);
+
+ // Explicitly disabled: no partial aggregation operator expected.
+ assertNull("POPartialAgg should be absent",
findPOPartialAgg(tezPlanContainer));
+ }
+
+ @Test
+ public void testMapAggPropTrue() throws Exception{
+ //test with pig.exec.mapPartAgg to true
+ String query = getGByQuery();
+ pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG,
"true");
+ TezPlanContainer tezPlanContainer =
TezUtil.buildTezPlanContainer(query, pc);
+ assertEquals(tezPlanContainer.size(), 1);
+
+ // Enabled and combinable: POPartialAgg must be inserted.
+ assertNotNull("POPartialAgg should be
present",findPOPartialAgg(tezPlanContainer));
+
+ }
+
+ @Test
+ public void testMapAggNoAggFunc() throws Exception{
+ //no agg func, so there should not be a POPartial
+ String query = "l = load 'x' as (a,b,c);" +
+ "g = group l by a;" +
+ "f = foreach g generate group;";
+ pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG,
"true");
+ TezPlanContainer tezPlanContainer =
TezUtil.buildTezPlanContainer(query, pc);
+ assertEquals(tezPlanContainer.size(), 1);
+
+ assertNull("POPartialAgg should be
absent",findPOPartialAgg(tezPlanContainer));
+ }
+
+ @Test
+ public void testMapAggNotCombinable() throws Exception{
+ //not combinable, so there should not be a POPartial
+ // Projecting the raw bag l.b alongside COUNT makes the plan
+ // non-combinable, so partial aggregation must be skipped.
+ String query = "l = load 'x' as (a,b,c);" +
+ "g = group l by a;" +
+ "f = foreach g generate group, COUNT(l.b), l.b;";
+ pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG,
"true");
+ TezPlanContainer tezPlanContainer =
TezUtil.buildTezPlanContainer(query, pc);
+ assertEquals(tezPlanContainer.size(), 1);
+
+ assertNull("POPartialAgg should be absent",
findPOPartialAgg(tezPlanContainer));
+ }
+
+ /**
+ * Scans every physical plan in every Tez operator of every plan in the
+ * container and returns the first POPartialAgg found, or null.
+ * The inner call is to the inherited findPOPartialAgg(PhysicalPlan)
+ * overload from the base class — presumably it walks tezOper.plan;
+ * this is an overload, not recursion.
+ */
+ private PhysicalOperator findPOPartialAgg(TezPlanContainer
tezPlanContainer) {
+ for (TezPlanContainerNode node : tezPlanContainer) {
+ TezOperPlan tezPlan = node.getTezOperPlan();
+ for (TezOperator tezOper : tezPlan) {
+ PhysicalOperator partialAgg = findPOPartialAgg(tezOper.plan);
+ if (partialAgg != null) {
+ return partialAgg;
+ }
+ }
+ }
+ return null;
+ }
+}
Added: pig/trunk/test/org/apache/pig/tez/TezUtil.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TezUtil.java?rev=1653445&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TezUtil.java (added)
+++ pig/trunk/test/org/apache/pig/tez/TezUtil.java Wed Jan 21 06:09:06 2015
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tez;
+
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.test.Util;
+
+/**
+ * Test helper that compiles a Pig Latin query down to a Tez plan
+ * container, mirroring the compilation pipeline the launcher uses.
+ */
+public class TezUtil {
+ /**
+ * Parses and optimizes the query into a logical plan, builds the
+ * physical plan from it, then compiles that to a TezPlanContainer.
+ *
+ * @param query Pig Latin script text
+ * @param pc context carrying properties that influence compilation
+ */
+ public static TezPlanContainer buildTezPlanContainer(String query,
PigContext pc) throws Exception {
+ LogicalPlan lp = Util.parse(query, pc);
+ Util.optimizeNewLP(lp);
+ PhysicalPlan pp = Util.buildPhysicalPlanFromNewLP(lp, pc);
+ TezPlanContainer tezPlanContainer = buildTezPlanWithOptimizer(pp, pc);
+ return tezPlanContainer;
+ }
+
+ /**
+ * Compiles a physical plan to Tez via TezLauncher, first validating
+ * that every leaf of the plan is a store (as a real run would require).
+ */
+ public static TezPlanContainer buildTezPlanWithOptimizer(PhysicalPlan pp,
PigContext pc) throws Exception {
+ MapRedUtil.checkLeafIsStore(pp, pc);
+ TezLauncher launcher = new TezLauncher();
+ return launcher.compile(pp, pc);
+ }
+}
Modified: pig/trunk/test/tez-local-tests
URL:
http://svn.apache.org/viewvc/pig/trunk/test/tez-local-tests?rev=1653445&r1=1653444&r2=1653445&view=diff
==============================================================================
--- pig/trunk/test/tez-local-tests (original)
+++ pig/trunk/test/tez-local-tests Wed Jan 21 06:09:06 2015
@@ -80,3 +80,6 @@
**/TestRank3.java
**/TestScalarAliasesLocal.java
**/TestPigStatsTez.java
+**/TestStoreLocal.java
+**/TestPOPartialAggPlanTez.java
+**/TestMultiQueryLocal.java