DRILL-1106: Fix race condition in incoming buffers
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1e1e438d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1e1e438d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1e1e438d
Branch: refs/heads/master
Commit: 1e1e438db84648217b785c020c0e79968b3c50c9
Parents: 22709c2
Author: Steven Phillips <sphill...@maprtech.com>
Authored: Wed Jul 2 23:46:27 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Thu Jul 3 08:59:53 2014 -0700
----------------------------------------------------------------------
.../exec/work/batch/AbstractDataCollector.java | 13 ++-----
.../drill/exec/work/batch/MergingCollector.java | 9 -----
.../exec/work/batch/PartitionedCollector.java | 9 -----
.../work/batch/UnlimitedRawBatchBuffer.java | 40 +++++++++++++++++++-
4 files changed, 43 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e1e438d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index 23794d9..6eafe57 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -42,6 +42,7 @@ public abstract class AbstractDataCollector implements DataCollector{
protected final RawBatchBuffer[] buffers; private final AtomicInteger parentAccounter; private final AtomicInteger finishedStreams = new AtomicInteger();
+ private final FragmentContext context;
public AbstractDataCollector(AtomicInteger parentAccounter, Receiver
receiver, int minInputsRequired, FragmentContext context) { Preconditions.checkArgument(minInputsRequired > 0); @@ -53,11 +54,12 @@ public abstract class AbstractDataCollector implements DataCollector{ this.remainders = new AtomicIntegerArray(incoming.size()); this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId(); this.buffers = new RawBatchBuffer[minInputsRequired]; + this.context = context; try { String bufferClassName = context.getConfig().getString(ExecConstants.INCOMING_BUFFER_IMPL); Constructor<?> bufferConstructor = Class.forName(bufferClassName).getConstructor(FragmentContext.class, int.class); for(int i = 0; i < buffers.length; i++) { - buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context, incoming.size()); + buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context, receiver.supportsOutOfOrderExchange() ? incoming.size() : 1); } } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) { @@ -79,8 +81,6 @@ public abstract class AbstractDataCollector implements DataCollector{ } - public abstract void streamFinished(int minorFragmentId); - public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) throws IOException { // if we received an out of memory, add an item to all the buffer queues. @@ -100,13 +100,8 @@ public abstract class AbstractDataCollector implements DataCollector{ } } - // mark stream finished if we got the last batch. 
- if(batch.getHeader().getIsLastBatch()){ - streamFinished(minorFragmentId); - } - - getBuffer(minorFragmentId).enqueue(batch); + return decremented; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e1e438d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java index 806b115..5248bb1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java @@ -35,13 +35,4 @@ public class MergingCollector extends AbstractDataCollector{ protected RawBatchBuffer getBuffer(int minorFragmentId) { return buffers[0]; } - - - public void streamFinished(int minorFragmentId) { - if(streamsRunning.decrementAndGet() == 0) buffers[0].finished(); - } - - - - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e1e438d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java index c1f4fa5..5190d84 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java @@ -32,13 +32,4 @@ public class PartitionedCollector extends AbstractDataCollector{ protected RawBatchBuffer getBuffer(int minorFragmentId) { return buffers[minorFragmentId]; } - - @Override - public void streamFinished(int minorFragmentId) { - 
buffers[minorFragmentId].finished(); - } - - - - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e1e438d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java index 11e20c7..9d24c66 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.work.batch; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; @@ -38,6 +39,8 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ private final AtomicBoolean overlimit = new AtomicBoolean(false); private final AtomicBoolean outOfMemory = new AtomicBoolean(false); private final ResponseSenderQueue readController = new ResponseSenderQueue(); + private int streamCounter; + private FragmentContext context; public UnlimitedRawBatchBuffer(FragmentContext context, int fragmentCount) { int bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE); @@ -45,10 +48,15 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ this.softlimit = bufferSizePerSocket * fragmentCount; this.startlimit = Math.max(softlimit/2, 1); this.buffer = Queues.newLinkedBlockingDeque(); + this.streamCounter = fragmentCount; + this.context = context; } @Override public void enqueue(RawFragmentBatch batch) { + if (finished) { + throw new RuntimeException("Attempted to enqueue batch after 
finished"); + } if (batch.getHeader().getIsOutOfMemory()) { logger.debug("Setting autoread false"); if (!outOfMemory.get() && !buffer.peekFirst().getHeader().getIsOutOfMemory()) { @@ -68,7 +76,24 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ @Override public void cleanup() { + if (!finished) { + context.fail(new IllegalStateException("Cleanup before finished")); + } + if (!buffer.isEmpty()) { + if (!context.isFailed()) { + context.fail(new IllegalStateException("Batches still in queue during cleanup")); + logger.error("{} Batches in queue.", buffer.size()); + RawFragmentBatch batch; + while ((batch = buffer.poll()) != null) { + logger.error("Batch left in queue: {}", batch); + } + } + RawFragmentBatch batch; + while ((batch = buffer.poll()) != null) { + if (batch.getBody() != null) batch.getBody().release(); + } + } } @Override @@ -82,6 +107,9 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ @Override public void finished() { finished = true; + if (!buffer.isEmpty()) { + throw new IllegalStateException("buffer not empty when finished"); + } } @Override @@ -98,7 +126,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ b = buffer.poll(); // if we didn't get a buffer, block on waiting for buffer. - if(b == null && !finished){ + if(b == null && (!finished || !buffer.isEmpty())){ try { b = buffer.take(); } catch (InterruptedException e) { @@ -120,6 +148,16 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ } } + if (b != null && b.getHeader().getIsLastBatch()) { + streamCounter--; + if (streamCounter == 0) { + finished(); + } + } + + if (b == null && buffer.size() > 0) { + throw new IllegalStateException("Returning null when there are batches left in queue"); + } return b; }