DRILL-1069: Rename RandomReceiver to UnorderedReceiver.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/16808f4a Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/16808f4a Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/16808f4a Branch: refs/heads/master Commit: 16808f4a485418d2974b0be17c2aa9fab2514d38 Parents: 8773f84 Author: Jacques Nadeau <jacq...@apache.org> Authored: Sat Jun 21 11:24:42 2014 -0700 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Wed Jun 25 09:08:41 2014 -0700 ---------------------------------------------------------------------- .../physical/base/AbstractPhysicalVisitor.java | 4 +- .../exec/physical/base/PhysicalVisitor.java | 4 +- .../exec/physical/config/BroadcastExchange.java | 2 +- .../physical/config/HashToRandomExchange.java | 2 +- .../config/OrderedPartitionExchange.java | 2 +- .../exec/physical/config/RandomReceiver.java | 64 ------- .../exec/physical/config/UnionExchange.java | 2 +- .../exec/physical/config/UnorderedReceiver.java | 64 +++++++ .../physical/impl/RandomReceiverCreator.java | 46 ----- .../exec/physical/impl/WireRecordBatch.java | 181 ------------------- .../UnorderedReceiverBatch.java | 181 +++++++++++++++++++ .../UnorderedReceiverCreator.java | 47 +++++ protocol/src/main/protobuf/UserBitShared.proto | 2 +- 13 files changed, 301 insertions(+), 300 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index 14cfe81..d7d7c12 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -29,7 +29,7 @@ import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.physical.config.OrderedPartitionSender; import org.apache.drill.exec.physical.config.Project; -import org.apache.drill.exec.physical.config.RandomReceiver; +import org.apache.drill.exec.physical.config.UnorderedReceiver; import org.apache.drill.exec.physical.config.RangeSender; import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.SingleSender; @@ -146,7 +146,7 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme } @Override - public T visitRandomReceiver(RandomReceiver op, X value) throws E { + public T visitUnorderedReceiver(UnorderedReceiver op, X value) throws E { return visitReceiver(op, value); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index 3eaf026..6d5a6cb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -29,7 +29,7 @@ import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.physical.config.OrderedPartitionSender; import org.apache.drill.exec.physical.config.Project; -import 
org.apache.drill.exec.physical.config.RandomReceiver; +import org.apache.drill.exec.physical.config.UnorderedReceiver; import org.apache.drill.exec.physical.config.RangeSender; import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.SingleSender; @@ -71,7 +71,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitHashPartitionSender(HashPartitionSender op, EXTRA value) throws EXCEP; public RETURN visitOrderedPartitionSender(OrderedPartitionSender op, EXTRA value) throws EXCEP; - public RETURN visitRandomReceiver(RandomReceiver op, EXTRA value) throws EXCEP; + public RETURN visitUnorderedReceiver(UnorderedReceiver op, EXTRA value) throws EXCEP; public RETURN visitMergingReceiver(MergingReceiverPOP op, EXTRA value) throws EXCEP; public RETURN visitHashPartitionSender(HashToRandomExchange op, EXTRA value) throws EXCEP; public RETURN visitRangeSender(RangeSender op, EXTRA value) throws EXCEP; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java index 5e67bcc..2eed4c4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java @@ -64,7 +64,7 @@ public class BroadcastExchange extends AbstractExchange { @Override public Receiver getReceiver(int minorFragmentId) { - return new RandomReceiver(senderMajorFragmentId, senderLocations); + return new UnorderedReceiver(senderMajorFragmentId, senderLocations); } @Override 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java index 7ad2b65..dddaf83 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java @@ -71,7 +71,7 @@ public class HashToRandomExchange extends AbstractExchange{ @Override public Receiver getReceiver(int minorFragmentId) { - return new RandomReceiver(senderMajorFragmentId, senderLocations); + return new UnorderedReceiver(senderMajorFragmentId, senderLocations); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java index 2b80262..8e1526a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java @@ -93,7 +93,7 @@ public class OrderedPartitionExchange extends AbstractExchange { @Override public Receiver getReceiver(int minorFragmentId) { - return new RandomReceiver(senderMajorFragmentId, senderLocations); + return new UnorderedReceiver(senderMajorFragmentId, senderLocations); } @Override 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java deleted file mode 100644 index b04db40..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.physical.config; - -import java.util.List; - -import org.apache.drill.exec.physical.base.AbstractReceiver; -import org.apache.drill.exec.physical.base.PhysicalVisitor; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; - -@JsonTypeName("random-receiver") -public class RandomReceiver extends AbstractReceiver{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomReceiver.class); - - private List<DrillbitEndpoint> senders; - - @JsonCreator - public RandomReceiver(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId, - @JsonProperty("senders") List<DrillbitEndpoint> senders) { - super(oppositeMajorFragmentId); - this.senders = senders; - } - - @Override - @JsonProperty("senders") - public List<DrillbitEndpoint> getProvidingEndpoints() { - return senders; - } - - @Override - public boolean supportsOutOfOrderExchange() { - return true; - } - - @Override - public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { - return physicalVisitor.visitRandomReceiver(this, value); - } - - @Override - public int getOperatorType() { - return CoreOperatorType.RANDOM_RECEIVER_VALUE; - } -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java index a638741..cafdbdd 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java @@ -59,7 +59,7 @@ public class UnionExchange extends AbstractExchange{ @Override public Receiver getReceiver(int minorFragmentId) { - return new RandomReceiver(this.senderMajorFragmentId, senderLocations); + return new UnorderedReceiver(this.senderMajorFragmentId, senderLocations); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java new file mode 100644 index 0000000..a204752 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.config; + +import java.util.List; + +import org.apache.drill.exec.physical.base.AbstractReceiver; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName("unordered-receiver") +public class UnorderedReceiver extends AbstractReceiver{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiver.class); + + private List<DrillbitEndpoint> senders; + + @JsonCreator + public UnorderedReceiver(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId, + @JsonProperty("senders") List<DrillbitEndpoint> senders) { + super(oppositeMajorFragmentId); + this.senders = senders; + } + + @Override + @JsonProperty("senders") + public List<DrillbitEndpoint> getProvidingEndpoints() { + return senders; + } + + @Override + public boolean supportsOutOfOrderExchange() { + return true; + } + + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitUnorderedReceiver(this, value); + } + + @Override + public int getOperatorType() { + return CoreOperatorType.RANDOM_RECEIVER_VALUE; + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java deleted file mode 100644 index 4ff5831..0000000 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.impl; - -import java.util.List; - -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.config.RandomReceiver; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.work.batch.IncomingBuffers; -import org.apache.drill.exec.work.batch.RawBatchBuffer; - -public class RandomReceiverCreator implements BatchCreator<RandomReceiver>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomReceiverCreator.class); - - @Override - public RecordBatch getBatch(FragmentContext context, RandomReceiver receiver, List<RecordBatch> children) - throws ExecutionSetupException { - assert children == null || children.isEmpty(); - IncomingBuffers bufHolder = context.getBuffers(); - assert bufHolder != null : "IncomingBuffers must be defined for any place a receiver is declared."; - - RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId()); - 
assert buffers.length == 1; - RawBatchBuffer buffer = buffers[0]; - return new WireRecordBatch(context, buffer, receiver); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java deleted file mode 100644 index 1eae0c9..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.physical.impl; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.memory.OutOfMemoryException; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.ops.OpProfileDef; -import org.apache.drill.exec.ops.OperatorStats; -import org.apache.drill.exec.physical.config.RandomReceiver; -import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.RawFragmentBatch; -import org.apache.drill.exec.record.RawFragmentBatchProvider; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatchLoader; -import org.apache.drill.exec.record.TypedFieldId; -import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.WritableBatch; -import org.apache.drill.exec.record.selection.SelectionVector2; -import org.apache.drill.exec.record.selection.SelectionVector4; - -public class WireRecordBatch implements RecordBatch { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WireRecordBatch.class); - - private RecordBatchLoader batchLoader; - private RawFragmentBatchProvider fragProvider; - private FragmentContext context; - private BatchSchema schema; - private OperatorStats stats; - private boolean first = true; - - - public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider, RandomReceiver config) throws OutOfMemoryException { - this.fragProvider = fragProvider; - this.context = context; - // In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector, - // we may need an allocator for the new offset vector. 
Therefore, here we pass the context's allocator to batchLoader. - this.batchLoader = new RecordBatchLoader(context.getAllocator()); - this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 0)); - } - - @Override - public FragmentContext getContext() { - return context; - } - - @Override - public BatchSchema getSchema() { - return schema; - } - - @Override - public int getRecordCount() { - return batchLoader.getRecordCount(); - } - - @Override - public void kill() { - fragProvider.kill(context); - } - - @Override - public Iterator<VectorWrapper<?>> iterator() { - return batchLoader.iterator(); - } - - @Override - public SelectionVector2 getSelectionVector2() { - throw new UnsupportedOperationException(); - } - - @Override - public SelectionVector4 getSelectionVector4() { - throw new UnsupportedOperationException(); - } - - @Override - public TypedFieldId getValueVectorId(SchemaPath path) { - return batchLoader.getValueVectorId(path); - } - - @Override - public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) { - return batchLoader.getValueAccessorById(clazz, ids); - } - - @Override - public IterOutcome next() { - stats.startProcessing(); - try{ - RawFragmentBatch batch; - try { - stats.startWait(); - batch = fragProvider.getNext(); - - // skip over empty batches. we do this since these are basically control messages. 
- while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && !first){ - if (first) { - first = false; - RecordBatchDef rbd = batch.getHeader().getDef(); - batchLoader.load(rbd, batch.getBody()); - batch.release(); - schema = batchLoader.getSchema().clone(); - batchLoader.clear(); - } - batch = fragProvider.getNext(); - } - } finally { - stats.stopWait(); - } - - first = false; - - if (batch == null){ - batchLoader.clear(); - return IterOutcome.NONE; - } - - if (batch.getHeader().getIsOutOfMemory()) { - return IterOutcome.OUT_OF_MEMORY; - } - - -// logger.debug("Next received batch {}", batch); - - RecordBatchDef rbd = batch.getHeader().getDef(); - boolean schemaChanged = batchLoader.load(rbd, batch.getBody()); -// System.out.println(rbd.getRecordCount()); - batch.release(); - if(schemaChanged){ - this.schema = batchLoader.getSchema(); - return IterOutcome.OK_NEW_SCHEMA; - }else{ - return IterOutcome.OK; - } - }catch(SchemaChangeException | IOException ex){ - context.fail(ex); - return IterOutcome.STOP; - } finally { - stats.stopProcessing(); - } - } - - @Override - public WritableBatch getWritableBatch() { - return batchLoader.getWritableBatch(); - } - - @Override - public void cleanup() { - batchLoader.clear(); - fragProvider.cleanup(); - } - - @Override - public VectorContainer getOutgoingContainer() { - throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java new file mode 100644 index 0000000..3d9e470 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.unorderedreceiver; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OpProfileDef; +import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.physical.config.UnorderedReceiver; +import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.RawFragmentBatch; +import org.apache.drill.exec.record.RawFragmentBatchProvider; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; + +public class UnorderedReceiverBatch implements RecordBatch { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class); + + private RecordBatchLoader batchLoader; + private RawFragmentBatchProvider fragProvider; + private FragmentContext context; + private BatchSchema schema; + private OperatorStats stats; + private boolean first = true; + + + public UnorderedReceiverBatch(FragmentContext context, RawFragmentBatchProvider fragProvider, UnorderedReceiver config) throws OutOfMemoryException { + this.fragProvider = fragProvider; + this.context = context; + // In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector, + // we may need an allocator for the new offset vector. 
Therefore, here we pass the context's allocator to batchLoader. + this.batchLoader = new RecordBatchLoader(context.getAllocator()); + this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 0), null); + } + + @Override + public FragmentContext getContext() { + return context; + } + + @Override + public BatchSchema getSchema() { + return schema; + } + + @Override + public int getRecordCount() { + return batchLoader.getRecordCount(); + } + + @Override + public void kill() { + fragProvider.kill(context); + } + + @Override + public Iterator<VectorWrapper<?>> iterator() { + return batchLoader.iterator(); + } + + @Override + public SelectionVector2 getSelectionVector2() { + throw new UnsupportedOperationException(); + } + + @Override + public SelectionVector4 getSelectionVector4() { + throw new UnsupportedOperationException(); + } + + @Override + public TypedFieldId getValueVectorId(SchemaPath path) { + return batchLoader.getValueVectorId(path); + } + + @Override + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) { + return batchLoader.getValueAccessorById(clazz, ids); + } + + @Override + public IterOutcome next() { + stats.startProcessing(); + try{ + RawFragmentBatch batch; + try { + stats.startWait(); + batch = fragProvider.getNext(); + + // skip over empty batches. we do this since these are basically control messages. 
+ while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && !first){ + if (first) { + first = false; + RecordBatchDef rbd = batch.getHeader().getDef(); + batchLoader.load(rbd, batch.getBody()); + batch.release(); + schema = batchLoader.getSchema().clone(); + batchLoader.clear(); + } + batch = fragProvider.getNext(); + } + } finally { + stats.stopWait(); + } + + first = false; + + if (batch == null){ + batchLoader.clear(); + return IterOutcome.NONE; + } + + if (batch.getHeader().getIsOutOfMemory()) { + return IterOutcome.OUT_OF_MEMORY; + } + + +// logger.debug("Next received batch {}", batch); + + RecordBatchDef rbd = batch.getHeader().getDef(); + boolean schemaChanged = batchLoader.load(rbd, batch.getBody()); +// System.out.println(rbd.getRecordCount()); + batch.release(); + if(schemaChanged){ + this.schema = batchLoader.getSchema(); + return IterOutcome.OK_NEW_SCHEMA; + }else{ + return IterOutcome.OK; + } + }catch(SchemaChangeException | IOException ex){ + context.fail(ex); + return IterOutcome.STOP; + } finally { + stats.stopProcessing(); + } + } + + @Override + public WritableBatch getWritableBatch() { + return batchLoader.getWritableBatch(); + } + + @Override + public void cleanup() { + batchLoader.clear(); + fragProvider.cleanup(); + } + + @Override + public VectorContainer getOutgoingContainer() { + throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java new file mode 100644 index 0000000..6be8714 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.unorderedreceiver; + +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.UnorderedReceiver; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.work.batch.IncomingBuffers; +import org.apache.drill.exec.work.batch.RawBatchBuffer; + +public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver>{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverCreator.class); + + @Override + public RecordBatch getBatch(FragmentContext context, UnorderedReceiver receiver, List<RecordBatch> children) + throws ExecutionSetupException { + assert children == null || children.isEmpty(); + IncomingBuffers bufHolder = context.getBuffers(); + assert bufHolder != null : "IncomingBuffers must be defined for any place a receiver is declared."; + + RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId()); + assert buffers.length == 1; + RawBatchBuffer buffer = buffers[0]; + return new UnorderedReceiverBatch(context, buffer, receiver); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/protocol/src/main/protobuf/UserBitShared.proto ---------------------------------------------------------------------- diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto index fc6f1b5..6fd73db 100644 --- a/protocol/src/main/protobuf/UserBitShared.proto +++ b/protocol/src/main/protobuf/UserBitShared.proto @@ -178,7 +178,7 @@ enum CoreOperatorType { MERGING_RECEIVER = 8; ORDERED_PARTITION_SENDER = 9; PROJECT = 10; - RANDOM_RECEIVER = 11; + UNORDERED_RECEIVER = 11; RANGE_SENDER = 12; SCREEN = 13; SELECTION_VECTOR_REMOVER = 14;