Author: xuefu
Date: Tue Dec 13 03:41:31 2016
New Revision: 1773917
URL: http://svn.apache.org/viewvc?rev=1773917&view=rev
Log:
PIG-4952: Calculate the value of parallelism for spark mode (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1773917&r1=1773916&r2=1773917&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Tue Dec 13 03:41:31 2016
@@ -664,9 +664,9 @@ public class SparkLauncher extends Launc
SchemaTupleBackend.initialize(jobConf, pigContext);
Utils.setDefaultTimeZone(jobConf);
PigMapReduce.sJobConfInternal.set(jobConf);
- String sparkReducers =
pigContext.getProperties().getProperty("spark.reducers");
- if (sparkReducers != null) {
- SparkUtil.setSparkReducers(Integer.parseInt(sparkReducers));
+ String parallelism =
pigContext.getProperties().getProperty("spark.default.parallelism");
+ if (parallelism != null) {
+
SparkUtil.setSparkDefaultParallelism(Integer.parseInt(parallelism));
}
}
}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1773917&r1=1773916&r2=1773917&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
Tue Dec 13 03:41:31 2016
@@ -59,7 +59,7 @@ import org.apache.spark.rdd.RDD;
public class SparkUtil {
- private static ThreadLocal<Integer> SPARK_REDUCERS = null;
+ private static ThreadLocal<Integer> SPARK_DEFAULT_PARALLELISM = null;
private static ConcurrentHashMap<String, Broadcast<List<Tuple>>>
broadcastedVars = new ConcurrentHashMap() ;
public static <T> ClassTag<T> getManifest(Class<T> clazz) {
@@ -131,17 +131,25 @@ public class SparkUtil {
public static int getParallelism(List<RDD<Tuple>> predecessors,
PhysicalOperator physicalOperator) {
- if (SPARK_REDUCERS != null) {
- return getSparkReducers();
+ if (SPARK_DEFAULT_PARALLELISM != null) {
+ return getSparkDefaultParallelism();
}
int parallelism = physicalOperator.getRequestedParallelism();
if (parallelism <= 0) {
- // Parallelism wasn't set in Pig, so set it to whatever Spark thinks
- // is reasonable.
- parallelism = predecessors.get(0).context().defaultParallelism();
+ //Spark automatically sets the number of "map" tasks to run on each file according to its size (though
+ // you can control it through optional parameters to SparkContext.textFile, etc), and for distributed
+ //"reduce" operations, such as groupByKey and reduceByKey, it uses the largest parent RDD's number of
+ // partitions.
+ int maxParallism = 0;
+ for (int i = 0; i < predecessors.size(); i++) {
+ int tmpParallelism = predecessors.get(i).getNumPartitions();
+ if (tmpParallelism > maxParallism) {
+ maxParallism = tmpParallelism;
+ }
+ }
+ parallelism = maxParallism;
}
-
return parallelism;
}
@@ -179,11 +187,11 @@ public class SparkUtil {
}
- public static int getSparkReducers() {
- return SPARK_REDUCERS.get();
+ public static int getSparkDefaultParallelism() {
+ return SPARK_DEFAULT_PARALLELISM.get();
}
- public static void setSparkReducers(int sparkReducers) {
- SPARK_REDUCERS.set(sparkReducers);
+ public static void setSparkDefaultParallelism(int sparkDefaultParallelism)
{
+ SPARK_DEFAULT_PARALLELISM.set(sparkDefaultParallelism);
}
}
\ No newline at end of file
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java?rev=1773917&r1=1773916&r2=1773917&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
Tue Dec 13 03:41:31 2016
@@ -45,21 +45,22 @@ public class RankConverter implements RD
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, PORank poRank)
throws IOException {
- SparkUtil.assertPredecessorSize(predecessors, poRank, 1);
+ int parallelism = SparkUtil.getParallelism(predecessors, poRank);
+ SparkUtil.assertPredecessorSize(predecessors, poRank, 1);
RDD<Tuple> rdd = predecessors.get(0);
- JavaPairRDD<Integer, Long> javaPairRdd = rdd.toJavaRDD()
- .mapToPair(new ToPairRdd());
- JavaPairRDD<Integer, Iterable<Long>> groupedByIndex =
javaPairRdd
- .groupByKey();
- JavaPairRDD<Integer, Long> countsByIndex = groupedByIndex
- .mapToPair(new IndexCounters());
- JavaPairRDD<Integer, Long> sortedCountsByIndex = countsByIndex
- .sortByKey(true);
- Map<Integer, Long> counts = sortedCountsByIndex.collectAsMap();
- JavaRDD<Tuple> finalRdd = rdd.toJavaRDD()
- .map(new RankFunction(new HashMap<Integer,
Long>(counts)));
- return finalRdd.rdd();
- }
+ JavaPairRDD<Integer, Long> javaPairRdd = rdd.toJavaRDD()
+ .mapToPair(new ToPairRdd());
+ JavaPairRDD<Integer, Iterable<Long>> groupedByIndex = javaPairRdd
+ .groupByKey(parallelism);
+ JavaPairRDD<Integer, Long> countsByIndex = groupedByIndex
+ .mapToPair(new IndexCounters());
+ JavaPairRDD<Integer, Long> sortedCountsByIndex = countsByIndex
+ .sortByKey(true, parallelism);
+ Map<Integer, Long> counts = sortedCountsByIndex.collectAsMap();
+ JavaRDD<Tuple> finalRdd = rdd.toJavaRDD()
+ .map(new RankFunction(new HashMap<Integer, Long>(counts)));
+ return finalRdd.rdd();
+ }
@SuppressWarnings("serial")
private static class ToPairRdd implements
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1773917&r1=1773916&r2=1773917&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
Tue Dec 13 03:41:31 2016
@@ -78,9 +78,10 @@ public class SkewedJoinConverter impleme
rdd2Pair, SparkUtil.getManifest(IndexedKey.class),
SparkUtil.getManifest(Tuple.class));
+ int parallelism = SparkUtil.getParallelism(predecessors, poSkewedJoin);
// join() returns (key, (t1, t2)) where (key, t1) is in this and (key, t2) is in other
JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> result_KeyValue =
rdd1Pair_javaRDD
- .join(rdd2Pair_javaRDD);
+ .join(rdd2Pair_javaRDD, parallelism);
// map to get JavaRDD<Tuple> from JAvaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> by
// ignoring the key (of type IndexedKey) and appending the values (the
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1773917&r1=1773916&r2=1773917&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
Tue Dec 13 03:41:31 2016
@@ -46,6 +46,7 @@ public class SortConverter implements RD
throws IOException {
SparkUtil.assertPredecessorSize(predecessors, sortOperator, 1);
RDD<Tuple> rdd = predecessors.get(0);
+ int parallelism = SparkUtil.getParallelism(predecessors, sortOperator);
RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyValueFunction(),
SparkUtil.<Tuple, Object> getTuple2Manifest());
@@ -53,8 +54,9 @@ public class SortConverter implements RD
SparkUtil.getManifest(Tuple.class),
SparkUtil.getManifest(Object.class));
+
JavaPairRDD<Tuple, Object> sorted = r.sortByKey(
- sortOperator.getMComparator(), true);
+ sortOperator.getMComparator(), true, parallelism);
JavaRDD<Tuple> mapped = sorted.mapPartitions(TO_VALUE_FUNCTION);
return mapped.rdd();