Updated Branches: refs/heads/master 70cd6af56 -> fe94aa814
DRILL-263: PartitionSender not setting opposite minor fragment id correctly

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f3ecb207
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f3ecb207
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f3ecb207

Branch: refs/heads/master
Commit: f3ecb2077acaf21c46a1ce2145d21cbc0e9b635e
Parents: 70cd6af
Author: Steven Phillips <[email protected]>
Authored: Thu Sep 19 21:44:17 2013 -0700
Committer: Steven Phillips <[email protected]>
Committed: Wed Oct 30 15:23:10 2013 -0700

----------------------------------------------------------------------
 .../impl/partitionsender/OutgoingRecordBatch.java | 8 +++++---
 .../impl/partitionsender/PartitionSenderRootExec.java | 11 ++++++++---
 2 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3ecb207/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 2940dc0..c24a7a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -63,12 +63,14 @@ public class OutgoingRecordBatch implements RecordBatch {
 private VectorContainer vectorContainer;
 private int recordCount;
 private int recordCapacity;
+ private int oppositeMinorFragmentId;
- public OutgoingRecordBatch(HashPartitionSender operator, BitTunnel tunnel, RecordBatch incoming, FragmentContext context) {
+ public OutgoingRecordBatch(HashPartitionSender operator, BitTunnel tunnel, RecordBatch incoming, FragmentContext context, int oppositeMinorFragmentId) {
 this.incoming = incoming;
 this.context = context;
 this.operator = operator;
 this.tunnel = tunnel;
+ this.oppositeMinorFragmentId = oppositeMinorFragmentId;
 initializeBatch();
 }
@@ -107,7 +109,7 @@ public class OutgoingRecordBatch implements RecordBatch {
 handle.getMajorFragmentId(),
 handle.getMinorFragmentId(),
 operator.getOppositeMajorFragmentId(),
- 0,
+ oppositeMinorFragmentId,
 getWritableBatch());
 tunnel.sendRecordBatch(statusHandler, context, writableBatch);
@@ -123,7 +125,7 @@ public class OutgoingRecordBatch implements RecordBatch {
 handle.getMajorFragmentId(),
 handle.getMinorFragmentId(),
 operator.getOppositeMajorFragmentId(),
- 0,
+ oppositeMinorFragmentId,
 getWritableBatch());
 tunnel.sendRecordBatch(statusHandler, context, writableBatch);
 return true;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3ecb207/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 033dd51..87c7ee5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -64,8 +64,8 @@ class PartitionSenderRootExec implements RootExec {
 this.context = context;
 this.outgoing = new OutgoingRecordBatch[operator.getDestinations().size()];
 int fieldId = 0;
- for (CoordinationProtos.DrillbitEndpoint endpoint : operator.getDestinations())
- outgoing[fieldId++] = new OutgoingRecordBatch(operator,
+ for (CoordinationProtos.DrillbitEndpoint endpoint : operator.getDestinations()) {
+ outgoing[fieldId] = new OutgoingRecordBatch(operator,
 context.getCommunicator().getTunnel(endpoint),
 incoming,
 context);
@@ -107,7 +107,12 @@ class PartitionSenderRootExec implements RootExec {
 case OK_NEW_SCHEMA:
 try {
 // send all existing batches
- flushOutgoingBatches(false, true);
+ if (partitioner != null) {
+ flushOutgoingBatches(false, true);
+ }
+ for (OutgoingRecordBatch b : outgoing) {
+ b.initializeBatch();
+ }
 // update OutgoingRecordBatch's schema and generate partitioning code
 createPartitioner();
 } catch (SchemaChangeException e) {
