DRILL-1055: Add ProducerConsumer operator to scans. The operator can be disabled, and its queue size is configurable.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/208d511b Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/208d511b Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/208d511b Branch: refs/heads/master Commit: 208d511beebf2369ff453df09843e16f1433fd36 Parents: af29c25 Author: Steven Phillips <sphill...@maprtech.com> Authored: Sat Jun 21 17:11:31 2014 -0700 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Wed Jun 25 09:10:12 2014 -0700 ---------------------------------------------------------------------- .../physical/base/AbstractPhysicalVisitor.java | 6 + .../exec/physical/base/PhysicalVisitor.java | 2 + .../exec/physical/config/ProducerConsumer.java | 58 ++++++ .../impl/producer/ProducerConsumerBatch.java | 199 +++++++++++++++++++ .../producer/ProducerConsumerBatchCreator.java | 36 ++++ .../exec/planner/physical/PlannerSettings.java | 3 + .../planner/physical/ProducerConsumerPrel.java | 78 ++++++++ .../drill/exec/planner/physical/ScanPrel.java | 2 +- .../visitor/ProducerConsumerPrelVisitor.java | 59 ++++++ .../planner/sql/handlers/DefaultSqlHandler.java | 13 +- .../server/options/SystemOptionManager.java | 2 + .../apache/drill/exec/proto/UserBitShared.java | 16 +- .../exec/proto/beans/CoreOperatorType.java | 4 +- protocol/src/main/protobuf/UserBitShared.proto | 1 + 14 files changed, 473 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index 
d7d7c12..5e85425 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.physical.config.OrderedPartitionSender; +import org.apache.drill.exec.physical.config.ProducerConsumer; import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.physical.config.UnorderedReceiver; import org.apache.drill.exec.physical.config.RangeSender; @@ -186,6 +187,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme } @Override + public T visitProducerConsumer(ProducerConsumer op, X value) throws E { + return visitOp(op, value); + } + + @Override public T visitIteratorValidator(IteratorValidator op, X value) throws E { return visitOp(op, value); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index 6d5a6cb..f0b0b9a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.physical.config.OrderedPartitionSender; +import 
org.apache.drill.exec.physical.config.ProducerConsumer; import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.physical.config.UnorderedReceiver; import org.apache.drill.exec.physical.config.RangeSender; @@ -79,6 +80,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP; public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP; public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP; + public RETURN visitProducerConsumer(ProducerConsumer op, EXTRA value) throws EXCEP; public RETURN visitIteratorValidator(IteratorValidator op, EXTRA value) throws EXCEP; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ProducerConsumer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ProducerConsumer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ProducerConsumer.java new file mode 100644 index 0000000..87655d1 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ProducerConsumer.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.physical.config;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.exec.physical.base.AbstractSingle;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;

/**
 * Physical operator definition for the producer/consumer operator: a single-input
 * operator that carries one extra setting, {@code size}.
 *
 * <p>It is (de)serialized by Jackson under the type name {@code "producer-consumer"}
 * with the properties {@code child} and {@code size}.
 */
@JsonTypeName("producer-consumer")
public class ProducerConsumer extends AbstractSingle {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProducerConsumer.class);

  /** Configured size; fixed at construction and carried over to copies made by {@link #getNewWithChild}. */
  private final int size;

  /**
   * @param child the single input operator
   * @param size  the size setting exposed through {@link #getSize()}
   */
  @JsonCreator
  public ProducerConsumer(@JsonProperty("child") PhysicalOperator child, @JsonProperty("size") int size) {
    super(child);
    this.size = size;
  }

  /** @return the size this operator was created with */
  public int getSize() {
    return this.size;
  }

  /** @return the numeric id of {@code CoreOperatorType.PRODUCER_CONSUMER} */
  @Override
  public int getOperatorType() {
    return CoreOperatorType.PRODUCER_CONSUMER_VALUE;
  }

  /** Creates a copy of this operator over a different child, keeping the same size. */
  @Override
  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
    final int sameSize = this.size;
    return new ProducerConsumer(child, sameSize);
  }

  /** Double-dispatches to {@link PhysicalVisitor#visitProducerConsumer}. */
  @Override
  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
    final T result = physicalVisitor.visitProducerConsumer(this, value);
    return result;
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java new file mode 100644 index 0000000..9ec07de --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.producer; + +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.ProducerConsumer; +import org.apache.drill.exec.physical.impl.sort.RecordBatchData; +import org.apache.drill.exec.record.AbstractRecordBatch; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.vector.ValueVector; + +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +public class ProducerConsumerBatch extends AbstractRecordBatch { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProducerConsumerBatch.class); + + private RecordBatch incoming; + private Thread producer = new Thread(new Producer(), Thread.currentThread().getName() + " - Producer Thread"); + private boolean running = false; + private BlockingDeque<RecordBatchDataWrapper> queue; + private int recordCount; + private BatchSchema schema; + private boolean stop = false; + + protected ProducerConsumerBatch(ProducerConsumer popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { + super(popConfig, context); + this.incoming = incoming; + this.queue = 
new LinkedBlockingDeque<>(popConfig.getSize()); + } + + @Override + public IterOutcome innerNext() { + if (!running) { + producer.start(); + running = true; + } + RecordBatchDataWrapper wrapper; + try { + stats.startWait(); + wrapper = queue.take(); + logger.debug("Got batch from queue"); + } catch (InterruptedException e) { + context.fail(e); + return IterOutcome.STOP; + } finally { + stats.stopWait(); + } + if (wrapper.finished) { + return IterOutcome.NONE; + } + + recordCount = wrapper.batch.getRecordCount(); + boolean newSchema = load(wrapper.batch); + + return newSchema ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; + } + + private boolean load(RecordBatchData batch) { + VectorContainer newContainer = batch.getContainer(); + if (schema != null && newContainer.getSchema().equals(schema)) { + container.zeroVectors(); + BatchSchema schema = container.getSchema(); + for (int i = 0; i < container.getNumberOfColumns(); i++) { + MaterializedField field = schema.getColumn(i); + MajorType type = field.getType(); + ValueVector vOut = container.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()), + container.getValueVectorId(field.getPath()).getFieldIds()).getValueVector(); + ValueVector vIn = newContainer.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()), + newContainer.getValueVectorId(field.getPath()).getFieldIds()).getValueVector(); + TransferPair tp = vIn.makeTransferPair(vOut); + tp.transfer(); + } + return false; + } else { + container.clear(); + for (VectorWrapper w : newContainer) { + container.add(w.getValueVector()); + } + container.buildSchema(SelectionVectorMode.NONE); + schema = container.getSchema(); + return true; + } + } + + private class Producer implements Runnable { + + @Override + public void run() { + if (stop) return; + outer: while (true) { + IterOutcome upstream = incoming.next(); + switch (upstream) { + case NONE: + break outer; + case STOP: + try { + queue.putFirst(new 
RecordBatchDataWrapper(null, false, true)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + case OK_NEW_SCHEMA: + case OK: + try { + if (!stop) queue.put(new RecordBatchDataWrapper(new RecordBatchData(incoming), false, false)); + } catch (InterruptedException e) { + context.fail(e); + try { + queue.putFirst(new RecordBatchDataWrapper(null, false, true)); + } catch (InterruptedException e1) { + throw new RuntimeException(e1); + } + } + break; + default: + throw new UnsupportedOperationException(); + } + } + try { + queue.put(new RecordBatchDataWrapper(null, true, false)); + } catch (InterruptedException e) { + context.fail(e); + try { + queue.putFirst(new RecordBatchDataWrapper(null, false, true)); + } catch (InterruptedException e1) { + throw new RuntimeException(e1); + } + } + logger.debug("Producer thread finished"); + } + } + + private void clearQueue() { + RecordBatchDataWrapper wrapper; + while ((wrapper = queue.poll()) != null) { + wrapper.batch.getContainer().clear(); + } + } + + @Override + protected void killIncoming() { + incoming.kill(); + } + + @Override + public void cleanup() { + stop = true; + clearQueue(); + super.cleanup(); + incoming.cleanup(); + } + + @Override + public int getRecordCount() { + return recordCount; + } + + private static class RecordBatchDataWrapper { + RecordBatchData batch; + boolean finished; + boolean failed; + + RecordBatchDataWrapper(RecordBatchData batch, boolean finished, boolean failed) { + this.batch = batch; + this.finished = finished; + this.failed = failed; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java new file mode 100644 index 0000000..0fcf4f3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.producer; + +import com.google.common.collect.Iterables; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.Limit; +import org.apache.drill.exec.physical.config.ProducerConsumer; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.physical.impl.producer.ProducerConsumerBatch; +import org.apache.drill.exec.record.RecordBatch; + +import java.util.List; + +public class ProducerConsumerBatchCreator implements BatchCreator<ProducerConsumer> { + @Override + public RecordBatch getBatch(FragmentContext context, ProducerConsumer config, List<RecordBatch> children) throws ExecutionSetupException { + return new ProducerConsumerBatch(config, context, Iterables.getOnlyElement(children)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index e10b620..ae0ac32 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -23,6 +23,7 @@ import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.options.OptionValidator; import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator; +import org.apache.drill.exec.server.options.TypeValidators.LongValidator; import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator; import 
org.apache.drill.exec.server.options.TypeValidators.RangeDoubleValidator; @@ -43,6 +44,8 @@ public class PlannerSettings implements FrameworkContext{ public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", true); public static final OptionValidator BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 1000000); public static final OptionValidator JOIN_ROW_COUNT_ESTIMATE_FACTOR = new RangeDoubleValidator("planner.join.row_count_estimate_factor", 0, 100, 1.0d); + public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", true); + public static final OptionValidator PRODUCER_CONSUMER_QUEUE_SIZE = new LongValidator("planner.producer_consumer_queue_size", 10); public OptionManager options = null; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProducerConsumerPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProducerConsumerPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProducerConsumerPrel.java new file mode 100644 index 0000000..35aebdc --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProducerConsumerPrel.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.physical; + +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.ProducerConsumer; +import org.apache.drill.exec.planner.common.DrillRelNode; +import org.apache.drill.exec.planner.physical.visitor.PrelVisitor; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.SingleRel; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelTraitSet; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +public class ProducerConsumerPrel extends SingleRel implements DrillRelNode, Prel { + + private int queueSize; + + public ProducerConsumerPrel(RelNode child, int queueSize) { + super(child.getCluster(), child.getTraitSet(), child); + this.queueSize = queueSize; + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new ProducerConsumerPrel(sole(inputs), queueSize); + } + + @Override + public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { + Prel child = (Prel) this.getChild(); + + PhysicalOperator childPOP = child.getPhysicalOperator(creator); + + ProducerConsumer pop = new ProducerConsumer(childPOP, queueSize); + + return creator.addMetadata(this, pop); + } + + @Override + public Iterator<Prel> iterator() { + return PrelUtil.iter(getChild()); + } + + @Override + public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E { + 
return logicalVisitor.visitPrel(this, value); + } + + @Override + public SelectionVectorMode[] getSupportedEncodings() { + return SelectionVectorMode.DEFAULT; + } + + @Override + public SelectionVectorMode getEncoding() { + return SelectionVectorMode.NONE; + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java index 9bc71c0..fc62331 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java @@ -134,7 +134,7 @@ public class ScanPrel extends AbstractRelNode implements DrillScanPrel { @Override public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E { - return logicalVisitor.visitPrel(this, value); + return logicalVisitor.visitScan(this, value); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ProducerConsumerPrelVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ProducerConsumerPrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ProducerConsumerPrelVisitor.java new file mode 100644 index 0000000..575b4b4 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ProducerConsumerPrelVisitor.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.planner.physical.visitor;

import com.google.common.collect.Lists;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.ProducerConsumerPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.planner.physical.SelectionVectorRemoverPrel;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.eigenbase.rel.RelNode;

import java.util.List;


/**
 * Plan rewriter that places a {@link ProducerConsumerPrel} directly above every
 * {@link ScanPrel} in a physical plan tree. All other nodes are copied over their
 * rewritten children.
 */
public class ProducerConsumerPrelVisitor extends BasePrelVisitor<Prel, Void, RuntimeException>{

  /** Queue size given to every {@link ProducerConsumerPrel} this visitor creates. */
  private int queueSize;

  public ProducerConsumerPrelVisitor(int queueSize) {
    this.queueSize = queueSize;
  }

  /**
   * Convenience entry point: rewrites {@code prel} with a fresh visitor.
   *
   * @param prel      root of the plan to rewrite
   * @param queueSize queue size for the inserted nodes
   * @return the rewritten plan root
   */
  public static Prel addProducerConsumerToScans(Prel prel, int queueSize){
    final ProducerConsumerPrelVisitor rewriter = new ProducerConsumerPrelVisitor(queueSize);
    return prel.accept(rewriter, null);
  }

  /** A scan is not descended into; it is wrapped as-is. */
  @Override
  public Prel visitScan(ScanPrel scanPrel, Void value) {
    return new ProducerConsumerPrel(scanPrel, queueSize);
  }

  /** Rewrites each child in iteration order, then copies this node over the results. */
  @Override
  public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
    final List<RelNode> rewrittenInputs = Lists.newArrayList();
    for (Prel input : prel) {
      final Prel rewritten = input.accept(this, null);
      rewrittenInputs.add(rewritten);
    }
    final RelNode copy = prel.copy(prel.getTraitSet(), rewrittenInputs);
    return (Prel) copy;
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index 21420df..14db66c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -40,12 +40,14 @@ import org.apache.drill.exec.planner.logical.DrillScreenRel; import org.apache.drill.exec.planner.logical.DrillStoreRel; import org.apache.drill.exec.planner.physical.DrillDistributionTrait; import org.apache.drill.exec.planner.physical.PhysicalPlanCreator; +import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.physical.Prel; import org.apache.drill.exec.planner.physical.explain.PrelSequencer; import org.apache.drill.exec.planner.physical.visitor.ExcessiveExchangeIdentifier; import org.apache.drill.exec.planner.physical.visitor.FinalColumnReorderer; import org.apache.drill.exec.planner.physical.visitor.ComplexToJsonPrelVisitor; import org.apache.drill.exec.planner.physical.visitor.JoinPrelRenameVisitor; +import org.apache.drill.exec.planner.physical.visitor.ProducerConsumerPrelVisitor; import org.apache.drill.exec.planner.physical.visitor.RelUniqifier; import org.apache.drill.exec.planner.physical.visitor.SelectionVectorPrelVisitor; import org.apache.drill.exec.planner.sql.DrillSqlWorker; @@ -178,6 +180,15 @@ public class DefaultSqlHandler extends AbstractSqlHandler { phyRelNode = SelectionVectorPrelVisitor.addSelectionRemoversWhereNecessary(phyRelNode); /* 5.) 
+ * Add ProducerConsumer after each scan if the option is set + * Use the configured queueSize + */ + if (context.getOptions().getOption(PlannerSettings.PRODUCER_CONSUMER.getOptionName()).bool_val) { + long queueSize = context.getOptions().getOption(PlannerSettings.PRODUCER_CONSUMER_QUEUE_SIZE.getOptionName()).num_val; + phyRelNode = ProducerConsumerPrelVisitor.addProducerConsumerToScans(phyRelNode, (int) queueSize); + } + + /* 6.) * if the client does not support complex types (Map, Repeated) * insert a project which which would convert */ @@ -186,7 +197,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler { phyRelNode = ComplexToJsonPrelVisitor.addComplexToJsonPrel(phyRelNode); } - /* 6.) + /* 7.) * Finally, Make sure that the no rels are repeats. * This could happen in the case of querying the same table twice as Optiq may canonicalize these. */ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index a42640f..424d7ff 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -54,6 +54,8 @@ public class SystemOptionManager implements OptionManager{ PlannerSettings.BROADCAST, PlannerSettings.BROADCAST_THRESHOLD, PlannerSettings.JOIN_ROW_COUNT_ESTIMATE_FACTOR, + PlannerSettings.PRODUCER_CONSUMER, + PlannerSettings.PRODUCER_CONSUMER_QUEUE_SIZE, ExecConstants.OUTPUT_FORMAT_VALIDATOR, ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR, ExecConstants.SLICE_TARGET_OPTION, 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java index 2f4a58c..64cbcbf 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java @@ -458,6 +458,10 @@ public final class UserBitShared { * <code>COMPLEX_TO_JSON = 31;</code> */ COMPLEX_TO_JSON(31, 31), + /** + * <code>PRODUCER_CONSUMER = 32;</code> + */ + PRODUCER_CONSUMER(32, 32), ; /** @@ -588,6 +592,10 @@ public final class UserBitShared { * <code>COMPLEX_TO_JSON = 31;</code> */ public static final int COMPLEX_TO_JSON_VALUE = 31; + /** + * <code>PRODUCER_CONSUMER = 32;</code> + */ + public static final int PRODUCER_CONSUMER_VALUE = 32; public final int getNumber() { return value; } @@ -626,6 +634,7 @@ public final class UserBitShared { case 29: return JSON_SUB_SCAN; case 30: return INFO_SCHEMA_SUB_SCAN; case 31: return COMPLEX_TO_JSON; + case 32: return PRODUCER_CONSUMER; default: return null; } } @@ -16530,7 +16539,7 @@ public final class UserBitShared { "\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003*k\n\rFragmen" + "tState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATI", "ON\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCE" + - "LLED\020\004\022\n\n\006FAILED\020\005*\372\004\n\020CoreOperatorType\022" + + "LLED\020\004\022\n\n\006FAILED\020\005*\221\005\n\020CoreOperatorType\022" + "\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001" + "\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH" + "_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITIO" + @@ -16546,8 +16555,9 @@ public 
final class UserBitShared { "UET_WRITER\020\031\022\023\n\017DIRECT_SUB_SCAN\020\032\022\017\n\013TEX" + "T_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022\021\n\rJSON_SU" + "B_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN\020\036\022\023\n\017CO" + - "MPLEX_TO_JSON\020\037B.\n\033org.apache.drill.exec" + - ".protoB\rUserBitSharedH\001" + "MPLEX_TO_JSON\020\037\022\025\n\021PRODUCER_CONSUMER\020 B." + + "\n\033org.apache.drill.exec.protoB\rUserBitSh" + + "aredH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java index 0485a95..e8039c9 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java @@ -53,7 +53,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO TEXT_SUB_SCAN(28), JSON_SUB_SCAN(29), INFO_SCHEMA_SUB_SCAN(30), - COMPLEX_TO_JSON(31); + COMPLEX_TO_JSON(31), + PRODUCER_CONSUMER(32); public final int number; @@ -103,6 +104,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO case 29: return JSON_SUB_SCAN; case 30: return INFO_SCHEMA_SUB_SCAN; case 31: return COMPLEX_TO_JSON; + case 32: return PRODUCER_CONSUMER; default: return null; } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/208d511b/protocol/src/main/protobuf/UserBitShared.proto ---------------------------------------------------------------------- diff --git 
a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto index 6fd73db..10dce1f 100644 --- a/protocol/src/main/protobuf/UserBitShared.proto +++ b/protocol/src/main/protobuf/UserBitShared.proto @@ -199,4 +199,5 @@ enum CoreOperatorType { JSON_SUB_SCAN = 29; INFO_SCHEMA_SUB_SCAN = 30; COMPLEX_TO_JSON = 31; + PRODUCER_CONSUMER = 32; }