Modified: pig/branches/spark/test/org/apache/pig/test/TestStore.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestStore.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestStore.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestStore.java Tue Jan 27 02:27:45 2015 @@ -28,8 +28,6 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; import java.util.Random; import org.apache.hadoop.conf.Configuration; @@ -45,7 +43,6 @@ import org.apache.hadoop.mapreduce.Outpu import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.pig.EvalFunc; -import org.apache.pig.ExecType; import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; import org.apache.pig.PigServer; @@ -56,12 +53,9 @@ import org.apache.pig.StoreMetadata; import org.apache.pig.backend.datastorage.DataStorage; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.builtin.BinStorage; import org.apache.pig.builtin.PigStorage; import org.apache.pig.data.DataBag; @@ -84,37 +78,22 @@ import org.apache.pig.test.utils.GenRand import org.apache.pig.test.utils.TestHelper; import org.junit.After; import org.junit.AfterClass; -import org.junit.Assume; import org.junit.Before; import org.junit.Test; -public class TestStore { +public class TestStore extends TestStoreBase { POStore st; DataBag inpDB; static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); PigContext pc; POProject proj; - PigServer pig; - - String inputFileName; - String outputFileName; - - private static final String DUMMY_STORE_CLASS_NAME - = "org.apache.pig.test.TestStore\\$DummyStore"; - - private static final String FAIL_UDF_NAME - = "org.apache.pig.test.TestStore\\$FailUDF"; - private static final String MAP_MAX_ATTEMPTS = MRConfiguration.MAP_MAX_ATTEMPTS; - private static final String TESTDIR = "/tmp/" + TestStore.class.getSimpleName(); - private static ExecType[] modes = new ExecType[] { ExecType.LOCAL, cluster.getExecType() }; - + @Before public void setUp() throws Exception { - pig = new PigServer(cluster.getExecType(), cluster.getProperties()); - pc = pig.getPigContext(); - inputFileName = TESTDIR + "/TestStore-" + new Random().nextLong() + ".txt"; - outputFileName = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt"; - + mode = cluster.getExecType(); + setupPigServer(); + pc = ps.getPigContext(); + super.setUp(); } @After @@ -124,14 +103,20 @@ public class TestStore { Util.deleteFile(cluster, TESTDIR); } + @Override + protected void setupPigServer() throws Exception { + ps = new PigServer(cluster.getExecType(), + cluster.getProperties()); + } + private void storeAndCopyLocally(DataBag inpDB) throws Exception { setUpInputFileOnCluster(inpDB); String script = "a = load '" + inputFileName + "'; " + "store a into '" + outputFileName + "' using PigStorage('\t');" + "fs -ls " + TESTDIR; - pig.setBatchOn(); - Util.registerMultiLineQuery(pig, script); - pig.executeBatch(); + ps.setBatchOn(); + Util.registerMultiLineQuery(ps, script); + ps.executeBatch(); Path path = getFirstOutputFile(cluster.getConfiguration(), new Path(outputFileName), cluster.getExecType(), true); Util.copyFromClusterToLocal( @@ -139,28 +124,6 @@ public class TestStore { path.toString(), outputFileName); } - public static Path getFirstOutputFile(Configuration conf, Path outputDir, - ExecType exectype, boolean isMapOutput) throws IOException { - FileSystem fs = outputDir.getFileSystem(conf); - FileStatus[] outputFiles = fs.listStatus(outputDir, - Util.getSuccessMarkerPathFilter()); - - boolean filefound = false; - if (outputFiles != null && outputFiles.length != 0) { - String name = outputFiles[0].getPath().getName(); - if (exectype == ExecType.LOCAL || exectype == ExecType.MAPREDUCE) { - if (isMapOutput) { - filefound = name.equals("part-m-00000"); - } else { - filefound = name.equals("part-r-00000"); - } - } else { - filefound = name.startsWith("part-"); - } - } - return filefound ? outputFiles[0].getPath() : null; - } - @AfterClass public static void oneTimeTearDown() throws Exception { cluster.shutDown(); @@ -173,13 +136,13 @@ public class TestStore { String query = "a = load '" + inputFileName + "' as (c:chararray, " + "i:int,d:double);" + "store a into '" + outputFileName + "' using " + "PigStorage();"; - org.apache.pig.newplan.logical.relational.LogicalPlan lp = Util.buildLp( pig, query ); + org.apache.pig.newplan.logical.relational.LogicalPlan lp = Util.buildLp( ps, query ); } catch (PlanValidationException e){ // Since output file is not present, validation should pass // and not throw this exception. fail("Store validation test failed."); } finally { - Util.deleteFile(pig.getPigContext(), outputFileName); + Util.deleteFile(ps.getPigContext(), outputFileName); } } @@ -189,11 +152,11 @@ public class TestStore { String outputFileName = "test-output.txt"; boolean sawException = false; try { - Util.createInputFile(pig.getPigContext(),outputFileName, input); + Util.createInputFile(ps.getPigContext(),outputFileName, input); String query = "a = load '" + inputFileName + "' as (c:chararray, " + "i:int,d:double);" + "store a into '" + outputFileName + "' using PigStorage();"; - Util.buildLp( pig, query ); + Util.buildLp( ps, query ); } catch (InvocationTargetException e){ FrontendException pve = (FrontendException)e.getCause(); pve.printStackTrace(); @@ -205,7 +168,7 @@ public class TestStore { sawException = true; } finally { assertTrue(sawException); - Util.deleteFile(pig.getPigContext(), outputFileName); + Util.deleteFile(ps.getPigContext(), outputFileName); } } @@ -363,24 +326,24 @@ public class TestStore { String inputFileName = "testGetSchema-input.txt"; String outputFileName = "testGetSchema-output.txt"; try { - Util.createInputFile(pig.getPigContext(), + Util.createInputFile(ps.getPigContext(), inputFileName, input); String query = "a = load '" + inputFileName + "' as (c:chararray, " + "i:int,d:double);store a into '" + outputFileName + "' using " + "BinStorage();"; - pig.setBatchOn(); - Util.registerMultiLineQuery(pig, query); - pig.executeBatch(); + ps.setBatchOn(); + Util.registerMultiLineQuery(ps, query); + ps.executeBatch(); ResourceSchema rs = new BinStorage().getSchema(outputFileName, - new Job(ConfigurationUtil.toConfiguration(pig.getPigContext(). + new Job(ConfigurationUtil.toConfiguration(ps.getPigContext(). getProperties()))); Schema expectedSchema = Utils.getSchemaFromString( "c:chararray,i:int,d:double"); assertTrue("Checking binstorage getSchema output", Schema.equals( expectedSchema, Schema.getPigSchema(rs), true, true)); } finally { - Util.deleteFile(pig.getPigContext(), inputFileName); - Util.deleteFile(pig.getPigContext(), outputFileName); + Util.deleteFile(ps.getPigContext(), inputFileName); + Util.deleteFile(ps.getPigContext(), outputFileName); } } @@ -414,391 +377,6 @@ public class TestStore { checkStorePath("/tmp/foo/../././","/tmp/foo/.././."); } - @Test - public void testSetStoreSchema() throws Exception { - PigServer ps = null; - Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>(); - filesToVerify.put(outputFileName + "_storeSchema_test", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED, Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED, Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED, Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED, Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED, Boolean.FALSE); - String[] inputData = new String[]{"hello\tworld", "bye\tworld"}; - - String script = "a = load '"+ inputFileName + "' as (a0:chararray, a1:chararray);" + - "store a into '" + outputFileName + "' using " + - DUMMY_STORE_CLASS_NAME + "();"; - - for (ExecType execType : modes) { - Util.resetStateForExecModeSwitch(); - if(execType == cluster.getExecType()) { - ps = new PigServer(cluster.getExecType(), - cluster.getProperties()); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.TRUE); - } else { - Properties props = new Properties(); - props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); - ps = new PigServer(ExecType.LOCAL, props); - if (Util.isHadoop1_x()) { - // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods of mapreduce - // OutputCommitter) is fixed only in 0.23.1 - filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.FALSE); - } - } - ps.setBatchOn(); - Util.deleteFile(ps.getPigContext(), TESTDIR); - Util.createInputFile(ps.getPigContext(), - inputFileName, inputData); - Util.registerMultiLineQuery(ps, script); - ps.executeBatch(); - for (Entry<String, Boolean> entry : filesToVerify.entrySet()) { - String condition = entry.getValue() ? "" : "not"; - assertEquals("Checking if file " + entry.getKey() + - " does " + condition + " exists in " + execType + - " mode", (boolean) entry.getValue(), - Util.exists(ps.getPigContext(), entry.getKey())); - } - } - } - - @Test - public void testCleanupOnFailure() throws Exception { - PigServer ps = null; - String cleanupSuccessFile = outputFileName + "_cleanupOnFailure_succeeded"; - String cleanupFailFile = outputFileName + "_cleanupOnFailure_failed"; - String[] inputData = new String[]{"hello\tworld", "bye\tworld"}; - - String script = "a = load '"+ inputFileName + "';" + - "store a into '" + outputFileName + "' using " + - DUMMY_STORE_CLASS_NAME + "('true');"; - - for (ExecType execType : modes) { - Util.resetStateForExecModeSwitch(); - if(execType == cluster.getExecType()) { - ps = new PigServer(cluster.getExecType(), - cluster.getProperties()); - } else { - Properties props = new Properties(); - props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); - ps = new PigServer(ExecType.LOCAL, props); - } - Util.deleteFile(ps.getPigContext(), TESTDIR); - ps.setBatchOn(); - Util.createInputFile(ps.getPigContext(), - inputFileName, inputData); - Util.registerMultiLineQuery(ps, script); - ps.executeBatch(); - assertEquals( - "Checking if file indicating that cleanupOnFailure failed " + - " does not exists in " + execType + " mode", false, - Util.exists(ps.getPigContext(), cleanupFailFile)); - assertEquals( - "Checking if file indicating that cleanupOnFailure was " + - "successfully called exists in " + execType + " mode", true, - Util.exists(ps.getPigContext(), cleanupSuccessFile)); - } - } - - - @Test - public void testCleanupOnFailureMultiStore() throws Exception { - PigServer ps = null; - String outputFileName1 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt"; - String outputFileName2 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt"; - - Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>(); - filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE); - filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE); - filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE); - filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.TRUE); - filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE); - - String[] inputData = new String[]{"hello\tworld", "bye\tworld"}; - - // though the second store should - // not cause a failure, the first one does and the result should be - // that both stores are considered to have failed - String script = "a = load '"+ inputFileName + "';" + - "store a into '" + outputFileName1 + "' using " + - DUMMY_STORE_CLASS_NAME + "('true', '1');" + - "store a into '" + outputFileName2 + "' using " + - DUMMY_STORE_CLASS_NAME + "('false', '2');"; - - for (ExecType execType : new ExecType[] {cluster.getExecType(), ExecType.LOCAL}) { - Util.resetStateForExecModeSwitch(); - if(execType == cluster.getExecType()) { - ps = new PigServer(cluster.getExecType(), - cluster.getProperties()); - } else { - Properties props = new Properties(); - props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); - ps = new PigServer(ExecType.LOCAL, props); - // LocalJobRunner does not call abortTask - filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE); - if (Util.isHadoop1_x()) { - // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods of mapreduce - // OutputCommitter) is fixed only in 0.23.1 - filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE); - filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.FALSE); - } - } - Util.deleteFile(ps.getPigContext(), TESTDIR); - ps.setBatchOn(); - Util.createInputFile(ps.getPigContext(), - inputFileName, inputData); - Util.registerMultiLineQuery(ps, script); - ps.executeBatch(); - for (Entry<String, Boolean> entry : filesToVerify.entrySet()) { - String condition = entry.getValue() ? "" : "not"; - assertEquals("Checking if file " + entry.getKey() + - " does " + condition + " exists in " + execType + - " mode", (boolean) entry.getValue(), - Util.exists(ps.getPigContext(), entry.getKey())); - } - } - } - - // Test that "_SUCCESS" file is created when "mapreduce.fileoutputcommitter.marksuccessfuljobs" - // property is set to true - // The test covers multi store and single store case in local and mapreduce mode - // The test also checks that "_SUCCESS" file is NOT created when the property - // is not set to true in all the modes. - @Test - public void testSuccessFileCreation1() throws Exception { - PigServer ps = null; - - try { - String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"}; - - String multiStoreScript = "a = load '"+ inputFileName + "';" + - "b = filter a by $0 == 'hello';" + - "c = filter a by $0 == 'hi';" + - "d = filter a by $0 == 'bye';" + - "store b into '" + outputFileName + "_1';" + - "store c into '" + outputFileName + "_2';" + - "store d into '" + outputFileName + "_3';"; - - String singleStoreScript = "a = load '"+ inputFileName + "';" + - "store a into '" + outputFileName + "_1';" ; - - for (ExecType execType : modes) { - for(boolean isPropertySet: new boolean[] { true, false}) { - for(boolean isMultiStore: new boolean[] { true, false}) { - String script = (isMultiStore ? multiStoreScript : - singleStoreScript); - Util.resetStateForExecModeSwitch(); - if(execType == cluster.getExecType()) { - ps = new PigServer(cluster.getExecType(), - cluster.getProperties()); - } else { - Properties props = new Properties(); - props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); - ps = new PigServer(ExecType.LOCAL, props); - } - ps.getPigContext().getProperties().setProperty( - MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS, - Boolean.toString(isPropertySet)); - Util.deleteFile(ps.getPigContext(), TESTDIR); - ps.setBatchOn(); - Util.createInputFile(ps.getPigContext(), - inputFileName, inputData); - Util.registerMultiLineQuery(ps, script); - ps.executeBatch(); - for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) { - String sucFile = outputFileName + "_" + i + "/" + - MapReduceLauncher.SUCCEEDED_FILE_NAME; - assertEquals("Checking if _SUCCESS file exists in " + - execType + " mode", isPropertySet, - Util.exists(ps.getPigContext(), sucFile)); - } - } - } - } - } finally { - Util.deleteFile(ps.getPigContext(), TESTDIR); - } - } - - // Test _SUCCESS file is NOT created when job fails and when - // "mapreduce.fileoutputcommitter.marksuccessfuljobs" property is set to true - // The test covers multi store and single store case in local and mapreduce mode - // The test also checks that "_SUCCESS" file is NOT created when the property - // is not set to true in all the modes. - @Test - public void testSuccessFileCreation2() throws Exception { - PigServer ps = null; - try { - String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"}; - System.err.println("XXX: " + TestStore.FailUDF.class.getName()); - String multiStoreScript = "a = load '"+ inputFileName + "';" + - "b = filter a by $0 == 'hello';" + - "b = foreach b generate " + FAIL_UDF_NAME + "($0);" + - "c = filter a by $0 == 'hi';" + - "d = filter a by $0 == 'bye';" + - "store b into '" + outputFileName + "_1';" + - "store c into '" + outputFileName + "_2';" + - "store d into '" + outputFileName + "_3';"; - - String singleStoreScript = "a = load '"+ inputFileName + "';" + - "b = foreach a generate " + FAIL_UDF_NAME + "($0);" + - "store b into '" + outputFileName + "_1';" ; - - for (ExecType execType : modes) { - for(boolean isPropertySet: new boolean[] { true, false}) { - for(boolean isMultiStore: new boolean[] { true, false}) { - String script = (isMultiStore ? multiStoreScript : - singleStoreScript); - Util.resetStateForExecModeSwitch(); - if(execType == cluster.getExecType()) { - // since the job is guaranteed to fail, let's set - // number of retries to 1. - Properties props = cluster.getProperties(); - props.setProperty(MAP_MAX_ATTEMPTS, "1"); - ps = new PigServer(cluster.getExecType(), props); - } else { - Properties props = new Properties(); - props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); - // since the job is guaranteed to fail, let's set - // number of retries to 1. - props.setProperty(MAP_MAX_ATTEMPTS, "1"); - ps = new PigServer(ExecType.LOCAL, props); - } - ps.getPigContext().getProperties().setProperty( - MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS, - Boolean.toString(isPropertySet)); - Util.deleteFile(ps.getPigContext(), TESTDIR); - ps.setBatchOn(); - Util.createInputFile(ps.getPigContext(), - inputFileName, inputData); - Util.registerMultiLineQuery(ps, script); - try { - ps.executeBatch(); - } catch(IOException ioe) { - if(!ioe.getMessage().equals("FailUDFException")) { - // an unexpected exception - throw ioe; - } - } - for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) { - String sucFile = outputFileName + "_" + i + "/" + - MapReduceLauncher.SUCCEEDED_FILE_NAME; - assertEquals("Checking if _SUCCESS file exists in " + - execType + " mode", false, - Util.exists(ps.getPigContext(), sucFile)); - } - } - } - } - } finally { - Util.deleteFile(ps.getPigContext(), TESTDIR); - } - } - - /** - * Test whether "part-m-00000" file is created on empty output when - * {@link PigConfiguration#PIG_OUTPUT_LAZY} is set and if LazyOutputFormat is - * supported by Hadoop. - * The test covers multi store and single store case in local and mapreduce mode - * - * @throws IOException - */ - @Test - public void testEmptyPartFileCreation() throws IOException { - - boolean isLazyOutputPresent = true; - try { - Class<?> clazz = PigContext - .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat"); - clazz.getMethod("setOutputFormatClass", Job.class, Class.class); - } - catch (Exception e) { - isLazyOutputPresent = false; - } - - //skip test if LazyOutputFormat is not supported (<= Hadoop 1.0.0) - Assume.assumeTrue("LazyOutputFormat couldn't be loaded, test is skipped", isLazyOutputPresent); - - PigServer ps = null; - - try { - String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"}; - - String multiStoreScript = "a = load '"+ inputFileName + "';" + - "b = filter a by $0 == 'hey';" + - "c = filter a by $1 == 'globe';" + - "d = limit a 2;" + - "e = foreach d generate *, 'x';" + - "f = filter e by $3 == 'y';" + - "store b into '" + outputFileName + "_1';" + - "store c into '" + outputFileName + "_2';" + - "store f into '" + outputFileName + "_3';"; - - String singleStoreScript = "a = load '"+ inputFileName + "';" + - "b = filter a by $0 == 'hey';" + - "store b into '" + outputFileName + "_1';" ; - - for (ExecType execType : modes) { - for(boolean isMultiStore: new boolean[] { true, false}) { - if (isMultiStore && (execType.equals(ExecType.LOCAL) || - execType.equals(ExecType.MAPREDUCE))) { - // Skip this test for Mapreduce as MapReducePOStoreImpl - // does not handle LazyOutputFormat - continue; - } - - String script = (isMultiStore ? multiStoreScript - : singleStoreScript); - Util.resetStateForExecModeSwitch(); - if(execType == cluster.getExecType()) { - ps = new PigServer(cluster.getExecType(), - cluster.getProperties()); - } else { - Properties props = new Properties(); - props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); - ps = new PigServer(ExecType.LOCAL, props); - } - ps.getPigContext().getProperties().setProperty( - PigConfiguration.PIG_OUTPUT_LAZY, "true"); - Util.deleteFile(ps.getPigContext(), TESTDIR); - ps.setBatchOn(); - Util.createInputFile(ps.getPigContext(), - inputFileName, inputData); - Util.registerMultiLineQuery(ps, script); - ps.executeBatch(); - Configuration conf = ConfigurationUtil.toConfiguration(ps.getPigContext().getProperties()); - for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) { - assertEquals("For an empty output part-m-00000 should not exist in " + execType + " mode", - null, - getFirstOutputFile(conf, new Path(outputFileName + "_" + i), execType, true)); - } - } - } - } finally { - Util.deleteFile(ps.getPigContext(), TESTDIR); - } - } - // A UDF which always throws an Exception so that the job can fail public static class FailUDF extends EvalFunc<String> {
Modified: pig/branches/spark/test/org/apache/pig/test/TestStoreInstances.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestStoreInstances.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestStoreInstances.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestStoreInstances.java Tue Jan 27 02:27:45 2015 @@ -41,7 +41,6 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.FileLocalizer; -import org.apache.pig.parser.ParserException; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -98,8 +97,8 @@ public class TestStoreInstances { * @throws ParseException */ @Test - public void testBackendStoreCommunication() throws IOException, ParserException { - ExecType[] execTypes = { cluster.getExecType(), ExecType.LOCAL}; + public void testBackendStoreCommunication() throws Exception { + ExecType[] execTypes = { cluster.getExecType(), Util.getLocalTestMode()}; PigServer pig = null; for(ExecType execType : execTypes){ Util.resetStateForExecModeSwitch(); Modified: pig/branches/spark/test/org/apache/pig/test/TestStreaming.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestStreaming.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestStreaming.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestStreaming.java Tue Jan 27 02:27:45 2015 @@ -494,8 +494,8 @@ public class TestStreaming { String[] script = new String[] { "#!/usr/bin/perl", - "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[1].\"!: $!\";", - "open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[2].\"!: $!\";", + "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";", + "open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[1].\"!: $!\";", "while (<STDIN>) {", " print OUTFILE \"$_\n\";", " print STDERR \"STDERR: $_\n\";", @@ -554,8 +554,8 @@ public class TestStreaming { String[] script = new String[] { "#!/usr/bin/perl", - "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[1].\"!: $!\";", - "open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[2].\"!: $!\";", + "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";", + "open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[1].\"!: $!\";", "while (<STDIN>) {", " print OUTFILE \"$_\n\";", " print STDERR \"STDERR: $_\n\";", Modified: pig/branches/spark/test/org/apache/pig/test/TestStreamingLocal.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestStreamingLocal.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestStreamingLocal.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestStreamingLocal.java Tue Jan 27 02:27:45 2015 @@ -17,8 +17,6 @@ */ package org.apache.pig.test; -import static org.junit.Assert.assertTrue; - import java.io.File; import java.util.ArrayList; import java.util.Iterator; @@ -51,7 +49,7 @@ public class TestStreamingLocal { @Before public void setUp() throws Exception { - pigServer = new PigServer(ExecType.LOCAL); + pigServer = new PigServer(Util.getLocalTestMode()); } @After @@ -330,7 +328,7 @@ public class TestStreamingLocal { @Test public void testLocalNegativeLoadStoreOptimization() throws Exception { - testNegativeLoadStoreOptimization(ExecType.LOCAL); + testNegativeLoadStoreOptimization(Util.getLocalTestMode()); } private void testNegativeLoadStoreOptimization(ExecType execType) Modified: pig/branches/spark/test/org/apache/pig/test/TestTypedMap.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestTypedMap.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestTypedMap.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestTypedMap.java Tue Jan 27 02:27:45 2015 @@ -20,7 +20,6 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; import java.io.File; -import java.io.IOException; import java.util.Iterator; import java.util.Map; import java.util.Properties; @@ -29,18 +28,14 @@ import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.parser.ParserException; import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; public class TestTypedMap { @@ -75,8 +70,8 @@ public class TestTypedMap { } @Test - public void testSimpleLoad() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL, new Properties()); + public void testSimpleLoad() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode(), new Properties()); String[] input = { "[key#1,key2#2]", "[key#2]", @@ -112,8 +107,8 @@ public class TestTypedMap { } @Test - public void testSimpleMapKeyLookup() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL, new Properties()); + public void testSimpleMapKeyLookup() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode(), new Properties()); String[] input = { "[key#1,key2#2]", "[key#2]", @@ -143,8 +138,8 @@ public class TestTypedMap { } @Test - public void testSimpleMapCast() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL, new Properties()); + public void testSimpleMapCast() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode(), new Properties()); String[] input = { "[key#1,key2#2]", "[key#2]", @@ -181,8 +176,8 @@ public class TestTypedMap { } @Test - public void testComplexLoad() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL, new Properties()); + public void testComplexLoad() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode(), new Properties()); String[] input = { "[key#{(1,2),(1,3)},134#]", "[key2#]", @@ -215,8 +210,8 @@ public class TestTypedMap { } @Test - public void testComplexCast() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL, new Properties()); + public void testComplexCast() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode(), new Properties()); String[] input = { "[key#{(1,2),(1,3)},134#]", "[key2#]", @@ -250,8 +245,8 @@ public class TestTypedMap { } @Test - public void testComplexCast2() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL, new Properties()); + public void testComplexCast2() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode(), new Properties()); String[] input = { "[key#1,key2#2]", }; @@ -280,8 +275,8 @@ public class TestTypedMap { } @Test - public void testUnTypedMap() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL, new Properties()); + public void testUnTypedMap() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode(), new Properties()); String[] input = { "[key#1,key2#2]", }; @@ -309,8 +304,8 @@ public class TestTypedMap { } @Test - public void testOrderBy() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL, new Properties()); + public void testOrderBy() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode(), new Properties()); String[] input = { "[key#1,key1#2]", "[key#2,key3#2]", Modified: pig/branches/spark/test/org/apache/pig/test/TestUDF.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestUDF.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestUDF.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestUDF.java Tue Jan 27 02:27:45 2015 @@ -32,7 +32,6 @@ import java.util.List; import java.util.Map; import org.apache.pig.EvalFunc; -import org.apache.pig.ExecType; import org.apache.pig.FuncSpec; import org.apache.pig.PigServer; import org.apache.pig.data.DataType; @@ -77,7 +76,7 @@ public class TestUDF { @Test public void testUDFReturnMap_LocalMode() throws Exception { - PigServer pig = new PigServer(ExecType.LOCAL); + PigServer pig = new PigServer(Util.getLocalTestMode()); pig.registerScript(TempScriptFile.getAbsolutePath()); Iterator<Tuple> iterator = pig.openIterator("B"); @@ -128,7 +127,7 @@ public class TestUDF { public void testEvalFuncGetArgToFunc() throws Exception { String input = "udf_test_jira_2430.txt"; Util.createLocalInputFile(input, new String[]{"1,hey"}); - PigServer pigServer = new PigServer(ExecType.LOCAL); + PigServer pigServer = new PigServer(Util.getLocalTestMode()); pigServer.registerQuery("A = LOAD '"+input+"' USING PigStorage(',') AS (x:int,y:chararray);"); pigServer.registerQuery("B = FOREACH A GENERATE org.apache.pig.test.TestUDF$UdfWithFuncSpecWithArgs(x);"); pigServer.registerQuery("C = FOREACH A GENERATE org.apache.pig.test.TestUDF$UdfWithFuncSpecWithArgs(y);"); @@ -152,7 +151,7 @@ public class TestUDF { public void testNormalDefine() throws Exception { String input = "udf_test_jira_2430_2.txt"; Util.createLocalInputFile(input, new String[]{"1"}); - PigServer pigServer = new PigServer(ExecType.LOCAL); + PigServer pigServer = new PigServer(Util.getLocalTestMode()); pigServer.registerQuery("A = LOAD '"+input+"' as (x:int);"); pigServer.registerQuery("DEFINE udftest1 org.apache.pig.test.TestUDF$UdfWithFuncSpecWithArgs('1');"); pigServer.registerQuery("DEFINE udftest2 org.apache.pig.test.TestUDF$UdfWithFuncSpecWithArgs('2');"); @@ -186,7 +185,7 @@ public class TestUDF { query += s + "(A),"; } query += udfs[udfs.length - 1] + "(A);"; - PigServer pigServer = new PigServer(ExecType.LOCAL); + PigServer pigServer = new PigServer(Util.getLocalTestMode()); pigServer.registerQuery(query); Iterator<Tuple> it = pigServer.openIterator("B"); while (it.hasNext()) { @@ -202,7 +201,7 @@ public class TestUDF { @Test public void testEnsureProperSchema1() throws Exception { - PigServer pig = new PigServer(ExecType.LOCAL); + PigServer pig = new PigServer(Util.getLocalTestMode()); pig.registerQuery("DEFINE goodSchema1 org.apache.pig.test.TestUDF$MirrorSchema('a:int');"); pig.registerQuery("DEFINE goodSchema2 org.apache.pig.test.TestUDF$MirrorSchema('t:(a:int, b:int, c:int)');"); pig.registerQuery("DEFINE goodSchema3 org.apache.pig.test.TestUDF$MirrorSchema('b:{(a:int, b:int, c:int)}');"); @@ -219,7 +218,7 @@ public class TestUDF { public void testEvalFuncGetVarArgToFunc() throws Exception { String input = "udf_test_jira_3444.txt"; Util.createLocalInputFile(input, new String[]{"dummy"}); - PigServer pigServer = new PigServer(ExecType.LOCAL); + PigServer pigServer = new PigServer(Util.getLocalTestMode()); pigServer.registerQuery("A = LOAD '"+input+"' USING PigStorage(',') AS (x:chararray);"); pigServer.registerQuery("B = FOREACH A GENERATE org.apache.pig.test.TestUDF$UdfWithFuncSpecWithVarArgs(3);"); pigServer.registerQuery("C = FOREACH A GENERATE org.apache.pig.test.TestUDF$UdfWithFuncSpecWithVarArgs(1,2,3,4);"); @@ -232,7 +231,7 @@ public class TestUDF { @Test(expected = FrontendException.class) public void testEnsureProperSchema2() throws Exception { - PigServer pig = new PigServer(ExecType.LOCAL); + PigServer pig = new PigServer(Util.getLocalTestMode()); pig.registerQuery("DEFINE badSchema org.apache.pig.test.TestUDF$MirrorSchema('a:int, b:int, c:int');"); pig.registerQuery("a = load 'thing';"); pig.registerQuery("b = foreach a generate badSchema();"); @@ -324,7 +323,7 @@ public class TestUDF { @Test // See PIG-4184 public void testUDFNullInput() throws Exception { - PigServer pig = new PigServer(ExecType.LOCAL); + PigServer pig = new PigServer(Util.getLocalTestMode()); File inputFile = Util.createInputFile("tmp", "", new String[] {"\t", "2\t3"}); pig.registerQuery("a = load '" Modified: pig/branches/spark/test/org/apache/pig/test/TestUDFContext.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestUDFContext.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestUDFContext.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestUDFContext.java Tue Jan 27 02:27:45 2015 @@ -25,7 +25,6 @@ import java.io.FileWriter; import java.util.Iterator; import java.util.Properties; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.builtin.PigStorage; import org.apache.pig.data.Tuple; @@ -40,7 +39,7 @@ public class TestUDFContext { File a = Util.createLocalInputFile("a.txt", new String[] { "dumb" }); File b = Util.createLocalInputFile("b.txt", new String[] { "dumber" }); FileLocalizer.deleteTempFiles(); - PigServer pig = new PigServer(ExecType.LOCAL, new Properties()); + PigServer pig = new PigServer(Util.getLocalTestMode(), new Properties()); String[] statement = { "A = LOAD '" + Util.encodeEscape(a.getAbsolutePath()) + "' USING org.apache.pig.test.utils.UDFContextTestLoader('joe');", "B = LOAD '" + Util.encodeEscape(b.getAbsolutePath()) + @@ -77,7 +76,7 @@ public class TestUDFContext { */ @Test public void testUDFContextReset() throws Exception { - PigServer pig = new PigServer(ExecType.LOCAL); + PigServer pig = new PigServer(Util.getLocalTestMode()); pig.registerQuery(" l = load 'file' as (a :int, b : int, c : int);"); pig.registerQuery(" f = foreach l generate a, b;"); pig.explain("f", System.out); Modified: pig/branches/spark/test/org/apache/pig/test/TestUDFGroovy.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestUDFGroovy.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestUDFGroovy.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestUDFGroovy.java Tue Jan 27 02:27:45 2015 @@ -40,7 +40,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.builtin.mock.Storage.Data; import org.apache.pig.data.DataBag; @@ -280,7 +279,7 @@ public class TestUDFGroovy { } writer.close(); - PigServer pigServer = new PigServer(ExecType.LOCAL); + PigServer pigServer = new PigServer(Util.getLocalTestMode()); pigServer.registerCode(tmpScriptFile.getCanonicalPath(), "groovy", "groovyudfs"); @@ -327,7 +326,7 @@ public class TestUDFGroovy { } writer.close(); - PigServer pigServer = new PigServer(ExecType.LOCAL); + PigServer pigServer = new PigServer(Util.getLocalTestMode()); pigServer.registerCode(tmpScriptFile.getCanonicalPath(), "groovy", "groovyudfs"); @@ -386,7 +385,7 @@ public class TestUDFGroovy { } writer.close(); - PigServer pigServer = new PigServer(ExecType.LOCAL); + PigServer pigServer = new PigServer(Util.getLocalTestMode()); pigServer.registerCode(tmpScriptFile.getCanonicalPath(), "groovy", "groovyudfs"); @@ -442,7 +441,7 @@ public class TestUDFGroovy { } writer.close(); - PigServer pigServer = new PigServer(ExecType.LOCAL); + PigServer pigServer = new PigServer(Util.getLocalTestMode()); pigServer.registerCode(tmpScriptFile.getCanonicalPath(), "groovy", "groovyudfs"); @@ -486,7 +485,7 @@ public class TestUDFGroovy { } writer.close(); - PigServer pigServer = new PigServer(ExecType.LOCAL); + PigServer pigServer = new PigServer(Util.getLocalTestMode()); pigServer.registerCode(tmpScriptFile.getCanonicalPath(), "groovy", "groovyudfs"); Modified: pig/branches/spark/test/org/apache/pig/test/TestUDFWithoutParameter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestUDFWithoutParameter.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestUDFWithoutParameter.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestUDFWithoutParameter.java Tue Jan 27 02:27:45 2015 @@ -49,7 +49,7 @@ public class TestUDFWithoutParameter { @Test public void testUDFWithoutParameter() throws Exception { - PigServer pig = new PigServer(ExecType.LOCAL); + PigServer pig = new PigServer(Util.getLocalTestMode()); pig.registerScript(TempScriptFile.getAbsolutePath()); Iterator<Tuple> iterator=pig.openIterator("B"); Modified: pig/branches/spark/test/org/apache/pig/test/TestUTF8.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestUTF8.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestUTF8.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestUTF8.java Tue Jan 27 02:27:45 2015 @@ -29,7 +29,6 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.util.Iterator; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.builtin.PigStorage; import org.apache.pig.data.DataType; @@ -43,7 +42,7 @@ public class TestUTF8 { @Before public void setUp() throws Exception { - pigServer = new PigServer(ExecType.LOCAL); + pigServer = new PigServer(Util.getLocalTestMode()); } @Test Modified: pig/branches/spark/test/org/apache/pig/test/TestUnion.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestUnion.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestUnion.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestUnion.java Tue Jan 27 02:27:45 2015 @@ -27,7 +27,6 @@ import java.io.IOException; import java.util.Iterator; import java.util.Properties; -import org.apache.pig.ExecType; import org.apache.pig.FuncSpec; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; @@ -88,7 +87,7 @@ public class TestUnion { @Before public void setUp() throws Exception { - pigServer = new PigServer(ExecType.LOCAL, new Properties()); + pigServer = new PigServer(Util.getLocalTestMode(), new Properties()); pc = pigServer.getPigContext(); pc.connect(); GenPhyOp.setPc(pc); @@ -259,7 +258,7 @@ public class TestUnion { File f1 = Util.createInputFile("tmp", "i1.txt", new String[] {"aaa\t111"}); File f2 = Util.createInputFile("tmp", "i2.txt", new String[] {"bbb\t222"}); - PigServer ps = new PigServer(ExecType.LOCAL, new Properties()); + PigServer ps = new PigServer(Util.getLocalTestMode(), new Properties()); ps.registerQuery("A = load '" + Util.encodeEscape(f1.getAbsolutePath()) + "' as (a,b);"); ps.registerQuery("B = load '" + Util.encodeEscape(f2.getAbsolutePath()) + "' as (a,b);"); ps.registerQuery("C = union A,B;"); @@ -288,7 +287,7 @@ public class TestUnion { File f1 = Util.createInputFile("tmp", "i1.txt", new String[] {"1","2","3"}); File f2 = Util.createInputFile("tmp", "i2.txt", new String[] {"a","b","c"}); - PigServer ps = new PigServer(ExecType.LOCAL, new Properties()); + PigServer ps = new PigServer(Util.getLocalTestMode(), new Properties()); //PigStorage and TextLoader have different LoadCasters ps.registerQuery("A = load '" + Util.encodeEscape(f1.getAbsolutePath()) + "' as (a:bytearray);"); ps.registerQuery("B = load '" + Util.encodeEscape(f2.getAbsolutePath()) + "' using TextLoader() as (b:bytearray);"); @@ -320,7 +319,7 @@ public class TestUnion { File f2 = Util.createInputFile("tmp", "i2.txt", new String[] {"a","b","c"}); File f3 = Util.createInputFile("tmp", "i3.txt", new String[] {"1","2","3"}); - PigServer ps = new PigServer(ExecType.LOCAL, new Properties()); + PigServer ps = new PigServer(Util.getLocalTestMode(), new Properties()); ps.registerQuery("A = load '" + Util.encodeEscape(f1.getAbsolutePath()) + "' as (a:bytearray);"); // Using PigStorage() ps.registerQuery("B = load '" + Util.encodeEscape(f2.getAbsolutePath()) + "' using TextLoader() as (i:bytearray);"); ps.registerQuery("C = load '" + Util.encodeEscape(f3.getAbsolutePath()) + "' using TextLoader() as (i:bytearray);"); @@ -344,7 +343,7 @@ public class TestUnion { // Fields coming from different loaders but // having the same LoadCaster. File f1 = Util.createInputFile("tmp", "i1.txt", new String[] {"1\ta","2\tb","3\tc"}); - PigServer ps = new PigServer(ExecType.LOCAL, new Properties()); + PigServer ps = new PigServer(Util.getLocalTestMode(), new Properties()); // PigStorage and PigStorageWithStatistics have the same // LoadCaster(== Utf8StorageConverter) ps.registerQuery("A = load '" + Util.encodeEscape(f1.getAbsolutePath()) + "' as (a:bytearray, b:bytearray);"); Modified: pig/branches/spark/test/org/apache/pig/test/TestUnionOnSchema.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestUnionOnSchema.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestUnionOnSchema.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestUnionOnSchema.java Tue Jan 27 02:27:45 2015 @@ -28,7 +28,6 @@ import java.util.List; import junit.framework.Assert; import org.apache.pig.EvalFunc; -import org.apache.pig.ExecType; import org.apache.pig.PigException; import org.apache.pig.PigServer; import org.apache.pig.builtin.mock.Storage; @@ -101,8 +100,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaSameSchema() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchemaSameSchema() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query = " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);" + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);" @@ -133,8 +132,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaFilter() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchemaFilter() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query = " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, x : int);" + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);" @@ -166,8 +165,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaSuccOps() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchemaSuccOps() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query = " l1 = load '" + INP_FILE_2NUMS + "' as (i : int);" + "l2 = load '" + INP_FILE_2NUMS + "' as (x : int, y : int);" @@ -199,8 +198,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaCastOnByteArray() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchemaCastOnByteArray() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query = " l1 = load '" + INP_FILE_2NUMS + "' as (i, j);" + " f1 = foreach l1 generate (int)i, (int)j;" @@ -228,8 +227,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaScopedColumnName() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchemaScopedColumnName() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query_prefix = " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); " + "g = group l1 by i; " @@ -271,8 +270,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaScopedColumnNameBothInp1() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchemaScopedColumnNameBothInp1() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query = " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); " + "g1 = group l1 by i; " @@ -307,8 +306,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaScopedColumnNameBothInp2() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchemaScopedColumnNameBothInp2() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query = " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); " + " l2 = load '" + INP_FILE_2NUMS + "' as (i : int, x : chararray); " @@ -345,7 +344,7 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaScopedColumnNameNeg() throws IOException, ParserException { + public void testUnionOnSchemaScopedColumnNameNeg() throws Exception { String expectedErr = "Found more than one match: l1::i, l2::i"; String query_prefix = @@ -371,8 +370,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaDiffNumType() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchemaDiffNumType() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query = " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : double);" + "l2 = load '" + INP_FILE_2NUMS + "' as (i : long, j : float);" @@ -401,8 +400,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaNoCommonCols() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchemaNoCommonCols() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query = " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);" + "l2 = load '" + INP_FILE_2NUMS + "' as (x : long, y : float);" @@ -429,8 +428,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaAdditionalColumn() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchemaAdditionalColumn() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query = " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);" + "l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as " @@ -458,8 +457,8 @@ public class TestUnionOnSchema { } @Test - public void testUnionOnSchemaAdditionalColumnsWithImplicitSplit() throws IOException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchemaAdditionalColumnsWithImplicitSplit() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); Data data = Storage.resetData(pig); // Use batch to force multiple outputs from relation l3. This causes @@ -503,8 +502,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchema3Inputs() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchema3Inputs() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query = " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); " + "l2 = load '" + INP_FILE_2NUMS + "' as (i : double, x : int); " @@ -538,8 +537,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaByteArrayConversions() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchemaByteArrayConversions() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query = " l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as " + " (i : bytearray, x : bytearray, j : bytearray " @@ -577,7 +576,7 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaNoSchema() throws IOException, ParserException { + public void testUnionOnSchemaNoSchema() throws Exception { String expectedErr = "UNION ONSCHEMA cannot be used with " + "relations that have null schema"; String query = @@ -602,7 +601,7 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaNullAliasInFieldSchema() throws IOException, ParserException { + public void testUnionOnSchemaNullAliasInFieldSchema() throws Exception { String expectedErr = "Schema of relation f has a null fieldschema for " + "column(s). Schema ::long,y:float"; String query = @@ -615,8 +614,8 @@ public class TestUnionOnSchema { } - private void checkSchemaEx(String query, String expectedErr) throws IOException { - PigServer pig = new PigServer(ExecType.LOCAL); + private void checkSchemaEx(String query, String expectedErr) throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); boolean foundEx = false; try{ @@ -645,7 +644,7 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaIncompatibleTypes() throws IOException, ParserException { + public void testUnionOnSchemaIncompatibleTypes() throws Exception { String query = " l1 = load '" + INP_FILE_2NUMS + "' as (x : long, y : chararray);" + "l2 = load '" + INP_FILE_2NUMS + "' as (x : long, y : float);" @@ -693,15 +692,15 @@ public class TestUnionOnSchema { } - private void checkSchemaEquals(String query, Schema expectedSch) throws IOException { - PigServer pig = new PigServer(ExecType.LOCAL); + private void checkSchemaEquals(String query, Schema expectedSch) throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); Util.registerMultiLineQuery(pig, query); Schema sch = pig.dumpSchema("u"); assertEquals("Checking expected schema", expectedSch, sch); } - private void checkSchemaEquals(String query, String schemaStr) throws IOException, ParserException { + private void checkSchemaEquals(String query, String schemaStr) throws Exception { Schema expectedSch = Utils.getSchemaFromString(schemaStr); checkSchemaEquals(query, expectedSch); } @@ -713,8 +712,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaInputUdfs() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchemaInputUdfs() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query = " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : chararray);" + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : chararray);" @@ -750,8 +749,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaUdfTypeEvolution() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchemaUdfTypeEvolution() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query_prefix = " l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as " + " (i : int, c : chararray, j : int " @@ -802,8 +801,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaUdfTypeEvolution2() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchemaUdfTypeEvolution2() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query_prefix = " l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as " + " (i : int, c : chararray, j : int " @@ -874,8 +873,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testUnionOnSchemaScopeMulti() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testUnionOnSchemaScopeMulti() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query_prefix = " a = load '" + INP_FILE_2NUMS+ "' as (i:int, j:int); " + "b = group a by i; " @@ -921,8 +920,8 @@ public class TestUnionOnSchema { * @throws ParserException */ @Test - public void testTwoUnions() throws IOException, ParserException { - PigServer pig = new PigServer(ExecType.LOCAL); + public void testTwoUnions() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); String query = " l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);" + "l2 = load '" + INP_FILE_2NUMS + "' as (i : long, j : int);" Modified: pig/branches/spark/test/org/apache/pig/test/Util.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/Util.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/Util.java (original) +++ pig/branches/spark/test/org/apache/pig/test/Util.java Tue Jan 27 02:27:45 2015 @@ -45,8 +45,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Properties; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -650,23 +650,7 @@ public class Util { static public void copyFromLocalToLocal(String fromLocalFileName, String toLocalFileName) throws IOException { - if(Util.WINDOWS){ - fromLocalFileName = fromLocalFileName.replace('\\','/'); - toLocalFileName = toLocalFileName.replace('\\','/'); - } - PigServer ps = new PigServer(ExecType.LOCAL, new Properties()); - String script = getMkDirCommandForHadoop2_0(toLocalFileName) + "fs -cp " + fromLocalFileName + " " + toLocalFileName; - - new File(toLocalFileName).deleteOnExit(); - - GruntParser parser = new GruntParser(new StringReader(script), ps); - parser.setInteractive(false); - try { - parser.parseStopOnError(); - } catch (org.apache.pig.tools.pigscript.parser.ParseException e) { - throw new IOException(e); - } - + FileUtils.copyFile(new File(fromLocalFileName), new File(toLocalFileName)); } static public void copyFromClusterToLocal(MiniGenericCluster cluster, @@ -1386,4 +1370,24 @@ public class Util { appender.close(); logger.removeAppender(appenderName); } + + public static Path getFirstPartFile(Path path) throws Exception { + FileStatus[] parts = FileSystem.get(path.toUri(), new Configuration()).listStatus(path, + new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith("part-"); + } + }); + return parts[0].getPath(); + } + + public static File getFirstPartFile(File dir) throws Exception { + File[] parts = dir.listFiles(new FilenameFilter() { + public boolean accept(File dir, String name) { + return name.startsWith("part-"); + }; + }); + return parts[0]; + } } Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld Tue Jan 27 02:27:45 2015 @@ -4,7 +4,7 @@ #-------------------------------------------------- # TEZ DAG plan: pig-0_scope-0 #-------------------------------------------------- -Tez vertex scope-12 -> Tez vertex scope-33,Tez vertex scope-22, +Tez vertex scope-12 -> Tez vertex scope-22,Tez vertex scope-33, Tez vertex scope-22 -> Tez vertex scope-33, Tez vertex scope-33 -> Tez vertex scope-35, Tez vertex scope-35 -> Tez vertex scope-46, Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld Tue Jan 27 02:27:45 2015 @@ -4,10 +4,10 @@ #-------------------------------------------------- # TEZ DAG plan: pig-0_scope-0 #-------------------------------------------------- -Tez vertex scope-33 -> Tez vertex scope-37,Tez vertex scope-60,Tez vertex scope-49, +Tez vertex scope-33 -> Tez vertex scope-37,Tez vertex scope-49,Tez vertex scope-60, Tez vertex scope-49 -> Tez vertex scope-60, Tez vertex scope-60 -> Tez vertex scope-62, -Tez vertex scope-37 -> Tez vertex scope-67,Tez vertex scope-62, +Tez vertex scope-37 -> Tez vertex scope-62,Tez vertex scope-67, Tez vertex scope-62 -> Tez vertex scope-67, Tez vertex scope-67 @@ -102,7 +102,7 @@ g: Local Rearrange[tuple]{chararray}(fal | |---g: Package(CombinerPackager)[tuple]{chararray} - scope-81 # Plan on vertex -POValueOutputTez - scope-70 -> [scope-67, scope-62] +POValueOutputTez - scope-70 -> [scope-62, scope-67] | |---h: New For Each(false)[bag] - scope-23 | | Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld Tue Jan 27 02:27:45 2015 @@ -6,12 +6,12 @@ #-------------------------------------------------- Tez vertex scope-57 -> Tez vertex scope-59,Tez vertex scope-61,Tez vertex scope-63, Tez vertex scope-59 -Tez vertex scope-63 Tez vertex scope-61 +Tez vertex scope-63 Tez vertex scope-57 # Plan on vertex -POValueOutputTez - scope-58 -> [scope-59, scope-63, scope-61] +POValueOutputTez - scope-58 -> [scope-59, scope-61, scope-63] | |---a: New For Each(false,false)[bag] - scope-40 | | @@ -37,19 +37,6 @@ b: Store(file:///tmp/output/b:org.apache | |---Constant(5) - scope-44 | |---POValueInputTez - scope-60 <- scope-57 -Tez vertex scope-63 -# Plan on vertex -d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-56 -| -|---d: Filter[bag] - scope-52 - | | - | Greater Than[boolean] - scope-55 - | | - | |---Project[int][0] - scope-53 - | | - | |---Constant(10) - scope-54 - | - |---POValueInputTez - scope-64 <- scope-57 Tez vertex scope-61 # Plan on vertex c: Store(file:///tmp/output/c:org.apache.pig.builtin.PigStorage) - scope-51 @@ -63,3 +50,16 @@ c: Store(file:///tmp/output/c:org.apache | |---Constant(10) - scope-49 | |---POValueInputTez - scope-62 <- scope-57 +Tez vertex scope-63 +# Plan on vertex +d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-56 +| +|---d: Filter[bag] - scope-52 + | | + | Greater Than[boolean] - scope-55 + | | + | |---Project[int][0] - scope-53 + | | + | |---Constant(10) - scope-54 + | + |---POValueInputTez - scope-64 <- scope-57 Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld Tue Jan 27 02:27:45 2015 @@ -5,34 +5,34 @@ # TEZ DAG plan: pig-0_scope-1 #-------------------------------------------------- Tez vertex scope-317 -> Tez vertex scope-319,Tez vertex scope-330,Tez vertex scope-341, -Tez vertex scope-341 -> Tez vertex scope-344,Tez vertex scope-384, -Tez vertex scope-384 -> Tez vertex group scope-412, Tez vertex scope-319 -> Tez vertex scope-322,Tez vertex scope-335,Tez vertex scope-346, -Tez vertex scope-346 -> Tez vertex scope-349,Tez vertex scope-375, -Tez vertex scope-375 -> Tez vertex scope-378, -Tez vertex scope-378 -> Tez vertex scope-382,Tez vertex scope-386, -Tez vertex scope-386 -> Tez vertex group scope-412, -Tez vertex group scope-412 -Tez vertex scope-382 -Tez vertex scope-330 -> Tez vertex scope-333,Tez vertex scope-338, -Tez vertex scope-338 -> Tez vertex scope-340, -Tez vertex scope-340 Tez vertex scope-322 -> Tez vertex scope-324, Tez vertex scope-324 -> Tez vertex scope-326,Tez vertex scope-328, Tez vertex scope-326 -Tez vertex scope-344 Tez vertex scope-328 -Tez vertex scope-349 -> Tez vertex scope-369,Tez vertex scope-359, -Tez vertex scope-359 -> Tez vertex scope-369, -Tez vertex scope-369 -> Tez vertex scope-371, -Tez vertex scope-371 +Tez vertex scope-330 -> Tez vertex scope-333,Tez vertex scope-338, Tez vertex scope-333 -> Tez vertex scope-337, Tez vertex scope-335 -> Tez vertex scope-337, Tez vertex scope-337 +Tez vertex scope-338 -> Tez vertex scope-340, +Tez vertex scope-340 +Tez vertex scope-341 -> Tez vertex scope-344,Tez vertex scope-384, +Tez vertex scope-344 +Tez vertex scope-346 -> Tez vertex scope-349,Tez vertex scope-375, +Tez vertex scope-349 -> Tez vertex scope-359,Tez vertex scope-369, +Tez vertex scope-359 -> Tez vertex scope-369, +Tez vertex scope-369 -> Tez vertex scope-371, +Tez vertex scope-371 +Tez vertex scope-375 -> Tez vertex scope-378, +Tez vertex scope-378 -> Tez vertex scope-382,Tez vertex scope-386, +Tez vertex scope-382 +Tez vertex scope-384 -> Tez vertex group scope-412, +Tez vertex scope-386 -> Tez vertex group scope-412, +Tez vertex group scope-412 Tez vertex scope-317 # Plan on vertex -POValueOutputTez - scope-318 -> [scope-341, scope-319, scope-330] +POValueOutputTez - scope-318 -> [scope-319, scope-330, scope-341] | |---a: New For Each(false,false)[bag] - scope-218 | | @@ -45,35 +45,9 @@ POValueOutputTez - scope-318 -> [scope- | |---Project[bytearray][1] - scope-215 | |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-211 -Tez vertex scope-341 -# Plan on vertex -POValueOutputTez - scope-343 -> [scope-384, scope-344] -| -|---d1: Filter[bag] - scope-284 - | | - | Equal To[boolean] - scope-287 - | | - | |---Project[int][0] - scope-285 - | | - | |---Constant(5) - scope-286 - | - |---d: Filter[bag] - scope-280 - | | - | Greater Than[boolean] - scope-283 - | | - | |---Project[int][0] - scope-281 - | | - | |---Constant(10) - scope-282 - | - |---POValueInputTez - scope-342 <- scope-317 -Tez vertex scope-384 -# Plan on vertex -f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-413 -| -|---POValueInputTez - scope-385 <- scope-341 Tez vertex scope-319 # Plan on vertex -POValueOutputTez - scope-321 -> [scope-335, scope-322, scope-346] +POValueOutputTez - scope-321 -> [scope-322, scope-335, scope-346] | |---b: Filter[bag] - scope-220 | | @@ -84,48 +58,41 @@ POValueOutputTez - scope-321 -> [scope- | |---Constant(5) - scope-222 | |---POValueInputTez - scope-320 <- scope-317 -Tez vertex scope-346 -# Plan on vertex -POValueOutputTez - scope-348 -> [scope-375, scope-349] -| -|---POValueInputTez - scope-347 <- scope-319 -Tez vertex scope-375 +Tez vertex scope-322 # Plan on vertex -POValueOutputTez - scope-377 -> [scope-378] +b1: Local Rearrange[tuple]{int}(false) - scope-229 -> scope-324 +| | +| Project[int][0] - scope-230 | -|---f1: Limit - scope-306 - | - |---f: Filter[bag] - scope-302 - | | - | Greater Than or Equal[boolean] - scope-305 - | | - | |---Project[int][0] - scope-303 - | | - | |---Constant(3) - scope-304 - | - |---POValueInputTez - scope-376 <- scope-346 -Tez vertex scope-378 +|---POValueInputTez - scope-323 <- scope-319 +Tez vertex scope-324 # Plan on vertex -POValueOutputTez - scope-381 -> [scope-386, scope-382] +POValueOutputTez - scope-325 -> [scope-326, scope-328] | -|---f1: Limit - scope-380 - | - |---POValueInputTez - scope-379 <- scope-375 -Tez vertex scope-386 +|---b1: Package(Packager)[tuple]{int} - scope-228 +Tez vertex scope-326 # Plan on vertex -f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-414 +b1: Store(file:///tmp/output/b1:org.apache.pig.builtin.PigStorage) - scope-234 | -|---POValueInputTez - scope-387 <- scope-378 -Tez vertex group scope-412 <- [scope-384, scope-386] -> null -# No plan on vertex group -Tez vertex scope-382 +|---POValueInputTez - scope-327 <- scope-324 +Tez vertex scope-328 # Plan on vertex -f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-310 +b2: Store(file:///tmp/output/b2:org.apache.pig.builtin.PigStorage) - scope-244 | -|---POValueInputTez - scope-383 <- scope-378 +|---b2: New For Each(false,false)[bag] - scope-243 + | | + | Project[int][0] - scope-237 + | | + | POUserFunc(org.apache.pig.builtin.LongSum)[long] - scope-241 + | | + | |---Project[bag][0] - scope-240 + | | + | |---Project[bag][1] - scope-239 + | + |---POValueInputTez - scope-329 <- scope-324 Tez vertex scope-330 # Plan on vertex -POValueOutputTez - scope-332 -> [scope-338, scope-333] +POValueOutputTez - scope-332 -> [scope-333, scope-338] | |---c: Filter[bag] - scope-245 | | @@ -136,6 +103,31 @@ POValueOutputTez - scope-332 -> [scope- | |---Constant(10) - scope-247 | |---POValueInputTez - scope-331 <- scope-317 +Tez vertex scope-333 +# Plan on vertex +c1: Local Rearrange[tuple]{int}(false) - scope-258 -> scope-337 +| | +| Project[int][0] - scope-259 +| +|---POValueInputTez - scope-334 <- scope-330 +Tez vertex scope-335 +# Plan on vertex +c1: Local Rearrange[tuple]{int}(false) - scope-260 -> scope-337 +| | +| Project[int][0] - scope-261 +| +|---POValueInputTez - scope-336 <- scope-319 +Tez vertex scope-337 +# Plan on vertex +c1: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-265 +| +|---c1: New For Each(true,true)[tuple] - scope-264 + | | + | Project[bag][1] - scope-262 + | | + | Project[bag][2] - scope-263 + | + |---c1: Package(Packager)[tuple]{int} - scope-257 Tez vertex scope-338 # Plan on vertex c2: Local Rearrange[tuple]{int}(false) - scope-404 -> scope-340 @@ -182,43 +174,37 @@ c3: Store(file:///tmp/output/c1:org.apac | |---Project[bag][1] - scope-401 | |---c2: Package(CombinerPackager)[tuple]{int} - scope-269 -Tez vertex scope-322 -# Plan on vertex -b1: Local Rearrange[tuple]{int}(false) - scope-229 -> scope-324 -| | -| Project[int][0] - scope-230 -| -|---POValueInputTez - scope-323 <- scope-319 -Tez vertex scope-324 -# Plan on vertex -POValueOutputTez - scope-325 -> [scope-328, scope-326] -| -|---b1: Package(Packager)[tuple]{int} - scope-228 -Tez vertex scope-326 +Tez vertex scope-341 # Plan on vertex -b1: Store(file:///tmp/output/b1:org.apache.pig.builtin.PigStorage) - scope-234 +POValueOutputTez - scope-343 -> [scope-344, scope-384] | -|---POValueInputTez - scope-327 <- scope-324 +|---d1: Filter[bag] - scope-284 + | | + | Equal To[boolean] - scope-287 + | | + | |---Project[int][0] - scope-285 + | | + | |---Constant(5) - scope-286 + | + |---d: Filter[bag] - scope-280 + | | + | Greater Than[boolean] - scope-283 + | | + | |---Project[int][0] - scope-281 + | | + | |---Constant(10) - scope-282 + | + |---POValueInputTez - scope-342 <- scope-317 Tez vertex scope-344 # Plan on vertex d1: Store(file:///tmp/output/d1:org.apache.pig.builtin.PigStorage) - scope-291 | |---POValueInputTez - scope-345 <- scope-341 -Tez vertex scope-328 +Tez vertex scope-346 # Plan on vertex -b2: Store(file:///tmp/output/b2:org.apache.pig.builtin.PigStorage) - scope-244 +POValueOutputTez - scope-348 -> [scope-349, scope-375] | -|---b2: New For Each(false,false)[bag] - scope-243 - | | - | Project[int][0] - scope-237 - | | - | POUserFunc(org.apache.pig.builtin.LongSum)[long] - scope-241 - | | - | |---Project[bag][0] - scope-240 - | | - | |---Project[bag][1] - scope-239 - | - |---POValueInputTez - scope-329 <- scope-324 +|---POValueInputTez - scope-347 <- scope-319 Tez vertex scope-349 # Plan on vertex Local Rearrange[tuple]{tuple}(false) - scope-353 -> scope-359 @@ -279,28 +265,42 @@ e1: Store(file:///tmp/output/e1:org.apac | Project[bag][1] - scope-373 | |---Package(LitePackager)[tuple]{int} - scope-372 -Tez vertex scope-333 +Tez vertex scope-375 # Plan on vertex -c1: Local Rearrange[tuple]{int}(false) - scope-258 -> scope-337 -| | -| Project[int][0] - scope-259 +POValueOutputTez - scope-377 -> [scope-378] | -|---POValueInputTez - scope-334 <- scope-330 -Tez vertex scope-335 +|---f1: Limit - scope-306 + | + |---f: Filter[bag] - scope-302 + | | + | Greater Than or Equal[boolean] - scope-305 + | | + | |---Project[int][0] - scope-303 + | | + | |---Constant(3) - scope-304 + | + |---POValueInputTez - scope-376 <- scope-346 +Tez vertex scope-378 # Plan on vertex -c1: Local Rearrange[tuple]{int}(false) - scope-260 -> scope-337 -| | -| Project[int][0] - scope-261 +POValueOutputTez - scope-381 -> [scope-382, scope-386] | -|---POValueInputTez - scope-336 <- scope-319 -Tez vertex scope-337 +|---f1: Limit - scope-380 + | + |---POValueInputTez - scope-379 <- scope-375 +Tez vertex scope-382 # Plan on vertex -c1: Store(file:///tmp/output/c1:org.apache.pig.builtin.PigStorage) - scope-265 +f1: Store(file:///tmp/output/f1:org.apache.pig.builtin.PigStorage) - scope-310 | -|---c1: New For Each(true,true)[tuple] - scope-264 - | | - | Project[bag][1] - scope-262 - | | - | Project[bag][2] - scope-263 - | - |---c1: Package(Packager)[tuple]{int} - scope-257 +|---POValueInputTez - scope-383 <- scope-378 +Tez vertex scope-384 +# Plan on vertex +f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-413 +| +|---POValueInputTez - scope-385 <- scope-341 +Tez vertex scope-386 +# Plan on vertex +f2: Store(file:///tmp/output/f2:org.apache.pig.builtin.PigStorage) - scope-414 +| +|---POValueInputTez - scope-387 <- scope-378 +Tez vertex group scope-412 <- [scope-384, scope-386] -> null +# No plan on vertex group
