Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java Fri Feb 24 08:19:42 2017 @@ -117,15 +117,15 @@ public class TestTezGraceParallelism { Util.createLogAppender("testDecreaseParallelism", writer, new Class[]{PigGraceShuffleVertexManager.class, ShuffleVertexManager.class}); try { // DAG: 47 \ - // -> 49(join) -> 52(distinct) -> 61(group) + // -> 49(join) -> 52(distinct) -> 56(group) // 48 / // Parallelism at compile time: // DAG: 47(1) \ - // -> 49(2) -> 52(20) -> 61(200) + // -> 49(2) -> 52(20) -> 56(200) // 48(1) / // However, when 49 finishes, the actual output of 49 only justify parallelism 1. - // We adjust the parallelism for 61 to 100 based on this. - // At runtime, ShuffleVertexManager still kick in and further reduce parallelism from 100 to 1. + // We adjust the parallelism for 56 to 7 based on this. + // At runtime, ShuffleVertexManager still kick in and further reduce parallelism from 7 to 1. 
// pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); @@ -140,10 +140,10 @@ public class TestTezGraceParallelism { "('F',1349L)", "('M',1373L)"}); Util.checkQueryOutputsAfterSort(iter, expectedResults); assertTrue(writer.toString().contains("Initialize parallelism for scope-52 to 18")); - assertTrue(writer.toString().contains("Initialize parallelism for scope-61 to 7")); + assertTrue(writer.toString().contains("Initialize parallelism for scope-56 to 7")); assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-49 to 1 from 2")); assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-52 to 1 from 18")); - assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-61 to 1 from 7")); + assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-56 to 1 from 7")); } finally { Util.removeLogAppender("testDecreaseParallelism", PigGraceShuffleVertexManager.class, ShuffleVertexManager.class); } @@ -217,8 +217,8 @@ public class TestTezGraceParallelism { count++; } assertEquals(count, 20); - assertTrue(writer.toString().contains("All predecessors for scope-84 are finished, time to set parallelism for scope-85")); - assertTrue(writer.toString().contains("Initialize parallelism for scope-85 to 10")); + assertTrue(writer.toString().contains("All predecessors for scope-79 are finished, time to set parallelism for scope-80")); + assertTrue(writer.toString().contains("Initialize parallelism for scope-80 to 10")); } finally { Util.removeLogAppender("testJoinWithDifferentDepth", PigGraceShuffleVertexManager.class); } @@ -262,9 +262,9 @@ public class TestTezGraceParallelism { StringWriter writer = new StringWriter(); Util.createLogAppender("testJoinWithUnion", writer, PigGraceShuffleVertexManager.class); try { - // DAG: 29 
-> 32 -> 41 \ - // -> 70 (vertex group) -> 61 - // 42 -> 45 -> 54 / + // DAG: 29 -> 32 -> 36 \ + // -> 55 (vertex group) -> 51 + // 37 -> 40 -> 44 / pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); pigServer.registerQuery("B = distinct A;"); pigServer.registerQuery("C = group B by name;"); @@ -280,8 +280,8 @@ public class TestTezGraceParallelism { count++; } assertEquals(count, 20); - assertTrue(writer.toString().contains("time to set parallelism for scope-41")); - assertTrue(writer.toString().contains("time to set parallelism for scope-54")); + assertTrue(writer.toString().contains("time to set parallelism for scope-36")); + assertTrue(writer.toString().contains("time to set parallelism for scope-44")); } finally { Util.removeLogAppender("testJoinWithUnion", PigGraceShuffleVertexManager.class); }
@@ -322,4 +322,40 @@ public class TestTezGraceParallelism {
             super.setStoreLocation(location, job);
         }
     }
+
+    /**
+     * See PIG-4786. Runs a CROSS and checks that the output captured by the log
+     * appender registered for PigGraceShuffleVertexManager never mentions scope-90.
+     */
+    @Test
+    public void testCross() throws IOException {
+        // scope-90 is the cross vertex. It should not use PigGraceShuffleVertexManager
+        NodeIdGenerator.reset();
+        PigServer.resetScope();
+        StringWriter writer = new StringWriter();
+        Util.createLogAppender("testCross", writer, PigGraceShuffleVertexManager.class);
+        try {
+            pigServer.getPigContext().getProperties().setProperty("mapreduce.input.fileinputformat.split.maxsize", "3000");
+            pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true");
+            pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+            pigServer.registerQuery("B = order A by name;");
+            pigServer.registerQuery("C = distinct B;");
+            pigServer.registerQuery("D = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+            pigServer.registerQuery("E = group D by name;");
+            pigServer.registerQuery("F = foreach E generate group as name, AVG(D.age) as avg_age;");
+            pigServer.registerQuery("G = cross C, F;");
+            Iterator<Tuple> iter = pigServer.openIterator("G");
+            int count = 0;
+            while (iter.hasNext()) {
+                iter.next();
+                count++;
+            }
+            // Expected value goes first so a failure message reads the right way round.
+            assertEquals(400, count);
+            assertFalse(writer.toString().contains("scope-90"));
+        } finally {
+            // Detach the appender even when an assertion fails, as the other tests in this class do.
+            Util.removeLogAppender("testCross", PigGraceShuffleVertexManager.class);
+        }
+    }
 }
Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java Fri Feb 24 08:19:42 2017 @@ -21,7 +21,9 @@ import static org.junit.Assert.assertEqu import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.net.URI; import java.util.ArrayList; import java.util.Collections; @@ -29,17 +31,20 @@ import java.util.HashMap; import java.util.List; import java.util.Properties; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler; import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher; @@ -48,6 +53,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; 
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager; import org.apache.pig.builtin.PigStorage; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.plan.OperatorKey; @@ -57,8 +63,11 @@ import org.apache.pig.test.junit.Ordered import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder; import org.apache.pig.tools.pigstats.ScriptState; import org.apache.pig.tools.pigstats.tez.TezScriptState; +import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -77,7 +86,8 @@ import org.junit.runner.RunWith; "testTezParallelismEstimatorFilterFlatten", "testTezParallelismEstimatorHashJoin", "testTezParallelismEstimatorSplitBranch", - "testTezParallelismDefaultParallelism" + "testTezParallelismDefaultParallelism", + "testShuffleVertexManagerConfig" }) public class TestTezJobControlCompiler { private static PigContext pc; @@ -89,6 +99,7 @@ public class TestTezJobControlCompiler { public static void setUpBeforeClass() throws Exception { input1 = Util.createTempFileDelOnExit("input1", "txt").toURI(); input2 = Util.createTempFileDelOnExit("input2", "txt").toURI(); + FileUtils.deleteDirectory(new File("/tmp/pigoutput")); } @AfterClass @@ -107,7 +118,7 @@ public class TestTezJobControlCompiler { "a = load '" + input1 +"' as (x:int, y:int);" + "b = filter a by x > 0;" + "c = foreach b generate y;" + - "store c into 'file:///tmp/output';"; + "store c into 'file:///tmp/pigoutput';"; Pair<TezOperPlan, DAG> compiledPlan = compile(query); @@ -127,7 +138,7 @@ public class TestTezJobControlCompiler { "a = load 
'" + input1 +"' as (x:int, y:int);" + "b = group a by x;" + "c = foreach b generate group, a;" + - "store c into 'file:///tmp/output';"; + "store c into 'file:///tmp/pigoutput';"; Pair<TezOperPlan, DAG> compiledPlan = compile(query); @@ -159,7 +170,7 @@ public class TestTezJobControlCompiler { "b = load '" + input2 +"' as (x:int, z:int);" + "c = join a by x, b by x;" + "d = foreach c generate a::x as x, y, z;" + - "store d into 'file:///tmp/output';"; + "store d into 'file:///tmp/pigoutput';"; Pair<TezOperPlan, DAG> compiledPlan = compile(query); @@ -289,6 +300,72 @@ public class TestTezJobControlCompiler { TezOperator leafOper = compiledPlan.first.getLeaves().get(0); Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString()); assertEquals(leafVertex.getParallelism(), 5); + pc.defaultParallel = -1; + } + + @Test + public void testShuffleVertexManagerConfig() throws Exception{ + pc.getProperties().setProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, "0.3"); + pc.getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "500"); + + try { + + String query = "a = load '10' using " + ArbitarySplitsLoader.class.getName() + + "() as (name:chararray, age:int, gpa:double);" + + "b = limit a 5;" + + "c = group b by name;" + + "store c into 'output';"; + + VertexManagerPluginDescriptor vmPlugin = getLeafVertexVMPlugin(query); + Configuration vmPluginConf = TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload()); + + // Case of grace auto parallelism (PigGraceShuffleVertexManager) + assertEquals(PigGraceShuffleVertexManager.class.getName(), vmPlugin.getClassName()); + // min and max src fraction, auto parallel, desired size, bytes.per.reducer, pig.tez.plan and pigcontext + assertEquals(7, vmPluginConf.size()); + assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION)); + assertEquals("0.3", 
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION)); + assertEquals("true", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL)); + assertEquals("500", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE)); + assertEquals("500", vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM)); + + // Case of auto parallelism (ShuffleVertexManager) + pc.getProperties().setProperty(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, "false"); + vmPlugin = getLeafVertexVMPlugin(query); + vmPluginConf = TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload()); + assertEquals(ShuffleVertexManager.class.getName(), vmPlugin.getClassName()); + // min and max src fraction, auto parallel, desired size + assertEquals(4, vmPluginConf.size()); + assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION)); + assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION)); + assertEquals("true", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL)); + assertEquals("500", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE)); + + // Case of default parallel or PARALLEL (ShuffleVertexManager) + pc.defaultParallel = 2; + vmPlugin = getLeafVertexVMPlugin(query); + vmPluginConf = TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload()); + assertEquals(ShuffleVertexManager.class.getName(), vmPlugin.getClassName()); + // min and max src fraction + assertEquals(2, vmPluginConf.size()); + assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION)); + assertEquals("0.3", vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION)); + } finally { + pc.getProperties().remove(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART); + 
pc.getProperties().remove(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM); + pc.getProperties().remove(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM); + pc.defaultParallel = -1; + } + } + + private VertexManagerPluginDescriptor getLeafVertexVMPlugin(String query) throws Exception { + Pair<TezOperPlan, DAG> compiledPlan = compile(query); + TezOperator leafOper = compiledPlan.first.getLeaves().get(0); + Vertex leafVertex = compiledPlan.second.getVertex(leafOper.getOperatorKey().toString()); + Field vmPluginField = Vertex.class.getDeclaredField("vertexManagerPlugin"); + vmPluginField.setAccessible(true); + VertexManagerPluginDescriptor vmPlugin = (VertexManagerPluginDescriptor) vmPluginField.get(leafVertex); + return vmPlugin; } private Pair<TezOperPlan, DAG> compile(String query) throws Exception { Added: pig/branches/spark/test/org/apache/pig/tez/TestTezJobExecution.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezJobExecution.java?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezJobExecution.java (added) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezJobExecution.java Fri Feb 24 08:19:42 2017 @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.tez; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.Path; +import org.apache.pig.PigConfiguration; +import org.apache.pig.PigRunner; +import org.apache.pig.PigServer; +import org.apache.pig.impl.plan.OperatorPlan; +import org.apache.pig.test.Util; +import org.apache.pig.tools.pigstats.JobStats; +import org.apache.pig.tools.pigstats.OutputStats; +import org.apache.pig.tools.pigstats.PigProgressNotificationListener; +import org.apache.pig.tools.pigstats.PigStats; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test class for tez specific behaviour tests + */ +public class TestTezJobExecution { + + private static final String TEST_DIR = Util.getTestDirectory(TestTezJobExecution.class); + + private static final String INPUT_FILE = TEST_DIR + Path.SEPARATOR + "input"; + private PigServer pigServer; + + @BeforeClass + public static void oneTimeSetUp() throws Exception { + Util.deleteDirectory(new File(TEST_DIR)); + new File(TEST_DIR).mkdirs(); + Util.createLocalInputFile(INPUT_FILE, new String[] { + "1", "1", "1", "2", "2", "2" + }); + } + + @AfterClass + public static void oneTimeTearDown() throws Exception { + Util.deleteDirectory(new File(TEST_DIR)); + } + + @Before + public void setUp() throws Exception { + pigServer = new PigServer("tez_local"); + } + + @Test + public void testUnionParallelHashValuePartition() throws IOException { + String output = TEST_DIR + Path.SEPARATOR + "output1"; + String query = "A = LOAD '" + INPUT_FILE + "';" + + "B = LOAD '" + INPUT_FILE + "';" + + "C = UNION A, B PARALLEL 2;" + + "STORE C into '" + output + "';"; + 
pigServer.registerQuery(query); + String part0 = FileUtils.readFileToString(new File(output + Path.SEPARATOR + "part-v002-o000-r-00000")); + String part1 = FileUtils.readFileToString(new File(output + Path.SEPARATOR + "part-v002-o000-r-00001")); + assertEquals("2\n2\n2\n2\n2\n2\n", part0); + assertEquals("1\n1\n1\n1\n1\n1\n", part1); + } + + @Test + public void testDAGDiscoveryDisabled() throws IOException { + String output1 = TEST_DIR + Path.SEPARATOR + "output-parallel"; + String output2 = TEST_DIR + Path.SEPARATOR + "output-autoparallel"; + String scriptFile = TEST_DIR + Path.SEPARATOR + "testDAGRecoveryDisable.pig"; + String query = "A = LOAD '" + INPUT_FILE + "';" + + "B = GROUP A BY $0 PARALLEL 1;" + + "STORE B into '" + output1 + "';" + + "exec;" + + "C = LOAD '" + INPUT_FILE + "';" + + "D = GROUP C BY $0;" + + "STORE D into '" + output2 + "';"; + + Util.createLocalInputFile(scriptFile, new String[] {query}); + + String[] args = { "-x", "tez_local", scriptFile }; + + TestNotificationListener listener = new TestNotificationListener(); + // Recovery is not disabled when there is auto parallelism. Should reuse AM application session + PigStats stats = PigRunner.run(args, listener); + assertTrue(stats.isSuccessful()); + assertEquals(1, listener.getJobsStarted().size()); + + Util.deleteFile(pigServer.getPigContext(), output1); + Util.deleteFile(pigServer.getPigContext(), output2); + + // Recovery is disabled when there is auto parallelism. 
Should use two different AMs + listener.reset(); + args = new String[] { + "-D" + PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY + "=true", + "-x", + "tez_local", + scriptFile }; + stats = PigRunner.run(args, listener); + assertTrue(stats.isSuccessful()); + assertEquals(2, listener.getJobsStarted().size()); + } + + + private static class TestNotificationListener implements PigProgressNotificationListener { + + private Set<String> jobsStarted = new HashSet<String>(); + + public void reset() { + this.jobsStarted.clear(); + } + + public Set<String> getJobsStarted() { + return jobsStarted; + } + + @Override + public void initialPlanNotification(String scriptId, + OperatorPlan<?> plan) { + } + + @Override + public void launchStartedNotification(String scriptId, + int numJobsToLaunch) { + } + + @Override + public void jobsSubmittedNotification(String scriptId, + int numJobsSubmitted) { + } + + @Override + public void jobStartedNotification(String scriptId, String assignedJobId) { + jobsStarted.add(assignedJobId); + } + + @Override + public void jobFinishedNotification(String scriptId, JobStats jobStats) { + } + + @Override + public void jobFailedNotification(String scriptId, JobStats jobStats) { + } + + @Override + public void outputCompletedNotification(String scriptId, + OutputStats outputStats) { + } + + @Override + public void progressUpdatedNotification(String scriptId, int progress) { + + } + + @Override + public void launchCompletedNotification(String scriptId, + int numJobsSucceeded) { + } + + } + +} Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java Fri Feb 24 
08:19:42 2017 @@ -23,7 +23,6 @@ import static org.junit.Assert.assertTru import java.util.Arrays; import java.util.Iterator; -import org.apache.hadoop.conf.Configuration; import org.apache.pig.PigServer; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType; @@ -35,6 +34,7 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.test.MiniGenericCluster; import org.apache.pig.test.Util; import org.apache.pig.tools.pigstats.PigStats; +import org.apache.tez.dag.api.TezConfiguration; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -124,11 +124,11 @@ public class TestTezLauncher { @Test public void testQueueName() throws Exception { - Configuration conf = new Configuration(); + TezConfiguration conf = new TezConfiguration(); conf.set("tez.queue.name", "special"); - conf = MRToTezHelper.getDAGAMConfFromMRConf(conf); + MRToTezHelper.translateMRSettingsForTezAM(conf); assertEquals(conf.get("tez.queue.name"), "special"); - + } } Modified: pig/branches/spark/test/perf/pigmix/bin/generate_data.sh URL: http://svn.apache.org/viewvc/pig/branches/spark/test/perf/pigmix/bin/generate_data.sh?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/perf/pigmix/bin/generate_data.sh (original) +++ pig/branches/spark/test/perf/pigmix/bin/generate_data.sh Fri Feb 24 08:19:42 2017 @@ -25,20 +25,11 @@ fi source $PIGMIX_HOME/conf/config.sh -if [ $HADOOP_VERSION == "23" ]; then - echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot" - $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot -else - echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot" - $HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot -fi +echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot" +$HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot shopt -s extglob -if [ 
$HADOOP_VERSION == "23" ]; then - pigjar=`echo $PIG_HOME/pig*-h2.jar` -else - pigjar=`echo $PIG_HOME/pig*-h1.jar` -fi +pigjar=`echo $PIG_HOME/pig*-h2.jar` pigmixjar=$PIGMIX_HOME/pigmix.jar Modified: pig/branches/spark/test/perf/pigmix/build.xml URL: http://svn.apache.org/viewvc/pig/branches/spark/test/perf/pigmix/build.xml?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/perf/pigmix/build.xml (original) +++ pig/branches/spark/test/perf/pigmix/build.xml Fri Feb 24 08:19:42 2017 @@ -34,6 +34,8 @@ </fileset> </path> + <property name="hadoopversion" value="2" /> + <property name="java.dir" value="${basedir}/src/java"/> <property name="pigmix.build.dir" value="${basedir}/build"/> <property name="pigmix.jar" value="${basedir}/pigmix.jar"/>