DRILL-1069: Rename RandomReceiver to UnorderedRecevier.

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/16808f4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/16808f4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/16808f4a

Branch: refs/heads/master
Commit: 16808f4a485418d2974b0be17c2aa9fab2514d38
Parents: 8773f84
Author: Jacques Nadeau <jacq...@apache.org>
Authored: Sat Jun 21 11:24:42 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Wed Jun 25 09:08:41 2014 -0700

----------------------------------------------------------------------
 .../physical/base/AbstractPhysicalVisitor.java  |   4 +-
 .../exec/physical/base/PhysicalVisitor.java     |   4 +-
 .../exec/physical/config/BroadcastExchange.java |   2 +-
 .../physical/config/HashToRandomExchange.java   |   2 +-
 .../config/OrderedPartitionExchange.java        |   2 +-
 .../exec/physical/config/RandomReceiver.java    |  64 -------
 .../exec/physical/config/UnionExchange.java     |   2 +-
 .../exec/physical/config/UnorderedReceiver.java |  64 +++++++
 .../physical/impl/RandomReceiverCreator.java    |  46 -----
 .../exec/physical/impl/WireRecordBatch.java     | 181 -------------------
 .../UnorderedReceiverBatch.java                 | 181 +++++++++++++++++++
 .../UnorderedReceiverCreator.java               |  47 +++++
 protocol/src/main/protobuf/UserBitShared.proto  |   2 +-
 13 files changed, 301 insertions(+), 300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 14cfe81..d7d7c12 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -29,7 +29,7 @@ import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.physical.config.OrderedPartitionSender;
 import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.config.RandomReceiver;
+import org.apache.drill.exec.physical.config.UnorderedReceiver;
 import org.apache.drill.exec.physical.config.RangeSender;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SingleSender;
@@ -146,7 +146,7 @@ public abstract class AbstractPhysicalVisitor<T, X, E 
extends Throwable> impleme
   }
 
   @Override
-  public T visitRandomReceiver(RandomReceiver op, X value) throws E {
+  public T visitUnorderedReceiver(UnorderedReceiver op, X value) throws E {
     return visitReceiver(op, value);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 3eaf026..6d5a6cb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -29,7 +29,7 @@ import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.physical.config.OrderedPartitionSender;
 import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.config.RandomReceiver;
+import org.apache.drill.exec.physical.config.UnorderedReceiver;
 import org.apache.drill.exec.physical.config.RangeSender;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SingleSender;
@@ -71,7 +71,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends 
Throwable> {
   
   public RETURN visitHashPartitionSender(HashPartitionSender op, EXTRA value) 
throws EXCEP;
   public RETURN visitOrderedPartitionSender(OrderedPartitionSender op, EXTRA 
value) throws EXCEP;
-  public RETURN visitRandomReceiver(RandomReceiver op, EXTRA value) throws 
EXCEP;
+  public RETURN visitUnorderedReceiver(UnorderedReceiver op, EXTRA value) 
throws EXCEP;
   public RETURN visitMergingReceiver(MergingReceiverPOP op, EXTRA value) 
throws EXCEP;
   public RETURN visitHashPartitionSender(HashToRandomExchange op, EXTRA value) 
throws EXCEP;
   public RETURN visitRangeSender(RangeSender op, EXTRA value) throws EXCEP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
index 5e67bcc..2eed4c4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
@@ -64,7 +64,7 @@ public class BroadcastExchange extends AbstractExchange {
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    return new RandomReceiver(senderMajorFragmentId, senderLocations);
+    return new UnorderedReceiver(senderMajorFragmentId, senderLocations);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
index 7ad2b65..dddaf83 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
@@ -71,7 +71,7 @@ public class HashToRandomExchange extends AbstractExchange{
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    return new RandomReceiver(senderMajorFragmentId, senderLocations);
+    return new UnorderedReceiver(senderMajorFragmentId, senderLocations);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
index 2b80262..8e1526a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
@@ -93,7 +93,7 @@ public class OrderedPartitionExchange extends 
AbstractExchange {
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    return new RandomReceiver(senderMajorFragmentId, senderLocations);
+    return new UnorderedReceiver(senderMajorFragmentId, senderLocations);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
deleted file mode 100644
index b04db40..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.config;
-
-import java.util.List;
-
-import org.apache.drill.exec.physical.base.AbstractReceiver;
-import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("random-receiver")
-public class RandomReceiver extends AbstractReceiver{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RandomReceiver.class);
-
-  private List<DrillbitEndpoint> senders;
-
-  @JsonCreator
-  public RandomReceiver(@JsonProperty("sender-major-fragment") int 
oppositeMajorFragmentId,
-                        @JsonProperty("senders") List<DrillbitEndpoint> 
senders) {
-    super(oppositeMajorFragmentId);
-    this.senders = senders;
-  }
-
-  @Override
-  @JsonProperty("senders")
-  public List<DrillbitEndpoint> getProvidingEndpoints() {
-    return senders;
-  }
-
-  @Override
-  public boolean supportsOutOfOrderExchange() {
-    return true;
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> 
physicalVisitor, X value) throws E {
-    return physicalVisitor.visitRandomReceiver(this, value);
-  }
-
-  @Override
-  public int getOperatorType() {
-    return CoreOperatorType.RANDOM_RECEIVER_VALUE;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
index a638741..cafdbdd 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
@@ -59,7 +59,7 @@ public class UnionExchange extends AbstractExchange{
 
   @Override
   public Receiver getReceiver(int minorFragmentId) {
-    return new RandomReceiver(this.senderMajorFragmentId, senderLocations);
+    return new UnorderedReceiver(this.senderMajorFragmentId, senderLocations);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
new file mode 100644
index 0000000..a204752
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.base.AbstractReceiver;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("unordered-receiver")
+public class UnorderedReceiver extends AbstractReceiver{
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnorderedReceiver.class);
+
+  private List<DrillbitEndpoint> senders;
+
+  @JsonCreator
+  public UnorderedReceiver(@JsonProperty("sender-major-fragment") int 
oppositeMajorFragmentId,
+                        @JsonProperty("senders") List<DrillbitEndpoint> 
senders) {
+    super(oppositeMajorFragmentId);
+    this.senders = senders;
+  }
+
+  @Override
+  @JsonProperty("senders")
+  public List<DrillbitEndpoint> getProvidingEndpoints() {
+    return senders;
+  }
+
+  @Override
+  public boolean supportsOutOfOrderExchange() {
+    return true;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> 
physicalVisitor, X value) throws E {
+    return physicalVisitor.visitUnorderedReceiver(this, value);
+  }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.RANDOM_RECEIVER_VALUE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
deleted file mode 100644
index 4ff5831..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl;
-
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.RandomReceiver;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.work.batch.IncomingBuffers;
-import org.apache.drill.exec.work.batch.RawBatchBuffer;
-
-public class RandomReceiverCreator implements BatchCreator<RandomReceiver>{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RandomReceiverCreator.class);
-
-  @Override
-  public RecordBatch getBatch(FragmentContext context, RandomReceiver 
receiver, List<RecordBatch> children)
-      throws ExecutionSetupException {
-    assert children == null || children.isEmpty();
-    IncomingBuffers bufHolder = context.getBuffers();
-    assert bufHolder != null : "IncomingBuffers must be defined for any place 
a receiver is declared.";
-    
-    RawBatchBuffer[] buffers = 
bufHolder.getBuffers(receiver.getOppositeMajorFragmentId());
-    assert buffers.length == 1;
-    RawBatchBuffer buffer = buffers[0];
-    return new WireRecordBatch(context, buffer, receiver);
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
deleted file mode 100644
index 1eae0c9..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OpProfileDef;
-import org.apache.drill.exec.ops.OperatorStats;
-import org.apache.drill.exec.physical.config.RandomReceiver;
-import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RawFragmentBatch;
-import org.apache.drill.exec.record.RawFragmentBatchProvider;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-
-public class WireRecordBatch implements RecordBatch {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(WireRecordBatch.class);
-
-  private RecordBatchLoader batchLoader;
-  private RawFragmentBatchProvider fragProvider;
-  private FragmentContext context;
-  private BatchSchema schema;
-  private OperatorStats stats;
-  private boolean first = true;
-
-
-  public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider 
fragProvider, RandomReceiver config) throws OutOfMemoryException {
-    this.fragProvider = fragProvider;
-    this.context = context;
-    // In normal case, batchLoader does not require an allocator. However, in 
case of splitAndTransfer of a value vector,
-    // we may need an allocator for the new offset vector. Therefore, here we 
pass the context's allocator to batchLoader.
-    this.batchLoader = new RecordBatchLoader(context.getAllocator());
-    this.stats = context.getStats().getOperatorStats(new 
OpProfileDef(config.getOperatorId(), config.getOperatorType(), 0));
-  }
-
-  @Override
-  public FragmentContext getContext() {
-    return context;
-  }
-
-  @Override
-  public BatchSchema getSchema() {
-    return schema;
-  }
-
-  @Override
-  public int getRecordCount() {
-    return batchLoader.getRecordCount();
-  }
-
-  @Override
-  public void kill() {
-    fragProvider.kill(context);
-  }
-
-  @Override
-  public Iterator<VectorWrapper<?>> iterator() {
-    return batchLoader.iterator();
-  }
-
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public SelectionVector4 getSelectionVector4() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public TypedFieldId getValueVectorId(SchemaPath path) {
-    return batchLoader.getValueVectorId(path);
-  }
-
-  @Override
-  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
-    return batchLoader.getValueAccessorById(clazz, ids);
-  }
-
-  @Override
-  public IterOutcome next() {
-    stats.startProcessing();
-    try{
-      RawFragmentBatch batch;
-      try {
-        stats.startWait();
-        batch = fragProvider.getNext();
-
-        // skip over empty batches. we do this since these are basically 
control messages.
-        while(batch != null && !batch.getHeader().getIsOutOfMemory() && 
batch.getHeader().getDef().getRecordCount() == 0 && !first){
-          if (first) {
-            first = false;
-            RecordBatchDef rbd = batch.getHeader().getDef();
-            batchLoader.load(rbd, batch.getBody());
-            batch.release();
-            schema = batchLoader.getSchema().clone();
-            batchLoader.clear();
-          }
-          batch = fragProvider.getNext();
-        }
-      } finally {
-        stats.stopWait();
-      }
-
-      first = false;
-
-      if (batch == null){
-        batchLoader.clear();
-        return IterOutcome.NONE;
-      }
-
-      if (batch.getHeader().getIsOutOfMemory()) {
-        return IterOutcome.OUT_OF_MEMORY;
-      }
-
-
-//      logger.debug("Next received batch {}", batch);
-
-      RecordBatchDef rbd = batch.getHeader().getDef();
-      boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
-//      System.out.println(rbd.getRecordCount());
-      batch.release();
-      if(schemaChanged){
-        this.schema = batchLoader.getSchema();
-        return IterOutcome.OK_NEW_SCHEMA;
-      }else{
-        return IterOutcome.OK;
-      }
-    }catch(SchemaChangeException | IOException ex){
-      context.fail(ex);
-      return IterOutcome.STOP;
-    } finally {
-      stats.stopProcessing();
-    }
-  }
-
-  @Override
-  public WritableBatch getWritableBatch() {
-    return batchLoader.getWritableBatch();
-  }
-
-  @Override
-  public void cleanup() {
-    batchLoader.clear();
-    fragProvider.cleanup();
-  }
-
-  @Override
-  public VectorContainer getOutgoingContainer() {
-    throw new UnsupportedOperationException(String.format(" You should not 
call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
new file mode 100644
index 0000000..3d9e470
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unorderedreceiver;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OpProfileDef;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.config.UnorderedReceiver;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.record.RawFragmentBatchProvider;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public class UnorderedReceiverBatch implements RecordBatch {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
+
+  private RecordBatchLoader batchLoader;
+  private RawFragmentBatchProvider fragProvider;
+  private FragmentContext context;
+  private BatchSchema schema;
+  private OperatorStats stats;
+  private boolean first = true;
+
+
+  public UnorderedReceiverBatch(FragmentContext context, 
RawFragmentBatchProvider fragProvider, UnorderedReceiver config) throws 
OutOfMemoryException {
+    this.fragProvider = fragProvider;
+    this.context = context;
+    // In normal case, batchLoader does not require an allocator. However, in 
case of splitAndTransfer of a value vector,
+    // we may need an allocator for the new offset vector. Therefore, here we 
pass the context's allocator to batchLoader.
+    this.batchLoader = new RecordBatchLoader(context.getAllocator());
+    this.stats = context.getStats().getOperatorStats(new 
OpProfileDef(config.getOperatorId(), config.getOperatorType(), 0), null);
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return context;
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return batchLoader.getRecordCount();
+  }
+
+  @Override
+  public void kill() {
+    fragProvider.kill(context);
+  }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return batchLoader.iterator();
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return batchLoader.getValueVectorId(path);
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+    return batchLoader.getValueAccessorById(clazz, ids);
+  }
+
+  @Override
+  public IterOutcome next() {
+    stats.startProcessing();
+    try{
+      RawFragmentBatch batch;
+      try {
+        stats.startWait();
+        batch = fragProvider.getNext();
+
+        // skip over empty batches. we do this since these are basically 
control messages.
+        while(batch != null && !batch.getHeader().getIsOutOfMemory() && 
batch.getHeader().getDef().getRecordCount() == 0 && !first){
+          if (first) {
+            first = false;
+            RecordBatchDef rbd = batch.getHeader().getDef();
+            batchLoader.load(rbd, batch.getBody());
+            batch.release();
+            schema = batchLoader.getSchema().clone();
+            batchLoader.clear();
+          }
+          batch = fragProvider.getNext();
+        }
+      } finally {
+        stats.stopWait();
+      }
+
+      first = false;
+
+      if (batch == null){
+        batchLoader.clear();
+        return IterOutcome.NONE;
+      }
+
+      if (batch.getHeader().getIsOutOfMemory()) {
+        return IterOutcome.OUT_OF_MEMORY;
+      }
+
+
+//      logger.debug("Next received batch {}", batch);
+
+      RecordBatchDef rbd = batch.getHeader().getDef();
+      boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
+//      System.out.println(rbd.getRecordCount());
+      batch.release();
+      if(schemaChanged){
+        this.schema = batchLoader.getSchema();
+        return IterOutcome.OK_NEW_SCHEMA;
+      }else{
+        return IterOutcome.OK;
+      }
+    }catch(SchemaChangeException | IOException ex){
+      context.fail(ex);
+      return IterOutcome.STOP;
+    } finally {
+      stats.stopProcessing();
+    }
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    return batchLoader.getWritableBatch();
+  }
+
+  @Override
+  public void cleanup() {
+    batchLoader.clear();
+    fragProvider.cleanup();
+  }
+
+  @Override
+  public VectorContainer getOutgoingContainer() {
+    throw new UnsupportedOperationException(String.format(" You should not 
call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
new file mode 100644
index 0000000..6be8714
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.unorderedreceiver;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.UnorderedReceiver;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.exec.work.batch.RawBatchBuffer;
+
+public class UnorderedReceiverCreator implements 
BatchCreator<UnorderedReceiver>{
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnorderedReceiverCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, UnorderedReceiver 
receiver, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    assert children == null || children.isEmpty();
+    IncomingBuffers bufHolder = context.getBuffers();
+    assert bufHolder != null : "IncomingBuffers must be defined for any place 
a receiver is declared.";
+    
+    RawBatchBuffer[] buffers = 
bufHolder.getBuffers(receiver.getOppositeMajorFragmentId());
+    assert buffers.length == 1;
+    RawBatchBuffer buffer = buffers[0];
+    return new UnorderedReceiverBatch(context, buffer, receiver);
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/16808f4a/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto 
b/protocol/src/main/protobuf/UserBitShared.proto
index fc6f1b5..6fd73db 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -178,7 +178,7 @@ enum CoreOperatorType {
   MERGING_RECEIVER = 8;
   ORDERED_PARTITION_SENDER = 9;
   PROJECT = 10;
-  RANDOM_RECEIVER = 11;
+  UNORDERED_RECEIVER = 11;
   RANGE_SENDER = 12;
   SCREEN = 13;
   SELECTION_VECTOR_REMOVER = 14;

Reply via email to