Updated Branches:
  refs/heads/master 4a83dae35 -> db0203696

DRILL-281 Add broadcast sender


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/4a872263
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/4a872263
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/4a872263

Branch: refs/heads/master
Commit: 4a87226371c81e0186347ca8d8dead50de5c2de5
Parents: 4a83dae
Author: Timothy Chen <[email protected]>
Authored: Tue Nov 19 23:40:14 2013 +0800
Committer: Jacques Nadeau <[email protected]>
Committed: Tue Jan 14 08:20:08 2014 -0800

----------------------------------------------------------------------
 .../physical/base/AbstractPhysicalVisitor.java  |   5 +
 .../exec/physical/base/PhysicalVisitor.java     |   1 +
 .../exec/physical/config/BroadcastExchange.java |  79 +++++++++++
 .../exec/physical/config/BroadcastSender.java   |  66 ++++++++++
 .../drill/exec/physical/impl/ScreenCreator.java |   7 +-
 .../broadcastsender/BroadcastSenderCreator.java |  36 +++++
 .../BroadcastSenderRootExec.java                | 131 +++++++++++++++++++
 .../apache/drill/exec/record/WritableBatch.java |   5 +
 .../apache/drill/exec/rpc/bit/BitTunnel.java    |  28 ++++
 .../physical/impl/TestBroadcastExchange.java    |  87 ++++++++++++
 .../resources/sender/broadcast_exchange.json    |  55 ++++++++
 .../sender/broadcast_exchange_long_run.json     |  53 ++++++++
 12 files changed, 548 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index b8a7247..ec7244a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -131,6 +131,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
   }
 
   @Override
+  public T visitBroadcastSender(BroadcastSender op, X value) throws E {
+    return visitSender(op, value);
+  }
+
+  @Override
   public T visitScreen(Screen op, X value) throws E {
     return visitStore(op, value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 040a495..2e2d7fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -53,6 +53,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
   public RETURN visitMergingReceiver(MergingReceiverPOP op, EXTRA value) 
throws EXCEP;
   public RETURN visitHashPartitionSender(HashToRandomExchange op, EXTRA value) 
throws EXCEP;
   public RETURN visitRangeSender(RangeSender op, EXTRA value) throws EXCEP;
+  public RETURN visitBroadcastSender(BroadcastSender op, EXTRA value) throws 
EXCEP;
   public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP;
   public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP;
   public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
new file mode 100644
index 0000000..256d3d9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
@@ -0,0 +1,79 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.base.AbstractExchange;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.physical.base.Sender;
+import org.apache.drill.exec.proto.CoordinationProtos;
+
+import java.util.List;
+
+import static org.apache.drill.exec.proto.CoordinationProtos.*;
+
+@JsonTypeName("broadcast-exchange")
+public class BroadcastExchange extends AbstractExchange {
+
+  private List<DrillbitEndpoint> senderLocations;
+  private List<DrillbitEndpoint> receiverLocations;
+
+  @JsonCreator
+  public BroadcastExchange(@JsonProperty("child") PhysicalOperator child) {
+    super(child);
+  }
+
+  @Override
+  protected void setupSenders(List<DrillbitEndpoint> senderLocations) throws 
PhysicalOperatorSetupException {
+    this.senderLocations = senderLocations;
+  }
+
+  @Override
+  protected void setupReceivers(List<CoordinationProtos.DrillbitEndpoint> 
receiverLocations) throws PhysicalOperatorSetupException {
+    this.receiverLocations = receiverLocations;
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new BroadcastExchange(child);
+  }
+
+  @Override
+  public Sender getSender(int minorFragmentId, PhysicalOperator child) throws 
PhysicalOperatorSetupException {
+    return new BroadcastSender(receiverMajorFragmentId, child, 
receiverLocations);
+  }
+
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+    return new RandomReceiver(senderMajorFragmentId, senderLocations);
+  }
+
+  @Override
+  public int getMaxSendWidth() {
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  public boolean supportsSelectionVector() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java
new file mode 100644
index 0000000..9c0388a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractSender;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+
+import java.util.List;
+
+import static org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+@JsonTypeName("broadcast-sender")
+public class BroadcastSender extends AbstractSender {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BroadcastSender.class);
+  private final List<DrillbitEndpoint> destinations;
+
+  @JsonCreator
+  public BroadcastSender(@JsonProperty("receiver-major-fragment") int 
oppositeMajorFragmentId,
+                         @JsonProperty("child") PhysicalOperator child,
+                         @JsonProperty("destinations") List<DrillbitEndpoint> 
destinations) {
+    super(oppositeMajorFragmentId, child);
+    this.destinations = destinations;
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(child.getSize().getAggSize() * destinations.size(),
+                            0, 1000, child.getSize().getRecordCount());
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new BroadcastSender(oppositeMajorFragmentId, child, destinations);
+  }
+
+  @Override
+  public List<DrillbitEndpoint> getDestinations() {
+    return destinations;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> 
physicalVisitor, X value) throws E {
+    return physicalVisitor.visitBroadcastSender(this, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index e1fb3ae..d02396b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -17,10 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl;
 
-import io.netty.buffer.ByteBuf;
-
-import java.util.List;
-
+import com.google.common.base.Preconditions;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
@@ -36,7 +33,7 @@ import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.work.foreman.ErrorHelper;
 
-import com.google.common.base.Preconditions;
+import java.util.List;
 
 public class ScreenCreator implements RootCreator<Screen>{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java
new file mode 100644
index 0000000..add5117
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.broadcastsender;
+
+import com.google.common.collect.Iterators;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.BroadcastSender;
+import org.apache.drill.exec.physical.impl.RootCreator;
+import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class BroadcastSenderCreator implements RootCreator<BroadcastSender> {
+  @Override
+  public RootExec getRoot(FragmentContext context, BroadcastSender config, 
List<RecordBatch> children) throws ExecutionSetupException {
+    assert children != null && children.size() == 1;
+    return new BroadcastSenderRootExec(context, 
Iterators.getOnlyElement(children.iterator()), config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
new file mode 100644
index 0000000..65cadb5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -0,0 +1,131 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl.broadcastsender;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.BroadcastSender;
+import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.record.FragmentWritableBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitTunnel;
+
+import java.util.List;
+
+import static org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+/**
+ * Broadcast Sender broadcasts incoming batches to all receivers (one or more).
+ * This is useful in cases such as broadcast join where sending the entire table to join
+ * to all nodes is cheaper than merging and computing all the joins in the same node.
+ */
+public class BroadcastSenderRootExec implements RootExec {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BroadcastSenderRootExec.class);
+  private final FragmentContext context;
+  private final BroadcastSender config;
+  private final BitTunnel[] tunnels;
+  private final ExecProtos.FragmentHandle handle;
+  private volatile boolean ok;
+  private final RecordBatch incoming;
+  private final DrillRpcFuture[] responseFutures;
+
+  public BroadcastSenderRootExec(FragmentContext context,
+                                 RecordBatch incoming,
+                                 BroadcastSender config) {
+    this.ok = true;
+    this.context = context;
+    this.incoming = incoming;
+    this.config = config;
+    this.handle = context.getHandle();
+    List<DrillbitEndpoint> destinations = config.getDestinations();
+    this.tunnels = new BitTunnel[destinations.size()];
+    for(int i = 0; i < destinations.size(); ++i) {
+      tunnels[i] = context.getCommunicator().getTunnel(destinations.get(i));
+    }
+    responseFutures = new DrillRpcFuture[destinations.size()];
+  }
+
+  @Override
+  public boolean next() {
+    if(!ok) {
+      return false;
+    }
+
+    RecordBatch.IterOutcome out = incoming.next();
+    logger.debug("Outcome of sender next {}", out);
+    switch(out){
+      case STOP:
+      case NONE:
+        for (int i = 0; i < tunnels.length; ++i) {
+          FragmentWritableBatch b2 = 
FragmentWritableBatch.getEmptyLast(handle.getQueryId(), 
handle.getMajorFragmentId(), handle.getMinorFragmentId(), 
config.getOppositeMajorFragmentId(), i);
+          responseFutures[i] = tunnels[i].sendRecordBatch(context, b2);
+        }
+
+        waitAllFutures(false);
+        return false;
+
+      case OK_NEW_SCHEMA:
+      case OK:
+        WritableBatch writableBatch = incoming.getWritableBatch();
+        for (int i = 0; i < tunnels.length; ++i) {
+          FragmentWritableBatch batch = new FragmentWritableBatch(false, 
handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), 
config.getOppositeMajorFragmentId(), i, writableBatch);
+          if(i > 0) {
+            writableBatch.retainBuffers();
+          }
+          responseFutures[i] = tunnels[i].sendRecordBatch(context, batch);
+        }
+
+        return waitAllFutures(true);
+
+      case NOT_YET:
+      default:
+        throw new IllegalStateException();
+    }
+  }
+
+  private boolean waitAllFutures(boolean haltOnError) {
+    for (DrillRpcFuture<?> responseFuture : responseFutures) {
+      try {
+        GeneralRPCProtos.Ack ack = (GeneralRPCProtos.Ack) 
responseFuture.checkedGet();
+        if(!ack.getOk()) {
+          ok = false;
+          if (haltOnError) {
+            return false;
+          }
+        }
+      } catch (RpcException e) {
+        logger.error("Error sending batch to receiver: " + e);
+        ok = false;
+        if (haltOnError) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void stop() {
+      ok = false;
+      incoming.kill();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index e9b56db..945df41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -153,4 +153,9 @@ public class WritableBatch {
     return getBatchNoHVWrap(batch.getRecordCount(), batch, sv2);
   }
 
+  public void retainBuffers() {
+    for (ByteBuf buf : buffers) {
+      buf.retain();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
index 4ce826c..bbeb39d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
@@ -49,6 +50,12 @@ public class BitTunnel {
     manager.runCommand(b);
   }
 
+  public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, 
FragmentWritableBatch batch) {
+    SendBatchAsync b = new SendBatchAsync(batch, context);
+    manager.runCommand(b);
+    return b.getFuture();
+  }
+
   public void sendFragment(RpcOutcomeListener<Ack> outcomeListener, 
PlanFragment fragment){
     SendFragment b = new SendFragment(outcomeListener, fragment);
     manager.runCommand(b);
@@ -66,6 +73,27 @@ public class BitTunnel {
     return b.getFuture();
   }
 
+  public static class SendBatchAsync extends FutureBitCommand<Ack> {
+    final FragmentWritableBatch batch;
+    final FragmentContext context;
+
+    public SendBatchAsync(FragmentWritableBatch batch, FragmentContext 
context) {
+      super();
+      this.batch = batch;
+      this.context = context;
+    }
+
+    @Override
+    public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, 
BitConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_RECORD_BATCH, 
batch.getHeader(), Ack.class, batch.getBuffers());
+    }
+
+    @Override
+    public String toString() {
+      return "SendBatch [batch.header=" + batch.getHeader() + "]";
+    }
+  }
+
   public static class SendBatch extends ListeningBitCommand<Ack> {
     final FragmentWritableBatch batch;
     final FragmentContext context;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java
new file mode 100644
index 0000000..b55681d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestBroadcastExchange extends PopUnitTestBase {
+  @Test
+  public void TestSingleBroadcastExchangeWithTwoScans() throws Exception {
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
+        Drillbit bit2 = new Drillbit(CONFIG, serviceSet);
+        DrillClient client = new DrillClient(CONFIG, 
serviceSet.getCoordinator())) {
+
+      bit1.run();
+      bit2.run();
+      client.connect();
+
+      String physicalPlan = Files.toString(
+              FileUtils.getResourceAsFile("/sender/broadcast_exchange.json"), 
Charsets.UTF_8)
+              .replace("#{LEFT_FILE}", 
FileUtils.getResourceAsFile("/join/merge_single_batch.left.json").toURI().toString())
+              .replace("#{RIGHT_FILE}", 
FileUtils.getResourceAsFile("/join/merge_single_batch.right.json").toURI().toString());
+      List<QueryResultBatch> results = 
client.runQuery(UserProtos.QueryType.PHYSICAL, physicalPlan);
+      int count = 0;
+      for(QueryResultBatch b : results) {
+        if (b.getHeader().getRowCount() != 0)
+          count += b.getHeader().getRowCount();
+      }
+      assertEquals(25, count);
+    }
+  }
+
+  @Test
+  public void TestMultipleSendLocationBroadcastExchange() throws Exception {
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
+        Drillbit bit2 = new Drillbit(CONFIG, serviceSet);
+        DrillClient client = new DrillClient(CONFIG, 
serviceSet.getCoordinator())) {
+
+      bit1.run();
+      bit2.run();
+      client.connect();
+
+      String physicalPlan = Files.toString(
+          
FileUtils.getResourceAsFile("/sender/broadcast_exchange_long_run.json"), 
Charsets.UTF_8);
+      List<QueryResultBatch> results = 
client.runQuery(UserProtos.QueryType.PHYSICAL, physicalPlan);
+      int count = 0;
+      for(QueryResultBatch b : results) {
+        if (b.getHeader().getRowCount() != 0)
+          count += b.getHeader().getRowCount();
+      }
+      System.out.println(count);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/test/resources/sender/broadcast_exchange.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/sender/broadcast_exchange.json b/exec/java-exec/src/test/resources/sender/broadcast_exchange.json
new file mode 100644
index 0000000..64f1606
--- /dev/null
+++ b/exec/java-exec/src/test/resources/sender/broadcast_exchange.json
@@ -0,0 +1,55 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+    graph:[
+           {
+              @id:1,
+              pop:"json-scan",
+              entries: [
+                 {
+                    path : "#{LEFT_FILE}"
+                 }
+              ],
+              storageengine: {
+                 "type": "json",
+                 "dfsName": "file:///"
+              }
+            },
+            {
+               @id:2,
+               pop:"json-scan",
+               entries: [
+                              {
+                                  path : "#{RIGHT_FILE}"
+                              }
+                          ],
+               storageengine: {
+                              "type": "json",
+                              "dfsName": "file:///"
+               }
+            },
+        {
+            @id: 3,
+            child: 1,
+            pop: "broadcast-exchange"
+        },
+        {
+            pop : "merge-join",
+            @id : 4,
+            left: 3,
+            right: 2,
+            join-type: "LEFT",
+            join-conditions: [ { relationship: "==", left: "a", right: "aa" } ]
+          },
+        {
+            @id: 5,
+            child: 4,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/test/resources/sender/broadcast_exchange_long_run.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/sender/broadcast_exchange_long_run.json b/exec/java-exec/src/test/resources/sender/broadcast_exchange_long_run.json
new file mode 100644
index 0000000..aa15541
--- /dev/null
+++ b/exec/java-exec/src/test/resources/sender/broadcast_exchange_long_run.json
@@ -0,0 +1,53 @@
+{
+    "head": {
+        "type": "APACHE_DRILL_PHYSICAL",
+        "version": "1",
+        "generator": {
+            "type": "manual"
+        }
+    },
+    "graph": [
+        {
+            "@id": 1,
+            "pop": "mock-scan",
+            "url": "http://apache.org",
+            "entries": [
+                {
+                    "records": 10000,
+                    "types": [
+                        {
+                            "name": "blue",
+                            "type": "INT",
+                            "mode": "REQUIRED"
+                        },
+                        {
+                            "name": "red",
+                            "type": "BIGINT",
+                            "mode": "REQUIRED"
+                        },
+                        {
+                            "name": "green",
+                            "type": "INT",
+                            "mode": "REQUIRED"
+                        }
+                    ]
+                }
+            ]
+        },
+        {
+            "@id": 2,
+            "child": 1,
+            "pop": "broadcast-exchange"
+        },
+        {
+            @id: 3,
+            child: 2,
+            pop: "union-exchange"
+        },
+        {
+            "@id": 4,
+            "child": 3,
+            "pop": "screen"
+        }
+    ]
+}
\ No newline at end of file

Reply via email to