Repository: incubator-drill Updated Branches: refs/heads/master e1e5ea0ed -> 8490d7433
DRILL-864: MergeJoinBatch fails to set record count in ValueVectors in container Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/623a52e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/623a52e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/623a52e1 Branch: refs/heads/master Commit: 623a52e11e94cdfeac1bb734890ec7155cc760d5 Parents: e1e5ea0 Author: vkorukanti <[email protected]> Authored: Thu May 29 17:23:05 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Mon Jun 2 09:11:39 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/physical/impl/join/MergeJoinBatch.java | 11 +++++++++++ 1 file changed, 11 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/623a52e1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 121cfec..b284454 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -22,6 +22,7 @@ import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM; import java.io.IOException; import java.util.List; +import com.google.common.base.Preconditions; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; @@ -181,18 +182,21 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { case BATCH_RETURNED: // only return new schema if new worker has been setup. logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK")); + setRecordCountInContainer(); return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; case FAILURE: kill(); return IterOutcome.STOP; case NO_MORE_DATA: logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : "NONE")); + setRecordCountInContainer(); return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): IterOutcome.NONE; case SCHEMA_CHANGED: worker = null; if(status.getOutPosition() > 0){ // if we have current data, let's return that. logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK")); + setRecordCountInContainer(); return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; }else{ // loop again to rebuild worker. @@ -210,6 +214,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } } + private void setRecordCountInContainer() { + for(VectorWrapper vw : container){ + Preconditions.checkArgument(!vw.isHyper()); + vw.getValueVector().getMutator().setValueCount(getRecordCount()); + } + } + public void resetBatchBuilder() { batchBuilder = new MergeJoinBatchBuilder(oContext.getAllocator(), status); }
