[ https://issues.apache.org/jira/browse/BEAM-13981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17497800#comment-17497800 ]

Ismaël Mejía commented on BEAM-13981:
-------------------------------------

Hi Jozef, thanks for reporting this. The referenced PR is mostly a refactor and a 
simplification of how the eventLog is enabled or disabled via the Spark default 
configuration. The real issue was introduced in 
[https://github.com/apache/beam/pull/13743/], the PR that introduced the 
extra EventLoggingListener.
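
For reference, enabling event logging through the Spark default configuration is 
typically done with entries like the following in spark-defaults.conf (the 
directory path here is only an example, not the reporter's actual setup):

```
spark.eventLog.enabled  true
spark.eventLog.dir      hdfs:///user/spark/jobhistory
```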

I suppose we did not hit the race condition before because the metrics tests on 
Spark do not currently cover the event logging path (the eventLog enabled 
option), and perhaps also because Spark+Hadoop initializes things differently.

I agree with you that we should not be overriding this on the runner side. It is 
absolutely error-prone. I removed all the extra EventLoggingListener code and 
validated that the metrics are reported correctly. I can create a PR that rolls 
back the behavior of BEAM-11213, because having metrics, even if unclear, is 
better than having no metrics (the previous behavior). However, I might be 
missing the reason for this code, so let's double-check with [~ibzib], who 
reviewed the original PR, to see if he remembers the reasons for this change 
and what he thinks we should do.

> Job event log empty on Spark History Server
> -------------------------------------------
>
>                 Key: BEAM-13981
>                 URL: https://issues.apache.org/jira/browse/BEAM-13981
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.33.0
>            Reporter: Jozef Vilcek
>            Priority: P2
>
> After upgrade from Beam 2.24.0 -> 2.33.0 Spark jobs run on YARN after 
> complete shows empty data on History server.
> The problem seems to be a race condition: two EventLoggingListener instances 
> log events to the same file:
> {noformat}
> 22/02/22 10:51:11 INFO EventLoggingListener: Logging events to 
> hdfs:/user/spark/jobhistory/application_1553109013416_12079975_1
> ...
> 22/02/22 10:51:41 INFO EventLoggingListener: Logging events to 
> hdfs:/user/spark/jobhistory/application_1553109013416_12079975_1{noformat}
> At the end failure:
> {noformat}
> 22/02/22 11:17:57 INFO SchedulerExtensionServices: Stopping 
> SchedulerExtensionServices
> (serviceOption=None,
>  services=List(),
>  started=false)
> 22/02/22 11:17:57 ERROR Utils: Uncaught exception in thread Driver
> java.io.IOException: Target log file already exists 
> (hdfs:/user/spark/jobhistory/application_1553109013416_12079975_1)
>       at 
> org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:255)
>       at 
> org.apache.spark.SparkContext.$anonfun$stop$13(SparkContext.scala:1960)
>       at 
> org.apache.spark.SparkContext.$anonfun$stop$13$adapted(SparkContext.scala:1960)
>       at scala.Option.foreach(Option.scala:274)
>       at 
> org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:1960)
>       at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
>       at org.apache.spark.SparkContext.stop(SparkContext.scala:1960)
>       at 
> org.apache.spark.api.java.JavaSparkContext.stop(JavaSparkContext.scala:654)
>       at 
> org.apache.beam.runners.spark.translation.SparkContextFactory.stopSparkContext(SparkContextFactory.java:73)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult$BatchMode.stop(SparkPipelineResult.java:133)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.offerNewState(SparkPipelineResult.java:234)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:99)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:92)
>       at 
> com.sizmek.dp.dsp.pipeline.driver.PipelineDriver.$anonfun$main$1(PipelineDriver.scala:41)
>       at scala.util.Try$.apply(Try.scala:213)
>       at 
> com.sizmek.dp.dsp.pipeline.driver.PipelineDriver.main(PipelineDriver.scala:34)
>       at 
> com.sizmek.dp.dsp.pipeline.driver.PipelineDriver.main$(PipelineDriver.scala:17)
>       at 
> com.zetaglobal.dp.dsp.jobs.dealstats.DealStatsDriver$.main(DealStatsDriver.scala:18)
>       at 
> com.zetaglobal.dp.dsp.jobs.dealstats.DealStatsDriver.main(DealStatsDriver.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684){noformat}
> This ends up with a nearly empty file in HDFS and empty job details in the 
> History Server.
>  
> The problem seems to be introduced by this change:
> [https://github.com/apache/beam/pull/14409]
> Why does Beam run an event listener concurrently with what Spark is already 
> doing internally? When I roll back the change in SparkRunner, the problem 
> disappears for me.
> I am running the native SparkRunner with Spark 2.4.4.
>  
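
The quoted failure matches a classic finalize-by-rename race. The sketch below 
(plain Java with hypothetical names, a simplification and not Spark's actual 
code) reproduces the shape of the error: two writers finalize the same event 
log target, and the second rename fails because the target already exists.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical simplification of the finalize step that two event logging
// listeners race on: each writes an ".inprogress" file, then renames it to
// the final name. Without an overwrite option, the second rename fails,
// mirroring "Target log file already exists" in the quoted stack trace.
public class EventLogRace {

    static void finalizeLog(Path inProgress, Path target) throws IOException {
        // No REPLACE_EXISTING: the move throws if the target is present.
        Files.move(inProgress, target);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("eventlog");
        Path target = dir.resolve("application_1234_0001");

        // First listener finalizes its log: succeeds.
        Path first = Files.createFile(dir.resolve("application_1234_0001.inprogress"));
        finalizeLog(first, target);

        // Second (duplicate) listener attempts the same rename: fails.
        Path second = Files.createFile(dir.resolve("application_1234_0001.inprogress"));
        try {
            finalizeLog(second, target);
        } catch (FileAlreadyExistsException e) {
            System.out.println("Target log file already exists (" + target + ")");
        }
    }
}
```

With a single listener (the rolled-back behavior) the rename happens exactly once, so the conflict cannot occur.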



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
