This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit e82f5ef689f1051b6cffc17921462e8e5524629b
Author: Sorabh Hamirwasia <[email protected]>
AuthorDate: Sun Sep 16 21:01:15 2018 -0700

    DRILL-6746: Query can hang when PartitionSender task thread sees a connection failure
    while sending data batches to remote fragment

    closes #1470
---
 .../drill/exec/work/batch/BaseRawBatchBuffer.java  | 39 ++++++++++++++++++----
 .../exec/work/batch/SpoolingRawBatchBuffer.java    | 17 ++++++----
 .../exec/work/batch/UnlimitedRawBatchBuffer.java   | 10 ++++++
 3 files changed, 54 insertions(+), 12 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 43abd8e..6d77d63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.work.batch;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -36,8 +37,9 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
 
   protected interface BufferQueue<T> {
     void addOomBatch(RawFragmentBatch batch);
-    RawFragmentBatch poll() throws IOException;
+    RawFragmentBatch poll() throws IOException, InterruptedException;
     RawFragmentBatch take() throws IOException, InterruptedException;
+    RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws InterruptedException, IOException;
     boolean checkForOutOfMemory();
     int size();
     boolean isEmpty();
@@ -127,17 +129,24 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
    * responses pending
    */
   private void clearBufferWithBody() {
+    RawFragmentBatch batch;
     while (!bufferQueue.isEmpty()) {
-      final RawFragmentBatch batch;
+      batch = null;
       try {
         batch = bufferQueue.poll();
         assertAckSent(batch);
       } catch (IOException e) {
         context.getExecutorState().fail(e);
         continue;
-      }
-      if (batch.getBody() != null) {
-        batch.getBody().release();
+      } catch (InterruptedException e) {
+        context.getExecutorState().fail(e);
+        // keep the state that the thread is interrupted
+        Thread.currentThread().interrupt();
+        continue;
+      } finally {
+        if (batch != null && batch.getBody() != null) {
+          batch.getBody().release();
+        }
       }
     }
   }
@@ -167,7 +176,25 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
 
       // if we didn't get a batch, block on waiting for queue.
      if (b == null && (!isTerminated() || !bufferQueue.isEmpty())) {
-        b = bufferQueue.take();
+        // We shouldn't block infinitely here. There can be a condition such that, due to a failure, the
+        // FragmentExecutor state is changed to FAILED and the queue is empty. Because of this the minor fragment
+        // main thread will block here waiting for the next batch to arrive. Meanwhile, when the next batch arrives,
+        // the enqueuing thread sees the FragmentExecutor failure state, doesn't enqueue the batch and cleans up the
+        // buffer queue. Hence this thread would be stuck forever. So we poll for 5 seconds at a time until we get a
+        // batch or the FragmentExecutor state is in an error condition.
+        while (b == null) {
+          b = bufferQueue.poll(5, TimeUnit.SECONDS);
+          if (!context.getExecutorState().shouldContinue()) {
+            kill(context);
+            if (b != null) {
+              assertAckSent(b);
+              if (b.getBody() != null) {
+                b.getBody().release();
+              }
+              b = null;
+            }
+          } // else b will be assigned a valid batch
+        }
       }
     } catch (final InterruptedException e) {
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index 5d4b3a1..50f582d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -102,14 +102,10 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB
     }
 
     @Override
-    public RawFragmentBatch poll() throws IOException {
+    public RawFragmentBatch poll() throws IOException, InterruptedException {
       RawFragmentBatchWrapper batchWrapper = buffer.poll();
       if (batchWrapper != null) {
-        try {
-          return batchWrapper.get();
-        } catch (InterruptedException e) {
-          return null;
-        }
+        return batchWrapper.get();
       }
       return null;
     }
@@ -120,6 +116,15 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB
     }
 
     @Override
+    public RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws InterruptedException, IOException {
+      RawFragmentBatchWrapper batchWrapper = buffer.poll(timeout, timeUnit);
+      if (batchWrapper != null) {
+        return batchWrapper.get();
+      }
+      return null;
+    }
+
+    @Override
     public boolean checkForOutOfMemory() {
       return buffer.peek().isOutOfMemory();
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index bf14a74..0d36d5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.work.batch;
 
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RawFragmentBatch;
@@ -64,6 +65,15 @@ public class UnlimitedRawBatchBuffer extends BaseRawBatchBuffer<RawFragmentBatch
     }
 
     @Override
+    public RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws InterruptedException, IOException {
+      RawFragmentBatch batch = buffer.poll(timeout, timeUnit);
+      if (batch != null) {
+        batch.sendOk();
+      }
+      return batch;
+    }
+
+    @Override
     public boolean checkForOutOfMemory() {
       return context.getAllocator().isOverLimit();
     }
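
The core of the change is a bounded-poll receive loop: instead of blocking forever in BlockingQueue.take(), the receiving fragment polls with a timeout and re-checks the executor's failure state between attempts, so a PartitionSender connection failure that stops batches from being enqueued can no longer leave the receiver waiting indefinitely. The sketch below is a minimal, self-contained illustration of that pattern outside of Drill, not code from the commit; BoundedPollReceiver, the String payload, the AtomicBoolean flag and the 1-second timeout are assumed stand-ins for the buffer queue, RawFragmentBatch, the FragmentContext.getExecutorState().shouldContinue() check and the 5-second poll used in the patch, and it simply drops a batch that races in after the failure where the real code also acks it and releases its body.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Bounded-poll pattern: the consumer never blocks indefinitely on take();
// it polls with a timeout and re-checks a shared failure flag in between.
public class BoundedPollReceiver {

  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  // Stand-in for the FragmentExecutor "should continue" state.
  private final AtomicBoolean shouldContinue = new AtomicBoolean(true);

  // Producer side: once the receiver has failed, drop instead of enqueueing,
  // mirroring how the buffer queue is cleaned up for a failed fragment.
  public void enqueue(String batch) {
    if (shouldContinue.get()) {
      queue.offer(batch);
    }
  }

  // Called on a connection failure or any other fatal error.
  public void fail() {
    shouldContinue.set(false);
  }

  // Consumer side: returns the next batch, or null once the receiver failed.
  public String getNext() throws InterruptedException {
    String batch = null;
    while (batch == null) {
      // Wake up periodically instead of blocking forever (the patch uses 5s).
      batch = queue.poll(1, TimeUnit.SECONDS);
      if (!shouldContinue.get()) {
        // The real patch also acks a late batch and releases its buffers
        // before discarding it; here we simply report end-of-stream.
        return null;
      }
    }
    return batch;
  }

  public static void main(String[] args) throws InterruptedException {
    BoundedPollReceiver receiver = new BoundedPollReceiver();

    new Thread(() -> {
      receiver.enqueue("batch-1");
      receiver.fail(); // simulate a connection failure after one batch
    }).start();

    String batch;
    while ((batch = receiver.getNext()) != null) {
      System.out.println("received " + batch);
    }
    System.out.println("receiver stopped without hanging");
  }
}

This is also why the patch adds poll(long timeout, TimeUnit timeUnit) to the BufferQueue interface and implements it in both SpoolingRawBatchBuffer and UnlimitedRawBatchBuffer: every concrete buffer queue has to supply the timed variant that the loop in BaseRawBatchBuffer.getNext() builds on.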
