[ 
https://issues.apache.org/jira/browse/DRILL-5599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16057326#comment-16057326
 ] 

ASF GitHub Bot commented on DRILL-5599:
---------------------------------------

Github user arina-ielchiieva commented on the issue:

    https://github.com/apache/drill/pull/857
  
    Reworded the error message. @paul-rogers and @ppadma, thanks for the code review!


> Notify StatusHandlerListener that batch sending has failed even if channel is 
> still open 
> -----------------------------------------------------------------------------------------
>
>                 Key: DRILL-5599
>                 URL: https://issues.apache.org/jira/browse/DRILL-5599
>             Project: Apache Drill
>          Issue Type: Bug
>    Affects Versions: 1.11.0
>            Reporter: Arina Ielchiieva
>            Assignee: Arina Ielchiieva
>              Labels: ready-to-commit
>             Fix For: 1.11.0
>
>         Attachments: sample.json
>
>
> *Issue*
> Queries stay in the CANCELLATION_REQUESTED state after the connection with the 
> client is interrupted. Jstack shows that the threads for such queries are 
> blocked, waiting for a semaphore to be released.
> {noformat}
> "26b70318-ddde-9ead-eee2-0828da97b59f:frag:0:0" daemon prio=10 
> tid=0x00007f56dc3c9000 nid=0x25fd waiting on condition [0x00007f56b31dc000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000006f4688ab0> (a 
> java.util.concurrent.Semaphore$NonfairSync)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
>       at java.util.concurrent.Semaphore.acquire(Semaphore.java:472)
>       at 
> org.apache.drill.exec.ops.SendingAccountor.waitForSendComplete(SendingAccountor.java:48)
>       - locked <0x00000006f4688a78> (a 
> org.apache.drill.exec.ops.SendingAccountor)
>       at 
> org.apache.drill.exec.ops.FragmentContext.waitForSendComplete(FragmentContext.java:486)
>       at 
> org.apache.drill.exec.physical.impl.BaseRootExec.close(BaseRootExec.java:134)
>       at 
> org.apache.drill.exec.physical.impl.ScreenCreator$ScreenRoot.close(ScreenCreator.java:141)
>       at 
> org.apache.drill.exec.work.fragment.FragmentExecutor.closeOutResources(FragmentExecutor.java:313)
>       at 
> org.apache.drill.exec.work.fragment.FragmentExecutor.cleanup(FragmentExecutor.java:155)
>       at 
> org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:264)
>       at 
> org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
>    Locked ownable synchronizers:      - <0x000000073f800b68> (a 
> java.util.concurrent.ThreadPoolExecutor$Worker)
> {noformat}
> *Reproduce*
> Run the modified ConcurrencyTest.java referenced in DRILL-4338 and cancel it 
> after 2-3 seconds. ConcurrencyTest.java should be modified as follows: use 
> {{ExecutorService executor = Executors.newFixedThreadPool(10);}} and execute 
> 200 queries with {{for (int i = 1; i <= 200; i++)}}.
> Query: {{select * from dfs.`sample.json`}}; the data set is attached.
> *Problem description*
> It looks like the problem occurs when the server has sent data to the client 
> and is waiting for the client's confirmation that the data was received. In 
> this case 
> [{{ChannelListenerWithCoordinationId}}|https://github.com/apache/drill/blob/master/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java#L118]
>  is used for tracking. {{ChannelListenerWithCoordinationId}} contains a 
> {{StatusHandler}}, which keeps track of sent batches. It updates 
> {{SendingAccountor}} with information about how many batches were sent and 
> how many batches have reached the client (successfully or not).
> When a send operation completes (successfully or not), 
> {{operationComplete(ChannelFuture future)}} is called. The given future 
> contains information on whether the send operation was successful, the 
> failure cause, the channel status, etc. If the send operation was successful, 
> we do nothing, since in this case the client sends us an acknowledgment, and 
> when we receive it, we notify {{StatusHandlerListener}} that the batch was 
> received. But if the send operation has failed, we need to notify 
> {{StatusHandler}} that the send was unsuccessful.
> {{operationComplete(ChannelFuture future)}} code:
> {code}
>       if (!future.isSuccess()) {
>         removeFromMap(coordinationId);
>         if (future.channel().isActive()) {
>           throw new RpcException("Future failed");
>         } else {
>           setException(new ChannelClosedException());
>         }
>       }
>     }
> {code}
> The {{setException}} method notifies {{StatusHandler}} that the batch send has 
> failed, but it is only called when the channel is closed. When the channel is 
> still open, we just throw {{RpcException}}. This is where the problem occurs. 
> {{operationComplete(ChannelFuture future)}} is called via the Netty 
> {{DefaultPromise.notifyListener0}} method, which catches {{Throwable}} and 
> just logs it. So even if we throw an exception, nobody is notified about it, 
> and in particular {{StatusHandler}} is not.
> *Fix*
> Use {{setException}} even if the channel is still open, instead of throwing an 
> exception.
> This problem was also raised in 
> [PR-463|https://github.com/apache/drill/pull/463], but it was decided to fix 
> it in the scope of a new Jira.
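
The mechanism described in the quoted issue can be illustrated with a small, dependency-free Java sketch. It is not Drill or Netty code: Accountor, notifyListener and cleanupFinishes are hypothetical stand-ins for the roles of SendingAccountor, the Netty listener notification, and fragment cleanup. A plain RuntimeException stands in for RpcException, and the wait uses a timeout only so that the example terminates (the stack trace above shows an untimed Semaphore.acquire).

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in names throughout; none of these classes are Drill's or Netty's.
class SendFailureSketch {

  /** Counts sends in flight, loosely modeled on the role of SendingAccountor. */
  static final class Accountor {
    private final Semaphore completed = new Semaphore(0);
    private int outstanding = 0;

    synchronized void increment() { outstanding++; }

    /** Called when a send is reported as finished, successfully or not. */
    void sendComplete() { completed.release(); }

    /** Returns true if every outstanding send was reported before the timeout. */
    synchronized boolean waitForSendComplete(long timeoutMs) {
      try {
        boolean ok = completed.tryAcquire(outstanding, timeoutMs, TimeUnit.MILLISECONDS);
        if (ok) {
          outstanding = 0;
        }
        return ok;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  /** Mimics a notifier that catches anything a listener throws and only logs it. */
  static void notifyListener(Runnable listener) {
    try {
      listener.run();
    } catch (Throwable t) {
      System.err.println("listener threw, logged and dropped: " + t.getMessage());
    }
  }

  /** Simulates one failed send on a still-open channel; returns whether cleanup would finish. */
  static boolean cleanupFinishes(boolean reportFailureToHandler) {
    Accountor accountor = new Accountor();
    accountor.increment();                 // one batch was sent
    boolean channelActive = true;          // the case described in the issue
    notifyListener(() -> {
      if (channelActive && !reportFailureToHandler) {
        // Old behavior: the exception is swallowed by the notifier.
        throw new RuntimeException("Future failed");
      }
      // Proposed behavior: the failure is reported, so the permit is released.
      accountor.sendComplete();
    });
    return accountor.waitForSendComplete(200);
  }

  public static void main(String[] args) {
    System.out.println("throwing:  cleanup finishes = " + cleanupFinishes(false));
    System.out.println("reporting: cleanup finishes = " + cleanupFinishes(true));
  }
}
```

In this sketch the throwing variant never releases the permit, so the wait times out, while the reporting variant lets the wait return immediately. With an untimed acquire, the first case would block indefinitely, which matches the parked fragment thread in the jstack output.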



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
