Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Mon May 29 15:00:39 2017 @@ -122,6 +122,15 @@ public class POMergeCogroup extends Phys this.endOfRecordMark = endOfRecordMark; } + //For Spark + private transient boolean endOfInput = false; + public boolean isEndOfInput() { + return endOfInput; + } + public void setEndOfInput (boolean isEndOfInput) { + endOfInput = isEndOfInput; + } + @Override public Result getNextTuple() throws ExecException { @@ -145,7 +154,7 @@ public class POMergeCogroup extends Phys break; case POStatus.STATUS_EOP: - if(!this.parentPlan.endOfAllInput) + if(!(this.parentPlan.endOfAllInput || isEndOfInput())) return baseInp; if(lastTime)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Mon May 29 15:00:39 2017 @@ -131,6 +131,24 @@ public class POMergeJoin extends Physica private byte endOfRecordMark = POStatus.STATUS_NULL; + // Only for Spark + // If the current operator has reached the end of its input, the endOfInput flag is set to true. + // The old flag parentPlan.endOfAllInput doesn't work in spark mode, because it is shared + // between operators in the same plan, so it could be set by preceding operators even though the + // current operator has not reached its end. (see PIG-4876) + private transient boolean endOfInput = false; + public boolean isEndOfInput() { + return endOfInput; + } + public void setEndOfInput (boolean isEndOfInput) { + endOfInput = isEndOfInput; + } + + // Only for spark. + // It means the current operator has reached the end of its input and the last left input was + // added into 'leftTuples', ready for join. + private boolean leftInputConsumedInSpark = false; + // This serves as the default TupleFactory private transient TupleFactory mTupleFactory; @@ -352,7 +370,7 @@ public class POMergeJoin extends Physica } else if(cmpval > 0){ // We got ahead on right side. Store currently read right tuple. - if(!this.parentPlan.endOfAllInput){ + if(!(this.parentPlan.endOfAllInput|| leftInputConsumedInSpark)){ prevRightKey = rightKey; prevRightInp = rightInp; // There cant be any more join on this key. @@ -413,11 +431,14 @@ public class POMergeJoin extends Physica } case POStatus.STATUS_EOP: - if(this.parentPlan.endOfAllInput){ + if(this.parentPlan.endOfAllInput || isEndOfInput()){ // We hit the end on left input. // Tuples in bag may still possibly join with right side. curJoinKey = prevLeftKey; curLeftKey = null; + if (isEndOfInput()) { + leftInputConsumedInSpark = true; + } break; } else // Fetch next left input. @@ -427,7 +448,9 @@ public class POMergeJoin extends Physica return curLeftInp; } - if((null != prevRightKey) && !this.parentPlan.endOfAllInput && ((Comparable)prevRightKey).compareTo(curLeftKey) >= 0){ + if((null != prevRightKey) + && !(this.parentPlan.endOfAllInput || leftInputConsumedInSpark) + && ((Comparable)prevRightKey).compareTo(curLeftKey) >= 0){ // This will happen when we accumulated inputs on left side and moved on, but are still behind the right side // In that case, throw away the tuples accumulated till now and add the one we read in this function call. @@ -509,7 +532,7 @@ public class POMergeJoin extends Physica leftTuples.add((Tuple)curLeftInp.result); prevLeftInp = curLeftInp; prevLeftKey = curLeftKey; - if(this.parentPlan.endOfAllInput){ // This is end of all input and this is last time we will read right input. + if(this.parentPlan.endOfAllInput || leftInputConsumedInSpark){ // This is end of all input and this is last time we will read right input. // Right loader in this case wouldn't get a chance to close input stream. So, we close it ourself.
try { ((IndexableLoadFunc)rightLoader).close(); @@ -719,4 +742,8 @@ public class POMergeJoin extends Physica public LOJoin.JOINTYPE getJoinType() { return joinType; } + + public POLocalRearrange[] getLRs() { + return LRs; + } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Mon May 29 15:00:39 2017 @@ -29,45 +29,45 @@ import org.apache.pig.impl.plan.VisitorE public class POPoissonSample extends PhysicalOperator { - private static final long serialVersionUID = 1L; + protected static final long serialVersionUID = 1L; // 17 is not a magic number. It can be obtained by using a poisson // cumulative distribution function with the mean set to 10 (empirically, // minimum number of samples) and the confidence set to 95% public static final int DEFAULT_SAMPLE_RATE = 17; - private int sampleRate = 0; + protected int sampleRate = 0; - private float heapPerc = 0f; + protected float heapPerc = 0f; - private Long totalMemory; + protected Long totalMemory; - private transient boolean initialized; + protected transient boolean initialized; // num of rows skipped so far - private transient int numSkipped; + protected transient int numSkipped; // num of rows sampled so far - private transient int numRowsSampled; + protected transient int numRowsSampled; // average size of tuple in memory, for tuples sampled - private transient long avgTupleMemSz; + protected transient long avgTupleMemSz; // current row number - private transient long rowNum; + protected transient long rowNum; // number of tuples to skip after each sample - private transient long skipInterval; + protected transient long skipInterval; // bytes in input to skip after every sample. // divide this by avgTupleMemSize to get skipInterval - private transient long memToSkipPerSample; + protected transient long memToSkipPerSample; // has the special row with row number information been returned - private transient boolean numRowSplTupleReturned; + protected transient boolean numRowSplTupleReturned; // new Sample result - private transient Result newSample; + protected transient Result newSample; public POPoissonSample(OperatorKey k, int rp, int sr, float hp, long tm) { super(k, rp, null); @@ -204,7 +204,7 @@ public class POPoissonSample extends Phy * and recalculate skipInterval * @param t - tuple */ - private void updateSkipInterval(Tuple t) { + protected void updateSkipInterval(Tuple t) { avgTupleMemSz = ((avgTupleMemSz*numRowsSampled) + t.getMemorySize())/(numRowsSampled + 1); skipInterval = memToSkipPerSample/avgTupleMemSz; @@ -224,7 +224,7 @@ public class POPoissonSample extends Phy * @return - Tuple appended with special marker string column, num-rows column * @throws ExecException */ - private Result createNumRowTuple(Tuple sample) throws ExecException { + protected Result createNumRowTuple(Tuple sample) throws ExecException { int sz = (sample == null) ? 
0 : sample.size(); Tuple t = mTupleFactory.newTuple(sz + 2); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Mon May 29 15:00:39 2017 @@ -347,6 +347,10 @@ public class POSort extends PhysicalOper mSortFunc = sortFunc; } + public Comparator<Tuple> getMComparator() { + return mComparator; + } + public List<Boolean> getMAscCols() { return mAscCols; } Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Mon May 29 15:00:39 2017 @@ -0,0 +1,443 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.pig.PigException; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.JobCreationException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.SkewedJoinConverter; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.DependencyOrderWalker; +import org.apache.pig.impl.plan.DepthFirstWalker; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.tools.pigstats.spark.SparkPigStats; +import org.apache.pig.tools.pigstats.spark.SparkStatsUtil; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.rdd.RDD; + +import com.google.common.collect.Lists; + +public class JobGraphBuilder extends SparkOpPlanVisitor { + + private static final Log LOG = LogFactory.getLog(JobGraphBuilder.class); + public static final 
int NULLPART_JOB_ID = -1; + + private Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap = null; + private SparkPigStats sparkStats = null; + private JavaSparkContext sparkContext = null; + private JobMetricsListener jobMetricsListener = null; + private String jobGroupID = null; + private Set<Integer> seenJobIDs = new HashSet<Integer>(); + private SparkOperPlan sparkPlan = null; + private Map<OperatorKey, RDD<Tuple>> sparkOpRdds = new HashMap<OperatorKey, RDD<Tuple>>(); + private Map<OperatorKey, RDD<Tuple>> physicalOpRdds = new HashMap<OperatorKey, RDD<Tuple>>(); + private JobConf jobConf = null; + private PigContext pc; + + public JobGraphBuilder(SparkOperPlan plan, Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap, + SparkPigStats sparkStats, JavaSparkContext sparkContext, JobMetricsListener + jobMetricsListener, String jobGroupID, JobConf jobConf, PigContext pc) { + super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan, true)); + this.sparkPlan = plan; + this.convertMap = convertMap; + this.sparkStats = sparkStats; + this.sparkContext = sparkContext; + this.jobMetricsListener = jobMetricsListener; + this.jobGroupID = jobGroupID; + this.jobConf = jobConf; + this.pc = pc; + } + + @Override + public void visitSparkOp(SparkOperator sparkOp) throws VisitorException { + new PhyPlanSetter(sparkOp.physicalPlan).visit(); + try { + setReplicationForMergeJoin(sparkOp.physicalPlan); + sparkOperToRDD(sparkOp); + finishUDFs(sparkOp.physicalPlan); + } catch (Exception e) { + throw new VisitorException("fail to get the rdds of this spark operator: ", e); + } + } + + private void setReplicationForMergeJoin(PhysicalPlan plan) throws IOException { + List<Path> filesForMoreReplication = new ArrayList<>(); + List<POMergeJoin> poMergeJoins = PlanHelper.getPhysicalOperators(plan, POMergeJoin.class); + if (poMergeJoins.size() > 0) { + for (POMergeJoin poMergeJoin : poMergeJoins) { + String idxFileName = poMergeJoin.getIndexFile(); + if (idxFileName != null) { + filesForMoreReplication.add(new Path(idxFileName)); + } + // in spark mode, set as null so that PoMergeJoin won't use hadoop distributed cache + // see POMergeJoin.seekInRightStream() + poMergeJoin.setIndexFile(null); + } + } + + setReplicationForFiles(filesForMoreReplication); + } + + private void setReplicationForFiles(List<Path> files) throws IOException { + FileSystem fs = FileSystem.get(this.jobConf); + short replication = (short) jobConf.getInt(MRConfiguration.SUMIT_REPLICATION, 10); + for (int i = 0; i < files.size(); i++) { + fs.setReplication(files.get(i), replication); + } + } + + // Calling EvalFunc.finish() + private void finishUDFs(PhysicalPlan physicalPlan) throws VisitorException { + UDFFinishVisitor finisher = new UDFFinishVisitor(physicalPlan, + new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>( + physicalPlan)); + try { + finisher.visit(); + } catch (VisitorException e) { + int errCode = 2121; + String msg = "Error while calling finish method on UDFs."; + throw new VisitorException(msg, errCode, PigException.BUG, e); + } + } + + private void sparkOperToRDD(SparkOperator sparkOperator) throws InterruptedException, VisitorException, JobCreationException, ExecException { + List<SparkOperator> predecessors = sparkPlan + .getPredecessors(sparkOperator); + Set<OperatorKey> predecessorOfPreviousSparkOp = new HashSet<OperatorKey>(); + if (predecessors != null) { + for (SparkOperator pred : predecessors) { + predecessorOfPreviousSparkOp.add(pred.getOperatorKey()); + } + } + + boolean isFail 
= false; + Exception exception = null; + if (sparkOperator instanceof NativeSparkOperator) { + ((NativeSparkOperator) sparkOperator).runJob(); + } else { + List<PhysicalOperator> leafPOs = sparkOperator.physicalPlan.getLeaves(); + + //One SparkOperator may have multiple leaves(POStores) after multiquery feature is enabled + if (LOG.isDebugEnabled()) { + LOG.debug("sparkOperator.physicalPlan have " + sparkOperator.physicalPlan.getLeaves().size() + " leaves"); + } + for (PhysicalOperator leafPO : leafPOs) { + try { + physicalToRDD(sparkOperator, sparkOperator.physicalPlan, leafPO, + predecessorOfPreviousSparkOp); + sparkOpRdds.put(sparkOperator.getOperatorKey(), + physicalOpRdds.get(leafPO.getOperatorKey())); + } catch (Exception e) { + LOG.error("throw exception in sparkOperToRDD: ", e); + exception = e; + isFail = true; + boolean stopOnFailure = Boolean.valueOf(pc + .getProperties().getProperty("stop.on.failure", + "false")); + if (stopOnFailure) { + int errCode = 6017; + throw new ExecException(e.getMessage(), errCode, + PigException.REMOTE_ENVIRONMENT); + } + } + } + + + List<POStore> poStores = PlanHelper.getPhysicalOperators( + sparkOperator.physicalPlan, POStore.class); + Collections.sort(poStores); + if (poStores.size() > 0) { + int i = 0; + if (!isFail) { + List<Integer> jobIDs = getJobIDs(seenJobIDs); + for (POStore poStore : poStores) { + if (jobIDs.size() == 0) { + /** + * Spark internally misses information about its jobs that mapped 0 partitions. + * Although these have valid jobIds, Spark itself is unable to tell anything about them. + * If the store rdd had 0 partitions we return a dummy success stat with jobId = + * NULLPART_JOB_ID, in any other cases we throw exception if no new jobId was seen. + */ + if (physicalOpRdds.get(poStore.getOperatorKey()).partitions().length == 0) { + sparkStats.addJobStats(poStore, sparkOperator, NULLPART_JOB_ID, null, sparkContext); + return; + } else { + throw new RuntimeException("Expected at least one unseen jobID " + + " in this call to getJobIdsForGroup, but got 0"); + } + } + SparkStatsUtil.waitForJobAddStats(jobIDs.get(i++), poStore, sparkOperator, + jobMetricsListener, sparkContext, sparkStats); + } + } else { + for (POStore poStore : poStores) { + String failJobID = sparkOperator.name().concat("_fail"); + SparkStatsUtil.addFailJobStats(failJobID, poStore, sparkOperator, sparkStats, exception); + } + } + } + } + } + + private void physicalToRDD(SparkOperator sparkOperator, PhysicalPlan plan, + PhysicalOperator physicalOperator, + Set<OperatorKey> predsFromPreviousSparkOper) + throws IOException { + RDD<Tuple> nextRDD = null; + List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = getPredecessors(plan, physicalOperator); + LinkedHashSet<OperatorKey> operatorKeysOfAllPreds = new LinkedHashSet<OperatorKey>(); + addPredsFromPrevoiousSparkOp(sparkOperator, physicalOperator, operatorKeysOfAllPreds); + if (predecessorsOfCurrentPhysicalOp != null) { + for (PhysicalOperator predecessor : predecessorsOfCurrentPhysicalOp) { + physicalToRDD(sparkOperator, plan, predecessor, predsFromPreviousSparkOper); + operatorKeysOfAllPreds.add(predecessor.getOperatorKey()); + } + + } else { + if (predsFromPreviousSparkOper != null + && predsFromPreviousSparkOper.size() > 0) { + for (OperatorKey predFromPreviousSparkOper : predsFromPreviousSparkOper) { + operatorKeysOfAllPreds.add(predFromPreviousSparkOper); + } + } + } + + if (physicalOperator instanceof POSplit) { + List<PhysicalPlan> successorPlans = ((POSplit) physicalOperator).getPlans(); + for 
(PhysicalPlan successorPlan : successorPlans) { + List<PhysicalOperator> leavesOfSuccessPlan = successorPlan.getLeaves(); + if (leavesOfSuccessPlan.size() != 1) { + throw new RuntimeException("the size of the leaves of successorPlan should be 1"); + } + PhysicalOperator leafOfSuccessPlan = leavesOfSuccessPlan.get(0); + physicalToRDD(sparkOperator, successorPlan, leafOfSuccessPlan, operatorKeysOfAllPreds); + } + } else { + RDDConverter converter = convertMap.get(physicalOperator.getClass()); + if (converter == null) { + throw new IllegalArgumentException( + "Pig on Spark does not support Physical Operator: " + physicalOperator); + } + + LOG.info("Converting operator " + + physicalOperator.getClass().getSimpleName() + " " + + physicalOperator); + List<RDD<Tuple>> allPredRDDs = sortPredecessorRDDs(operatorKeysOfAllPreds); + + if (converter instanceof FRJoinConverter) { + setReplicatedInputs(physicalOperator, (FRJoinConverter) converter); + } + + if (sparkOperator.isSkewedJoin() && converter instanceof SkewedJoinConverter) { + SkewedJoinConverter skewedJoinConverter = (SkewedJoinConverter) converter; + skewedJoinConverter.setSkewedJoinPartitionFile(sparkOperator.getSkewedJoinPartitionFile()); + } + adjustRuntimeParallelismForSkewedJoin(physicalOperator, sparkOperator, allPredRDDs); + nextRDD = converter.convert(allPredRDDs, physicalOperator); + + if (nextRDD == null) { + throw new IllegalArgumentException( + "RDD should not be null after PhysicalOperator: " + + physicalOperator); + } + + physicalOpRdds.put(physicalOperator.getOperatorKey(), nextRDD); + } + } + + private void setReplicatedInputs(PhysicalOperator physicalOperator, FRJoinConverter converter) { + Set<String> replicatedInputs = new HashSet<>(); + for (PhysicalOperator operator : physicalOperator.getInputs()) { + if (operator instanceof POBroadcastSpark) { + replicatedInputs.add(((POBroadcastSpark) operator).getBroadcastedVariableName()); + } + } + converter.setReplicatedInputs(replicatedInputs); + } + + private List<PhysicalOperator> getPredecessors(PhysicalPlan plan, PhysicalOperator op) { + List preds = null; + if (!(op instanceof POJoinGroupSpark)) { + preds = plan.getPredecessors(op); + if (preds != null && preds.size() > 1 && !(op instanceof POSkewedJoin)) { + Collections.sort(preds); + } + } else { + //For POJoinGroupSpark, we could not use plan.getPredecessors(op)+ sort to get + //the predecessors with correct order, more detail see JoinOptimizerSpark#restructSparkOp + preds = ((POJoinGroupSpark) op).getPredecessors(); + } + return preds; + } + + //get all rdds of predecessors sorted by the OperatorKey + private List<RDD<Tuple>> sortPredecessorRDDs(LinkedHashSet <OperatorKey> operatorKeysOfAllPreds) { + List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList(); + for (OperatorKey operatorKeyOfAllPred : operatorKeysOfAllPreds) { + predecessorRDDs.add(physicalOpRdds.get(operatorKeyOfAllPred)); + } + return predecessorRDDs; + } + + //deal special cases containing operators with multiple predecessors when multiquery is enabled to get the predecessors of specified + // physicalOp in previous SparkOp(see PIG-4675) + private void addPredsFromPrevoiousSparkOp(SparkOperator sparkOperator, PhysicalOperator physicalOperator, Set<OperatorKey> operatorKeysOfPredecessors) { + // the relationship is stored in sparkOperator.getMultiQueryOptimizeConnectionItem() + List<OperatorKey> predOperatorKeys = sparkOperator.getMultiQueryOptimizeConnectionItem().get(physicalOperator.getOperatorKey()); + if (predOperatorKeys != null) { + for (OperatorKey 
predOperator : predOperatorKeys) { + LOG.debug(String.format("add predecessor(OperatorKey:%s) for OperatorKey:%s", predOperator, physicalOperator.getOperatorKey())); + operatorKeysOfPredecessors.add(predOperator); + } + } + } + + /** + * In Spark, currently only async actions return job id. There is no async + * equivalent of actions like saveAsNewAPIHadoopFile(). + * <p/> + * The only other way to get a job id is to register a "job group ID" with + * the spark context and request all job ids corresponding to that job group + * via getJobIdsForGroup. + * <p/> + * However getJobIdsForGroup does not guarantee the order of the elements in + * its result. + * <p/> + * This method simply returns the previously unseen job ids. + * + * @param seenJobIDs job ids in the job group that are already seen + * @return Spark job ids not seen before + */ + private List<Integer> getJobIDs(Set<Integer> seenJobIDs) { + Set<Integer> groupjobIDs = new HashSet<Integer>( + Arrays.asList(ArrayUtils.toObject(sparkContext.statusTracker() + .getJobIdsForGroup(jobGroupID)))); + groupjobIDs.removeAll(seenJobIDs); + List<Integer> unseenJobIDs = new ArrayList<Integer>(groupjobIDs); + seenJobIDs.addAll(unseenJobIDs); + return unseenJobIDs; + } + + + /** + * If the parallelism of the skewed join is NOT specified by the user in the script, + * set a default parallelism for the sampling job + * + * @param physicalOperator + * @param sparkOperator + * @param allPredRDDs + * @throws VisitorException + */ + private void adjustRuntimeParallelismForSkewedJoin(PhysicalOperator physicalOperator, + SparkOperator sparkOperator, + List<RDD<Tuple>> allPredRDDs) throws VisitorException { + // We need to calculate the final number of reducers of the next job (skew-join) + // adjust parallelism of ConstantExpression + if (sparkOperator.isSampler() && sparkPlan.getSuccessors(sparkOperator) != null + && physicalOperator instanceof POPoissonSampleSpark) { + // set the runtime #reducer of the next job as the #partition + + int defaultParallelism = SparkPigContext.get().getParallelism(allPredRDDs, physicalOperator); + + ParallelConstantVisitor visitor = + new ParallelConstantVisitor(sparkOperator.physicalPlan, defaultParallelism); + visitor.visit(); + } + } + + /** + * Here, we don't reuse MR/Tez's ParallelConstantVisitor. + * To automatically adjust reducer parallelism for skewed join, we only adjust the + * ConstantExpression operator after the POPoissonSampleSpark operator + */ + private static class ParallelConstantVisitor extends PhyPlanVisitor { + + private int rp; + private boolean replaced = false; + private boolean isAfterSampleOperator = false; + + public ParallelConstantVisitor(PhysicalPlan plan, int rp) { + super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>( + plan)); + this.rp = rp; + } + + @Override + public void visitConstant(ConstantExpression cnst) throws VisitorException { + if (isAfterSampleOperator && cnst.getRequestedParallelism() == -1) { + Object obj = cnst.getValue(); + if (obj instanceof Integer) { + if (replaced) { + // sample job should have only one ConstantExpression + throw new VisitorException("Invalid reduce plan: more " + + "than one ConstantExpression found in sampling job"); + } + cnst.setValue(rp); + cnst.setRequestedParallelism(rp); + replaced = true; + } + } + } + + @Override + public void visitPoissonSample(POPoissonSample po) { + isAfterSampleOperator = true; + } + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java Mon May 29 15:00:39 2017 @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.backend.hadoop.executionengine.spark; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.scheduler.SparkListener; +import org.apache.spark.scheduler.SparkListenerApplicationEnd; +import org.apache.spark.scheduler.SparkListenerApplicationStart; +import org.apache.spark.scheduler.SparkListenerBlockManagerAdded; +import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved; +import org.apache.spark.scheduler.SparkListenerBlockUpdated; +import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate; +import org.apache.spark.scheduler.SparkListenerExecutorAdded; +import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate; +import org.apache.spark.scheduler.SparkListenerExecutorRemoved; +import org.apache.spark.scheduler.SparkListenerJobEnd; +import org.apache.spark.scheduler.SparkListenerJobStart; +import org.apache.spark.scheduler.SparkListenerStageCompleted; +import org.apache.spark.scheduler.SparkListenerStageSubmitted; +import org.apache.spark.scheduler.SparkListenerTaskEnd; +import org.apache.spark.scheduler.SparkListenerTaskGettingResult; +import org.apache.spark.scheduler.SparkListenerTaskStart; +import org.apache.spark.scheduler.SparkListenerUnpersistRDD; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class JobMetricsListener implements SparkListener { + + private static final Log LOG = LogFactory.getLog(JobMetricsListener.class); + + private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap(); + private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap(); + private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap(); + private final Set<Integer> finishedJobIds = Sets.newHashSet(); + + @Override + public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { +// uncomment and remove the code onTaskEnd until we fix PIG-5157. 
It is better to update taskMetrics of stage when stage completes +// if we update taskMetrics in onTaskEnd(), it consumes lot of memory. +// int stageId = stageCompleted.stageInfo().stageId(); +// int stageAttemptId = stageCompleted.stageInfo().attemptId(); +// String stageIdentifier = stageId + "_" + stageAttemptId; +// Integer jobId = stageIdToJobId.get(stageId); +// if (jobId == null) { +// LOG.warn("Cannot find job id for stage[" + stageId + "]."); +// } else { +// Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId); +// if (jobMetrics == null) { +// jobMetrics = Maps.newHashMap(); +// allJobMetrics.put(jobId, jobMetrics); +// } +// List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier); +// if (stageMetrics == null) { +// stageMetrics = Lists.newLinkedList(); +// jobMetrics.put(stageIdentifier, stageMetrics); +// } +// // uncomment until we fix PIG-5157. after we upgrade to spark2.0 StageInfo().taskMetrics() api is available +// // stageMetrics.add(stageCompleted.stageInfo().taskMetrics()); +// } + } + + @Override + public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { + + } + + @Override + public void onTaskStart(SparkListenerTaskStart taskStart) { + + } + + @Override + public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { + + } + + @Override + public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { + + } + + @Override + public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { + + } + + @Override + public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated){ + + } + + @Override + public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) { + int stageId = taskEnd.stageId(); + int stageAttemptId = taskEnd.stageAttemptId(); + String stageIdentifier = stageId + "_" + stageAttemptId; + Integer jobId = stageIdToJobId.get(stageId); + if (jobId == null) { + LOG.warn("Cannot find job id for stage[" + stageId + "]."); + } else { + Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId); + if (jobMetrics == null) { + jobMetrics = Maps.newHashMap(); + allJobMetrics.put(jobId, jobMetrics); + } + List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier); + if (stageMetrics == null) { + stageMetrics = Lists.newLinkedList(); + jobMetrics.put(stageIdentifier, stageMetrics); + } + stageMetrics.add(taskEnd.taskMetrics()); + } + } + + @Override + public synchronized void onJobStart(SparkListenerJobStart jobStart) { + int jobId = jobStart.jobId(); + int size = jobStart.stageIds().size(); + int[] intStageIds = new int[size]; + for (int i = 0; i < size; i++) { + Integer stageId = (Integer) jobStart.stageIds().apply(i); + intStageIds[i] = stageId; + stageIdToJobId.put(stageId, jobId); + } + jobIdToStageId.put(jobId, intStageIds); + } + + @Override + public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { + finishedJobIds.add(jobEnd.jobId()); + notify(); + } + + @Override + public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { + + } + + @Override + public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { + + } + + @Override + public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { + + } + + @Override + public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { + + } + + @Override + public void onApplicationStart(SparkListenerApplicationStart applicationStart) { + + } + + @Override + public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) 
{ + + } + + @Override + public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { + + } + + + public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) { + return allJobMetrics.get(jobId); + } + + public synchronized boolean waitForJobToEnd(int jobId) throws InterruptedException { + if (finishedJobIds.contains(jobId)) { + finishedJobIds.remove(jobId); + return true; + } + + wait(); + return false; + } + + public synchronized void cleanup(int jobId) { + allJobMetrics.remove(jobId); + jobIdToStageId.remove(jobId); + Iterator<Map.Entry<Integer, Integer>> iterator = stageIdToJobId.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Integer, Integer> entry = iterator.next(); + if (entry.getValue() == jobId) { + iterator.remove(); + } + } + } + + public synchronized void reset() { + stageIdToJobId.clear(); + jobIdToStageId.clear(); + allJobMetrics.clear(); + finishedJobIds.clear(); + } +} \ No newline at end of file Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java Mon May 29 15:00:39 2017 @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.backend.hadoop.executionengine.spark; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.JobConf; + +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +/** + * Pulled class from Hive on Spark + */ +public class KryoSerializer { + private static final Log LOG = LogFactory.getLog(KryoSerializer.class); + + public static byte[] serializeJobConf(JobConf jobConf) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try { + jobConf.write(new DataOutputStream(out)); + } catch (IOException e) { + LOG.error("Error serializing job configuration", e); + return null; + } finally { + try { + out.close(); + } catch (IOException e) { + LOG.error("Error closing output stream", e); + } + } + + return out.toByteArray(); + + } + + public static JobConf deserializeJobConf(byte[] buffer) { + JobConf conf = new JobConf(); + try { + conf.readFields(new DataInputStream(new ByteArrayInputStream(buffer))); + } catch (IOException e) { + LOG.error("Error de-serializing job configuration"); + return null; + } + return conf; + } + +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java Mon May 29 15:00:39 2017 @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark; + +import java.lang.reflect.Method; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.IndexedKey; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.spark.Partitioner; + +/** + * Spark Partitioner that wraps a custom partitioner that implements + * org.apache.hadoop.mapreduce.Partitioner interface. 
+ * + * Since Spark's shuffle API takes a different partitioner class + * (@see org.apache.spark.Partitioner) compared to MapReduce, we need to + * wrap custom partitioners written for MapReduce inside this Spark Partitioner. + * + * MR Custom partitioners are expected to implement getPartition() with + * specific arguments: + * public int getPartition(PigNullableWritable key, Writable value, int numPartitions) + * For an example of such a partitioner, + * @see org.apache.pig.test.utils.SimpleCustomPartitioner + */ +public class MapReducePartitionerWrapper extends Partitioner { + private static final Log LOG = LogFactory.getLog(MapReducePartitionerWrapper.class); + + private int numPartitions; + private String partitionerName; + // MR's Partitioner interface is not serializable. + // And since it is not serializable, it cannot be initialized in the constructor + // (in Spark's DAG scheduler thread in the Spark Driver). + // To work around this, it will be lazily initialized inside the map task + // (Executor thread) the first time that getPartition() gets called. + transient private org.apache.hadoop.mapreduce.Partitioner<PigNullableWritable, Writable> + mapredPartitioner = null; + transient private Method getPartitionMethod = null; + + public MapReducePartitionerWrapper(String partitionerName, + int numPartitions) { + if (partitionerName == null) { + throw new RuntimeException("MapReduce Partitioner cannot be null."); + } + + this.partitionerName = partitionerName; + this.numPartitions = numPartitions; + } + + public int numPartitions() { + return numPartitions; + } + + public int getPartition(final Object key) { + try { + + PigNullableWritable writeableKey = new PigNullableWritable() { + public Object getValueAsPigType() { + if (key instanceof IndexedKey) { + IndexedKey indexedKey = (IndexedKey) key; + this.setIndex(indexedKey.getIndex()); + return indexedKey.getKey(); + } else { + return key; + } + } + }; + + + // Lazy initialization + // Synchronized because multiple (map) tasks in the same Spark Executor + // may call getPartition, attempting to initialize at the same time. + if (mapredPartitioner == null) { + synchronized (this) { + // check again for race condition + if (mapredPartitioner == null) { + Class<?> mapredPartitionerClass = + PigContext.resolveClassName(partitionerName); + Configuration conf = new Configuration(); + mapredPartitioner = (org.apache.hadoop.mapreduce.Partitioner<PigNullableWritable, Writable>) + ReflectionUtils.newInstance(mapredPartitionerClass, conf); + getPartitionMethod = mapredPartitionerClass.getMethod( + "getPartition", + PigNullableWritable.class, + org.apache.hadoop.io.Writable.class, + int.class); + } + } + } + + // MR Partitioner getPartition takes a value argument as well, but + // Spark's Partitioner only accepts the key argument. + // In practice, MR Partitioners ignore the value. However, it's + // possible that some don't. + // TODO: We could handle this case by packaging the value inside the + // key (conditioned on some config option, since this will balloon + // memory usage). PIG-4575.
+ int partition = (Integer) getPartitionMethod.invoke(mapredPartitioner, + writeableKey, null, numPartitions); + + return partition; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java Mon May 29 15:00:39 2017 @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.pig.impl.util.UDFContext; + +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; + +/** + * The purpose of SparkEngineConf is to solve the initialization problem of PigContext.properties.get("udf.import.list"), + * UDFContext#udfConfs, UDFContext#clientSysProps in spark mode. These variables cannot be + * serialized because they are ThreadLocal variables. In MR mode, they are serialized in JobConfiguration + * in JobControlCompiler#getJob and deserialized by JobConfiguration in PigGenericMapBase#setup. But there is no + * setup() in spark like there is in mr, so these variables cannot be deserialized the same way before spark code uses them. + * The solution used here is as follows: + * these variables are saved in SparkEngineConf#writeObject and then restored and initialized + * in SparkEngineConf#readObject in the spark executor thread.
+ */ +public class SparkEngineConf implements Serializable { + + private static final Log LOG = LogFactory.getLog(SparkEngineConf.class); + private static String SPARK_UDF_IMPORT_LIST = "pig.spark.udf.import.list"; + private static String SPARK_UDFCONTEXT_UDFCONFS = "pig.spark.udfcontext.udfConfs"; + private static String SPARK_UDFCONTEXT_CLIENTSYSPROPS = "pig.spark.udfcontext.clientSysProps"; + + private Properties properties = new Properties(); + + public SparkEngineConf() { + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + ArrayList<String> udfImportList = (ArrayList<String>) in.readObject(); + PigContext.setPackageImportList(udfImportList); + String udfConfsStr = (String) in.readObject(); + String clientSysPropsStr = (String) in.readObject(); + UDFContext.getUDFContext().deserializeForSpark(udfConfsStr, clientSysPropsStr); + } + + private void writeObject(ObjectOutputStream out) throws IOException { + ArrayList<String> udfImportList = Lists.newArrayList(Splitter.on(",").split(properties.getProperty(SPARK_UDF_IMPORT_LIST))); + out.writeObject(udfImportList); + //2 threads call SparkEngineConf#writeObject + //In main thread: SparkLauncher#initialize->SparkUtil#newJobConf + // ->ObjectSerializer#serialize-> SparkEngineConf#writeObject + //In dag-scheduler-event-loop thread: DAGScheduler.submitMissingTasks->JavaSerializationStream.writeObject + // + //In main thread,UDFContext#getUDFContext is not empty, we store UDFContext#udfConfs and UDFContext#clientSysProps + //into properties and serialize them. + //In dag-scheduler-event-loop thread, UDFContext#getUDFContext is empty, we get value of UDFContext#udfConfs and UDFContext#clientSysProps + //from properties and serialize them. + if (!UDFContext.getUDFContext().isUDFConfEmpty()) { + //In SparkUtil#newJobConf(), sparkEngineConf is serialized in job configuration and will call + //SparkEngineConf#writeObject(at this time UDFContext#udfConfs and UDFContext#clientSysProps is not null) + //later spark will call JavaSerializationStream.writeObject to serialize all objects when submit spark + //jobs(at that time, UDFContext#udfConfs and UDFContext#clientSysProps is null so we need to save their + //value in SparkEngineConf#properties after these two variables are correctly initialized in + //SparkUtil#newJobConf, More detailed see PIG-4920 + String udfConfsStr = UDFContext.getUDFContext().serialize(); + String clientSysPropsStr = ObjectSerializer.serialize(UDFContext.getUDFContext().getClientSystemProps()); + this.properties.setProperty(SPARK_UDFCONTEXT_UDFCONFS, udfConfsStr); + this.properties.setProperty(SPARK_UDFCONTEXT_CLIENTSYSPROPS, clientSysPropsStr); + out.writeObject(udfConfsStr); + out.writeObject(clientSysPropsStr); + } else { + out.writeObject(this.properties.getProperty(SPARK_UDFCONTEXT_UDFCONFS)); + out.writeObject(this.properties.getProperty(SPARK_UDFCONTEXT_CLIENTSYSPROPS)); + } + } + + public void setSparkUdfImportListStr(String sparkUdfImportListStr) { + this.properties.setProperty(SPARK_UDF_IMPORT_LIST, sparkUdfImportListStr); + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecType.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecType.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecType.java (added) +++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecType.java Mon May 29 15:00:39 2017 @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark; + +import java.util.Properties; + +import org.apache.pig.ExecType; +import org.apache.pig.backend.executionengine.ExecutionEngine; +import org.apache.pig.impl.PigContext; + +public class SparkExecType implements ExecType { + + private static final long serialVersionUID = 1L; + private static final String mode = "SPARK"; + + @Override + public boolean accepts(Properties properties) { + String execTypeSpecified = properties.getProperty("exectype", "") + .toUpperCase(); + if (execTypeSpecified.equals(mode)) + return true; + return false; + } + + @Override + public ExecutionEngine getExecutionEngine(PigContext pigContext) { + return new SparkExecutionEngine(pigContext); + } + + @Override + public Class<? extends ExecutionEngine> getExecutionEngineClass() { + return SparkExecutionEngine.class; + } + + @Override + public boolean isLocal() { + return false; + } + + @Override + public String name() { + return "SPARK"; + } + + public String toString() { + return name(); + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java Mon May 29 15:00:39 2017 @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark; + +import java.util.UUID; + +import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine; +import org.apache.pig.backend.hadoop.executionengine.spark.streaming.SparkExecutableManager; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.streaming.ExecutableManager; +import org.apache.pig.tools.pigstats.PigStats; +import org.apache.pig.tools.pigstats.ScriptState; +import org.apache.pig.tools.pigstats.spark.SparkPigStats; +import org.apache.pig.tools.pigstats.spark.SparkScriptState; + +public class SparkExecutionEngine extends HExecutionEngine { + + public SparkExecutionEngine(PigContext pigContext) { + super(pigContext); + this.launcher = new SparkLauncher(); + } + + @Override + public ScriptState instantiateScriptState() { + SparkScriptState ss = new SparkScriptState(UUID.randomUUID().toString()); + ss.setPigContext(pigContext); + return ss; + } + + @Override + public ExecutableManager getExecutableManager() { + return new SparkExecutableManager(); + } + + @Override + public PigStats instantiatePigStats() { + return new SparkPigStats(); + } +}
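A note on the Spark-only endOfInput flag added to POMergeJoin and POMergeCogroup above: the converters that actually drive this flag are not part of this revision. The snippet below is only an illustrative sketch, under the assumption that a converter feeds tuples to the operator one at a time and flips the flag once its input iterator is exhausted; the drain() harness, the MergeJoinEndOfInputSketch class name, and the emit placeholders are hypothetical and are not Pig or Spark API.

import java.util.Iterator;

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.data.Tuple;

public class MergeJoinEndOfInputSketch {

    // Push every left-hand tuple into the merge join, then signal end of input.
    static void drain(POMergeJoin join, Iterator<Tuple> leftInput) throws ExecException {
        while (leftInput.hasNext()) {
            join.attachInput(leftInput.next());
            for (Result r = join.getNextTuple();
                 r.returnStatus == POStatus.STATUS_OK;
                 r = join.getNextTuple()) {
                // emit r.result downstream (placeholder)
            }
        }
        // In Spark mode the shared parentPlan.endOfAllInput cannot be trusted,
        // so the caller sets the operator-local flag instead (PIG-4876).
        join.setEndOfInput(true);
        for (Result r = join.getNextTuple();
             r.returnStatus != POStatus.STATUS_EOP;
             r = join.getNextTuple()) {
            if (r.returnStatus == POStatus.STATUS_OK) {
                // emit the joined tuples still buffered in 'leftTuples' (placeholder)
            }
        }
    }
}

POMergeCogroup would be driven the same way through its own setEndOfInput(), so the operator can flush its buffered groups once the Spark input is exhausted.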