Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Mar 4 18:17:39 2016 @@ -22,16 +22,18 @@ import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; +import java.util.Set; import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.RawComparator; @@ -43,6 +45,10 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; import org.apache.pig.StoreFuncInterface; @@ -52,18 +58,6 @@ import org.apache.pig.backend.hadoop.dat import org.apache.pig.backend.hadoop.executionengine.JobCreationException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBigDecimalWritableComparator; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBigIntegerWritableComparator; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBooleanWritableComparator; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingCharArrayWritableComparator; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDBAWritableComparator; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDateTimeWritableComparator; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDoubleWritableComparator; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingFloatWritableComparator; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingIntWritableComparator; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingLongWritableComparator; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingPartitionWritableComparator; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingTupleWritableComparator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter; @@ -82,10 +76,10 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSecondaryKeyComparator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextRawComparator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleSortComparator; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; @@ -99,20 +93,21 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPOPackageAnnotator.LoRearrangeDiscoverer; -import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POIdentityInOutTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POShuffleTezLoad; -import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez; -import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PartitionerDefinedVertexManager; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigInputFormatTez; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigOutputFormatTez; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor; import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper; import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper; -import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil; +import org.apache.pig.backend.hadoop.executionengine.tez.util.TezUDFContextSeparator; import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.PigImplConstants; +import org.apache.pig.impl.builtin.DefaultIndexableLoader; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.NullablePartitionWritable; import org.apache.pig.impl.io.NullableTuple; @@ -121,7 +116,9 @@ import org.apache.pig.impl.plan.Operator import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.impl.util.UDFContextSeparator.UDFType; import org.apache.pig.tools.pigstats.tez.TezScriptState; +import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.DataSinkDescriptor; @@ -136,6 +133,7 @@ import org.apache.tez.dag.api.InputIniti import org.apache.tez.dag.api.OutputCommitterDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; @@ -145,50 +143,142 @@ import org.apache.tez.dag.library.vertex import org.apache.tez.mapreduce.combine.MRCombiner; import org.apache.tez.mapreduce.committer.MROutputCommitter; import org.apache.tez.mapreduce.common.MRInputSplitDistributor; +import org.apache.tez.mapreduce.hadoop.InputSplitInfo; +import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.mapreduce.partition.MRPartitioner; import org.apache.tez.mapreduce.protos.MRRuntimeProtos; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto.Builder; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput; import org.apache.tez.runtime.library.input.OrderedGroupedKVInput; import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput; import org.apache.tez.runtime.library.input.UnorderedKVInput; -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; /** * A visitor to construct DAG out of Tez plan. */ public class TezDagBuilder extends TezOpPlanVisitor { - private static final Log log = LogFactory.getLog(TezJobCompiler.class); + private static final Log log = LogFactory.getLog(TezDagBuilder.class); + + private static long SHUFFLE_BYTES_PER_REDUCER_GROUPBY_DEFAULT = 384 * 1024 * 1024L; + private static long SHUFFLE_BYTES_PER_REDUCER_DEFAULT = 256 * 1024 * 1024L; private DAG dag; private Map<String, LocalResource> localResources; private PigContext pc; private Configuration globalConf; + private Configuration pigContextConf; + private FileSystem fs; private long intermediateTaskInputSize; + private Set<String> inputSplitInDiskVertices; + private TezUDFContextSeparator udfContextSeparator; + + private String serializedTezPlan; + private String serializedPigContext; + private String serializedUDFImportList; + + // Map corresponds to root vertices, reduce to intermediate and leaf vertices + private Resource mapTaskResource; + private Resource reduceTaskResource; + private Map<String, String> mapTaskEnv = new HashMap<String, String>(); + private Map<String, String> reduceTaskEnv = new HashMap<String, String>(); + private String mapTaskLaunchCmdOpts; + private String reduceTaskLaunchCmdOpts; public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag, Map<String, LocalResource> localResources) { super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); this.pc = pc; - this.globalConf = ConfigurationUtil.toConfiguration(pc.getProperties(), true); this.localResources = localResources; this.dag = dag; + this.inputSplitInDiskVertices = new HashSet<String>(); try { - // Add credentials from binary token file and get tokens for namenodes - // specified in mapreduce.job.hdfs-servers - SecurityHelper.populateTokenCache(globalConf, dag.getCredentials()); + initialize(pc); + + udfContextSeparator = new TezUDFContextSeparator(plan, + new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); + udfContextSeparator.visit(); } catch (IOException e) { - throw new RuntimeException("Error while fetching delegation tokens", e); + throw new RuntimeException(e); + } + } + + private void initialize(PigContext pc) throws IOException { + + this.globalConf = ConfigurationUtil.toConfiguration(pc.getProperties(), true); + + this.pigContextConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false); + MRToTezHelper.processMRSettings(pigContextConf, globalConf); + + // Add credentials from binary token file and get tokens for namenodes + // specified in mapreduce.job.hdfs-servers + SecurityHelper.populateTokenCache(globalConf, dag.getCredentials()); + + // All these classes are @InterfaceAudience.Private in Hadoop. Switch to Tez methods in TEZ-1012 + // set the timestamps, public/private visibility of the archives and files + ClientDistributedCacheManager + .determineTimestampsAndCacheVisibilities(globalConf); + // get DelegationToken for each cached file + ClientDistributedCacheManager.getDelegationTokens(globalConf, + dag.getCredentials()); + MRApps.setupDistributedCache(globalConf, this.localResources); + dag.addTaskLocalFiles(this.localResources); + + int mapMemoryMB; + int reduceMemoryMB; + int mapVCores; + int reduceVCores; + if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB) != null) { + mapMemoryMB = globalConf.getInt( + TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB, + TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT); + reduceMemoryMB = mapMemoryMB; + } else { + // If tez setting is not defined, try MR setting + mapMemoryMB = globalConf.getInt(MRJobConfig.MAP_MEMORY_MB, + MRJobConfig.DEFAULT_MAP_MEMORY_MB); + reduceMemoryMB = globalConf.getInt(MRJobConfig.REDUCE_MEMORY_MB, + MRJobConfig.DEFAULT_REDUCE_MEMORY_MB); + } + + if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES) != null) { + mapVCores = globalConf.getInt( + TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES, + TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT); + reduceVCores = mapVCores; + } else { + mapVCores = globalConf.getInt(MRJobConfig.MAP_CPU_VCORES, + MRJobConfig.DEFAULT_MAP_CPU_VCORES); + reduceVCores = globalConf.getInt(MRJobConfig.REDUCE_CPU_VCORES, + MRJobConfig.DEFAULT_REDUCE_CPU_VCORES); + } + mapTaskResource = Resource.newInstance(mapMemoryMB, mapVCores); + reduceTaskResource = Resource.newInstance(reduceMemoryMB, reduceVCores); + + if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) == null) { + // If tez setting is not defined + MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, mapTaskEnv, true); + MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, true); + } + + if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) != null) { + mapTaskLaunchCmdOpts = globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS); + reduceTaskLaunchCmdOpts = mapTaskLaunchCmdOpts; + } else { + // If tez setting is not defined, try MR setting + mapTaskLaunchCmdOpts = MRHelpers.getJavaOptsForMRMapper(globalConf); + reduceTaskLaunchCmdOpts = MRHelpers.getJavaOptsForMRReducer(globalConf); } try { - intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(FileSystem.get(globalConf), FileLocalizer.getTemporaryResourcePath(pc)); + fs = FileSystem.get(globalConf); + intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(fs, FileLocalizer.getTemporaryResourcePath(pc)); } catch (Exception e) { log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e); intermediateTaskInputSize = 134217728L; @@ -199,6 +289,51 @@ public class TezDagBuilder extends TezOp globalConf.getLong( InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)); + + serializedPigContext = ObjectSerializer.serialize(pc); + serializedUDFImportList = ObjectSerializer.serialize(PigContext.getPackageImportList()); + } + + private String getSerializedTezPlan() throws IOException { + if (serializedTezPlan == null) { + // Initialize lazy instead of constructor as this might not be needed + serializedTezPlan = ObjectSerializer.serialize(getPlan()); + } + return serializedTezPlan; + } + + // Hack to turn off relocalization till TEZ-2192 is fixed. + public void avoidContainerReuseIfInputSplitInDisk() throws IOException { + if (!inputSplitInDiskVertices.isEmpty()) { + // Create empty job.split file and add as resource to all other + // vertices that are not reading splits from disk so that their + // containers are not reused by vertices that read splits from disk + Path jobSplitFile = new Path(FileLocalizer.getTemporaryPath(pc), + MRJobConfig.JOB_SPLIT); + FSDataOutputStream out = fs.create(jobSplitFile); + out.close(); + log.info("Creating empty job.split in " + jobSplitFile); + FileStatus splitFileStatus = fs.getFileStatus(jobSplitFile); + LocalResource localResource = LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(jobSplitFile), + LocalResourceType.FILE, + LocalResourceVisibility.APPLICATION, + splitFileStatus.getLen(), + splitFileStatus.getModificationTime()); + for (Vertex vertex : dag.getVertices()) { + if (!inputSplitInDiskVertices.contains(vertex.getName())) { + if (vertex.getTaskLocalFiles().containsKey( + MRJobConfig.JOB_SPLIT)) { + throw new RuntimeException( + "LocalResources already contains a" + + " resource named " + + MRJobConfig.JOB_SPLIT); + } + vertex.getTaskLocalFiles().put(MRJobConfig.JOB_SPLIT, + localResource); + } + } + } } @Override @@ -244,7 +379,7 @@ public class TezDagBuilder extends TezOp if (tezOp.isVertexGroup()) { groupMembers[i] = from; } else { - EdgeProperty prop = newEdge(pred, tezOp); + EdgeProperty prop = newEdge(pred, tezOp, false); Edge edge = Edge.create(from, to, prop); dag.addEdge(edge); } @@ -262,7 +397,7 @@ public class TezDagBuilder extends TezOp POStore store = tezOp.getVertexGroupInfo().getStore(); if (store != null) { vertexGroup.addDataSink(store.getOperatorKey().toString(), - new DataSinkDescriptor(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(), + DataSinkDescriptor.create(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(), OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), dag.getCredentials())); } } @@ -272,7 +407,7 @@ public class TezDagBuilder extends TezOp private GroupInputEdge newGroupInputEdge(TezOperator fromOp, TezOperator toOp, VertexGroup from, Vertex to) throws IOException { - EdgeProperty edgeProperty = newEdge(fromOp, toOp); + EdgeProperty edgeProperty = newEdge(fromOp, toOp, true); String groupInputClass = ConcatenatedMergedKeyValueInput.class.getName(); @@ -284,8 +419,7 @@ public class TezDagBuilder extends TezOp } return GroupInputEdge.create(from, to, edgeProperty, - InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()) - .setHistoryText(edgeProperty.getEdgeDestination().getHistoryText())); + InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload())); } /** @@ -296,7 +430,7 @@ public class TezDagBuilder extends TezOp * @return EdgeProperty * @throws IOException */ - private EdgeProperty newEdge(TezOperator from, TezOperator to) + private EdgeProperty newEdge(TezOperator from, TezOperator to, boolean isMergedInput) throws IOException { TezEdgeDescriptor edge = to.inEdges.get(from.getOperatorKey()); PhysicalPlan combinePlan = edge.combinePlan; @@ -304,9 +438,11 @@ public class TezDagBuilder extends TezOp InputDescriptor in = InputDescriptor.create(edge.inputClassName); OutputDescriptor out = OutputDescriptor.create(edge.outputClassName); - Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), false); + Configuration conf = new Configuration(pigContextConf); + if (!combinePlan.isEmpty()) { - addCombiner(combinePlan, to, conf); + udfContextSeparator.serializeUDFContextForEdge(conf, from, to, UDFType.USERFUNC); + addCombiner(combinePlan, to, conf, isMergedInput); } List<POLocalRearrangeTez> lrs = PlanHelper.getPhysicalOperators(from.plan, @@ -315,7 +451,7 @@ public class TezDagBuilder extends TezOp for (POLocalRearrangeTez lr : lrs) { if (lr.getOutputKey().equals(to.getOperatorKey().toString())) { byte keyType = lr.getKeyType(); - setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage()); + setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage(), isMergedInput); // In case of secondary key sort, main key type is the actual key type conf.set("pig.reduce.key.type", Byte.toString(lr.getMainKeyType())); break; @@ -341,9 +477,9 @@ public class TezDagBuilder extends TezOp } conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true); - conf.set("pig.pigContext", ObjectSerializer.serialize(pc)); - conf.set("udf.import.list", - ObjectSerializer.serialize(PigContext.getPackageImportList())); + conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true); + conf.set("pig.pigContext", serializedPigContext); + conf.set("udf.import.list", serializedUDFImportList); if(to.isGlobalSort() || to.isLimitAfterSort()){ conf.set("pig.sortOrder", @@ -371,34 +507,33 @@ public class TezDagBuilder extends TezOp edge.partitionerClass.getName()); } - conf.set("udf.import.list", - ObjectSerializer.serialize(PigContext.getPackageImportList())); - - MRToTezHelper.processMRSettings(conf, globalConf); + in.setUserPayload(TezUtils.createUserPayloadFromConf(conf)); + out.setUserPayload(TezUtils.createUserPayloadFromConf(conf)); - String historyString = convertToHistoryText("", conf); - in.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString); - out.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString); - - if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) { + if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && (to.isGlobalSort()||to.isSkewedJoin())) { // Use custom edge return EdgeProperty.create((EdgeManagerPluginDescriptor)null, edge.dataSourceType, edge.schedulingType, out, in); } + if (to.isUseGraceParallelism()) { + // Put datamovement to null to prevent vertex "to" from starting. It will be started by PigGraceShuffleVertexManager + return EdgeProperty.create((EdgeManagerPluginDescriptor)null, edge.dataSourceType, + edge.schedulingType, out, in); + } return EdgeProperty.create(edge.dataMovementType, edge.dataSourceType, edge.schedulingType, out, in); } private void addCombiner(PhysicalPlan combinePlan, TezOperator pkgTezOp, - Configuration conf) throws IOException { + Configuration conf, boolean isMergedInput) throws IOException { POPackage combPack = (POPackage) combinePlan.getRoots().get(0); POLocalRearrange combRearrange = (POLocalRearrange) combinePlan .getLeaves().get(0); - setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf, pkgTezOp); + setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf, pkgTezOp, true, isMergedInput); LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer( - combinePlan, pkgTezOp, combPack); + combinePlan, null, pkgTezOp, combPack); lrDiscoverer.visit(); combinePlan.remove(combPack); @@ -406,10 +541,6 @@ public class TezDagBuilder extends TezOp MRCombiner.class.getName()); conf.set(MRJobConfig.COMBINE_CLASS_ATTR, PigCombiner.Combine.class.getName()); - conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true); - conf.set("pig.pigContext", ObjectSerializer.serialize(pc)); - conf.set("udf.import.list", - ObjectSerializer.serialize(PigContext.getPackageImportList())); conf.set("pig.combinePlan", ObjectSerializer.serialize(combinePlan)); conf.set("pig.combine.package", ObjectSerializer.serialize(combPack)); conf.set("pig.map.keytype", ObjectSerializer @@ -422,7 +553,7 @@ public class TezDagBuilder extends TezOp tezOp.getProcessorName()); // Pass physical plans to vertex as user payload. - JobConf payloadConf = new JobConf(ConfigurationUtil.toConfiguration(pc.getProperties(), false)); + JobConf payloadConf = new JobConf(pigContextConf); // We do this so that dag.getCredentials(), job.getCredentials(), // job.getConfiguration().getCredentials() all reference the same Credentials object @@ -432,6 +563,38 @@ public class TezDagBuilder extends TezOp @SuppressWarnings("deprecation") Job job = new Job(payloadConf); payloadConf = (JobConf) job.getConfiguration(); + //TODO: Investigate. Setting as map writes empty output. + //payloadConf.setBoolean(MRConfig.IS_MAP_PROCESSOR, tezOp.isUseMRMapSettings()); + payloadConf.setBoolean(MRConfiguration.MAPPER_NEW_API, true); + payloadConf.setBoolean(MRConfiguration.REDUCER_NEW_API, true); + payloadConf.setClass(MRConfiguration.INPUTFORMAT_CLASS, + PigInputFormatTez.class, InputFormat.class); + setOutputFormat(job); + payloadConf.set("udf.import.list", serializedUDFImportList); + payloadConf.set("exectype", "TEZ"); + + // Process stores + LinkedList<POStore> stores = processStores(tezOp, payloadConf, job); + + Configuration inputPayLoad = null; + Configuration outputPayLoad = null; + + if (!stores.isEmpty()) { + outputPayLoad = new Configuration(payloadConf); + outputPayLoad.set(JobControlCompiler.PIG_MAP_STORES, + ObjectSerializer.serialize(new ArrayList<POStore>())); + } + + if (!(tezOp.getLoaderInfo().getLoads().isEmpty())) { + payloadConf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp())); + payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists())); + payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits())); + inputPayLoad = new Configuration(payloadConf); + if (tezOp.getLoaderInfo().getLoads().get(0).getLoadFunc() instanceof DefaultIndexableLoader) { + inputPayLoad.set("pig.pigContext", serializedPigContext); + } + } + payloadConf.set("pig.pigContext", serializedPigContext); if (tezOp.getSampleOperator() != null) { payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.getSampleOperator().getOperatorKey().toString()); @@ -445,7 +608,7 @@ public class TezDagBuilder extends TezOp // usually followed by limit other than store. But would benefit // cases like skewed join followed by group by. if (tezOp.getSortOperator().getEstimatedParallelism() != -1 - && TezCompilerUtil.isIntermediateReducer(tezOp.getSortOperator())) { + && tezOp.getSortOperator().isIntermediateReducer()) { payloadConf.setLong( InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, intermediateTaskInputSize); @@ -453,20 +616,6 @@ public class TezDagBuilder extends TezOp } - payloadConf.set("pig.inputs", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp())); - payloadConf.set("pig.inpSignatures", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists())); - payloadConf.set("pig.inpLimits", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits())); - // Process stores - LinkedList<POStore> stores = processStores(tezOp, payloadConf, job); - - payloadConf.set("pig.pigContext", ObjectSerializer.serialize(pc)); - payloadConf.set("udf.import.list", - ObjectSerializer.serialize(PigContext.getPackageImportList())); - payloadConf.set("exectype", "TEZ"); - payloadConf.setBoolean(MRConfiguration.MAPPER_NEW_API, true); - payloadConf.setClass(MRConfiguration.INPUTFORMAT_CLASS, - PigInputFormat.class, InputFormat.class); - // Set parent plan for all operators in the Tez plan. new PhyPlanSetter(tezOp.plan).visit(); @@ -492,23 +641,27 @@ public class TezDagBuilder extends TezOp byte keyType = pack.getPkgr().getKeyType(); tezOp.plan.remove(pack); payloadConf.set("pig.reduce.package", ObjectSerializer.serialize(pack)); - setIntermediateOutputKeyValue(keyType, payloadConf, tezOp); + POShuffleTezLoad newPack = new POShuffleTezLoad(pack); if (tezOp.isSkewedJoin()) { newPack.setSkewedJoins(true); } tezOp.plan.add(newPack); + boolean isMergedInput = false; // Set input keys for POShuffleTezLoad. This is used to identify // the inputs that are attached to the POShuffleTezLoad in the // backend. Map<Integer, String> localRearrangeMap = new TreeMap<Integer, String>(); + TezOperator from = null; for (TezOperator pred : mPlan.getPredecessors(tezOp)) { if (tezOp.getSampleOperator() != null && tezOp.getSampleOperator() == pred) { // skip sample vertex input } else { String inputKey = pred.getOperatorKey().toString(); + boolean isVertexGroup = false; if (pred.isVertexGroup()) { + isVertexGroup = true; pred = mPlan.getOperator(pred.getVertexGroupMembers().get(0)); } LinkedList<POLocalRearrangeTez> lrs = @@ -517,6 +670,10 @@ public class TezDagBuilder extends TezOp if (lr.isConnectedToPackage() && lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) { localRearrangeMap.put((int) lr.getIndex(), inputKey); + if (isVertexGroup) { + isMergedInput = true; + } + from = pred; } } } @@ -531,36 +688,19 @@ public class TezDagBuilder extends TezOp } } - setIntermediateOutputKeyValue(pack.getPkgr().getKeyType(), payloadConf, tezOp); - } else if (roots.size() == 1 && roots.get(0) instanceof POIdentityInOutTez) { - POIdentityInOutTez identityInOut = (POIdentityInOutTez) roots.get(0); - // TODO Need to fix multiple input key mapping - TezOperator identityInOutPred = null; - for (TezOperator pred : mPlan.getPredecessors(tezOp)) { - if (!pred.isSampleAggregation()) { - identityInOutPred = pred; - break; - } - } - identityInOut.setInputKey(identityInOutPred.getOperatorKey().toString()); - } else if (roots.size() == 1 && roots.get(0) instanceof POValueInputTez) { - POValueInputTez valueInput = (POValueInputTez) roots.get(0); - - LinkedList<String> scalarInputs = new LinkedList<String>(); - for (POUserFunc userFunc : PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class) ) { - if (userFunc.getFunc() instanceof ReadScalarsTez) { - scalarInputs.add(((ReadScalarsTez)userFunc.getFunc()).getTezInputs()[0]); - } - } - // Make sure we don't find the scalar - for (TezOperator pred : mPlan.getPredecessors(tezOp)) { - if (!scalarInputs.contains(pred.getOperatorKey().toString())) { - valueInput.setInputKey(pred.getOperatorKey().toString()); - break; - } + //POShuffleTezLoad accesses the comparator setting + selectKeyComparator(keyType, payloadConf, tezOp, isMergedInput); + + if (tezOp.isUseSecondaryKey()) { + TezEdgeDescriptor edge = tezOp.inEdges.get(from.getOperatorKey()); + // Currently only PigSecondaryKeyGroupingComparator is used in POShuffleTezLoad. + // When PIG-4685: SecondaryKeyOptimizerTez does not optimize cogroup is fixed + // in future, PigSecondaryKeyComparator will have to be used and that will require this. + payloadConf.set("pig.secondarySortOrder", ObjectSerializer + .serialize(edge.getSecondarySortOrder())); } + } - setOutputFormat(job); // set parent plan in all operators. currently the parent plan is really // used only when POStream, POSplit are present in the plan @@ -570,9 +710,7 @@ public class TezDagBuilder extends TezOp payloadConf.set(PigProcessor.PLAN, ObjectSerializer.serialize(tezOp.plan)); - UDFContext.getUDFContext().serialize(payloadConf); - - MRToTezHelper.processMRSettings(payloadConf, globalConf); + udfContextSeparator.serializeUDFContext(payloadConf, tezOp); if (!pc.inIllustrator) { for (POStore store : stores) { @@ -604,66 +742,190 @@ public class TezDagBuilder extends TezOp // Take our assembled configuration and create a vertex UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf); - procDesc.setUserPayload(userPayload).setHistoryText(convertToHistoryText(tezOp.getOperatorKey().toString(), payloadConf)); + TezDAGScriptInfo dagScriptInfo = TezScriptState.get().getDAGScriptInfo(dag.getName()); + String alias = dagScriptInfo.getAlias(tezOp); + String aliasLocation = dagScriptInfo.getAliasLocation(tezOp); + String features = dagScriptInfo.getPigFeatures(tezOp); + String vertexInfo = aliasLocation + " (" + features + ")" ; + procDesc.setUserPayload(userPayload).setHistoryText(TezUtils.convertToHistoryText(vertexInfo, payloadConf)); + + String vmPluginName = null; + Configuration vmPluginConf = null; + + // Set the right VertexManagerPlugin + if (tezOp.getEstimatedParallelism() != -1) { + if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) { + if (tezOp.getVertexParallelism()==-1 && ( + tezOp.isGlobalSort() &&getPlan().getPredecessors(tezOp).size()==1|| + tezOp.isSkewedJoin() &&getPlan().getPredecessors(tezOp).size()==2)) { + // Set VertexManagerPlugin to PartitionerDefinedVertexManager, which is able + // to decrease/increase parallelism of sorting vertex dynamically + // based on the numQuantiles calculated by sample aggregation vertex + vmPluginName = PartitionerDefinedVertexManager.class.getName(); + log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString()); + } + } else { + boolean containScatterGather = false; + boolean containCustomPartitioner = false; + for (TezEdgeDescriptor edge : tezOp.inEdges.values()) { + if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) { + containScatterGather = true; + } + if (edge.partitionerClass!=null) { + containCustomPartitioner = true; + } + } + if (containScatterGather && !containCustomPartitioner) { + vmPluginConf = (vmPluginConf == null) ? new Configuration(pigContextConf) : vmPluginConf; + // Use auto-parallelism feature of ShuffleVertexManager to dynamically + // reduce the parallelism of the vertex + if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true) + && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()) { + vmPluginName = PigGraceShuffleVertexManager.class.getName(); + tezOp.setUseGraceParallelism(true); + vmPluginConf.set("pig.tez.plan", getSerializedTezPlan()); + vmPluginConf.set("pig.pigContext", serializedPigContext); + } else { + vmPluginName = ShuffleVertexManager.class.getName(); + } + vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true); + // For Intermediate reduce, set the bytes per reducer to be block size. + long bytesPerReducer = intermediateTaskInputSize; + // If there are store statements, use BYTES_PER_REDUCER_PARAM configured by user. + // If not as default use 384MB for group bys and 256 MB for joins. Not using + // default 1G as that value was suited for mapreduce logic where numReducers=(map input size/bytesPerReducer). + // In Tez, numReducers=(map output size/bytesPerReducer) we need lower values to avoid skews in reduce + // as map input sizes are mostly always high compared to map output. + if (stores.size() > 0) { + if (vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) { + bytesPerReducer = vmPluginConf.getLong( + InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, + InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER); + } else if (tezOp.isGroupBy()) { + bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_GROUPBY_DEFAULT; + } else { + bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_DEFAULT; + } + } + vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer); + log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString()); + } + } + } + if (tezOp.isLimit() && (vmPluginName == null || vmPluginName.equals(PigGraceShuffleVertexManager.class.getName())|| + vmPluginName.equals(ShuffleVertexManager.class.getName()))) { + if (tezOp.inEdges.values().iterator().next().inputClassName.equals(UnorderedKVInput.class.getName())) { + // Setting SRC_FRACTION to 0.00001 so that even if there are 100K source tasks, + // limit job starts when 1 source task finishes. + // If limit is part of a group by or join because their parallelism is 1, + // we should leave the configuration with the defaults. + vmPluginConf = (vmPluginConf == null) ? new Configuration(pigContextConf) : vmPluginConf; + vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, "0.00001"); + vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, "0.00001"); + log.info("Set " + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION + " to 0.00001 for limit vertex " + tezOp.getOperatorKey().toString()); + } + } - Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), procDesc, tezOp.getVertexParallelism(), - tezOp.isUseMRMapSettings() ? MRHelpers.getResourceForMRMapper(globalConf) : MRHelpers.getResourceForMRReducer(globalConf)); + int parallel = tezOp.getVertexParallelism(); + if (tezOp.isUseGraceParallelism()) { + parallel = -1; + } + Resource resource = tezOp.isUseMRMapSettings() ? mapTaskResource : reduceTaskResource; - Map<String, String> taskEnv = new HashMap<String, String>(); - MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, taskEnv, tezOp.isUseMRMapSettings()); - vertex.setTaskEnvironment(taskEnv); + Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), procDesc, parallel, resource); - // All these classes are @InterfaceAudience.Private in Hadoop. Switch to Tez methods in TEZ-1012 - // set the timestamps, public/private visibility of the archives and files - ClientDistributedCacheManager - .determineTimestampsAndCacheVisibilities(globalConf); - // get DelegationToken for each cached file - ClientDistributedCacheManager.getDelegationTokens(globalConf, - job.getCredentials()); - MRApps.setupDistributedCache(globalConf, localResources); - vertex.addTaskLocalFiles(localResources); + if (tezOp.isUseMRMapSettings()) { + vertex.setTaskLaunchCmdOpts(mapTaskLaunchCmdOpts); + vertex.setTaskEnvironment(mapTaskEnv); + } else { + vertex.setTaskLaunchCmdOpts(reduceTaskLaunchCmdOpts); + vertex.setTaskEnvironment(reduceTaskEnv); + } - vertex.setTaskLaunchCmdOpts(tezOp.isUseMRMapSettings() ? MRHelpers.getJavaOptsForMRMapper(globalConf) - : MRHelpers.getJavaOptsForMRReducer(globalConf)); + MRToTezHelper.setVertexConfig(vertex, tezOp.isUseMRMapSettings(), globalConf); log.info("For vertex - " + tezOp.getOperatorKey().toString() + ": parallelism=" + tezOp.getVertexParallelism() + ", memory=" + vertex.getTaskResource().getMemory() + ", java opts=" + vertex.getTaskLaunchCmdOpts() ); - + log.info("Processing aliases: " + alias); + log.info("Detailed locations: " + aliasLocation); + log.info("Pig features in the vertex: " + features); // Right now there can only be one of each of these. Will need to be // more generic when there can be more. for (POLoad ld : tezOp.getLoaderInfo().getLoads()) { // TODO: These should get the globalConf, or a merged version that // keeps settings like pig.maxCombinedSplitSize - vertex.setLocationHint(VertexLocationHint.create(tezOp.getLoaderInfo().getInputSplitInfo().getTaskLocationHints())); + Builder userPayLoadBuilder = MRRuntimeProtos.MRInputUserPayloadProto.newBuilder(); + + InputSplitInfo inputSplitInfo = tezOp.getLoaderInfo().getInputSplitInfo(); + Map<String, LocalResource> additionalLocalResources = null; + int spillThreshold = payloadConf + .getInt(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD, + PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD_DEFAULT); + + // Currently inputSplitInfo is always InputSplitInfoMem at this point + if (inputSplitInfo instanceof InputSplitInfoMem) { + MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto(); + int splitsSerializedSize = splitsProto.getSerializedSize(); + if(splitsSerializedSize > spillThreshold) { + inputPayLoad.setBoolean( + org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, + false); + // Write splits to disk + Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc); + log.info("Writing input splits to " + inputSplitsDir + + " for vertex " + vertex.getName() + + " as the serialized size in memory is " + + splitsSerializedSize + ". Configured " + + PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD + + " is " + spillThreshold); + inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk( + (InputSplitInfoMem)inputSplitInfo, inputSplitsDir, payloadConf, fs); + additionalLocalResources = new HashMap<String, LocalResource>(); + MRToTezHelper.updateLocalResourcesForInputSplits( + fs, inputSplitInfo, + additionalLocalResources); + inputSplitInDiskVertices.add(vertex.getName()); + } else { + // Send splits via RPC to AM + userPayLoadBuilder.setSplits(splitsProto); + } + //Free up memory + tezOp.getLoaderInfo().setInputSplitInfo(null); + } + + udfContextSeparator.serializeUDFContext(inputPayLoad, tezOp, UDFType.LOADFUNC); + userPayLoadBuilder.setConfigurationBytes(TezUtils.createByteStringFromConf(inputPayLoad)); + + vertex.setLocationHint(VertexLocationHint.create(inputSplitInfo.getTaskLocationHints())); vertex.addDataSource(ld.getOperatorKey().toString(), DataSourceDescriptor.create(InputDescriptor.create(MRInput.class.getName()) - .setUserPayload(UserPayload.create(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder() - .setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf)) - .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteString().asReadOnlyByteBuffer())) - .setHistoryText(convertToHistoryText("", payloadConf)), - InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()), dag.getCredentials())); + .setUserPayload(UserPayload.create(userPayLoadBuilder.build().toByteString().asReadOnlyByteBuffer())), + InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()), + inputSplitInfo.getNumTasks(), + dag.getCredentials(), + null, + additionalLocalResources)); } + // Union within a split can have multiple stores writing to same output + Set<String> uniqueStoreOutputs = new HashSet<String>(); for (POStore store : stores) { - ArrayList<POStore> emptyList = new ArrayList<POStore>(); ArrayList<POStore> singleStore = new ArrayList<POStore>(); singleStore.add(store); - Configuration outputPayLoad = new Configuration(payloadConf); - outputPayLoad.set(JobControlCompiler.PIG_MAP_STORES, - ObjectSerializer.serialize(emptyList)); - outputPayLoad.set(JobControlCompiler.PIG_REDUCE_STORES, + Configuration outPayLoad = new Configuration(outputPayLoad); + udfContextSeparator.serializeUDFContext(outPayLoad, tezOp, store); + outPayLoad.set(JobControlCompiler.PIG_REDUCE_STORES, ObjectSerializer.serialize(singleStore)); OutputDescriptor storeOutDescriptor = OutputDescriptor.create( MROutput.class.getName()).setUserPayload(TezUtils - .createUserPayloadFromConf(outputPayLoad)) - .setHistoryText(convertToHistoryText("", outputPayLoad)); + .createUserPayloadFromConf(outPayLoad)); if (tezOp.getVertexGroupStores() != null) { OperatorKey vertexGroupKey = tezOp.getVertexGroupStores().get(store.getOperatorKey()); if (vertexGroupKey != null) { @@ -672,10 +934,14 @@ public class TezDagBuilder extends TezOp continue; } } - vertex.addDataSink(store.getOperatorKey().toString(), - new DataSinkDescriptor(storeOutDescriptor, - OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), - dag.getCredentials())); + String outputKey = ((POStoreTez) store).getOutputKey(); + if (!uniqueStoreOutputs.contains(outputKey)) { + vertex.addDataSink(outputKey.toString(), + DataSinkDescriptor.create(storeOutDescriptor, + OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), + dag.getCredentials())); + uniqueStoreOutputs.add(outputKey); + } } // LoadFunc and StoreFunc add delegation tokens to Job Credentials in @@ -686,62 +952,6 @@ public class TezDagBuilder extends TezOp new PigOutputFormat().checkOutputSpecs(job); } - String vmPluginName = null; - Configuration vmPluginConf = null; - - // Set the right VertexManagerPlugin - if (tezOp.getEstimatedParallelism() != -1) { - if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) { - // Set VertexManagerPlugin to PartitionerDefinedVertexManager, which is able - // to decrease/increase parallelism of sorting vertex dynamically - // based on the numQuantiles calculated by sample aggregation vertex - vmPluginName = PartitionerDefinedVertexManager.class.getName(); - log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString()); - } else { - boolean containScatterGather = false; - boolean containCustomPartitioner = false; - for (TezEdgeDescriptor edge : tezOp.inEdges.values()) { - if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) { - containScatterGather = true; - } - if (edge.partitionerClass!=null) { - containCustomPartitioner = true; - } - } - if (containScatterGather && !containCustomPartitioner) { - // Use auto-parallelism feature of ShuffleVertexManager to dynamically - // reduce the parallelism of the vertex - vmPluginName = ShuffleVertexManager.class.getName(); - vmPluginConf = (vmPluginConf == null) ? ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf; - vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true); - if (stores.size() <= 0) { - // Intermediate reduce. Set the bytes per reducer to be block size. - vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, - intermediateTaskInputSize); - } else if (vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, - InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) != - InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) { - vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, - vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, - InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)); - } - log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString()); - } - } - } - if (tezOp.isLimit() && (vmPluginName == null || vmPluginName.equals(ShuffleVertexManager.class.getName()))) { - if (tezOp.inEdges.values().iterator().next().inputClassName.equals(UnorderedKVInput.class.getName())) { - // Setting SRC_FRACTION to 0.00001 so that even if there are 100K source tasks, - // limit job starts when 1 source task finishes. - // If limit is part of a group by or join because their parallelism is 1, - // we should leave the configuration with the defaults. - vmPluginName = ShuffleVertexManager.class.getName(); - vmPluginConf = (vmPluginConf == null) ? ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf; - vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, "0.00001"); - vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, "0.00001"); - log.info("Set " + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION + " to 0.00001 for limit vertex " + tezOp.getOperatorKey().toString()); - } - } // else if(tezOp.isLimitAfterSort()) // TODO: PIG-4049 If standalone Limit we need a new VertexManager or new input // instead of ShuffledMergedInput. For limit part of the sort (order by parallel 1) itself @@ -750,8 +960,7 @@ public class TezDagBuilder extends TezOp if (vmPluginName != null) { VertexManagerPluginDescriptor vmPluginDescriptor = VertexManagerPluginDescriptor.create(vmPluginName); if (vmPluginConf != null) { - vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf)) - .setHistoryText(convertToHistoryText(vmPluginName, vmPluginConf)); + vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf)); } vertex.setVertexManagerPlugin(vmPluginDescriptor); } @@ -828,14 +1037,9 @@ public class TezDagBuilder extends TezOp return stores; } - private void setIntermediateOutputKeyValue(byte keyType, Configuration conf, TezOperator tezOp) - throws JobCreationException, ExecException { - setIntermediateOutputKeyValue(keyType, conf, tezOp, true); - } - @SuppressWarnings("rawtypes") private void setIntermediateOutputKeyValue(byte keyType, Configuration conf, TezOperator tezOp, - boolean isConnectedToPackage) throws JobCreationException, ExecException { + boolean isConnectedToPackage, boolean isMergedInput) throws JobCreationException, ExecException { if (tezOp != null && tezOp.isUseSecondaryKey() && isConnectedToPackage) { conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, NullableTuple.class.getName()); @@ -853,12 +1057,86 @@ public class TezDagBuilder extends TezOp NullableTuple.class.getName()); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, MRPartitioner.class.getName()); - selectOutputComparator(keyType, conf, tezOp); + selectKeyComparator(keyType, conf, tezOp, isMergedInput); } - private static Class<? extends WritableComparator> comparatorForKeyType(byte keyType, boolean hasOrderBy) + private static Class<? extends WritableComparator> getRawBytesComparator( + byte keyType) throws JobCreationException { + + // These comparators only compare bytes and we will use them except for + // order by for faster sorting. + // This ordering is good enough to be fed to reducer (POShuffleTezLoad) + // which will use the full comparator (GroupingComparator) for correct + // sorting and grouping. + // TODO: PIG-4652. Till Tez exposes a way to get bytes of keys being compared, + // we can use this only for groupby and distinct which are single inputs in + // POShuffleTezLoad and not join which has multiple inputs. + + switch (keyType) { + case DataType.BOOLEAN: + return PigWritableComparators.PigBooleanRawBytesComparator.class; + + case DataType.INTEGER: + return PigWritableComparators.PigIntRawBytesComparator.class; + + case DataType.BIGINTEGER: + return PigWritableComparators.PigBigIntegerRawBytesComparator.class; + + case DataType.BIGDECIMAL: + return PigWritableComparators.PigBigDecimalRawBytesComparator.class; + + case DataType.LONG: + return PigWritableComparators.PigLongRawBytesComparator.class; + + case DataType.FLOAT: + return PigWritableComparators.PigFloatRawBytesComparator.class; + + case DataType.DOUBLE: + return PigWritableComparators.PigDoubleRawBytesComparator.class; + + case DataType.DATETIME: + return PigWritableComparators.PigDateTimeRawBytesComparator.class; + + case DataType.CHARARRAY: + return PigWritableComparators.PigTextRawBytesComparator.class; + + case DataType.BYTEARRAY: + return PigWritableComparators.PigBytesRawBytesComparator.class; + + case DataType.MAP: + int errCode = 1068; + String msg = "Using Map as key not supported."; + throw new JobCreationException(msg, errCode, PigException.INPUT); + + case DataType.TUPLE: + return PigWritableComparators.PigTupleSortBytesComparator.class; + + case DataType.BAG: + errCode = 1068; + msg = "Using Bag as key not supported."; + throw new JobCreationException(msg, errCode, PigException.INPUT); + + default: + errCode = 2036; + msg = "Unhandled key type " + DataType.findTypeName(keyType); + throw new JobCreationException(msg, errCode, PigException.BUG); + } + } + + private static Class<? extends WritableComparator> getRawComparator(byte keyType) throws JobCreationException { + // These are full comparators used in order by jobs and as GroupingComparator in + // POShuffleTezLoad for other operations. + + // Mapreduce uses PigGrouping<DataType>WritableComparator for non-orderby jobs. + // In Tez, we will use the raw comparators itself on the reduce side as well as it is + // now fixed to handle nulls for different indexes. + // Also PigGrouping<DataType>WritableComparator use PigNullablePartitionWritable.compareTo + // which is not that efficient for cases like tuple where tuple is iterated for null checking + // instead of taking advantage of TupleRawComparator.hasComparedTupleNull(). + // Also skips multi-query index checking + switch (keyType) { case DataType.BOOLEAN: return PigBooleanRawComparator.class; @@ -888,11 +1166,7 @@ public class TezDagBuilder extends TezOp return PigTextRawComparator.class; case DataType.BYTEARRAY: - //if (hasOrderBy) { - return PigBytesRawComparator.class; - //} else { - // return PigDBAWritableComparator.class; - //} + return PigBytesRawComparator.class; case DataType.MAP: int errCode = 1068; @@ -900,14 +1174,7 @@ public class TezDagBuilder extends TezOp throw new JobCreationException(msg, errCode, PigException.INPUT); case DataType.TUPLE: - //TODO: PigTupleWritableComparator gives wrong results with cogroup in - //Checkin_2 and few other e2e tests. But MR has PigTupleWritableComparator - //Investigate the difference later - //if (hasOrderBy) { - return PigTupleSortComparator.class; - //} else { - // return PigTupleWritableComparator.class; - //} + return PigTupleSortComparator.class; case DataType.BAG: errCode = 1068; @@ -921,39 +1188,40 @@ public class TezDagBuilder extends TezOp } } - private static Class<? extends WritableComparator> getGroupingComparatorForKeyType(byte keyType) + private static Class<? extends WritableComparator> getRawBytesComparatorForSkewedJoin(byte keyType) throws JobCreationException { + // Extended Raw Bytes Comparators for SkewedJoin which unwrap the NullablePartitionWritable switch (keyType) { case DataType.BOOLEAN: - return PigGroupingBooleanWritableComparator.class; + return PigWritableComparators.PigBooleanRawBytesPartitionComparator.class; case DataType.INTEGER: - return PigGroupingIntWritableComparator.class; + return PigWritableComparators.PigIntRawBytesPartitionComparator.class; case DataType.BIGINTEGER: - return PigGroupingBigIntegerWritableComparator.class; + return PigWritableComparators.PigBigIntegerRawBytesPartitionComparator.class; case DataType.BIGDECIMAL: - return PigGroupingBigDecimalWritableComparator.class; + return PigWritableComparators.PigBigDecimalRawBytesPartitionComparator.class; case DataType.LONG: - return PigGroupingLongWritableComparator.class; + return PigWritableComparators.PigLongRawBytesPartitionComparator.class; case DataType.FLOAT: - return PigGroupingFloatWritableComparator.class; + return PigWritableComparators.PigFloatRawBytesPartitionComparator.class; case DataType.DOUBLE: - return PigGroupingDoubleWritableComparator.class; + return PigWritableComparators.PigDoubleRawBytesPartitionComparator.class; case DataType.DATETIME: - return PigGroupingDateTimeWritableComparator.class; + return PigWritableComparators.PigDateTimeRawBytesPartitionComparator.class; case DataType.CHARARRAY: - return PigGroupingCharArrayWritableComparator.class; + return PigWritableComparators.PigTextRawBytesPartitionComparator.class; case DataType.BYTEARRAY: - return PigGroupingDBAWritableComparator.class; + return PigWritableComparators.PigBytesRawBytesPartitionComparator.class; case DataType.MAP: int errCode = 1068; @@ -961,7 +1229,7 @@ public class TezDagBuilder extends TezOp throw new JobCreationException(msg, errCode, PigException.INPUT); case DataType.TUPLE: - return PigGroupingTupleWritableComparator.class; + return PigWritableComparators.PigTupleSortBytesPartitionComparator.class; case DataType.BAG: errCode = 1068; @@ -975,30 +1243,121 @@ public class TezDagBuilder extends TezOp } } - void selectOutputComparator(byte keyType, Configuration conf, TezOperator tezOp) + private static Class<? extends WritableComparator> getRawComparatorForSkewedJoin(byte keyType) + throws JobCreationException { + + // Extended Raw Comparators for SkewedJoin which unwrap the NullablePartitionWritable + switch (keyType) { + case DataType.BOOLEAN: + return PigWritableComparators.PigBooleanRawPartitionComparator.class; + + case DataType.INTEGER: + return PigWritableComparators.PigIntRawPartitionComparator.class; + + case DataType.BIGINTEGER: + return PigWritableComparators.PigBigIntegerRawPartitionComparator.class; + + case DataType.BIGDECIMAL: + return PigWritableComparators.PigBigDecimalRawPartitionComparator.class; + + case DataType.LONG: + return PigWritableComparators.PigLongRawPartitionComparator.class; + + case DataType.FLOAT: + return PigWritableComparators.PigFloatRawPartitionComparator.class; + + case DataType.DOUBLE: + return PigWritableComparators.PigDoubleRawPartitionComparator.class; + + case DataType.DATETIME: + return PigWritableComparators.PigDateTimeRawPartitionComparator.class; + + case DataType.CHARARRAY: + return PigWritableComparators.PigTextRawPartitionComparator.class; + + case DataType.BYTEARRAY: + return PigWritableComparators.PigBytesRawPartitionComparator.class; + + case DataType.MAP: + int errCode = 1068; + String msg = "Using Map as key not supported."; + throw new JobCreationException(msg, errCode, PigException.INPUT); + + case DataType.TUPLE: + return PigWritableComparators.PigTupleSortPartitionComparator.class; + + case DataType.BAG: + errCode = 1068; + msg = "Using Bag as key not supported."; + throw new JobCreationException(msg, errCode, PigException.INPUT); + + default: + errCode = 2036; + msg = "Unhandled key type " + DataType.findTypeName(keyType); + throw new JobCreationException(msg, errCode, PigException.BUG); + } + } + + void selectKeyComparator(byte keyType, Configuration conf, TezOperator tezOp, boolean isMergedInput) throws JobCreationException { // TODO: Handle sorting like in JobControlCompiler // TODO: Group comparators as in JobControlCompiler - if (tezOp != null && tezOp.isUseSecondaryKey()) { + if (tezOp == null) { + return; + } + if (tezOp.isUseSecondaryKey()) { conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, PigSecondaryKeyComparator.class.getName()); setGroupingComparator(conf, PigSecondaryKeyGroupComparator.class.getName()); } else { - if (tezOp != null && tezOp.isSkewedJoin()) { - // TODO: PigGroupingPartitionWritableComparator only used as Group comparator in MR. - // What should be TEZ_RUNTIME_KEY_COMPARATOR_CLASS if same as MR? - conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, - PigGroupingPartitionWritableComparator.class.getName()); - setGroupingComparator(conf, PigGroupingPartitionWritableComparator.class.getName()); + // If it is not a merged input (OrderedGroupedMergedKVInput) from union then + // use bytes only comparator. This is temporary till PIG-4652 is done + if (!isMergedInput && (tezOp.isGroupBy() || tezOp.isDistinct())) { + conf.setClass( + TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, + getRawBytesComparator(keyType), RawComparator.class); + } else if (tezOp.isSkewedJoin()) { + conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, + getRawComparatorForSkewedJoin(keyType), RawComparator.class); } else { - boolean hasOrderby = hasOrderby(tezOp); conf.setClass( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, - comparatorForKeyType(keyType, hasOrderby), RawComparator.class); - if (!hasOrderby) { - setGroupingComparator(conf, getGroupingComparatorForKeyType(keyType).getName()); - } + getRawComparator(keyType), RawComparator.class); + } + + // Comparators now + // groupby/distinct : Comparator - RawBytesComparator + // groupby/distinct after union : Comparator - RawComparator + // orderby : Comparator - RawComparator + // skewed join : Comparator - RawPartitionComparator + // Rest (other joins) : Comparator - RawComparator + + //TODO: In PIG-4652: After Tez support for exposing key bytes + // groupby/distinct : Comparator - RawBytesComparator. No grouping comparator required. + // orderby : Comparator - RawComparator. No grouping comparator required. + // skewed join : Comparator - RawBytesPartitionComparator, GroupingComparator - RawPartitionComparator + // Rest (other joins) : Comparator - RawBytesComparator, GroupingComparator - RawComparator + + /* + if (tezOp.isSkewedJoin()) { + conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, + getRawBytesComparatorForSkewedJoin(keyType), RawComparator.class); + setGroupingComparator(conf, getRawComparatorForSkewedJoin(keyType).getName()); + } else if (tezOp.isGroupBy() || tezOp.isDistinct()) { + conf.setClass( + TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, + getRawBytesComparator(keyType), RawComparator.class); + } else if (hasOrderby(tezOp)) { + conf.setClass( + TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, + getRawComparator(keyType), RawComparator.class); + } else { + conf.setClass( + TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, + getRawBytesComparator(keyType), RawComparator.class); + setGroupingComparator(conf, getRawComparator(keyType).getName()); } + */ } } @@ -1016,6 +1375,7 @@ public class TezDagBuilder extends TezOp return hasOrderBy; } + private void setGroupingComparator(Configuration conf, String comparatorClass) { // In MR - job.setGroupingComparatorClass() or MRJobConfig.GROUP_COMPARATOR_CLASS // TODO: Check why tez-mapreduce ReduceProcessor use two different tez @@ -1046,27 +1406,4 @@ public class TezDagBuilder extends TezOp job.setOutputFormatClass(PigOutputFormatTez.class); } } - - // Borrowed from TezUtils.convertToHistoryText since it is not part of Tez 0.5.2 - public static String convertToHistoryText(String description, Configuration conf) throws IOException { - // Add a version if this serialization is changed - JSONObject jsonObject = new JSONObject(); - try { - if (description != null && !description.isEmpty()) { - jsonObject.put("desc", description); - } - if (conf != null) { - JSONObject confJson = new JSONObject(); - Iterator<Entry<String, String>> iter = conf.iterator(); - while (iter.hasNext()) { - Entry<String, String> entry = iter.next(); - confJson.put(entry.getKey(), entry.getValue()); - } - jsonObject.put("config", confJson); - } - } catch (JSONException e) { - throw new IOException("Error when trying to convert description/conf to JSON", e); - } - return jsonObject.toString(); - } }
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Fri Mar 4 18:17:39 2016 @@ -223,10 +223,16 @@ public class TezJob implements Runnable private class DAGStatusReporter extends TimerTask { + private String prevDAGStatus; + @Override public void run() { if (dagStatus == null) return; - log.info("DAG Status: " + dagStatus.toString()); + String currDAGStatus = dagStatus.toString(); + if (!currDAGStatus.equals(prevDAGStatus)) { + log.info("DAG Status: " + currDAGStatus); + prevDAGStatus = currDAGStatus; + } } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Fri Mar 4 18:17:39 2016 @@ -27,7 +27,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.pig.PigException; @@ -36,8 +35,11 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode; import org.apache.pig.impl.PigContext; +import org.apache.pig.tools.pigstats.tez.TezScriptState; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezConfiguration; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; /** * This is compiler class that takes a TezOperPlan and converts it into a @@ -58,9 +60,10 @@ public class TezJobCompiler { public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, LocalResource> localResources) throws IOException, YarnException { DAG tezDag = DAG.create(tezPlanNode.getOperatorKey().toString()); - tezDag.setCredentials(new Credentials()); + tezDag.setCredentials(tezPlanNode.getTezOperPlan().getCredentials()); TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlanNode.getTezOperPlan(), tezDag, localResources); dagBuilder.visit(); + dagBuilder.avoidContainerReuseIfInputSplitInDisk(); return tezDag; } @@ -92,20 +95,19 @@ public class TezJobCompiler { String shipFiles = pigContext.getProperties().getProperty("pig.streaming.ship.files"); if (shipFiles != null) { for (String file : shipFiles.split(",")) { - TezResourceManager.getInstance().addTezResource(new File(file).toURI()); + TezResourceManager.getInstance().addTezResource(new File(file.trim()).toURI()); } } String cacheFiles = pigContext.getProperties().getProperty("pig.streaming.cache.files"); if (cacheFiles != null) { - for (String file : cacheFiles.split(",")) { - // Do new URI() before passing to Path constructor else it encodes # when there is symlink - TezResourceManager.getInstance().addTezResource(new Path(new URI(file.trim())).toUri()); - } + addCacheResources(cacheFiles.split(",")); } for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) { log.info("Local resource: " + entry.getKey()); } DAG tezDag = buildDAG(tezPlanNode, localResources); + tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript())); + log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism()); return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism()); } catch (Exception e) { int errCode = 2017; @@ -113,5 +115,33 @@ public class TezJobCompiler { throw new JobCreationException(msg, errCode, PigException.BUG, e); } } + + private void addCacheResources(String[] fileNames) throws Exception { + for (String fileName : fileNames) { + fileName = fileName.trim(); + if (fileName.length() > 0) { + URI resourceURI = new URI(fileName); + String fragment = resourceURI.getFragment(); + + Path remoteFsPath = new Path(resourceURI); + String resourceName = (fragment != null && fragment.length() > 0) ? fragment : remoteFsPath.getName(); + TezResourceManager.getInstance().addTezResource(resourceName, remoteFsPath); + } + } + } + + private String createDagInfo(String script) throws IOException { + String dagInfo; + try { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("context", "Pig"); + jsonObject.put("description", script); + dagInfo = jsonObject.toString(); + } catch (JSONException e) { + throw new IOException("Error when trying to convert Pig script to JSON", e); + } + log.debug("DagInfo: " + dagInfo); + return dagInfo; + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Fri Mar 4 18:17:39 2016 @@ -19,7 +19,9 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.io.PrintStream; +import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,8 +34,9 @@ import java.util.concurrent.ThreadFactor import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.counters.Limits; +import org.apache.hadoop.util.StringUtils; import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; import org.apache.pig.PigWarning; @@ -112,10 +115,32 @@ public class TezLauncher extends Launche if (pc.getExecType().isLocal()) { pc.getProperties().setProperty(TezConfiguration.TEZ_LOCAL_MODE, "true"); pc.getProperties().setProperty(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "true"); - pc.getProperties().setProperty("tez.ignore.lib.uris", "true"); - pc.getProperties().setProperty("tez.am.dag.scheduler.class", DAGSchedulerNaturalOrderControlled.class.getName()); + pc.getProperties().setProperty(TezConfiguration.TEZ_IGNORE_LIB_URIS, "true"); + pc.getProperties().setProperty(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, DAGSchedulerNaturalOrderControlled.class.getName()); } Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), true); + // Make sure MR counter does not exceed limit + if (conf.get(TezConfiguration.TEZ_COUNTERS_MAX) != null) { + conf.setInt(org.apache.hadoop.mapreduce.MRJobConfig.COUNTERS_MAX_KEY, Math.max( + conf.getInt(org.apache.hadoop.mapreduce.MRJobConfig.COUNTERS_MAX_KEY, 0), + conf.getInt(TezConfiguration.TEZ_COUNTERS_MAX, 0))); + } + if (conf.get(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS) != null) { + conf.setInt(org.apache.hadoop.mapreduce.MRJobConfig.COUNTER_GROUPS_MAX_KEY, Math.max( + conf.getInt(org.apache.hadoop.mapreduce.MRJobConfig.COUNTER_GROUPS_MAX_KEY, 0), + conf.getInt(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS, 0))); + } + + // This is hacky, but Limits cannot be initialized twice + try { + Field f = Limits.class.getDeclaredField("isInited"); + f.setAccessible(true); + f.setBoolean(null, false); + Limits.init(conf); + } catch (Throwable e) { + log.warn("Error when setting counter limit: " + e.getMessage()); + } + if (pc.defaultParallel == -1 && !conf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM, true)) { pc.defaultParallel = 1; } @@ -124,9 +149,16 @@ public class TezLauncher extends Launche TezResourceManager tezResourceManager = TezResourceManager.getInstance(); tezResourceManager.init(pc, conf); - Path stagingDir = tezResourceManager.getStagingDir(); - log.info("Tez staging directory is " + stagingDir.toString()); - conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString()); + String stagingDir = conf.get(TezConfiguration.TEZ_AM_STAGING_DIR); + String resourcesDir = tezResourceManager.getResourcesDir().toString(); + if (stagingDir == null) { + // If not set in tez-site.xml, use Pig's tez resources directory as staging directory + // instead of TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT + stagingDir = resourcesDir; + conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, resourcesDir); + } + log.info("Tez staging directory is " + stagingDir + " and resources directory is " + resourcesDir); + List<TezOperPlan> processedPlans = new ArrayList<TezOperPlan>(); @@ -403,11 +435,27 @@ public class TezLauncher extends Launche skOptimizer.visit(); } + boolean isUnionOpt = conf.getBoolean(PigConfiguration.PIG_TEZ_OPT_UNION, true); + List<String> supportedStoreFuncs = null; + String unionSupported = conf.get(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS); + if (unionSupported != null && unionSupported.trim().length() > 0) { + supportedStoreFuncs = Arrays + .asList(StringUtils.split(unionSupported.trim())); + } + List<String> unionUnsupportedStoreFuncs = null; + String unionUnSupported = conf.get(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS); + if (unionUnSupported != null && unionUnSupported.trim().length() > 0) { + unionUnsupportedStoreFuncs = Arrays + .asList(StringUtils.split(unionUnSupported.trim())); + } + boolean isMultiQuery = conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true); if (isMultiQuery) { // reduces the number of TezOpers in the Tez plan generated // by multi-query (multi-store) script. - MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(tezPlan); + MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez( + tezPlan, isUnionOpt, supportedStoreFuncs, + unionUnsupportedStoreFuncs); mqOptimizer.visit(); } @@ -419,9 +467,8 @@ public class TezLauncher extends Launche } // Use VertexGroup in Tez - boolean isUnionOpt = conf.getBoolean(PigConfiguration.PIG_TEZ_OPT_UNION, true); if (isUnionOpt) { - UnionOptimizer uo = new UnionOptimizer(tezPlan); + UnionOptimizer uo = new UnionOptimizer(tezPlan, supportedStoreFuncs, unionUnsupportedStoreFuncs); uo.visit(); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Fri Mar 4 18:17:39 2016 @@ -17,6 +17,8 @@ */ package org.apache.pig.backend.hadoop.executionengine.tez; +import static org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.getFromCache; + import java.io.IOException; import java.net.URI; import java.util.HashMap; @@ -33,13 +35,14 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.pig.PigConfiguration; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; public class TezResourceManager { private static TezResourceManager instance = null; private boolean inited = false; - private Path stagingDir; + private Path resourcesDir; private FileSystem remoteFs; private Configuration conf; private PigContext pigContext; @@ -54,7 +57,7 @@ public class TezResourceManager { public void init(PigContext pigContext, Configuration conf) throws IOException { if (!inited) { - this.stagingDir = FileLocalizer.getTemporaryResourcePath(pigContext); + this.resourcesDir = FileLocalizer.getTemporaryResourcePath(pigContext); this.remoteFs = FileSystem.get(conf); this.conf = conf; this.pigContext = pigContext; @@ -62,8 +65,8 @@ public class TezResourceManager { } } - public Path getStagingDir() { - return stagingDir; + public Path getResourcesDir() { + return resourcesDir; } // Add files from the source FS as local resources. The resource name will @@ -79,7 +82,19 @@ public class TezResourceManager { // Ship the local resource to the staging directory on the remote FS if (!pigContext.getExecType().isLocal() && uri.toString().startsWith("file:")) { - Path remoteFsPath = remoteFs.makeQualified(new Path(stagingDir, resourceName)); + boolean cacheEnabled = + conf.getBoolean(PigConfiguration.PIG_USER_CACHE_ENABLED, false); + + if(cacheEnabled){ + Path pathOnDfs = getFromCache(pigContext, conf, uri.toURL()); + if(pathOnDfs != null) { + resources.put(resourceName, pathOnDfs); + return pathOnDfs; + } + + } + + Path remoteFsPath = remoteFs.makeQualified(new Path(resourcesDir, resourceName)); remoteFs.copyFromLocalFile(resourcePath, remoteFsPath); remoteFs.setReplication(remoteFsPath, (short)conf.getInt(Job.SUBMIT_REPLICATION, 3)); resources.put(resourceName, remoteFsPath); @@ -115,7 +130,8 @@ public class TezResourceManager { // The resource name will be symlinked to the resource path in the // container's working directory. Path resourcePath = resources.get(resourceName); - FileStatus fstat = remoteFs.getFileStatus(resourcePath); + FileSystem fileSystem = resourcePath.getFileSystem(conf); + FileStatus fstat = fileSystem.getFileStatus(resourcePath); LocalResource tezResource = LocalResource.newInstance( ConverterUtils.getYarnUrlFromPath(fstat.getPath()),
