Modified: 
pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezGraceParallelism.java Fri 
Feb 24 08:19:42 2017
@@ -117,15 +117,15 @@ public class TestTezGraceParallelism {
         Util.createLogAppender("testDecreaseParallelism", writer, new 
Class[]{PigGraceShuffleVertexManager.class, ShuffleVertexManager.class});
         try {
             // DAG: 47 \
-            //           -> 49(join) -> 52(distinct) -> 61(group)
+            //           -> 49(join) -> 52(distinct) -> 56(group)
             //      48 /
             // Parallelism at compile time:
             // DAG: 47(1) \
-            //              -> 49(2) -> 52(20) -> 61(200)
+            //              -> 49(2) -> 52(20) -> 56(200)
             //      48(1) /
             // However, when 49 finishes, the actual output of 49 only justify 
parallelism 1.
-            // We adjust the parallelism for 61 to 100 based on this.
-            // At runtime, ShuffleVertexManager still kick in and further 
reduce parallelism from 100 to 1.
+            // We adjust the parallelism for 56 to 7 based on this.
+            // At runtime, ShuffleVertexManager still kicks in and further 
reduces parallelism from 7 to 1.
             //
             pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + 
INPUT_FILE1 + "' as (name:chararray, age:int);");
             pigServer.registerQuery("B = load '" + INPUT_DIR + "/" + 
INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
@@ -140,10 +140,10 @@ public class TestTezGraceParallelism {
                             "('F',1349L)", "('M',1373L)"});
             Util.checkQueryOutputsAfterSort(iter, expectedResults);
             assertTrue(writer.toString().contains("Initialize parallelism for 
scope-52 to 18"));
-            assertTrue(writer.toString().contains("Initialize parallelism for 
scope-61 to 7"));
+            assertTrue(writer.toString().contains("Initialize parallelism for 
scope-56 to 7"));
             assertTrue(writer.toString().contains("Reduce auto parallelism for 
vertex: scope-49 to 1 from 2"));
             assertTrue(writer.toString().contains("Reduce auto parallelism for 
vertex: scope-52 to 1 from 18"));
-            assertTrue(writer.toString().contains("Reduce auto parallelism for 
vertex: scope-61 to 1 from 7"));
+            assertTrue(writer.toString().contains("Reduce auto parallelism for 
vertex: scope-56 to 1 from 7"));
         } finally {
             Util.removeLogAppender("testDecreaseParallelism", 
PigGraceShuffleVertexManager.class, ShuffleVertexManager.class);
         }
@@ -217,8 +217,8 @@ public class TestTezGraceParallelism {
                 count++;
             }
             assertEquals(count, 20);
-            assertTrue(writer.toString().contains("All predecessors for 
scope-84 are finished, time to set parallelism for scope-85"));
-            assertTrue(writer.toString().contains("Initialize parallelism for 
scope-85 to 10"));
+            assertTrue(writer.toString().contains("All predecessors for 
scope-79 are finished, time to set parallelism for scope-80"));
+            assertTrue(writer.toString().contains("Initialize parallelism for 
scope-80 to 10"));
         } finally {
             Util.removeLogAppender("testJoinWithDifferentDepth", 
PigGraceShuffleVertexManager.class);
         }
@@ -262,9 +262,9 @@ public class TestTezGraceParallelism {
         StringWriter writer = new StringWriter();
         Util.createLogAppender("testJoinWithUnion", writer, 
PigGraceShuffleVertexManager.class);
         try {
-            // DAG: 29 -> 32 -> 41 \
-            //                       -> 70 (vertex group) -> 61
-            //      42 -> 45 -> 54 /
+            // DAG: 29 -> 32 -> 36 \
+            //                       -> 55 (vertex group) -> 51
+            //      37 -> 40 -> 44 /
             pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + 
INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
             pigServer.registerQuery("B = distinct A;");
             pigServer.registerQuery("C = group B by name;");
@@ -280,8 +280,8 @@ public class TestTezGraceParallelism {
                 count++;
             }
             assertEquals(count, 20);
-            assertTrue(writer.toString().contains("time to set parallelism for 
scope-41"));
-            assertTrue(writer.toString().contains("time to set parallelism for 
scope-54"));
+            assertTrue(writer.toString().contains("time to set parallelism for 
scope-36"));
+            assertTrue(writer.toString().contains("time to set parallelism for 
scope-44"));
         } finally {
             Util.removeLogAppender("testJoinWithUnion", 
PigGraceShuffleVertexManager.class);
         }
@@ -322,4 +322,33 @@ public class TestTezGraceParallelism {
             super.setStoreLocation(location, job);
         }
     }
+
+    @Test
+    // See PIG-4786
+    public void testCross() throws IOException{
+        // scope-90 is the cross vertex. It should not use 
PigGraceShuffleVertexManager
+        NodeIdGenerator.reset();
+        PigServer.resetScope();
+        StringWriter writer = new StringWriter();
+        Util.createLogAppender("testCross", writer, 
PigGraceShuffleVertexManager.class);
+        File outputDir = File.createTempFile("intemediate", "txt");
+        outputDir.delete();
+        
pigServer.getPigContext().getProperties().setProperty("mapreduce.input.fileinputformat.split.maxsize",
 "3000");
+        
pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", 
"true");
+        pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE2 + 
"' as (name:chararray, gender:chararray);");
+        pigServer.registerQuery("B = order A by name;");
+        pigServer.registerQuery("C = distinct B;");
+        pigServer.registerQuery("D = load '" + INPUT_DIR + "/" + INPUT_FILE1 + 
"' as (name:chararray, age:int);");
+        pigServer.registerQuery("E = group D by name;");
+        pigServer.registerQuery("F = foreach E generate group as name, 
AVG(D.age) as avg_age;");
+        pigServer.registerQuery("G = cross C, F;");
+        Iterator<Tuple> iter = pigServer.openIterator("G");
+        int count = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            count++;
+        }
+        assertEquals(count, 400);
+        assertFalse(writer.toString().contains("scope-90"));
+    }
 }

Modified: 
pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezJobControlCompiler.java 
Fri Feb 24 08:19:42 2017
@@ -21,7 +21,9 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -29,17 +31,20 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezJobCompiler;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
@@ -48,6 +53,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -57,8 +63,11 @@ import org.apache.pig.test.junit.Ordered
 import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -77,7 +86,8 @@ import org.junit.runner.RunWith;
     "testTezParallelismEstimatorFilterFlatten",
     "testTezParallelismEstimatorHashJoin",
     "testTezParallelismEstimatorSplitBranch",
-    "testTezParallelismDefaultParallelism"
+    "testTezParallelismDefaultParallelism",
+    "testShuffleVertexManagerConfig"
 })
 public class TestTezJobControlCompiler {
     private static PigContext pc;
@@ -89,6 +99,7 @@ public class TestTezJobControlCompiler {
     public static void setUpBeforeClass() throws Exception {
         input1 = Util.createTempFileDelOnExit("input1", "txt").toURI();
         input2 = Util.createTempFileDelOnExit("input2", "txt").toURI();
+        FileUtils.deleteDirectory(new File("/tmp/pigoutput"));
     }
 
     @AfterClass
@@ -107,7 +118,7 @@ public class TestTezJobControlCompiler {
                 "a = load '" + input1 +"' as (x:int, y:int);" +
                 "b = filter a by x > 0;" +
                 "c = foreach b generate y;" +
-                "store c into 'file:///tmp/output';";
+                "store c into 'file:///tmp/pigoutput';";
 
         Pair<TezOperPlan, DAG> compiledPlan = compile(query);
 
@@ -127,7 +138,7 @@ public class TestTezJobControlCompiler {
                 "a = load '" + input1 +"' as (x:int, y:int);" +
                 "b = group a by x;" +
                 "c = foreach b generate group, a;" +
-                "store c into 'file:///tmp/output';";
+                "store c into 'file:///tmp/pigoutput';";
 
         Pair<TezOperPlan, DAG> compiledPlan = compile(query);
 
@@ -159,7 +170,7 @@ public class TestTezJobControlCompiler {
                 "b = load '" + input2 +"' as (x:int, z:int);" +
                 "c = join a by x, b by x;" +
                 "d = foreach c generate a::x as x, y, z;" +
-                "store d into 'file:///tmp/output';";
+                "store d into 'file:///tmp/pigoutput';";
 
         Pair<TezOperPlan, DAG> compiledPlan = compile(query);
 
@@ -289,6 +300,72 @@ public class TestTezJobControlCompiler {
         TezOperator leafOper = compiledPlan.first.getLeaves().get(0);
         Vertex leafVertex = 
compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
         assertEquals(leafVertex.getParallelism(), 5);
+        pc.defaultParallel = -1;
+    }
+
+    @Test
+    public void testShuffleVertexManagerConfig() throws Exception{
+        
pc.getProperties().setProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 
"0.3");
+        
pc.getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
 "500");
+
+        try {
+
+            String query = "a = load '10' using " + 
ArbitarySplitsLoader.class.getName()
+                    + "() as (name:chararray, age:int, gpa:double);"
+                    + "b = limit a 5;"
+                    + "c = group b by name;"
+                    + "store c into 'output';";
+
+            VertexManagerPluginDescriptor vmPlugin = 
getLeafVertexVMPlugin(query);
+            Configuration vmPluginConf = 
TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload());
+
+            // Case of grace auto parallelism (PigGraceShuffleVertexManager)
+            assertEquals(PigGraceShuffleVertexManager.class.getName(), 
vmPlugin.getClassName());
+            // min and max src fraction, auto parallel, desired size, 
bytes.per.reducer, pig.tez.plan and pigcontext
+            assertEquals(7, vmPluginConf.size());
+            assertEquals("0.3", 
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION));
+            assertEquals("0.3", 
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION));
+            assertEquals("true", 
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL));
+            assertEquals("500", 
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE));
+            assertEquals("500", 
vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM));
+
+            // Case of auto parallelism (ShuffleVertexManager)
+            
pc.getProperties().setProperty(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, 
"false");
+            vmPlugin = getLeafVertexVMPlugin(query);
+            vmPluginConf = 
TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload());
+            assertEquals(ShuffleVertexManager.class.getName(), 
vmPlugin.getClassName());
+            // min and max src fraction, auto parallel, desired size
+            assertEquals(4, vmPluginConf.size());
+            assertEquals("0.3", 
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION));
+            assertEquals("0.3", 
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION));
+            assertEquals("true", 
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL));
+            assertEquals("500", 
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE));
+
+            // Case of default parallel or PARALLEL (ShuffleVertexManager)
+            pc.defaultParallel = 2;
+            vmPlugin = getLeafVertexVMPlugin(query);
+            vmPluginConf = 
TezUtils.createConfFromUserPayload(vmPlugin.getUserPayload());
+            assertEquals(ShuffleVertexManager.class.getName(), 
vmPlugin.getClassName());
+            // min and max src fraction
+            assertEquals(2, vmPluginConf.size());
+            assertEquals("0.3", 
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION));
+            assertEquals("0.3", 
vmPluginConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION));
+        } finally {
+            
pc.getProperties().remove(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
+            
pc.getProperties().remove(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM);
+            
pc.getProperties().remove(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM);
+            pc.defaultParallel = -1;
+        }
+    }
+
+    private VertexManagerPluginDescriptor getLeafVertexVMPlugin(String query) 
throws Exception {
+        Pair<TezOperPlan, DAG> compiledPlan = compile(query);
+        TezOperator leafOper = compiledPlan.first.getLeaves().get(0);
+        Vertex leafVertex = 
compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
+        Field vmPluginField = 
Vertex.class.getDeclaredField("vertexManagerPlugin");
+        vmPluginField.setAccessible(true);
+        VertexManagerPluginDescriptor vmPlugin = 
(VertexManagerPluginDescriptor) vmPluginField.get(leafVertex);
+        return vmPlugin;
     }
 
     private Pair<TezOperPlan, DAG> compile(String query) throws Exception {

Added: pig/branches/spark/test/org/apache/pig/tez/TestTezJobExecution.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezJobExecution.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezJobExecution.java (added)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezJobExecution.java Fri Feb 
24 08:19:42 2017
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tez;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigRunner;
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.test.Util;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test class for Tez-specific behaviour.
+ */
+public class TestTezJobExecution {
+
+    private static final String TEST_DIR = 
Util.getTestDirectory(TestTezJobExecution.class);
+
+    private static final String INPUT_FILE = TEST_DIR + Path.SEPARATOR + 
"input";
+    private PigServer pigServer;
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        Util.deleteDirectory(new File(TEST_DIR));
+        new File(TEST_DIR).mkdirs();
+        Util.createLocalInputFile(INPUT_FILE, new String[] {
+            "1", "1", "1", "2", "2", "2"
+        });
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        Util.deleteDirectory(new File(TEST_DIR));
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer("tez_local");
+    }
+
+    @Test
+    public void testUnionParallelHashValuePartition() throws IOException {
+        String output = TEST_DIR + Path.SEPARATOR + "output1";
+        String query = "A = LOAD '" + INPUT_FILE + "';"
+                + "B = LOAD '" + INPUT_FILE + "';"
+                + "C = UNION A, B PARALLEL 2;"
+                + "STORE C into '" + output + "';";
+        pigServer.registerQuery(query);
+        String part0 = FileUtils.readFileToString(new File(output + 
Path.SEPARATOR + "part-v002-o000-r-00000"));
+        String part1 = FileUtils.readFileToString(new File(output + 
Path.SEPARATOR + "part-v002-o000-r-00001"));
+        assertEquals("2\n2\n2\n2\n2\n2\n", part0);
+        assertEquals("1\n1\n1\n1\n1\n1\n", part1);
+    }
+
+    @Test
+    public void testDAGDiscoveryDisabled() throws IOException {
+        String output1 = TEST_DIR + Path.SEPARATOR + "output-parallel";
+        String output2 = TEST_DIR + Path.SEPARATOR + "output-autoparallel";
+        String scriptFile = TEST_DIR + Path.SEPARATOR + 
"testDAGRecoveryDisable.pig";
+        String query = "A = LOAD '" + INPUT_FILE + "';"
+                + "B = GROUP A BY $0 PARALLEL 1;"
+                + "STORE B into '" + output1 + "';"
+                + "exec;"
+                + "C = LOAD '" + INPUT_FILE + "';"
+                + "D = GROUP C BY $0;"
+                + "STORE D into '" + output2 + "';";
+
+        Util.createLocalInputFile(scriptFile, new String[] {query});
+
+        String[] args = { "-x", "tez_local", scriptFile };
+
+        TestNotificationListener listener = new TestNotificationListener();
+        // Without the disable-DAG-recovery property, recovery is not disabled when there is 
auto parallelism, so the script should reuse one AM application session.
+        PigStats stats = PigRunner.run(args, listener);
+        assertTrue(stats.isSuccessful());
+        assertEquals(1, listener.getJobsStarted().size());
+
+        Util.deleteFile(pigServer.getPigContext(), output1);
+        Util.deleteFile(pigServer.getPigContext(), output2);
+
+        // With the property set, recovery is disabled when there is auto parallelism, 
so the script should use two different AMs.
+        listener.reset();
+        args = new String[] {
+                "-D" + 
PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY + "=true",
+                "-x",
+                "tez_local",
+                scriptFile };
+        stats = PigRunner.run(args, listener);
+        assertTrue(stats.isSuccessful());
+        assertEquals(2, listener.getJobsStarted().size());
+    }
+
+
+    private static class TestNotificationListener implements 
PigProgressNotificationListener {
+
+        private Set<String> jobsStarted = new HashSet<String>();
+
+        public void reset() {
+            this.jobsStarted.clear();
+        }
+
+        public Set<String> getJobsStarted() {
+            return jobsStarted;
+        }
+
+        @Override
+        public void initialPlanNotification(String scriptId,
+                OperatorPlan<?> plan) {
+        }
+
+        @Override
+        public void launchStartedNotification(String scriptId,
+                int numJobsToLaunch) {
+        }
+
+        @Override
+        public void jobsSubmittedNotification(String scriptId,
+                int numJobsSubmitted) {
+        }
+
+        @Override
+        public void jobStartedNotification(String scriptId, String 
assignedJobId) {
+            jobsStarted.add(assignedJobId);
+        }
+
+        @Override
+        public void jobFinishedNotification(String scriptId, JobStats 
jobStats) {
+        }
+
+        @Override
+        public void jobFailedNotification(String scriptId, JobStats jobStats) {
+        }
+
+        @Override
+        public void outputCompletedNotification(String scriptId,
+                OutputStats outputStats) {
+        }
+
+        @Override
+        public void progressUpdatedNotification(String scriptId, int progress) 
{
+
+        }
+
+        @Override
+        public void launchCompletedNotification(String scriptId,
+                int numJobsSucceeded) {
+        }
+
+    }
+
+}

Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java (original)
+++ pig/branches/spark/test/org/apache/pig/tez/TestTezLauncher.java Fri Feb 24 
08:19:42 2017
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTru
 import java.util.Arrays;
 import java.util.Iterator;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.PigServer;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
@@ -35,6 +34,7 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.test.MiniGenericCluster;
 import org.apache.pig.test.Util;
 import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -124,11 +124,11 @@ public class TestTezLauncher {
 
     @Test
     public void testQueueName() throws Exception {
-        Configuration conf = new Configuration();
+        TezConfiguration conf = new TezConfiguration();
         conf.set("tez.queue.name", "special");
-        conf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
+        MRToTezHelper.translateMRSettingsForTezAM(conf);
         assertEquals(conf.get("tez.queue.name"), "special");
-        
+
     }
 }
 

Modified: pig/branches/spark/test/perf/pigmix/bin/generate_data.sh
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/perf/pigmix/bin/generate_data.sh?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/perf/pigmix/bin/generate_data.sh (original)
+++ pig/branches/spark/test/perf/pigmix/bin/generate_data.sh Fri Feb 24 
08:19:42 2017
@@ -25,20 +25,11 @@ fi
 
 source $PIGMIX_HOME/conf/config.sh
 
-if [ $HADOOP_VERSION == "23" ]; then
-    echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot"
-    $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot
-else
-    echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot"
-    $HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot
-fi
+echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot"
+$HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot
 
 shopt -s extglob
-if [ $HADOOP_VERSION == "23" ]; then
-    pigjar=`echo $PIG_HOME/pig*-h2.jar`
-else
-    pigjar=`echo $PIG_HOME/pig*-h1.jar`
-fi
+pigjar=`echo $PIG_HOME/pig*-h2.jar`
 
 pigmixjar=$PIGMIX_HOME/pigmix.jar
 

Modified: pig/branches/spark/test/perf/pigmix/build.xml
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/perf/pigmix/build.xml?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/perf/pigmix/build.xml (original)
+++ pig/branches/spark/test/perf/pigmix/build.xml Fri Feb 24 08:19:42 2017
@@ -34,6 +34,8 @@
     </fileset>
   </path>
 
+  <property name="hadoopversion" value="2" />
+
   <property name="java.dir" value="${basedir}/src/java"/>
   <property name="pigmix.build.dir" value="${basedir}/build"/>
   <property name="pigmix.jar" value="${basedir}/pigmix.jar"/>


Reply via email to