[ https://issues.apache.org/jira/browse/HIVE-15168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653721#comment-15653721 ]
Barna Zsombor Klara commented on HIVE-15168:
--------------------------------------------

The problem is similar to the one fixed earlier for this test: there is a race condition between the registration of the listeners (rpc.addListener and promise.addListener) and the submission of the message through RPC. If you take a look at SparkClientImpl#ClientProtocol#submit, you will see that driverRpc.call is currently invoked before the listeners are registered.

To reproduce the test failure, add for example a Thread.sleep after the driverRpc.call and before the listeners are registered.

This would probably never, or almost never, happen in real life, because the execution of the Spark job and the network latency should easily take longer than the time needed for the listener registration code to run. In the unit test, however, it causes intermittent failures.

> Flaky test: TestSparkClient.testJobSubmission (still flaky)
> -----------------------------------------------------------
>
>                 Key: HIVE-15168
>                 URL: https://issues.apache.org/jira/browse/HIVE-15168
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Barna Zsombor Klara
>            Assignee: Barna Zsombor Klara
>         Attachments: HIVE-15168.patch
>
>
> [HIVE-14910|https://issues.apache.org/jira/browse/HIVE-14910] already
> addressed one source of flakiness, but sadly not all of them, it seems.
> In JobHandleImpl the listeners are registered after the job has been
> submitted.
> This may end up in a race condition.
> {code}
>       // Link the RPC and the promise so that events from one are propagated to the other as
>       // needed.
>       rpc.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() {
>         @Override
>         public void operationComplete(io.netty.util.concurrent.Future<Void> f) {
>           if (f.isSuccess()) {
>             handle.changeState(JobHandle.State.QUEUED);
>           } else if (!promise.isDone()) {
>             promise.setFailure(f.cause());
>           }
>         }
>       });
>       promise.addListener(new GenericFutureListener<Promise<T>>() {
>         @Override
>         public void operationComplete(Promise<T> p) {
>           if (jobId != null) {
>             jobs.remove(jobId);
>           }
>           if (p.isCancelled() && !rpc.isDone()) {
>             rpc.cancel(true);
>           }
>         }
>       });
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
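
For illustration, the ordering problem described in the comment can be modelled in isolation. The sketch below is not Hive code: ListenerOrderingSketch, Handle, State and submit are hypothetical names, and java.util.concurrent.CompletableFuture stands in for the Netty future. It also assumes that the handle only accepts forward state transitions and that a listener added to an already completed future still runs. Under those assumptions, the explicit statement order plays the role of the Thread.sleep mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ListenerOrderingSketch {
    enum State { SENT, QUEUED, STARTED }

    /** Hypothetical stand-in for a job handle; assumed to accept only forward transitions. */
    static final class Handle {
        private State state = State.SENT;
        final List<State> observed = new ArrayList<>();

        synchronized void changeState(State next) {
            if (next.ordinal() > state.ordinal()) {
                state = next;
                observed.add(next);
            }
        }
    }

    /**
     * Simulates one submission and returns the state changes the handle saw.
     * lateRegistration = true mimics "send the RPC first, register the listener later".
     */
    static List<State> submit(boolean lateRegistration) {
        Handle handle = new Handle();
        CompletableFuture<Void> rpc = new CompletableFuture<>();
        // The listener moves the handle to QUEUED once the RPC completes.
        Runnable register = () -> rpc.thenRun(() -> handle.changeState(State.QUEUED));

        if (!lateRegistration) {
            register.run();
        }
        rpc.complete(null);                 // the RPC call is acknowledged
        handle.changeState(State.STARTED);  // the remote side reports that the job started
        if (lateRegistration) {
            register.run();                 // too late: QUEUED is now a backward transition
        }
        return handle.observed;
    }

    public static void main(String[] args) {
        System.out.println("listeners first: " + submit(false));
        System.out.println("listeners late:  " + submit(true));
    }
}
```

With the listener registered first, the handle observes QUEUED and then STARTED. With late registration, the QUEUED transition is dropped, which is the kind of missed event a test that verifies every state change would report as a failure.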