Sent from my iPhone
On May 13, 2013, at 18:52, [email protected] wrote: > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java > ---------------------------------------------------------------------- > diff --git > a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java > > b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java > new file mode 100644 > index 0000000..42a15ae > --- /dev/null > +++ > b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java > @@ -0,0 +1,90 @@ > +/******************************************************************************* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + > ******************************************************************************/ > +package org.apache.drill.exec.physical.base; > + > +import java.util.List; > + > +import org.apache.drill.common.exceptions.PhysicalOperatorSetupException; > +import org.apache.drill.exec.physical.OperatorCost; > +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; > + > +import com.fasterxml.jackson.annotation.JsonIgnore; > +import com.fasterxml.jackson.annotation.JsonProperty; > + > +public abstract class AbstractExchange extends AbstractSingle implements > Exchange { > + static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(AbstractExchange.class); > + > + protected int senderMajorFragmentId; > + protected int receiverMajorFragmentId; > + > + public AbstractExchange(PhysicalOperator child) { > + super(child); > + } > + > + /** > + * Exchanges are not executable. The Execution layer first has to set > their parallelization and convert them into > + * something executable > + */ > + @Override > + public boolean isExecutable() { > + return false; > + } > + > + protected abstract void setupSenders(List<DrillbitEndpoint> > senderLocations) throws PhysicalOperatorSetupException ; > + protected abstract void setupReceivers(List<DrillbitEndpoint> > senderLocations) throws PhysicalOperatorSetupException ; > + > + @Override > + public final void setupSenders(int majorFragmentId, List<DrillbitEndpoint> > senderLocations) throws PhysicalOperatorSetupException { > + this.senderMajorFragmentId = majorFragmentId; > + setupSenders(senderLocations); > + } > + > + > + @Override > + public final void setupReceivers(int majorFragmentId, > List<DrillbitEndpoint> receiverLocations) throws > PhysicalOperatorSetupException { > + this.receiverMajorFragmentId = majorFragmentId; > + setupReceivers(receiverLocations); > + } > + > + @Override > + public OperatorCost getAggregateSendCost() { > + return getExchangeCost().getSendCost(); > + } > + > + @Override > + public OperatorCost getAggregateReceiveCost() { > + return getExchangeCost().getReceiveCost(); > + } > + > + @Override > + public final <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> > physicalVisitor, X value) throws E { > + return physicalVisitor.visitExchange(this, value); > + } > + > + @Override > + public ExchangeCost getExchangeCost(){ > + return ExchangeCost.getSimpleEstimate(getSize()); > + } > + > + @JsonIgnore > + @Override > + public OperatorCost getCost() { > + return getExchangeCost().getCombinedCost(); > + } > + > +} > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java > ---------------------------------------------------------------------- > diff --git > a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java > > b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java > new file mode 100644 > index 0000000..f782325 > --- /dev/null > +++ > b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java > @@ -0,0 +1,124 @@ > +/******************************************************************************* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + > ******************************************************************************/ > +package org.apache.drill.exec.physical.base; > + > +import org.apache.drill.exec.physical.config.Filter; > +import org.apache.drill.exec.physical.config.HashPartitionSender; > +import org.apache.drill.exec.physical.config.HashToRandomExchange; > +import org.apache.drill.exec.physical.config.Project; > +import org.apache.drill.exec.physical.config.RandomReceiver; > +import org.apache.drill.exec.physical.config.RangeSender; > +import org.apache.drill.exec.physical.config.Screen; > +import org.apache.drill.exec.physical.config.SingleSender; > +import org.apache.drill.exec.physical.config.Sort; > +import org.apache.drill.exec.physical.config.UnionExchange; > + > +public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> > implements PhysicalVisitor<T, X, E> { > + static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class); > + > + @Override > + public T visitExchange(Exchange exchange, X value) throws E{ > + return visitOp(exchange, value); > + } > + > + @Override > + public T visitFilter(Filter filter, X value) throws E{ > + return visitOp(filter, value); > + } > + > + @Override > + public T visitProject(Project project, X value) throws E{ > + return visitOp(project, value); > + } > + > + @Override > + public T visitSort(Sort sort, X value) throws E{ > + return visitOp(sort, value); > + } > + > + @Override > + public T visitSender(Sender sender, X value) throws E { > + return visitOp(sender, value); > + } > + > + @Override > + public T visitReceiver(Receiver receiver, X value) throws E { > + return visitOp(receiver, value); > + } > + > + @Override > + public T visitScan(Scan<?> scan, X value) throws E{ > + return visitOp(scan, value); > + } > + > + @Override > + public T visitStore(Store store, X value) throws E{ > + return visitOp(store, value); > + } > + > + > + public T visitChildren(PhysicalOperator op, X value) throws E{ > + for(PhysicalOperator child : op){ > + child.accept(this, value); > + } > + return null; > + } > + > + @Override > + public T visitHashPartitionSender(HashPartitionSender op, X value) throws > E { > + return visitSender(op, value); > + } > + > + @Override > + public T visitRandomReceiver(RandomReceiver op, X value) throws E { > + return visitReceiver(op, value); > + } > + > + @Override > + public T visitHashPartitionSender(HashToRandomExchange op, X value) throws > E { > + return visitExchange(op, value); > + } > + > + @Override > + public T visitRangeSender(RangeSender op, X value) throws E { > + return visitSender(op, value); > + } > + > + @Override > + public T visitScreen(Screen op, X value) throws E { > + return visitStore(op, value); > + } > + > + @Override > + public T visitSingleSender(SingleSender op, X value) throws E { > + return visitSender(op, value); > + } > + > + @Override > + public T visitUnionExchange(UnionExchange op, X value) throws E { > + return visitExchange(op, value); > + } > + > + @Override > + public T visitOp(PhysicalOperator op, X value) throws E{ > + throw new UnsupportedOperationException(String.format( > + "The PhysicalVisitor of type %s does not currently support visiting > the PhysicalOperator type %s.", this > + .getClass().getCanonicalName(), > op.getClass().getCanonicalName())); > + } > + > +} > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java > ---------------------------------------------------------------------- > diff --git > a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java > > b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java > new file mode 100644 > index 0000000..e8ba19c > --- /dev/null > +++ > b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java > @@ -0,0 +1,63 @@ > +/******************************************************************************* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + > ******************************************************************************/ > +package org.apache.drill.exec.physical.base; > + > +import java.util.Iterator; > +import java.util.List; > + > +import org.apache.drill.exec.physical.OperatorCost; > + > +import com.fasterxml.jackson.annotation.JsonIgnore; > +import com.fasterxml.jackson.annotation.JsonProperty; > +import com.google.common.base.Preconditions; > +import com.google.common.collect.Iterators; > + > +public abstract class AbstractReceiver extends AbstractBase implements > Receiver{ > + > + static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(AbstractReceiver.class); > + > + private final int oppositeMajorFragmentId; > + > + public AbstractReceiver(int oppositeMajorFragmentId){ > + this.oppositeMajorFragmentId = oppositeMajorFragmentId; > + } > + > + @Override > + public Iterator<PhysicalOperator> iterator() { > + return Iterators.emptyIterator(); > + } > + > + @Override > + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> > physicalVisitor, X value) throws E { > + return physicalVisitor.visitReceiver(this, value); > + } > + > + @Override > + public final PhysicalOperator getNewWithChildren(List<PhysicalOperator> > children) { > + Preconditions.checkArgument(children.isEmpty()); > + //rewriting is unnecessary since the inputs haven't changed. > + return this; > + } > + > + @JsonProperty("sender-major-fragment") > + public int getOppositeMajorFragmentId() { > + return oppositeMajorFragmentId; > + } > + > +} > + > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java > ---------------------------------------------------------------------- > diff --git > a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java > > b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java > new file mode 100644 > index 0000000..dbde9c5 > --- /dev/null > +++ > b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java > @@ -0,0 +1,84 @@ > +/******************************************************************************* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + > ******************************************************************************/ > +package org.apache.drill.exec.physical.base; > + > +import java.util.Iterator; > +import java.util.List; > + > +import org.apache.drill.exec.physical.OperatorCost; > +import org.apache.drill.exec.physical.ReadEntry; > + > +import com.fasterxml.jackson.annotation.JsonIgnore; > +import com.fasterxml.jackson.annotation.JsonProperty; > +import com.google.common.collect.Iterators; > + > +public abstract class AbstractScan<R extends ReadEntry> extends AbstractBase > implements Scan<R>{ > + static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(AbstractScan.class); > + > + protected final List<R> readEntries; > + private final OperatorCost cost; > + private final Size size; > + > + public AbstractScan(List<R> readEntries) { > + this.readEntries = readEntries; > + OperatorCost cost = new OperatorCost(0,0,0,0); > + Size size = new Size(0,0); > + for(R r : readEntries){ > + cost = cost.add(r.getCost()); > + size = size.add(r.getSize()); > + } > + this.cost = cost; > + this.size = size; > + } > + > + @Override > + @JsonProperty("entries") > + public List<R> getReadEntries() { > + return readEntries; > + } > + > + @Override > + public Iterator<PhysicalOperator> iterator() { > + return Iterators.emptyIterator(); > + } > + > + @Override > + public boolean isExecutable() { > + return true; > + } > + > + @Override > + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> > physicalVisitor, X value) throws E{ > + return physicalVisitor.visitScan(this, value); > + } > + > + @Override > + public OperatorCost getCost() { > + return cost; > + } > + > + @Override > + public Size getSize() { > + return size; > + } > + > + > + > + > + > +} > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java > ---------------------------------------------------------------------- > diff --git > a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java > > b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java > new file mode 100644 > index 0000000..f8c22b3 > --- /dev/null > +++ > b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java > @@ -0,0 +1,53 @@ > +/******************************************************************************* > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See
