Author: rohini
Date: Thu Apr 2 16:52:49 2015
New Revision: 1670938
URL: http://svn.apache.org/r1670938
Log:
PIG-4487: Pig on Tez gives wrong success message on failure in case of multiple outputs (rohini)
PIG-4483: Pig on Tez output statistics shows storing to same directory twice for union (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1670938&r1=1670937&r2=1670938&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Apr 2 16:52:49 2015
@@ -58,6 +58,10 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4487: Pig on Tez gives wrong success message on failure in case of
multiple outputs (rohini)
+
+PIG-4483: Pig on Tez output statistics shows storing to same directory twice
for union (rohini)
+
PIG-4480: Pig script failure on Tez with split and order by due to missing
sample collection (rohini)
PIG-4484: Ant pull jetty-6.1.26.zip on some platform (daijy)
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java?rev=1670938&r1=1670937&r2=1670938&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java Thu Apr 2 16:52:49 2015
@@ -51,20 +51,20 @@ public final class OutputStats {
private boolean success;
private POStore store = null;
-
+
private Configuration conf;
private static final Log LOG = LogFactory.getLog(OutputStats.class);
-
+
public OutputStats(String location, long bytes, long records, boolean
success) {
this.location = location;
this.bytes = bytes;
- this.records = records;
+ this.records = records;
this.success = success;
try {
this.name = new Path(location).getName();
} catch (Exception e) {
- // location is a mal formatted URL
+ // location is a mal formatted URL
this.name = location;
}
}
@@ -81,6 +81,18 @@ public final class OutputStats {
return bytes;
}
+ public void setBytes(long bytes) {
+ this.bytes = bytes;
+ }
+
+ public long getRecords() {
+ return records;
+ }
+
+ public void setRecords(long records) {
+ this.records = records;
+ }
+
public long getNumberRecords() {
return records;
}
@@ -94,6 +106,10 @@ public final class OutputStats {
return success;
}
+ public void setSuccessful(boolean success) {
+ this.success = success;
+ }
+
public String getAlias() {
return (store == null) ? null : store.getAlias();
}
@@ -129,11 +145,11 @@ public final class OutputStats {
public void setPOStore(POStore store) {
this.store = store;
}
-
+
public void setConf(Configuration conf) {
this.conf = conf;
}
-
+
public Iterator<Tuple> iterator() throws IOException {
final LoadFunc p;
PigContext pigContext = ScriptState.get().getPigContext();
@@ -154,11 +170,12 @@ public final class OutputStats {
String msg = "Unable to get results for: " + store.getSFile();
throw new ExecException(msg, errCode, PigException.BUG, e);
}
-
- return new Iterator<Tuple>() {
+
+ return new Iterator<Tuple>() {
Tuple t;
boolean atEnd;
+ @Override
public boolean hasNext() {
if (atEnd) return false;
try {
@@ -173,6 +190,7 @@ public final class OutputStats {
return !atEnd;
}
+ @Override
public Tuple next() {
Tuple next = t;
if (next != null) {
@@ -189,10 +207,11 @@ public final class OutputStats {
return next;
}
+ @Override
public void remove() {
throw new RuntimeException("Removal not supported");
}
-
+
};
}
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1670938&r1=1670937&r2=1670938&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Thu Apr 2 16:52:49 2015
@@ -38,6 +38,7 @@ import org.apache.pig.impl.plan.Dependen
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.newplan.PlanVisitor;
import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
@@ -87,9 +88,12 @@ public class TezDAGStats extends JobStat
private long activeSpillCountObj = 0;
private long activeSpillCountRecs = 0;
- private HashMap<String, Long> multiStoreCounters
+ private Map<String, Long> multiStoreCounters
= new HashMap<String, Long>();
+ private Map<String, OutputStats> outputsByLocation
+ = new HashMap<String, OutputStats>();
+
/**
* This class builds the graph of a Tez DAG vertices.
*/
@@ -217,7 +221,28 @@ public class TezDAGStats extends JobStat
inputs.addAll(vertexStats.getInputs());
}
if(vertexStats.getOutputs() != null &&
!vertexStats.getOutputs().isEmpty()) {
- outputs.addAll(vertexStats.getOutputs());
+ for (OutputStats output : vertexStats.getOutputs()) {
+ if (outputsByLocation.get(output.getLocation()) !=
null) {
+ OutputStats existingOut =
outputsByLocation.get(output.getLocation());
+ // In case of multistore, bytesWritten is already
calculated
+ // from size of all the files in the output
directory.
+ if (!output.getPOStore().isMultiStore() &&
output.getBytes() > -1) {
+ long bytes = existingOut.getBytes() > -1
+ ? (existingOut.getBytes() +
output.getBytes())
+ : output.getBytes();
+ existingOut.setBytes(bytes);
+ }
+ if (output.getRecords() > -1) {
+ long records = existingOut.getRecords() > -1
+ ? (existingOut.getRecords() +
output.getRecords())
+ : output.getRecords();
+ existingOut.setRecords(records);
+ }
+ } else {
+ outputs.add(output);
+ outputsByLocation.put(output.getLocation(),
output);
+ }
+ }
}
/*if (vertexStats.getHdfsBytesRead() >= 0) {
hdfsBytesRead = (hdfsBytesRead == -1) ? 0 : hdfsBytesRead;
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java?rev=1670938&r1=1670937&r2=1670938&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java Thu Apr 2 16:52:49 2015
@@ -43,6 +43,7 @@ import org.apache.pig.tools.pigstats.Out
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.client.DAGStatus;
import com.google.common.collect.Maps;
@@ -191,6 +192,17 @@ public class TezPigScriptStats extends P
}
tezScriptState.dagCompletedNotification(tezJob.getName(),
tezDAGStats);
}
+
+ if (!tezDAGStats.isSuccessful()) {
+ String outputCommitOnDAGSuccess =
pigContext.getProperties().getProperty(
+ TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS);
+ if ((outputCommitOnDAGSuccess == null &&
TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT)
+ || "true".equals(outputCommitOnDAGSuccess)) {
+ for (OutputStats stats : tezDAGStats.getOutputs()) {
+ stats.setSuccessful(false);
+ }
+ }
+ }
}
public TezDAGStats addTezJobStatsForNative(String dagName, NativeTezOper
tezOper, boolean success) {