Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java Tue Jan 27 02:27:45 2015
@@ -18,14 +18,18 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
+import java.util.Properties;
 import java.util.UUID;
 
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.tez.dag.api.TezConfiguration;
 
 public class TezExecutionEngine extends HExecutionEngine {
 
@@ -45,4 +49,11 @@ public class TezExecutionEngine extends
     public PigStats instantiatePigStats() {
         return new TezPigScriptStats(pigContext);
     }
+
+    @Override
+    public JobConf getExecConf(Properties properties) throws ExecException {
+        JobConf jc = super.getExecConf(properties);
+        jc.addResource(TezConfiguration.TEZ_SITE_XML);
+        return jc;
+    }
 }
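The getExecConf() override above simply layers tez-site.xml onto the job configuration handed back to Pig, so Tez settings become visible alongside the Hadoop ones. Below is a minimal standalone sketch of that behaviour, assuming a tez-site.xml on the classpath; the class name TezSiteConfCheck and the tez.am.resource.memory.mb lookup are only illustrative and are not part of this commit.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.tez.dag.api.TezConfiguration;

    public class TezSiteConfCheck {
        public static void main(String[] args) {
            // Mirror the override: start from a plain JobConf and add
            // tez-site.xml (TezConfiguration.TEZ_SITE_XML) as a resource.
            JobConf jc = new JobConf();
            jc.addResource(TezConfiguration.TEZ_SITE_XML);

            // Any property defined in tez-site.xml should now be readable here;
            // the AM memory key is used purely as an example.
            System.out.println("tez.am.resource.memory.mb = "
                    + jc.get("tez.am.resource.memory.mb", "<not set>"));
        }
    }
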
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Tue Jan 27 02:27:45 2015 @@ -19,7 +19,6 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.util.EnumSet; -import java.util.HashSet; import java.util.Map; import java.util.Timer; import java.util.TimerTask; @@ -177,13 +176,18 @@ public class TezJob implements Runnable while (true) { try { - dagStatus = dagClient.getDAGStatus(statusGetOpts); + dagStatus = dagClient.getDAGStatus(null); } catch (Exception e) { log.info("Cannot retrieve DAG status", e); break; } if (dagStatus.isCompleted()) { + try { + dagStatus = dagClient.getDAGStatus(statusGetOpts); + } catch (Exception e) { + log.warn("Failed to retrieve DAG counters", e); + } // For tez_local mode where PigProcessor destroys all UDFContext UDFContext.setUdfContext(udfContext); @@ -219,16 +223,10 @@ public class TezJob implements Runnable private class DAGStatusReporter extends TimerTask { - private final String LINE_SEPARATOR = System.getProperty("line.separator"); - @Override public void run() { if (dagStatus == null) return; - String msg = "status=" + dagStatus.getState() - + ", progress=" + dagStatus.getDAGProgress() - + ", diagnostics=" - + StringUtils.join(dagStatus.getDiagnostics(), LINE_SEPARATOR); - log.info("DAG Status: " + msg); + log.info("DAG Status: " + dagStatus.toString()); } } @@ -248,7 +246,7 @@ public class TezJob implements Runnable public String getDiagnostics() { try { if (dagClient != null && dagStatus == null) { - dagStatus = dagClient.getDAGStatus(new HashSet<StatusGetOpts>()); + dagStatus = dagClient.getDAGStatus(null); } if (dagStatus != null) { return StringUtils.join(dagStatus.getDiagnostics(), "\n"); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Tue Jan 27 02:27:45 2015 @@ -75,6 +75,7 @@ import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -95,18 +96,24 @@ public class TezLauncher extends Launche if (namedThreadFactory == null) { namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("PigTezLauncher-%d") + .setDaemon(true) .setUncaughtExceptionHandler(new JobControlThreadExceptionHandler()) .build(); } - executor = Executors.newSingleThreadExecutor(namedThreadFactory); } @Override public PigStats launchPig(PhysicalPlan php, String grpName, 
PigContext pc) throws Exception { + synchronized (this) { + if (executor == null) { + executor = Executors.newSingleThreadExecutor(namedThreadFactory); + } + } if (pc.getExecType().isLocal()) { pc.getProperties().setProperty(TezConfiguration.TEZ_LOCAL_MODE, "true"); pc.getProperties().setProperty(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "true"); pc.getProperties().setProperty("tez.ignore.lib.uris", "true"); + pc.getProperties().setProperty("tez.am.dag.scheduler.class", DAGSchedulerNaturalOrderControlled.class.getName()); } Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), true); if (pc.defaultParallel == -1 && !conf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM, true)) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Tue Jan 27 02:27:45 2015 @@ -297,9 +297,11 @@ public class TezCompiler extends PhyPlan userFunc.getInputs().remove(1); } - TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, from, tezOp); - //TODO shared edge once support is available in Tez - TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST); + if (tezPlan.getPredecessors(tezOp)==null || !tezPlan.getPredecessors(tezOp).contains(from)) { + TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, from, tezOp); + //TODO shared edge once support is available in Tez + TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST); + } } } } @@ -366,7 +368,7 @@ public class TezCompiler extends PhyPlan storeOnlyPhyPlan.addAsLeaf(store); storeOnlyTezOperator.plan = storeOnlyPhyPlan; tezPlan.add(storeOnlyTezOperator); - phyToTezOpMap.put(store, storeOnlyTezOperator); + phyToTezOpMap.put(p, storeOnlyTezOperator); // Create new operator as second splittee curTezOp = getTezOp(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Tue Jan 27 02:27:45 2015 @@ -608,6 +608,7 @@ public class TezOperator extends Operato private POStore store; private OutputDescriptor storeOutDescriptor; private VertexGroup vertexGroup; + private FileSpec sFile; public VertexGroupInfo() { } @@ -659,6 +660,13 @@ public class TezOperator extends Operato this.vertexGroup = vertexGroup; } + public void setSFile(FileSpec sFile) { + this.sFile = sFile; + } + + public FileSpec getSFile() { + return sFile; + } } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java Tue Jan 27 02:27:45 2015 @@ -18,8 +18,9 @@ package org.apache.pig.backend.hadoop.executionengine.tez.plan; import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.Map.Entry; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; @@ -42,7 +43,7 @@ public class TezPrinter extends TezOpPla * @param plan tez plan to print */ public TezPrinter(PrintStream ps, TezOperPlan plan) { - super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); + super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan, true)); mStream = ps; } @@ -62,13 +63,16 @@ public class TezPrinter extends TezOpPla mStream.println("Tez vertex " + tezOper.getOperatorKey().toString()); } if (tezOper.inEdges.size() > 0) { - for (Entry<OperatorKey, TezEdgeDescriptor> inEdge : tezOper.inEdges.entrySet()) { + List<OperatorKey> inEdges = new ArrayList<OperatorKey>(tezOper.inEdges.keySet()); + Collections.sort(inEdges); + for (OperatorKey inEdge : inEdges) { //TODO: Print other edge properties like custom partitioner - if (!inEdge.getValue().combinePlan.isEmpty()) { - mStream.println("# Combine plan on edge <" + inEdge.getKey() + ">"); + TezEdgeDescriptor edgeDesc = tezOper.inEdges.get(inEdge); + if (!edgeDesc.combinePlan.isEmpty()) { + mStream.println("# Combine plan on edge <" + inEdge + ">"); PlanPrinter<PhysicalOperator, PhysicalPlan> printer = new PlanPrinter<PhysicalOperator, PhysicalPlan>( - inEdge.getValue().combinePlan, mStream); + edgeDesc.combinePlan, mStream); printer.setVerbose(isVerbose); printer.visit(); mStream.println(); @@ -93,7 +97,7 @@ public class TezPrinter extends TezOpPla StringBuffer buf; public TezGraphPrinter(TezOperPlan plan) { - super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); + super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan, true)); buf = new StringBuffer(); } @@ -106,6 +110,7 @@ public class TezPrinter extends TezOpPla } List<TezOperator> succs = mPlan.getSuccessors(tezOper); if (succs != null) { + Collections.sort(succs); buf.append("\t->\t"); for (TezOperator op : succs) { if (op.isVertexGroup()) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java Tue Jan 27 02:27:45 2015 @@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; 
import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -160,6 +161,8 @@ public class POShuffledValueInputTez ext @Override public String name() { - return "POShuffledValueInputTez - " + mKey.toString() + "\t<-\t " + inputKeys; + List<String> inputKeyList = new ArrayList<String>(inputKeys); + Collections.sort(inputKeyList); + return "POShuffledValueInputTez - " + mKey.toString() + "\t<-\t " + inputKeyList; } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java Tue Jan 27 02:27:45 2015 @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -189,7 +190,9 @@ public class POValueOutputTez extends Ph @Override public String name() { - return "POValueOutputTez - " + mKey.toString() + "\t->\t " + outputKeys; + List<String> outputKeyList = new ArrayList<String>(outputKeys); + Collections.sort(outputKeyList); + return "POValueOutputTez - " + mKey.toString() + "\t->\t " + outputKeyList; } public static class EmptyWritable implements Writable { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Tue Jan 27 02:27:45 2015 @@ -18,8 +18,10 @@ package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map.Entry; +import java.util.Set; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; @@ -54,7 +56,6 @@ public class MultiQueryOptimizerTez exte List<TezOperator> splittees = new ArrayList<TezOperator>(); List<TezOperator> successors = getPlan().getSuccessors(tezOp); - List<TezOperator> succ_successors = new ArrayList<TezOperator>(); for (TezOperator successor : successors) { // If has other dependency, don't merge into split, @@ -65,20 +66,21 @@ public class MultiQueryOptimizerTez exte // Detect diamond shape, we cannot merge it into split, since Tez // does not handle double edge between vertexes // TODO: PIG-3876 to handle this by writing to same edge - boolean sharedSucc = false; - if (getPlan().getSuccessors(successor)!=null) { - for (TezOperator succ_successor : getPlan().getSuccessors(successor)) { - if 
(succ_successors.contains(succ_successor)) { - sharedSucc = true; - break; - } + Set<TezOperator> mergedSuccessors = new HashSet<TezOperator>(); + Set<TezOperator> toMergeSuccessors = new HashSet<TezOperator>(); + mergedSuccessors.addAll(successors); + for (TezOperator splittee : splittees) { + if (getPlan().getSuccessors(splittee) != null) { + mergedSuccessors.addAll(getPlan().getSuccessors(splittee)); } - succ_successors.addAll(getPlan().getSuccessors(successor)); } - if (sharedSucc) { - continue; + if (getPlan().getSuccessors(successor) != null) { + toMergeSuccessors.addAll(getPlan().getSuccessors(successor)); + } + mergedSuccessors.retainAll(toMergeSuccessors); + if (mergedSuccessors.isEmpty()) { // no shared edge after merge + splittees.add(successor); } - splittees.add(successor); } if (splittees.size()==0) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Tue Jan 27 02:27:45 2015 @@ -35,9 +35,9 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator.VertexGroupInfo; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez; -import org.apache.pig.backend.hadoop.executionengine.tez.runtime.RoundRobinPartitioner; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput; +import org.apache.pig.builtin.RoundRobinPartitioner; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.ReverseDependencyOrderWalker; import org.apache.pig.impl.plan.VisitorException; @@ -97,11 +97,28 @@ public class UnionOptimizer extends TezO // Union followed by Split followed by Store could have multiple stores List<POStoreTez> unionStoreOutputs = PlanHelper.getPhysicalOperators(unionOpPlan, POStoreTez.class); TezOperator[] storeVertexGroupOps = new TezOperator[unionStoreOutputs.size()]; + List<TezOperator> succs = tezPlan.getSuccessors(unionOp); + // Create a copy as disconnect while iterating modifies the original list + List<TezOperator> successors = succs == null ? 
null : new ArrayList<TezOperator>(succs); + for (int i=0; i < storeVertexGroupOps.length; i++) { - storeVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope)); - storeVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo(unionStoreOutputs.get(i))); - storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers()); - tezPlan.add(storeVertexGroupOps[i]); + TezOperator existingVertexGroup = null; + if (successors != null) { + for (TezOperator succ : successors) { + if (succ.isVertexGroup() && succ.getVertexGroupInfo().getSFile().equals(unionStoreOutputs.get(i).getSFile())) { + existingVertexGroup = succ; + } + } + } + if (existingVertexGroup != null) { + storeVertexGroupOps[i] = existingVertexGroup; + } else { + storeVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope)); + storeVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo(unionStoreOutputs.get(i))); + storeVertexGroupOps[i].getVertexGroupInfo().setSFile(unionStoreOutputs.get(i).getSFile()); + storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers()); + tezPlan.add(storeVertexGroupOps[i]); + } } // Case of split, orderby, skewed join, rank, etc will have multiple outputs @@ -182,7 +199,6 @@ public class UnionOptimizer extends TezO tezPlan.disconnect(pred, unionOp); } - List<TezOperator> successors = tezPlan.getSuccessors(unionOp); List<TezOutput> valueOnlyOutputs = new ArrayList<TezOutput>(); for (TezOutput tezOutput : unionOutputs) { if (tezOutput instanceof POValueOutputTez) { @@ -243,9 +259,6 @@ public class UnionOptimizer extends TezO throw new VisitorException(e); } - List<TezOperator> succs = tezPlan.getSuccessors(unionOp); - // Create a copy as disconnect while iterating modifies the original list - List<TezOperator> successors = succs == null ? null : new ArrayList<TezOperator>(succs); if (successors != null) { // Successor inputs should now point to the vertex groups. for (TezOperator succ : successors) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/ObjectCache.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/ObjectCache.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/ObjectCache.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/ObjectCache.java Tue Jan 27 02:27:45 2015 @@ -21,15 +21,15 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.tez.runtime.api.ObjectRegistry; -import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; [email protected] [email protected] public class ObjectCache { private static final Log LOG = LogFactory.getLog(ObjectCache.class); - private final ObjectRegistry registry = new ObjectRegistryImpl(); private static ObjectCache cache = new ObjectCache(); + private ObjectRegistry registry; + private ObjectCache() { } @@ -37,11 +37,34 @@ public class ObjectCache { return cache; } + /** + * Returns the tez ObjectRegistry which allows caching of objects at the + * Session, DAG and Vertex level on container reuse for better performance + * and savings + */ + public ObjectRegistry getObjectRegistry() { + return registry; + } + + /** + * For internal use only. 
This method to be called only by PigProcessor + */ + @InterfaceAudience.Private + void setObjectRegistry(ObjectRegistry registry) { + this.registry = registry; + } + + /** + * Convenience method to cache objects in ObjectRegistry for a vertex + */ public void cache(String key, Object value) { LOG.info("Adding " + key + " to cache"); registry.cacheForVertex(key, value); } + /** + * Convenience method to retrieve objects cached for the vertex from ObjectRegistry + */ public Object retrieve(String key) { Object o = registry.get(key); if (o != null) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Tue Jan 27 02:27:45 2015 @@ -103,6 +103,7 @@ public class PigProcessor extends Abstra public PigProcessor(ProcessorContext context) { super(context); + ObjectCache.getInstance().setObjectRegistry(context.getObjectRegistry()); } @SuppressWarnings("unchecked") Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Tue Jan 27 02:27:45 2015 @@ -37,7 +37,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez; -import org.apache.pig.backend.hadoop.executionengine.tez.runtime.RoundRobinPartitioner; +import org.apache.pig.builtin.RoundRobinPartitioner; import org.apache.pig.data.DataType; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Tue Jan 27 02:27:45 2015 @@ -472,28 +472,38 @@ public class MapRedUtil { result.add(combinedSplits); resultLengths.add(split.getLength()); } else { - ComparableSplit csplit = new ComparableSplit(split, comparableSplitId++); String[] locations = split.getLocations(); - // sort the locations to stabilize the number of maps: PIG-1757 - Arrays.sort(locations); - HashSet<String> locationSeen = new HashSet<String>(); - for 
(String location : locations) - { - if (!locationSeen.contains(location)) + if (locations.length == 0) { + // This split is missing blocks, or the split returned bad locations. + // Don't try to combine. + comparableSplitId++; + ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>(); + combinedSplits.add(split); + result.add(combinedSplits); + resultLengths.add(split.getLength()); + } else { + ComparableSplit csplit = new ComparableSplit(split, comparableSplitId++); + // sort the locations to stabilize the number of maps: PIG-1757 + Arrays.sort(locations); + HashSet<String> locationSeen = new HashSet<String>(); + for (String location : locations) { - Node node = nodeMap.get(location); - if (node == null) { - node = new Node(); - nodes.add(node); - nodeMap.put(location, node); + if (!locationSeen.contains(location)) + { + Node node = nodeMap.get(location); + if (node == null) { + node = new Node(); + nodes.add(node); + nodeMap.put(location, node); + } + node.add(csplit); + csplit.add(node); + locationSeen.add(location); } - node.add(csplit); - csplit.add(node); - locationSeen.add(location); } + lastSplit = split; + size++; } - lastSplit = split; - size++; } } /* verification code: debug purpose Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Tue Jan 27 02:27:45 2015 @@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryComparator; @@ -178,6 +179,8 @@ public class HBaseStorage extends LoadFu private final long minTimestamp_; private final long maxTimestamp_; private final long timestamp_; + private boolean includeTimestamp_; + private boolean includeTombstone_; protected transient byte[] gt_; protected transient byte[] gte_; @@ -211,6 +214,8 @@ public class HBaseStorage extends LoadFu validOptions_.addOption("minTimestamp", true, "Record must have timestamp greater or equal to this value"); validOptions_.addOption("maxTimestamp", true, "Record must have timestamp less then this value"); validOptions_.addOption("timestamp", true, "Record must have timestamp equal to this value"); + validOptions_.addOption("includeTimestamp", false, "Record will include the timestamp after the rowkey on store (rowkey, timestamp, ...)"); + validOptions_.addOption("includeTombstone", false, "Record will include a tombstone marker on store after the rowKey and timestamp (if included) (rowkey, [timestamp,] tombstone, ...)"); } /** @@ -254,6 +259,8 @@ public class HBaseStorage extends LoadFu * <li>-minTimestamp= Scan's timestamp for min timeRange * <li>-maxTimestamp= Scan's timestamp for max timeRange * <li>-timestamp= Scan's specified timestamp + * <li>-includeTimestamp= Record will include the timestamp after the rowkey on store (rowkey, timestamp, ...) 
+ * <li>-includeTombstone= Record will include a tombstone marker on store after the rowKey and timestamp (if included) (rowkey, [timestamp,] tombstone, ...) * <li>-caster=(HBaseBinaryConverter|Utf8StorageConverter) Utf8StorageConverter is the default * To be used with extreme caution, since this could result in data loss * (see http://hbase.apache.org/book.html#perf.hbase.client.putwal). @@ -268,7 +275,7 @@ public class HBaseStorage extends LoadFu configuredOptions_ = parser_.parse(validOptions_, optsArr); } catch (ParseException e) { HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-regex] [-columnPrefix] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp]", validOptions_ ); + formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-regex] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp] [-includeTimestamp] [-includeTombstone]", validOptions_ ); throw e; } @@ -343,6 +350,22 @@ public class HBaseStorage extends LoadFu timestamp_ = 0; } + includeTimestamp_ = false; + if (configuredOptions_.hasOption("includeTimestamp")) { + String value = configuredOptions_.getOptionValue("includeTimestamp"); + if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null ) {//the empty string and null check is for backward compat. + includeTimestamp_ = true; + } + } + + includeTombstone_ = false; + if (configuredOptions_.hasOption("includeTombstone")) { + String value = configuredOptions_.getOptionValue("includeTombstone"); + if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null ) { + includeTombstone_ = true; + } + } + initScan(); } @@ -930,7 +953,41 @@ public class HBaseStorage extends LoadFu public void putNext(Tuple t) throws IOException { ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : schema_.getFields(); byte type = (fieldSchemas == null) ? DataType.findType(t.get(0)) : fieldSchemas[0].getType(); - long ts=System.currentTimeMillis(); + long ts; + + int startIndex=1; + if (includeTimestamp_) { + byte timestampType = (fieldSchemas == null) ? 
DataType.findType(t.get(startIndex)) : fieldSchemas[startIndex].getType(); + LoadStoreCaster caster = (LoadStoreCaster) caster_; + + switch (timestampType) { + case DataType.BYTEARRAY: ts = caster.bytesToLong(((DataByteArray)t.get(startIndex)).get()); break; + case DataType.LONG: ts = ((Long)t.get(startIndex)).longValue(); break; + case DataType.DATETIME: ts = ((DateTime)t.get(startIndex)).getMillis(); break; + default: throw new IOException("Unable to find a converter for timestamp field " + t.get(startIndex)); + } + + startIndex++; + } else { + ts = System.currentTimeMillis(); + } + + // check for deletes + if (includeTombstone_) { + if (((Boolean)t.get(startIndex)).booleanValue()) { + Delete delete = createDelete(t.get(0), type, ts); + try { + // this is a delete so there will be + // no put and we are done here + writer.write(null, delete); + return; + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + startIndex++; + } Put put = createPut(t.get(0), type); @@ -941,8 +998,8 @@ public class HBaseStorage extends LoadFu } } - for (int i=1;i<t.size();++i){ - ColumnInfo columnInfo = columnInfo_.get(i-1); + for (int i=startIndex;i<t.size();++i){ + ColumnInfo columnInfo = columnInfo_.get(i-startIndex); if (LOG.isDebugEnabled()) { LOG.debug("putNext - tuple: " + i + ", value=" + t.get(i) + ", cf:column=" + columnInfo); @@ -979,6 +1036,25 @@ public class HBaseStorage extends LoadFu } /** + * Public method to initialize a Delete. + * + * @param key + * @param type + * @param timestamp + * @return new delete + * @throws IOException + */ + public Delete createDelete(Object key, byte type, long timestamp) throws IOException { + Delete delete = new Delete(objToBytes(key, type), timestamp); + + if(noWAL_) { + delete.setWriteToWAL(false); + } + + return delete; + } + + /** * Public method to initialize a Put. Used to allow assertions of how Puts * are initialized by unit tests. * Modified: pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java Tue Jan 27 02:27:45 2015 @@ -54,10 +54,8 @@ import org.apache.pig.data.DataByteArray import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; -import org.apache.pig.impl.util.JarManager; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.Utils; -import org.apache.pig.parser.ParserException; /** * A loader for data stored using {@link JsonStorage}. 
This is not a generic @@ -171,7 +169,7 @@ public class JsonLoader extends LoadFunc return t; } - } catch (JsonParseException jpe) { + } catch (Exception jpe) { warn("Bad record, returning null for " + val, PigWarning.UDF_WARNING_1); } finally { p.close(); @@ -180,6 +178,52 @@ public class JsonLoader extends LoadFunc return t; } + private Object readPrimitive(JsonParser p, JsonToken tok, ResourceFieldSchema field) throws IOException { + + if (tok == JsonToken.VALUE_NULL) return null; + + switch(field.getType()) { + // Read based on our expected type + case DataType.BOOLEAN: + return p.getBooleanValue(); + + case DataType.INTEGER: + return p.getIntValue(); + + case DataType.LONG: + return p.getLongValue(); + + case DataType.FLOAT: + return p.getFloatValue(); + + case DataType.DOUBLE: + return p.getDoubleValue(); + + case DataType.DATETIME: + DateTimeFormatter formatter = ISODateTimeFormat.dateTimeParser(); + return formatter.withOffsetParsed().parseDateTime(p.getText()); + + case DataType.BYTEARRAY: + byte[] b = p.getText().getBytes(); + // Use the DBA constructor that copies the bytes so that we own + // the memory + return new DataByteArray(b, 0, b.length); + + case DataType.CHARARRAY: + return p.getText(); + + case DataType.BIGINTEGER: + return p.getBigIntegerValue(); + + case DataType.BIGDECIMAL: + return new BigDecimal(p.getText()); + + default: + throw new IOException("Unknown type in input schema: " + + field.getType() ); + } + } + private Object readField(JsonParser p, ResourceFieldSchema field, int fieldnum) throws IOException { @@ -193,67 +237,14 @@ public class JsonLoader extends LoadFunc // Check to see if this value was null if (tok == JsonToken.VALUE_NULL) return null; + + tok = p.nextToken(); // Read based on our expected type switch (field.getType()) { - case DataType.BOOLEAN: - tok = p.nextToken(); - if (tok == JsonToken.VALUE_NULL) return null; - return p.getBooleanValue(); - - case DataType.INTEGER: - // Read the field name - tok = p.nextToken(); - if (tok == JsonToken.VALUE_NULL) return null; - return p.getIntValue(); - - case DataType.LONG: - tok = p.nextToken(); - if (tok == JsonToken.VALUE_NULL) return null; - return p.getLongValue(); - - case DataType.FLOAT: - tok = p.nextToken(); - if (tok == JsonToken.VALUE_NULL) return null; - return p.getFloatValue(); - - case DataType.DOUBLE: - tok = p.nextToken(); - if (tok == JsonToken.VALUE_NULL) return null; - return p.getDoubleValue(); - - case DataType.DATETIME: - tok = p.nextToken(); - if (tok == JsonToken.VALUE_NULL) return null; - DateTimeFormatter formatter = ISODateTimeFormat.dateTimeParser(); - return formatter.withOffsetParsed().parseDateTime(p.getText()); - - case DataType.BYTEARRAY: - tok = p.nextToken(); - if (tok == JsonToken.VALUE_NULL) return null; - byte[] b = p.getText().getBytes(); - // Use the DBA constructor that copies the bytes so that we own - // the memory - return new DataByteArray(b, 0, b.length); - - case DataType.CHARARRAY: - tok = p.nextToken(); - if (tok == JsonToken.VALUE_NULL) return null; - return p.getText(); - - case DataType.BIGINTEGER: - tok = p.nextToken(); - if (tok == JsonToken.VALUE_NULL) return null; - return p.getBigIntegerValue(); - - case DataType.BIGDECIMAL: - tok = p.nextToken(); - if (tok == JsonToken.VALUE_NULL) return null; - return new BigDecimal(p.getText()); - case DataType.MAP: // Should be a start of the map object - if (p.nextToken() != JsonToken.START_OBJECT) { + if (tok != JsonToken.START_OBJECT) { warn("Bad map field, could not find start of object, field " + 
fieldnum, PigWarning.UDF_WARNING_1); return null; @@ -267,7 +258,7 @@ public class JsonLoader extends LoadFunc return m; case DataType.TUPLE: - if (p.nextToken() != JsonToken.START_OBJECT) { + if (tok != JsonToken.START_OBJECT) { warn("Bad tuple field, could not find start of object, " + "field " + fieldnum, PigWarning.UDF_WARNING_1); return null; @@ -289,7 +280,7 @@ public class JsonLoader extends LoadFunc return t; case DataType.BAG: - if (p.nextToken() != JsonToken.START_ARRAY) { + if (tok != JsonToken.START_ARRAY) { warn("Bad bag field, could not find start of array, " + "field " + fieldnum, PigWarning.UDF_WARNING_1); return null; @@ -305,28 +296,29 @@ public class JsonLoader extends LoadFunc JsonToken innerTok; while ((innerTok = p.nextToken()) != JsonToken.END_ARRAY) { - if (innerTok != JsonToken.START_OBJECT) { - warn("Bad bag tuple field, could not find start of " - + "object, field " + fieldnum, PigWarning.UDF_WARNING_1); - return null; - } - t = tupleFactory.newTuple(fs.length); - for (int j = 0; j < fs.length; j++) { - t.set(j, readField(p, fs[j], j)); + if (innerTok == JsonToken.START_OBJECT) { + for (int j = 0; j < fs.length; j++) { + t.set(j, readField(p, fs[j], j)); + } + + if (p.nextToken() != JsonToken.END_OBJECT) { + warn("Bad bag tuple field, could not find end of " + + "object, field " + fieldnum, PigWarning.UDF_WARNING_1); + return null; + } + bag.add(t); + } else { + + // handle array of kind [ primitive, primitive ... ] + t.set(0, readPrimitive(p, innerTok, fs[0])); + bag.add(t); } - - if (p.nextToken() != JsonToken.END_OBJECT) { - warn("Bad bag tuple field, could not find end of " - + "object, field " + fieldnum, PigWarning.UDF_WARNING_1); - return null; - } - bag.add(t); } return bag; + default: - throw new IOException("Unknown type in input schema: " + - field.getType()); + return readPrimitive(p, tok, field); } } Modified: pig/branches/spark/src/org/apache/pig/builtin/RollupDimensions.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/RollupDimensions.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/RollupDimensions.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/RollupDimensions.java Tue Jan 27 02:27:45 2015 @@ -47,6 +47,10 @@ public class RollupDimensions extends Ev private static BagFactory bf = BagFactory.getInstance(); private static TupleFactory tf = TupleFactory.getInstance(); private final String allMarker; + // the pivot position + private int pivot = -1; + // to check if rollup is optimized or not + private boolean rollupHIIoptimizable = false; public RollupDimensions() { this(null); @@ -57,6 +61,18 @@ public class RollupDimensions extends Ev this.allMarker = allMarker; } + public void setRollupHIIOptimizable(boolean check) { + this.rollupHIIoptimizable = check; + } + + public boolean getRollupHIIOptimizable() { + return this.rollupHIIoptimizable; + } + + public void setPivot(int pvt) throws IOException { + this.pivot = pvt; + } + @Override public DataBag exec(Tuple tuple) throws IOException { List<Tuple> result = Lists.newArrayListWithCapacity(tuple.size() + 1); @@ -66,12 +82,32 @@ public class RollupDimensions extends Ev return bf.newDefaultBag(result); } - private void iterativelyRollup(List<Tuple> result, Tuple input) throws ExecException { - Tuple tempTup = tf.newTuple(input.getAll()); - for (int i = input.size() - 1; i >= 0; i--) { - tempTup.set(i, allMarker); - 
result.add(tf.newTuple(tempTup.getAll())); - } + private void iterativelyRollup(List<Tuple> result, Tuple input) + throws IOException { + + Tuple tempTup = tf.newTuple(input.getAll()); + + //if (this.rollupHIIoptimizable != null) { // rule is enabled + if (this.rollupHIIoptimizable == true) { + if (this.pivot == -1) // user did not specify the pivot position + // --> IRG approach + return; + else { // user did specify the pivot position --> IRG + IRG + if (this.pivot == 0) // we use the IRG approach + return; + else { // we use IRG+IRG approach + for (int i = this.pivot - 1; i < input.size(); i++) + tempTup.set(i, allMarker); + result.add(tf.newTuple(tempTup.getAll())); + } + } + } + else { // we can not optimize --> Vanilla approach + for (int i = input.size() - 1; i >= 0; i--) { + tempTup.set(i, allMarker); + result.add(tf.newTuple(tempTup.getAll())); + } + } } @Override Modified: pig/branches/spark/src/org/apache/pig/builtin/Utf8StorageConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Utf8StorageConverter.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/Utf8StorageConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/Utf8StorageConverter.java Tue Jan 27 02:27:45 2015 @@ -196,6 +196,7 @@ public class Utf8StorageConverter implem private Map<String, Object> consumeMap(PushbackInputStream in, ResourceFieldSchema fieldSchema) throws IOException { int buf; + boolean emptyMap = true; while ((buf=in.read())!='[') { if (buf==-1) { @@ -207,9 +208,14 @@ public class Utf8StorageConverter implem while (true) { // Read key (assume key can not contains special character such as #, (, [, {, }, ], ) while ((buf=in.read())!='#') { + // end of map + if (emptyMap && buf==']') { + return m; + } if (buf==-1) { throw new IOException("Unexpect end of map"); } + emptyMap = false; mOut.write(buf); } String key = bytesToCharArray(mOut.toByteArray()); Modified: pig/branches/spark/src/org/apache/pig/builtin/mock/Storage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/mock/Storage.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/mock/Storage.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/mock/Storage.java Tue Jan 27 02:27:45 2015 @@ -459,7 +459,7 @@ private MockRecordWriter mockRecordWrite @Override public void putNext(Tuple t) throws IOException { - mockRecordWriter.dataBeingWritten.add(t); + mockRecordWriter.dataBeingWritten.add(TF.newTuple(t.getAll())); } @Override @@ -648,6 +648,10 @@ private MockRecordWriter mockRecordWrite @Override public RecordWriter<Object, Object> getRecordWriter(TaskAttemptContext arg0) throws IOException, InterruptedException { + if (arg0.getConfiguration().get("mapreduce.output.basename")!=null) { + return new MockRecordWriter(arg0.getConfiguration().get("mapreduce.output.basename") + "-" + + arg0.getTaskAttemptID().getTaskID().getId()); + } return new MockRecordWriter(getUniqueFile(arg0, "part", ".mock")); } Modified: pig/branches/spark/src/org/apache/pig/impl/plan/DependencyOrderWalker.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/plan/DependencyOrderWalker.java?rev=1654955&r1=1654954&r2=1654955&view=diff 
============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/plan/DependencyOrderWalker.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/plan/DependencyOrderWalker.java Tue Jan 27 02:27:45 2015 @@ -19,6 +19,7 @@ package org.apache.pig.impl.plan; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -37,18 +38,31 @@ import org.apache.pig.impl.util.Utils; public class DependencyOrderWalker <O extends Operator, P extends OperatorPlan<O>> extends PlanWalker<O, P> { + private final boolean walkLeavesInOrder; + /** * @param plan Plan for this walker to traverse. */ public DependencyOrderWalker(P plan) { + this(plan, false); + } + + /** + * @param plan Plan for this walker to traverse. + * @param boolean walkLeavesInOrder Sort the leaves before walking + */ + public DependencyOrderWalker(P plan, boolean walkLeavesInOrder) { super(plan); + this.walkLeavesInOrder = walkLeavesInOrder; } + /** * Begin traversing the graph. * @param visitor Visitor this walker is being used by. * @throws VisitorException if an error is encountered while walking. */ + @Override @SuppressWarnings("unchecked") public void walk(PlanVisitor<O, P> visitor) throws VisitorException { // This is highly inefficient, but our graphs are small so it should be okay. @@ -63,6 +77,9 @@ public class DependencyOrderWalker <O ex Set<O> seen = new HashSet<O>(); List<O> leaves = mPlan.getLeaves(); if (leaves == null) return; + if (walkLeavesInOrder) { + Collections.sort(leaves); + } for (O op : leaves) { doAllPredecessors(op, seen, fifo); } @@ -71,8 +88,9 @@ public class DependencyOrderWalker <O ex } } - public PlanWalker<O, P> spawnChildWalker(P plan) { - return new DependencyOrderWalker<O, P>(plan); + @Override + public PlanWalker<O, P> spawnChildWalker(P plan) { + return new DependencyOrderWalker<O, P>(plan, walkLeavesInOrder); } protected void doAllPredecessors(O node, Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java Tue Jan 27 02:27:45 2015 @@ -219,21 +219,32 @@ public class AvroStorageSchemaConversion ResourceSchema mapSchema = new ResourceSchema(); ResourceSchema.ResourceFieldSchema[] mapSchemaFields = new ResourceSchema.ResourceFieldSchema[1]; - if (mapAvroSchema.getType() == Type.RECORD) { - ResourceSchema innerResourceSchema = - avroSchemaToResourceSchema(fieldSchema.getValueType(), schemasInStack, - alreadyDefinedSchemas, allowRecursiveSchema); + switch(mapAvroSchema.getType()) { + case RECORD: + ResourceSchema innerResourceSchemaRecord = + avroSchemaToResourceSchema(fieldSchema.getValueType(), schemasInStack, + alreadyDefinedSchemas, allowRecursiveSchema); mapSchemaFields[0] = new ResourceSchema.ResourceFieldSchema(); mapSchemaFields[0].setType(DataType.TUPLE); mapSchemaFields[0].setName(mapAvroSchema.getName()); - mapSchemaFields[0].setSchema(innerResourceSchema); + mapSchemaFields[0].setSchema(innerResourceSchemaRecord); 
mapSchemaFields[0].setDescription(fieldSchema.getDoc()); - } else { + mapSchema.setFields(mapSchemaFields); + rf.setSchema(mapSchema); + break; + case MAP: + case ARRAY: + ResourceSchema innerResourceSchema = + avroSchemaToResourceSchema(fieldSchema.getValueType(), schemasInStack, + alreadyDefinedSchemas, allowRecursiveSchema); + rf.setSchema(innerResourceSchema); + break; + default: mapSchemaFields[0] = new ResourceSchema.ResourceFieldSchema(); mapSchemaFields[0].setType(getPigType(mapAvroSchema)); + mapSchema.setFields(mapSchemaFields); + rf.setSchema(mapSchema); } - mapSchema.setFields(mapSchemaFields); - rf.setSchema(mapSchema); } break; case DataType.TUPLE: Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java Tue Jan 27 02:27:45 2015 @@ -206,7 +206,7 @@ public class DereferenceExpression exten throw new FrontendException("Index "+rawColumn + " out of range in schema:" + schema.toString(false), 1127); } columns.add( (Integer)rawColumn ); - } else { + } else if (schema!=null) { int pos = schema.getFieldPosition((String)rawColumn); if( pos != -1) { columns.add( pos ); Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1654955&r1=1654954&r2=1654955&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Tue Jan 27 02:27:45 2015 @@ -60,6 +60,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Subtract; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.builtin.RollupDimensions; import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.FrontendException; @@ -78,13 +79,13 @@ public class ExpToPhyTranslationVisitor // This value points to the current LogicalRelationalOperator we are working on protected LogicalRelationalOperator currentOp; - - public ExpToPhyTranslationVisitor(OperatorPlan plan, LogicalRelationalOperator op, PhysicalPlan phyPlan, + + public ExpToPhyTranslationVisitor(OperatorPlan plan, LogicalRelationalOperator op, PhysicalPlan phyPlan, Map<Operator, PhysicalOperator> map) throws FrontendException { this(plan, new DependencyOrderWalker(plan), op, phyPlan, map); } - - public ExpToPhyTranslationVisitor(OperatorPlan plan, PlanWalker walker, LogicalRelationalOperator op, PhysicalPlan phyPlan, + + public ExpToPhyTranslationVisitor(OperatorPlan plan, PlanWalker walker, LogicalRelationalOperator op, PhysicalPlan phyPlan, Map<Operator, PhysicalOperator> map) throws 
FrontendException { super(plan, walker); currentOp = op; @@ -92,7 +93,7 @@ public class ExpToPhyTranslationVisitor currentPlan = phyPlan; currentPlans = new LinkedList<PhysicalPlan>(); } - + protected Map<Operator, PhysicalOperator> logToPhyMap; protected Deque<PhysicalPlan> currentPlans; @@ -102,7 +103,7 @@ public class ExpToPhyTranslationVisitor protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator(); protected PigContext pc; - + public void setPigContext(PigContext pc) { this.pc = pc; } @@ -110,13 +111,13 @@ public class ExpToPhyTranslationVisitor public PhysicalPlan getPhysicalPlan() { return currentPlan; } - - private void attachBinaryComparisonOperator( BinaryExpression op, + + private void attachBinaryComparisonOperator( BinaryExpression op, BinaryComparisonOperator exprOp ) throws FrontendException { // We dont have aliases in ExpressionOperators // exprOp.setAlias(op.getAlias()); - - + + exprOp.setOperandType(op.getLhs().getType()); exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs())); exprOp.setRhs((ExpressionOperator) logToPhyMap.get(op.getRhs())); @@ -140,13 +141,13 @@ public class ExpToPhyTranslationVisitor } } } - - private void attachBinaryExpressionOperator( BinaryExpression op, + + private void attachBinaryExpressionOperator( BinaryExpression op, BinaryExpressionOperator exprOp ) throws FrontendException { // We dont have aliases in ExpressionOperators // exprOp.setAlias(op.getAlias()); - - + + exprOp.setResultType(op.getLhs().getType()); exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs())); exprOp.setRhs((ExpressionOperator) logToPhyMap.get(op.getRhs())); @@ -173,81 +174,81 @@ public class ExpToPhyTranslationVisitor @Override public void visit( AndExpression op ) throws FrontendException { - + // System.err.println("Entering And"); BinaryComparisonOperator exprOp = new POAnd(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + attachBinaryComparisonOperator(op, exprOp); } - + @Override public void visit( OrExpression op ) throws FrontendException { - + // System.err.println("Entering Or"); BinaryComparisonOperator exprOp = new POOr(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + attachBinaryComparisonOperator(op, exprOp); } - + @Override public void visit( EqualExpression op ) throws FrontendException { - + BinaryComparisonOperator exprOp = new EqualToExpr(new OperatorKey( DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + attachBinaryComparisonOperator(op, exprOp); } - + @Override public void visit( NotEqualExpression op ) throws FrontendException { - + BinaryComparisonOperator exprOp = new NotEqualToExpr(new OperatorKey( DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + attachBinaryComparisonOperator(op, exprOp); } - + @Override public void visit( GreaterThanExpression op ) throws FrontendException { - + BinaryComparisonOperator exprOp = new GreaterThanExpr(new OperatorKey( DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + attachBinaryComparisonOperator(op, exprOp); } - + @Override public void visit( GreaterThanEqualExpression op ) throws FrontendException { - + BinaryComparisonOperator exprOp = new GTOrEqualToExpr(new OperatorKey( DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + attachBinaryComparisonOperator(op, exprOp); } - + @Override public void visit( LessThanExpression op ) throws FrontendException { - + BinaryComparisonOperator exprOp = new LessThanExpr(new OperatorKey( DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE))); - + 
             attachBinaryComparisonOperator(op, exprOp);
     }
-
-
+
+
     @Override
     public void visit( LessThanEqualExpression op ) throws FrontendException {
-
         BinaryComparisonOperator exprOp = new LTOrEqualToExpr(new OperatorKey(
                 DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
-
+
         attachBinaryComparisonOperator(op, exprOp);
     }
-
+
     @Override
     public void visit(ProjectExpression op) throws FrontendException {
         POProject exprOp;
-
+
         if(op.getAttachedRelationalOp() instanceof LOGenerate && op.getPlan().getSuccessors(op)==null
                 && !(op.findReferent() instanceof LOInnerLoad)) {
             exprOp = new PORelationToExprProject(new OperatorKey(DEFAULT_SCOPE, nodeGen
@@ -256,7 +257,7 @@ public class ExpToPhyTranslationVisitor
             exprOp = new POProject(new OperatorKey(DEFAULT_SCOPE, nodeGen
                     .getNextNodeId(DEFAULT_SCOPE)));
         }
-
+
         if (op.getFieldSchema()==null && op.isRangeOrStarProject())
             exprOp.setResultType(DataType.TUPLE);
         else
@@ -278,9 +279,9 @@ public class ExpToPhyTranslationVisitor
         // TODO implement this
         // exprOp.setOverloaded(op.getOverloaded());
         logToPhyMap.put(op, exprOp);
-        currentPlan.add(exprOp);
+        currentPlan.add(exprOp);
     }
-
+
     @Override
     public void visit( MapLookupExpression op ) throws FrontendException {
         ExpressionOperator physOp = new POMapLookUp(new OperatorKey(DEFAULT_SCOPE,
@@ -302,10 +303,10 @@ public class ExpToPhyTranslationVisitor
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
     }
-
+
     @Override
     public void visit(org.apache.pig.newplan.logical.expression.ConstantExpression op) throws FrontendException {
-
+
         // System.err.println("Entering Constant");
         ConstantExpression ce = new ConstantExpression(new OperatorKey(DEFAULT_SCOPE,
                 nodeGen.getNextNodeId(DEFAULT_SCOPE)));
@@ -318,7 +319,7 @@ public class ExpToPhyTranslationVisitor
         logToPhyMap.put(op, ce);
         // System.err.println("Exiting Constant");
     }
-
+
     @Override
     public void visit( CastExpression op ) throws FrontendException {
         POCast pCast = new POCast(new OperatorKey(DEFAULT_SCOPE, nodeGen
@@ -351,10 +352,10 @@ public class ExpToPhyTranslationVisitor
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
     }
-
+
     @Override
     public void visit( NotExpression op ) throws FrontendException {
-
+
         PONot pNot = new PONot(new OperatorKey(DEFAULT_SCOPE, nodeGen
                 .getNextNodeId(DEFAULT_SCOPE)));
         // physOp.setAlias(op.getAlias());
@@ -374,7 +375,7 @@ public class ExpToPhyTranslationVisitor
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
     }
-
+
     @Override
     public void visit( IsNullExpression op ) throws FrontendException {
         POIsNull pIsNull = new POIsNull(new OperatorKey(DEFAULT_SCOPE, nodeGen
@@ -408,7 +409,7 @@ public class ExpToPhyTranslationVisitor
         ExpressionOperator from = (ExpressionOperator) logToPhyMap.get(op
                 .getExpression());
         pNegative.setExpr(from);
-        pNegative.setResultType(op.getType());
+        pNegative.setResultType(op.getType());
         try {
             currentPlan.connect(from, pNegative);
         } catch (PlanException e) {
@@ -417,60 +418,60 @@ public class ExpToPhyTranslationVisitor
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
     }
-
+
     @Override
-    public void visit( AddExpression op ) throws FrontendException {
-        BinaryExpressionOperator exprOp = new Add(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
-
+    public void visit( AddExpression op ) throws FrontendException {
+        BinaryExpressionOperator exprOp = new Add(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
+
         attachBinaryExpressionOperator(op, exprOp);
     }
-
+
     @Override
-    public void visit( RegexExpression op ) throws FrontendException {
-        BinaryExpressionOperator exprOp = new PORegexp(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
-
+    public void visit( RegexExpression op ) throws FrontendException {
+        BinaryExpressionOperator exprOp = new PORegexp(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
+
         attachBinaryExpressionOperator(op, exprOp);
-
+
         List<Operator> successors = op.getPlan().getSuccessors(op);
         if (successors.get(1) instanceof org.apache.pig.newplan.logical.expression.ConstantExpression) {
             ((PORegexp)exprOp).setConstExpr(true);
         }
     }
-
+
     @Override
-    public void visit( SubtractExpression op ) throws FrontendException {
-        BinaryExpressionOperator exprOp = new Subtract(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
-
+    public void visit( SubtractExpression op ) throws FrontendException {
+        BinaryExpressionOperator exprOp = new Subtract(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
+
         attachBinaryExpressionOperator(op, exprOp);
     }
-
+
     @Override
-    public void visit( MultiplyExpression op ) throws FrontendException {
-        BinaryExpressionOperator exprOp = new Multiply(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
-
+    public void visit( MultiplyExpression op ) throws FrontendException {
+        BinaryExpressionOperator exprOp = new Multiply(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
+
         attachBinaryExpressionOperator(op, exprOp);
     }
-
+
     @Override
-    public void visit( DivideExpression op ) throws FrontendException {
-        BinaryExpressionOperator exprOp = new Divide(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
-
+    public void visit( DivideExpression op ) throws FrontendException {
+        BinaryExpressionOperator exprOp = new Divide(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
+
         attachBinaryExpressionOperator(op, exprOp);
     }
-
+
     @Override
-    public void visit( ModExpression op ) throws FrontendException {
-        BinaryExpressionOperator exprOp = new Mod(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
-
+    public void visit( ModExpression op ) throws FrontendException {
+        BinaryExpressionOperator exprOp = new Mod(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
+
        attachBinaryExpressionOperator(op, exprOp);
     }
-
+
     @Override
     public void visit( BinCondExpression op ) throws FrontendException {
-
+
         POBinCond exprOp = new POBinCond( new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)) );
-
+
         exprOp.setResultType(op.getType());
         exprOp.setCond((ExpressionOperator) logToPhyMap.get(op.getCondition()));
         exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs()));
@@ -495,17 +496,34 @@ public class ExpToPhyTranslationVisitor
             }
         }
     }
-
+
     @SuppressWarnings("unchecked")
     @Override
-    public void visit( UserFuncExpression op ) throws FrontendException {
+    public void visit( UserFuncExpression op ) throws FrontendException {
         Object f = PigContext.instantiateFuncFromSpec(op.getFuncSpec());
         PhysicalOperator p;
+        String ROLLUP_UDF = RollupDimensions.class.getName();
         if (f instanceof EvalFunc) {
             p = new POUserFunc(new OperatorKey(DEFAULT_SCOPE, nodeGen
                     .getNextNodeId(DEFAULT_SCOPE)), -1, null, op.getFuncSpec(), (EvalFunc) f);
             ((POUserFunc)p).setSignature(op.getSignature());
+            if( op.getFuncSpec().toString().equals(ROLLUP_UDF)) {
+                //Set the pivot value
+                ((POUserFunc)p).setPivot(op.getPivot());
+                if(op.getRollupHIIOptimizable()!=false) {
+                    ((POUserFunc)p).setRollupHIIOptimizable(true);
+                    //Set value for RollupHIIOptimizable and pivot of RollupDimension
+                    EvalFunc<?> tmp = ((POUserFunc)p).getFunc();
+                    ((RollupDimensions)tmp).setRollupHIIOptimizable(true);
+                    try {
+                        ((RollupDimensions)tmp).setPivot(op.getPivot());
+                    } catch (IOException e) {
+                        // TODO Auto-generated catch block
+                        e.printStackTrace();
+                    }
+                }
+            }
             //reinitialize input schema from signature
             if (((POUserFunc)p).getFunc().getInputSchema() == null) {
                 ((POUserFunc)p).setFuncInputSchema(op.getSignature());
@@ -535,7 +553,7 @@ public class ExpToPhyTranslationVisitor
             }
         }
         logToPhyMap.put(op, p);
-
+
         //We need to track all the scalars
         if( op instanceof ScalarExpression ) {
             Operator refOp = ((ScalarExpression)op).getImplicitReferencedOperator();
@@ -543,20 +561,20 @@ public class ExpToPhyTranslationVisitor
         }
     }
-
+
     @Override
     public void visit( DereferenceExpression op ) throws FrontendException {
         POProject exprOp = new POProject(new OperatorKey(DEFAULT_SCOPE, nodeGen
                 .getNextNodeId(DEFAULT_SCOPE)));
         exprOp.setResultType(op.getType());
-        exprOp.setColumns((ArrayList<Integer>)op.getBagColumns());
+        exprOp.setColumns((ArrayList<Integer>)op.getBagColumns());
         exprOp.setStar(false);
         logToPhyMap.put(op, exprOp);
         currentPlan.add(exprOp);
-
+
         PhysicalOperator from = logToPhyMap.get( op.getReferredExpression() );
-
+
         if( from != null ) {
             currentPlan.connect(from, exprOp);
         }

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java Tue Jan 27 02:27:45 2015
@@ -57,6 +57,26 @@ public class UserFuncExpression extends
     private static int sigSeq=0;
     private boolean viaDefine=false; //this represents whether the function was instantiate via a DEFINE statement or not
 
+    private boolean rollupHIIoptimizable = false;
+    //the pivot value
+    private int pivot = -1;
+
+    public void setPivot(int pvt) {
+        this.pivot = pvt;
+    }
+
+    public int getPivot() {
+        return this.pivot;
+    }
+
+    public void setRollupHIIOptimizable(boolean check) {
+        this.rollupHIIoptimizable = check;
+    }
+
+    public boolean getRollupHIIOptimizable() {
+        return this.rollupHIIoptimizable;
+    }
+
     public UserFuncExpression(OperatorPlan plan, FuncSpec funcSpec) {
         super("UserFunc", plan);
         mFuncSpec = funcSpec;
@@ -66,7 +86,6 @@ public class UserFuncExpression extends
         }
     }
 
-
     public UserFuncExpression(OperatorPlan plan, FuncSpec funcSpec,
             List<LogicalExpression> args) {
         this( plan, funcSpec );

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Tue Jan 27 02:27:45 2015
@@ -44,6 +44,7 @@ import org.apache.pig.newplan.logical.ru
 import org.apache.pig.newplan.logical.rules.PredicatePushdownOptimizer;
 import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten;
 import org.apache.pig.newplan.logical.rules.PushUpFilter;
+import org.apache.pig.newplan.logical.rules.RollupHIIOptimizer;
 import org.apache.pig.newplan.logical.rules.SplitFilter;
 import org.apache.pig.newplan.logical.rules.StreamTypeCastInserter;
 import org.apache.pig.newplan.optimizer.PlanOptimizer;
@@ -56,6 +57,7 @@ public class LogicalPlanOptimizer extend
     private boolean allRulesDisabled = false;
     private SetMultimap<RulesReportKey, String> rulesReport = TreeMultimap.create();
     private PigContext pc = null;
+    private static final String MAPREDUCE_FW = "MAPREDUCE";
 
     public LogicalPlanOptimizer(OperatorPlan p, int iterations, Set<String> turnOffRules) {
         this(p, iterations, turnOffRules, null);
@@ -203,6 +205,20 @@ public class LogicalPlanOptimizer extend
         if (!s.isEmpty())
             ls.add(s);
 
+        // RollupHIIOptimizer Set
+        // This set of rules for rollup hii
+        // If pig is not running in MR mode, this rule will be disabled
+        if (pc!=null)
+            if (pc.getExecType().toString().equals(MAPREDUCE_FW)) {
+                s = new HashSet<Rule>();
+                // Optimize RollupHII
+                r = new RollupHIIOptimizer("RollupHIIOptimizer");
+                checkAndAddRule(s, r);
+                if (!s.isEmpty())
+                    ls.add(s);
+            } else {
+                LOG.info("Not MR mode. RollupHIIOptimizer is disabled");
+            }
         return ls;
     }
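The LogicalPlanOptimizer hunk above gates the new rule set on the execution engine: RollupHIIOptimizer is registered only when PigContext reports the MAPREDUCE engine, and is skipped with an info log otherwise. The snippet below is a minimal, self-contained sketch of that control flow only; the local Rule class is a stand-in for org.apache.pig.newplan.optimizer.Rule, and buildRuleSets here is a simplified stand-in for the LogicalPlanOptimizer.buildRuleSets method touched by this commit, not the actual implementation.

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Stand-in for org.apache.pig.newplan.optimizer.Rule, used only for this sketch.
    class Rule {
        private final String name;
        Rule(String name) { this.name = name; }
        public String toString() { return name; }
    }

    public class RollupRuleGatingSketch {

        // Mirrors the gating added in this commit: append the RollupHIIOptimizer
        // rule set only when the execution engine is MAPREDUCE.
        static List<Set<Rule>> buildRuleSets(String execType) {
            List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
            if ("MAPREDUCE".equals(execType)) {
                Set<Rule> s = new HashSet<Rule>();
                s.add(new Rule("RollupHIIOptimizer"));
                if (!s.isEmpty()) {
                    ls.add(s);
                }
            }
            return ls;
        }

        public static void main(String[] args) {
            System.out.println(buildRuleSets("MAPREDUCE")); // [[RollupHIIOptimizer]]
            System.out.println(buildRuleSets("TEZ"));       // []
        }
    }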

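Read together with the ExpToPhyTranslationVisitor and UserFuncExpression hunks earlier in this diff, the data flow is: the logical UserFuncExpression now carries a pivot value and a rollup-HII flag, and the logical-to-physical translator copies both onto the physical POUserFunc and the instantiated RollupDimensions UDF. The sketch below condenses that wiring; copyRollupSettings is a hypothetical helper written only for illustration, and the setPivot/setRollupHIIOptimizable accessors it calls are the branch-specific ones introduced by this commit series, not part of stock Apache Pig.

    import java.io.IOException;

    import org.apache.pig.builtin.RollupDimensions;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
    import org.apache.pig.newplan.logical.expression.UserFuncExpression;

    public class RollupHIIWiringSketch {

        // Hypothetical helper: condenses what visit(UserFuncExpression) does in this
        // commit when the UDF being instantiated is RollupDimensions.
        static void copyRollupSettings(UserFuncExpression op, POUserFunc physOp) {
            // Only RollupDimensions calls are rewritten; every other UDF is untouched.
            if (!op.getFuncSpec().toString().equals(RollupDimensions.class.getName())) {
                return;
            }
            // The pivot always travels from the logical to the physical operator.
            physOp.setPivot(op.getPivot());
            if (op.getRollupHIIOptimizable()) {
                // Flag both the physical operator and the UDF instance itself.
                physOp.setRollupHIIOptimizable(true);
                RollupDimensions udf = (RollupDimensions) physOp.getFunc();
                udf.setRollupHIIOptimizable(true);
                try {
                    udf.setPivot(op.getPivot());
                } catch (IOException e) {
                    // setPivot is declared to throw IOException on this branch;
                    // the committed code simply prints the stack trace here.
                    e.printStackTrace();
                }
            }
        }
    }

Whether this flag is ever set in the first place is decided by the MapReduce-only gating in LogicalPlanOptimizer shown above.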