DRILL-1055: Add ProducerConsumer operator to scans

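A ProducerConsumer operator is added after each scan by default. This can be
disabled with the planner.add_producer_consumer option. The queue size is
configurable with the planner.producer_consumer_queue_size option (default 10).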


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/208d511b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/208d511b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/208d511b

Branch: refs/heads/master
Commit: 208d511beebf2369ff453df09843e16f1433fd36
Parents: af29c25
Author: Steven Phillips <sphill...@maprtech.com>
Authored: Sat Jun 21 17:11:31 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Wed Jun 25 09:10:12 2014 -0700

----------------------------------------------------------------------
 .../physical/base/AbstractPhysicalVisitor.java  |   6 +
 .../exec/physical/base/PhysicalVisitor.java     |   2 +
 .../exec/physical/config/ProducerConsumer.java  |  58 ++++++
 .../impl/producer/ProducerConsumerBatch.java    | 199 +++++++++++++++++++
 .../producer/ProducerConsumerBatchCreator.java  |  36 ++++
 .../exec/planner/physical/PlannerSettings.java  |   3 +
 .../planner/physical/ProducerConsumerPrel.java  |  78 ++++++++
 .../drill/exec/planner/physical/ScanPrel.java   |   2 +-
 .../visitor/ProducerConsumerPrelVisitor.java    |  59 ++++++
 .../planner/sql/handlers/DefaultSqlHandler.java |  13 +-
 .../server/options/SystemOptionManager.java     |   2 +
 .../apache/drill/exec/proto/UserBitShared.java  |  16 +-
 .../exec/proto/beans/CoreOperatorType.java      |   4 +-
 protocol/src/main/protobuf/UserBitShared.proto  |   1 +
 14 files changed, 473 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index d7d7c12..5e85425 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.config.Limit;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.physical.config.OrderedPartitionSender;
+import org.apache.drill.exec.physical.config.ProducerConsumer;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.UnorderedReceiver;
 import org.apache.drill.exec.physical.config.RangeSender;
@@ -186,6 +187,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E 
extends Throwable> impleme
   }
 
   @Override
+  public T visitProducerConsumer(ProducerConsumer op, X value) throws E {
+    return visitOp(op, value);
+  }
+
+  @Override
   public T visitIteratorValidator(IteratorValidator op, X value) throws E {
     return visitOp(op, value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 6d5a6cb..f0b0b9a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.config.Limit;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.physical.config.OrderedPartitionSender;
+import org.apache.drill.exec.physical.config.ProducerConsumer;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.UnorderedReceiver;
 import org.apache.drill.exec.physical.config.RangeSender;
@@ -79,6 +80,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends 
Throwable> {
   public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP;
   public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP;
   public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP;
+  public RETURN visitProducerConsumer(ProducerConsumer op, EXTRA value) throws 
EXCEP;
   
   public RETURN visitIteratorValidator(IteratorValidator op, EXTRA value) 
throws EXCEP;
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ProducerConsumer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ProducerConsumer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ProducerConsumer.java
new file mode 100644
index 0000000..87655d1
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ProducerConsumer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+
+@JsonTypeName("producer-consumer")
+public class ProducerConsumer extends AbstractSingle{
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ProducerConsumer.class);
+
+  private final int size;
+
+  @JsonCreator
+  public ProducerConsumer(@JsonProperty("child") PhysicalOperator child, 
@JsonProperty("size") int size) {
+    super(child);
+    this.size = size;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> 
physicalVisitor, X value) throws E {
+    return physicalVisitor.visitProducerConsumer(this, value);
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new ProducerConsumer(child, size);
+  }
+
+  public int getSize() {
+    return size;
+  }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.PRODUCER_CONSUMER_VALUE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
new file mode 100644
index 0000000..9ec07de
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.producer;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.ProducerConsumer;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+public class ProducerConsumerBatch extends AbstractRecordBatch {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ProducerConsumerBatch.class);
+
+  private RecordBatch incoming;
+  private Thread producer = new Thread(new Producer(), 
Thread.currentThread().getName() + " - Producer Thread");
+  private boolean running = false;
+  private BlockingDeque<RecordBatchDataWrapper> queue;
+  private int recordCount;
+  private BatchSchema schema;
+  private boolean stop = false;
+
+  protected ProducerConsumerBatch(ProducerConsumer popConfig, FragmentContext 
context, RecordBatch incoming) throws OutOfMemoryException {
+    super(popConfig, context);
+    this.incoming = incoming;
+    this.queue = new LinkedBlockingDeque<>(popConfig.getSize());
+  }
+
+  @Override
+  public IterOutcome innerNext() {
+    if (!running) {
+      producer.start();
+      running = true;
+    }
+    RecordBatchDataWrapper wrapper;
+    try {
+      stats.startWait();
+      wrapper = queue.take();
+      logger.debug("Got batch from queue");
+    } catch (InterruptedException e) {
+      context.fail(e);
+      return IterOutcome.STOP;
+    } finally {
+      stats.stopWait();
+    }
+    if (wrapper.finished) {
+      return IterOutcome.NONE;
+    }
+
+    recordCount = wrapper.batch.getRecordCount();
+    boolean newSchema = load(wrapper.batch);
+
+    return newSchema ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
+  }
+
+  private boolean load(RecordBatchData batch) {
+    VectorContainer newContainer = batch.getContainer();
+    if (schema != null && newContainer.getSchema().equals(schema)) {
+      container.zeroVectors();
+      BatchSchema schema = container.getSchema();
+      for (int i = 0; i < container.getNumberOfColumns(); i++) {
+        MaterializedField field = schema.getColumn(i);
+        MajorType type = field.getType();
+        ValueVector vOut = 
container.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(),
 type.getMode()),
+                
container.getValueVectorId(field.getPath()).getFieldIds()).getValueVector();
+        ValueVector vIn = 
newContainer.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(),
 type.getMode()),
+                
newContainer.getValueVectorId(field.getPath()).getFieldIds()).getValueVector();
+        TransferPair tp = vIn.makeTransferPair(vOut);
+        tp.transfer();
+      }
+      return false;
+    } else {
+      container.clear();
+      for (VectorWrapper w : newContainer) {
+        container.add(w.getValueVector());
+      }
+      container.buildSchema(SelectionVectorMode.NONE);
+      schema = container.getSchema();
+      return true;
+    }
+  }
+
+  private class Producer implements Runnable {
+
+    @Override
+    public void run() {
+      if (stop) return;
+      outer: while (true) {
+        IterOutcome upstream = incoming.next();
+        switch (upstream) {
+          case NONE:
+            break outer;
+          case STOP:
+            try {
+              queue.putFirst(new RecordBatchDataWrapper(null, false, true));
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+          case OK_NEW_SCHEMA:
+          case OK:
+            try {
+              if (!stop) queue.put(new RecordBatchDataWrapper(new 
RecordBatchData(incoming), false, false));
+            } catch (InterruptedException e) {
+              context.fail(e);
+              try {
+                queue.putFirst(new RecordBatchDataWrapper(null, false, true));
+              } catch (InterruptedException e1) {
+                throw new RuntimeException(e1);
+              }
+            }
+            break;
+          default:
+            throw new UnsupportedOperationException();
+        }
+      }
+      try {
+        queue.put(new RecordBatchDataWrapper(null, true, false));
+      } catch (InterruptedException e) {
+        context.fail(e);
+        try {
+          queue.putFirst(new RecordBatchDataWrapper(null, false, true));
+        } catch (InterruptedException e1) {
+          throw new RuntimeException(e1);
+        }
+      }
+      logger.debug("Producer thread finished");
+    }
+  }
+
+  private void clearQueue() {
+    RecordBatchDataWrapper wrapper;
+    while ((wrapper = queue.poll()) != null) {
+      wrapper.batch.getContainer().clear();
+    }
+  }
+
+  @Override
+  protected void killIncoming() {
+    incoming.kill();
+  }
+
+  @Override
+  public void cleanup() {
+    stop = true;
+    clearQueue();
+    super.cleanup();
+    incoming.cleanup();
+  }
+
+  @Override
+  public int getRecordCount() {
+    return recordCount;
+  }
+
+  private static class RecordBatchDataWrapper {
+    RecordBatchData batch;
+    boolean finished;
+    boolean failed;
+
+    RecordBatchDataWrapper(RecordBatchData batch, boolean finished, boolean 
failed) {
+      this.batch = batch;
+      this.finished = finished;
+      this.failed = failed;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
new file mode 100644
index 0000000..0fcf4f3
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.producer;
+
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.config.ProducerConsumer;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.producer.ProducerConsumerBatch;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class ProducerConsumerBatchCreator implements 
BatchCreator<ProducerConsumer> {
+  @Override
+  public RecordBatch getBatch(FragmentContext context, ProducerConsumer 
config, List<RecordBatch> children) throws ExecutionSetupException {
+    return new ProducerConsumerBatch(config, context, 
Iterables.getOnlyElement(children));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index e10b620..ae0ac32 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValidator;
 import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
+import org.apache.drill.exec.server.options.TypeValidators.LongValidator;
 import 
org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator;
 import 
org.apache.drill.exec.server.options.TypeValidators.RangeDoubleValidator;
 
@@ -43,6 +44,8 @@ public class PlannerSettings implements FrameworkContext{
   public static final OptionValidator BROADCAST = new 
BooleanValidator("planner.enable_broadcast_join", true);
   public static final OptionValidator BROADCAST_THRESHOLD = new 
PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 
1000000);
   public static final OptionValidator JOIN_ROW_COUNT_ESTIMATE_FACTOR = new 
RangeDoubleValidator("planner.join.row_count_estimate_factor", 0, 100, 1.0d);
+  public static final OptionValidator PRODUCER_CONSUMER = new 
BooleanValidator("planner.add_producer_consumer", true);
+  public static final OptionValidator PRODUCER_CONSUMER_QUEUE_SIZE = new 
LongValidator("planner.producer_consumer_queue_size", 10);
 
   public OptionManager options = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProducerConsumerPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProducerConsumerPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProducerConsumerPrel.java
new file mode 100644
index 0000000..35aebdc
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProducerConsumerPrel.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.ProducerConsumer;
+import org.apache.drill.exec.planner.common.DrillRelNode;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SingleRel;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public class ProducerConsumerPrel extends SingleRel implements DrillRelNode, 
Prel {
+
+  private int queueSize;
+
+  public ProducerConsumerPrel(RelNode child, int queueSize) {
+    super(child.getCluster(), child.getTraitSet(), child);
+    this.queueSize = queueSize;
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new ProducerConsumerPrel(sole(inputs), queueSize);
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) 
throws IOException {
+    Prel child = (Prel) this.getChild();
+
+    PhysicalOperator childPOP = child.getPhysicalOperator(creator);
+
+    ProducerConsumer pop = new ProducerConsumer(childPOP, queueSize);
+
+    return creator.addMetadata(this, pop);
+  }
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getChild());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> 
logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index 9bc71c0..fc62331 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -134,7 +134,7 @@ public class ScanPrel extends AbstractRelNode implements 
DrillScanPrel {
 
   @Override
   public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> 
logicalVisitor, X value) throws E {
-    return logicalVisitor.visitPrel(this, value);
+    return logicalVisitor.visitScan(this, value);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ProducerConsumerPrelVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ProducerConsumerPrelVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ProducerConsumerPrelVisitor.java
new file mode 100644
index 0000000..575b4b4
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ProducerConsumerPrelVisitor.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical.visitor;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ProducerConsumerPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.planner.physical.SelectionVectorRemoverPrel;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.eigenbase.rel.RelNode;
+
+import java.util.List;
+
+
+public class ProducerConsumerPrelVisitor extends BasePrelVisitor<Prel, Void, 
RuntimeException>{
+
+  private int queueSize;
+
+  public static Prel addProducerConsumerToScans(Prel prel, int queueSize){
+
+    return prel.accept(new ProducerConsumerPrelVisitor(queueSize), null);
+  }
+
+  public ProducerConsumerPrelVisitor(int queueSize) {
+    this.queueSize = queueSize;
+  }
+
+  @Override
+  public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
+    List<RelNode> children = Lists.newArrayList();
+    for(Prel child : prel){
+      children.add(child.accept(this, null));
+    }
+    return (Prel) prel.copy(prel.getTraitSet(), children);
+  }
+
+  @Override
+  public Prel visitScan(ScanPrel scanPrel, Void value) {
+    return new ProducerConsumerPrel(scanPrel, queueSize);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 21420df..14db66c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -40,12 +40,14 @@ import org.apache.drill.exec.planner.logical.DrillScreenRel;
 import org.apache.drill.exec.planner.logical.DrillStoreRel;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.explain.PrelSequencer;
 import 
org.apache.drill.exec.planner.physical.visitor.ExcessiveExchangeIdentifier;
 import org.apache.drill.exec.planner.physical.visitor.FinalColumnReorderer;
 import org.apache.drill.exec.planner.physical.visitor.ComplexToJsonPrelVisitor;
 import org.apache.drill.exec.planner.physical.visitor.JoinPrelRenameVisitor;
+import 
org.apache.drill.exec.planner.physical.visitor.ProducerConsumerPrelVisitor;
 import org.apache.drill.exec.planner.physical.visitor.RelUniqifier;
 import 
org.apache.drill.exec.planner.physical.visitor.SelectionVectorPrelVisitor;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
@@ -178,6 +180,15 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     phyRelNode = 
SelectionVectorPrelVisitor.addSelectionRemoversWhereNecessary(phyRelNode);
 
     /* 5.)
+     * Add ProducerConsumer after each scan if the option is set
+     * Use the configured queueSize
+     */
+    if 
(context.getOptions().getOption(PlannerSettings.PRODUCER_CONSUMER.getOptionName()).bool_val)
 {
+      long queueSize = 
context.getOptions().getOption(PlannerSettings.PRODUCER_CONSUMER_QUEUE_SIZE.getOptionName()).num_val;
+      phyRelNode = 
ProducerConsumerPrelVisitor.addProducerConsumerToScans(phyRelNode, (int) 
queueSize);
+    }
+
+    /* 6.)
      * if the client does not support complex types (Map, Repeated)
      * insert a project which which would convert
      */
@@ -186,7 +197,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
       phyRelNode = ComplexToJsonPrelVisitor.addComplexToJsonPrel(phyRelNode);
     }
 
-    /* 6.)
+    /* 7.)
      * Finally, Make sure that the no rels are repeats.
      * This could happen in the case of querying the same table twice as Optiq 
may canonicalize these.
      */

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a42640f..424d7ff 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -54,6 +54,8 @@ public class SystemOptionManager implements OptionManager{
       PlannerSettings.BROADCAST,
       PlannerSettings.BROADCAST_THRESHOLD,
       PlannerSettings.JOIN_ROW_COUNT_ESTIMATE_FACTOR,
+      PlannerSettings.PRODUCER_CONSUMER,
+      PlannerSettings.PRODUCER_CONSUMER_QUEUE_SIZE,
       ExecConstants.OUTPUT_FORMAT_VALIDATOR,
       ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,
       ExecConstants.SLICE_TARGET_OPTION,

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git 
a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java 
b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 2f4a58c..64cbcbf 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -458,6 +458,10 @@ public final class UserBitShared {
      * <code>COMPLEX_TO_JSON = 31;</code>
      */
     COMPLEX_TO_JSON(31, 31),
+    /**
+     * <code>PRODUCER_CONSUMER = 32;</code>
+     */
+    PRODUCER_CONSUMER(32, 32),
     ;
 
     /**
@@ -588,6 +592,10 @@ public final class UserBitShared {
      * <code>COMPLEX_TO_JSON = 31;</code>
      */
     public static final int COMPLEX_TO_JSON_VALUE = 31;
+    /**
+     * <code>PRODUCER_CONSUMER = 32;</code>
+     */
+    public static final int PRODUCER_CONSUMER_VALUE = 32;
 
 
     public final int getNumber() { return value; }
@@ -626,6 +634,7 @@ public final class UserBitShared {
         case 29: return JSON_SUB_SCAN;
         case 30: return INFO_SCHEMA_SUB_SCAN;
         case 31: return COMPLEX_TO_JSON;
+        case 32: return PRODUCER_CONSUMER;
         default: return null;
       }
     }
@@ -16530,7 +16539,7 @@ public final class UserBitShared {
       
"\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003*k\n\rFragmen"
 +
       "tState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATI",
       
"ON\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCE"
 +
-      
"LLED\020\004\022\n\n\006FAILED\020\005*\372\004\n\020CoreOperatorType\022" +
+      
"LLED\020\004\022\n\n\006FAILED\020\005*\221\005\n\020CoreOperatorType\022" +
       "\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001" +
       
"\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH" +
       "_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITIO" 
+
@@ -16546,8 +16555,9 @@ public final class UserBitShared {
       
"UET_WRITER\020\031\022\023\n\017DIRECT_SUB_SCAN\020\032\022\017\n\013TEX" +
       "T_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022\021\n\rJSON_SU" +
       
"B_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN\020\036\022\023\n\017CO" +
-      "MPLEX_TO_JSON\020\037B.\n\033org.apache.drill.exec" +
-      ".protoB\rUserBitSharedH\001"
+      "MPLEX_TO_JSON\020\037\022\025\n\021PRODUCER_CONSUMER\020 B." +
+      "\n\033org.apache.drill.exec.protoB\rUserBitSh" +
+      "aredH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
----------------------------------------------------------------------
diff --git 
a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
 
b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index 0485a95..e8039c9 100644
--- 
a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ 
b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -53,7 +53,8 @@ public enum CoreOperatorType implements 
com.dyuproject.protostuff.EnumLite<CoreO
     TEXT_SUB_SCAN(28),
     JSON_SUB_SCAN(29),
     INFO_SCHEMA_SUB_SCAN(30),
-    COMPLEX_TO_JSON(31);
+    COMPLEX_TO_JSON(31),
+    PRODUCER_CONSUMER(32);
     
     public final int number;
     
@@ -103,6 +104,7 @@ public enum CoreOperatorType implements 
com.dyuproject.protostuff.EnumLite<CoreO
             case 29: return JSON_SUB_SCAN;
             case 30: return INFO_SCHEMA_SUB_SCAN;
             case 31: return COMPLEX_TO_JSON;
+            case 32: return PRODUCER_CONSUMER;
             default: return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto 
b/protocol/src/main/protobuf/UserBitShared.proto
index 6fd73db..10dce1f 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -199,4 +199,5 @@ enum CoreOperatorType {
   JSON_SUB_SCAN = 29;
   INFO_SCHEMA_SUB_SCAN = 30;
   COMPLEX_TO_JSON = 31;
+  PRODUCER_CONSUMER = 32;
 }

Reply via email to