Update data collectors and batch buffers to toggle auto read on one or all 
streams, depending on use.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/fb2091ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/fb2091ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/fb2091ad

Branch: refs/heads/master
Commit: fb2091ad74bc677b309e5d32d375729dd5bf45bc
Parents: 8adb527
Author: Jacques Nadeau <jacq...@apache.org>
Authored: Sun Jun 8 09:35:28 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Sun Jun 8 09:41:19 2014 -0700

----------------------------------------------------------------------
 .../exec/planner/logical/DrillJoinRule.java     |  2 +-
 .../exec/work/batch/AbstractDataCollector.java  | 39 +++++++++++---
 .../drill/exec/work/batch/RawBatchBuffer.java   |  2 +-
 .../drill/exec/work/batch/ReadController.java   | 23 +++++++++
 .../exec/work/batch/SpoolingRawBatchBuffer.java |  4 +-
 .../work/batch/UnlimitedRawBatchBuffer.java     | 53 ++++++++++++--------
 6 files changed, 93 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb2091ad/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java
index e940b2e..0f63140 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java
@@ -90,7 +90,7 @@ public class DrillJoinRule extends RelOptRule {
         }
         newJoinCondition = RexUtil.composeConjunction(builder, equijoinList, 
false);
       } else {
-        tracer.warning("Non-equijoins are only supported in the presence of an 
equijoin.");
+//        tracer.warning("Non-equijoins are only supported in the presence of 
an equijoin.");
         return;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb2091ad/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index 76db1ed..83e697d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -33,16 +34,17 @@ import org.apache.drill.exec.rpc.RemoteConnection;
 
 import com.google.common.base.Preconditions;
 
-public abstract class AbstractDataCollector implements DataCollector{
+public abstract class AbstractDataCollector implements DataCollector, 
ReadController{
 
   private final List<DrillbitEndpoint> incoming;
   private final int oppositeMajorFragmentId;
   private final AtomicIntegerArray remainders;
   private final AtomicInteger remainingRequired;
   protected final RawBatchBuffer[] buffers;
+  private final AtomicReferenceArray<RemoteConnection> connections;
   private final AtomicInteger parentAccounter;
   private final AtomicInteger finishedStreams = new AtomicInteger();
-  
+
   public AbstractDataCollector(AtomicInteger parentAccounter, Receiver 
receiver, int minInputsRequired, FragmentContext context) {
     Preconditions.checkArgument(minInputsRequired > 0);
     Preconditions.checkNotNull(receiver);
@@ -50,14 +52,15 @@ public abstract class AbstractDataCollector implements 
DataCollector{
 
     this.parentAccounter = parentAccounter;
     this.incoming = receiver.getProvidingEndpoints();
+    this.connections = new AtomicReferenceArray<>(incoming.size());
     this.remainders = new AtomicIntegerArray(incoming.size());
     this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId();
     this.buffers = new RawBatchBuffer[minInputsRequired];
     try {
       String bufferClassName = 
context.getConfig().getString(ExecConstants.INCOMING_BUFFER_IMPL);
-      Constructor<?> bufferConstructor = 
Class.forName(bufferClassName).getConstructor(FragmentContext.class);
+      Constructor<?> bufferConstructor = 
Class.forName(bufferClassName).getConstructor(FragmentContext.class, 
ReadController.class, int.class);
       for(int i = 0; i < buffers.length; i++) {
-          buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context);
+          buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context, 
this, incoming.size());
       }
     } catch (InstantiationException | IllegalAccessException | 
InvocationTargetException |
             NoSuchMethodException | ClassNotFoundException e) {
@@ -77,15 +80,35 @@ public abstract class AbstractDataCollector implements 
DataCollector{
   public RawBatchBuffer[] getBuffers(){
     return buffers;
   }
-  
+
+
+  public void setAutoRead(boolean enabled){
+    for(int i = 0; i < connections.length(); i++){
+      setAutoRead(i, enabled);
+    }
+  }
+
+  public void setAutoRead(int minorFragmentId, boolean enabled){
+    RemoteConnection c = connections.get(minorFragmentId);
+    if(c != null) c.setAutoRead(enabled);
+  }
+
   public abstract void streamFinished(int minorFragmentId);
-  
+
   public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch)  
throws IOException {
+
+    // if we received an out of memory, add an item to all the buffer queues.
     if (batch.getHeader().getIsOutOfMemory()) {
       for (RawBatchBuffer buffer : buffers) {
         buffer.enqueue(batch);
       }
     }
+
+    // add the connection to the connection list if this is the first time 
we're seeing it.
+    // TODO: move this to a better location (e.g. connection setup).
+    if(connections.compareAndSet(minorFragmentId, null, 
batch.getConnection()));
+
+    // check to see if we have enough fragments reporting to proceed.
     boolean decremented = false;
     if (remainders.compareAndSet(minorFragmentId, 0, 1)) {
       int rem = remainingRequired.decrementAndGet();
@@ -94,9 +117,13 @@ public abstract class AbstractDataCollector implements 
DataCollector{
         decremented = true;
       }
     }
+
+    // mark stream finished if we got the last batch.
     if(batch.getHeader().getIsLastBatch()){
       streamFinished(minorFragmentId);
     }
+
+
     getBuffer(minorFragmentId).enqueue(batch);
     return decremented;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb2091ad/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
index 5579658..23443fa 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
@@ -32,7 +32,7 @@ public interface RawBatchBuffer extends 
RawFragmentBatchProvider {
 
   /**
    * Add the next new raw fragment batch to the buffer.
-   * 
+   *
    * @param batch
    *          Batch to enqueue
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb2091ad/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ReadController.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ReadController.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ReadController.java
new file mode 100644
index 0000000..d23eb4c
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ReadController.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.batch;
+
+public interface ReadController {
+  public void setAutoRead(boolean enabled);
+  public void setAutoRead(int minorFragmentId, boolean enabled);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb2091ad/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index c8a1525..8e6eaf7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -77,7 +77,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer 
{
   private boolean closed = false;
   private FragmentManager fragmentManager;
 
-  public SpoolingRawBatchBuffer(FragmentContext context) throws IOException, 
OutOfMemoryException {
+  public SpoolingRawBatchBuffer(FragmentContext context, ReadController 
readController, int fragmentCount) throws IOException, OutOfMemoryException {
     this.context = context;
     this.allocator = 
context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, 
ALLOCATOR_MAX_RESERVATION);
     this.threshold = 
context.getConfig().getLong(ExecConstants.SPOOLING_BUFFER_MEMORY);
@@ -133,7 +133,7 @@ public class SpoolingRawBatchBuffer implements 
RawBatchBuffer {
     allocator.close();
   }
 
-  
+
   @Override
   public void finished() {
     finished = true;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb2091ad/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 4853d32..a726a82 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RawFragmentBatch;
-import org.apache.drill.exec.rpc.RemoteConnection;
 
 import com.google.common.collect.Queues;
 
@@ -32,22 +31,36 @@ public class UnlimitedRawBatchBuffer implements 
RawBatchBuffer{
 
   private final LinkedBlockingDeque<RawFragmentBatch> buffer;
   private volatile boolean finished = false;
-  private int softlimit;
-  private int startlimit;
-  private AtomicBoolean overlimit = new AtomicBoolean(false);
-
-  public UnlimitedRawBatchBuffer(FragmentContext context) {
-    softlimit = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
-    startlimit = softlimit/2;
-    buffer = Queues.newLinkedBlockingDeque();
+  private final int softlimit;
+  private final int startlimit;
+  private final AtomicBoolean overlimit = new AtomicBoolean(false);
+  private final ReadController readController;
+  private final boolean multiFragment;
+
+  public UnlimitedRawBatchBuffer(FragmentContext context, ReadController 
readController, int fragmentCount) {
+    int bufferSizePerSocket = 
context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
+
+    this.multiFragment = fragmentCount > 1;
+    this.readController = readController;
+    this.softlimit = bufferSizePerSocket * fragmentCount;
+    this.startlimit = Math.max(softlimit/2, 1);
+    this.buffer = Queues.newLinkedBlockingDeque();
   }
-  
+
+  private void setRead(int minorFragmentId, boolean setting){
+    if(multiFragment){
+      readController.setAutoRead(setting);
+    }else{
+      readController.setAutoRead(minorFragmentId, setting);
+    }
+  }
+
   @Override
   public void enqueue(RawFragmentBatch batch) {
     buffer.add(batch);
     if(buffer.size() == softlimit){
       overlimit.set(true);
-      batch.getConnection().setAutoRead(false);
+      setRead(batch.getHeader().getSendingMinorFragmentId(), false);
     }
   }
 
@@ -64,7 +77,6 @@ public class UnlimitedRawBatchBuffer implements 
RawBatchBuffer{
     }
   }
 
-  
   @Override
   public void finished() {
     finished = true;
@@ -72,11 +84,11 @@ public class UnlimitedRawBatchBuffer implements 
RawBatchBuffer{
 
   @Override
   public RawFragmentBatch getNext(){
-    
+
     RawFragmentBatch b = null;
-    
+
     b = buffer.poll();
-    
+
     // if we didn't get a buffer, block on waiting for buffer.
     if(b == null && !finished){
       try {
@@ -85,18 +97,19 @@ public class UnlimitedRawBatchBuffer implements 
RawBatchBuffer{
         return null;
       }
     }
-    
+
     // if we are in the overlimit condition and aren't finished, check if 
we've passed the start limit.  If so, turn off the overlimit condition and set 
auto read to true (start reading from socket again).
     if(!finished && overlimit.get()){
       if(buffer.size() == startlimit){
         overlimit.set(false);
-        b.getConnection().setAutoRead(true);
+        setRead(b.getHeader().getSendingMinorFragmentId(), true);
+        readController.setAutoRead(true);
       }
     }
-    
+
     return b;
-    
+
   }
 
-  
+
 }

Reply via email to