Updated Branches: refs/heads/master 4a83dae35 -> db0203696
DRILL-281 Add broadcast sender Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/4a872263 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/4a872263 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/4a872263 Branch: refs/heads/master Commit: 4a87226371c81e0186347ca8d8dead50de5c2de5 Parents: 4a83dae Author: Timothy Chen <[email protected]> Authored: Tue Nov 19 23:40:14 2013 +0800 Committer: Jacques Nadeau <[email protected]> Committed: Tue Jan 14 08:20:08 2014 -0800 ---------------------------------------------------------------------- .../physical/base/AbstractPhysicalVisitor.java | 5 + .../exec/physical/base/PhysicalVisitor.java | 1 + .../exec/physical/config/BroadcastExchange.java | 79 +++++++++++ .../exec/physical/config/BroadcastSender.java | 66 ++++++++++ .../drill/exec/physical/impl/ScreenCreator.java | 7 +- .../broadcastsender/BroadcastSenderCreator.java | 36 +++++ .../BroadcastSenderRootExec.java | 131 +++++++++++++++++++ .../apache/drill/exec/record/WritableBatch.java | 5 + .../apache/drill/exec/rpc/bit/BitTunnel.java | 28 ++++ .../physical/impl/TestBroadcastExchange.java | 87 ++++++++++++ .../resources/sender/broadcast_exchange.json | 55 ++++++++ .../sender/broadcast_exchange_long_run.json | 53 ++++++++ 12 files changed, 548 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index b8a7247..ec7244a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -131,6 +131,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme } @Override + public T visitBroadcastSender(BroadcastSender op, X value) throws E { + return visitSender(op, value); + } + + @Override public T visitScreen(Screen op, X value) throws E { return visitStore(op, value); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index 040a495..2e2d7fd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -53,6 +53,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitMergingReceiver(MergingReceiverPOP op, EXTRA value) throws EXCEP; public RETURN visitHashPartitionSender(HashToRandomExchange op, EXTRA value) throws EXCEP; public RETURN visitRangeSender(RangeSender op, EXTRA value) throws EXCEP; + public RETURN visitBroadcastSender(BroadcastSender op, EXTRA value) throws EXCEP; public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP; public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP; public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java new file mode 100644 index 0000000..256d3d9 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java @@ -0,0 +1,79 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.physical.config; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.exceptions.PhysicalOperatorSetupException; +import org.apache.drill.exec.physical.base.AbstractExchange; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Receiver; +import org.apache.drill.exec.physical.base.Sender; +import org.apache.drill.exec.proto.CoordinationProtos; + +import java.util.List; + +import static org.apache.drill.exec.proto.CoordinationProtos.*; + +@JsonTypeName("broadcast-exchange") +public class BroadcastExchange extends AbstractExchange { + + private List<DrillbitEndpoint> senderLocations; + private List<DrillbitEndpoint> receiverLocations; + + @JsonCreator + public BroadcastExchange(@JsonProperty("child") PhysicalOperator child) { + super(child); + } + + @Override + protected void setupSenders(List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException { + this.senderLocations = senderLocations; + } + + @Override + protected void setupReceivers(List<CoordinationProtos.DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException { + this.receiverLocations = receiverLocations; + } + + @Override + protected PhysicalOperator getNewWithChild(PhysicalOperator child) { + return new BroadcastExchange(child); + } + + @Override + public Sender getSender(int minorFragmentId, PhysicalOperator child) throws PhysicalOperatorSetupException { + return new BroadcastSender(receiverMajorFragmentId, child, receiverLocations); + } + + @Override + public Receiver getReceiver(int minorFragmentId) { + return new RandomReceiver(senderMajorFragmentId, senderLocations); + } + + @Override + public int getMaxSendWidth() { + return Integer.MAX_VALUE; + } + + @Override + public boolean supportsSelectionVector() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java new file mode 100644 index 0000000..9c0388a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.physical.config; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.exec.physical.OperatorCost; +import org.apache.drill.exec.physical.base.AbstractSender; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; + +import java.util.List; + +import static org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; + +@JsonTypeName("broadcast-sender") +public class BroadcastSender extends AbstractSender { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BroadcastSender.class); + private final List<DrillbitEndpoint> destinations; + + @JsonCreator + public BroadcastSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, + @JsonProperty("child") PhysicalOperator child, + @JsonProperty("destinations") List<DrillbitEndpoint> destinations) { + super(oppositeMajorFragmentId, child); + this.destinations = destinations; + } + + @Override + public OperatorCost getCost() { + return new OperatorCost(child.getSize().getAggSize() * destinations.size(), + 0, 1000, child.getSize().getRecordCount()); + } + + @Override + protected PhysicalOperator getNewWithChild(PhysicalOperator child) { + return new BroadcastSender(oppositeMajorFragmentId, child, destinations); + } + + @Override + public List<DrillbitEndpoint> getDestinations() { + return destinations; + } + + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitBroadcastSender(this, value); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index e1fb3ae..d02396b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -17,10 +17,7 @@ */ package org.apache.drill.exec.physical.impl; -import io.netty.buffer.ByteBuf; - -import java.util.List; - +import com.google.common.base.Preconditions; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; @@ -36,7 +33,7 @@ import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; import org.apache.drill.exec.work.foreman.ErrorHelper; -import com.google.common.base.Preconditions; +import java.util.List; public class ScreenCreator implements RootCreator<Screen>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java new file mode 100644 index 0000000..add5117 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.broadcastsender; + +import com.google.common.collect.Iterators; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.BroadcastSender; +import org.apache.drill.exec.physical.impl.RootCreator; +import org.apache.drill.exec.physical.impl.RootExec; +import org.apache.drill.exec.record.RecordBatch; + +import java.util.List; + +public class BroadcastSenderCreator implements RootCreator<BroadcastSender> { + @Override + public RootExec getRoot(FragmentContext context, BroadcastSender config, List<RecordBatch> children) throws ExecutionSetupException { + assert children != null && children.size() == 1; + return new BroadcastSenderRootExec(context, Iterators.getOnlyElement(children.iterator()), config); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java new file mode 100644 index 0000000..65cadb5 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java @@ -0,0 +1,131 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.physical.impl.broadcastsender; + +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.BroadcastSender; +import org.apache.drill.exec.physical.impl.RootExec; +import org.apache.drill.exec.proto.ExecProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.record.FragmentWritableBatch; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.rpc.DrillRpcFuture; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.bit.BitTunnel; + +import java.util.List; + +import static org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; + +/** + * Broadcast Sender broadcasts incoming batches to all receivers (one or more). + * This is useful in cases such as broadcast join where sending the entire table to join + * to all nodes is cheaper than merging and computing all the joins in the same node. + */ +public class BroadcastSenderRootExec implements RootExec { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BroadcastSenderRootExec.class); + private final FragmentContext context; + private final BroadcastSender config; + private final BitTunnel[] tunnels; + private final ExecProtos.FragmentHandle handle; + private volatile boolean ok; + private final RecordBatch incoming; + private final DrillRpcFuture[] responseFutures; + + public BroadcastSenderRootExec(FragmentContext context, + RecordBatch incoming, + BroadcastSender config) { + this.ok = true; + this.context = context; + this.incoming = incoming; + this.config = config; + this.handle = context.getHandle(); + List<DrillbitEndpoint> destinations = config.getDestinations(); + this.tunnels = new BitTunnel[destinations.size()]; + for(int i = 0; i < destinations.size(); ++i) { + tunnels[i] = context.getCommunicator().getTunnel(destinations.get(i)); + } + responseFutures = new DrillRpcFuture[destinations.size()]; + } + + @Override + public boolean next() { + if(!ok) { + return false; + } + + RecordBatch.IterOutcome out = incoming.next(); + logger.debug("Outcome of sender next {}", out); + switch(out){ + case STOP: + case NONE: + for (int i = 0; i < tunnels.length; ++i) { + FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i); + responseFutures[i] = tunnels[i].sendRecordBatch(context, b2); + } + + waitAllFutures(false); + return false; + + case OK_NEW_SCHEMA: + case OK: + WritableBatch writableBatch = incoming.getWritableBatch(); + for (int i = 0; i < tunnels.length; ++i) { + FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i, writableBatch); + if(i > 0) { + writableBatch.retainBuffers(); + } + responseFutures[i] = tunnels[i].sendRecordBatch(context, batch); + } + + return waitAllFutures(true); + + case NOT_YET: + default: + throw new IllegalStateException(); + } + } + + private boolean waitAllFutures(boolean haltOnError) { + for (DrillRpcFuture<?> responseFuture : responseFutures) { + try { + GeneralRPCProtos.Ack ack = (GeneralRPCProtos.Ack) responseFuture.checkedGet(); + if(!ack.getOk()) { + ok = false; + if (haltOnError) { + return false; + } + } + } catch (RpcException e) { + logger.error("Error sending batch to receiver: " + e); + ok = false; + if (haltOnError) { + return false; + } + } + } + return true; + } + + @Override + public void stop() { + ok = false; + incoming.kill(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java index e9b56db..945df41 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java @@ -153,4 +153,9 @@ public class WritableBatch { return getBatchNoHVWrap(batch.getRecordCount(), batch, sv2); } + public void retainBuffers() { + for (ByteBuf buf : buffers) { + buf.retain(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java index 4ce826c..bbeb39d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java @@ -23,6 +23,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.ExecProtos.FragmentStatus; import org.apache.drill.exec.proto.ExecProtos.PlanFragment; import org.apache.drill.exec.proto.ExecProtos.RpcType; +import org.apache.drill.exec.proto.GeneralRPCProtos; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.record.FragmentWritableBatch; import org.apache.drill.exec.rpc.DrillRpcFuture; @@ -49,6 +50,12 @@ public class BitTunnel { manager.runCommand(b); } + public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) { + SendBatchAsync b = new SendBatchAsync(batch, context); + manager.runCommand(b); + return b.getFuture(); + } + public void sendFragment(RpcOutcomeListener<Ack> outcomeListener, PlanFragment fragment){ SendFragment b = new SendFragment(outcomeListener, fragment); manager.runCommand(b); @@ -66,6 +73,27 @@ public class BitTunnel { return b.getFuture(); } + public static class SendBatchAsync extends FutureBitCommand<Ack> { + final FragmentWritableBatch batch; + final FragmentContext context; + + public SendBatchAsync(FragmentWritableBatch batch, FragmentContext context) { + super(); + this.batch = batch; + this.context = context; + } + + @Override + public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) { + connection.send(outcomeListener, RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers()); + } + + @Override + public String toString() { + return "SendBatch [batch.header=" + batch.getHeader() + "]"; + } + } + public static class SendBatch extends ListeningBitCommand<Ack> { final FragmentWritableBatch batch; final FragmentContext context; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java new file mode 100644 index 0000000..b55681d --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestBroadcastExchange.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import org.apache.drill.common.util.FileUtils; +import org.apache.drill.exec.client.DrillClient; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.pop.PopUnitTestBase; +import org.apache.drill.exec.proto.UserProtos; +import org.apache.drill.exec.rpc.user.QueryResultBatch; +import org.apache.drill.exec.server.Drillbit; +import org.apache.drill.exec.server.RemoteServiceSet; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class TestBroadcastExchange extends PopUnitTestBase { + @Test + public void TestSingleBroadcastExchangeWithTwoScans() throws Exception { + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + + try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); + Drillbit bit2 = new Drillbit(CONFIG, serviceSet); + DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + + bit1.run(); + bit2.run(); + client.connect(); + + String physicalPlan = Files.toString( + FileUtils.getResourceAsFile("/sender/broadcast_exchange.json"), Charsets.UTF_8) + .replace("#{LEFT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.left.json").toURI().toString()) + .replace("#{RIGHT_FILE}", FileUtils.getResourceAsFile("/join/merge_single_batch.right.json").toURI().toString()); + List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, physicalPlan); + int count = 0; + for(QueryResultBatch b : results) { + if (b.getHeader().getRowCount() != 0) + count += b.getHeader().getRowCount(); + } + assertEquals(25, count); + } + } + + @Test + public void TestMultipleSendLocationBroadcastExchange() throws Exception { + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + + try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); + Drillbit bit2 = new Drillbit(CONFIG, serviceSet); + DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + + bit1.run(); + bit2.run(); + client.connect(); + + String physicalPlan = Files.toString( + FileUtils.getResourceAsFile("/sender/broadcast_exchange_long_run.json"), Charsets.UTF_8); + List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, physicalPlan); + int count = 0; + for(QueryResultBatch b : results) { + if (b.getHeader().getRowCount() != 0) + count += b.getHeader().getRowCount(); + } + System.out.println(count); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/test/resources/sender/broadcast_exchange.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/sender/broadcast_exchange.json b/exec/java-exec/src/test/resources/sender/broadcast_exchange.json new file mode 100644 index 0000000..64f1606 --- /dev/null +++ b/exec/java-exec/src/test/resources/sender/broadcast_exchange.json @@ -0,0 +1,55 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"json-scan", + entries: [ + { + path : "#{LEFT_FILE}" + } + ], + storageengine: { + "type": "json", + "dfsName": "file:///" + } + }, + { + @id:2, + pop:"json-scan", + entries: [ + { + path : "#{RIGHT_FILE}" + } + ], + storageengine: { + "type": "json", + "dfsName": "file:///" + } + }, + { + @id: 3, + child: 1, + pop: "broadcast-exchange" + }, + { + pop : "merge-join", + @id : 4, + left: 3, + right: 2, + join-type: "LEFT", + join-conditions: [ { relationship: "==", left: "a", right: "aa" } ] + }, + { + @id: 5, + child: 4, + pop: "screen" + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4a872263/exec/java-exec/src/test/resources/sender/broadcast_exchange_long_run.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/sender/broadcast_exchange_long_run.json b/exec/java-exec/src/test/resources/sender/broadcast_exchange_long_run.json new file mode 100644 index 0000000..aa15541 --- /dev/null +++ b/exec/java-exec/src/test/resources/sender/broadcast_exchange_long_run.json @@ -0,0 +1,53 @@ +{ + "head": { + "type": "APACHE_DRILL_PHYSICAL", + "version": "1", + "generator": { + "type": "manual" + } + }, + "graph": [ + { + "@id": 1, + "pop": "mock-scan", + "url": "http://apache.org", + "entries": [ + { + "records": 10000, + "types": [ + { + "name": "blue", + "type": "INT", + "mode": "REQUIRED" + }, + { + "name": "red", + "type": "BIGINT", + "mode": "REQUIRED" + }, + { + "name": "green", + "type": "INT", + "mode": "REQUIRED" + } + ] + } + ] + }, + { + "@id": 2, + "child": 1, + "pop": "broadcast-exchange" + }, + { + @id: 3, + child: 2, + pop: "union-exchange" + }, + { + "@id": 4, + "child": 3, + "pop": "screen" + } + ] +} \ No newline at end of file
