[ https://issues.apache.org/jira/browse/HIVE-15168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653721#comment-15653721 ]

Barna Zsombor Klara commented on HIVE-15168:
--------------------------------------------

The problem is similar to the one fixed earlier for this test: there is a race
condition between registering the listeners (rpc.addListener and
promise.addListener) and submitting the message through RPC.
If you take a look at SparkClientImpl#ClientProtocol#submit, you will see that
driverRpc.call is currently invoked before the listeners are registered.
To reproduce the test failure, add, for example, a Thread.sleep after the
driverRpc.call and before the listeners are registered.
This would rarely, if ever, happen in practice, because executing the Spark job
and the network latency should take far longer than running the code that
registers the listeners. In the unit test, however, it causes intermittent
failures.
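A minimal, self-contained Java sketch of the ordering problem, under stated assumptions: CompletableFuture stands in for the Netty futures, the hypothetical Handle class only accepts forward state transitions (a late, backward transition is dropped), and the hypothetical deliver() method simulates the worst case, in which the RPC completes and the remote side reports STARTED before the caller registers its listener. That is the window the Thread.sleep described above widens. This is not the actual SparkClientImpl or JobHandleImpl code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ListenerOrderingSketch {

    // Hypothetical subset of job states, ordered for a forward-only state machine.
    enum State { SENT, QUEUED, STARTED, SUCCEEDED }

    // Hypothetical handle (assumption): records accepted transitions, ignores backward ones.
    static final class Handle {
        private State state = State.SENT;
        final List<State> fired = new ArrayList<>();

        synchronized boolean changeState(State next) {
            if (next.ordinal() <= state.ordinal()) {
                return false; // e.g. a late QUEUED arriving after STARTED is dropped
            }
            state = next;
            fired.add(next);
            return true;
        }
    }

    // Hypothetical transport: the RPC completes and the remote side reports
    // STARTED before the caller gets to run its next statement (worst case).
    static void deliver(CompletableFuture<Void> rpc, Handle handle) {
        rpc.complete(null);                 // runs any already-registered listeners
        handle.changeState(State.STARTED);  // remote "job started" event
    }

    // Ordering described in the comment: send first, register the listener afterwards.
    static List<State> callThenListen() {
        Handle handle = new Handle();
        CompletableFuture<Void> rpc = new CompletableFuture<>();
        deliver(rpc, handle);                                 // like driverRpc.call(...)
        rpc.thenRun(() -> handle.changeState(State.QUEUED));  // like rpc.addListener(...)
        return handle.fired;
    }

    // Register the listener first, then send.
    static List<State> listenThenCall() {
        Handle handle = new Handle();
        CompletableFuture<Void> rpc = new CompletableFuture<>();
        rpc.thenRun(() -> handle.changeState(State.QUEUED));
        deliver(rpc, handle);
        return handle.fired;
    }

    public static void main(String[] args) {
        System.out.println("call then listen: " + callThenListen()); // [STARTED]
        System.out.println("listen then call: " + listenThenCall()); // [QUEUED, STARTED]
    }
}
```

The late listener in callThenListen still runs, because thenRun on an already-completed future executes immediately, but by then QUEUED is stale and is rejected, so a test expecting a QUEUED notification fails. Since driverRpc.call creates its own future, the listenThenCall ordering is a simplification here, not a drop-in fix.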

> Flaky test: TestSparkClient.testJobSubmission (still flaky)
> -----------------------------------------------------------
>
>                 Key: HIVE-15168
>                 URL: https://issues.apache.org/jira/browse/HIVE-15168
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Barna Zsombor Klara
>            Assignee: Barna Zsombor Klara
>         Attachments: HIVE-15168.patch
>
>
> [HIVE-14910|https://issues.apache.org/jira/browse/HIVE-14910] already 
> addressed one source of flakiness, but sadly, it seems, not all of them.
> In JobHandleImpl the listeners are registered after the job has been 
> submitted.
> This can result in a race condition.
> {code}
>       // Link the RPC and the promise so that events from one are propagated to the other as
>       // needed.
>       rpc.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() {
>         @Override
>         public void operationComplete(io.netty.util.concurrent.Future<Void> f) {
>           if (f.isSuccess()) {
>             handle.changeState(JobHandle.State.QUEUED);
>           } else if (!promise.isDone()) {
>             promise.setFailure(f.cause());
>           }
>         }
>       });
>       promise.addListener(new GenericFutureListener<Promise<T>>() {
>         @Override
>         public void operationComplete(Promise<T> p) {
>           if (jobId != null) {
>             jobs.remove(jobId);
>           }
>           if (p.isCancelled() && !rpc.isDone()) {
>             rpc.cancel(true);
>           }
>         }
>       });
> {code}
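To illustrate what the two quoted listeners do, here is a hedged sketch that ports the linking logic to java.util.concurrent.CompletableFuture instead of Netty's Future and Promise. The class LinkSketch, the jobs map, the onQueued callback, and the job IDs are hypothetical; the sketch only shows the propagation rules: a failed RPC fails the result promise, a completed promise is removed from the registry, and cancelling the promise cancels a still-pending RPC.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class LinkSketch {

    // Hypothetical registry standing in for the client's `jobs` map.
    static final Map<String, Object> jobs = new ConcurrentHashMap<>();

    // Propagates events between the RPC future and the job's result promise,
    // loosely following the two listeners quoted above.
    static <T> void link(String jobId, CompletableFuture<Void> rpc,
                         CompletableFuture<T> promise, Runnable onQueued) {
        rpc.whenComplete((ignored, err) -> {
            if (err == null) {
                onQueued.run();                      // like handle.changeState(QUEUED)
            } else if (!promise.isDone()) {
                promise.completeExceptionally(err);  // like promise.setFailure(f.cause())
            }
        });
        promise.whenComplete((ignored, err) -> {
            if (jobId != null) {
                jobs.remove(jobId);                  // stop tracking a finished job
            }
            if (promise.isCancelled() && !rpc.isDone()) {
                rpc.cancel(true);                    // abort a still-pending send
            }
        });
    }

    // An RPC failure fails the promise and removes the job from the registry.
    static boolean rpcFailurePropagates() {
        jobs.put("job-1", new Object());
        CompletableFuture<Void> rpc = new CompletableFuture<>();
        CompletableFuture<String> promise = new CompletableFuture<>();
        link("job-1", rpc, promise, () -> { });
        rpc.completeExceptionally(new RuntimeException("send failed"));
        return promise.isCompletedExceptionally() && !jobs.containsKey("job-1");
    }

    // Cancelling the promise cancels an RPC that has not completed yet.
    static boolean cancellationPropagates() {
        CompletableFuture<Void> rpc = new CompletableFuture<>();
        CompletableFuture<String> promise = new CompletableFuture<>();
        link("job-2", rpc, promise, () -> { });
        promise.cancel(true);
        return rpc.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println(rpcFailurePropagates());   // true
        System.out.println(cancellationPropagates()); // true
    }
}
```

Because non-async CompletableFuture callbacks run on the completing thread, each propagation step in this sketch happens synchronously, which keeps the example deterministic.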



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
