Author: xuefu
Date: Tue Oct 27 03:40:14 2015
New Revision: 1710718
URL: http://svn.apache.org/viewvc?rev=1710718&view=rev
Log:
PIG-4698: Enable dynamic resource allocation/de-allocation on Yarn backends
(Srikanth via Xuefu)
Modified:
pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml?rev=1710718&r1=1710717&r2=1710718&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
(original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml Tue
Oct 27 03:40:14 2015
@@ -140,7 +140,7 @@ Test the Pig installation with this simp
</li>
<li><strong>Tez Mode</strong> - To run Pig in Tez mode, you need access to a
Hadoop cluster and HDFS installation. Specify Tez mode using the -x flag (-x
tez).
</li>
-<li><strong>Spark Mode</strong> - To run Pig in Spark mode, you need access to
a Spark, Yarn or Mesos cluster and HDFS installation. Specify Spark mode using
the -x flag (-x spark). In Spark execution mode, it is necessary to set
env::SPARK_MASTER to an appropriate value (local - local mode, yarn-client -
yarn-client mode, mesos://host:port - spark on mesos or spark://host:port -
spark cluster. For more information refer to spark documentation on Master
Urls, <em>yarn-cluster mode is currently not supported</em>)
+<li><strong>Spark Mode</strong> - To run Pig in Spark mode, you need access to
a Spark, Yarn or Mesos cluster and HDFS installation. Specify Spark mode using
the -x flag (-x spark). In Spark execution mode, it is necessary to set
env::SPARK_MASTER to an appropriate value (local - local mode, yarn-client -
yarn-client mode, mesos://host:port - spark on mesos or spark://host:port -
spark cluster. For more information refer to spark documentation on Master
Urls, <em>yarn-cluster mode is currently not supported</em>). Pig scripts run
on Spark can take advantage of the <a
href="http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation">dynamic
allocation</a> feature. The feature can be enabled by simply setting
<em>spark.dynamicAllocation.enabled</em> to <em>true</em>. Refer to spark <a
href="http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation">configuration</a>
for additional configuration details. In general, all properties in the Pig
script prefixed with
<em>spark.</em> are copied to the Spark Application Configuration. Please
note that the Yarn auxiliary shuffle service needs to be enabled on Yarn for
this to work. See Spark documentation for additional details.
</li>
</ul>
<p></p>
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1710718&r1=1710717&r2=1710718&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Tue Oct 27 03:40:14 2015
@@ -30,6 +30,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.UUID;
@@ -110,6 +111,7 @@ import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.spark.SparkPigStats;
import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.JobLogger;
@@ -377,7 +379,7 @@ public class SparkLauncher extends Launc
Paths.get(localFile.getAbsolutePath()));
} else {
sparkContext.addFile(jarFile.toURI().toURL()
- .toExternalForm());
+ .toExternalForm());
}
}
@@ -454,14 +456,50 @@ public class SparkLauncher extends Launc
}
}
- sparkContext = new JavaSparkContext(master, "PigOnSpark",
sparkHome,
- jars.toArray(new String[jars.size()]));
+ SparkConf sparkConf = new SparkConf();
+ Properties pigCtxtProperties = pc.getProperties();
+
+ sparkConf.setMaster(master);
+ sparkConf.setAppName("PigOnSpark:" +
pigCtxtProperties.getProperty(PigContext.JOB_NAME));
+ sparkConf.setJars(jars.toArray(new String[jars.size()]));
+ if (sparkHome != null && !sparkHome.isEmpty()) {
+ sparkConf.setSparkHome(sparkHome);
+ } else {
+ LOG.warn("SPARK_HOME is not set");
+ }
+
+ //Copy all spark.* properties to SparkConf
+ for (String key : pigCtxtProperties.stringPropertyNames()) {
+ if (key.startsWith("spark.")) {
+ LOG.debug("Copying key " + key + " with value " +
+ pigCtxtProperties.getProperty(key) + " to SparkConf");
+ sparkConf.set(key, pigCtxtProperties.getProperty(key));
+ }
+ }
+
+ checkAndConfigureDynamicAllocation(master, sparkConf);
+
+ sparkContext = new JavaSparkContext(sparkConf);
sparkContext.sc().addSparkListener(new StatsReportListener());
sparkContext.sc().addSparkListener(new JobLogger());
sparkContext.sc().addSparkListener(jobMetricsListener);
}
}
+ private static void checkAndConfigureDynamicAllocation(String master,
SparkConf sparkConf) {
+ if (sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+ if (!master.startsWith("yarn")) {
+ LOG.warn("Dynamic allocation is enabled, but " +
+ "script isn't running on yarn. Ignoring ...");
+ }
+ if (!sparkConf.getBoolean("spark.shuffle.service.enabled", false))
{
+ LOG.info("Spark shuffle service is being enabled as dynamic " +
+ "allocation is enabled");
+ sparkConf.set("spark.shuffle.service.enabled", "true");
+ }
+ }
+ }
+
// You can use this in unit tests to stop the SparkContext between tests.
static void stopSpark() {
if (sparkContext != null) {