Author: rohini
Date: Tue Apr 12 19:23:56 2016
New Revision: 1738857

URL: http://svn.apache.org/viewvc?rev=1738857&view=rev
Log:
PIG-4867: -stop_on_failure does not work with Tez (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/trunk/test/org/apache/pig/test/TestGrunt.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1738857&r1=1738856&r2=1738857&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Apr 12 19:23:56 2016
@@ -109,6 +109,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4867: -stop_on_failure does not work with Tez (rohini)
+
 PIG-4844: Tez AM runs out of memory when vertex has high number of outputs (rohini)
 
 PIG-3906: ant site errors out (nielsbasjes via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1738857&r1=1738856&r2=1738857&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Tue Apr 12 19:23:56 2016
@@ -174,6 +174,10 @@ public class TezLauncher extends Launche
         tezScriptState.emitInitialPlanNotification(tezPlanContainer);
         tezScriptState.emitLaunchStartedNotification(tezPlanContainer.size()); //number of DAGs to Launch
 
+        boolean stop_on_failure =
+                Boolean.valueOf(pc.getProperties().getProperty("stop.on.failure", "false"));
+        boolean stoppedOnFailure = false;
+
         TezPlanContainerNode tezPlanContainerNode;
         TezOperPlan tezPlan;
         int processedDAGs = 0;
@@ -252,7 +256,18 @@ public class TezLauncher extends Launche
                     ((tezPlanContainer.size() - processedDAGs)/tezPlanContainer.size()) * 100);
             }
             handleUnCaughtException(pc);
-            tezPlanContainer.updatePlan(tezPlan, reporter.notifyFinishedOrFailed());
+            boolean tezDAGSucceeded = reporter.notifyFinishedOrFailed();
+            tezPlanContainer.updatePlan(tezPlan, tezDAGSucceeded);
+            // if stop_on_failure is enabled, we need to stop immediately when any job has failed
+            if (!tezDAGSucceeded) {
+                if (stop_on_failure) {
+                    stoppedOnFailure = true;
+                    break;
+                } else {
+                    log.warn("Ooops! Some job has failed! Specify -stop_on_failure if you "
+                            + "want Pig to stop immediately on failure.");
+                }
+            }
         }
 
         tezStats.finish();
@@ -279,6 +294,11 @@ public class TezLauncher extends Launche
             }
         }
 
+        if (stoppedOnFailure) {
+            throw new ExecException("Stopping execution on job failure with -stop_on_failure option", 6017,
+                    PigException.REMOTE_ENVIRONMENT);
+        }
+
         return tezStats;
     }
 

Modified: pig/trunk/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestGrunt.java?rev=1738857&r1=1738856&r2=1738857&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestGrunt.java Tue Apr 12 19:23:56 2016
@@ -963,7 +963,6 @@ public class TestGrunt {
 
     @Test
     public void testStopOnFailure() throws Throwable {
-        Assume.assumeTrue("Skip this test for TEZ", Util.isMapredExecType(cluster.getExecType()));
         PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
         PigContext context = server.getPigContext();
         context.getProperties().setProperty("stop.on.failure", ""+true);
@@ -1524,7 +1523,7 @@ public class TestGrunt {
         JavaCompilerHelper javaCompilerHelper = new JavaCompilerHelper();
         javaCompilerHelper.compile(tmpDir.getAbsolutePath(),
                 new JavaCompilerHelper.JavaSourceFromString("com.xxx.udf.TestUDF", udfSrc));
-        
+
         String jarName = "TestUDFJar.jar";
         String jarFile = tmpDir.getAbsolutePath() + FILE_SEPARATOR + jarName;
         int status = Util.executeJavaCommand("jar -cf " + jarFile +


Reply via email to