Update the data collector and batch buffers to toggle auto read on either one stream or all streams, depending on use.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/fb2091ad Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/fb2091ad Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/fb2091ad Branch: refs/heads/master Commit: fb2091ad74bc677b309e5d32d375729dd5bf45bc Parents: 8adb527 Author: Jacques Nadeau <jacq...@apache.org> Authored: Sun Jun 8 09:35:28 2014 -0700 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Sun Jun 8 09:41:19 2014 -0700 ---------------------------------------------------------------------- .../exec/planner/logical/DrillJoinRule.java | 2 +- .../exec/work/batch/AbstractDataCollector.java | 39 +++++++++++--- .../drill/exec/work/batch/RawBatchBuffer.java | 2 +- .../drill/exec/work/batch/ReadController.java | 23 +++++++++ .../exec/work/batch/SpoolingRawBatchBuffer.java | 4 +- .../work/batch/UnlimitedRawBatchBuffer.java | 53 ++++++++++++-------- 6 files changed, 93 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb2091ad/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java index e940b2e..0f63140 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java @@ -90,7 +90,7 @@ public class DrillJoinRule extends RelOptRule { } newJoinCondition = RexUtil.composeConjunction(builder, equijoinList, false); } else { - tracer.warning("Non-equijoins are only supported in the presence of an equijoin."); +// 
tracer.warning("Non-equijoins are only supported in the presence of an equijoin."); return; } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb2091ad/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java index 76db1ed..83e697d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java @@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; +import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; @@ -33,16 +34,17 @@ import org.apache.drill.exec.rpc.RemoteConnection; import com.google.common.base.Preconditions; -public abstract class AbstractDataCollector implements DataCollector{ +public abstract class AbstractDataCollector implements DataCollector, ReadController{ private final List<DrillbitEndpoint> incoming; private final int oppositeMajorFragmentId; private final AtomicIntegerArray remainders; private final AtomicInteger remainingRequired; protected final RawBatchBuffer[] buffers; + private final AtomicReferenceArray<RemoteConnection> connections; private final AtomicInteger parentAccounter; private final AtomicInteger finishedStreams = new AtomicInteger(); - + public AbstractDataCollector(AtomicInteger parentAccounter, Receiver receiver, int minInputsRequired, FragmentContext context) { Preconditions.checkArgument(minInputsRequired > 0); Preconditions.checkNotNull(receiver); @@ -50,14 
+52,15 @@ public abstract class AbstractDataCollector implements DataCollector{ this.parentAccounter = parentAccounter; this.incoming = receiver.getProvidingEndpoints(); + this.connections = new AtomicReferenceArray<>(incoming.size()); this.remainders = new AtomicIntegerArray(incoming.size()); this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId(); this.buffers = new RawBatchBuffer[minInputsRequired]; try { String bufferClassName = context.getConfig().getString(ExecConstants.INCOMING_BUFFER_IMPL); - Constructor<?> bufferConstructor = Class.forName(bufferClassName).getConstructor(FragmentContext.class); + Constructor<?> bufferConstructor = Class.forName(bufferClassName).getConstructor(FragmentContext.class, ReadController.class, int.class); for(int i = 0; i < buffers.length; i++) { - buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context); + buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context, this, incoming.size()); } } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) { @@ -77,15 +80,35 @@ public abstract class AbstractDataCollector implements DataCollector{ public RawBatchBuffer[] getBuffers(){ return buffers; } - + + + public void setAutoRead(boolean enabled){ + for(int i = 0; i < connections.length(); i++){ + setAutoRead(i, enabled); + } + } + + public void setAutoRead(int minorFragmentId, boolean enabled){ + RemoteConnection c = connections.get(minorFragmentId); + if(c != null) c.setAutoRead(enabled); + } + public abstract void streamFinished(int minorFragmentId); - + public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) throws IOException { + + // if we received an out of memory, add an item to all the buffer queues. 
if (batch.getHeader().getIsOutOfMemory()) { for (RawBatchBuffer buffer : buffers) { buffer.enqueue(batch); } } + + // add the connection to the connection list if this is the first time we're seeing it. + // TODO: move this to a better location (e.g. connection setup). + if(connections.compareAndSet(minorFragmentId, null, batch.getConnection())); + + // check to see if we have enough fragments reporting to proceed. boolean decremented = false; if (remainders.compareAndSet(minorFragmentId, 0, 1)) { int rem = remainingRequired.decrementAndGet(); @@ -94,9 +117,13 @@ public abstract class AbstractDataCollector implements DataCollector{ decremented = true; } } + + // mark stream finished if we got the last batch. if(batch.getHeader().getIsLastBatch()){ streamFinished(minorFragmentId); } + + getBuffer(minorFragmentId).enqueue(batch); return decremented; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb2091ad/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java index 5579658..23443fa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java @@ -32,7 +32,7 @@ public interface RawBatchBuffer extends RawFragmentBatchProvider { /** * Add the next new raw fragment batch to the buffer. 
- * + * * @param batch * Batch to enqueue * @throws IOException http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb2091ad/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ReadController.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ReadController.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ReadController.java new file mode 100644 index 0000000..d23eb4c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ReadController.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.batch; + +public interface ReadController { + public void setAutoRead(boolean enabled); + public void setAutoRead(int minorFragmentId, boolean enabled); +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb2091ad/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java index c8a1525..8e6eaf7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java @@ -77,7 +77,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer { private boolean closed = false; private FragmentManager fragmentManager; - public SpoolingRawBatchBuffer(FragmentContext context) throws IOException, OutOfMemoryException { + public SpoolingRawBatchBuffer(FragmentContext context, ReadController readController, int fragmentCount) throws IOException, OutOfMemoryException { this.context = context; this.allocator = context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION); this.threshold = context.getConfig().getLong(ExecConstants.SPOOLING_BUFFER_MEMORY); @@ -133,7 +133,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer { allocator.close(); } - + @Override public void finished() { finished = true; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb2091ad/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java index 4853d32..a726a82 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RawFragmentBatch; -import org.apache.drill.exec.rpc.RemoteConnection; import com.google.common.collect.Queues; @@ -32,22 +31,36 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ private final LinkedBlockingDeque<RawFragmentBatch> buffer; private volatile boolean finished = false; - private int softlimit; - private int startlimit; - private AtomicBoolean overlimit = new AtomicBoolean(false); - - public UnlimitedRawBatchBuffer(FragmentContext context) { - softlimit = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE); - startlimit = softlimit/2; - buffer = Queues.newLinkedBlockingDeque(); + private final int softlimit; + private final int startlimit; + private final AtomicBoolean overlimit = new AtomicBoolean(false); + private final ReadController readController; + private final boolean multiFragment; + + public UnlimitedRawBatchBuffer(FragmentContext context, ReadController readController, int fragmentCount) { + int bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE); + + this.multiFragment = fragmentCount > 1; + this.readController = readController; + this.softlimit = bufferSizePerSocket * fragmentCount; + this.startlimit = Math.max(softlimit/2, 1); + this.buffer = Queues.newLinkedBlockingDeque(); } - + + private void setRead(int minorFragmentId, boolean setting){ + if(multiFragment){ + readController.setAutoRead(setting); + }else{ + readController.setAutoRead(minorFragmentId, setting); + } + } + 
@Override public void enqueue(RawFragmentBatch batch) { buffer.add(batch); if(buffer.size() == softlimit){ overlimit.set(true); - batch.getConnection().setAutoRead(false); + setRead(batch.getHeader().getSendingMinorFragmentId(), false); } } @@ -64,7 +77,6 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ } } - @Override public void finished() { finished = true; @@ -72,11 +84,11 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ @Override public RawFragmentBatch getNext(){ - + RawFragmentBatch b = null; - + b = buffer.poll(); - + // if we didn't get a buffer, block on waiting for buffer. if(b == null && !finished){ try { @@ -85,18 +97,19 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ return null; } } - + // if we are in the overlimit condition and aren't finished, check if we've passed the start limit. If so, turn off the overlimit condition and set auto read to true (start reading from socket again). if(!finished && overlimit.get()){ if(buffer.size() == startlimit){ overlimit.set(false); - b.getConnection().setAutoRead(true); + setRead(b.getHeader().getSendingMinorFragmentId(), true); + readController.setAutoRead(true); } } - + return b; - + } - + }