DRILL-853: Enable broadcast joins and fix some issues with BroadcastExchange and ScreenCreator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/760cbd42 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/760cbd42 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/760cbd42 Branch: refs/heads/master Commit: 760cbd421c131ed43f5011764c7e244b661bd84b Parents: 623a52e Author: Aman Sinha <[email protected]> Authored: Wed May 28 17:32:05 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Mon Jun 2 09:12:09 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/physical/impl/ScreenCreator.java | 4 +- .../BroadcastSenderRootExec.java | 51 +++++++++++++++----- .../exec/planner/physical/PlannerSettings.java | 11 +++-- .../apache/drill/exec/record/WritableBatch.java | 6 +++ .../apache/drill/exec/rpc/data/DataTunnel.java | 12 ++--- .../server/options/SystemOptionManager.java | 1 + 6 files changed, 63 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index c92633f..9aefbe8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -74,6 +74,7 @@ public class ScreenCreator implements RootCreator<Screen>{ public boolean next() { if(!ok){ stop(); + context.fail(this.listener.ex); return false; } @@ -135,7 +136,7 @@ public class ScreenCreator implements RootCreator<Screen>{ private SendListener listener = new SendListener(); private class SendListener extends BaseRpcOutcomeListener<Ack>{ - + volatile RpcException ex; @Override @@ -150,6 +151,7 @@ public class ScreenCreator implements RootCreator<Screen>{ logger.error("Failure while sending data to user.", ex); ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger); ok = false; + this.ex = ex; } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java index 0a01583..9c55825 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java @@ -17,6 +17,8 @@ ******************************************************************************/ package org.apache.drill.exec.physical.impl.broadcastsender; +import io.netty.buffer.ByteBuf; + import java.util.List; import org.apache.drill.exec.ops.FragmentContext; @@ -26,13 +28,15 @@ import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.GeneralRPCProtos; import org.apache.drill.exec.record.FragmentWritableBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.WritableBatch; -import org.apache.drill.exec.rpc.DrillRpcFuture; +import org.apache.drill.exec.rpc.BaseRpcOutcomeListener; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.work.ErrorHelper; /** * Broadcast Sender broadcasts incoming batches to all receivers (one or more). @@ -47,7 +51,6 @@ public class BroadcastSenderRootExec implements RootExec { private final ExecProtos.FragmentHandle handle; private volatile boolean ok; private final RecordBatch incoming; - private final DrillRpcFuture[] responseFutures; public BroadcastSenderRootExec(FragmentContext context, RecordBatch incoming, @@ -63,12 +66,12 @@ public class BroadcastSenderRootExec implements RootExec { FragmentHandle opp = handle.toBuilder().setMajorFragmentId(config.getOppositeMajorFragmentId()).setMinorFragmentId(i).build(); tunnels[i] = context.getDataTunnel(destinations.get(i), opp); } - responseFutures = new DrillRpcFuture[destinations.size()]; } @Override public boolean next() { if(!ok) { + context.fail(statusHandler.ex); return false; } @@ -79,24 +82,25 @@ public class BroadcastSenderRootExec implements RootExec { case NONE: for (int i = 0; i < tunnels.length; ++i) { FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i); - responseFutures[i] = tunnels[i].sendRecordBatch(context, b2); + tunnels[i].sendRecordBatch(this.statusHandler, b2); + statusHandler.sendCount.increment(); } - waitAllFutures(false); return false; case OK_NEW_SCHEMA: case OK: WritableBatch writableBatch = incoming.getWritableBatch(); + if (tunnels.length > 1) { + writableBatch.retainBuffers(tunnels.length - 1); + } for (int i = 0; i < tunnels.length; ++i) { FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i, writableBatch); - if(i > 0) { - writableBatch.retainBuffers(); - } - responseFutures[i] = tunnels[i].sendRecordBatch(context, batch); + tunnels[i].sendRecordBatch(this.statusHandler, batch); + statusHandler.sendCount.increment(); } - return waitAllFutures(true); + return ok; case NOT_YET: default: @@ -104,6 +108,7 @@ public class BroadcastSenderRootExec implements RootExec { } } + /* private boolean waitAllFutures(boolean haltOnError) { for (DrillRpcFuture<?> responseFuture : responseFutures) { try { @@ -124,10 +129,34 @@ public class BroadcastSenderRootExec implements RootExec { } return true; } - +*/ + @Override public void stop() { ok = false; + statusHandler.sendCount.waitForSendComplete(); incoming.cleanup(); } + + private StatusHandler statusHandler = new StatusHandler(); + private class StatusHandler extends BaseRpcOutcomeListener<GeneralRPCProtos.Ack> { + volatile RpcException ex; + private final SendingAccountor sendCount = new SendingAccountor(); + + @Override + public void success(Ack value, ByteBuf buffer) { + sendCount.decrement(); + super.success(value, buffer); + } + + @Override + public void failed(RpcException ex) { + sendCount.decrement(); + logger.error("Failure while sending data to user.", ex); + ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger); + ok = false; + this.ex = ex; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index 18a32af..ad9fa90 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -22,13 +22,15 @@ import net.hydromatic.optiq.tools.FrameworkContext; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.options.OptionValidator; import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator; +import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator; public class PlannerSettings implements FrameworkContext{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlannerSettings.class); private int numEndPoints = 0; private boolean useDefaultCosting = false; // True: use default Optiq costing, False: use Drill costing - private int broadcastThreshold = 10000; // Consider broadcast inner plans if estimated rows is less than this threshold + + public static final int MAX_BROADCAST_THRESHOLD = Integer.MAX_VALUE; public static final OptionValidator EXCHANGE = new BooleanValidator("planner.disable_exchanges", false); public static final OptionValidator HASHAGG = new BooleanValidator("planner.enable_hashagg", true); @@ -36,7 +38,8 @@ public class PlannerSettings implements FrameworkContext{ public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true); public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true); public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", true); - public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", false); + public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", true); + public static final OptionValidator BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 10000); public OptionManager options = null; @@ -88,8 +91,8 @@ public class PlannerSettings implements FrameworkContext{ return options.getOption(BROADCAST.getOptionName()).bool_val; } - public int getBroadcastThreshold() { - return broadcastThreshold; + public long getBroadcastThreshold() { + return options.getOption(BROADCAST_THRESHOLD.getOptionName()).num_val; } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java index 4ff3708..14ade39 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java @@ -163,4 +163,10 @@ public class WritableBatch { buf.retain(); } } + + public void retainBuffers(int increment) { + for (ByteBuf buf : buffers) { + buf.retain(increment); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java index 1dcd89e..98bbeeb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java @@ -37,22 +37,22 @@ public class DataTunnel { } public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentWritableBatch batch) { - SendBatch b = new SendBatch(outcomeListener, batch); + SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch); manager.runCommand(b); } public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) { - SendBatchAsync b = new SendBatchAsync(batch, context); + SendBatchAsyncFuture b = new SendBatchAsyncFuture(batch, context); manager.runCommand(b); return b.getFuture(); } - public static class SendBatch extends ListeningCommand<Ack, DataClientConnection> { + private static class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection> { final FragmentWritableBatch batch; - public SendBatch(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch) { + public SendBatchAsyncListen(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch) { super(listener); this.batch = batch; } @@ -70,11 +70,11 @@ public class DataTunnel { } - public static class SendBatchAsync extends FutureBitCommand<Ack, DataClientConnection> { + private static class SendBatchAsyncFuture extends FutureBitCommand<Ack, DataClientConnection> { final FragmentWritableBatch batch; final FragmentContext context; - public SendBatchAsync(FragmentWritableBatch batch, FragmentContext context) { + public SendBatchAsyncFuture(FragmentWritableBatch batch, FragmentContext context) { super(); this.batch = batch; this.context = context; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/760cbd42/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 8d9a68f..3e90eb0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -41,6 +41,7 @@ public class SystemOptionManager implements OptionManager{ PlannerSettings.MERGEJOIN, PlannerSettings.MULTIPHASE, PlannerSettings.BROADCAST, + PlannerSettings.BROADCAST_THRESHOLD, ExecConstants.OUTPUT_FORMAT_VALIDATOR, ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR };
