[
https://issues.apache.org/jira/browse/DRILL-6746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626553#comment-16626553
]
ASF GitHub Bot commented on DRILL-6746:
---------------------------------------
ilooner closed pull request #1470: DRILL-6746: Query can hang when
PartitionSender task thread sees a co…
URL: https://github.com/apache/drill/pull/1470
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 43abd8e705d..6d77d639f8f 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.work.batch;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -36,8 +37,9 @@
protected interface BufferQueue<T> {
void addOomBatch(RawFragmentBatch batch);
- RawFragmentBatch poll() throws IOException;
+ RawFragmentBatch poll() throws IOException, InterruptedException;
RawFragmentBatch take() throws IOException, InterruptedException;
+ RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws
InterruptedException, IOException;
boolean checkForOutOfMemory();
int size();
boolean isEmpty();
@@ -127,17 +129,24 @@ public synchronized void kill(final FragmentContext
context) {
* responses pending
*/
private void clearBufferWithBody() {
+ RawFragmentBatch batch;
while (!bufferQueue.isEmpty()) {
- final RawFragmentBatch batch;
+ batch = null;
try {
batch = bufferQueue.poll();
assertAckSent(batch);
} catch (IOException e) {
context.getExecutorState().fail(e);
continue;
- }
- if (batch.getBody() != null) {
- batch.getBody().release();
+ } catch (InterruptedException e) {
+ context.getExecutorState().fail(e);
+ // keep the state that the thread is interrupted
+ Thread.currentThread().interrupt();
+ continue;
+ } finally {
+ if (batch != null && batch.getBody() != null) {
+ batch.getBody().release();
+ }
}
}
}
@@ -167,7 +176,25 @@ public RawFragmentBatch getNext() throws IOException {
// if we didn't get a batch, block on waiting for queue.
if (b == null && (!isTerminated() || !bufferQueue.isEmpty())) {
- b = bufferQueue.take();
+ // We shouldn't block infinitely here. There can be a condition such
that due to a failure FragmentExecutor
+ // state is changed to FAILED and queue is empty. Because of this the
minor fragment main thread will block
+ // here waiting for next batch to arrive. Meanwhile when next batch
arrived and was enqueued it sees
+ // FragmentExecutor failure state and doesn't enqueue the batch and
cleans up the buffer queue. Hence this
+ // thread will stuck forever. So we pool for 5 seconds until we get a
batch or FragmentExecutor state is in
+ // error condition.
+ while (b == null) {
+ b = bufferQueue.poll(5, TimeUnit.SECONDS);
+ if (!context.getExecutorState().shouldContinue()) {
+ kill(context);
+ if (b != null) {
+ assertAckSent(b);
+ if (b.getBody() != null) {
+ b.getBody().release();
+ }
+ b = null;
+ }
+ } // else b will be assigned a valid batch
+ }
}
} catch (final InterruptedException e) {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index 5d4b3a1b8ad..50f582dfa38 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -102,14 +102,10 @@ public void addOomBatch(RawFragmentBatch batch) {
}
@Override
- public RawFragmentBatch poll() throws IOException {
+ public RawFragmentBatch poll() throws IOException, InterruptedException {
RawFragmentBatchWrapper batchWrapper = buffer.poll();
if (batchWrapper != null) {
- try {
- return batchWrapper.get();
- } catch (InterruptedException e) {
- return null;
- }
+ return batchWrapper.get();
}
return null;
}
@@ -119,6 +115,15 @@ public RawFragmentBatch take() throws IOException,
InterruptedException {
return buffer.take().get();
}
+ @Override
+ public RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws
InterruptedException, IOException {
+ RawFragmentBatchWrapper batchWrapper = buffer.poll(timeout, timeUnit);
+ if (batchWrapper != null) {
+ return batchWrapper.get();
+ }
+ return null;
+ }
+
@Override
public boolean checkForOutOfMemory() {
return buffer.peek().isOutOfMemory();
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index bf14a74b8cf..0d36d5d4083 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RawFragmentBatch;
@@ -63,6 +64,15 @@ public RawFragmentBatch take() throws IOException,
InterruptedException {
return batch;
}
+ @Override
+ public RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws
InterruptedException, IOException {
+ RawFragmentBatch batch = buffer.poll(timeout, timeUnit);
+ if (batch != null) {
+ batch.sendOk();
+ }
+ return batch;
+ }
+
@Override
public boolean checkForOutOfMemory() {
return context.getAllocator().isOverLimit();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Query can hang when PartitionSender task thread sees a connection failure
> while sending data batches to remote fragment
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: DRILL-6746
> URL: https://issues.apache.org/jira/browse/DRILL-6746
> Project: Apache Drill
> Issue Type: Bug
> Components: Execution - Flow
> Affects Versions: 1.13.0
> Reporter: Sorabh Hamirwasia
> Assignee: Sorabh Hamirwasia
> Priority: Major
> Labels: ready-to-commit
> Fix For: 1.15.0
>
>
> An UnorderedMuxExchange is implemented using UnorderedReceiver and
> HashPartitionSender. The muxer is used to reduce memory usage. Without it,
> when multiple minor fragments (say n) running on a node send data to
> multiple minor fragments on other remote nodes (say m), each sending
> fragment has to create m buffers for the m receivers, which means creating
> m*n buffers in total on a single node. With the muxer, all the data from the
> n local minor fragments can be sent to 1 local minor fragment, and that
> local minor fragment then sends the data to the m remote fragments/nodes.
> Hence only m buffers are needed in total.
> There is a shared queue which is filled with RecordBatches by all the m
> sending minor fragments (the batches are received on the Data client channel
> and ultimately enqueued by a netty thread) and is consumed by the local minor
> fragment, which runs PartitionSender --> UnorderedReceiver, with each next()
> call. Hence the queue is filled and consumed by different threads. When
> PartitionSender receives an incoming batch, then based on some heuristics it
> creates multiple PartitionTasks threads, each of which goes over this
> incoming batch and copies the rows that fall in its range to its outgoing
> batch. The main local minor fragment thread waits until all task threads have
> completed or until it is interrupted, after which it calls next() to get the
> next incoming batch. In this process, once an outgoing batch is full it is
> sent to the remote nodes. All the sends are done asynchronously.
> In this case, if there is any failure while a task thread is sending an
> outgoing batch, then the executor state of the main local fragment thread
> (running PartitionSender and Unordered Receiver) is set to FAILED
> asynchronously. Meanwhile a next() call is made to get a new incoming batch.
> There is a race condition between the check of the executor state made by
> the next() call and the moment the FAILED state is set, so next() can be
> called before the state is actually updated. If there is no RecordBatch
> present in the queue during this next() call, the main local fragment thread
> will call take() on the buffer queue and wait until it gets a new batch.
> Meanwhile the executor state might get updated, and the netty thread which
> receives the next batch and tries to enqueue it will see the updated state
> and release the received batch without putting it in the shared queue.
> Since no new batch will be stored in the shared queue from then on, the main
> local minor fragment thread will be stuck forever unless a cancellation is
> explicitly requested, which will interrupt the stuck thread. This can result
> in a query hang.
> *Logs for above investigation:*
> It looks like the intermediate fragment 2:3 started executing when it
> receives the record batch.
> {code:java}
> drill-cluster/drillbit.1.log:2018-08-08 00:27:34,210
> [249580bc-5bca-b166-e906-084b35ecf30d:frag:2:3] INFO
> o.a.d.e.w.fragment.FragmentExecutor -
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State change requested
> AWAITING_ALLOCATION --> RUNNING
> drill-cluster/drillbit.1.log:2018-08-08 00:27:34,210
> [249580bc-5bca-b166-e906-084b35ecf30d:frag:2:3] INFO
> o.a.d.e.w.f.FragmentStatusReporter -
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State to report: RUNNING
> {code}
> But later while sending a record batch downstream to a remote node it
> receives failure as Data Connection is not established since remote Drillbit
> was not running then.
> {code:java}
> 2018-08-08 00:33:29,184 [BitClient-7] ERROR
> o.a.d.e.rpc.ConnectionMultiListener - Failed to establish connection
> java.util.concurrent.ExecutionException:
> io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection
> refused: drillbit2/10.10.10.10:31012
> at
> io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:54)
> ~[netty-common-4.0.48.Final.jar:4.0.48.Final]
> at
> org.apache.drill.exec.rpc.ConnectionMultiListener$ConnectionHandler.operationComplete(ConnectionMultiListener.java:90)
> [drill-rpc-1.13.0.jar:1.13.0]
> at
> org.apache.drill.exec.rpc.ConnectionMultiListener$ConnectionHandler.operationComplete(ConnectionMultiListener.java:77)
> [drill-rpc-1.13.0.jar:1.13.0]
> at
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
> [netty-common-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:500)
> [netty-common-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:479)
> [netty-common-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
> [netty-common-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:122)
> [netty-common-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:278)
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:294)
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
> [netty-common-4.0.48.Final.jar:4.0.48.Final]
> at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
> Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException:
> Connection refused: drillbit2/10.10.10.10:31012
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> ~[na:1.8.0_144]
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> ~[na:1.8.0_144]
> at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:258)
> ~[netty-transport-4.0.48.Final.jar:4.0.48.Final]
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
> [netty-transport-4.0.48.Final.jar:4.0.48.Final]
> ... 6 common frames omitted
> Caused by: java.net.ConnectException: Connection refused
> ... 10 common frames omitted
> {code}
> Then due to this failure while *sendingRecordBatch*, the status handler sets
> the Fragment Executor state to FAILED, but cleanup is not performed as that
> will be done when the main next() loop of operators is terminated.
> {code:java}
> 2018-08-08 00:33:29,184 [BitClient-7] INFO
> o.a.d.e.w.fragment.FragmentExecutor -
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State change requested RUNNING -->
> FAILED
> {code}
> Ideally the operators should have seen that the executor state is FAILED and
> should have started the cleanup process of FragmentExecutor. During cleanup
> the FragmentExecutor removes itself from the *runningFragments* list and also
> removes its *FragmentManager* (being an intermediate Fragment). But it looks
> like the executor thread was stuck somewhere and cleanup was not completed.
> The evidence is that 8 hours later, when cancellation was requested, the
> Fragment Manager for this minor fragment was still found on that node and
> cancellation was performed properly.
> {code:java}
> 2018-08-08 08:51:23,207 [BitServer-1] INFO
> o.a.d.e.w.fragment.FragmentExecutor -
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State change requested FAILED -->
> CANCELLATION_REQUESTED
> 2018-08-08 08:51:23,207 [BitServer-1] WARN
> o.a.d.e.w.fragment.FragmentExecutor -
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: Ignoring unexpected state
> transition FAILED --> CANCELLATION_REQUESTED
> 2018-08-08 08:51:23,209 [249580bc-5bca-b166-e906-084b35ecf30d:frag:2:3] INFO
> o.a.d.e.w.fragment.FragmentExecutor -
> 249580bc-5bca-b166-e906-084b35ecf30d:2:3: State change requested FAILED -->
> FINISHED
> 2018-08-08 08:51:23,216 [249580bc-5bca-b166-e906-084b35ecf30d:frag:2:3] ERROR
> o.a.d.e.w.fragment.FragmentExecutor - SYSTEM ERROR: ConnectException:
> Connection refused
> Fragment 2:3
> [Error Id: 3f66fbfb-a93b-4b48-8008-ebb78e084905 on drillbit1:31010]
> org.apache.drill.common.exceptions.UserException: SYSTEM ERROR:
> ConnectException: Connection refused
> Fragment 2:3
> [Error Id: 3f66fbfb-a93b-4b48-8008-ebb78e084905 on drillbit1:31010]
> at
> org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:633)
> ~[drill-common-1.13.0.jar:1.13.0]
> at
> org.apache.drill.exec.work.fragment.FragmentExecutor.sendFinalState(FragmentExecutor.java:300)
> [drill-java-exec-1.13.0.jar:1.13.0]
> at
> org.apache.drill.exec.work.fragment.FragmentExecutor.cleanup(FragmentExecutor.java:160)
> [drill-java-exec-1.13.0.jar:1.13.0]
> at
> org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:266)
> [drill-java-exec-1.13.0.jar:1.13.0]
> at
> org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38)
> [drill-common-1.13.0.jar:1.13.0]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [na:1.8.0_144]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [na:1.8.0_144]
> at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
> Caused by: org.apache.drill.exec.rpc.RpcException: Command failed while
> establishing connection. Failure type CONNECTION.
> at
> org.apache.drill.exec.rpc.RpcException.mapException(RpcException.java:67)
> ~[drill-rpc-1.13.0.jar:1.13.0]
> at
> org.apache.drill.exec.rpc.ListeningCommand.connectionFailed(ListeningCommand.java:66)
> ~[drill-rpc-1.13.0.jar:1.13.0]
> at
> org.apache.drill.exec.rpc.data.DataTunnel$SendBatchAsyncListen.connectionFailed(DataTunnel.java:145)
> ~[drill-java-exec-1.13.0.jar:1.13.0]
> at
> org.apache.drill.exec.rpc.ReconnectingConnection$ConnectionListeningFuture.connectionFailed(ReconnectingConnection.java:152)
> ~[drill-rpc-1.13.0.jar:1.13.0]
> at
> org.apache.drill.exec.rpc.ConnectionMultiListener$ConnectionHandler.operationComplete(ConnectionMultiListener.java:119)
> ~[drill-rpc-1.13.0.jar:1.13.0]
> at
> Caused by: java.net.ConnectException: Connection refused
> ... 10 common frames omitted
> {code}
> Cleanup can be stuck for 8 hours (before cancellation was triggered) for
> the reasons below.
> # This intermediate Fragment was running Unordered Receiver and
> PartitionSender. So if there is logic in PartitionSender such that in some
> case it keeps waiting for the next incoming batch without checking the
> fragment executor state (using shouldContinue()), then we can get stuck,
> since only after checking shouldContinue() will it know that it has to exit
> and not wait for any more batches. Still need to look into the
> PartitionSender code and understand it.
> Also I looked into the profile, and based on my understanding major fragment
> 2's unordered receiver should get all the batches from major fragment 4's
> single sender. When I add up the batches sent by the single senders and
> compare them with what the receiver got, there is a mismatch.
> Basically the sender has sent more batches than the receiver has received:
> *Sender Stats:*
> Batches: 113,528
> Row: 62,554,675
> *Receiver Stats:*
> Batches: 111,372
> Row: 61,137,838
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)