DRILL-311: Replace OrderedPartitionBatchCreator with OrderedPartitionSenderCreator
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ba5e6520 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ba5e6520 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ba5e6520 Branch: refs/heads/master Commit: ba5e65207bac38519bc199ed95535932abab2908 Parents: 622aad0 Author: Aditya Kishore <[email protected]> Authored: Sun Dec 1 19:35:46 2013 -0800 Committer: Jacques Nadeau <[email protected]> Committed: Sun Dec 1 19:35:46 2013 -0800 ---------------------------------------------------------------------- .../drill/exec/physical/impl/ImplCreator.java | 27 +++++++---- .../OrderedPartitionBatchCreator.java | 39 ---------------- .../OrderedPartitionSenderCreator.java | 47 ++++++++++++++++++++ .../PartitionSenderRootExec.java | 2 +- 4 files changed, 67 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba5e6520/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java index efc0f5b..3e4c1eb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java @@ -26,12 +26,27 @@ import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.SubScan; -import org.apache.drill.exec.physical.config.*; +import org.apache.drill.exec.physical.config.Filter; +import org.apache.drill.exec.physical.config.HashPartitionSender; +import org.apache.drill.exec.physical.config.IteratorValidator; +import org.apache.drill.exec.physical.config.Limit; +import org.apache.drill.exec.physical.config.MergeJoinPOP; +import org.apache.drill.exec.physical.config.MergingReceiverPOP; +import org.apache.drill.exec.physical.config.OrderedPartitionSender; +import org.apache.drill.exec.physical.config.Project; +import org.apache.drill.exec.physical.config.RandomReceiver; +import org.apache.drill.exec.physical.config.Screen; +import org.apache.drill.exec.physical.config.SelectionVectorRemover; +import org.apache.drill.exec.physical.config.SingleSender; +import org.apache.drill.exec.physical.config.Sort; +import org.apache.drill.exec.physical.config.StreamingAggregate; +import org.apache.drill.exec.physical.config.Trace; +import org.apache.drill.exec.physical.config.Union; import org.apache.drill.exec.physical.impl.aggregate.AggBatchCreator; import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator; import org.apache.drill.exec.physical.impl.join.MergeJoinCreator; import org.apache.drill.exec.physical.impl.limit.LimitBatchCreator; -import org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionBatchCreator; +import org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionSenderCreator; import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderCreator; import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator; import org.apache.drill.exec.physical.impl.sort.SortBatchCreator; @@ -64,10 +79,9 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo private MergingReceiverCreator mrc = new MergingReceiverCreator(); private RandomReceiverCreator rrc = new RandomReceiverCreator(); private PartitionSenderCreator hsc = new PartitionSenderCreator(); - private OrderedPartitionBatchCreator opc = new OrderedPartitionBatchCreator(); + private OrderedPartitionSenderCreator opsc = new OrderedPartitionSenderCreator(); private SingleSenderCreator ssc = new SingleSenderCreator(); private ProjectBatchCreator pbc = new ProjectBatchCreator(); - private OrderedPartitionBatchCreator smplbc = new OrderedPartitionBatchCreator(); private FilterBatchCreator fbc = new FilterBatchCreator(); private LimitBatchCreator lbc = new LimitBatchCreator(); private UnionBatchCreator unionbc = new UnionBatchCreator(); @@ -150,10 +164,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo @Override public RecordBatch visitOrderedPartitionSender(OrderedPartitionSender op, FragmentContext context) throws ExecutionSetupException { - List<RecordBatch> children = Lists.newArrayList(); - children.add(opc.getBatch(context, op, getChildren(op, context))); - HashPartitionSender config = new HashPartitionSender(op.getOppositeMajorFragmentId(), op, op.getRef(),op.getDestinations()); - root = hsc.getRoot(context, config, children); + root = opsc.getRoot(context, op, getChildren(op, context)); return null; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba5e6520/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionBatchCreator.java deleted file mode 100644 index 615cf21..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionBatchCreator.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.impl.orderedpartitioner; - -import com.google.common.base.Preconditions; -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.config.OrderedPartitionSender; -import org.apache.drill.exec.physical.impl.BatchCreator; -import org.apache.drill.exec.record.RecordBatch; - -import java.util.List; - -public class OrderedPartitionBatchCreator implements BatchCreator<OrderedPartitionSender>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionBatchCreator.class); - - @Override - public RecordBatch getBatch(FragmentContext context, OrderedPartitionSender config, List<RecordBatch> children) throws ExecutionSetupException { - Preconditions.checkArgument(children.size() == 1); - return new OrderedPartitionRecordBatch(config, children.iterator().next(), context); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba5e6520/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java new file mode 100644 index 0000000..c0ba8f9 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.orderedpartitioner; + +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.HashPartitionSender; +import org.apache.drill.exec.physical.config.OrderedPartitionSender; +import org.apache.drill.exec.physical.impl.RootCreator; +import org.apache.drill.exec.physical.impl.RootExec; +import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec; +import org.apache.drill.exec.record.RecordBatch; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +public class OrderedPartitionSenderCreator implements RootCreator<OrderedPartitionSender> { + + @Override + public RootExec getRoot(FragmentContext context, OrderedPartitionSender config, + List<RecordBatch> children) throws ExecutionSetupException { + Preconditions.checkArgument(children.size() == 1); + + List<RecordBatch> ordered_children = Lists.newArrayList(); + ordered_children.add(new OrderedPartitionRecordBatch(config, children.iterator().next(), context)); + HashPartitionSender hpc = new HashPartitionSender(config.getOppositeMajorFragmentId(), config, config.getRef(), config.getDestinations()); + return new PartitionSenderRootExec(context, ordered_children.iterator().next(), hpc); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ba5e6520/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 19adee7..bc53bd2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -45,7 +45,7 @@ import com.sun.codemodel.JMod; import com.sun.codemodel.JType; import com.sun.codemodel.JVar; -class PartitionSenderRootExec implements RootExec { +public class PartitionSenderRootExec implements RootExec { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class); private RecordBatch incoming;
