[ 
https://issues.apache.org/jira/browse/FLINK-8517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383580#comment-16383580
 ] 

ASF GitHub Bot commented on FLINK-8517:
---------------------------------------

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/5621

    [FLINK-8517] fix missing synchronization in TaskEventDispatcher

    ## What is the purpose of the change
    
    The `TaskEventDispatcher` was missing synchronization accessing the 
`registeredHandlers` field for the new `subscribeToEvent()` and `publish()` 
methods. This was causing the 
`StaticlyNestedIterationsITCase.testJobWithoutObjectReuse()` test to 
sporadically fail (reproducible after running a couple of times).
    
    Please merge into `master` and `release-1.5` after accepting.
    
    ## Brief change log
    
    - add synchronization around `TaskEventDispatcher#subscribeToEvent()`'s 
access to `registeredHandlers`
    - add synchronization around `TaskEventDispatcher#publish()`'s access to 
`registeredHandlers`
    
    ## Verifying this change
    
    This change is already covered by existing tests (indirectly), such as 
`StaticlyNestedIterationsITCase.testJobWithoutObjectReuse()`. I ran it almost 
24000 times and could not reproduce it anymore with the change
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): **no**
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
      - The serializers: **no**
      - The runtime per-record code paths (performance sensitive): **no**
      - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
      - The S3 file system connector: **no**
    
    ## Documentation
    
      - Does this pull request introduce a new feature? **no**
      - If yes, how is the feature documented? **not applicable**


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-8517

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5621.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5621
    
----
commit aabeb89dd1259174c786f19b7e97c4c50038610f
Author: Nico Kruber <nico@...>
Date:   2018-03-02T13:38:20Z

    [FLINK-8517] fix missing synchronization in TaskEventDispatcher

----


> StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-8517
>                 URL: https://issues.apache.org/jira/browse/FLINK-8517
>             Project: Flink
>          Issue Type: Bug
>          Components: DataSet API, TaskManager, Tests
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Priority: Blocker
>              Labels: test-stability
>             Fix For: 1.5.0, 1.4.3
>
>
> The {{StaticlyNestedIterationsITCase.testJobWithoutObjectReuse}} test case 
> fails on Travis. This exception might be relevant:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Partition 
> 557b069f2b89f8ba599e6ab0956a3f5a@58f1a6b7d8ae10b9141f17c08d06cecb not 
> registered at task event dispatcher.
>       at 
> org.apache.flink.runtime.io.network.TaskEventDispatcher.subscribeToEvent(TaskEventDispatcher.java:107)
>       at 
> org.apache.flink.runtime.iterative.task.IterationHeadTask.initSuperstepBarrier(IterationHeadTask.java:242)
>       at 
> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:266)
>       at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748){code}
>  
> https://api.travis-ci.org/v3/job/333360156/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to