[
https://issues.apache.org/jira/browse/DRILL-6746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sorabh Hamirwasia updated DRILL-6746:
-------------------------------------
Reviewer: salim achouche
> Query can hang when PartitionSender task thread sees a connection failure
> while sending data batches to remote fragment
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: DRILL-6746
> URL: https://issues.apache.org/jira/browse/DRILL-6746
> Project: Apache Drill
> Issue Type: Bug
> Components: Execution - Flow
> Affects Versions: 1.13.0
> Reporter: Sorabh Hamirwasia
> Assignee: Sorabh Hamirwasia
> Priority: Major
> Fix For: 1.15.0
>
>
> An UnorderedMuxExchange is implemented using an UnorderedReceiver and a
> HashPartitionSender. The muxer is used to improve memory usage: when multiple
> minor fragments (say n) running on a node send data to minor fragments on
> multiple remote nodes (say m), each sending fragment has to create m buffers,
> one per receiver. In total that means n*m buffers on a single node. With a
> muxer, all n local minor fragments instead send their data to 1 local minor
> fragment, and that local minor fragment sends the data on to the m remote
> fragments/nodes. Hence only m buffers are needed in total.
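> The buffer arithmetic above can be sketched as follows (the fragment counts
> are hypothetical, chosen only to make the numbers concrete):

```java
public class MuxBufferCount {
  // Without a mux: each of the n local senders keeps one outgoing buffer
  // per remote receiver.
  static int withoutMux(int n, int m) {
    return n * m;
  }

  // With a mux: only the single local mux fragment keeps one buffer per
  // remote receiver, regardless of how many local senders feed it.
  static int withMux(int n, int m) {
    return m;
  }

  public static void main(String[] args) {
    int n = 10; // hypothetical local sending minor fragments
    int m = 20; // hypothetical remote receiving minor fragments
    System.out.println(withoutMux(n, m)); // prints 200
    System.out.println(withMux(n, m));    // prints 20
  }
}
```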
> There is a shared queue which is filled with RecordBatches by all the sending
> minor fragments (the batches are received on the Data client channel and
> ultimately enqueued by a Netty thread) and which is consumed by the local
> minor fragment running PartitionSender --> UnorderedReceiver on each next()
> call. Hence the queue is filled and consumed by different threads. When the
> PartitionSender receives an incoming batch, then based on some heuristics it
> creates multiple PartitionTask threads, each of which goes over this incoming
> batch and copies the rows that fall in its range into its outgoing batch. The
> main local minor fragment thread waits until all task threads have completed,
> or until it is interrupted, after which it calls next() for the next incoming
> batch. During this process, whenever an outgoing batch is full it is sent to
> the remote nodes. All the sends are done asynchronously.
> In this case, if a task thread hits any failure while sending an outgoing
> batch, then the executor state of the main local fragment thread (running the
> PartitionSender and UnorderedReceiver) is set to the FAILED state
> asynchronously. Meanwhile a next() call is made to get a new incoming batch.
> There is a race condition between the executor-state check that precedes the
> next() call and the moment the FAILED state is set, so next() can be called
> before the state is actually updated. If, within this next() call, there is
> no RecordBatch present in the queue, then the main local fragment thread
> calls take() on the buffer queue and waits until it gets a new batch.
> Meanwhile the executor state gets updated, and the Netty thread which
> receives the next batch and tries to enqueue it sees the updated state and
> releases the received batch without putting it in the shared queue. Since no
> new batch will ever be stored in the shared queue, the main local minor
> fragment thread will be stuck forever unless a cancellation is explicitly
> issued, which interrupts the stuck thread. This can result in a query hang.
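> The failure mode above can be reproduced with a minimal sketch (the class and
> variable names are illustrative stand-ins, not Drill's actual classes): the
> consumer checks a state flag, finds it healthy, and blocks on take(); the
> producer then observes FAILED and drops the batch instead of enqueueing it,
> so take() never returns:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class HangSketch {
  // Returns true when the consumer ends up blocked in take() even though the
  // state has already moved to FAILED.
  static boolean demonstrateHang() throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    AtomicBoolean failed = new AtomicBoolean(false); // stand-in for the FAILED state
    CountDownLatch stateChecked = new CountDownLatch(1);

    // Stand-in for the main fragment thread driving next().
    Thread consumer = new Thread(() -> {
      try {
        if (!failed.get()) {        // state check: still looks healthy...
          stateChecked.countDown(); // ...and the race window opens here
          queue.take();             // blocks: no batch will ever arrive
        }
      } catch (InterruptedException e) {
        // freed only by the explicit cancellation below
      }
    });
    consumer.start();

    stateChecked.await(); // consumer has passed its state check
    failed.set(true);     // task thread reports the send failure asynchronously

    // Stand-in for the Netty thread delivering the next batch.
    if (!failed.get()) {
      queue.put("batch-1"); // normal path: enqueue for the consumer
    }                       // FAILED path: release the batch, enqueue nothing

    consumer.join(500);
    boolean stuck = consumer.isAlive();
    consumer.interrupt();   // the explicit cancellation that unblocks it
    consumer.join();
    return stuck;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("consumer stuck: " + demonstrateHang()); // prints "consumer stuck: true"
  }
}
```

> Only the interrupt at the end, standing in for an explicit query
> cancellation, frees the blocked thread.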
> *Logs for above investigation:*
> It looks like the intermediate fragment 2:3 started executing when it
> received the record batch.
> {code:java}
> drill-cluster/drillbit.1.log:2018-08-08 00:27:34,210
> [249580bc-5bca-b166-e906-084b35ecf30d:frag:2:3] INFO
> o.a.d.e.w.fragment.FragmentExecutor -
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State change requested
> AWAITING_ALLOCATION --> RUNNING
> drill-cluster/drillbit.1.log:2018-08-08 00:27:34,210
> [249580bc-5bca-b166-e906-084b35ecf30d:frag:2:3] INFO
> o.a.d.e.w.f.FragmentStatusReporter -
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State to report: RUNNING
> {code}
> But later, while sending a record batch downstream to a remote node, it
> received a failure because the Data connection could not be established: the
> remote Drillbit was not running at that time.
> {code:java}
> 2018-08-08 00:33:29,184 [BitClient-7] ERROR
> o.a.d.e.rpc.ConnectionMultiListener - Failed to establish connection
> java.util.concurrent.ExecutionException:
> io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection
> refused: drillbit2/10.10.10.10:31012
> at
> io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:54)
> ~[netty-common-4.0.48.Final.jar:4.0.48.Final]
> at
> org.apache.drill.exec.rpc.ConnectionMultiListener$ConnectionHandler.operationComplete(ConnectionMultiListener.java:90)
> [drill-rpc-1.13.0.jar:1.13.0]
> at
> org.apache.drill.exec.rpc.ConnectionMultiListener$ConnectionHandler.operationComplete(ConnectionMultiListener.java:77)
> [drill-rpc-1.13.0.jar:1.13.0]
> at
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
> [netty-common-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:500)
> [netty-common-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:479)
> [netty-common-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
> [netty-common-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:122)
> [netty-common-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:278)
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:294)
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
> [netty-common-4.0.48.Final.jar:4.0.48.Final]
> at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
> Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException:
> Connection refused: drillbit2/10.10.10.10:31012
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> ~[na:1.8.0_144]
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> ~[na:1.8.0_144]
> at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:258)
> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
> ... 6 common frames omitted
> Caused by: java.net.ConnectException: Connection refused
> ... 10 common frames omitted
> {code}
> Then, due to this failure while *sendingRecordBatch*, the status handler sets
> the Fragment Executor state to FAILED, but cleanup is not performed, since
> that happens only when the main next() loop of the operators terminates.
> {code:java}
> 2018-08-08 00:33:29,184 [BitClient-7] INFO
> o.a.d.e.w.fragment.FragmentExecutor -
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State change requested RUNNING -->
> FAILED
> {code}
> Ideally the operators should have seen that the executor state is FAILED and
> should have started the cleanup process of the FragmentExecutor. During
> cleanup the FragmentExecutor removes itself from the *runningFragments* list
> and also removes its *FragmentManager* (being an intermediate fragment). But
> it looks like the executor thread was stuck somewhere and cleanup was never
> completed. We know this because, when cancellation was requested 8 hours
> later, the FragmentManager for this minor fragment was still found on that
> node and cancellation was then performed properly.
> {code:java}
> 2018-08-08 08:51:23,207 [BitServer-1] INFO
> o.a.d.e.w.fragment.FragmentExecutor -
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State change requested FAILED -->
> CANCELLATION_REQUESTED
> 2018-08-08 08:51:23,207 [BitServer-1] WARN
> o.a.d.e.w.fragment.FragmentExecutor -
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: Ignoring unexpected state
> transition FAILED --> CANCELLATION_REQUESTED
> 2018-08-08 08:51:23,209 [249580bc-5bca-b166-e906-084b35ecf30d:frag:2:3] INFO
> o.a.d.e.w.fragment.FragmentExecutor -
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State change requested FAILED -->
> FINISHED
> 2018-08-08 08:51:23,216 [249580bc-5bca-b166-e906-084b35ecf30d:frag:2:3] ERROR
> o.a.d.e.w.fragment.FragmentExecutor - SYSTEM ERROR: ConnectException:
> Connection refused
> Fragment 2:3
> [Error Id: 3f66fbfb-a93b-4b48-8008-ebb78e084905 on drillbit1:31010]
> org.apache.drill.common.exceptions.UserException: SYSTEM ERROR:
> ConnectException: Connection refused
> Fragment 2:3
> [Error Id: 3f66fbfb-a93b-4b48-8008-ebb78e084905 on drillbit1:31010]
> at
> org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:633)
> ~[drill-common-1.13.0.jar:1.13.0]
> at
> org.apache.drill.exec.work.fragment.FragmentExecutor.sendFinalState(FragmentExecutor.java:300)
> [drill-java-exec-1.13.0.jar:1.13.0]
> at
> org.apache.drill.exec.work.fragment.FragmentExecutor.cleanup(FragmentExecutor.java:160)
> [drill-java-exec-1.13.0.jar:1.13.0]
> at
> org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:266)
> [drill-java-exec-1.13.0.jar:1.13.0]
> at
> org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38)
> [drill-common-1.13.0.jar:1.13.0]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [na:1.8.0_144]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [na:1.8.0_144]
> at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
> Caused by: org.apache.drill.exec.rpc.RpcException: Command failed while
> establishing connection. Failure type CONNECTION.
> at
> org.apache.drill.exec.rpc.RpcException.mapException(RpcException.java:67)
> ~[drill-rpc-1.13.0.jar:1.13.0]
> at
> org.apache.drill.exec.rpc.ListeningCommand.connectionFailed(ListeningCommand.java:66)
> ~[drill-rpc-1.13.0.jar:1.13.0]
> at
> org.apache.drill.exec.rpc.data.DataTunnel$SendBatchAsyncListen.connectionFailed(DataTunnel.java:145)
> ~[drill-java-exec-1.13.0.jar:1.13.0]
> at
> org.apache.drill.exec.rpc.ReconnectingConnection$ConnectionListeningFuture.connectionFailed(ReconnectingConnection.java:152)
> ~[drill-rpc-1.13.0.jar:1.13.0]
> at
> org.apache.drill.exec.rpc.ConnectionMultiListener$ConnectionHandler.operationComplete(ConnectionMultiListener.java:119)
> ~[drill-rpc-1.13.0.jar:1.13.0]
> at
> Caused by: java.net.ConnectException: Connection refused
> ... 10 common frames omitted
> {code}
> Cleanup could be stuck for 8 hours (before cancellation was triggered)
> because of the reason below.
> # This intermediate fragment was running an UnorderedReceiver and a
> PartitionSender. If there is logic in the PartitionSender such that in some
> case it keeps waiting for the next incoming batch without checking the
> fragment executor state via shouldContinue(), then it can get stuck, since
> only by checking shouldContinue() will it know that it has to exit rather
> than wait for more batches. The PartitionSender code still needs to be
> examined to confirm this.
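> One possible mitigation, sketched below rather than taken from the actual
> fix, is to replace the unconditional take() with a timed poll() that
> re-checks the executor state between attempts, so a FAILED transition set
> after the initial check is eventually observed. shouldContinue here stands in
> for the executor-state check, and the class name and timeout are
> hypothetical:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class PollingConsumer {
  // Polls the shared queue but re-checks the continue flag between attempts,
  // so an asynchronous FAILED transition cannot strand the thread in take().
  static String nextBatch(BlockingQueue<String> queue, BooleanSupplier shouldContinue)
      throws InterruptedException {
    while (shouldContinue.getAsBoolean()) {
      String batch = queue.poll(100, TimeUnit.MILLISECONDS); // hypothetical timeout
      if (batch != null) {
        return batch;
      }
      // Timed out: loop around and re-check the executor state.
    }
    return null; // executor is no longer running; caller can start cleanup
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // State already FAILED: returns null instead of hanging forever.
    System.out.println(nextBatch(queue, () -> false)); // prints "null"
    queue.put("batch-1");
    System.out.println(nextBatch(queue, () -> true));  // prints "batch-1"
  }
}
```

> The poll timeout bounds how long a state change can go unnoticed, at the
> cost of a periodic wake-up while the queue is empty.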
> Also, I looked into the profile, and based on my understanding major fragment
> 2's unordered receiver should get all the batches from major fragment 4's
> single sender. But when I add up the batches from the single sender to the
> receiver, there is a mismatch: the sender has sent more batches than the
> receiver has received:
> *Sender Stats:*
> Batches: 113,528
> Row: 62,554,675
> *Receiver Stats:*
> Batches: 111,372
> Row: 61,137,838
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)