Author: rohini
Date: Thu Apr  2 16:52:49 2015
New Revision: 1670938

URL: http://svn.apache.org/r1670938
Log:
PIG-4487: Pig on Tez gives wrong success message on failure in case of multiple outputs (rohini)
PIG-4483: Pig on Tez output statistics shows storing to same directory twice for union (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1670938&r1=1670937&r2=1670938&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Apr  2 16:52:49 2015
@@ -58,6 +58,10 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4487: Pig on Tez gives wrong success message on failure in case of multiple outputs (rohini)
+
+PIG-4483: Pig on Tez output statistics shows storing to same directory twice for union (rohini)
+
 PIG-4480: Pig script failure on Tez with split and order by due to missing sample collection (rohini)
 
 PIG-4484: Ant pull jetty-6.1.26.zip on some platform (daijy)

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java?rev=1670938&r1=1670937&r2=1670938&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java Thu Apr  2 
16:52:49 2015
@@ -51,20 +51,20 @@ public final class OutputStats {
     private boolean success;
 
     private POStore store = null;
-    
+
     private Configuration conf;
 
     private static final Log LOG = LogFactory.getLog(OutputStats.class);
-    
+
     public OutputStats(String location, long bytes, long records, boolean 
success) {
         this.location = location;
         this.bytes = bytes;
-        this.records = records;        
+        this.records = records;
         this.success = success;
         try {
             this.name = new Path(location).getName();
         } catch (Exception e) {
-            // location is a mal formatted URL 
+            // location is a mal formatted URL
             this.name = location;
         }
     }
@@ -81,6 +81,18 @@ public final class OutputStats {
         return bytes;
     }
 
+    public void setBytes(long bytes) {
+        this.bytes = bytes;
+    }
+
+    public long getRecords() {
+        return records;
+    }
+
+    public void setRecords(long records) {
+        this.records = records;
+    }
+
     public long getNumberRecords() {
         return records;
     }
@@ -94,6 +106,10 @@ public final class OutputStats {
         return success;
     }
 
+    public void setSuccessful(boolean success) {
+        this.success = success;
+    }
+
     public String getAlias() {
         return (store == null) ? null : store.getAlias();
     }
@@ -129,11 +145,11 @@ public final class OutputStats {
     public void setPOStore(POStore store) {
         this.store = store;
     }
-    
+
     public void setConf(Configuration conf) {
         this.conf = conf;
     }
-    
+
     public Iterator<Tuple> iterator() throws IOException {
         final LoadFunc p;
         PigContext pigContext = ScriptState.get().getPigContext();
@@ -154,11 +170,12 @@ public final class OutputStats {
             String msg = "Unable to get results for: " + store.getSFile();
             throw new ExecException(msg, errCode, PigException.BUG, e);
         }
-        
-        return new Iterator<Tuple>() {        
+
+        return new Iterator<Tuple>() {
             Tuple   t;
             boolean atEnd;
 
+            @Override
             public boolean hasNext() {
                 if (atEnd) return false;
                 try {
@@ -173,6 +190,7 @@ public final class OutputStats {
                 return !atEnd;
             }
 
+            @Override
             public Tuple next() {
                 Tuple next = t;
                 if (next != null) {
@@ -189,10 +207,11 @@ public final class OutputStats {
                 return next;
             }
 
+            @Override
             public void remove() {
                 throw new RuntimeException("Removal not supported");
             }
- 
+
         };
     }
 }

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1670938&r1=1670937&r2=1670938&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Thu Apr  2 
16:52:49 2015
@@ -38,6 +38,7 @@ import org.apache.pig.impl.plan.Dependen
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.newplan.PlanVisitor;
 import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
 import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
@@ -87,9 +88,12 @@ public class TezDAGStats extends JobStat
     private long activeSpillCountObj = 0;
     private long activeSpillCountRecs = 0;
 
-    private HashMap<String, Long> multiStoreCounters
+    private Map<String, Long> multiStoreCounters
             = new HashMap<String, Long>();
 
+    private Map<String, OutputStats> outputsByLocation
+            = new HashMap<String, OutputStats>();
+
     /**
      * This class builds the graph of a Tez DAG vertices.
      */
@@ -217,7 +221,28 @@ public class TezDAGStats extends JobStat
                     inputs.addAll(vertexStats.getInputs());
                 }
                 if(vertexStats.getOutputs() != null  && 
!vertexStats.getOutputs().isEmpty()) {
-                    outputs.addAll(vertexStats.getOutputs());
+                    for (OutputStats output : vertexStats.getOutputs()) {
+                        if (outputsByLocation.get(output.getLocation()) != 
null) {
+                            OutputStats existingOut = 
outputsByLocation.get(output.getLocation());
+                            // In case of multistore, bytesWritten is already 
calculated
+                            // from size of all the files in the output 
directory.
+                            if (!output.getPOStore().isMultiStore() && 
output.getBytes() > -1) {
+                                long bytes = existingOut.getBytes() > -1
+                                        ? (existingOut.getBytes() + 
output.getBytes())
+                                        : output.getBytes();
+                                existingOut.setBytes(bytes);
+                            }
+                            if (output.getRecords() > -1) {
+                                long records = existingOut.getRecords() > -1
+                                        ? (existingOut.getRecords() + 
output.getRecords())
+                                        : output.getRecords();
+                                existingOut.setRecords(records);
+                            }
+                        } else {
+                            outputs.add(output);
+                            outputsByLocation.put(output.getLocation(), 
output);
+                        }
+                    }
                 }
                 /*if (vertexStats.getHdfsBytesRead() >= 0) {
                     hdfsBytesRead = (hdfsBytesRead == -1) ? 0 : hdfsBytesRead;

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java?rev=1670938&r1=1670937&r2=1670938&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java 
(original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java Thu 
Apr  2 16:52:49 2015
@@ -43,6 +43,7 @@ import org.apache.pig.tools.pigstats.Out
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.client.DAGStatus;
 
 import com.google.common.collect.Maps;
@@ -191,6 +192,17 @@ public class TezPigScriptStats extends P
             }
             tezScriptState.dagCompletedNotification(tezJob.getName(), 
tezDAGStats);
         }
+
+        if (!tezDAGStats.isSuccessful()) {
+            String outputCommitOnDAGSuccess = 
pigContext.getProperties().getProperty(
+                    TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS);
+            if ((outputCommitOnDAGSuccess == null && 
TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT)
+                    || "true".equals(outputCommitOnDAGSuccess)) {
+                for (OutputStats stats : tezDAGStats.getOutputs()) {
+                    stats.setSuccessful(false);
+                }
+            }
+        }
     }
 
     public TezDAGStats addTezJobStatsForNative(String dagName, NativeTezOper 
tezOper, boolean success) {


Reply via email to