Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Fri Mar 4 18:17:39 2016 @@ -21,8 +21,6 @@ package org.apache.pig.tools.pigstats; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.StatusReporter; import org.apache.hadoop.util.Progressable; -import org.apache.pig.JVMReuseManager; -import org.apache.pig.StaticDataCleanup; import org.apache.pig.backend.hadoop.executionengine.TaskContext; import org.apache.pig.classification.InterfaceAudience; import org.apache.pig.classification.InterfaceStability; @@ -35,11 +33,7 @@ public class PigStatusReporter extends S private TaskContext<?> context = null; - static { - JVMReuseManager.getInstance().registerForStaticDataCleanup(PigStatusReporter.class); - } - - @StaticDataCleanup + //@StaticDataCleanup public static void staticDataCleanup() { reporter = null; }
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java Fri Mar 4 18:17:39 2016 @@ -63,6 +63,7 @@ import org.apache.pig.impl.plan.DepthFir import org.apache.pig.impl.plan.OperatorPlan; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.JarManager; +import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.newplan.logical.relational.LOCogroup; import org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE; import org.apache.pig.newplan.logical.relational.LOCross; @@ -165,7 +166,8 @@ public abstract class ScriptState { protected String id; - protected String script; + protected String serializedScript; + protected String truncatedScript; protected String commandLine; protected String fileName; @@ -180,7 +182,8 @@ public abstract class ScriptState { protected ScriptState(String id) { this.id = id; - this.script = ""; + this.serializedScript = ""; + this.truncatedScript = ""; } public static ScriptState get() { @@ -272,7 +275,7 @@ public abstract class ScriptState { } } - public void setScript(File file) { + public void setScript(File file) throws IOException { BufferedReader reader = null; try { reader = new BufferedReader(new FileReader(file)); @@ -289,10 +292,18 @@ public abstract class ScriptState { } } - public void setScript(String script) { + public void setScript(String script) throws IOException { if (script == null) return; + //Retain the truncated script + setTruncatedScript(script); + + //Serialize and encode the string. + this.serializedScript = ObjectSerializer.serialize(script); + } + + private void setTruncatedScript(String script) { // restrict the size of the script to be stored in job conf int maxScriptSize = 10240; if (pigContext != null) { @@ -301,13 +312,10 @@ public abstract class ScriptState { maxScriptSize = Integer.valueOf(prop); } } - script = (script.length() > maxScriptSize) ? script.substring(0, maxScriptSize) + + this.truncatedScript = (script.length() > maxScriptSize) ? script.substring(0, maxScriptSize) : script; - // XML parser cann't handle certain characters, including - // the control character (). Use Base64 encoding to - // get around this problem - this.script = new String(Base64.encodeBase64(script.getBytes())); } public void setScriptFeatures(LogicalPlan plan) { @@ -372,11 +380,15 @@ public abstract class ScriptState { return (commandLine == null) ? "" : commandLine; } - protected String getScript() { - return (script == null) ? "" : script; + public String getSerializedScript() { + return (serializedScript == null) ? "" : serializedScript; + } + + public String getScript() { + return (truncatedScript == null) ? "" : truncatedScript; } - protected void setScript(BufferedReader reader) { + protected void setScript(BufferedReader reader) throws IOException { StringBuilder sb = new StringBuilder(); try { String line = reader.readLine(); Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java Fri Mar 4 18:17:39 2016 @@ -72,7 +72,7 @@ public class MRScriptState extends Scrip conf.set(PIG_PROPERTY.HADOOP_VERSION.toString(), getHadoopVersion()); conf.set(PIG_PROPERTY.VERSION.toString(), getPigVersion()); conf.set(PIG_PROPERTY.SCRIPT_ID.toString(), id); - conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript()); + conf.set(PIG_PROPERTY.SCRIPT.toString(), getSerializedScript()); conf.set(PIG_PROPERTY.COMMAND_LINE.toString(), getCommandLine()); try { Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Fri Mar 4 18:17:39 2016 @@ -33,13 +33,17 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPrinter; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.newplan.PlanVisitor; import org.apache.pig.tools.pigstats.JobStats; +import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats.JobGraph; import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter; +import org.apache.pig.tools.pigstats.PigStatsUtil; import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo; import org.apache.tez.common.TezUtils; import org.apache.tez.common.counters.CounterGroup; @@ -66,9 +70,20 @@ public class TezDAGStats extends JobStat public static final String TASK_COUNTER_GROUP = TaskCounter.class.getName(); public static final String PIG_COUNTER_GROUP = org.apache.pig.PigCounters.class.getName(); + public static final String SUCCESS_HEADER = String.format("VertexId Parallelism TotalTasks" + + " %1$14s %2$20s %3$14s %4$14s %5$16s %6$14s %7$16s" + + " Alias\tFeature\tOutputs", + "InputRecords", "ReduceInputRecords", "OutputRecords", "FileBytesRead", "FileBytesWritten", "HdfsBytesRead", "HdfsBytesWritten"); + + public static final String FAILURE_HEADER = String.format("VertexId State Parallelism TotalTasks" + + " %1$14s %2$20s %3$14s %4$14s %5$16s %6$14s %7$16s" + + " Alias\tFeature\tOutputs", + "InputRecords", "ReduceInputRecords", "OutputRecords", "FileBytesRead", "FileBytesWritten", "HdfsBytesRead", "HdfsBytesWritten"); + private Map<String, TezVertexStats> tezVertexStatsMap; private String appId; + private StringBuilder tezDAGPlan; private int totalTasks = -1; private long fileBytesRead = -1; @@ -87,37 +102,44 @@ public class TezDAGStats extends JobStat private long activeSpillCountObj = 0; private long activeSpillCountRecs = 0; - private HashMap<String, Long> multiStoreCounters + private Map<String, Long> multiStoreCounters = new HashMap<String, Long>(); + private Map<String, OutputStats> outputsByLocation + = new HashMap<String, OutputStats>(); + /** * This class builds the graph of a Tez DAG vertices. */ - static class JobGraphBuilder extends TezOpPlanVisitor { + static class TezDAGStatsBuilder extends TezOpPlanVisitor { + private TezPlanContainerNode tezPlanNode; private JobGraph jobPlan; private Map<String, TezVertexStats> tezVertexStatsMap; private List<TezVertexStats> vertexStatsToBeRemoved; private TezDAGScriptInfo dagScriptInfo; + private StringBuilder tezDAGPlan; - public JobGraphBuilder(TezOperPlan plan, TezDAGScriptInfo dagScriptInfo) { - super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); - tezVertexStatsMap = new HashMap<String, TezVertexStats>(); - vertexStatsToBeRemoved = new ArrayList<TezVertexStats>(); - jobPlan = new JobGraph(); + public TezDAGStatsBuilder(TezPlanContainerNode tezPlanNode, TezDAGScriptInfo dagScriptInfo) { + super(tezPlanNode.getTezOperPlan(), new DependencyOrderWalker<TezOperator, TezOperPlan>(tezPlanNode.getTezOperPlan())); + this.tezPlanNode = tezPlanNode; + this.tezVertexStatsMap = new HashMap<String, TezVertexStats>(); + this.vertexStatsToBeRemoved = new ArrayList<TezVertexStats>(); + this.jobPlan = new JobGraph(); + this.tezDAGPlan = new StringBuilder(); this.dagScriptInfo = dagScriptInfo; } - public Map<String, TezVertexStats> getTezVertexStatsMap() { - return tezVertexStatsMap; - } - - public JobGraph getJobPlan() { - return jobPlan; + public TezDAGStats build() throws VisitorException { + visit(); + TezDAGStats dagStats = new TezDAGStats(tezPlanNode.getOperatorKey().toString(), jobPlan, tezVertexStatsMap, tezDAGPlan); + dagStats.setAlias(dagScriptInfo); + return dagStats; } @Override public void visitTezOp(TezOperator tezOp) throws VisitorException { + TezPrinter.TezVertexGraphPrinter.writePlan(mPlan, tezOp, tezDAGPlan); TezVertexStats currStats = new TezVertexStats(tezOp.getOperatorKey().toString(), jobPlan, tezOp.isUseMRMapSettings()); jobPlan.add(currStats); @@ -131,7 +153,7 @@ public class TezDAGStats extends JobStat } } - // Remove VertexGroups (union) from JobGraph since they're not + // Remove VertexGroups (union) from JobGraph since they're not // materialized as real vertices by Tez. if (tezOp.isVertexGroup()) { vertexStatsToBeRemoved.add(currStats); @@ -158,9 +180,10 @@ public class TezDAGStats extends JobStat } - protected TezDAGStats(String name, JobGraph plan, Map<String, TezVertexStats> tezVertexStatsMap) { + protected TezDAGStats(String name, JobGraph plan, Map<String, TezVertexStats> tezVertexStatsMap, StringBuilder tezDAGPlan) { super(name, plan); this.tezVertexStatsMap = tezVertexStatsMap; + this.tezDAGPlan = tezDAGPlan; } public TezVertexStats getVertexStats(String vertexName) { @@ -191,10 +214,10 @@ public class TezDAGStats extends JobStat totalTasks = (int) dagGrp.findCounter("TOTAL_LAUNCHED_TASKS").getValue(); CounterGroup fsGrp = tezCounters.getGroup(FS_COUNTER_GROUP); - fileBytesRead = fsGrp.findCounter("FILE_BYTES_READ").getValue(); - fileBytesWritten = fsGrp.findCounter("FILE_BYTES_WRITTEN").getValue(); - hdfsBytesRead = fsGrp.findCounter("HDFS_BYTES_READ").getValue(); - hdfsBytesWritten = fsGrp.findCounter("HDFS_BYTES_WRITTEN").getValue(); + fileBytesRead = fsGrp.findCounter(PigStatsUtil.FILE_BYTES_READ).getValue(); + fileBytesWritten = fsGrp.findCounter(PigStatsUtil.FILE_BYTES_WRITTEN).getValue(); + hdfsBytesRead = fsGrp.findCounter(PigStatsUtil.HDFS_BYTES_READ).getValue(); + hdfsBytesWritten = fsGrp.findCounter(PigStatsUtil.HDFS_BYTES_WRITTEN).getValue(); } else { LOG.warn("Failed to get counters for DAG: " + dag.getName()); } @@ -217,7 +240,28 @@ public class TezDAGStats extends JobStat inputs.addAll(vertexStats.getInputs()); } if(vertexStats.getOutputs() != null && !vertexStats.getOutputs().isEmpty()) { - outputs.addAll(vertexStats.getOutputs()); + for (OutputStats output : vertexStats.getOutputs()) { + if (outputsByLocation.get(output.getLocation()) != null) { + OutputStats existingOut = outputsByLocation.get(output.getLocation()); + // In case of multistore, bytesWritten is already calculated + // from size of all the files in the output directory. + if (!output.getPOStore().isMultiStore() && output.getBytes() > -1) { + long bytes = existingOut.getBytes() > -1 + ? (existingOut.getBytes() + output.getBytes()) + : output.getBytes(); + existingOut.setBytes(bytes); + } + if (output.getRecords() > -1) { + long records = existingOut.getRecords() > -1 + ? (existingOut.getRecords() + output.getRecords()) + : output.getRecords(); + existingOut.setRecords(records); + } + } else { + outputs.add(output); + outputsByLocation.put(output.getLocation(), output); + } + } } /*if (vertexStats.getHdfsBytesRead() >= 0) { hdfsBytesRead = (hdfsBytesRead == -1) ? 0 : hdfsBytesRead; @@ -275,21 +319,53 @@ public class TezDAGStats extends JobStat public String getDisplayString() { StringBuilder sb = new StringBuilder(); - sb.append("DAG " + name + ":\n"); - sb.append(String.format("%1$20s: %2$-100s%n", "ApplicationId", + sb.append(String.format("%1$40s: %2$-100s%n", "Name", + name)); + sb.append(String.format("%1$40s: %2$-100s%n", "ApplicationId", appId)); - sb.append(String.format("%1$20s: %2$-100s%n", "TotalLaunchedTasks", + sb.append(String.format("%1$40s: %2$-100s%n", "TotalLaunchedTasks", totalTasks)); - sb.append(String.format("%1$20s: %2$-100s%n", "FileBytesRead", + sb.append(String.format("%1$40s: %2$-100s%n", "FileBytesRead", fileBytesRead)); - sb.append(String.format("%1$20s: %2$-100s%n", "FileBytesWritten", + sb.append(String.format("%1$40s: %2$-100s%n", "FileBytesWritten", fileBytesWritten)); - sb.append(String.format("%1$20s: %2$-100s%n", "HdfsBytesRead", + sb.append(String.format("%1$40s: %2$-100s%n", "HdfsBytesRead", hdfsBytesRead)); - sb.append(String.format("%1$20s: %2$-100s%n", "HdfsBytesWritten", + sb.append(String.format("%1$40s: %2$-100s%n", "HdfsBytesWritten", hdfsBytesWritten)); + sb.append(String.format("%1$40s: %2$-100s%n", "SpillableMemoryManager spill count", + spillCount)); + sb.append(String.format("%1$40s: %2$-100s%n", "Bags proactively spilled", + activeSpillCountObj)); + sb.append(String.format("%1$40s: %2$-100s%n", "Records proactively spilled", + activeSpillCountRecs)); + + + sb.append("\nDAG Plan:\n"); + sb.append(tezDAGPlan); + + List<JobStats> success = ((JobGraph)getPlan()).getSuccessfulJobs(); + List<JobStats> failed = ((JobGraph)getPlan()).getFailedJobs(); + + if (success != null && !success.isEmpty()) { + sb.append("\nVertex Stats:\n"); + sb.append(SUCCESS_HEADER).append("\n"); + for (JobStats js : success) { + sb.append(js.getDisplayString()); + } + } + + if (failed != null && !failed.isEmpty()) { + sb.append("\nFailed vertices:\n"); + sb.append(FAILURE_HEADER).append("\n"); + for (JobStats js : failed) { + sb.append(js.getDisplayString()); + } + sb.append("\n"); + } + return sb.toString(); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java Fri Mar 4 18:17:39 2016 @@ -43,6 +43,8 @@ import org.apache.pig.tools.pigstats.Out import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.ScriptState; import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.client.DAGStatus; import com.google.common.collect.Maps; @@ -73,10 +75,8 @@ public class TezPigScriptStats extends P public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanNode) throws VisitorException { TezScriptState ss = TezScriptState.get(); TezDAGScriptInfo dagScriptInfo = ss.setDAGScriptInfo(tezPlanNode); - TezDAGStats.JobGraphBuilder jobGraphBuilder = new TezDAGStats.JobGraphBuilder(tezPlanNode.getTezOperPlan(), dagScriptInfo); - jobGraphBuilder.visit(); - TezDAGStats currStats = new TezDAGStats(tezPlanNode.getOperatorKey().toString(), jobGraphBuilder.getJobPlan(), jobGraphBuilder.getTezVertexStatsMap()); - currStats.setAlias(dagScriptInfo); + TezDAGStats.TezDAGStatsBuilder builder = new TezDAGStats.TezDAGStatsBuilder(tezPlanNode, dagScriptInfo); + TezDAGStats currStats = builder.build(); jobPlan.add(currStats); List<TezPlanContainerNode> preds = getPlan().getPredecessors(tezPlanNode); if (preds != null) { @@ -109,7 +109,11 @@ public class TezPigScriptStats extends P public void finish() { super.stop(); - display(); + try { + display(); + } catch (Throwable e) { + LOG.warn("Exception while displaying stats:", e); + } } private void display() { @@ -149,7 +153,10 @@ public class TezPigScriptStats extends P } } + int count = 0; for (TezDAGStats dagStats : tezDAGStatsMap.values()) { + sb.append("\n"); + sb.append("DAG " + count++ + ":\n"); sb.append(dagStats.getDisplayString()); sb.append("\n"); } @@ -186,11 +193,24 @@ public class TezPigScriptStats extends P tezScriptState.emitjobFinishedNotification(tezDAGStats); } else if (dagStatus.getState() == DAGStatus.State.FAILED) { tezDAGStats.setSuccessful(false); - tezDAGStats.setErrorMsg(tezJob.getDiagnostics()); + String diagnostics = tezJob.getDiagnostics(); + tezDAGStats.setErrorMsg(diagnostics); + tezDAGStats.setBackendException(new TezException(diagnostics)); tezScriptState.emitJobFailedNotification(tezDAGStats); } tezScriptState.dagCompletedNotification(tezJob.getName(), tezDAGStats); } + + if (!tezDAGStats.isSuccessful()) { + String outputCommitOnDAGSuccess = pigContext.getProperties().getProperty( + TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS); + if ((outputCommitOnDAGSuccess == null && TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT) + || "true".equals(outputCommitOnDAGSuccess)) { + for (OutputStats stats : tezDAGStats.getOutputs()) { + stats.setSuccessful(false); + } + } + } } public TezDAGStats addTezJobStatsForNative(String dagName, NativeTezOper tezOper, boolean success) { Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java Fri Mar 4 18:17:39 2016 @@ -107,7 +107,7 @@ public class TezScriptState extends Scri conf.set(PIG_PROPERTY.HADOOP_VERSION.toString(), getHadoopVersion()); conf.set(PIG_PROPERTY.VERSION.toString(), getPigVersion()); conf.set(PIG_PROPERTY.SCRIPT_ID.toString(), id); - conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript()); + conf.set(PIG_PROPERTY.SCRIPT.toString(), getSerializedScript()); conf.set(PIG_PROPERTY.COMMAND_LINE.toString(), getCommandLine()); } Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original) +++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Fri Mar 4 18:17:39 2016 @@ -22,7 +22,7 @@ import static org.apache.pig.tools.pigst import static org.apache.pig.tools.pigstats.tez.TezDAGStats.TASK_COUNTER_GROUP; import java.io.IOException; -import java.util.HashMap; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.mapred.Counters; import org.apache.pig.PigCounters; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.logicalLayer.FrontendException; @@ -49,6 +50,7 @@ import org.apache.tez.common.counters.Ta import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.client.VertexStatus; +import org.apache.tez.dag.api.client.VertexStatus.State; import com.google.common.collect.Maps; @@ -62,25 +64,25 @@ public class TezVertexStats extends JobS private boolean isMapOpts; private int parallelism; + private State vertexState; // CounterGroup, Counter, Value private Map<String, Map<String, Long>> counters = null; private List<POStore> stores = null; private List<FileSpec> loads = null; - private int numberMaps = 0; - private int numberReduces = 0; - - private long mapInputRecords = 0; - private long mapOutputRecords = 0; - private long reduceInputRecords = 0; - private long reduceOutputRecords = 0; + private int numTasks = 0; + private long numInputRecords = 0; + private long numReduceInputRecords = 0; + private long numOutputRecords = 0; + private long fileBytesRead = 0; + private long fileBytesWritten = 0; private long spillCount = 0; private long activeSpillCountObj = 0; private long activeSpillCountRecs = 0; - private HashMap<String, Long> multiStoreCounters - = new HashMap<String, Long>(); + private Map<String, Long> multiInputCounters = Maps.newHashMap(); + private Map<String, Long> multiStoreCounters = Maps.newHashMap(); public TezVertexStats(String name, JobGraph plan, boolean isMapOpts) { super(name, plan); @@ -103,13 +105,25 @@ public class TezVertexStats extends JobS @Override public String getDisplayString() { StringBuilder sb = new StringBuilder(); - sb.append(String.format("%1$20s: %2$-100s%n", "VertexName", name)); - if (getAlias() != null && !getAlias().isEmpty()) { - sb.append(String.format("%1$20s: %2$-100s%n", "Alias", getAlias())); + sb.append(String.format("%-10s ", name)); + if (state == JobState.FAILED) { + sb.append(vertexState.name()); } - if (getFeature() != null && !getFeature().isEmpty()) { - sb.append(String.format("%1$20s: %2$-100s%n", "Features", getFeature())); + sb.append(String.format("%9s ", parallelism)); + sb.append(String.format("%10s ", numTasks)); + sb.append(String.format("%14s ", numInputRecords)); + sb.append(String.format("%20s ", numReduceInputRecords)); + sb.append(String.format("%14s ", numOutputRecords)); + sb.append(String.format("%14s ", fileBytesRead)); + sb.append(String.format("%16s ", fileBytesWritten)); + sb.append(String.format("%14s ", hdfsBytesRead)); + sb.append(String.format("%16s ", hdfsBytesWritten)); + sb.append(getAlias()).append("\t"); + sb.append(getFeature()).append("\t"); + for (OutputStats os : outputs) { + sb.append(os.getLocation()).append(","); } + sb.append("\n"); return sb.toString(); } @@ -123,7 +137,7 @@ public class TezVertexStats extends JobS this.stores = (List<POStore>) ObjectSerializer.deserialize( conf.get(JobControlCompiler.PIG_REDUCE_STORES)); this.loads = (List<FileSpec>) ObjectSerializer.deserialize( - conf.get("pig.inputs")); + conf.get(PigInputFormat.PIG_INPUTS)); } catch (IOException e) { LOG.warn("Failed to deserialize the store list", e); } @@ -138,17 +152,12 @@ public class TezVertexStats extends JobS } public void accumulateStats(VertexStatus status, int parallelism) { - hdfsBytesRead = -1; - hdfsBytesWritten = -1; if (status != null) { setSuccessful(status.getState().equals(VertexStatus.State.SUCCEEDED)); - this.parallelism = parallelism; - if (this.isMapOpts) { - numberMaps += parallelism; - } else { - numberReduces += parallelism; - } + this.vertexState = status.getState(); + this.parallelism = parallelism; //compile time parallelism + this.numTasks = status.getProgress().getTotalTaskCount(); //run time parallelism TezCounters tezCounters = status.getVertexCounters(); counters = Maps.newHashMap(); Iterator<CounterGroup> grpIt = tezCounters.iterator(); @@ -163,14 +172,22 @@ public class TezVertexStats extends JobS counters.put(grp.getName(), cntMap); } - if (counters.get(FS_COUNTER_GROUP) != null && - counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ) != null) { - hdfsBytesRead = counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ); - } - if (counters.get(FS_COUNTER_GROUP) != null && - counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_WRITTEN) != null) { - hdfsBytesWritten = counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_WRITTEN); + Map<String, Long> fsCounters = counters.get(FS_COUNTER_GROUP); + if (fsCounters != null) { + if (fsCounters.containsKey(PigStatsUtil.HDFS_BYTES_READ)) { + this.hdfsBytesRead = fsCounters.get(PigStatsUtil.HDFS_BYTES_READ); + } + if (fsCounters.containsKey(PigStatsUtil.HDFS_BYTES_WRITTEN)) { + this.hdfsBytesWritten = fsCounters.get(PigStatsUtil.HDFS_BYTES_WRITTEN); + } + if (fsCounters.containsKey(PigStatsUtil.FILE_BYTES_READ)) { + this.fileBytesRead = fsCounters.get(PigStatsUtil.FILE_BYTES_READ); + } + if (fsCounters.containsKey(PigStatsUtil.FILE_BYTES_WRITTEN)) { + this.fileBytesWritten = fsCounters.get(PigStatsUtil.FILE_BYTES_WRITTEN); + } } + Map<String, Long> pigCounters = counters.get(PIG_COUNTER_GROUP); if (pigCounters != null) { if (pigCounters.containsKey(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT)) { @@ -198,24 +215,44 @@ public class TezVertexStats extends JobS } public void addInputStatistics() { + + long inputRecords = -1; + Map<String, Long> taskCounters = counters.get(TASK_COUNTER_GROUP); + if (taskCounters != null) { + if (taskCounters.get(TaskCounter.INPUT_RECORDS_PROCESSED.name()) != null) { + inputRecords = taskCounters.get(TaskCounter.INPUT_RECORDS_PROCESSED.name()); + numInputRecords = inputRecords; + } + if (taskCounters.get(TaskCounter.REDUCE_INPUT_RECORDS.name()) != null) { + numReduceInputRecords = taskCounters.get(TaskCounter.REDUCE_INPUT_RECORDS.name()); + } + } + if (loads == null) { return; } + Map<String, Long> mIGroup = counters.get(PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP); + if (mIGroup != null) { + multiInputCounters.putAll(mIGroup); + } + + // There is always only one load in a Tez vertex for (FileSpec fs : loads) { long records = -1; long hdfsBytesRead = -1; String filename = fs.getFileName(); if (counters != null) { - Map<String, Long> taskCounter = counters.get(TASK_COUNTER_GROUP); - if (taskCounter != null - && taskCounter.get(TaskCounter.INPUT_RECORDS_PROCESSED.name()) != null) { - records = taskCounter.get(TaskCounter.INPUT_RECORDS_PROCESSED.name()); - if (this.isMapOpts) { - mapInputRecords += records; - } else { - reduceInputRecords += records; - } + if (mIGroup != null) { + Long n = mIGroup.get(PigStatsUtil.getMultiInputsCounterName(fs.getFileName(), 0)); + if (n != null) records = n; + } + if (records == -1) { + records = inputRecords; + } + if (isSuccessful() && records == -1) { + // Tez removes 0 value counters for efficiency. + records = 0; } if (counters.get(FS_COUNTER_GROUP) != null && counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ) != null) { @@ -230,10 +267,25 @@ public class TezVertexStats extends JobS } public void addOutputStatistics() { + + long outputRecords = -1; + + Map<String, Long> taskCounters = counters.get(TASK_COUNTER_GROUP); + if (taskCounters != null + && taskCounters.get(TaskCounter.OUTPUT_RECORDS.name()) != null) { + outputRecords = taskCounters.get(TaskCounter.OUTPUT_RECORDS.name()); + numOutputRecords = outputRecords; + } + if (stores == null) { return; } + Map<String, Long> msGroup = counters.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP); + if (msGroup != null) { + multiStoreCounters.putAll(msGroup); + } + for (POStore sto : stores) { if (sto.isTmpStore()) { continue; @@ -242,23 +294,16 @@ public class TezVertexStats extends JobS long hdfsBytesWritten = -1; String filename = sto.getSFile().getFileName(); if (counters != null) { - if (sto.isMultiStore()) { - Map<String, Long> msGroup = counters.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP); - if (msGroup != null) { - multiStoreCounters.putAll(msGroup); - Long n = msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto)); - if (n != null) records = n; - } - } else if (counters.get(TASK_COUNTER_GROUP) != null - && counters.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name()) != null) { - records = counters.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name()); - } - if (records != -1) { - if (this.isMapOpts) { - mapOutputRecords += records; - } else { - reduceOutputRecords += records; - } + if (msGroup != null) { + Long n = msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto)); + if (n != null) records = n; + } + if (records == -1) { + records = outputRecords; + } + if (isSuccessful() && records == -1) { + // Tez removes 0 value counters for efficiency. + records = 0; } } /* TODO: Need to check FILE_BYTES_WRITTEN for local mode */ @@ -284,13 +329,13 @@ public class TezVertexStats extends JobS @Override @Deprecated public int getNumberMaps() { - return numberMaps; + return this.isMapOpts ? numTasks : -1; } @Override @Deprecated public int getNumberReduces() { - return numberReduces; + return this.isMapOpts ? -1 : numTasks; } @Override @@ -332,25 +377,25 @@ public class TezVertexStats extends JobS @Override @Deprecated public long getMapInputRecords() { - return mapInputRecords; + return this.isMapOpts ? numInputRecords : -1; } @Override @Deprecated public long getMapOutputRecords() { - return mapOutputRecords; + return this.isMapOpts ? numOutputRecords : -1; } @Override @Deprecated public long getReduceInputRecords() { - return reduceInputRecords; + return this.isMapOpts ? -1 : numInputRecords; } @Override @Deprecated public long getReduceOutputRecords() { - return reduceOutputRecords; + return this.isMapOpts ? -1 : numOutputRecords; } @Override @@ -377,7 +422,7 @@ public class TezVertexStats extends JobS @Override @Deprecated public Map<String, Long> getMultiStoreCounters() { - return multiStoreCounters; + return Collections.unmodifiableMap(multiStoreCounters); } @Override Modified: pig/branches/spark/src/pig-default.properties URL: http://svn.apache.org/viewvc/pig/branches/spark/src/pig-default.properties?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/pig-default.properties (original) +++ pig/branches/spark/src/pig-default.properties Fri Mar 4 18:17:39 2016 @@ -57,4 +57,8 @@ pig.sql.type=hcat pig.output.committer.recovery.support=false pig.stats.output.size.reader=org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader -pig.stats.output.size.reader.unsupported=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer +pig.stats.output.size.reader.unsupported=org.apache.pig.builtin.mock.Storage,org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage + +pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage,org.apache.pig.piggybank.storage.MultiStorage + +pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage \ No newline at end of file Propchange: pig/branches/spark/src/pig-default.properties ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Mar 4 18:17:39 2016 @@ -1,2 +1,2 @@ /hadoop/pig/branches/multiquery/conf/pig.properties:741727-770826 -/pig/trunk/src/pig-default.properties:1621676-1622566 +/pig/trunk/src/pig-default.properties:1621676-1733612 Modified: pig/branches/spark/src/python/streaming/controller.py URL: http://svn.apache.org/viewvc/pig/branches/spark/src/python/streaming/controller.py?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/python/streaming/controller.py (original) +++ pig/branches/spark/src/python/streaming/controller.py Fri Mar 4 18:17:39 2016 @@ -125,7 +125,12 @@ class PythonStreamingController: try: func_output = func(*inputs) if should_log: - log_message("Row %s: UDF Output: %s" % (self.input_count, unicode(func_output))) + try: + log_message("Row %s: UDF Output: %s" % (self.input_count, unicode(func_output))) + except: + #This is probably an error with unicoding the output. Calling unicode on bytearray will + #throw an exception. Since its just a log statement, just skip and carry on. + logging.exception("Couldn't log output. Try to continue.") except: #These errors should always be caused by user code. write_user_exception(module_name, self.stream_error, NUM_LINES_OFFSET_TRACE) Modified: pig/branches/spark/test/e2e/pig/build.xml URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/build.xml?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/build.xml (original) +++ pig/branches/spark/test/e2e/pig/build.xml Fri Mar 4 18:17:39 2016 @@ -31,12 +31,16 @@ <equals arg1="${hadoopversion}" arg2="23" /> </condition> + <property name="mvnrepo" value="http://repo2.maven.org/maven2"/> + <!-- Separate property name for udfs' build.xml --> <property name="udf.dir" value="${basedir}/udfs"/> <property name="udf.java.dir" value="${udf.dir}/java"/> <property name="udf.jar" value="${udf.java.dir}/testudf.jar"/> <property name="python.udf.dir" value="${udf.dir}/python"/> + <property name="js.udf.dir" value="${udf.dir}/js" /> <property name="ruby.udf.dir" value="${udf.dir}/ruby" /> + <property name="groovy.udf.dir" value="${udf.dir}/groovy" /> <property name="cpython.udf.dir" value="${udf.dir}/cpython" /> <property name="params.dir" value="${basedir}/paramfiles"/> <property name="e2e.lib.dir" value="${basedir}/lib"/> @@ -128,12 +132,34 @@ </not> </condition> + <pathconvert property="tests.suites.all" pathsep=" "> + <path path="${test.location}/tests/cmdline.conf"/> + <path path="${test.location}/tests/multiquery.conf"/> + <path path="${test.location}/tests/negative.conf"/> + <path path="${test.location}/tests/nightly.conf"/> + <path path="${test.location}/tests/streaming.conf"/> + <path path="${test.location}/tests/streaming_local.conf"/> + <path path="${test.location}/tests/turing_jython.conf"/> + <path path="${test.location}/tests/bigdata.conf"/> + <path path="${test.location}/tests/grunt.conf"/> + <path path="${test.location}/tests/macro.conf"/> + <path path="${test.location}/tests/orc.conf"/> + <path path="${test.location}/tests/hcat.conf"/> + <path path="${test.location}/tests/utf8.conf"/> + </pathconvert> + + <condition property="tests.suites" value="${tests.suites.all}"> + <not> + <isset property="tests.suites"/> + </not> + </condition> + <target name="udfs"> <ant dir="${udf.java.dir}"/> </target> <!-- Build an archive to use in the tests --> - <target name="tar" description="Create tar file with pig modules"> + <target name="tar" description="Create tar file with pig modules" depends="download-datafu"> <mkdir dir="${tar.dir}"/> <mkdir dir="${tar.dir}/tests"/> <mkdir dir="${tar.dir}/drivers"/> @@ -144,7 +170,9 @@ <mkdir dir="${tar.dir}/libexec/PigTest/test"/> <mkdir dir="${tar.dir}/libexec/PigTest/generate"/> <mkdir dir="${tar.dir}/libexec/python"/> + <mkdir dir="${tar.dir}/libexec/js"/> <mkdir dir="${tar.dir}/libexec/ruby"/> + <mkdir dir="${tar.dir}/libexec/groovy"/> <mkdir dir="${tar.dir}/libexec/cpython"/> <mkdir dir="${tar.dir}/lib"/> <mkdir dir="${tar.dir}/lib/java"/> @@ -193,10 +221,18 @@ <fileset dir="${python.udf.dir}"/> </copy> + <copy todir="${tar.dir}/libexec/js"> + <fileset dir="${js.udf.dir}"/> + </copy> + <copy todir="${tar.dir}/libexec/ruby"> <fileset dir="${ruby.udf.dir}"/> </copy> + <copy todir="${tar.dir}/libexec/groovy"> + <fileset dir="${groovy.udf.dir}"/> + </copy> + <copy todir="${tar.dir}/libexec/cpython"> <fileset dir="${cpython.udf.dir}"/> </copy> @@ -290,22 +326,10 @@ <env key="FORK_FACTOR_FILE" value="${fork.factor.conf.file}"/> <env key="HADOOP_MAPRED_LOCAL_DIR" value="${hadoop.mapred.local.dir}"/> <env key="E2E_DEBUG" value="${e2e.debug}"/> + <env key="SORT_BENCHMARKS" value="${sort.benchmarks}"/> <arg value="./test_harness.pl"/> - <arg line="${tests.to.run}"/> - <arg value="${test.location}/tests/cmdline.conf"/> - <arg value="${test.location}/tests/multiquery.conf"/> - <arg value="${test.location}/tests/negative.conf"/> - <arg value="${test.location}/tests/nightly.conf"/> - <arg value="${test.location}/tests/streaming.conf"/> - <arg value="${test.location}/tests/streaming_local.conf"/> - <arg value="${test.location}/tests/turing_jython.conf"/> - <arg value="${test.location}/tests/bigdata.conf"/> - <arg value="${test.location}/tests/grunt.conf"/> - <arg value="${test.location}/tests/macro.conf"/> - <arg value="${test.location}/tests/orc.conf"/> - <arg value="${test.location}/tests/hcat.conf"/> - <arg value="${test.location}/tests/utf8.conf"/> + <arg line="${tests.to.run} ${tests.suites}"/> </exec> </target> @@ -404,6 +428,12 @@ <ant dir="${udf.java.dir}" target="clean"/> </target> + <target name="download-datafu" description="To download datafu" unless="offline"> + <mkdir dir="lib/java"/> + <get src="${mvnrepo}/com/linkedin/datafu/datafu/1.2.0/datafu-1.2.0.jar" + dest="lib/java/datafu.jar"/> + </target> + </project> Modified: pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm (original) +++ pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm Fri Mar 4 18:17:39 2016 @@ -595,8 +595,8 @@ sub postProcessSingleOutputFile # Build command to: # 1. Combine part files my $fppCmd; - if(Util::isWindows()) { - my $delCmd = "del \"$localdir\\*.crc\""; + if(Util::isWindows()||Util::isCygwin()) { + my $delCmd = "del \"$localdir\\*.crc\" 2>NUL"; print $log "$delCmd\n"; system($delCmd); $fppCmd = "cat $localdir\\map* $localdir\\part* 2>NUL"; @@ -614,6 +614,11 @@ sub postProcessSingleOutputFile $fppCmd .= " > $localdir/out_original"; + #Need slashes to be consistent for windows + if (Util::isWindows() || Util::isCygwin()) { + $fppCmd =~ s/\\/\//g; + } + # run command print $log "$fppCmd\n"; system($fppCmd); @@ -623,6 +628,25 @@ sub postProcessSingleOutputFile print $log join(" ", @sortCmd) . "\n"; IPC::Run::run(\@sortCmd, '>', "$localdir/out_sorted") or die "Sort for benchmark comparison failed on $localdir/out_original"; + # Remove extra \r from $localdir/out_sorted for Windows benchmark + if(Util::isWindows()||Util::isCygwin()) { + my $tmpfile = "$localdir/out_sorted.tmp"; + link("$localdir/out_sorted", $tmpfile) or + die "Unable to create temporary file $tmpfile, $!\n"; + unlink("$localdir/out_sorted") or + die "Unable to unlink file $localdir/out_sorted, $!\n"; + open(IFH, "< $tmpfile") or + die "Unable to open file $tmpfile, $!\n"; + open(OFH, "> $localdir/out_sorted") or + die "Unable to open file $localdir/out_sorted, $!\n"; + while(<IFH>) { + $_ =~ s/\r$//g; + print OFH $_; + } + close(OFH); + close(IFH); + unlink($tmpfile); + } return "$localdir/out_sorted"; } @@ -936,6 +960,17 @@ sub compareSingleOutput { my ($self, $testResult, $testOutput, $benchmarkOutput, $log) = @_; + if ($ENV{'SORT_BENCHMARKS'} eq 'true'){ + # Sort the benchmark Output. + my $benchmarkOutput_new = $benchmarkOutput.'_new'; + my @sortCmd = ('sort', "$benchmarkOutput"); + print $log join(" ", @sortCmd) . "\n"; + IPC::Run::run(\@sortCmd, '>', "$benchmarkOutput_new") or die "Sort for benchmark ouput failed on $benchmarkOutput_new"; + my @renameCmd = ('mv', "$benchmarkOutput_new" , "$benchmarkOutput"); + print $log join(" ", @renameCmd) . "\n"; + IPC::Run::run(\@renameCmd, \undef, $log, $log) or die "Rename command failed"; + } + # cksum the the two files to see if they are the same my ($testChksm, $benchmarkChksm); IPC::Run::run((['cat', $testOutput], '|', ['cksum']), \$testChksm, Modified: pig/branches/spark/test/e2e/pig/tests/hcat.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/hcat.conf?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/hcat.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/hcat.conf Fri Mar 4 18:17:39 2016 @@ -44,7 +44,7 @@ stored as textfile;\, 'num' => 2, 'java_params' => ['-Dhcat.bin=:HCATBIN:'], 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); -sql drop table if exists pig_hcat_ddl_1; +SQL drop table if exists pig_hcat_ddl_1; sql create table pig_hcat_ddl_1(name string, age int, gpa double) Modified: pig/branches/spark/test/e2e/pig/tests/multiquery.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/multiquery.conf?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/multiquery.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/multiquery.conf Fri Mar 4 18:17:39 2016 @@ -52,6 +52,21 @@ # - _TEST_ Streaming with multiple stores. # - _TEST_ Streaming in demux. # - _TEST_ Streaming in nested demux. +# MultiQuery_Union (Also refer Union in nightly.conf) +# - _TEST_ Multiple levels of union with join +# - _TEST_ Union with replicate join left table part of split +# - _TEST_ Union with replicate join right table part of split +# - _TEST_ Union with skewed join left table part of split +# - _TEST_ Union with skewed join right table part of split +# - _TEST_ Union with group by + combiner +# - _TEST_ Union with group by + secondary key partitioner +# - _TEST_ Union with order by +# MultiQuery_Self +# - _TEST_ Self cross +# - _TEST_ Self cogroup +# - _TEST_ Three way join (two self) +# - _TEST_ Self replicate join +# - _TEST_ Self skewed join $cfg = { @@ -554,7 +569,301 @@ $cfg = { }, ] # end of tests }, + + { + 'name' => 'MultiQuery_Union', + 'tests' => [ + { + # Union + Groupby + Combiner + 'num' => 1, + 'floatpostprocess' => 1, + 'java_params' => ['-Dpig.exec.mapPartAgg=false'], + 'delimiter' => ' ', + 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); +a1 = filter a by gpa >= 3.9; +a2 = filter a by gpa < 2; +c = union a1, a2; +d = group c by name; +e = foreach d generate group, SUM(c.age); +store e into ':OUTPATH:';\, + }, + { + # Union + Groupby + Combiner + POPartialAgg + 'num' => 2, + 'floatpostprocess' => 1, + 'java_params' => ['-Dpig.exec.mapPartAgg=true'], + 'delimiter' => ' ', + 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); +a1 = filter a by gpa >= 3.9; +a2 = filter a by gpa < 2; +c = union a1, a2; +d = group c by name; +e = foreach d generate group, SUM(c.age); +store e into ':OUTPATH:';\, + }, + { + # Union + Replicate Join left outer + Stream + Group by + Secondary Key Partitioner + 'num' => 3, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa:float); +a1 = filter a by gpa is null or gpa >= 3.9; +a2 = filter a by gpa < 2; +b = union a1, a2; +c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +d = join b by name left outer, c by name using 'replicated'; +e = stream d through `cat` as (name, age, gpa, name1, age1, registration, contributions); +f = foreach e generate name, age, gpa, registration, contributions; +g = group f by name; +g1 = group f by name; -- Two separate groupbys to ensure secondary key partitioner +h = foreach g { + inner1 = order f by age, gpa, registration, contributions; + inner2 = limit inner1 1; + generate inner2, SUM(f.age); }; +i = foreach g1 { + inner1 = order f by age asc, gpa desc, registration asc, contributions desc; + inner2 = limit inner1 1; + generate inner2, SUM(f.age); }; +store h into ':OUTPATH:.1'; +store i into ':OUTPATH:.2';\, + }, + { + # Union + Replicate Join inner + Order by + 'num' => 4, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa:float); +a1 = filter a by gpa is null or gpa >= 3.9; +a2 = filter a by gpa < 1; +b = union a1, a2; +b1 = filter b by age < 30; +b2 = foreach b generate name, age, FLOOR(gpa) as gpa; +c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +d = join b2 by name, c by name using 'replicated'; +e = foreach d generate b2::name as name, b2::age as age, gpa, registration, contributions; +f = order e by name, age DESC; +store f into ':OUTPATH:';\, + 'sortArgs' => ['-t', ' ', '-k', '1,1', '-k', '2,2nr'], + }, + { + # Union + Replicate Join right input + 'num' => 5, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +a1 = filter a by gpa is null or gpa <= 3.9; +a2 = filter a by gpa < 2; +b = union a1, a2; +c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +d = join c by name, b by name using 'replicated'; +store d into ':OUTPATH:';\, + }, + { + # Union + Left outer Join + 'num' => 6, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float); +a1 = filter a by gpa is null or gpa >= 3.9; +a2 = filter a by gpa < 1; +b = union a1, a2; +c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +d = join b by name left outer, c by name; +e = foreach d generate b::name as name, b::age as age, gpa, registration, contributions; +store e into ':OUTPATH:';\, + }, + { + # Multiple levels of union + Skewed join Right outer + 'num' => 7, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float); +b = filter a by gpa >= 3.9; +b1 = foreach b generate *; +b2 = foreach b generate *; +b3 = union onschema b1, b2; +c = filter a by gpa < 2; +c1 = foreach c generate *; +c2 = foreach c generate *; +c3 = union onschema c1, c2; +a1 = union onschema b3, c3; +store a1 into ':OUTPATH:.1'; +d = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +e = join a1 by name right outer, d by name using 'skewed' PARALLEL 3; +store e into ':OUTPATH:.2';\, + }, + { + # Union + Skewed Join right input + 'num' => 8, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +a1 = filter a by gpa >= 3.9; +a2 = filter a by gpa < 2; +b = union a1, a2; +c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +d = join c by name, b by name using 'skewed' PARALLEL 3; +store d into ':OUTPATH:';\, + }, + { + # Union + CROSS + 'num' => 9, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float); +a1 = filter a by gpa == 0.00; +a2 = filter a by gpa == 4.00; +b = union a1, a2; +c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +d = CROSS b, c; +store d into ':OUTPATH:';\, + }, + { + # Union + Rank + 'num' => 10, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float); +a1 = filter a by gpa is null or gpa >= 3.9; +a2 = filter a by gpa < 1; +b = union a1, a2; +c = rank b; +-- Ordering is not guaranteed with union and ranking will differ. So just test rank and column separately +d = foreach c generate $0; +e = foreach c generate $1, $2, $3; +store d into ':OUTPATH:.1'; +store e into ':OUTPATH:.2';\, + }, + { + # Union + Rank dense + 'num' => 11, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float); +a1 = filter a by gpa is null or gpa >= 3.9; +a2 = filter a by gpa < 1; +b = union a1, a2; +c = rank b by name ASC, age DESC DENSE; +store c into ':OUTPATH:';\, + }, + ] # end of tests + }, + + { + 'name' => 'MultiQuery_Self', + 'tests' => [ + # Self cross + { + 'num' => 1, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = filter a by gpa >= 3.9; +c = filter a by gpa <= 0.5; +d = filter a by gpa >= 3.5 and gpa < 3.9; +e = filter a by gpa > 0.5 and gpa < 1; +f = CROSS b, c PARALLEL 3; +g = CROSS d, e PARALLEL 4; +store f into ':OUTPATH:.1'; +store g into ':OUTPATH:.2';\, + }, + { + # Self cogroup + 'num' => 2, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = filter a by gpa >= 3; +c = filter a by gpa < 2; +d = cogroup c by name, b by name; +e = foreach d generate flatten(c), flatten(b); +store e into ':OUTPATH:';\, + }, + { + # Three way join (two self) + 'num' => 3, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = filter a by gpa >= 3; +c = filter a by gpa < 2; +d = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); +e = join b by name, c by name, d by name PARALLEL 2; +store e into ':OUTPATH:';\, + }, + { + # Self join replicated + 'num' => 4, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = filter a by gpa >= 3; +c = filter a by gpa < 2; +d = join c by name, b by name using 'replicated'; +store d into ':OUTPATH:';\, + }, + { + # Self join skewed + 'num' => 5, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = filter a by gpa >= 3; +c = filter a by gpa < 2; +d = join c by name, b by name using 'skewed' PARALLEL 2; +store d into ':OUTPATH:';\, + }, + { + # Self join left outer + 'num' => 6, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = filter a by gpa >= 3; +c = filter a by gpa < 2; +d = join c by name left outer, b by name PARALLEL 2; +store d into ':OUTPATH:';\, + }, + { + # Self join right outer + 'num' => 7, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); +b = filter a by gpa >= 3; +c = filter a by gpa < 2; +d = join c by name right outer, b by name PARALLEL 2; +store d into ':OUTPATH:';\, + }, + { + # Self join full outer + 'num' => 8, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa); +b = filter a by gpa >= 3; +c = filter a by gpa < 2; +d = join c by name full outer, b by name PARALLEL 2; +store d into ':OUTPATH:';\, + }, + { + # Self join union replicated + 'num' => 9, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float); +a1 = filter a by gpa == 0.00; +a2 = filter a by gpa == 4.00; +b = union a1, a2; +c = JOIN a by name, b by name using 'replicated'; +store c into ':OUTPATH:';\, + }, + { + # Self join union + 'num' => 10, + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float); +a1 = filter a by gpa == 0.00; +a2 = filter a by gpa == 4.00; +b = union a1, a2; +c = JOIN a by name left, b by name; +store c into ':OUTPATH:';\, + }, + { + # Complex self join + 'num' => 11, + 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float); +SPLIT a INTO b IF age > 40, + c IF age <= 40; + +d = FOREACH c GENERATE name, age, gpa; + +e = FILTER d BY gpa > 3; +f = FILTER d BY gpa <= 3; + +g = JOIN e BY name LEFT, f BY name; +h = FOREACH g GENERATE e::name as name, e::age as age, e::gpa as gpa; + +i = DISTINCT h; + +j = FILTER b BY gpa > 3; +k = FILTER b by gpa <= 3; + +l = JOIN j BY name LEFT, k BY name; +m = FOREACH l generate j::name as name, j::age as age, j::gpa as gpa; +n = DISTINCT m; + +m = UNION e, i, j, n; + +n = JOIN a BY name, m BY name; +store n into ':OUTPATH:';\, + } + ] # end of tests + }, ] # end of groups } -; \ No newline at end of file +;
