[ https://issues.apache.org/jira/browse/DRILL-6746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sorabh Hamirwasia updated DRILL-6746:
-------------------------------------
    Labels: ready-to-commit  (was: )

> Query can hang when PartitionSender task thread sees a connection failure 
> while sending data batches to remote fragment
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: DRILL-6746
>                 URL: https://issues.apache.org/jira/browse/DRILL-6746
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Execution - Flow
>    Affects Versions: 1.13.0
>            Reporter: Sorabh Hamirwasia
>            Assignee: Sorabh Hamirwasia
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.15.0
>
>
> An UnorderedMuxExchange is implemented using an UnorderedReceiver and a 
> HashPartitionSender. The muxer is used to improve memory usage: when 
> multiple minor fragments (say n) running on a node send data to minor 
> fragments on multiple remote nodes (say m), each sending fragment has to 
> create m buffers for the m receivers, which means creating n*m buffers in 
> total on a single node. With the muxer, all the data from the n local minor 
> fragments is instead sent to 1 local minor fragment, and that local minor 
> fragment sends the data to the m remote fragments/nodes. Hence only m 
> buffers are needed in total.
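> For example (illustrative numbers only), with n = 10 sending fragments and 
> m = 20 remote receivers, the non-muxed plan allocates 10 * 20 = 200 buffers 
> on the node, while the muxed plan allocates only 20.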
> There is a shared queue which is filled with RecordBatches from all the n 
> sending minor fragments (the batches arrive on the Data client channel and 
> are ultimately enqueued by a netty thread) and is consumed by the local 
> minor fragment running PartitionSender -> UnorderedReceiver on each next() 
> call. Hence the queue is filled and consumed by different threads. When the 
> PartitionSender receives an incoming batch, then based on some heuristics it 
> creates multiple PartitionTask threads, each of which goes over this 
> incoming batch and copies the rows that fall in its range into its outgoing 
> batch. The main local minor fragment thread waits until all task threads 
> have completed, or until it is interrupted, after which it calls next() for 
> the next incoming batch. In this process, once an outgoing batch is full it 
> is sent to the remote nodes. All the sends are done asynchronously.
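> A minimal sketch of this handoff (hypothetical names; Drill's actual classes 
> differ): a blocking queue filled by the netty thread and drained by the 
> fragment thread.
> {code:java}
> import java.util.concurrent.LinkedBlockingQueue;
> 
> // Hypothetical sketch of the shared buffer between the netty thread
> // (producer) and the local muxer fragment thread (consumer).
> public class BatchQueueSketch {
>   private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
> 
>   // Called on the netty event-loop thread when a batch arrives.
>   public void enqueue(Object batch) {
>     queue.add(batch);
>   }
> 
>   // Called by the fragment thread from next(); blocks until a batch
>   // is available.
>   public Object nextBatch() throws InterruptedException {
>     return queue.take();
>   }
> }
> {code}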
> In this case, if a task thread hits any failure while sending an outgoing 
> batch, the executor state of the main local fragment thread (running the 
> PartitionSender and UnorderedReceiver) is set to FAILED asynchronously. 
> Meanwhile a next() call is made to get a new incoming batch. There is a race 
> condition between the executor-state check done with the next() call and the 
> moment the FAILED state is set, so next() can be called before the state is 
> actually updated. On this next() call, if there is no RecordBatch present in 
> the queue, the main local fragment thread calls take() on the buffer queue 
> and waits until it gets a new batch. Meanwhile the executor state may get 
> updated, and the netty thread which receives a batch and tries to enqueue it 
> will see the updated state and release the received batch without putting it 
> in the shared queue. Since no new batch will ever be stored in the shared 
> queue, the main local minor fragment thread is stuck forever unless a 
> cancellation is explicitly issued, which will interrupt the stuck thread. 
> This can result in a query hang.
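> Reduced to a sketch (hypothetical names and structure, not Drill's actual 
> code): the consumer's state check and its blocking take() are not atomic, so 
> the FAILED flag can be set in between, after which the producer drops every 
> batch and the consumer blocks forever.
> {code:java}
> import java.util.concurrent.LinkedBlockingQueue;
> import java.util.concurrent.atomic.AtomicBoolean;
> 
> // Hypothetical reduction of the race; illustrative only.
> public class HangSketch {
>   private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
>   private final AtomicBoolean failed = new AtomicBoolean(false);
> 
>   // PartitionSender task thread, on send failure: flips the state
>   // asynchronously to the consumer's check below.
>   public void onSendFailure() {
>     failed.set(true);
>   }
> 
>   // Netty thread: once the state has flipped, every received batch is
>   // released instead of enqueued, so the queue stays empty forever.
>   public void onBatchReceived(Object batch) {
>     if (failed.get()) {
>       return; // batch is released in Drill; nothing reaches the queue
>     }
>     queue.add(batch);
>   }
> 
>   // Fragment (consumer) thread: the window between the state check and
>   // take() is where the race happens.
>   public Object next() throws InterruptedException {
>     if (!failed.get()) {    // state observed as RUNNING...
>       return queue.take();  // ...FAILED set here => blocks forever
>     }
>     return null;
>   }
> }
> {code}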
> *Logs for above investigation:*
> It looks like the intermediate fragment 2:3 started executing when it 
> received the record batch.
> {code:java}
> drill-cluster/drillbit.1.log:2018-08-08 00:27:34,210 
> [249580bc-5bca-b166-e906-084b35ecf30d:frag:2:3] INFO  
> o.a.d.e.w.fragment.FragmentExecutor - 
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State change requested 
> AWAITING_ALLOCATION --> RUNNING
> drill-cluster/drillbit.1.log:2018-08-08 00:27:34,210 
> [249580bc-5bca-b166-e906-084b35ecf30d:frag:2:3] INFO  
> o.a.d.e.w.f.FragmentStatusReporter - 
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State to report: RUNNING
> {code}
> But later, while sending a record batch downstream to a remote node, it sees 
> a failure because the Data Connection could not be established, since the 
> remote Drillbit was not running at the time.
> {code:java}
> 2018-08-08 00:33:29,184 [BitClient-7] ERROR 
> o.a.d.e.rpc.ConnectionMultiListener - Failed to establish connection
> java.util.concurrent.ExecutionException: 
> io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection 
> refused: drillbit2/10.10.10.10:31012
>         at 
> io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:54) 
> ~[netty-common-4.0.48.Final.jar:4.0.48.Final]
>         at 
> org.apache.drill.exec.rpc.ConnectionMultiListener$ConnectionHandler.operationComplete(ConnectionMultiListener.java:90)
>  [drill-rpc-1.13.0.jar:1.13.0]
>         at 
> org.apache.drill.exec.rpc.ConnectionMultiListener$ConnectionHandler.operationComplete(ConnectionMultiListener.java:77)
>  [drill-rpc-1.13.0.jar:1.13.0]
>         at 
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
>  [netty-common-4.0.48.Final.jar:4.0.48.Final]
>         at 
> io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:500)
>  [netty-common-4.0.48.Final.jar:4.0.48.Final]
>         at 
> io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:479)
>  [netty-common-4.0.48.Final.jar:4.0.48.Final]
>         at 
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
>  [netty-common-4.0.48.Final.jar:4.0.48.Final]
>         at 
> io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:122) 
> [netty-common-4.0.48.Final.jar:4.0.48.Final]
>         at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:278)
>  [netty-transport-4.0.48.Final.jar:4.0.48.Final]
>         at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:294)
>  [netty-transport-4.0.48.Final.jar:4.0.48.Final]
>         at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633) 
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
>         at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>  [netty-transport-4.0.48.Final.jar:4.0.48.Final]
>         at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) 
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) 
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
>         at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>  [netty-common-4.0.48.Final.jar:4.0.48.Final]
>         at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
> Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: 
> Connection refused: drillbit2/10.10.10.10:31012
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
> ~[na:1.8.0_144]
>         at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) 
> ~[na:1.8.0_144]
>         at 
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:258)
>  ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
>         at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
>  [netty-transport-4.0.48.Final.jar:4.0.48.Final]
>         ... 6 common frames omitted
> Caused by: java.net.ConnectException: Connection refused
>         ... 10 common frames omitted
> {code}
> Then, due to this failure while *sendingRecordBatch*, the status handler 
> sets the Fragment Executor state to FAILED, but cleanup is not performed, as 
> that is done only when the main next() loop of the operators terminates.
> {code:java}
> 2018-08-08 00:33:29,184 [BitClient-7] INFO  
> o.a.d.e.w.fragment.FragmentExecutor - 
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State change requested RUNNING --> 
> FAILED
> {code}
> Ideally the operators should have seen that the executor state is FAILED and 
> started the cleanup process of the FragmentExecutor. During cleanup the 
> FragmentExecutor removes itself from the *runningFragments* list and also 
> removes its *FragmentManager* (being an intermediate Fragment). But it looks 
> like the executor thread was stuck somewhere and cleanup was never 
> completed. We know this because when cancellation was requested 8 hours 
> later, the FragmentManager for this minor fragment was still found on that 
> node and cancellation was performed properly.
> {code:java}
> 2018-08-08 08:51:23,207 [BitServer-1] INFO  
> o.a.d.e.w.fragment.FragmentExecutor - 
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State change requested FAILED --> 
> CANCELLATION_REQUESTED
> 2018-08-08 08:51:23,207 [BitServer-1] WARN  
> o.a.d.e.w.fragment.FragmentExecutor - 
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: Ignoring unexpected state 
> transition FAILED --> CANCELLATION_REQUESTED
> 2018-08-08 08:51:23,209 [249580bc-5bca-b166-e906-084b35ecf30d:frag:2:3] INFO  
> o.a.d.e.w.fragment.FragmentExecutor - 
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State change requested FAILED --> 
> FINISHED
> 2018-08-08 08:51:23,216 [249580bc-5bca-b166-e906-084b35ecf30d:frag:2:3] ERROR 
> o.a.d.e.w.fragment.FragmentExecutor - SYSTEM ERROR: ConnectException: 
> Connection refused
> Fragment 2:3
> [Error Id: 3f66fbfb-a93b-4b48-8008-ebb78e084905 on drillbit1:31010]
> org.apache.drill.common.exceptions.UserException: SYSTEM ERROR: 
> ConnectException: Connection refused
> Fragment 2:3
> [Error Id: 3f66fbfb-a93b-4b48-8008-ebb78e084905 on drillbit1:31010]
>         at 
> org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:633)
>  ~[drill-common-1.13.0.jar:1.13.0]
>         at 
> org.apache.drill.exec.work.fragment.FragmentExecutor.sendFinalState(FragmentExecutor.java:300)
>  [drill-java-exec-1.13.0.jar:1.13.0]
>         at 
> org.apache.drill.exec.work.fragment.FragmentExecutor.cleanup(FragmentExecutor.java:160)
>  [drill-java-exec-1.13.0.jar:1.13.0]
>         at 
> org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:266)
>  [drill-java-exec-1.13.0.jar:1.13.0]
>         at 
> org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38)
>  [drill-common-1.13.0.jar:1.13.0]
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [na:1.8.0_144]
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [na:1.8.0_144]
>         at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
> Caused by: org.apache.drill.exec.rpc.RpcException: Command failed while 
> establishing connection.  Failure type CONNECTION.
>         at 
> org.apache.drill.exec.rpc.RpcException.mapException(RpcException.java:67) 
> ~[drill-rpc-1.13.0.jar:1.13.0]
>         at 
> org.apache.drill.exec.rpc.ListeningCommand.connectionFailed(ListeningCommand.java:66)
>  ~[drill-rpc-1.13.0.jar:1.13.0]
>         at 
> org.apache.drill.exec.rpc.data.DataTunnel$SendBatchAsyncListen.connectionFailed(DataTunnel.java:145)
>  ~[drill-java-exec-1.13.0.jar:1.13.0]
>         at 
> org.apache.drill.exec.rpc.ReconnectingConnection$ConnectionListeningFuture.connectionFailed(ReconnectingConnection.java:152)
>  ~[drill-rpc-1.13.0.jar:1.13.0]
>         at 
> org.apache.drill.exec.rpc.ConnectionMultiListener$ConnectionHandler.operationComplete(ConnectionMultiListener.java:119)
>  ~[drill-rpc-1.13.0.jar:1.13.0]
>         at 
> Caused by: java.net.ConnectException: Connection refused
>         ... 10 common frames omitted
> {code}
> Cleanup could be stuck for 8 hours (before cancellation was triggered) for 
> the reason below.
>  # This intermediate Fragment was running an UnorderedReceiver and a 
> PartitionSender. So if there is logic in the PartitionSender such that in 
> some case it keeps waiting for the next incoming batch without checking the 
> fragment executor state via shouldContinue(), then we can get stuck, since 
> only by checking shouldContinue() would it know that it has to exit and not 
> wait for any more batches. I still need to look into the PartitionSender 
> code to confirm this; see the bounded-wait sketch after this list.
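> If that is indeed the case, one conventional way to avoid the unbounded wait 
> is to poll the queue with a timeout and re-check shouldContinue() between 
> polls. The sketch below is illustrative only (hypothetical names, not the 
> actual DRILL-6746 patch):
> {code:java}
> import java.util.concurrent.LinkedBlockingQueue;
> import java.util.concurrent.TimeUnit;
> import java.util.function.BooleanSupplier;
> 
> // Hypothetical bounded-wait consumer; illustrative only.
> public class BoundedWaitSketch {
>   public Object nextBatch(LinkedBlockingQueue<Object> queue,
>                           BooleanSupplier shouldContinue) throws InterruptedException {
>     Object batch = null;
>     while (batch == null && shouldContinue.getAsBoolean()) {
>       // Wake up periodically so an asynchronously set FAILED state is
>       // noticed even when no batch ever arrives.
>       batch = queue.poll(100, TimeUnit.MILLISECONDS);
>     }
>     return batch; // null signals the fragment should stop and clean up
>   }
> }
> {code}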
> Also, I looked into the profile, and based on my understanding major 
> fragment 2's unordered receiver should get all the batches from major 
> fragment 4's single sender. But when I add up the batches on the sender and 
> receiver sides, there is a mismatch: the sender has sent more batches than 
> the receiver has received (a difference of 2,156 batches and 1,416,837 
> rows):
> *Sender Stats:*
>  Batches: 113,528
>  Rows: 62,554,675
> *Receiver Stats:*
>  Batches: 111,372
>  Rows: 61,137,838



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
