Author: xuefu
Date: Wed Feb 15 04:28:47 2017
New Revision: 1783062
URL: http://svn.apache.org/viewvc?rev=1783062&view=rev
Log:
PIG-5044: Create SparkCompiler#getSamplingJob in spark mode (Liyun via Xuefu)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1783062&r1=1783061&r2=1783062&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
Wed Feb 15 04:28:47 2017
@@ -74,6 +74,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
+import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
import org.apache.pig.impl.plan.PlanVisitor;
import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -367,4 +368,9 @@ public class PhyPlanVisitor extends Plan
public void visitBroadcastSpark(POBroadcastSpark poBroadcastSpark) {
}
+
+ public void visitPoissonSampleSpark(
+ POPoissonSampleSpark poPoissonSampleSpark) {
+ // Default visit for POPoissonSampleSpark: the body is intentionally empty, so this base implementation does nothing.
+ }
}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1783062&r1=1783061&r2=1783062&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Wed Feb 15 04:28:47 2017
@@ -87,18 +87,22 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeCogroupConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeJoinConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
+import
org.apache.pig.backend.hadoop.executionengine.spark.converter.PoissonSampleConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.RankConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.ReduceByConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.SkewedJoinConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.SortConverter;
+import
org.apache.pig.backend.hadoop.executionengine.spark.converter.SparkSampleSortConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
+import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
+import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark;
import
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
import
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.CombinerOptimizer;
import
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.JoinGroupOptimizerSpark;
@@ -212,6 +216,8 @@ public class SparkLauncher extends Launc
convertMap.put(POReduceBySpark.class, new ReduceByConverter());
convertMap.put(POPreCombinerLocalRearrange.class, new
LocalRearrangeConverter());
convertMap.put(POBroadcastSpark.class, new
BroadcastConverter(sparkContext));
+ convertMap.put(POSampleSortSpark.class, new
SparkSampleSortConverter());
+ convertMap.put(POPoissonSampleSpark.class, new
PoissonSampleConverter());
uploadResources(sparkplan);
new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext,
jobMetricsListener, jobGroupID, jobConf, pigContext).visit();
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java?rev=1783062&view=auto
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
(added)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
Wed Feb 15 04:28:47 2017
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Converter that applies a {@link POPoissonSampleSpark} operator to its single
+ * predecessor RDD, one partition at a time.
+ */
+public class PoissonSampleConverter implements RDDConverter<Tuple, Tuple, POPoissonSampleSpark> {
+
+    /**
+     * Runs the sampling operator over every partition of the predecessor RDD.
+     *
+     * @param predecessors the input RDDs; exactly one is expected
+     * @param po           the sampling operator to apply
+     * @return the RDD of tuples emitted by the operator
+     * @throws IOException declared by the converter contract
+     */
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+            POPoissonSampleSpark po) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, po, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        PoissonSampleFunction poissonSampleFunction = new PoissonSampleFunction(po);
+        return rdd.toJavaRDD().mapPartitions(poissonSampleFunction, false).rdd();
+    }
+
+    /**
+     * Per-partition function that feeds each incoming tuple to the sampling
+     * operator and exposes the operator's results as an iterable.
+     */
+    private static class PoissonSampleFunction implements FlatMapFunction<Iterator<Tuple>, Tuple> {
+
+        private final POPoissonSampleSpark po;
+
+        public PoissonSampleFunction(POPoissonSampleSpark po) {
+            this.po = po;
+        }
+
+        @Override
+        public Iterable<Tuple> call(final Iterator<Tuple> tuples) {
+
+            return new Iterable<Tuple>() {
+
+                @Override
+                public Iterator<Tuple> iterator() {
+                    return new OutputConsumerIterator(tuples) {
+
+                        @Override
+                        protected void attach(Tuple tuple) {
+                            // The operator is driven directly with attached tuples,
+                            // so its plan-level inputs are cleared first.
+                            po.setInputs(null);
+                            po.attachInput(tuple);
+                        }
+
+                        @Override
+                        protected Result getNextResult() throws ExecException {
+                            return po.getNextTuple();
+                        }
+
+                        @Override
+                        protected void endOfInput() {
+                            // Lets the operator emit its final row-count tuple.
+                            po.setEndOfInput(true);
+                        }
+                    };
+                }
+            };
+        }
+    }
+}
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java?rev=1783062&view=auto
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
(added)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
Wed Feb 15 04:28:47 2017
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.rdd.RDD;
+ /*
+ sort the sample data and convert the sample data to the format
(all,{(sampleEle1),(sampleEle2),...})
+
+ */
+@SuppressWarnings("serial")
+public class SparkSampleSortConverter implements RDDConverter<Tuple, Tuple,
POSampleSortSpark> {
+ private static final Log LOG =
LogFactory.getLog(SparkSampleSortConverter.class);
+ private static TupleFactory tf = TupleFactory.getInstance();
+ private static BagFactory bf = DefaultBagFactory.getInstance();
+
+ @Override
+ public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POSampleSortSpark
sortOperator)
+ throws IOException {
+ SparkUtil.assertPredecessorSize(predecessors, sortOperator, 1);
+ RDD<Tuple> rdd = predecessors.get(0);
+ RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyValueFunction(),
+ SparkUtil.<Tuple, Object> getTuple2Manifest());
+
+ JavaPairRDD<Tuple, Object> r = new JavaPairRDD<Tuple, Object>(rddPair,
+ SparkUtil.getManifest(Tuple.class),
+ SparkUtil.getManifest(Object.class));
+ //sort sample data
+ JavaPairRDD<Tuple, Object> sorted = r.sortByKey(true);
+ //convert every element in sample data from element to (all, element)
format
+ JavaPairRDD<String, Tuple> mapped = sorted.mapPartitionsToPair(new
AggregateFunction());
+ //use groupByKey to aggregate all values( the format will be
((all),{(sampleEle1),(sampleEle2),...} )
+ JavaRDD<Tuple> groupByKey= mapped.groupByKey().map(new
ToValueFunction());
+ return groupByKey.rdd();
+ }
+
+
+ private static class MergeFunction implements
org.apache.spark.api.java.function.Function2<Tuple, Tuple, Tuple>
+ , Serializable {
+
+ @Override
+ public Tuple call(Tuple v1, Tuple v2) {
+ Tuple res = tf.newTuple();
+ res.append(v1);
+ res.append(v2);
+ LOG.info("MergeFunction out:"+res);
+ return res;
+ }
+ }
+
+ // input: Tuple2<Tuple,Object>
+ // output: Tuple2("all", Tuple)
+ private static class AggregateFunction implements
+ PairFlatMapFunction<Iterator<Tuple2<Tuple, Object>>,
String,Tuple>, Serializable {
+
+ private class Tuple2TransformIterable implements
Iterable<Tuple2<String,Tuple>> {
+
+ Iterator<Tuple2<Tuple, Object>> in;
+
+ Tuple2TransformIterable(Iterator<Tuple2<Tuple, Object>> input) {
+ in = input;
+ }
+
+ public Iterator<Tuple2<String,Tuple>> iterator() {
+ return new IteratorTransform<Tuple2<Tuple, Object>,
Tuple2<String,Tuple>>(in) {
+ @Override
+ protected Tuple2<String,Tuple> transform(Tuple2<Tuple,
Object> next) {
+ LOG.info("AggregateFunction in:"+ next._1()) ;
+ return new Tuple2<String,Tuple>("all",next._1());
+ }
+ };
+ }
+ }
+
+ @Override
+ public Iterable<Tuple2<String, Tuple>> call(Iterator<Tuple2<Tuple,
Object>> input) throws Exception {
+ return new Tuple2TransformIterable(input);
+ }
+
+ }
+
+ private static class ToValueFunction implements Function<Tuple2<String,
Iterable<Tuple>>, Tuple> {
+ @Override
+ public Tuple call(Tuple2<String, Iterable<Tuple>> next) throws
Exception {
+ Tuple res = tf.newTuple();
+ res.append(next._1());
+ Iterator<Tuple> iter = next._2().iterator();
+ DataBag bag = bf.newDefaultBag();
+ while(iter.hasNext()) {
+ bag.add(iter.next());
+ }
+ res.append(bag);
+ LOG.info("ToValueFunction1 out:" + res);
+ return res;
+ }
+ }
+
+ private static class ToKeyValueFunction extends
+ AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements
+ Serializable {
+
+ @Override
+ public Tuple2<Tuple, Object> apply(Tuple t) {
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Sort ToKeyValueFunction in " + t);
+ }
+ Tuple key = t;
+ Object value = null;
+ // (key, value)
+ Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(key, value);
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Sort ToKeyValueFunction out " + out);
+ }
+ return out;
+ }
+ }
+}
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java?rev=1783062&view=auto
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
(added)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
Wed Feb 15 04:28:47 2017
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.builtin.PoissonSampleLoader;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Physical operator that samples its input tuples, skipping a number of rows
+ * between samples that is recomputed from the average in-memory size of the
+ * tuples sampled so far. Once the end of input has been signalled, the last
+ * sampled tuple is emitted again with a marker column and the row count
+ * appended.
+ */
+public class POPoissonSampleSpark extends PhysicalOperator {
+    private static final Log LOG = LogFactory.getLog(POPoissonSampleSpark.class);
+    private static final long serialVersionUID = 1L;
+
+    // 17 is not a magic number. It can be obtained by using a poisson
+    // cumulative distribution function with the mean set to 10 (empirically,
+    // minimum number of samples) and the confidence set to 95%
+    public static final int DEFAULT_SAMPLE_RATE = 17;
+
+    private int sampleRate = 0;
+
+    private float heapPerc = 0f;
+
+    // null until configured or lazily resolved from the runtime in getNextTuple()
+    private Long totalMemory;
+
+    private transient boolean initialized;
+
+    // num of rows sampled so far
+    private transient int numRowsSampled;
+
+    // average size of tuple in memory, for tuples sampled
+    private transient long avgTupleMemSz;
+
+    // current row number
+    private transient long rowNum;
+
+    // number of tuples to skip after each sample
+    private transient long skipInterval;
+
+    // number of tuples which have been skipped.
+    private transient long numSkipped = 0;
+
+    // bytes in input to skip after every sample.
+    // divide this by avgTupleMemSize to get skipInterval
+    private transient long memToSkipPerSample;
+
+    // has the special row with row number information been returned
+    private transient boolean numRowSplTupleReturned;
+
+    // new Sample result
+    private transient Result newSample;
+
+    // Only for Spark
+    private boolean endOfInput = false;
+
+    public boolean isEndOfInput() {
+        return endOfInput;
+    }
+
+    public void setEndOfInput(boolean isEndOfInput) {
+        endOfInput = isEndOfInput;
+    }
+
+    /**
+     * @param k  operator key
+     * @param rp value forwarded to the superclass constructor
+     * @param sr sample rate; the memory budget is divided by this value
+     * @param hp fraction of total memory that makes up the budget
+     * @param tm total memory in bytes, or -1 to resolve it from the running
+     *           JVM on first use
+     */
+    public POPoissonSampleSpark(OperatorKey k, int rp, int sr, float hp, long tm) {
+        super(k, rp, null);
+        sampleRate = sr;
+        heapPerc = hp;
+        if (tm != -1) {
+            totalMemory = tm;
+        }
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitPoissonSampleSpark(this);
+    }
+
+    /**
+     * Processes the currently attached input and decides whether it is sampled.
+     *
+     * @return the sampled tuple with STATUS_OK; STATUS_EOP with a null result
+     *         when the row is skipped; the input's own result for EOP/ERR; a
+     *         STATUS_NULL result for null input; or, after end of input, the
+     *         last sample extended with the marker and row count
+     * @throws ExecException if reading the input or a tuple field fails
+     */
+    @Override
+    public Result getNextTuple() throws ExecException {
+        if (!initialized) {
+            numRowsSampled = 0;
+            avgTupleMemSz = 0;
+            rowNum = 0;
+            skipInterval = -1;
+            if (totalMemory == null) {
+                // Initialize in backend to get memory of task
+                totalMemory = Runtime.getRuntime().maxMemory();
+            }
+            long availRedMem = (long) (totalMemory * heapPerc);
+            memToSkipPerSample = availRedMem / sampleRate;
+            initialized = true;
+        }
+        if (numRowSplTupleReturned) {
+            // row num special row has been returned after all inputs
+            // were read, nothing more to read
+            return RESULT_EOP;
+        }
+
+        Result res = processInput();
+
+        // if reaches at the end, pick last sampled record and return
+        if (this.isEndOfInput() && newSample != null) {
+            return createNumRowTuple((Tuple) newSample.result);
+        }
+
+        // just return to read next record from input
+        if (res.returnStatus == POStatus.STATUS_NULL) {
+            return new Result(POStatus.STATUS_NULL, null);
+        } else if (res.returnStatus == POStatus.STATUS_EOP
+                || res.returnStatus == POStatus.STATUS_ERR) {
+            return res;
+        }
+
+        // got a 'OK' record
+        rowNum++;
+
+        if (numSkipped < skipInterval) {
+            numSkipped++;
+
+            // skip this tuple, and continue to read from input
+            return new Result(POStatus.STATUS_EOP, null);
+        }
+
+        // pick this record as sampled
+        newSample = res;
+        numSkipped = 0;
+        updateSkipInterval((Tuple) newSample.result);
+
+        // Guarded so the per-field strings are only built when debug is on.
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("pickedSample:");
+            Tuple sampled = (Tuple) newSample.result;
+            if (sampled != null) {
+                for (int i = 0; i < sampled.size(); i++) {
+                    LOG.debug("the " + i + " ele:" + sampled.get(i));
+                }
+            }
+        }
+        return newSample;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "PoissonSample - " + mKey.toString();
+    }
+
+    /**
+     * Update the average tuple size base on newly sampled tuple t
+     * and recalculate skipInterval
+     *
+     * @param t - tuple
+     */
+    private void updateSkipInterval(Tuple t) {
+        avgTupleMemSz =
+                ((avgTupleMemSz * numRowsSampled) + t.getMemorySize()) / (numRowsSampled + 1);
+        skipInterval = memToSkipPerSample / avgTupleMemSz;
+
+        // skipping fewer number of rows the first few times, to reduce the
+        // probability of first tuples size (if much smaller than rest)
+        // resulting in very few samples being sampled. Sampling a little extra
+        // is OK
+        if (numRowsSampled < 5) {
+            skipInterval = skipInterval / (10 - numRowsSampled);
+        }
+
+        ++numRowsSampled;
+    }
+
+    /**
+     * @param sample - sample tuple
+     * @return - Tuple appended with special marker string column, num-rows column
+     * @throws ExecException
+     */
+    private Result createNumRowTuple(Tuple sample) throws ExecException {
+        int sz = (sample == null) ? 0 : sample.size();
+        Tuple t = mTupleFactory.newTuple(sz + 2);
+
+        if (sample != null) {
+            for (int i = 0; i < sample.size(); i++) {
+                t.set(i, sample.get(i));
+            }
+        }
+
+        t.set(sz, PoissonSampleLoader.NUMROWS_TUPLE_MARKER);
+        t.set(sz + 1, rowNum);
+        numRowSplTupleReturned = true;
+        return new Result(POStatus.STATUS_OK, t);
+    }
+}
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java?rev=1783062&view=auto
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java
(added)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java
Wed Feb 15 04:28:47 2017
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Spark-specific {@link POSort} subclass built from an existing sort operator.
+ */
+public class POSampleSortSpark extends POSort {
+
+    /**
+     * Builds this operator from the key, requested parallelism, sort plans,
+     * ascending flags and sort function of {@code sort}; inputs are passed
+     * as null.
+     */
+    public POSampleSortSpark(POSort sort){
+        super(
+                sort.getOperatorKey(),
+                sort.getRequestedParallelism(),
+                null,
+                sort.getSortPlans(),
+                sort.getMAscCols(),
+                sort.getMSortFunc());
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    /** This operator takes a single input. */
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    /** This operator produces a single output. */
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /**
+     * Label of the form {@code <alias>POSparkSort[<type>](<funcSpec>) - <key>};
+     * the function spec part is empty when no sort function is set.
+     */
+    @Override
+    public String name() {
+        StringBuilder label = new StringBuilder();
+        label.append(getAliasString()).append("POSparkSort");
+        label.append("[").append(DataType.findTypeName(resultType)).append("]");
+        label.append("(");
+        if (super.getMSortFunc() != null) {
+            label.append(super.getMSortFunc().getFuncSpec());
+        } else {
+            label.append("");
+        }
+        label.append(")");
+        label.append(" - ").append(mKey.toString());
+        return label.toString();
+    }
+
+    /** No illustrator support: always yields null. */
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return null;
+    }
+}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1783062&r1=1783061&r2=1783062&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
Wed Feb 15 04:28:47 2017
@@ -26,27 +26,34 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.OrderedLoadFunc;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
@@ -61,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -68,14 +76,17 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
-import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
-import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import
org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POPoissonSampleSpark;
+import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.DefaultIndexableLoader;
+import org.apache.pig.impl.builtin.GetMemNumRows;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -85,7 +96,9 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.logical.relational.LOJoin;
@@ -98,6 +111,7 @@ public class SparkCompiler extends PhyPl
private static final Log LOG = LogFactory.getLog(SparkCompiler.class);
private PigContext pigContext;
+ private Properties pigProperties;
// The physicalPlan that is being compiled
private PhysicalPlan physicalPlan;
@@ -691,6 +705,50 @@ public class SparkCompiler extends PhyPl
}
}
+// /**
+// * currently use regular join to replace skewedJoin
+// * Skewed join currently works with two-table inner join.
+// * More info about pig SkewedJoin, See
https://wiki.apache.org/pig/PigSkewedJoinSpec
+// *
+// * @param op
+// * @throws VisitorException
+// */
+// @Override
+// public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
+// try {
+// Random r = new Random();
+// String pigKeyDistFile = "pig.keyDistFile" + r.nextInt();
+// // firstly, build sample job
+// SparkOperator sampleSparkOp = getSkewedJoinSampleJob(op);
+//
+// buildBroadcastForSkewedJoin(sampleSparkOp,
pigKeyDistFile);
+//
+// sampleSparkOp.markSampler();
+// sparkPlan.add(sampleSparkOp);
+//
+// // secondly, build the join job.
+// addToPlan(op);
+// curSparkOp.setSkewedJoinPartitionFile(pigKeyDistFile);
+//
+// // do sampling job before join job
+// sparkPlan.connect(sampleSparkOp, curSparkOp);
+//
+// phyToSparkOpMap.put(op, curSparkOp);
+// } catch (Exception e) {
+// int errCode = 2034;
+// String msg = "Error compiling operator " +
+// op.getClass().getSimpleName();
+// throw new SparkCompilerException(msg, errCode, PigException.BUG,
e);
+// }
+// }
+
+/* private void buildBroadcastForSkewedJoin(SparkOperator sampleSparkOp,
String pigKeyDistFile) throws PlanException {
+
+ POBroadcastSpark poBroadcast = new POBroadcastSpark(new
OperatorKey(scope, nig.getNextNodeId(scope)));
+ poBroadcast.setBroadcastedVariableName(pigKeyDistFile);
+ sampleSparkOp.physicalPlan.addAsLeaf(poBroadcast);
+ }*/
+
@Override
public void visitFRJoin(POFRJoin op) throws VisitorException {
try {
@@ -1141,4 +1199,359 @@ public class SparkCompiler extends PhyPl
return new
FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
}
+
+ private static class FindKeyTypeVisitor extends PhyPlanVisitor {
+
+ byte keyType = DataType.UNKNOWN;
+
+ FindKeyTypeVisitor(PhysicalPlan plan) {
+ super(plan,
+ new DepthFirstWalker<PhysicalOperator,
PhysicalPlan>(plan));
+ }
+
+ @Override
+ public void visitProject(POProject p) throws VisitorException {
+ keyType = p.getResultType();
+ }
+ }
+
+
+ /**
+ * build a POPoissonSampleSpark operator for SkewedJoin's sampling job
+ */
+ private void addSampleOperatorForSkewedJoin(SparkOperator sampleSparkOp)
+ throws PlanException {
+ Configuration conf =
ConfigurationUtil.toConfiguration(pigProperties);
+ int sampleRate = conf.getInt(
+
PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE,
+ POPoissonSampleSpark.DEFAULT_SAMPLE_RATE);
+ float heapPerc = conf.getFloat(
+ PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE,
+ PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
+ long totalMemory = conf.getLong(
+ PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM, -1);
+
+ POPoissonSampleSpark poSample = new POPoissonSampleSpark(
+ new OperatorKey(scope,
nig.getNextNodeId(scope)), -1,
+ sampleRate, heapPerc, totalMemory);
+
+ sampleSparkOp.physicalPlan.addAsLeaf(poSample);
+ }
+
+ private SparkOperator getSortJob(
+ POSort sort,
+ SparkOperator quantJob,
+ FileSpec lFile,
+ FileSpec quantFile,
+ int rp, Pair<POProject, Byte>[] fields) throws PlanException {
+ SparkOperator sparkOper = startNew(lFile, quantJob);
+ List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
+ byte keyType = DataType.UNKNOWN;
+ if (fields == null) {
+ // This is project *
+ PhysicalPlan ep = new PhysicalPlan();
+ POProject prj = new POProject(new OperatorKey(scope,
nig.getNextNodeId(scope)));
+ prj.setStar(true);
+ prj.setOverloaded(false);
+ prj.setResultType(DataType.TUPLE);
+ ep.add(prj);
+ eps1.add(ep);
+ } else {
+ /*
+ for (int i : fields) {
+ PhysicalPlan ep = new PhysicalPlan();
+ POProject prj = new POProject(new OperatorKey(scope,
+ nig.getNextNodeId(scope)));
+ prj.setColumn(i);
+ prj.setOverloaded(false);
+ prj.setResultType(DataType.BYTEARRAY);
+ ep.add(prj);
+ eps1.add(ep);
+ }
+ */
+ // Attach the sort plans to the local rearrange to get the
+ // projection.
+ eps1.addAll(sort.getSortPlans());
+
+ // Visit the first sort plan to figure out our key type. We only
+ // have to visit the first because if we have more than one plan,
+ // then the key type will be tuple.
+ try {
+ FindKeyTypeVisitor fktv =
+ new FindKeyTypeVisitor(sort.getSortPlans().get(0));
+ fktv.visit();
+ keyType = fktv.keyType;
+ } catch (VisitorException ve) {
+ int errCode = 2035;
+ String msg = "Internal error. Could not compute key type of
sort operator.";
+ throw new PlanException(msg, errCode, PigException.BUG, ve);
+ }
+ }
+
+ POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,
nig.getNextNodeId(scope)));
+ try {
+ lr.setIndex(0);
+ } catch (ExecException e) {
+ int errCode = 2058;
+ String msg = "Unable to set index on newly created
POLocalRearrange.";
+ throw new PlanException(msg, errCode, PigException.BUG, e);
+ }
+ lr.setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE :
+ keyType);
+ lr.setPlans(eps1);
+ lr.setResultType(DataType.TUPLE);
+ lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
+ sparkOper.physicalPlan.addAsLeaf(lr);
+
+ sparkOper.setGlobalSort(true);
+ pigContext.getProperties().setProperty("pig.reduce.keytype",
Byte.toString(lr.getKeyType()));
+ sparkOper.requestedParallelism = rp;
+ sparkOper.physicalPlan.addAsLeaf(sort);
+
+ long limit = sort.getLimit();
+ if (limit != -1) {
+ POLimit pLimit2 = new POLimit(new OperatorKey(scope,
nig.getNextNodeId(scope)));
+ pLimit2.setLimit(limit);
+ sparkOper.physicalPlan.addAsLeaf(pLimit2);
+ sparkOper.markLimitAfterSort();
+ }
+
+ return sparkOper;
+ }
+
+ /**
+ * Create a sampling job to collect statistics by sampling an input file.
The sequence of operations is as
+ * follows:
+ * <li>Transform input sample tuples into another tuple.</li>
+ * <li>Add an extra field "all" into the tuple </li>
+ * <li>Package all tuples into one bag </li>
+ * <li>Add constant field for number of reducers. </li>
+ * <li>Sorting the bag </li>
+ * <li>Invoke UDF with the number of reducers and the sorted bag.</li>
+ * <li>Data generated by UDF is stored into a file.</li>
+ *
+ * @param sort the POSort operator used to sort the bag
+ * @param sampleOperator current sampling job
+ * @param rp configured parallelism
+ * @param udfClassName the class name of UDF
+ * @param udfArgs the arguments of UDF
+ * @return the given sampleOperator, with the sampling operators appended and marked as a sampler
+ * @throws PlanException
+ * @throws VisitorException
+ */
+ @SuppressWarnings("deprecation")
+ private SparkOperator getSamplingJob(POSort sort, SparkOperator
sampleOperator, List<PhysicalPlan>
+ transformPlans,
+ int rp,
+ String udfClassName, String[]
udfArgs) throws PlanException,
+ VisitorException, ExecException {
+ addSampleOperatorForSkewedJoin(sampleOperator);
+ List<Boolean> flat1 = new ArrayList<Boolean>();
+ List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
+
+ // if transform plans are not specified, project the columns of
sorting keys
+ if (transformPlans == null) {
+ Pair<POProject, Byte>[] sortProjs = null;
+ try {
+ sortProjs = getSortCols(sort.getSortPlans());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ // Set up the projections of the key columns
+ if (sortProjs == null) {
+ PhysicalPlan ep = new PhysicalPlan();
+ POProject prj = new POProject(new OperatorKey(scope,
+ nig.getNextNodeId(scope)));
+ prj.setStar(true);
+ prj.setOverloaded(false);
+ prj.setResultType(DataType.TUPLE);
+ ep.add(prj);
+ eps1.add(ep);
+ flat1.add(false);
+ } else {
+ for (Pair<POProject, Byte> sortProj : sortProjs) {
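+ // getSortCols reports a non-POProject sort key with a null
+ // projection. Since Order by does not allow expression operators,
+ // that should never happen here.
+ // NOTE(review): getSortCols stores that null in Pair.first, not as a
+ // null array element, so confirm this check also covers sortProj.first.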
+ if (sortProj == null) {
+ int errCode = 2174;
+ String msg = "Internal exception. Could not create a
sampler job";
+ throw new SparkCompilerException(msg, errCode,
PigException.BUG);
+ }
+ PhysicalPlan ep = new PhysicalPlan();
+ POProject prj;
+ try {
+ prj = sortProj.first.clone();
+ } catch (CloneNotSupportedException e) {
+ //should not get here
+ throw new AssertionError(
+ "Error cloning project caught exception" + e
+ );
+ }
+ ep.add(prj);
+ eps1.add(ep);
+ flat1.add(false);
+ }
+ }
+ } else {
+ for (int i = 0; i < transformPlans.size(); i++) {
+ eps1.add(transformPlans.get(i));
+ flat1.add(i == transformPlans.size() - 1 ? true : false);
+ }
+ }
+ // This foreach will pick the sort key columns (or apply the given
+ // transform plans) from the sampling operator's output
+ POForEach nfe1 = new POForEach(new OperatorKey(scope,
nig.getNextNodeId(scope)), -1, eps1, flat1);
+ sampleOperator.physicalPlan.addAsLeaf(nfe1);
+
+ //sort the sample
+ POSampleSortSpark poSparkSampleSort = new POSampleSortSpark(sort);
+ sampleOperator.physicalPlan.addAsLeaf(poSparkSampleSort);
+
+ // for the foreach
+ PhysicalPlan fe2Plan = new PhysicalPlan();
+ POProject topPrj = new POProject(new OperatorKey(scope,
nig.getNextNodeId(scope)));
+ topPrj.setColumn(1);
+ topPrj.setResultType(DataType.BAG);
+ topPrj.setOverloaded(true);
+ fe2Plan.add(topPrj);
+
+
+ // The plan which will have a constant representing the
+ // degree of parallelism for the final order by map-reduce job
+ // this will either come from a "order by parallel x" in the script
+ // or will be the default number of reducers for the cluster if
+ // "parallel x" is not used in the script
+ PhysicalPlan rpep = new PhysicalPlan();
+ ConstantExpression rpce = new ConstantExpression(new
OperatorKey(scope, nig.getNextNodeId(scope)));
+ rpce.setRequestedParallelism(rp);
+
+ // We temporarily set it to rp and will adjust it at runtime, because
the final degree of parallelism
+ // is unknown until we are ready to submit it. See PIG-2779.
+ rpce.setValue(rp);
+
+ rpce.setResultType(DataType.INTEGER);
+ rpep.add(rpce);
+
+ List<PhysicalPlan> genEps = new ArrayList<PhysicalPlan>();
+ genEps.add(rpep);
+ genEps.add(fe2Plan);
+
+ List<Boolean> flattened2 = new ArrayList<Boolean>();
+ flattened2.add(false);
+ flattened2.add(false);
+
+ POForEach nfe2 = new POForEach(new OperatorKey(scope,
nig.getNextNodeId(scope)), -1, genEps, flattened2);
+ sampleOperator.physicalPlan.addAsLeaf(nfe2);
+
+ // Let's connect the output from the foreach containing
+ // number of quantiles and the sorted bag of samples to
+ // another foreach with the FindQuantiles udf. The input
+ // to the FindQuantiles udf is a project(*) which takes the
+ // foreach input and gives it to the udf
+ PhysicalPlan ep4 = new PhysicalPlan();
+ POProject prjStar4 = new POProject(new OperatorKey(scope,
nig.getNextNodeId(scope)));
+ prjStar4.setResultType(DataType.TUPLE);
+ prjStar4.setStar(true);
+ ep4.add(prjStar4);
+
+ List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
+ ufInps.add(prjStar4);
+
+ POUserFunc uf = new POUserFunc(new OperatorKey(scope,
nig.getNextNodeId(scope)), -1, ufInps,
+ new FuncSpec(udfClassName, udfArgs));
+ ep4.add(uf);
+ ep4.connect(prjStar4, uf);
+
+ List<PhysicalPlan> ep4s = new ArrayList<PhysicalPlan>();
+ ep4s.add(ep4);
+ List<Boolean> flattened3 = new ArrayList<Boolean>();
+ flattened3.add(false);
+ POForEach nfe3 = new POForEach(new OperatorKey(scope,
nig.getNextNodeId(scope)), -1, ep4s, flattened3);
+
+ sampleOperator.physicalPlan.addAsLeaf(nfe3);
+
+ sampleOperator.requestedParallelism = 1;
+ sampleOperator.markSampler();
+ return sampleOperator;
+ }
+
+ private Pair<POProject, Byte>[] getSortCols(List<PhysicalPlan> plans)
throws PlanException, ExecException {
+ if (plans != null) {
+ @SuppressWarnings("unchecked")
+ Pair<POProject, Byte>[] ret = new Pair[plans.size()];
+ int i = -1;
+ for (PhysicalPlan plan : plans) {
+ PhysicalOperator op = plan.getLeaves().get(0);
+ POProject proj;
+ if (op instanceof POProject) {
+ if (((POProject) op).isStar()) return null;
+ proj = (POProject) op;
+ } else {
+ proj = null;
+ }
+ byte type = op.getResultType();
+ ret[++i] = new Pair<POProject, Byte>(proj, type);
+ }
+ return ret;
+ }
+ int errCode = 2026;
+ String msg = "No expression plan found in POSort.";
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+
+ /**
+ * Create Sampling job for skewed join.
+ */
+ private SparkOperator getSkewedJoinSampleJob(POSkewedJoin skewedJoin)
throws PlanException, VisitorException {
+ try {
+ SparkOperator sampleOperator = new SparkOperator(new
OperatorKey(scope, nig.getNextNodeId(scope)));
+ sampleOperator.physicalPlan =
compiledInputs[0].physicalPlan.clone();
+ MultiMap<PhysicalOperator, PhysicalPlan> joinPlans =
skewedJoin.getJoinPlans();
+
+ List<PhysicalOperator> l =
physicalPlan.getPredecessors(skewedJoin);
+ List<PhysicalPlan> groups = joinPlans.get(l.get(0));
+ List<Boolean> ascCol = new ArrayList<Boolean>();
+ for (int i = 0; i < groups.size(); i++) {
+ ascCol.add(false);
+ }
+
+ POSort sort = new POSort(skewedJoin.getOperatorKey(),
skewedJoin.getRequestedParallelism(), null, groups,
+ ascCol, null);
+
+ // set up transform plan to get keys and memory size of input
tuples
+ // it first adds all the plans to get key columns,
+ List<PhysicalPlan> transformPlans = new ArrayList<PhysicalPlan>();
+ transformPlans.addAll(groups);
+
+ // then it adds a column for memory size
+ POProject prjStar = new POProject(new OperatorKey(scope,
nig.getNextNodeId(scope)));
+ prjStar.setResultType(DataType.TUPLE);
+ prjStar.setStar(true);
+
+ List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
+ ufInps.add(prjStar);
+
+ PhysicalPlan ep = new PhysicalPlan();
+ POUserFunc uf = new POUserFunc(new OperatorKey(scope,
nig.getNextNodeId(scope)), -1, ufInps,
+ new FuncSpec(GetMemNumRows.class.getName(), (String[])
null));
+ uf.setResultType(DataType.TUPLE);
+ ep.add(uf);
+ ep.add(prjStar);
+ ep.connect(prjStar, uf);
+
+ transformPlans.add(ep);
+ // pass configurations to the User Function
+ String per =
pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage",
+
String.valueOf(PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE));
+ String mc =
pigContext.getProperties().getProperty("pig.skewedjoin.reduce.maxtuple", "0");
+
+ return getSamplingJob(sort, sampleOperator, transformPlans,
skewedJoin.getRequestedParallelism(),
+ PartitionSkewedKeys.class.getName(), new String[]{per,
mc});
+ } catch (Exception e) {
+ int errCode = 2034;
+ String msg = "Error compiling operator " +
+ skewedJoin.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG,
e);
+ }
+ }
}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1783062&r1=1783061&r2=1783062&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
Wed Feb 15 04:28:47 2017
@@ -84,6 +84,15 @@ public class SparkOperator extends Opera
private MultiMap<OperatorKey, OperatorKey> multiQueryOptimizeConnectionMap
= new MultiMap<OperatorKey, OperatorKey>();
+ // Indicates if a UDF comparator is used
+ boolean isUDFComparatorUsed = false;
+
+ //The quantiles file name if globalSort is true
+ private String quantFile;
+
+ //Indicates if this job is an order by job
+ private boolean globalSort = false;
+
public SparkOperator(OperatorKey k) {
super(k);
physicalPlan = new PhysicalPlan();
@@ -277,4 +286,13 @@ public class SparkOperator extends Opera
public MultiMap<OperatorKey, OperatorKey>
getMultiQueryOptimizeConnectionItem() {
return multiQueryOptimizeConnectionMap;
}
+
+ public void setGlobalSort(boolean globalSort) {
+ this.globalSort = globalSort;
+ }
+
+ public boolean isGlobalSort() {
+ return globalSort;
+ }
+
}