Author: zly
Date: Sun Mar 12 22:18:40 2017
New Revision: 1786618

URL: http://svn.apache.org/viewvc?rev=1786618&view=rev
Log:
PIG-5054: Initialize SchemaTupleBackend correctly in backend in spark mode if spark job has more than 1 stage (Adam via Liyun)
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1786618&r1=1786617&r2=1786618&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Sun Mar 12 22:18:40 2017
@@ -194,7 +194,7 @@ public class SparkLauncher extends Launc
         convertMap.put(POLoad.class, new LoadConverter(pigContext,
                 physicalPlan, sparkContext.sc(), jobConf, sparkEngineConf));
         convertMap.put(POStore.class, new StoreConverter(jobConf));
-        convertMap.put(POForEach.class, new ForEachConverter());
+        convertMap.put(POForEach.class, new ForEachConverter(jobConf));
         convertMap.put(POFilter.class, new FilterConverter());
         convertMap.put(POPackage.class, new PackageConverter());
         convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1786618&r1=1786617&r2=1786618&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java Sun Mar 12 22:18:40 2017
@@ -17,18 +17,24 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark.converter;

+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;

+import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;

@@ -38,12 +44,22 @@ import org.apache.spark.rdd.RDD;
 @SuppressWarnings({"serial" })
 public class ForEachConverter implements RDDConverter<Tuple, Tuple, POForEach> {

+    private JobConf jobConf;
+
+    public ForEachConverter(JobConf jobConf) {
+        this.jobConf = jobConf;
+    }
+
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
             POForEach physicalOperator) {
+
+        byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+
         SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
         RDD<Tuple> rdd = predecessors.get(0);
-        ForEachFunction forEachFunction = new ForEachFunction(physicalOperator);
+        ForEachFunction forEachFunction = new ForEachFunction(physicalOperator, confBytes);
+
         return rdd.toJavaRDD().mapPartitions(forEachFunction, true).rdd();
     }

@@ -51,12 +67,18 @@ public class ForEachConverter implements
             FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {

         private POForEach poForEach;
+        private byte[] confBytes;
+        private transient JobConf jobConf;

-        private ForEachFunction(POForEach poForEach) {
+        private ForEachFunction(POForEach poForEach, byte[] confBytes) {
             this.poForEach = poForEach;
+            this.confBytes = confBytes;
         }

         public Iterable<Tuple> call(final Iterator<Tuple> input) {
+
+            initialize();
+
             // Initialize a reporter as the UDF might want to report progress.
             PhysicalOperator.setReporter(new ProgressableReporter());
             PhysicalOperator[] planLeafOps= poForEach.getPlanLeafOps();
@@ -94,5 +116,17 @@ public class ForEachConverter implements
                 }
             };
         }
+
+        private void initialize() {
+            if (this.jobConf == null) {
+                try {
+                    this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
+                    PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
+                    SchemaTupleBackend.initialize(jobConf, pc);
+                } catch (IOException e) {
+                    throw new RuntimeException("Couldn't initialize ForEachConverter");
+                }
+            }
+        }
     }
 }
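
The patch follows a common Spark idiom: state that lives in per-JVM statics (here SchemaTupleBackend) cannot be set up once on the driver, because tasks in later stages run in executor JVMs that never executed that setup. The converter therefore captures the JobConf as a serialized byte[] in the closure and rebuilds it lazily inside each task before the ForEach plan is evaluated. The sketch below is a minimal, self-contained illustration of that idiom, not Pig code; the class name, the String payload, and the initialized flag are invented for the example, and the comments only point at the real calls the patch uses.

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;

// Illustrative only: lazy, executor-side initialization driven by bytes
// captured in the closure, mirroring ForEachConverter.ForEachFunction.
public class LazyInitFunction implements
        FlatMapFunction<Iterator<String>, String>, Serializable {

    private final byte[] confBytes;          // serialized on the driver
    private transient boolean initialized;   // per-JVM state, never serialized

    public LazyInitFunction(byte[] confBytes) {
        this.confBytes = confBytes;
    }

    @Override
    public Iterable<String> call(Iterator<String> input) {
        initialize();   // runs on the executor, before any record is processed
        List<String> out = new ArrayList<String>();
        while (input.hasNext()) {
            out.add(input.next().toUpperCase());
        }
        return out;
    }

    private void initialize() {
        if (!initialized) {
            // In the real patch this is where the configuration is rebuilt with
            // KryoSerializer.deserializeJobConf(confBytes) and
            // SchemaTupleBackend.initialize(jobConf, pigContext) is invoked.
            initialized = true;
        }
    }
}

On the driver side the patch serializes the JobConf once per convert() call (KryoSerializer.serializeJobConf), so the configuration travels with the closure as plain bytes and is reconstructed only where it is actually needed.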