Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Fri Feb 24 08:19:42 2017 @@ -44,6 +44,9 @@ public class POPoissonSample extends Phy private transient boolean initialized; + // num of rows skipped so far + private transient int numSkipped; + // num of rows sampled so far private transient int numRowsSampled; @@ -89,6 +92,7 @@ public class POPoissonSample extends Phy @Override public Result getNextTuple() throws ExecException { if (!initialized) { + numSkipped = 0; numRowsSampled = 0; avgTupleMemSz = 0; rowNum = 0; @@ -134,7 +138,7 @@ public class POPoissonSample extends Phy } // skip tuples - for (long numSkipped = 0; numSkipped < skipInterval; numSkipped++) { + while (numSkipped < skipInterval) { res = processInput(); if (res.returnStatus == POStatus.STATUS_NULL) { continue; @@ -148,6 +152,7 @@ public class POPoissonSample extends Phy return res; } rowNum++; + numSkipped++; } // skipped enough, get new sample @@ -173,6 +178,8 @@ public class POPoissonSample extends Phy rowNum++; newSample = res; + // reset skipped + numSkipped = 0; return currentSample; } }
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Fri Feb 24 08:19:42 2017 @@ -125,7 +125,7 @@ public class POReservoirSample extends P } // collect samples until input is exhausted - int rand = randGen.nextInt(rowProcessed); + int rand = randGen.nextInt(rowProcessed + 1); if (rand < numSamples) { samples[rand] = res; } @@ -133,8 +133,13 @@ public class POReservoirSample extends P } } - if (this.parentPlan.endOfAllInput && res.returnStatus == POStatus.STATUS_EOP) { - sampleCollectionDone = true; + if (res.returnStatus == POStatus.STATUS_EOP) { + if (this.parentPlan.endOfAllInput) { + sampleCollectionDone = true; + } else { + // In case of Split can get EOP in between. + return res; + } } return getSample(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Fri Feb 24 08:19:42 2017 @@ -51,13 +51,13 @@ public class Packager implements Illustr protected DataBag[] bags; public static enum PackageType { - GROUP, JOIN + GROUP, JOIN, BLOOMJOIN }; protected transient Illustrator illustrator = null; // The key being worked on - Object key; + protected Object key; // marker to indicate if key is a tuple protected boolean isKeyTuple = false; @@ -65,7 +65,7 @@ public class Packager implements Illustr protected boolean isKeyCompound = false; // key's type - byte keyType; + protected byte keyType; // The number of inputs to this // co-group. 0 indicates a distinct, which means there will only be a Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java Fri Feb 24 08:19:42 2017 @@ -60,7 +60,7 @@ public class StoreFuncDecorator { private boolean allowErrors() { return UDFContext.getUDFContext().getJobConf() - .getBoolean(PigConfiguration.PIG_ALLOW_STORE_ERRORS, false); + .getBoolean(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, false); } /** Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Feb 24 08:19:42 2017 @@ -19,13 +19,14 @@ package org.apache.pig.backend.hadoop.executionengine.tez; import java.io.IOException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; @@ -43,6 +44,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -56,6 +58,7 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.HDataType; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.JobCreationException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator; @@ -87,7 +90,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; @@ -108,7 +110,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.PigImplConstants; -import org.apache.pig.impl.builtin.DefaultIndexableLoader; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.NullablePartitionWritable; import org.apache.pig.impl.io.NullableTuple; @@ -174,6 +175,7 @@ public class TezDagBuilder extends TezOp private PigContext pc; private Configuration globalConf; private Configuration pigContextConf; + private Configuration shuffleVertexManagerBaseConf; private FileSystem fs; private long intermediateTaskInputSize; private Set<String> inputSplitInDiskVertices; @@ -191,6 +193,8 @@ public class TezDagBuilder extends TezOp private String mapTaskLaunchCmdOpts; private String reduceTaskLaunchCmdOpts; + private boolean disableDAGRecovery = false; + public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag, Map<String, LocalResource> localResources) { super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); @@ -210,6 +214,10 @@ public class TezDagBuilder extends TezOp } } + public boolean shouldDisableDAGRecovery() { + return disableDAGRecovery; + } + private void initialize(PigContext pc) throws IOException { this.globalConf = ConfigurationUtil.toConfiguration(pc.getProperties(), true); @@ -217,6 +225,16 @@ public class TezDagBuilder extends TezOp this.pigContextConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false); MRToTezHelper.processMRSettings(pigContextConf, globalConf); + shuffleVertexManagerBaseConf = new Configuration(false); + // Only copy tez.shuffle-vertex-manager config to keep payload size small + Iterator<Entry<String, String>> iter = pigContextConf.iterator(); + while (iter.hasNext()) { + Entry<String, String> entry = iter.next(); + if (entry.getKey().startsWith("tez.shuffle-vertex-manager")) { + shuffleVertexManagerBaseConf.set(entry.getKey(), entry.getValue()); + } + } + // Add credentials from binary token file and get tokens for namenodes // specified in mapreduce.job.hdfs-servers SecurityHelper.populateTokenCache(globalConf, dag.getCredentials()); @@ -265,7 +283,7 @@ public class TezDagBuilder extends TezOp if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) == null) { // If tez setting is not defined MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, mapTaskEnv, true); - MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, true); + MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, false); } if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) != null) { @@ -279,7 +297,7 @@ public class TezDagBuilder extends TezOp try { fs = FileSystem.get(globalConf); - intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(fs, FileLocalizer.getTemporaryResourcePath(pc)); + intermediateTaskInputSize = fs.getDefaultBlockSize(FileLocalizer.getTemporaryResourcePath(pc)); } catch (Exception e) { log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e); intermediateTaskInputSize = 134217728L; @@ -397,7 +415,11 @@ public class TezDagBuilder extends TezOp tezOp.getVertexGroupInfo().setVertexGroup(vertexGroup); POStore store = tezOp.getVertexGroupInfo().getStore(); if (store != null) { - vertexGroup.addDataSink(store.getOperatorKey().toString(), + String outputKey = store.getOperatorKey().toString(); + if (store instanceof POStoreTez) { + outputKey = ((POStoreTez) store).getOutputKey(); + } + vertexGroup.addDataSink(outputKey, DataSinkDescriptor.create(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(), OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), dag.getCredentials())); } @@ -441,7 +463,14 @@ public class TezDagBuilder extends TezOp Configuration conf = new Configuration(pigContextConf); - if (!combinePlan.isEmpty()) { + if (edge.needsDistinctCombiner()) { + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, + MRCombiner.class.getName()); + conf.set(MRJobConfig.COMBINE_CLASS_ATTR, + DistinctCombiner.Combine.class.getName()); + log.info("Setting distinct combiner class between " + + from.getOperatorKey() + " and " + to.getOperatorKey()); + } else if (!combinePlan.isEmpty()) { udfContextSeparator.serializeUDFContextForEdge(conf, from, to, UDFType.USERFUNC); addCombiner(combinePlan, to, conf, isMergedInput); } @@ -450,7 +479,7 @@ public class TezDagBuilder extends TezOp POLocalRearrangeTez.class); for (POLocalRearrangeTez lr : lrs) { - if (lr.getOutputKey().equals(to.getOperatorKey().toString())) { + if (lr.containsOutputKey(to.getOperatorKey().toString())) { byte keyType = lr.getKeyType(); setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage(), isMergedInput); // In case of secondary key sort, main key type is the actual key type @@ -479,7 +508,8 @@ public class TezDagBuilder extends TezOp conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true); conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true); - conf.set("pig.pigContext", serializedPigContext); + conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal()); + conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties())); conf.set("udf.import.list", serializedUDFImportList); if(to.isGlobalSort() || to.isLimitAfterSort()){ @@ -510,26 +540,36 @@ public class TezDagBuilder extends TezOp UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf); out.setUserPayload(payLoad); + in.setUserPayload(payLoad); + // Remove combiner and reset payload if (!combinePlan.isEmpty()) { boolean noCombineInReducer = false; + boolean noCombineInMapper = edge.getCombinerInMap() == null ? false : !edge.getCombinerInMap(); String reducerNoCombiner = globalConf.get(PigConfiguration.PIG_EXEC_NO_COMBINER_REDUCER); - if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) { + if (edge.getCombinerInReducer() != null) { + noCombineInReducer = !edge.getCombinerInReducer(); + } else if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) { noCombineInReducer = TezCompilerUtil.bagDataTypeInCombinePlan(combinePlan); } else { noCombineInReducer = Boolean.parseBoolean(reducerNoCombiner); } - if (noCombineInReducer) { + if (noCombineInReducer || noCombineInMapper) { log.info("Turning off combiner in reducer vertex " + to.getOperatorKey() + " for edge from " + from.getOperatorKey()); conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS); conf.unset(MRJobConfig.COMBINE_CLASS_ATTR); conf.unset("pig.combinePlan"); conf.unset("pig.combine.package"); conf.unset("pig.map.keytype"); - payLoad = TezUtils.createUserPayloadFromConf(conf); + UserPayload payLoadWithoutCombiner = TezUtils.createUserPayloadFromConf(conf); + if (noCombineInMapper) { + out.setUserPayload(payLoadWithoutCombiner); + } + if (noCombineInReducer) { + in.setUserPayload(payLoadWithoutCombiner); + } } } - in.setUserPayload(payLoad); if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && (to.isGlobalSort()||to.isSkewedJoin())) { // Use custom edge @@ -593,6 +633,8 @@ public class TezDagBuilder extends TezOp setOutputFormat(job); payloadConf.set("udf.import.list", serializedUDFImportList); payloadConf.set("exectype", "TEZ"); + payloadConf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal()); + payloadConf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties())); // Process stores LinkedList<POStore> stores = processStores(tezOp, payloadConf, job); @@ -611,11 +653,7 @@ public class TezDagBuilder extends TezOp payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists())); payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits())); inputPayLoad = new Configuration(payloadConf); - if (tezOp.getLoaderInfo().getLoads().get(0).getLoadFunc() instanceof DefaultIndexableLoader) { - inputPayLoad.set("pig.pigContext", serializedPigContext); - } } - payloadConf.set("pig.pigContext", serializedPigContext); if (tezOp.getSampleOperator() != null) { payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.getSampleOperator().getOperatorKey().toString()); @@ -689,7 +727,7 @@ public class TezDagBuilder extends TezOp PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class); for (POLocalRearrangeTez lr : lrs) { if (lr.isConnectedToPackage() - && lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) { + && lr.containsOutputKey(tezOp.getOperatorKey().toString())) { localRearrangeMap.put((int) lr.getIndex(), inputKey); if (isVertexGroup) { isMergedInput = true; @@ -772,9 +810,25 @@ public class TezDagBuilder extends TezOp String vmPluginName = null; Configuration vmPluginConf = null; + boolean containScatterGather = false; + boolean containCustomPartitioner = false; + for (TezEdgeDescriptor edge : tezOp.inEdges.values()) { + if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) { + containScatterGather = true; + } + if (edge.partitionerClass != null) { + containCustomPartitioner = true; + } + } + + if(containScatterGather) { + vmPluginName = ShuffleVertexManager.class.getName(); + vmPluginConf = new Configuration(shuffleVertexManagerBaseConf); + } // Set the right VertexManagerPlugin if (tezOp.getEstimatedParallelism() != -1) { + boolean autoParallelism = false; if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) { if (tezOp.getVertexParallelism()==-1 && ( tezOp.isGlobalSort() &&getPlan().getPredecessors(tezOp).size()==1|| @@ -783,33 +837,12 @@ public class TezDagBuilder extends TezOp // to decrease/increase parallelism of sorting vertex dynamically // based on the numQuantiles calculated by sample aggregation vertex vmPluginName = PartitionerDefinedVertexManager.class.getName(); + autoParallelism = true; log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString()); } } else { - boolean containScatterGather = false; - boolean containCustomPartitioner = false; - for (TezEdgeDescriptor edge : tezOp.inEdges.values()) { - if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) { - containScatterGather = true; - } - if (edge.partitionerClass!=null) { - containCustomPartitioner = true; - } - } if (containScatterGather && !containCustomPartitioner) { - vmPluginConf = (vmPluginConf == null) ? new Configuration(pigContextConf) : vmPluginConf; - // Use auto-parallelism feature of ShuffleVertexManager to dynamically - // reduce the parallelism of the vertex - if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true) - && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()) { - vmPluginName = PigGraceShuffleVertexManager.class.getName(); - tezOp.setUseGraceParallelism(true); - vmPluginConf.set("pig.tez.plan", getSerializedTezPlan()); - vmPluginConf.set("pig.pigContext", serializedPigContext); - } else { - vmPluginName = ShuffleVertexManager.class.getName(); - } - vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true); + // For Intermediate reduce, set the bytes per reducer to be block size. long bytesPerReducer = intermediateTaskInputSize; // If there are store statements, use BYTES_PER_REDUCER_PARAM configured by user. @@ -818,8 +851,8 @@ public class TezDagBuilder extends TezOp // In Tez, numReducers=(map output size/bytesPerReducer) we need lower values to avoid skews in reduce // as map input sizes are mostly always high compared to map output. if (stores.size() > 0) { - if (vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) { - bytesPerReducer = vmPluginConf.getLong( + if (pigContextConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) { + bytesPerReducer = pigContextConf.getLong( InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER); } else if (tezOp.isGroupBy()) { @@ -828,10 +861,28 @@ public class TezDagBuilder extends TezOp bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_DEFAULT; } } + + // Use auto-parallelism feature of ShuffleVertexManager to dynamically + // reduce the parallelism of the vertex. Use PigGraceShuffleVertexManager + // instead of ShuffleVertexManager if pig.tez.grace.parallelism is turned on + if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true) + && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty() + && tezOp.getCrossKeys() == null) { + vmPluginName = PigGraceShuffleVertexManager.class.getName(); + tezOp.setUseGraceParallelism(true); + vmPluginConf.set("pig.tez.plan", getSerializedTezPlan()); + vmPluginConf.set(PigImplConstants.PIG_CONTEXT, serializedPigContext); + vmPluginConf.setLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, bytesPerReducer); + } + vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true); vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer); + autoParallelism = true; log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString()); } } + if (globalConf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY, false) && autoParallelism) { + disableDAGRecovery = true; + } } if (tezOp.isLimit() && (vmPluginName == null || vmPluginName.equals(PigGraceShuffleVertexManager.class.getName())|| vmPluginName.equals(ShuffleVertexManager.class.getName()))) { @@ -1409,22 +1460,12 @@ public class TezDagBuilder extends TezOp private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) { // the OutputFormat we report to Hadoop is always PigOutputFormat which - // can be wrapped with LazyOutputFormat provided if it is supported by - // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set + // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) { - try { - Class<?> clazz = PigContext - .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat"); - Method method = clazz.getMethod("setOutputFormatClass", - org.apache.hadoop.mapreduce.Job.class, Class.class); - method.invoke(null, job, PigOutputFormatTez.class); - } catch (Exception e) { - job.setOutputFormatClass(PigOutputFormatTez.class); - log.warn(PigConfiguration.PIG_OUTPUT_LAZY - + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used"); - } + LazyOutputFormat.setOutputFormatClass(job,PigOutputFormatTez.class); } else { job.setOutputFormatClass(PigOutputFormatTez.class); } } + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Fri Feb 24 08:19:42 2017 @@ -30,6 +30,11 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.pig.PigConfiguration; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; +import org.apache.pig.impl.plan.DependencyOrderWalker; +import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.tools.pigstats.tez.TezPigScriptStats; import org.apache.tez.client.TezClient; @@ -51,7 +56,7 @@ import com.google.common.collect.Maps; */ public class TezJob implements Runnable { private static final Log log = LogFactory.getLog(TezJob.class); - private Configuration conf; + private TezConfiguration conf; private EnumSet<StatusGetOpts> statusGetOpts; private Map<String, LocalResource> requestAMResources; private ApplicationId appId; @@ -69,31 +74,71 @@ public class TezJob implements Runnable public TezJob(TezConfiguration conf, DAG dag, Map<String, LocalResource> requestAMResources, - int estimatedTotalParallelism) throws IOException { + TezOperPlan tezPlan) throws IOException { this.conf = conf; this.dag = dag; this.requestAMResources = requestAMResources; this.reuseSession = conf.getBoolean(PigConfiguration.PIG_TEZ_SESSION_REUSE, true); this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); - tezJobConf = new TezJobConfig(estimatedTotalParallelism); + tezJobConf = new TezJobConfig(tezPlan); } static class TezJobConfig { private int estimatedTotalParallelism = -1; + private int maxOutputsinSingleVertex; + private int totalVertices = 0; - public TezJobConfig(int estimatedTotalParallelism) { - this.estimatedTotalParallelism = estimatedTotalParallelism; + public TezJobConfig(TezOperPlan tezPlan) throws VisitorException { + this.estimatedTotalParallelism = tezPlan.getEstimatedTotalParallelism(); + MaxOutputsFinder finder = new MaxOutputsFinder(tezPlan); + finder.visit(); + this.maxOutputsinSingleVertex = finder.getMaxOutputsinSingleVertex(); + this.totalVertices = finder.getTotalVertices(); } public int getEstimatedTotalParallelism() { return estimatedTotalParallelism; } - public void setEstimatedTotalParallelism(int estimatedTotalParallelism) { - this.estimatedTotalParallelism = estimatedTotalParallelism; + public int getMaxOutputsinSingleVertex() { + return maxOutputsinSingleVertex; } + public int getTotalVertices() { + return totalVertices; + } + + } + + private static class MaxOutputsFinder extends TezOpPlanVisitor { + + private int maxOutputsinSingleVertex = 1; + private int totalVertices = 0; + + public MaxOutputsFinder(TezOperPlan plan) { + super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); + } + + public int getMaxOutputsinSingleVertex() { + return maxOutputsinSingleVertex; + } + + public int getTotalVertices() { + return totalVertices; + } + + @Override + public void visitTezOp(TezOperator tezOperator) throws VisitorException { + if (!tezOperator.isVertexGroup()) { + totalVertices++; + int outputs = tezOperator.outEdges.keySet().size(); + maxOutputsinSingleVertex = maxOutputsinSingleVertex > outputs ? maxOutputsinSingleVertex : outputs; + } + } + + + } public DAG getDAG() { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Fri Feb 24 08:19:42 2017 @@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex import java.io.File; import java.io.IOException; +import java.lang.reflect.Method; import java.net.URI; import java.util.HashMap; import java.util.Map; @@ -30,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.pig.PigException; +import org.apache.pig.backend.hadoop.PigATSClient; import org.apache.pig.backend.hadoop.executionengine.JobCreationException; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer; @@ -50,11 +52,12 @@ public class TezJobCompiler { private static final Log log = LogFactory.getLog(TezJobCompiler.class); private PigContext pigContext; - private TezConfiguration tezConf; + private Configuration conf; + private boolean disableDAGRecovery; public TezJobCompiler(PigContext pigContext, Configuration conf) throws IOException { this.pigContext = pigContext; - this.tezConf = new TezConfiguration(conf); + this.conf = conf; } public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, LocalResource> localResources) @@ -64,6 +67,7 @@ public class TezJobCompiler { TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlanNode.getTezOperPlan(), tezDag, localResources); dagBuilder.visit(); dagBuilder.avoidContainerReuseIfInputSplitInDisk(); + disableDAGRecovery = dagBuilder.shouldDisableDAGRecovery(); return tezDag; } @@ -85,6 +89,7 @@ public class TezJobCompiler { return job; } + @SuppressWarnings({ "rawtypes", "unchecked" }) private TezJob getJob(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer) throws JobCreationException { try { @@ -107,8 +112,34 @@ public class TezJobCompiler { } DAG tezDag = buildDAG(tezPlanNode, localResources); tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript())); + // set Tez caller context + // Reflection for the following code since it is only available since tez 0.8.1: + // CallerContext context = CallerContext.create(ATSService.CallerContext, ATSService.getPigAuditId(pigContext), + // ATSService.EntityType, ""); + // tezDag.setCallerContext(context); + Class callerContextClass = null; + try { + callerContextClass = Class.forName("org.apache.tez.client.CallerContext"); + } catch (ClassNotFoundException e) { + // If pre-Tez 0.8.1, skip setting CallerContext + } + if (callerContextClass != null) { + Method builderBuildMethod = callerContextClass.getMethod("create", String.class, + String.class, String.class, String.class); + Object context = builderBuildMethod.invoke(null, PigATSClient.CALLER_CONTEXT, + PigATSClient.getPigAuditId(pigContext), PigATSClient.ENTITY_TYPE, ""); + Method dagSetCallerContext = tezDag.getClass().getMethod("setCallerContext", + context.getClass()); + dagSetCallerContext.invoke(tezDag, context); + } log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism()); - return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism()); + TezConfiguration tezConf = new TezConfiguration(conf); + if (disableDAGRecovery + && tezConf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, + TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) { + tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false); + } + return new TezJob(tezConf, tezDag, localResources, tezPlan); } catch (Exception e) { int errCode = 2017; String msg = "Internal error creating job configuration."; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Fri Feb 24 08:19:42 2017 @@ -22,6 +22,7 @@ import java.io.PrintStream; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -166,7 +167,7 @@ public class TezLauncher extends Launche tezStats = new TezPigScriptStats(pc); PigStats.start(tezStats); - conf.set(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true"); + conf.setIfUnset(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true"); TezJobCompiler jc = new TezJobCompiler(pc, conf); TezPlanContainer tezPlanContainer = compile(php, pc); @@ -174,6 +175,10 @@ public class TezLauncher extends Launche tezScriptState.emitInitialPlanNotification(tezPlanContainer); tezScriptState.emitLaunchStartedNotification(tezPlanContainer.size()); //number of DAGs to Launch + boolean stop_on_failure = + Boolean.valueOf(pc.getProperties().getProperty("stop.on.failure", "false")); + boolean stoppedOnFailure = false; + TezPlanContainerNode tezPlanContainerNode; TezOperPlan tezPlan; int processedDAGs = 0; @@ -252,7 +257,18 @@ public class TezLauncher extends Launche ((tezPlanContainer.size() - processedDAGs)/tezPlanContainer.size()) * 100); } handleUnCaughtException(pc); - tezPlanContainer.updatePlan(tezPlan, reporter.notifyFinishedOrFailed()); + boolean tezDAGSucceeded = reporter.notifyFinishedOrFailed(); + tezPlanContainer.updatePlan(tezPlan, tezDAGSucceeded); + // if stop_on_failure is enabled, we need to stop immediately when any job has failed + if (!tezDAGSucceeded) { + if (stop_on_failure) { + stoppedOnFailure = true; + break; + } else { + log.warn("Ooops! Some job has failed! Specify -stop_on_failure if you " + + "want Pig to stop immediately on failure."); + } + } } tezStats.finish(); @@ -279,6 +295,11 @@ public class TezLauncher extends Launche } } + if (stoppedOnFailure) { + throw new ExecException("Stopping execution on job failure with -stop_on_failure option", 6017, + PigException.REMOTE_ENVIRONMENT); + } + return tezStats; } @@ -402,9 +423,11 @@ public class TezLauncher extends Launche TezCompiler comp = new TezCompiler(php, pc); comp.compile(); TezPlanContainer planContainer = comp.getPlanContainer(); - for (Map.Entry<OperatorKey, TezPlanContainerNode> entry : planContainer - .getKeys().entrySet()) { - TezOperPlan tezPlan = entry.getValue().getTezOperPlan(); + // Doing a sort so that test plan printed remains same between jdk7 and jdk8 + List<OperatorKey> opKeys = new ArrayList<>(planContainer.getKeys().keySet()); + Collections.sort(opKeys); + for (OperatorKey opKey : opKeys) { + TezOperPlan tezPlan = planContainer.getOperator(opKey).getTezOperPlan(); optimize(tezPlan, pc); } return planContainer; @@ -499,7 +522,7 @@ public class TezLauncher extends Launche @Override public void killJob(String jobID, Configuration conf) throws BackendException { - if (runningJob != null && runningJob.getApplicationId().toString() == jobID) { + if (runningJob != null && runningJob.getApplicationId().toString().equals(jobID)) { try { runningJob.killJob(); } catch (Exception e) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Fri Feb 24 08:19:42 2017 @@ -39,6 +39,8 @@ import org.apache.pig.PigConfiguration; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; +import com.google.common.annotations.VisibleForTesting; + public class TezResourceManager { private static TezResourceManager instance = null; private boolean inited = false; @@ -59,6 +61,7 @@ public class TezResourceManager { /** * This method is only used by test code to reset state. */ + @VisibleForTesting public static void dropInstance() { instance = null; } @@ -66,7 +69,7 @@ public class TezResourceManager { public void init(PigContext pigContext, Configuration conf) throws IOException { if (!inited) { this.resourcesDir = FileLocalizer.getTemporaryResourcePath(pigContext); - this.remoteFs = FileSystem.get(conf); + this.remoteFs = resourcesDir.getFileSystem(conf); this.conf = conf; this.pigContext = pigContext; this.inited = true; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Fri Feb 24 08:19:42 2017 @@ -18,7 +18,9 @@ package org.apache.pig.backend.hadoop.executionengine.tez; import java.io.IOException; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Calendar; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -29,9 +31,11 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.pig.PigConfiguration; import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig; import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.util.Utils; import org.apache.pig.tools.pigstats.tez.TezScriptState; import org.apache.tez.client.TezAppMasterStatus; @@ -46,13 +50,13 @@ public class TezSessionManager { private static final Log log = LogFactory.getLog(TezSessionManager.class); static { - Runtime.getRuntime().addShutdownHook(new Thread() { + Utils.addShutdownHookWithPriority(new Runnable() { @Override public void run() { TezSessionManager.shutdown(); } - }); + }, PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY); } private static ReentrantReadWriteLock sessionPoolLock = new ReentrantReadWriteLock(); @@ -61,11 +65,17 @@ public class TezSessionManager { private TezSessionManager() { } - public static class SessionInfo { - SessionInfo(TezClient session, Map<String, LocalResource> resources) { + private static class SessionInfo { + + public SessionInfo(TezClient session, TezConfiguration config, Map<String, LocalResource> resources) { this.session = session; + this.config = config; this.resources = resources; } + + public TezConfiguration getConfig() { + return config; + } public Map<String, LocalResource> getResources() { return resources; } @@ -77,20 +87,23 @@ public class TezSessionManager { } private TezClient session; private Map<String, LocalResource> resources; + private TezConfiguration config; private boolean inUse = false; } private static List<SessionInfo> sessionPool = new ArrayList<SessionInfo>(); - private static SessionInfo createSession(Configuration conf, + private static SessionInfo createSession(TezConfiguration amConf, Map<String, LocalResource> requestedAMResources, Credentials creds, TezJobConfig tezJobConf) throws TezException, IOException, InterruptedException { - TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf); + MRToTezHelper.translateMRSettingsForTezAM(amConf); TezScriptState ss = TezScriptState.get(); ss.addDAGSettingsToConf(amConf); - adjustAMConfig(amConf, tezJobConf); - String jobName = conf.get(PigContext.JOB_NAME, "pig"); + if (amConf.getBoolean(PigConfiguration.PIG_TEZ_CONFIGURE_AM_MEMORY, true)) { + adjustAMConfig(amConf, tezJobConf); + } + String jobName = amConf.get(PigContext.JOB_NAME, "pig"); TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds); try { tezClient.start(); @@ -104,12 +117,10 @@ public class TezSessionManager { tezClient.stop(); throw new RuntimeException(e); } - return new SessionInfo(tezClient, requestedAMResources); + return new SessionInfo(tezClient, amConf, requestedAMResources); } private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) { - int requiredAMMaxHeap = -1; - int requiredAMResourceMB = -1; String amLaunchOpts = amConf.get( TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT); @@ -122,8 +133,10 @@ public class TezSessionManager { // Need more room for native memory/virtual address space // when close to 4G due to 32-bit jvm 4G limit - int minAMMaxHeap = 3200; - int minAMResourceMB = 4096; + int maxAMHeap = Utils.is64bitJVM() ? 3584 : 3200; + int maxAMResourceMB = 4096; + int requiredAMResourceMB = maxAMResourceMB; + int requiredAMMaxHeap = maxAMHeap; // Rough estimation. For 5K tasks 1G Xmx and 1.5G resource.mb // Increment container size by 512 mb for every additional 5K tasks. @@ -135,22 +148,38 @@ public class TezSessionManager { // 5000 and above - 1024Xmx, 1536 (512 native memory) for (int taskCount = 30000; taskCount >= 5000; taskCount-=5000) { if (tezJobConf.getEstimatedTotalParallelism() >= taskCount) { - requiredAMMaxHeap = minAMMaxHeap; - requiredAMResourceMB = minAMResourceMB; break; } - minAMResourceMB = minAMResourceMB - 512; - minAMMaxHeap = minAMResourceMB - 512; + requiredAMResourceMB = requiredAMResourceMB - 512; + requiredAMMaxHeap = requiredAMResourceMB - 512; + } + + if (tezJobConf.getTotalVertices() > 30) { + //Add 512 mb per 30 vertices + int additionaMem = 512 * (tezJobConf.getTotalVertices() / 30); + requiredAMResourceMB = requiredAMResourceMB + additionaMem; + requiredAMMaxHeap = requiredAMResourceMB - 512; + } + + if (tezJobConf.getMaxOutputsinSingleVertex() > 10) { + //Add 256 mb per 5 outputs if a vertex has more than 10 outputs + int additionaMem = 256 * (tezJobConf.getMaxOutputsinSingleVertex() / 5); + requiredAMResourceMB = requiredAMResourceMB + additionaMem; + requiredAMMaxHeap = requiredAMResourceMB - 512; } + requiredAMResourceMB = Math.min(maxAMResourceMB, requiredAMResourceMB); + requiredAMMaxHeap = Math.min(maxAMHeap, requiredAMMaxHeap); + if (requiredAMResourceMB > -1 && configuredAMResourceMB < requiredAMResourceMB) { amConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, requiredAMResourceMB); log.info("Increasing " + TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB + " from " + configuredAMResourceMB + " to " + requiredAMResourceMB - + " as the number of total estimated tasks is " - + tezJobConf.getEstimatedTotalParallelism()); + + " as total estimated tasks = " + tezJobConf.getEstimatedTotalParallelism() + + ", total vertices = " + tezJobConf.getTotalVertices() + + ", max outputs = " + tezJobConf.getMaxOutputsinSingleVertex()); if (requiredAMMaxHeap > -1 && configuredAMMaxHeap < requiredAMMaxHeap) { amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, @@ -158,8 +187,9 @@ public class TezSessionManager { log.info("Increasing Tez AM Heap Size from " + configuredAMMaxHeap + "M to " + requiredAMMaxHeap - + "M as the number of total estimated tasks is " - + tezJobConf.getEstimatedTotalParallelism()); + + "M as total estimated tasks = " + tezJobConf.getEstimatedTotalParallelism() + + ", total vertices = " + tezJobConf.getTotalVertices() + + ", max outputs = " + tezJobConf.getMaxOutputsinSingleVertex()); log.info("Value of " + TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS + " is now " + amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS)); } @@ -178,7 +208,22 @@ public class TezSessionManager { return true; } - static TezClient getClient(Configuration conf, Map<String, LocalResource> requestedAMResources, + private static boolean validateSessionConfig(SessionInfo currentSession, + Configuration newSessionConfig) + throws TezException, IOException { + // If DAG recovery is disabled for one and enabled for another, do not reuse + if (currentSession.getConfig().getBoolean( + TezConfiguration.DAG_RECOVERY_ENABLED, + TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT) + != newSessionConfig.getBoolean( + TezConfiguration.DAG_RECOVERY_ENABLED, + TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) { + return false; + } + return true; + } + + static TezClient getClient(TezConfiguration conf, Map<String, LocalResource> requestedAMResources, Credentials creds, TezJobConfig tezJobConf) throws TezException, IOException, InterruptedException { List<SessionInfo> sessionsToRemove = new ArrayList<SessionInfo>(); SessionInfo newSession = null; @@ -196,7 +241,8 @@ public class TezSessionManager { sessionsToRemove.add(sessionInfo); } else if (!sessionInfo.inUse && appMasterStatus.equals(TezAppMasterStatus.READY) - && validateSessionResources(sessionInfo,requestedAMResources)) { + && validateSessionResources(sessionInfo,requestedAMResources) + && validateSessionConfig(sessionInfo, conf)) { sessionInfo.inUse = true; return sessionInfo.session; } @@ -253,6 +299,11 @@ public class TezSessionManager { synchronized (sessionInfo) { if (sessionInfo.session == session) { log.info("Stopping Tez session " + session); + String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + .format(Calendar.getInstance().getTime()); + System.err.println(timeStamp + " Shutting down Tez session " + + ", sessionName=" + session.getClientName() + + ", applicationId=" + session.getAppMasterApplicationId()); session.stop(); sessionToRemove = sessionInfo; break; @@ -279,19 +330,30 @@ public class TezSessionManager { shutdown = true; for (SessionInfo sessionInfo : sessionPool) { synchronized (sessionInfo) { + TezClient session = sessionInfo.session; try { - if (sessionInfo.session.getAppMasterStatus().equals( + String timeStamp = new SimpleDateFormat( + "yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); + if (session.getAppMasterStatus().equals( TezAppMasterStatus.SHUTDOWN)) { log.info("Tez session is already shutdown " - + sessionInfo.session); + + session); + System.err.println(timeStamp + + " Tez session is already shutdown " + session + + ", sessionName=" + session.getClientName() + + ", applicationId=" + session.getAppMasterApplicationId()); continue; } - log.info("Shutting down Tez session " - + sessionInfo.session); - sessionInfo.session.stop(); + log.info("Shutting down Tez session " + session); + // Since hadoop calls org.apache.log4j.LogManager.shutdown(); + // the log.info message is not displayed with shutdown hook in Oozie + System.err.println(timeStamp + " Shutting down Tez session " + + ", sessionName=" + session.getClientName() + + ", applicationId=" + session.getAppMasterApplicationId()); + session.stop(); } catch (Exception e) { log.error("Error shutting down Tez session " - + sessionInfo.session, e); + + session, e); } } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Fri Feb 24 08:19:42 2017 @@ -32,10 +32,12 @@ import java.util.Stack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.hash.Hash; import org.apache.pig.CollectableLoadFunc; import org.apache.pig.FuncSpec; import org.apache.pig.IndexableLoadFunc; @@ -44,8 +46,10 @@ import org.apache.pig.OrderedLoadFunc; import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; @@ -82,7 +86,10 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBloomFilterRearrangeTez; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterStatsTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez; @@ -110,6 +117,7 @@ import org.apache.pig.impl.builtin.GetMe import org.apache.pig.impl.builtin.PartitionSkewedKeys; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; +import org.apache.pig.impl.io.NullableIntWritable; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.Operator; @@ -167,6 +175,10 @@ public class TezCompiler extends PhyPlan private Map<PhysicalOperator, TezOperator> phyToTezOpMap; + // Contains the inputs to operator like join, with the list maintaining the + // same order of join from left to right + private Map<TezOperator, List<TezOperator>> inputsMap; + public static final String USER_COMPARATOR_MARKER = "user.comparator.func:"; public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold"; public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation"; @@ -175,6 +187,8 @@ public class TezCompiler extends PhyPlan private boolean optimisticFileConcatenation = false; private List<String> readOnceLoadFuncs = null; + private Configuration conf; + private POLocalRearrangeTezFactory localRearrangeFactory; public TezCompiler(PhysicalPlan plan, PigContext pigContext) @@ -184,6 +198,7 @@ public class TezCompiler extends PhyPlan this.pigContext = pigContext; pigProperties = pigContext.getProperties(); + conf = ConfigurationUtil.toConfiguration(pigProperties, false); splitsSeen = Maps.newHashMap(); tezPlan = new TezOperPlan(); nig = NodeIdGenerator.getGenerator(); @@ -197,6 +212,7 @@ public class TezCompiler extends PhyPlan scope = roots.get(0).getOperatorKey().getScope(); localRearrangeFactory = new POLocalRearrangeTezFactory(scope, nig); phyToTezOpMap = Maps.newHashMap(); + inputsMap = Maps.newHashMap(); fileConcatenationThreshold = Integer.parseInt(pigProperties .getProperty(FILE_CONCATENATION_THRESHOLD, "100")); @@ -655,15 +671,8 @@ public class TezCompiler extends PhyPlan blocking(); TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp); - // Add the DISTINCT plan as the combine plan. In MR Pig, the combiner is implemented - // with a global variable and a specific DistinctCombiner class. This seems better. - PhysicalPlan combinePlan = curTezOp.inEdges.get(lastOp.getOperatorKey()).combinePlan; - addDistinctPlan(combinePlan, 1); - - POLocalRearrangeTez clr = localRearrangeFactory.create(); - clr.setOutputKey(curTezOp.getOperatorKey().toString()); - clr.setDistinct(true); - combinePlan.addAsLeaf(clr); + TezEdgeDescriptor edge = curTezOp.inEdges.get(lastOp.getOperatorKey()); + edge.setNeedsDistinctCombiner(true); curTezOp.markDistinct(); addDistinctPlan(curTezOp.plan, op.getRequestedParallelism()); @@ -856,6 +865,7 @@ public class TezCompiler extends PhyPlan } else { curTezOp.plan.addAsLeaf(op); } + phyToTezOpMap.put(op, curTezOp); } catch (Exception e) { int errCode = 2034; @@ -900,6 +910,7 @@ public class TezCompiler extends PhyPlan public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException { try { blocking(); + inputsMap.put(curTezOp, new ArrayList<>(Arrays.asList(compiledInputs))); TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp); curTezOp.setRequestedParallelism(op.getRequestedParallelism()); if (op.isCross()) { @@ -1088,7 +1099,7 @@ public class TezCompiler extends PhyPlan indexerTezOp.setDontEstimateParallelism(true); POStore st = TezCompilerUtil.getStore(scope, nig); - FileSpec strFile = getTempFileSpec(); + FileSpec strFile = getTempFileSpec(pigContext); st.setSFile(strFile); indexAggrOper.plan.addAsLeaf(st); indexAggrOper.setClosed(true); @@ -1255,7 +1266,7 @@ public class TezCompiler extends PhyPlan rightTezOprAggr.setDontEstimateParallelism(true); POStore st = TezCompilerUtil.getStore(scope, nig); - FileSpec strFile = getTempFileSpec(); + FileSpec strFile = getTempFileSpec(pigContext); st.setSFile(strFile); rightTezOprAggr.plan.addAsLeaf(st); rightTezOprAggr.setClosed(true); @@ -1346,6 +1357,9 @@ public class TezCompiler extends PhyPlan } else if (op.getNumInps() > 1) { curTezOp.markCogroup(); } + } else if (op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) { + curTezOp.markRegularJoin(); + addBloomToJoin(op, curTezOp); } } catch (Exception e) { int errCode = 2034; @@ -1354,6 +1368,132 @@ public class TezCompiler extends PhyPlan } } + private void addBloomToJoin(POPackage op, TezOperator curTezOp) throws PlanException { + + List<TezOperator> inputs = inputsMap.get(curTezOp); + TezOperator buildBloomOp; + List<TezOperator> applyBloomOps = new ArrayList<>(); + + String strategy = conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, POBuildBloomRearrangeTez.DEFAULT_BLOOM_STRATEGY); + boolean createBloomInMap = "map".equals(strategy); + if (!createBloomInMap && !strategy.equals("reduce")) { + throw new PlanException(new IllegalArgumentException( + "Invalid value for " + + PigConfiguration.PIG_BLOOMJOIN_STRATEGY + " - " + + strategy + ". Valid values are map and reduce")); + } + int numHash = conf.getInt(PigConfiguration.PIG_BLOOMJOIN_HASH_FUNCTIONS, POBuildBloomRearrangeTez.DEFAULT_NUM_BLOOM_HASH_FUNCTIONS); + int vectorSizeBytes = conf.getInt(PigConfiguration.PIG_BLOOMJOIN_VECTORSIZE_BYTES, POBuildBloomRearrangeTez.DEFAULT_BLOOM_VECTOR_SIZE_BYTES); + int numBloomFilters = POBuildBloomRearrangeTez.getNumBloomFilters(conf); + int hashType = Hash.parseHashType(conf.get(PigConfiguration.PIG_BLOOMJOIN_HASH_TYPE, POBuildBloomRearrangeTez.DEFAULT_BLOOM_HASH_TYPE)); + + // We build bloom of the right most input and apply the bloom filter on the left inputs by default. + // But in case of left outer join we build bloom of the left input and use it on the right input + boolean[] inner = op.getPkgr().getInner(); + boolean skipNullKeys = true; + if (inner[inner.length - 1]) { // inner has from right to left while inputs has from left to right + buildBloomOp = inputs.get(inputs.size() - 1); // Bloom filter is built from right most input + for (int i = 0; i < (inner.length - 1); i++) { + applyBloomOps.add(inputs.get(i)); + } + skipNullKeys = inner[0]; + } else { + // Left outer join + skipNullKeys = false; + buildBloomOp = inputs.get(0); // Bloom filter is built from left most input + for (int i = 1; i < inner.length; i++) { + applyBloomOps.add(inputs.get(i)); + } + } + + // Add BuildBloom operator to the input + POLocalRearrangeTez lr = (POLocalRearrangeTez) buildBloomOp.plan.getLeaves().get(0); + POBuildBloomRearrangeTez bbr = new POBuildBloomRearrangeTez(lr, createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType); + bbr.setSkipNullKeys(skipNullKeys); + buildBloomOp.plan.remove(lr); + buildBloomOp.plan.addAsLeaf(bbr); + + // Add a new reduce vertex that will construct the final bloom filter + // - by combining the bloom filters from the buildBloomOp input tasks in the map strategy + // - or directly from the keys from the buildBloomOp input tasks in the reduce strategy + TezOperator combineBloomOp = getTezOp(); + tezPlan.add(combineBloomOp); + combineBloomOp.markBuildBloom(); + // Explicitly set the parallelism for the new vertex to number of bloom filters. + // Auto parallelism will bring it down based on the actual output size + combineBloomOp.setEstimatedParallelism(numBloomFilters); + // We don't want parallelism to be changed during the run by grace auto parallelism + // It will take the whole input size and estimate way higher + combineBloomOp.setDontEstimateParallelism(true); + + String combineBloomOpKey = combineBloomOp.getOperatorKey().toString(); + TezEdgeDescriptor edge = new TezEdgeDescriptor(); + TezCompilerUtil.connect(tezPlan, buildBloomOp, combineBloomOp, edge); + bbr.setBloomOutputKey(combineBloomOpKey); + + + POPackage pkg = new POPackage(OperatorKey.genOpKey(scope)); + pkg.setNumInps(1); + BloomPackager pkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);; + pkgr.setKeyType(DataType.INTEGER); + pkg.setPkgr(pkgr); + POValueOutputTez combineBloomOutput = new POValueOutputTez(OperatorKey.genOpKey(scope)); + combineBloomOp.plan.addAsLeaf(pkg); + combineBloomOp.plan.addAsLeaf(combineBloomOutput); + + edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName()); + edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName()); + + // Add combiner as well. + POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope)); + BloomPackager combinerPkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType); + combinerPkgr.setCombiner(true); + combinerPkgr.setKeyType(DataType.INTEGER); + pkg_c.setPkgr(combinerPkgr); + pkg_c.setNumInps(1); + edge.combinePlan.addAsLeaf(pkg_c); + POProject prjKey = new POProject(OperatorKey.genOpKey(scope)); + prjKey.setResultType(DataType.INTEGER); + List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>(); + PhysicalPlan pp = new PhysicalPlan(); + pp.add(prjKey); + clrInps.add(pp); + POLocalRearrangeTez clr = localRearrangeFactory.create(0, LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER); + clr.setOutputKey(combineBloomOpKey); + edge.combinePlan.addAsLeaf(clr); + + if (createBloomInMap) { + // No combiner needed on map as there will be only one bloom filter per map for each partition + // In the reducer, the bloom filters will be combined with same logic of reduce in BloomPackager + edge.setCombinerInMap(false); + edge.setCombinerInReducer(true); + } else { + pkgr.setBloomKeyType(op.getPkgr().getKeyType()); + // Do distinct of the keys on the map side to reduce data sent to reducers. + // In case of reduce, not adding a combiner and doing the distinct during reduce itself. + // If needed one can be added later + edge.setCombinerInMap(true); + edge.setCombinerInReducer(false); + } + + // Broadcast the final bloom filter to other inputs + for (TezOperator applyBloomOp : applyBloomOps) { + applyBloomOp.markFilterBloom(); + lr = (POLocalRearrangeTez) applyBloomOp.plan.getLeaves().get(0); + POBloomFilterRearrangeTez bfr = new POBloomFilterRearrangeTez(lr, numBloomFilters); + applyBloomOp.plan.remove(lr); + applyBloomOp.plan.addAsLeaf(bfr); + bfr.setInputKey(combineBloomOpKey); + edge = new TezEdgeDescriptor(); + edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName()); + edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName()); + TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST); + TezCompilerUtil.connect(tezPlan, combineBloomOp, applyBloomOp, edge); + combineBloomOutput.addOutputKey(applyBloomOp.getOperatorKey().toString()); + } + + } + @Override public void visitPOForEach(POForEach op) throws VisitorException{ try{ @@ -1513,7 +1653,7 @@ public class TezCompiler extends PhyPlan for (int i=0; i<transformPlans.size(); i++) { eps1.add(transformPlans.get(i)); - flat1.add(true); + flat1.add(i == transformPlans.size() - 1 ? true : false); } // This foreach will pick the sort key columns from the POPoissonSample output @@ -1722,7 +1862,7 @@ public class TezCompiler extends PhyPlan * @return * @throws IOException */ - private FileSpec getTempFileSpec() throws IOException { + public static FileSpec getTempFileSpec(PigContext pigContext) throws IOException { return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(), new FuncSpec(Utils.getTmpFileCompressorName(pigContext))); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java Fri Feb 24 08:19:42 2017 @@ -31,8 +31,13 @@ import org.apache.tez.runtime.library.ou * Descriptor for Tez edge. It holds combine plan as well as edge properties. */ public class TezEdgeDescriptor implements Serializable { - // Combiner runs on both input and output of Tez edge. - transient public PhysicalPlan combinePlan; + + public transient PhysicalPlan combinePlan; + private boolean needsDistinctCombiner; + // Combiner runs on both input and output of Tez edge by default + // It can be configured to run only in output(map) or input(reduce) + private Boolean combinerInMap; + private Boolean combinerInReducer; public String inputClassName; public String outputClassName; @@ -65,6 +70,30 @@ public class TezEdgeDescriptor implement dataMovementType = DataMovementType.SCATTER_GATHER; } + public boolean needsDistinctCombiner() { + return needsDistinctCombiner; + } + + public void setNeedsDistinctCombiner(boolean nic) { + needsDistinctCombiner = nic; + } + + public Boolean getCombinerInMap() { + return combinerInMap; + } + + public void setCombinerInMap(Boolean combinerInMap) { + this.combinerInMap = combinerInMap; + } + + public Boolean getCombinerInReducer() { + return combinerInReducer; + } + + public void setCombinerInReducer(Boolean combinerInReducer) { + this.combinerInReducer = combinerInReducer; + } + public boolean isUseSecondaryKey() { return useSecondaryKey; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java Fri Feb 24 08:19:42 2017 @@ -25,8 +25,9 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -217,8 +218,12 @@ public class TezOperPlan extends Operato newPlan.add(node); } - Set<Pair<TezOperator, TezOperator>> toReconnect = new HashSet<Pair<TezOperator, TezOperator>>(); - for (TezOperator from : mFromEdges.keySet()) { + // Using a LinkedHashSet and doing a sort so that + // test plan printed remains same between jdk7 and jdk8 + Set<Pair<TezOperator, TezOperator>> toReconnect = new LinkedHashSet<Pair<TezOperator, TezOperator>>(); + List<TezOperator> fromEdges = new ArrayList<>(mFromEdges.keySet()); + Collections.sort(fromEdges); + for (TezOperator from : fromEdges) { List<TezOperator> tos = mFromEdges.get(from); for (TezOperator to : tos) { if (list.contains(from) || list.contains(to)) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Fri Feb 24 08:19:42 2017 @@ -181,7 +181,11 @@ public class TezOperator extends Operato // Indicate if this job is a native job NATIVE, // Indicate if this job does rank counter - RANK_COUNTER; + RANK_COUNTER, + // Indicate if this job constructs bloom filter + BUILDBLOOM, + // Indicate if this job applies bloom filter + FILTERBLOOM; }; // Features in the job/vertex. Mostly will be only one feature. @@ -235,6 +239,7 @@ public class TezOperator extends Operato } private LoaderInfo loaderInfo = new LoaderInfo(); + private long totalInputFilesSize = -1; public TezOperator(OperatorKey k) { super(k); @@ -452,6 +457,22 @@ public class TezOperator extends Operato feature.set(OPER_FEATURE.RANK_COUNTER.ordinal()); } + public boolean isBuildBloom() { + return feature.get(OPER_FEATURE.BUILDBLOOM.ordinal()); + } + + public void markBuildBloom() { + feature.set(OPER_FEATURE.BUILDBLOOM.ordinal()); + } + + public boolean isFilterBloom() { + return feature.get(OPER_FEATURE.FILTERBLOOM.ordinal()); + } + + public void markFilterBloom() { + feature.set(OPER_FEATURE.FILTERBLOOM.ordinal()); + } + public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> excludeFeatures) { for (OPER_FEATURE opf : OPER_FEATURE.values()) { if (excludeFeatures != null && excludeFeatures.contains(opf)) { @@ -651,6 +672,14 @@ public class TezOperator extends Operato return loaderInfo; } + public long getTotalInputFilesSize() { + return totalInputFilesSize; + } + + public void setTotalInputFilesSize(long totalInputFilesSize) { + this.totalInputFilesSize = totalInputFilesSize; + } + public void setUseGraceParallelism(boolean useGraceParallelism) { this.useGraceParallelism = useGraceParallelism; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java Fri Feb 24 08:19:42 2017 @@ -31,6 +31,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.VisitorException; @@ -161,7 +162,7 @@ public class TezPOPackageAnnotator exten @Override public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException { POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange; - if (!(lr.isConnectedToPackage() && lr.getOutputKey().equals(pkgTezOp.getOperatorKey().toString()))) { + if (!(lr.isConnectedToPackage() && lr.containsOutputKey(pkgTezOp.getOperatorKey().toString()))) { return; } loRearrangeFound++; @@ -180,7 +181,9 @@ public class TezPOPackageAnnotator exten if(keyInfo == null) keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(); - Integer index = Integer.valueOf(lrearrange.getIndex()); + // For BloomPackager there is only one input, but the + // POBuildBloomRearrangeTez index is that of the join's index and can be non-zero + Integer index = (pkg.getPkgr() instanceof BloomPackager) ? 0 : Integer.valueOf(lrearrange.getIndex()); if(keyInfo.get(index) != null) { if (isPOSplit) { // Case of POSplit having more than one input in case of self join or union @@ -197,12 +200,20 @@ public class TezPOPackageAnnotator exten } - keyInfo.put(index, - new Pair<Boolean, Map<Integer, Integer>>( - lrearrange.isProjectStar(), lrearrange.getProjectedColsMap())); - pkg.getPkgr().setKeyInfo(keyInfo); - pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple()); - pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound()); + if (pkg.getPkgr() instanceof BloomPackager ) { + keyInfo.put(index, + new Pair<Boolean, Map<Integer, Integer>>( + Boolean.FALSE, new HashMap<Integer, Integer>())); + pkg.getPkgr().setKeyInfo(keyInfo); + } else { + keyInfo.put(index, + new Pair<Boolean, Map<Integer, Integer>>( + lrearrange.isProjectStar(), lrearrange.getProjectedColsMap())); + pkg.getPkgr().setKeyInfo(keyInfo); + pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple()); + pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound()); + } + } /**
