[ https://issues.apache.org/jira/browse/SPARK-21935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16173110#comment-16173110 ]
Nikolaos Tsipas commented on SPARK-21935:
-----------------------------------------

After a few trial-and-error iterations I managed to find a set of config params (executor count, executor memory, memoryOverhead) that finishes the job without failing. In the process I realised the following:

- It looks like our problem had to do with the amount of memory available per task in the executor. Specifically, on an 8-core 15GB slave node we were using 1 executor with 8 tasks/cores, a 4GB heap and 6GB off-heap, and this always ended with ExecutorLostFailure errors. After the number of tasks per executor was reduced to 3 the job completed successfully (of course we were under-utilising CPU resources in this case). A sketch of the working values follows below.
-- Theoretically the above could also be fixed by increasing parallelism, and consequently reducing the amount of memory required per task, but that approach didn't work when we investigated it.
- Dynamic allocation is based on the amount of MEMORY available on the slave nodes, not on the cores. E.g. if 15GB and 8 cores are available and you ask for 4 executors with 1GB and 2 cores each, Spark/YARN will give you back 15 executors with 2 cores and 1GB of memory, because it optimises resource utilisation for memory (1GB * 15 executors = 15GB). Of course you will then be running 15 * 2 = 30 tasks in parallel, which is too much for the 8 cores you have available, so keeping things balanced is a trial-and-error procedure. (YARN can be tuned in Spark to take the available cores into account as well; IIRC I've done it in the past but I can't find the setting now. My best guess is shown in the second snippet below.)
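For reference, the combination that ended up working for us looks roughly like the following in {{spark-defaults.conf}}. Treat the exact values as a sketch for the 8-core/15GB node shape described above, not a recipe:

{code}
# 3 tasks per executor instead of 8, 4GB heap, 6GB off-heap headroom
# for the Python workers (memoryOverhead is specified in MB)
spark.executor.cores                 3
spark.executor.memory                4G
spark.yarn.executor.memoryOverhead   6144
{code}

As for the cores-aware YARN scheduling mentioned in the last bullet, my best guess (I haven't re-verified this on EMR) is the capacity scheduler's resource calculator in {{capacity-scheduler.xml}}:

{code}
<!-- The default calculator sizes containers by memory only; the
     DominantResourceCalculator takes vcores into account as well -->
<property>
  <name>yarn.scheduler.capacity.resource-calculator</name>
  <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>
{code}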
> Pyspark UDF causing ExecutorLostFailure
> ----------------------------------------
>
>                 Key: SPARK-21935
>                 URL: https://issues.apache.org/jira/browse/SPARK-21935
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.1.0
>            Reporter: Nikolaos Tsipas
>              Labels: pyspark, udf
>         Attachments: cpu.png, Screen Shot 2017-09-06 at 11.30.28.png, Screen Shot 2017-09-06 at 11.31.13.png, Screen Shot 2017-09-06 at 11.31.31.png
>
> Hi,
> I'm using spark 2.1.0 on AWS EMR (Yarn) and trying to use a UDF in python as follows:
> {code}
> from pyspark.sql.functions import col, udf
> from pyspark.sql.types import StringType
>
> path = 's3://some/parquet/dir/myfile.parquet'
> df = spark.read.load(path)
>
> def _test_udf(useragent):
>     return useragent.upper()
>
> test_udf = udf(_test_udf, StringType())
> df = df.withColumn('test_field', test_udf(col('std_useragent')))
> df.write.parquet('/output.parquet')
> {code}
> The following config is used in {{spark-defaults.conf}} (using {{maximizeResourceAllocation}} in EMR):
> {code}
> ...
> spark.executor.instances       4
> spark.executor.cores           8
> spark.driver.memory            8G
> spark.executor.memory          9658M
> spark.default.parallelism      64
> spark.driver.maxResultSize     3G
> ...
> {code}
> The cluster has 4 worker nodes (+1 master) with the following specs: 8 vCPU, 15 GiB memory, 160 GB SSD storage.
> The above example fails every single time with errors like the following:
> {code}
> 17/09/06 09:58:08 WARN TaskSetManager: Lost task 26.1 in stage 1.0 (TID 50, ip-172-31-7-125.eu-west-1.compute.internal, executor 10): ExecutorLostFailure (executor 10 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> {code}
> I tried to increase {{spark.yarn.executor.memoryOverhead}} to 3000, which delays the errors, but the job still eventually fails.
> !Screen Shot 2017-09-06 at 11.31.31.png|width=800!
> If I run the above job in Scala everything works as expected (without having to adjust the memoryOverhead):
> {code}
> import org.apache.spark.sql.functions.{col, udf}
>
> val upper: String => String = _.toUpperCase
> val df = spark.read.load("s3://some/parquet/dir/myfile.parquet")
> val upperUDF = udf(upper)
> val newdf = df.withColumn("test_field", upperUDF(col("std_useragent")))
> newdf.write.parquet("/output.parquet")
> {code}
> !Screen Shot 2017-09-06 at 11.31.13.png|width=800!
> CPU utilisation is very bad with pyspark:
> !cpu.png|width=800!
> Is this a known bug with pyspark and udfs or is it a matter of bad configuration?
> Looking forward to suggestions. Thanks!
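PS: for the specific UDF in the description, the Python UDF can be avoided altogether with the built-in {{upper}} function, which (like the Scala UDF) is evaluated inside the JVM, so the Python-worker memory pressure never materialises. A minimal sketch, assuming the same path and column as the description:

{code}
from pyspark.sql.functions import col, upper

df = spark.read.load('s3://some/parquet/dir/myfile.parquet')
# upper() compiles to a JVM Column expression; no Python worker
# processes (and no extra memoryOverhead) are involved
df = df.withColumn('test_field', upper(col('std_useragent')))
df.write.parquet('/output.parquet')
{code}

Obviously this only helps when a built-in equivalent exists; for genuinely custom Python logic the per-task memory arithmetic above still applies.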