Author: rohini
Date: Thu Apr 16 14:49:18 2015
New Revision: 1674081

URL: http://svn.apache.org/r1674081
Log:
PIG-4508: [Pig on Tez] PigProcessor check for commit only on MROutput (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1674081&r1=1674080&r2=1674081&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Apr 16 14:49:18 2015
@@ -62,6 +62,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4508: [Pig on Tez] PigProcessor check for commit only on MROutput (rohini)
+
 PIG-4505: [Pig on Tez] Auto adjust AM memory can hit OOM with 3.5GXmx (rohini)
 
 PIG-4502: E2E tests build fail with udfs compile (nmaheshwari via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1674081&r1=1674080&r2=1674081&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Thu Apr 16 14:49:18 2015
@@ -207,13 +207,15 @@ public class PigProcessor extends Abstra
                 throw new VisitorException(msg, errCode, PigException.BUG, e);
             }
 
-            while (!getContext().canCommit()) {
-                Thread.sleep(100);
-            }
-            for (MROutput fileOutput : fileOutputs){
-                fileOutput.flush();
-                if (fileOutput.isCommitRequired()) {
-                    fileOutput.commit();
+            if (!fileOutputs.isEmpty()) {
+                while (!getContext().canCommit()) {
+                    Thread.sleep(100);
+                }
+                for (MROutput fileOutput : fileOutputs){
+                    fileOutput.flush();
+                    if (fileOutput.isCommitRequired()) {
+                        fileOutput.commit();
+                    }
                 }
             }
 
@@ -233,8 +235,8 @@ public class PigProcessor extends Abstra
                 getContext().sendEvents(events);
             }
         } catch (Exception e) {
-            abortOutput();
             LOG.error("Encountered exception while processing: ", e);
+            abortOutput();
             throw e;
         }
     }
@@ -243,7 +245,7 @@ public class PigProcessor extends Abstra
         for (MROutput fileOutput : fileOutputs){
             try {
                 fileOutput.abort();
-            } catch (IOException e) {
+            } catch (Exception e) {
                 LOG.error("Encountered exception while aborting output", e);
             }
         }


Reply via email to