Author: xuefu
Date: Thu Feb 18 14:02:40 2016
New Revision: 1731077
URL: http://svn.apache.org/viewvc?rev=1731077&view=rev
Log:
PIG-4601: Implement Merge CoGroup for Spark engine (Liyun via Xuefu)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1731077&r1=1731076&r2=1731077&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Thu Feb 18 14:02:40 2016
@@ -55,6 +55,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
@@ -76,6 +77,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
+import
org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeCogroupConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeJoinConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
@@ -202,6 +204,7 @@ public class SparkLauncher extends Launc
convertMap.put(PORank.class, new RankConverter());
convertMap.put(POStream.class, new StreamConverter(confBytes));
convertMap.put(POFRJoin.class, new FRJoinConverter());
+ convertMap.put(POMergeCogroup.class, new MergeCogroupConverter());
convertMap.put(POReduceBySpark.class, new ReduceByConverter());
convertMap.put(POPreCombinerLocalRearrange.class, new
LocalRearrangeConverter());
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1731077&r1=1731076&r2=1731077&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
Thu Feb 18 14:02:40 2016
@@ -18,29 +18,38 @@
package org.apache.pig.backend.hadoop.executionengine.spark;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Random;
-import java.util.UUID;
+
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.rdd.RDD;
-import scala.Product2;
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
-
public class SparkUtil {
public static <T> ClassTag<T> getManifest(Class<T> clazz) {
@@ -119,4 +128,25 @@ public class SparkUtil {
return new MapReducePartitionerWrapper(customPartitioner,
parallelism);
}
}
-}
+
+ /**
+ * Appends a POSort as a new leaf of {@code baseSparkOp}'s physical plan that sorts
+ * whole tuples in ascending order.
+ * <p>
+ * The sort key is a single star POProject (the entire input tuple, result type
+ * TUPLE), and the sort uses {@code baseSparkOp.requestedParallelism}.
+ * SparkCompiler#getIndexingJob calls this on the indexer operator so that the
+ * index tuples produced by MergeJoinIndexer are sorted (see PIG-4601).
+ *
+ * @param baseSparkOp operator whose physical plan receives the POSort as a new leaf
+ * @param scope scope used for the OperatorKeys of the new POProject and POSort
+ * @param nig generator for the node ids of the new operators
+ * @throws PlanException propagated from plan construction (e.g. {@code addAsLeaf})
+ * @throws ExecException declared by the signature; NOTE(review): the raising callee is not visible here
+ */
+ static public void createIndexerSparkNode(SparkOperator baseSparkOp,
String scope, NodeIdGenerator nig) throws PlanException, ExecException {
+ List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+ PhysicalPlan ep = new PhysicalPlan();
+ POProject prj = new POProject(new OperatorKey(scope,
+ nig.getNextNodeId(scope)));
+ prj.setStar(true);
+ prj.setOverloaded(false);
+ prj.setResultType(DataType.TUPLE);
+ ep.add(prj);
+ eps.add(ep);
+
+ List<Boolean> ascCol = new ArrayList<Boolean>();
+ ascCol.add(true);
+
+ int requestedParallelism = baseSparkOp.requestedParallelism;
+ POSort sort = new POSort(new OperatorKey(scope,
nig.getNextNodeId(scope)), requestedParallelism, null, eps, ascCol, null);
+ // POSort is added to sort the index tuples generated by MergeJoinIndexer.
+ // For more detail, see PIG-4601.
+ baseSparkOp.physicalPlan.addAsLeaf(sort);
+ }
+}
\ No newline at end of file
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java?rev=1731077&view=auto
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java
(added)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java
Thu Feb 18 14:02:40 2016
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+
+/**
+ * Spark converter for POMergeCogroup (map-side merge cogroup).
+ * <p>
+ * Only one predecessor RDD is accepted (the base relation). Each partition of it
+ * is streamed through the POMergeCogroup operator with {@code mapPartitions}.
+ * NOTE(review): side relations are not passed in as RDDs; presumably
+ * POMergeCogroup reads them itself using the side loaders/files that
+ * SparkCompiler records on it — confirm.
+ */
+public class MergeCogroupConverter implements RDDConverter<Tuple, Tuple,
POMergeCogroup> {
+ /**
+ * Applies the merge cogroup to every partition of the single predecessor RDD.
+ *
+ * @param predecessors input RDDs; exactly one is expected (checked by assertPredecessorSize)
+ * @param physicalOperator the POMergeCogroup to run inside each partition
+ * @return RDD of the operator's output tuples; mapPartitions is invoked with
+ * preservesPartitioning = true
+ */
+ @Override
+ public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+ POMergeCogroup physicalOperator) {
+ SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
+ RDD<Tuple> rdd = predecessors.get(0);
+ MergeCogroupFunction mergeCogroupFunction = new
MergeCogroupFunction(physicalOperator);
+ return rdd.toJavaRDD().mapPartitions(mergeCogroupFunction, true).rdd();
+ }
+
+ /**
+ * Per-partition function that feeds each input tuple to the captured
+ * POMergeCogroup and yields the operator's results. The operator is held in a
+ * non-transient field of this Serializable class, so it is serialized together
+ * with the function.
+ */
+ private static class MergeCogroupFunction implements
+ FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+
+ private POMergeCogroup poMergeCogroup;
+
+ @Override
+ public Iterable<Tuple> call(final Iterator<Tuple> input) throws
Exception {
+ // The returned Iterable builds an OutputConsumerIterator over the
+ // partition; its hooks below attach input tuples, pull results and
+ // signal end of input.
+ return new Iterable<Tuple>() {
+
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new OutputConsumerIterator(input) {
+
+ @Override
+ protected void attach(Tuple tuple) {
+ // Clear the input list and attach the tuple directly.
+ poMergeCogroup.setInputs(null);
+ poMergeCogroup.attachInput(tuple);
+ }
+
+ @Override
+ protected Result getNextResult() throws ExecException {
+ return poMergeCogroup.getNextTuple();
+ }
+
+ @Override
+ protected void endOfInput() {
+ // NOTE(review): presumably lets POMergeCogroup emit its
+ // last pending group once the partition is exhausted — confirm.
+ poMergeCogroup.getParentPlan().endOfAllInput =
true;
+ }
+ };
+ }
+ };
+ }
+
+ /**
+ * @param poMergeCogroup operator applied to every partition
+ */
+ private MergeCogroupFunction(POMergeCogroup poMergeCogroup) {
+ this.poMergeCogroup = poMergeCogroup;
+ }
+ }
+}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1731077&r1=1731076&r2=1731077&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
Thu Feb 18 14:02:40 2016
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.spark.plan;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -30,10 +31,16 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.CollectableLoadFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.LoadFunc;
+import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -47,6 +54,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -60,11 +68,11 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import
org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
-import org.apache.pig.builtin.LOG;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -73,6 +81,8 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Utils;
/**
* The compiler that compiles a given physical physicalPlan into a DAG of Spark
@@ -801,4 +811,173 @@ public class SparkCompiler extends PhyPl
finPlan.merge(e);
}
}
+
+ @Override
+ public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws
VisitorException {
+ if (compiledInputs.length < 2) {
+ int errCode = 2251;
+ String errMsg = "Merge Cogroup work on two or more relations." +
+ "To use map-side group-by on single relation, use
'collected' qualifier.";
+ throw new SparkCompilerException(errMsg, errCode);
+ }
+
+ List<FuncSpec> funcSpecs = new
ArrayList<FuncSpec>(compiledInputs.length - 1);
+ List<String> fileSpecs = new ArrayList<String>(compiledInputs.length -
1);
+ List<String> loaderSigns = new ArrayList<String>(compiledInputs.length
- 1);
+
+ try {
+ poCoGrp.setEndOfRecordMark(POStatus.STATUS_NULL);
+
+ // Iterate through all the SparkOpererators, disconnect side
SparkOperators from
+ // SparkOperator and collect all the information needed in
different lists.
+
+ for (int i = 0; i < compiledInputs.length; i++) {
+ SparkOperator sparkOper = compiledInputs[i];
+ PhysicalPlan plan = sparkOper.physicalPlan;
+ if (plan.getRoots().size() != 1) {
+ int errCode = 2171;
+ String errMsg = "Expected one but found more then one root
physical operator in physical plan.";
+ throw new SparkCompilerException(errMsg, errCode,
PigException.BUG);
+ }
+
+ PhysicalOperator rootPOOp = plan.getRoots().get(0);
+ if (!(rootPOOp instanceof POLoad)) {
+ int errCode = 2172;
+ String errMsg = "Expected physical operator at root to be
POLoad. Found : " + rootPOOp.getClass().getCanonicalName();
+ throw new SparkCompilerException(errMsg, errCode);
+ }
+
+ POLoad sideLoader = (POLoad) rootPOOp;
+ FileSpec loadFileSpec = sideLoader.getLFile();
+ FuncSpec funcSpec = loadFileSpec.getFuncSpec();
+ LoadFunc loadfunc = sideLoader.getLoadFunc();
+ if (i == 0) {
+
+ if
(!(CollectableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))) {
+ int errCode = 2252;
+ throw new SparkCompilerException("Base loader in
Cogroup must implement CollectableLoadFunc.", errCode);
+ }
+
+ ((CollectableLoadFunc)
loadfunc).ensureAllKeyInstancesInSameSplit();
+ continue;
+ }
+ if
(!(IndexableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))) {
+ int errCode = 2253;
+ throw new SparkCompilerException("Side loaders in cogroup
must implement IndexableLoadFunc.", errCode);
+ }
+
+ funcSpecs.add(funcSpec);
+ fileSpecs.add(loadFileSpec.getFileName());
+ loaderSigns.add(sideLoader.getSignature());
+ sparkPlan.remove(sparkOper);
+ }
+
+ poCoGrp.setSideLoadFuncs(funcSpecs);
+ poCoGrp.setSideFileSpecs(fileSpecs);
+ poCoGrp.setLoaderSignatures(loaderSigns);
+
+ // Use spark operator of base relation for the cogroup operation.
+ SparkOperator baseSparkOp =
phyToSparkOpMap.get(poCoGrp.getInputs().get(0));
+
+ // Create a spark operator to generate index file for tuples from
leftmost relation
+ SparkOperator indexerSparkOp = getSparkOp();
+ FileSpec idxFileSpec = getIndexingJob(indexerSparkOp, baseSparkOp,
poCoGrp.getLRInnerPlansOf(0));
+ poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec());
+ poCoGrp.setIndexFileName(idxFileSpec.getFileName());
+
+ baseSparkOp.physicalPlan.addAsLeaf(poCoGrp);
+ for (FuncSpec funcSpec : funcSpecs)
+ baseSparkOp.UDFs.add(funcSpec.toString());
+
+ sparkPlan.add(indexerSparkOp);
+ sparkPlan.connect(indexerSparkOp, baseSparkOp);
+ phyToSparkOpMap.put(poCoGrp, baseSparkOp);
+ curSparkOp = baseSparkOp;
+ } catch (ExecException e) {
+ throw new SparkCompilerException(e.getDetailedMessage(),
e.getErrorCode(), e.getErrorSource(), e);
+ } catch (SparkCompilerException mrce) {
+ throw (mrce);
+ } catch (CloneNotSupportedException e) {
+ throw new SparkCompilerException(e);
+ } catch (PlanException e) {
+ int errCode = 2034;
+ String msg = "Error compiling operator " +
poCoGrp.getClass().getCanonicalName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG,
e);
+ } catch (IOException e) {
+ int errCode = 3000;
+ String errMsg = "IOException caught while compiling
POMergeCoGroup";
+ throw new SparkCompilerException(errMsg, errCode, e);
+ }
+ }
+
+ /**
+ * Sets up the indexing job for single-stage cogroups.
+ * <p>
+ * Fills {@code indexerSparkOp} with three operators:
+ * <ul>
+ * <li>a POLoad that reads the base relation's file through MergeJoinIndexer;</li>
+ * <li>a POSort over the resulting index tuples, added by SparkUtil.createIndexerSparkNode;</li>
+ * <li>a POStore to a fresh temporary file.</li>
+ * </ul>
+ * {@code baseSparkOp}'s own plan is not modified. Only a clone of it, with the
+ * root loader removed, is serialized for the indexer.
+ *
+ * @param indexerSparkOp operator (created by the caller) that receives the indexing plan
+ * @param baseSparkOp operator holding the base relation; the root of its plan must be a POLoad
+ * @param mapperLRInnerPlans key plans handed to MergeJoinIndexer; cast to Serializable before serialization
+ * @return FileSpec of the temporary file the index is stored to
+ * @throws SparkCompilerException if the base loader does not implement OrderedLoadFunc
+ * @throws PlanException propagated from plan manipulation
+ * @throws ExecException propagated from plan manipulation
+ * @throws IOException propagated from serialization or temporary-path creation
+ * @throws CloneNotSupportedException propagated from cloning the base plan
+ */
+ private FileSpec getIndexingJob(SparkOperator indexerSparkOp,
+ final SparkOperator baseSparkOp, final
List<PhysicalPlan> mapperLRInnerPlans)
+ throws SparkCompilerException, PlanException, ExecException,
IOException, CloneNotSupportedException {
+
+ // Inspect the base relation's loader. It is not replaced: the indexer job
+ // reads the same file through a separate MergeJoinIndexer POLoad created below.
+ PhysicalPlan baseMapPlan = baseSparkOp.physicalPlan;
+ POLoad baseLoader = (POLoad) baseMapPlan.getRoots().get(0);
+ FileSpec origLoaderFileSpec = baseLoader.getLFile();
+ FuncSpec funcSpec = origLoaderFileSpec.getFuncSpec();
+ LoadFunc loadFunc = baseLoader.getLoadFunc();
+
+ if (!(OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))) {
+ int errCode = 1104;
+ String errMsg = "Base relation of merge-coGroup must implement " +
+ "OrderedLoadFunc interface. The specified loader "
+ + funcSpec + " doesn't implement it";
+ throw new SparkCompilerException(errMsg, errCode);
+ }
+
+ // Constructor arguments for MergeJoinIndexer (passed via its FuncSpec):
+ // [0] base loader FuncSpec
+ // [1] serialized key plans
+ // [2] serialized plan of the operators following the base loader, or null (set below)
+ // [3] base loader signature
+ // [4] operator-key scope
+ // [5] "false"
+ String[] indexerArgs = new String[6];
+ indexerArgs[0] = funcSpec.toString();
+ indexerArgs[1] = ObjectSerializer.serialize((Serializable)
mapperLRInnerPlans);
+ indexerArgs[3] = baseLoader.getSignature();
+ indexerArgs[4] = baseLoader.getOperatorKey().scope;
+ indexerArgs[5] = Boolean.toString(false); // we care for nulls.
+
+ PhysicalPlan phyPlan;
+ if (baseMapPlan.getSuccessors(baseLoader) == null
+ || baseMapPlan.getSuccessors(baseLoader).isEmpty()) {
+ // Load-Load-Cogroup case.
+ phyPlan = null;
+ } else { // We got something. Yank it and set it as inner plan.
+ // Work on a clone so baseSparkOp's plan keeps its loader.
+ phyPlan = baseMapPlan.clone();
+ PhysicalOperator root = phyPlan.getRoots().get(0);
+ phyPlan.disconnect(root, phyPlan.getSuccessors(root).get(0));
+ phyPlan.remove(root);
+
+ }
+ indexerArgs[2] = ObjectSerializer.serialize(phyPlan);
+
+ // New loader for the indexer job: same input file, MergeJoinIndexer as load func.
+ POLoad idxJobLoader = getLoad();
+ idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
+ new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
+ indexerSparkOp.physicalPlan.add(idxJobLoader);
+
indexerSparkOp.UDFs.add(baseLoader.getLFile().getFuncSpec().toString());
+
+ // Loader of sparkOp will return a tuple of form -
+ // (key1, key2, .. , WritableComparable, splitIndex). See
MergeJoinIndexer for details.
+ // Append a POSort over those index tuples (see SparkUtil.createIndexerSparkNode).
+ SparkUtil.createIndexerSparkNode(indexerSparkOp, scope, nig);
+
+ // Store the sorted index to a temporary file whose FileSpec is returned.
+ POStore st = getStore();
+ FileSpec strFile = getTempFileSpec();
+ st.setSFile(strFile);
+ indexerSparkOp.physicalPlan.addAsLeaf(st);
+
+ return strFile;
+ }
+
+ /**
+ * Returns a FileSpec pointing at a new temporary path.
+ * <p>
+ * The path comes from {@code FileLocalizer.getTemporaryPath(pigContext)}. The
+ * FuncSpec is built from {@code Utils.getTmpFileCompressorName(pigContext)}.
+ *
+ * @return FileSpec for the temporary location, with the temp-file storage FuncSpec
+ * @throws IOException propagated from resolving the temporary path or compressor name
+ */
+ private FileSpec getTempFileSpec() throws IOException {
+ return new
FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
+ new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
+ }
}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1731077&r1=1731076&r2=1731077&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
Thu Feb 18 14:02:40 2016
@@ -22,13 +22,17 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -40,12 +44,18 @@ public class PigInputFormatSpark extends
InterruptedException {
init();
resetUDFContext();
- RecordReader recordReader = super.createRecordReader(split, context);
//PigSplit#conf is the default hadoop configuration, we need get the
configuration
//from context.getConfigration() to retrieve pig properties
PigSplit pigSplit = (PigSplit) split;
- pigSplit.setConf(context.getConfiguration());
- return recordReader;
+ Configuration conf = context.getConfiguration();
+ pigSplit.setConf(conf);
+ //Set current splitIndex in
PigMapReduce.sJobContext.getConfiguration.get(PigImplConstants.PIG_SPLIT_INDEX)
+ //which will be used in POMergeCogroup#setup
+ if (PigMapReduce.sJobContext == null) {
+ PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new
JobID());
+ }
+
PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX,
pigSplit.getSplitIndex());
+ return super.createRecordReader(split, context);
}
private void resetUDFContext() {
Modified: pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java?rev=1731077&r1=1731076&r2=1731077&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java Thu Feb
18 14:02:40 2016
@@ -311,12 +311,11 @@ public class TestMapSideCogroup {
pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "," + INPUT_FILE4
+ "' using "+ DummyCollectableLoader.class.getName() +"() as
(c1:chararray,c2:int);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' using "+
DummyIndexableLoader.class.getName() +"() as (c1:chararray,c2:int);");
- DataBag dbMergeCogrp = BagFactory.getInstance().newDefaultBag();
+ List<Tuple> dbMergeCogrp = new ArrayList<Tuple>();
pigServer.registerQuery("C = cogroup A by c1, B by c1 using 'merge';");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
while(iter.hasNext()) {
Tuple t = iter.next();
dbMergeCogrp.add(t);
@@ -335,12 +334,29 @@ public class TestMapSideCogroup {
"(3,{(3,3),(3,2),(3,1)},{(3,1),(3,2),(3,3)})"
};
- assertEquals(9, dbMergeCogrp.size());
+ List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(results);
+
+ //We need sort dbMergeCogrp because the result is different in
sequence between spark and other mode when
+ //multiple files are loaded(LOAD INPUT_FILE1,INPUT_FILE4...)
+ for (Tuple t : dbMergeCogrp) {
+ Util.convertBagToSortedBag(t);
+ }
+ for (Tuple t : expected) {
+ Util.convertBagToSortedBag(t);
+ }
+
+ Collections.sort(dbMergeCogrp);
+ Collections.sort(expected);
+ assertEquals(dbMergeCogrp.size(), expected.size());
+
+ //Since TestMapSideCogroup.DummyIndexableLoader.getNext() does not
+ //apply schema for each input
tuple,Util#checkQueryOutputsAfterSortRecursive fails to assert.
+ // The schema for C is
(int,{(chararray,int),(chararray,int),(chararray,int)},{(chararray,int),(chararray,int),(chararray,int)}).
+ //But the schema for result "dbMergeCogrp" is
(int,{(chararray,int),(chararray,int),(chararray,int)},{(chararray,chararray),(chararray,chararray),(chararray,chararray)})
Iterator<Tuple> itr = dbMergeCogrp.iterator();
- for(int i=0; i<9; i++){
- assertEquals(itr.next().toString(), results[i]);
+ for (int i = 0; i < dbMergeCogrp.size(); i++) {
+ assertEquals(itr.next().toString(), expected.get(i).toString());
}
- assertFalse(itr.hasNext());
}
@Test