Updated Branches:
  refs/heads/master 70cd6af56 -> fe94aa814

DRILL-263: PartitionSender not setting opposite minor fragment id correctly


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f3ecb207
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f3ecb207
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f3ecb207

Branch: refs/heads/master
Commit: f3ecb2077acaf21c46a1ce2145d21cbc0e9b635e
Parents: 70cd6af
Author: Steven Phillips <[email protected]>
Authored: Thu Sep 19 21:44:17 2013 -0700
Committer: Steven Phillips <[email protected]>
Committed: Wed Oct 30 15:23:10 2013 -0700

----------------------------------------------------------------------
 .../impl/partitionsender/OutgoingRecordBatch.java        |  8 +++++---
 .../impl/partitionsender/PartitionSenderRootExec.java    | 11 ++++++++---
 2 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3ecb207/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 2940dc0..c24a7a3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -63,12 +63,14 @@ public class OutgoingRecordBatch implements RecordBatch {
   private VectorContainer vectorContainer;
   private int recordCount;
   private int recordCapacity;
+  private int oppositeMinorFragmentId;
 
-  public OutgoingRecordBatch(HashPartitionSender operator, BitTunnel tunnel, 
RecordBatch incoming, FragmentContext context) {
+  public OutgoingRecordBatch(HashPartitionSender operator, BitTunnel tunnel, 
RecordBatch incoming, FragmentContext context, int oppositeMinorFragmentId) {
     this.incoming = incoming;
     this.context = context;
     this.operator = operator;
     this.tunnel = tunnel;
+    this.oppositeMinorFragmentId = oppositeMinorFragmentId;
     initializeBatch();
   }
 
@@ -107,7 +109,7 @@ public class OutgoingRecordBatch implements RecordBatch {
                                                                       
handle.getMajorFragmentId(),
                                                                       
handle.getMinorFragmentId(),
                                                                       
operator.getOppositeMajorFragmentId(),
-                                                                      0,
+                                                                      
oppositeMinorFragmentId,
                                                                       
getWritableBatch());
 
       tunnel.sendRecordBatch(statusHandler, context, writableBatch);
@@ -123,7 +125,7 @@ public class OutgoingRecordBatch implements RecordBatch {
                                                                         
handle.getMajorFragmentId(),
                                                                         
handle.getMinorFragmentId(),
                                                                         
operator.getOppositeMajorFragmentId(),
-                                                                        0,
+                                                                        
oppositeMinorFragmentId,
                                                                         
getWritableBatch());
         tunnel.sendRecordBatch(statusHandler, context, writableBatch);
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f3ecb207/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 033dd51..87c7ee5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -64,8 +64,8 @@ class PartitionSenderRootExec implements RootExec {
     this.context = context;
     this.outgoing = new OutgoingRecordBatch[operator.getDestinations().size()];
     int fieldId = 0;
-    for (CoordinationProtos.DrillbitEndpoint endpoint : 
operator.getDestinations())
-      outgoing[fieldId++] = new OutgoingRecordBatch(operator,
+    for (CoordinationProtos.DrillbitEndpoint endpoint : 
operator.getDestinations()) {
+      outgoing[fieldId] = new OutgoingRecordBatch(operator,
                                                     
context.getCommunicator().getTunnel(endpoint),
                                                     incoming,
                                                     context);
@@ -107,7 +107,12 @@ class PartitionSenderRootExec implements RootExec {
       case OK_NEW_SCHEMA:
         try {
           // send all existing batches
-          flushOutgoingBatches(false, true);
+          if (partitioner != null) {
+            flushOutgoingBatches(false, true);
+          }
+          for (OutgoingRecordBatch b : outgoing) {
+            b.initializeBatch();
+          }
           // update OutgoingRecordBatch's schema and generate partitioning code
           createPartitioner();
         } catch (SchemaChangeException e) {

Reply via email to