Sorabh Hamirwasia created DRILL-6746:
----------------------------------------

             Summary: Query can hang when PartitionSender task thread sees a connection failure while sending data batches to remote fragment
                 Key: DRILL-6746
                 URL: https://issues.apache.org/jira/browse/DRILL-6746
             Project: Apache Drill
          Issue Type: Bug
          Components: Execution - Flow
    Affects Versions: 1.13.0
            Reporter: Sorabh Hamirwasia
            Assignee: Sorabh Hamirwasia
             Fix For: 1.15.0


An UnorderedMuxExchange is implemented using an UnorderedReceiver and a HashPartitionSender. The muxer is used to improve memory usage: when multiple minor fragments (say n) running on a node send data to minor fragments on multiple other remote nodes (say m), each sending fragment has to create m buffers for the m receivers, which means creating m*n buffers in total on a single node. With the muxer, the data from all n local minor fragments is instead sent to 1 local minor fragment, and that local minor fragment sends the data to the m remote fragments/nodes. Hence only m buffers are needed in total.
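As a concrete illustration of the buffer arithmetic (the numbers below are made up for this example and are not taken from this issue):
{code:java}
// Hypothetical numbers, only to illustrate the m*n vs. m buffer counts above.
public class MuxBufferMath {
  public static void main(String[] args) {
    int n = 10;              // sending minor fragments on this node
    int m = 20;              // receiving minor fragments on remote nodes

    int withoutMux = n * m;  // every local sender keeps m buffers: 200 buffers on this node
    int withMux = m;         // only the single local mux fragment keeps per-receiver buffers: 20 buffers

    System.out.println("without mux: " + withoutMux + ", with mux: " + withMux);
  }
}
{code}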

There is a shared queue that is filled with RecordBatches from all the local sending minor fragments (the batches are received on the data channel and ultimately enqueued by a Netty thread) and consumed by the local minor fragment running PartitionSender --> UnorderedReceiver on each next() call. Hence the queue is filled and consumed by different threads. When the PartitionSender receives an incoming batch, it creates multiple PartitionTask threads based on some heuristics; each task goes over the incoming batch and copies the rows that fall in its range into its outgoing batch. The main local minor fragment thread waits until all task threads have completed, or until it is interrupted, after which it calls next() for the next incoming batch. In this process, once an output batch is full it is sent to the remote nodes. All sends are done asynchronously.
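A minimal sketch of this producer/consumer hand-off, using made-up class and method names rather than Drill's actual buffer classes:
{code:java}
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch only; RecordBatchStub and these method names are
// hypothetical and do not correspond to Drill's real API.
public class SharedQueueSketch {
  static class RecordBatchStub { }

  private final LinkedBlockingQueue<RecordBatchStub> buffer = new LinkedBlockingQueue<>();

  // Producer side: called on the Netty event-loop thread when a batch
  // arrives from one of the local sending minor fragments.
  public void enqueueFromNetty(RecordBatchStub batch) {
    buffer.add(batch);
  }

  // Consumer side: called on the main fragment thread from the receiver's
  // next(); blocks until the producer enqueues a batch.
  public RecordBatchStub nextBatch() throws InterruptedException {
    return buffer.take();
  }
}
{code}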

In this case, if a task thread sees a failure while sending its outgoing batch, the executor state of the main local fragment thread (running the PartitionSender and UnorderedReceiver) is set to FAILED asynchronously. Meanwhile, the main thread calls next() to get a new incoming batch. There is a race condition between the check of the executor state before the next() call and the moment the FAILED state is set, so next() can be called before the state is actually updated. On this next() call, if there is no RecordBatch present in the queue, the main local fragment thread calls take() on the buffer queue and waits until it gets a new batch. Meanwhile the executor state may get updated, and the Netty thread that receives a batch and tries to enqueue it will see the updated state and release the received batch without putting it into the shared queue. Since no new batch will be stored in the shared queue going forward, the main local minor fragment thread will be stuck forever unless a cancellation is explicitly issued, which interrupts the stuck thread. This can result in a query hang.
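A minimal sketch of the race itself, again with hypothetical names (a simple flag standing in for the executor state):
{code:java}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the race described above; not Drill code.
public class HangRaceSketch {
  private final AtomicBoolean failed = new AtomicBoolean(false);
  private final LinkedBlockingQueue<Object> buffer = new LinkedBlockingQueue<>();

  // Main fragment thread: checks the state, then blocks on the queue.
  // If FAILED is set between the check and take(), and the producer stops
  // enqueuing, nothing will ever wake this thread up again.
  public Object consumerNext() throws InterruptedException {
    if (failed.get()) {          // state check happens here ...
      throw new IllegalStateException("fragment already failed");
    }
    return buffer.take();        // ... but the thread can block here forever
  }

  // PartitionSender task thread: marks the fragment as failed after a send error.
  public void onSendFailure() {
    failed.set(true);
  }

  // Netty thread: once it sees the FAILED state it releases the incoming
  // batch instead of enqueuing it, so the blocked consumer is never woken.
  public void onBatchArrived(Object batch) {
    if (failed.get()) {
      return;                    // batch dropped, never reaches the shared queue
    }
    buffer.add(batch);
  }
}
{code}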

*Logs for above investigation:*

It looks like the intermediate fragment 2:3 started executing when it received a record batch.
{code:java}
drill-cluster/drillbit.1.log:2018-08-08 00:27:34,210 [249580bc-5bca-b166-e906-084b35ecf30d:frag:2:3] INFO  o.a.d.e.w.fragment.FragmentExecutor - 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State change requested AWAITING_ALLOCATION --> RUNNING

drill-cluster/drillbit.1.log:2018-08-08 00:27:34,210 [249580bc-5bca-b166-e906-084b35ecf30d:frag:2:3] INFO  o.a.d.e.w.f.FragmentStatusReporter - 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State to report: RUNNING
{code}
But later, while sending a record batch downstream to a remote node, it received a failure because the data connection could not be established (the remote Drillbit was not running at the time).
{code:java}
2018-08-08 00:33:29,184 [BitClient-7] ERROR o.a.d.e.rpc.ConnectionMultiListener - Failed to establish connection
java.util.concurrent.ExecutionException: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: drillbit2/10.10.10.10:31012
        at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:54) ~[netty-common-4.0.48.Final.jar:4.0.48.Final]
        at org.apache.drill.exec.rpc.ConnectionMultiListener$ConnectionHandler.operationComplete(ConnectionMultiListener.java:90) [drill-rpc-1.13.0.jar:1.13.0]
        at org.apache.drill.exec.rpc.ConnectionMultiListener$ConnectionHandler.operationComplete(ConnectionMultiListener.java:77) [drill-rpc-1.13.0.jar:1.13.0]
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507) [netty-common-4.0.48.Final.jar:4.0.48.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:500) [netty-common-4.0.48.Final.jar:4.0.48.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:479) [netty-common-4.0.48.Final.jar:4.0.48.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420) [netty-common-4.0.48.Final.jar:4.0.48.Final]
        at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:122) [netty-common-4.0.48.Final.jar:4.0.48.Final]
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:278) [netty-transport-4.0.48.Final.jar:4.0.48.Final]
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:294) [netty-transport-4.0.48.Final.jar:4.0.48.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633) [netty-transport-4.0.48.Final.jar:4.0.48.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) [netty-transport-4.0.48.Final.jar:4.0.48.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) [netty-transport-4.0.48.Final.jar:4.0.48.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) [netty-transport-4.0.48.Final.jar:4.0.48.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131) [netty-common-4.0.48.Final.jar:4.0.48.Final]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: drillbit2/10.10.10.10:31012
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_144]
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_144]
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:258) ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291) [netty-transport-4.0.48.Final.jar:4.0.48.Final]
        ... 6 common frames omitted
Caused by: java.net.ConnectException: Connection refused
        ... 10 common frames omitted

{code}
Then, due to this failure during *sendRecordBatch*, the status handler sets the FragmentExecutor state to FAILED, but cleanup is not performed, since cleanup only happens when the operators' main next() loop terminates.
{code:java}
2018-08-08 00:33:29,184 [BitClient-7] INFO  o.a.d.e.w.fragment.FragmentExecutor - 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State change requested RUNNING --> FAILED
{code}
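A minimal sketch of that asynchronous failure path, assuming a listener-style callback (the names are illustrative, not Drill's actual status handler):
{code:java}
// Illustrative sketch only: the send listener records the failure and flips
// the executor state asynchronously, but performs no cleanup itself; cleanup
// happens later, when the operator's main next() loop unwinds.
public class SendListenerSketch {
  enum State { RUNNING, FAILED }

  private volatile State state = State.RUNNING;
  private volatile Throwable deferredError;

  // Invoked from the RPC/Netty side when an asynchronous send fails.
  public void failed(Throwable ex) {
    deferredError = ex;
    state = State.FAILED;        // state change only; no cleanup here
  }

  public boolean shouldContinue() {
    return state == State.RUNNING;
  }
}
{code}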
Ideally the operators should have seen that the executor state is FAILED and started the FragmentExecutor cleanup process. During cleanup the FragmentExecutor removes itself from the *runningFragments* list and also removes its *FragmentManager* (since it is an intermediate fragment). But it looks like the executor thread was stuck somewhere and cleanup was never completed: when cancellation was requested 8 hours later, the FragmentManager for this minor fragment was still found on that node and cancellation was performed properly.
{code:java}
2018-08-08 08:51:23,207 [BitServer-1] INFO  o.a.d.e.w.fragment.FragmentExecutor - 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State change requested FAILED --> CANCELLATION_REQUESTED
2018-08-08 08:51:23,207 [BitServer-1] WARN  o.a.d.e.w.fragment.FragmentExecutor - 249580bc-5bca-b166-e906-084b35ecf30d:2:3: Ignoring unexpected state transition FAILED --> CANCELLATION_REQUESTED
2018-08-08 08:51:23,209 [249580bc-5bca-b166-e906-084b35ecf30d:frag:2:3] INFO  o.a.d.e.w.fragment.FragmentExecutor - 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State change requested FAILED --> FINISHED

2018-08-08 08:51:23,216 [249580bc-5bca-b166-e906-084b35ecf30d:frag:2:3] ERROR o.a.d.e.w.fragment.FragmentExecutor - SYSTEM ERROR: ConnectException: Connection refused

Fragment 2:3

[Error Id: 3f66fbfb-a93b-4b48-8008-ebb78e084905 on drillbit1:31010]
org.apache.drill.common.exceptions.UserException: SYSTEM ERROR: ConnectException: Connection refused

Fragment 2:3

[Error Id: 3f66fbfb-a93b-4b48-8008-ebb78e084905 on drillbit1:31010]
        at org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:633) ~[drill-common-1.13.0.jar:1.13.0]
        at org.apache.drill.exec.work.fragment.FragmentExecutor.sendFinalState(FragmentExecutor.java:300) [drill-java-exec-1.13.0.jar:1.13.0]
        at org.apache.drill.exec.work.fragment.FragmentExecutor.cleanup(FragmentExecutor.java:160) [drill-java-exec-1.13.0.jar:1.13.0]
        at org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:266) [drill-java-exec-1.13.0.jar:1.13.0]
        at org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38) [drill-common-1.13.0.jar:1.13.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_144]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_144]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
Caused by: org.apache.drill.exec.rpc.RpcException: Command failed while establishing connection.  Failure type CONNECTION.
        at org.apache.drill.exec.rpc.RpcException.mapException(RpcException.java:67) ~[drill-rpc-1.13.0.jar:1.13.0]
        at org.apache.drill.exec.rpc.ListeningCommand.connectionFailed(ListeningCommand.java:66) ~[drill-rpc-1.13.0.jar:1.13.0]
        at org.apache.drill.exec.rpc.data.DataTunnel$SendBatchAsyncListen.connectionFailed(DataTunnel.java:145) ~[drill-java-exec-1.13.0.jar:1.13.0]
        at org.apache.drill.exec.rpc.ReconnectingConnection$ConnectionListeningFuture.connectionFailed(ReconnectingConnection.java:152) ~[drill-rpc-1.13.0.jar:1.13.0]
        at org.apache.drill.exec.rpc.ConnectionMultiListener$ConnectionHandler.operationComplete(ConnectionMultiListener.java:119) ~[drill-rpc-1.13.0.jar:1.13.0]
        at
Caused by: java.net.ConnectException: Connection refused
        ... 10 common frames omitted
{code}
Cleanup could be stuck for 8 hours (before cancellation was triggered) for the following reason.
 # This intermediate fragment was running an UnorderedReceiver and a PartitionSender. If there is logic in the PartitionSender such that in some case it keeps waiting for the next incoming batch without checking the fragment executor state via shouldContinue(), then we can get stuck, because only by checking shouldContinue() does it know that it has to exit and not wait for any more batches (see the sketch after this list). Still need to look into the PartitionSender code and understand it.
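A minimal sketch of the kind of wait loop that would avoid the hang, polling with a timeout and re-checking shouldContinue() instead of blocking indefinitely (hypothetical helper, not the actual PartitionSender/UnorderedReceiver code):
{code:java}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical sketch only: wait for the next batch without blocking forever,
// so an asynchronously set FAILED executor state is eventually observed.
public class InterruptibleWaitSketch {
  public static <T> T waitForBatch(LinkedBlockingQueue<T> buffer,
                                   BooleanSupplier shouldContinue) throws InterruptedException {
    while (shouldContinue.getAsBoolean()) {
      // Re-check the executor state periodically instead of calling take(),
      // which could block forever once the producer stops enqueuing.
      T batch = buffer.poll(100, TimeUnit.MILLISECONDS);
      if (batch != null) {
        return batch;
      }
    }
    return null;  // executor is no longer running; caller should exit and clean up
  }
}
{code}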

I also looked into the profile, and based on my understanding, major fragment 2's unordered receiver should receive all the batches from major fragment 4's single senders. When I add up the batches sent by the single senders and compare them with the receiver, there is a mismatch: the senders sent more batches than the receiver received:

*Sender Stats:*
 Batches: 113,528
 Rows: 62,554,675

*Receiver Stats:*
 Batches: 111,372
 Rows: 61,137,838



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
