[
https://issues.apache.org/jira/browse/OOZIE-2482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15267198#comment-15267198
]
Satish Subhashrao Saley commented on OOZIE-2482:
------------------------------------------------
Earlier I tried pyspark in {{yarn-cluster}} mode on a single-node cluster on my Mac,
and it was straightforward. Running pyspark in {{yarn-cluster}} mode on a
multi-node cluster, however, needs a few more things:
1. When we submit a Spark job, the [Spark code |
https://github.com/apache/spark/blob/branch-1.6/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L1047]
checks for {{PYSPARK_ARCHIVES_PATH}}. If {{PYSPARK_ARCHIVES_PATH}} is not
set, it falls back to {{SPARK_HOME}}. Therefore, at least one of them must be
set correctly.
We can set the environment variable using the {{oozie.launcher.mapred.child.env}}
property.
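The fallback in {{Client.scala}} roughly amounts to the following (a Python sketch of the Scala logic, not Spark's actual code; the {{python/lib}} layout is the one Spark 1.6 ships with):

```python
import os

def find_pyspark_archives():
    """Mimic Spark 1.6's Client.findPySparkArchives: prefer
    PYSPARK_ARCHIVES_PATH, otherwise derive the archives from SPARK_HOME."""
    archives = os.environ.get("PYSPARK_ARCHIVES_PATH")
    if archives:
        return archives.split(",")
    # Falls back to SPARK_HOME; raises KeyError if it is unset, which is
    # analogous to the "key not found: SPARK_HOME" failure in this issue.
    spark_home = os.environ["SPARK_HOME"]
    lib = os.path.join(spark_home, "python", "lib")
    return [os.path.join(lib, "pyspark.zip"),
            os.path.join(lib, "py4j-0.9-src.zip")]
```

This is why setting either variable via {{oozie.launcher.mapred.child.env}} is enough to get past the launcher failure.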
2. The {{py4j-0.9-src.zip}} and {{pyspark.zip}} archives (versions may vary
with the Spark version) are necessary to run a Python script in Spark, so both
must be on the classpath while the script executes. The simplest way is to put
them under the {{lib/}} directory of our workflow.
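Staging those archives into the workflow's {{lib/}} directory can be scripted; a minimal sketch, assuming the {{$SPARK_HOME/python/lib}} layout of Spark 1.6 (the staged directory would then be uploaded to HDFS, e.g. with {{hdfs dfs -put}}):

```python
import os
import shutil

def stage_pyspark_archives(spark_home, workflow_dir):
    """Copy the two archives Spark 1.6 ships under $SPARK_HOME/python/lib
    into the workflow's local lib/ directory before uploading to HDFS."""
    lib_dir = os.path.join(workflow_dir, "lib")
    os.makedirs(lib_dir, exist_ok=True)
    copied = []
    for name in ("pyspark.zip", "py4j-0.9-src.zip"):
        src = os.path.join(spark_home, "python", "lib", name)
        shutil.copy(src, lib_dir)
        copied.append(os.path.join(lib_dir, name))
    return copied
```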
3. The [--py-files option |
https://github.com/apache/spark/blob/30e980ad8e6443dddd54f3c2d48b3904499545cf/docs/submitting-applications.md#bundling-your-applications-dependencies]
must be configured and passed in {{<spark-opts>}}.
The settings would look like this:
{code}
<spark>
    <configuration>
        .....
        .....
        <property>
            <name>oozie.launcher.mapred.child.env</name>
            <value>PYSPARK_ARCHIVES_PATH=pyspark.zip</value>
        </property>
    </configuration>
    <master>yarn-cluster</master>
    <name>pyspark example</name>
    <jar>/hdfs/path/to/pi.py</jar>
    <spark-opts>--queue satishq --conf spark.yarn.historyServer.address=http://spark.yarn.hsaddress.com:#port --conf spark.ui.view.acls=* --conf spark.eventLog.dir=hdfs://hdfspath/mapred/sparkhistory --py-files pyspark.zip,py4j-0.9-src.zip</spark-opts>
</spark>
{code}
Oozie could do some extra work to make the user's life easier: set
{{PYSPARK_ARCHIVES_PATH}} and add the {{--py-files}} option automatically,
figuring out the location of {{pyspark.zip}} and {{py4j-0.9-src.zip}} from the
mapping file the user provides in {{oozie.service.ShareLibService.mapping.file}},
or from the default sharelib location if no mapping file is provided.
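That automation could, for example, scan the Spark sharelib directory for the two archives. A rough Python sketch under assumed naming conventions (version numbers vary; this is not Oozie's actual implementation):

```python
import os
import re

def pyspark_opts_from_sharelib(sharelib_spark_dir):
    """Locate pyspark.zip and the py4j source zip in a sharelib directory
    (names carry varying versions), then build the PYSPARK_ARCHIVES_PATH
    value and the extra --py-files option for the launcher."""
    names = sorted(os.listdir(sharelib_spark_dir))
    pyspark = [n for n in names if re.match(r"pyspark.*\.zip$", n)]
    py4j = [n for n in names if re.match(r"py4j.*-src\.zip$", n)]
    if not pyspark or not py4j:
        raise RuntimeError("pyspark/py4j archives not found in sharelib")
    archives = ",".join([pyspark[0], py4j[0]])
    return {
        "PYSPARK_ARCHIVES_PATH": archives,
        "spark-opts": "--py-files " + archives,
    }
```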
> Pyspark job fails with Oozie
> ----------------------------
>
> Key: OOZIE-2482
> URL: https://issues.apache.org/jira/browse/OOZIE-2482
> Project: Oozie
> Issue Type: Bug
> Components: core, workflow
> Affects Versions: 4.2.0
> Environment: Hadoop 2.7.2, Spark 1.6.0 on Yarn, Oozie 4.2.0
> Cluster secured with Kerberos
> Reporter: Alexandre Linte
> Assignee: Satish Subhashrao Saley
>
> Hello,
> I'm trying to run pi.py example in a pyspark job with Oozie. Every try I made
> failed for the same reason: key not found: SPARK_HOME.
> Note: A scala job works well in the environment with Oozie.
> The logs on the executors are:
> {noformat}
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/mnt/hd4/hadoop/yarn/local/filecache/145/slf4j-log4j12-1.6.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/mnt/hd2/hadoop/yarn/local/filecache/155/spark-assembly-1.6.0-hadoop2.7.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/application/Hadoop/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> log4j:ERROR setFile(null,true) call failed.
> java.io.FileNotFoundException:
> /mnt/hd7/hadoop/yarn/log/application_1454673025841_13136/container_1454673025841_13136_01_000001
> (Is a directory)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:142)
> at org.apache.log4j.FileAppender.setFile(FileAppender.java:294)
> at
> org.apache.log4j.FileAppender.activateOptions(FileAppender.java:165)
> at
> org.apache.hadoop.yarn.ContainerLogAppender.activateOptions(ContainerLogAppender.java:55)
> at
> org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
> at
> org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
> at
> org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
> at
> org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:809)
> at
> org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:735)
> at
> org.apache.log4j.PropertyConfigurator.configureRootCategory(PropertyConfigurator.java:615)
> at
> org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:502)
> at
> org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:547)
> at
> org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:483)
> at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
> at
> org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:64)
> at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:285)
> at
> org.apache.commons.logging.impl.SLF4JLogFactory.getInstance(SLF4JLogFactory.java:155)
> at
> org.apache.commons.logging.impl.SLF4JLogFactory.getInstance(SLF4JLogFactory.java:132)
> at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:275)
> at
> org.apache.hadoop.service.AbstractService.<clinit>(AbstractService.java:43)
> Using properties file: null
> Parsed arguments:
> master yarn-master
> deployMode cluster
> executorMemory null
> executorCores null
> totalExecutorCores null
> propertiesFile null
> driverMemory null
> driverCores null
> driverExtraClassPath null
> driverExtraLibraryPath null
> driverExtraJavaOptions null
> supervise false
> queue null
> numExecutors null
> files null
> pyFiles null
> archives null
> mainClass null
> primaryResource
> hdfs://hadoopsandbox/User/toto/WORK/Oozie/pyspark/lib/pi.py
> name Pysparkpi example
> childArgs [100]
> jars null
> packages null
> packagesExclusions null
> repositories null
> verbose true
> Spark properties used, including those specified through
> --conf and those from the properties file null:
> spark.executorEnv.SPARK_HOME -> /opt/application/Spark/current
> spark.executorEnv.PYTHONPATH -> /opt/application/Spark/current/python
> spark.yarn.appMasterEnv.SPARK_HOME -> /opt/application/Spark/current
> Main class:
> org.apache.spark.deploy.yarn.Client
> Arguments:
> --name
> Pysparkpi example
> --primary-py-file
> hdfs://hadoopsandbox/User/toto/WORK/Oozie/pyspark/lib/pi.py
> --class
> org.apache.spark.deploy.PythonRunner
> --arg
> 100
> System properties:
> spark.executorEnv.SPARK_HOME -> /opt/application/Spark/current
> spark.executorEnv.PYTHONPATH -> /opt/application/Spark/current/python
> SPARK_SUBMIT -> true
> spark.app.name -> Pysparkpi example
> spark.submit.deployMode -> cluster
> spark.yarn.appMasterEnv.SPARK_HOME -> /opt/application/Spark/current
> spark.yarn.isPython -> true
> spark.master -> yarn-cluster
> Classpath elements:
> Failing Oozie Launcher, Main class
> [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, key not
> found: SPARK_HOME
> java.util.NoSuchElementException: key not found: SPARK_HOME
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:58)
> at scala.collection.MapLike$class.apply(MapLike.scala:141)
> at scala.collection.AbstractMap.apply(Map.scala:58)
> at
> org.apache.spark.deploy.yarn.Client$$anonfun$findPySparkArchives$2.apply(Client.scala:1045)
> at
> org.apache.spark.deploy.yarn.Client$$anonfun$findPySparkArchives$2.apply(Client.scala:1044)
> at scala.Option.getOrElse(Option.scala:120)
> at
> org.apache.spark.deploy.yarn.Client.findPySparkArchives(Client.scala:1044)
> at
> org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:717)
> at
> org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:142)
> at org.apache.spark.deploy.yarn.Client.run(Client.scala:1016)
> at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1076)
> at org.apache.spark.deploy.yarn.Client.main(Client.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> at
> org.apache.oozie.action.hadoop.SparkMain.runSpark(SparkMain.java:104)
> at org.apache.oozie.action.hadoop.SparkMain.run(SparkMain.java:95)
> at
> org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:47)
> at org.apache.oozie.action.hadoop.SparkMain.main(SparkMain.java:38)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:236)
> at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
> at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
> at
> org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runSubtask(LocalContainerLauncher.java:380)
> at
> org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runTask(LocalContainerLauncher.java:301)
> at
> org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.access$200(LocalContainerLauncher.java:187)
> at
> org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler$1.run(LocalContainerLauncher.java:230)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> log4j:WARN No appenders could be found for logger
> (org.apache.hadoop.mapreduce.v2.app.MRAppMaster).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more
> info.
> {noformat}
> The workflow used for Oozie is the following:
> {noformat}
> <workflow-app xmlns='uri:oozie:workflow:0.5' name='PysparkPi-test'>
>     <start to='spark-node' />
>     <action name='spark-node'>
>         <spark xmlns="uri:oozie:spark-action:0.1">
>             <job-tracker>${jobTracker}</job-tracker>
>             <name-node>${nameNode}</name-node>
>             <master>${master}</master>
>             <mode>${mode}</mode>
>             <name>Pysparkpi example</name>
>             <class></class>
>             <jar>${nameNode}/User/toto/WORK/Oozie/pyspark/lib/pi.py</jar>
>             <spark-opts>--conf spark.yarn.appMasterEnv.SPARK_HOME=/opt/application/Spark/current --conf spark.executorEnv.SPARK_HOME=/opt/application/Spark/current --conf spark.executorEnv.PYTHONPATH=/opt/application/Spark/current/python</spark-opts>
>             <arg>100</arg>
>         </spark>
>         <ok to="end" />
>         <error to="fail" />
>     </action>
>     <kill name="fail">
>         <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
>     </kill>
>     <end name='end' />
> </workflow-app>
> {noformat}
> I also created a JIRA for Spark:
> [https://issues.apache.org/jira/browse/SPARK-13679]
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)