DRILL-6323: Lateral Join - Initial implementation
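Editor's note for readers new to this operator (not part of the committed patch): a lateral join pairs each row of the left input with the rows that the right branch, typically UNNEST, produces from that specific left row; the right branch re-reads the current left row through the LateralContract methods shown further down. The following self-contained sketch, with invented names and data, illustrates the row pairing that LateralJoinTemplate.crossJoinAndOutputRecords() performs over value vectors.

import java.util.ArrayList;
import java.util.List;

// Illustration only; plain Java collections stand in for Drill record batches.
public class LateralPairingSketch {
  record Customer(String name, List<Integer> orderIds) {}   // orderIds plays the repeated (array) column

  public static void main(String[] args) {
    List<Customer> leftBatch = List.of(
        new Customer("alice", List.of(1, 2)),
        new Customer("bob", List.of(3)));

    List<String> outgoing = new ArrayList<>();
    for (Customer left : leftBatch) {          // outer loop: rows of the left incoming batch
      for (int orderId : left.orderIds()) {    // inner loop: rows UNNEST produces for this left row
        // emitLeft(...) + emitRight(...) in the generated code; here just string concatenation
        outgoing.add(left.name() + " | " + orderId);
      }
    }
    outgoing.forEach(System.out::println);     // alice | 1, alice | 2, bob | 3
  }
}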
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5a63e274
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5a63e274
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5a63e274

Branch: refs/heads/master
Commit: 5a63e2748d71134c0258147972b45a0fb25f5461
Parents: 8051c24
Author: Sorabh Hamirwasia <shamirwa...@maprtech.com>
Authored: Mon Feb 5 14:46:19 2018 -0800
Committer: Parth Chandra <par...@apache.org>
Committed: Tue Apr 17 18:15:44 2018 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   5 +
 .../physical/base/AbstractPhysicalVisitor.java  |   3 -
 .../exec/physical/base/PhysicalVisitor.java     |   3 -
 .../exec/physical/config/LateralJoinPOP.java    |  55 ++
 .../exec/physical/impl/join/LateralJoin.java    |  55 ++
 .../physical/impl/join/LateralJoinBatch.java    | 741 +++++++++++++++++++
 .../impl/join/LateralJoinBatchCreator.java      |  34 +
 .../physical/impl/join/LateralJoinTemplate.java | 146 ++++
 .../physical/impl/sort/RecordBatchData.java     |   1 +
 .../exec/record/AbstractBinaryRecordBatch.java  |  11 +
 .../drill/exec/record/AbstractRecordBatch.java  |   1 +
 11 files changed, 1049 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 77fa211..5a89081 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -186,6 +186,11 @@ public final class ExecConstants {
   public static final String BIT_ENCRYPTION_SASL_ENABLED = "drill.exec.security.bit.encryption.sasl.enabled";
   public static final String BIT_ENCRYPTION_SASL_MAX_WRAPPED_SIZE = "drill.exec.security.bit.encryption.sasl.max_wrapped_size";
+  /**
+   * Setting to control enabling/disabling writing of generated code dump for Lateral
+   */
+  public static final String ENABLE_CODE_DUMP_DEBUG_LATERAL = "drill.exec.compile.codegen.debug.lateral";
+
   /** Size of JDBC batch queue (in batches) above which throttling begins.
*/ public static final String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD = "drill.jdbc.batch_queue_throttling_threshold"; http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index 3a5d22e..66affeb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -21,13 +21,10 @@ import org.apache.drill.exec.physical.config.BroadcastSender; import org.apache.drill.exec.physical.config.Filter; import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.physical.config.HashAggregate; -import org.apache.drill.exec.physical.config.HashJoinPOP; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.config.IteratorValidator; import org.apache.drill.exec.physical.config.Limit; -import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.config.MergingReceiverPOP; -import org.apache.drill.exec.physical.config.NestedLoopJoinPOP; import org.apache.drill.exec.physical.config.OrderedPartitionSender; import org.apache.drill.exec.physical.config.ProducerConsumer; import org.apache.drill.exec.physical.config.Project; http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index db9c873..8480a07 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -21,13 +21,10 @@ import org.apache.drill.exec.physical.config.BroadcastSender; import org.apache.drill.exec.physical.config.Filter; import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.physical.config.HashAggregate; -import org.apache.drill.exec.physical.config.HashJoinPOP; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.config.IteratorValidator; import org.apache.drill.exec.physical.config.Limit; -import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.config.MergingReceiverPOP; -import org.apache.drill.exec.physical.config.NestedLoopJoinPOP; import org.apache.drill.exec.physical.config.OrderedPartitionSender; import org.apache.drill.exec.physical.config.ProducerConsumer; import org.apache.drill.exec.physical.config.Project; http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java new file mode 
100644 index 0000000..946b4a6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.physical.config; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.drill.exec.physical.base.AbstractJoinPop; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; + +import java.util.List; + +@JsonTypeName("lateral-join") +public class LateralJoinPOP extends AbstractJoinPop { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinPOP.class); + + @JsonCreator + public LateralJoinPOP( + @JsonProperty("left") PhysicalOperator left, + @JsonProperty("right") PhysicalOperator right, + @JsonProperty("joinType") JoinRelType joinType) { + super(left, right, joinType, null, null); + } + + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { + Preconditions.checkArgument(children.size() == 2, + "Lateral join should have two physical operators"); + return new LateralJoinPOP(children.get(0), children.get(1), joinType); + } + + @Override + public int getOperatorType() { + return CoreOperatorType.LATERAL_JOIN_VALUE; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java new file mode 100644 index 0000000..1d946ce --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.join; + +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.drill.exec.compile.TemplateClassDefinition; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.LateralJoinPOP; +import org.apache.drill.exec.record.ExpandableHyperContainer; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorContainer; + +import java.util.LinkedList; + +/** + * Interface for the lateral join operator. + */ +public interface LateralJoin { + public static TemplateClassDefinition<LateralJoin> TEMPLATE_DEFINITION = + new TemplateClassDefinition<>(LateralJoin.class, LateralJoinTemplate.class); + + public void setupLateralJoin(FragmentContext context, RecordBatch left, + RecordBatch right, LateralJoinBatch outgoing, + JoinRelType joinType); + + // Produce output records taking into account join type + public int crossJoinAndOutputRecords(int leftIndex, int rightIndex); + + public void generateLeftJoinOutput(int leftIndex); + + // Project the record at offset 'leftIndex' in the left input batch into the output container at offset 'outIndex' + public void emitLeft(int leftIndex, int outIndex); + + // Project the record from the hyper container given the batch index and the record within the batch at 'outIndex' + public void emitRight(int rightIndex, int outIndex); + + // Setup the input/output value vector references + public void doSetup(FragmentContext context, RecordBatch rightBatch, + RecordBatch leftBatch, RecordBatch outgoing); +} http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java new file mode 100644 index 0000000..45b5059 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java @@ -0,0 +1,741 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.join; + +import com.google.common.base.Preconditions; +import com.sun.codemodel.JExpr; +import com.sun.codemodel.JExpression; +import com.sun.codemodel.JVar; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.compile.sig.GeneratorMapping; +import org.apache.drill.exec.compile.sig.MappingSet; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.base.LateralContract; +import org.apache.drill.exec.physical.config.LateralJoinPOP; +import org.apache.drill.exec.record.AbstractBinaryRecordBatch; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorAccessibleUtilities; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.AllocationHelper; + +import java.io.IOException; + +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OUT_OF_MEMORY; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP; + +/* + * RecordBatch implementation for the lateral join operator + * TODO: Create another class called BatchState for both left and right batches and store + * TODO: Schema, index and other flags in it. 
+ */ +public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class); + + // Maximum number records in the outgoing batch + protected static final int MAX_BATCH_SIZE = 4096; + + // Input indexes to correctly update the stats + protected static final int LEFT_INPUT = 0; + protected static final int RIGHT_INPUT = 1; + + // Schema on the left side + private BatchSchema leftSchema = null; + + // Schema on the right side + private BatchSchema rightSchema = null; + + // Runtime generated class implementing the LateralJoin interface + private LateralJoin lateralJoiner = null; + + // Number of output records in the current outgoing batch + private int outputRecords = 0; + + // Current index of record in left incoming which is being processed + private int leftJoinIndex = -1; + + // Current index of record in right incoming which is being processed + private int rightJoinIndex = -1; + + // flag to keep track if current left batch needs to be processed in future next call + private boolean processLeftBatchInFuture = false; + + // Keep track if any matching right record was found for current left index record + private boolean matchedRecordFound = false; + + private boolean enableLateralCGDebugging = true; + + // Shared Generator mapping for the left/right side : constant + private static final GeneratorMapping EMIT_CONSTANT = + GeneratorMapping.create("doSetup"/* setup method */,"doSetup" /* eval method */, + null /* reset */, null /* cleanup */); + + // Generator mapping for the right side + private static final GeneratorMapping EMIT_RIGHT = + GeneratorMapping.create("doSetup"/* setup method */,"emitRight" /* eval method */, + null /* reset */,null /* cleanup */); + + // Generator mapping for the left side : scalar + private static final GeneratorMapping EMIT_LEFT = + GeneratorMapping.create("doSetup" /* setup method */, "emitLeft" /* eval method */, + null /* reset */, null /* cleanup */); + + // Mapping set for the right side + private static final MappingSet emitRightMapping = + new MappingSet("rightIndex" /* read index */, "outIndex" /* write index */, + "rightBatch" /* read container */,"outgoing" /* write container */, + EMIT_CONSTANT, EMIT_RIGHT); + + // Mapping set for the left side + private static final MappingSet emitLeftMapping = + new MappingSet("leftIndex" /* read index */, "outIndex" /* write index */, + "leftBatch" /* read container */,"outgoing" /* write container */, + EMIT_CONSTANT, EMIT_LEFT); + + protected LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context, + RecordBatch left, RecordBatch right) throws OutOfMemoryException { + super(popConfig, context, left, right); + Preconditions.checkNotNull(left); + Preconditions.checkNotNull(right); + enableLateralCGDebugging = true;//context.getConfig().getBoolean(ExecConstants.ENABLE_CODE_DUMP_DEBUG_LATERAL); + } + + private boolean handleSchemaChange() { + try { + stats.startSetup(); + setupNewSchema(); + lateralJoiner = setupWorker(); + lateralJoiner.setupLateralJoin(context, left, right, this, popConfig.getJoinType()); + return true; + } catch (SchemaChangeException ex) { + logger.error("Failed to handle schema change hence killing the query"); + context.getExecutorState().fail(ex); + kill(false); + return false; + } finally { + stats.stopSetup(); + } + } + + private boolean isTerminalOutcome(IterOutcome outcome) { + return (outcome == STOP || outcome == OUT_OF_MEMORY 
|| outcome == NONE); + } + + /** + * Process left incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is + * called from main {@link LateralJoinBatch#innerNext()} block with each next() call from upstream operator. Also + * when we populate the outgoing container then this method is called to get next left batch if current one is + * fully processed. It calls next() on left side until we get a non-empty RecordBatch. OR we get either of + * OK_NEW_SCHEMA/EMIT/NONE/STOP/OOM/NOT_YET outcome. + * @param leftBatch - reference to left incoming record batch. Not needed but provided to make it easy for testing. + * @return IterOutcome after processing current left batch + */ + private IterOutcome processLeftBatch(RecordBatch leftBatch) { + + boolean needLeftBatch = leftJoinIndex == -1; + + // If left batch is empty + while (needLeftBatch) { + leftUpstream = !processLeftBatchInFuture ? next(LEFT_INPUT, leftBatch) : leftUpstream; + final boolean emptyLeftBatch = leftBatch.getRecordCount() <=0; + + switch (leftUpstream) { + case OK_NEW_SCHEMA: + // This means there is already some records from previous join inside left batch + // So we need to pass that downstream and then handle the OK_NEW_SCHEMA in subsequent next call + if (outputRecords > 0) { + processLeftBatchInFuture = true; + return OK_NEW_SCHEMA; + } + + // This OK_NEW_SCHEMA is received post build schema phase and from left side + if (!isSchemaChanged(left.getSchema(), leftSchema)) { + logger.warn("New schema received from left side is same as previous known left schema. Ignoring this " + + "schema change"); + + // Current left batch is empty and schema didn't changed as well, so let's get next batch and loose + // OK_NEW_SCHEMA outcome + if (emptyLeftBatch) { + processLeftBatchInFuture = false; + continue; + } + } + + // If left batch is empty with actual schema change then just rebuild the output container and send empty + // batch downstream + if (emptyLeftBatch) { + if (handleSchemaChange()) { + container.setRecordCount(0); + leftJoinIndex = -1; + return OK_NEW_SCHEMA; + } else { + return STOP; + } + } // else - setup the new schema information after getting it from right side too. + case OK: + // With OK outcome we will keep calling next until we get a batch with >0 records + if (emptyLeftBatch) { + leftJoinIndex = -1; + continue; + } else { + leftJoinIndex = 0; + } + break; + case EMIT: + // don't call next on right batch + if (emptyLeftBatch) { + leftJoinIndex = -1; + container.setRecordCount(0); + return EMIT; + } else { + leftJoinIndex = 0; + } + break; + case OUT_OF_MEMORY: + case NONE: + case STOP: + // Not using =0 since if outgoing container is empty then no point returning anything + if (outputRecords > 0) { + processLeftBatchInFuture = true; + } + //TODO we got a STOP, shouldn't we stop immediately ? 
+ // TODO: check what killAndDrain will do w.r.t UNNEST, we discussed about calling right side + // of LATERAL with NONE outcome or calling stop explicitly when NONE is seen on left side + killAndDrainIncoming(right, rightUpstream, RIGHT_INPUT); + return leftUpstream; + case NOT_YET: + try { + Thread.sleep(5); + } catch (InterruptedException ex) { + logger.debug("Thread interrupted while sleeping to call next on left branch of LATERAL since it " + + "received NOT_YET"); + } + break; + } + + needLeftBatch = leftJoinIndex == -1; + } + + return leftUpstream; + } + + private IterOutcome processRightBatch(RecordBatch right) { + // Check if we still have records left to process in left incoming from new batch or previously half processed + // batch. We are making sure to update leftJoinIndex and rightJoinIndex correctly. Like for new + // batch leftJoinIndex will always be set to zero and once leftSide batch is fully processed then + // it will be set to -1. + // Whereas rightJoinIndex is to keep track of record in right batch being joined with + // record in left batch. So when there are cases such that all records in right batch is not consumed + // by the output, then rightJoinIndex will be a valid index. When all records are consumed it will be set to -1. + boolean needNewRightBatch = (leftJoinIndex >= 0) && (rightJoinIndex == -1); + while (needNewRightBatch) { + rightUpstream = next(RIGHT_INPUT, right); + switch (rightUpstream) { + case OK_NEW_SCHEMA: + // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a + // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT + // fall through + // + // Right batch with OK_NEW_SCHEMA is always going to be an empty batch, so let's pass the new schema + // downstream and later with subsequent next() call the join output will be produced + Preconditions.checkState(right.getRecordCount() == 0, + "Right side batch with OK_NEW_SCHEMA is not empty"); + + if (!isSchemaChanged(right.getSchema(), rightSchema)) { + logger.warn("New schema received from right side is same as previous known right schema. Ignoring this " + "schema change"); + continue; + } + if (handleSchemaChange()) { + container.setRecordCount(0); + rightJoinIndex = -1; + return OK_NEW_SCHEMA; + } else { + return STOP; + } + case OK: + case EMIT: + // Even if there are no records we should not call next() again because in case of LEFT join + // empty batch is of importance too + rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1; + needNewRightBatch = false; + break; + case OUT_OF_MEMORY: + case NONE: + case STOP: + //TODO we got a STOP, shouldn't we stop immediately ? + // TODO: Should we kill left side if right side fails ? + killAndDrainIncoming(left, leftUpstream, LEFT_INPUT); + VectorAccessibleUtilities.clear(container); + needNewRightBatch = false; + break; + case NOT_YET: + try { + Thread.sleep(10); + } catch (InterruptedException ex) { + logger.debug("Thread interrupted while sleeping to call next on left branch of LATERAL since it " + + "received NOT_YET"); + } + break; + } + } + + return rightUpstream; + } + + /** + * Method that get's left and right incoming batch and produce the output batch. If the left incoming batch is + * empty then next on right branch is not called and empty batch with correct outcome is returned. If non empty + * left incoming batch is received then it call's next on right branch to get an incoming and finally produces + * output. 
+ * @return IterOutcome state of the lateral join batch + */ + @Override + public IterOutcome innerNext() { + + // We don't do anything special on FIRST state. Process left batch first and then right batch if need be + IterOutcome childOutcome = processLeftBatch(left); + + // reset this state after calling processLeftBatch above. + processLeftBatchInFuture = false; + + // If the left batch doesn't have any record in the incoming batch or the state returned from + // left side is terminal state then just return the IterOutcome and don't call next() on + // right branch + if (left.getRecordCount() == 0 || isTerminalOutcome(childOutcome)) { + return childOutcome; + } + + // Left side has some records in the batch so let's process right batch + childOutcome = processRightBatch(right); + + // reset the left & right outcomes to OK here and send the empty batch downstream + // Assumption being right side will always send OK_NEW_SCHEMA with empty batch which is what UNNEST will do + if (childOutcome == OK_NEW_SCHEMA) { + leftUpstream = OK; + rightUpstream = OK; + return childOutcome; + } + + if (isTerminalOutcome(childOutcome)) { + return childOutcome; + } + + // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right batch, then we should setup schema in + // output container based on new left schema and old right schema. If schema change failed then return STOP + // downstream + if (leftUpstream == OK_NEW_SCHEMA && !handleSchemaChange()) { + return STOP; + } + + // Setup the references of left, right and outgoing container in generated operator + if (state == BatchState.FIRST) { + lateralJoiner.setupLateralJoin(context, left, right, this, popConfig.getJoinType()); + state = BatchState.NOT_FIRST; + } + + // allocate space for the outgoing batch + allocateVectors(); + + return produceOutputBatch(); + } + + private IterOutcome produceOutputBatch() { + + // Try to fully pack the outgoing container + while (outputRecords < LateralJoinBatch.MAX_BATCH_SIZE) { + int previousOutputCount = outputRecords; + // invoke the runtime generated method to emit records in the output batch for each leftJoinIndex + outputRecords = lateralJoiner.crossJoinAndOutputRecords(leftJoinIndex, rightJoinIndex); + + // We have produced some records in outgoing container, hence there must be a match found for left record + if (outputRecords > previousOutputCount) { + matchedRecordFound = true; + } + + if (right.getRecordCount() == 0) { + rightJoinIndex = -1; + } else { + // One right batch might span across multiple output batch. So rightIndex will be moving sum of all the + // output records for this record batch until it's fully consumed + rightJoinIndex += outputRecords; + } + + final boolean isRightProcessed = rightJoinIndex == -1 || rightJoinIndex >= right.getRecordCount(); + + // Check if above join to produce output was based on empty right batch or + // it resulted in right side batch to be fully consumed. In this scenario only if rightUpstream + // is EMIT then increase the leftJoinIndex. + // Otherwise it means for the given right batch there is still some record left to be processed. + if (isRightProcessed) { + if (rightUpstream == EMIT) { + if (!matchedRecordFound) { + // will only produce left side in case of LEFT join + lateralJoiner.generateLeftJoinOutput(leftJoinIndex); + } + ++leftJoinIndex; + // Reset matchedRecord for next left index record + matchedRecordFound = false; + } + + // Release vectors of right batch. 
This will happen for both rightUpstream = EMIT/OK + VectorAccessibleUtilities.clear(right); + rightJoinIndex = -1; + } + + // Check if previous left record was last one, then set leftJoinIndex to -1 + final boolean isLeftProcessed = leftJoinIndex >= left.getRecordCount(); + if (isLeftProcessed) { + leftJoinIndex = -1; + VectorAccessibleUtilities.clear(left); + } + + // Check if output batch still has some space + if (outputRecords < MAX_BATCH_SIZE) { + // Check if left side still has records or not + if (isLeftProcessed) { + // The left batch was with EMIT/OK_NEW_SCHEMA outcome, then return output to downstream layer + if (leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) { + break; + } else { + // Get both left batch and the right batch and make sure indexes are properly set + leftUpstream = processLeftBatch(left); + + if (processLeftBatchInFuture) { + // We should return the current output batch with OK outcome and don't reset the leftUpstream + finalizeOutputContainer(); + return OK; + } + } + } + + // If we are here it means one of the below: + // 1) Either previous left batch was not fully processed and it came with OK outcome. There is still some space + // left in outgoing batch so let's get next right batch. + // 2) OR previous left & right batch was fully processed and it came with OK outcome. There is space in outgoing + // batch. Now we have got new left batch with OK outcome. Let's get next right batch + // + // It will not hit OK_NEW_SCHEMA since left side have not seen that outcome + + rightUpstream = processRightBatch(right); + + Preconditions.checkState(rightUpstream != OK_NEW_SCHEMA, "Unexpected schema change in right branch"); + + if (isTerminalOutcome(rightUpstream)) { + return rightUpstream; + } + } + } // output batch is full to its max capacity + + finalizeOutputContainer(); + + if (leftUpstream == EMIT) { + return EMIT; + } + + if (leftUpstream == OK_NEW_SCHEMA) { + // return output batch with OK_NEW_SCHEMA and reset the state to OK + leftUpstream = OK; + return OK_NEW_SCHEMA; + } + + return OK; + } + + /** + * Finalizes the current output container with the records produced so far before sending it downstream + */ + private void finalizeOutputContainer() { + + VectorAccessibleUtilities.setValueCount(container, outputRecords); + + // Set the record count in the container + container.setRecordCount(outputRecords); + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + + logger.debug("Number of records emitted: " + outputRecords); + + // We are about to send the output batch so reset the outputRecords for future next call + outputRecords = 0; + } + + private void killAndDrainIncoming(RecordBatch batch, IterOutcome outcome, + int batchIndex) { + if (!hasMore(outcome)) { + return; + } + + batch.kill(true); + while (hasMore(outcome)) { + for (final VectorWrapper<?> wrapper : batch) { + wrapper.getValueVector().clear(); + } + outcome = next(batchIndex, batch); + } + if (batchIndex == RIGHT_INPUT) { + rightUpstream = outcome; + } else { + leftUpstream = outcome; + } + } + + private boolean hasMore(IterOutcome outcome) { + return outcome == OK || outcome == OK_NEW_SCHEMA || outcome == EMIT; + } + + private boolean isSchemaChanged(BatchSchema newSchema, BatchSchema oldSchema) { + return newSchema.isEquivalent(oldSchema); + } + + /** + * Helps to create the outgoing container vectors based on known left and right batch schemas + * @throws SchemaChangeException + */ + private void setupNewSchema() throws SchemaChangeException { + + // Clear up the container + 
container.clear(); + leftSchema = left.getSchema(); + rightSchema = right.getSchema(); + + if (leftSchema == null || rightSchema == null) { + throw new SchemaChangeException("Either of left or right schema was not set properly in the batches. Hence " + + "failed to setupNewSchema"); + } + + // Setup LeftSchema in outgoing container + for (final VectorWrapper<?> vectorWrapper : left) { + container.addOrGet(vectorWrapper.getField()); + } + + // Setup RightSchema in the outgoing container + for (final VectorWrapper<?> vectorWrapper : right) { + MaterializedField rightField = vectorWrapper.getField(); + TypeProtos.MajorType rightFieldType = vectorWrapper.getField().getType(); + + // make right input schema optional if we have LEFT join + if (popConfig.getJoinType() == JoinRelType.LEFT && + rightFieldType.getMode() == TypeProtos.DataMode.REQUIRED) { + final TypeProtos.MajorType outputType = + Types.overrideMode(rightField.getType(), TypeProtos.DataMode.OPTIONAL); + + // Create the right field with optional type. This will also take care of creating + // children fields in case of ValueVectors of map type + rightField = rightField.withType(outputType); + } + container.addOrGet(rightField); + } + + // Let's build schema for the container + container.setRecordCount(0); + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + } + + /** + * Method generates the runtime code needed for LateralJoin. Other than the setup method to set the input and output + * value vector references we implement two more methods + * 1. emitLeft() -> Copy record from the left side to output container + * 2. emitRight() -> Copy record from the right side to output container + * @return the runtime generated class that implements the LateralJoin interface + */ + private LateralJoin setupWorker() throws SchemaChangeException { + final CodeGenerator<LateralJoin> lateralCG = CodeGenerator.get( + LateralJoin.TEMPLATE_DEFINITION, context.getOptions()); + lateralCG.plainJavaCapable(true); + + // To enabled code gen dump for lateral use the setting ExecConstants.ENABLE_CODE_DUMP_DEBUG_LATERAL + lateralCG.saveCodeForDebugging(enableLateralCGDebugging); + final ClassGenerator<LateralJoin> nLJClassGenerator = lateralCG.getRoot(); + + // generate emitLeft + nLJClassGenerator.setMappingSet(emitLeftMapping); + JExpression outIndex = JExpr.direct("outIndex"); + JExpression leftIndex = JExpr.direct("leftIndex"); + + int fieldId = 0; + int outputFieldId = 0; + if (leftSchema != null) { + // Set the input and output value vector references corresponding to the left batch + for (MaterializedField field : leftSchema) { + final TypeProtos.MajorType fieldType = field.getType(); + + // Add the vector to the output container + container.addOrGet(field); + + JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("leftBatch", + new TypedFieldId(fieldType, false, fieldId)); + JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing", + new TypedFieldId(fieldType, false, outputFieldId)); + + nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(leftIndex).arg(outIndex).arg(inVV)); + nLJClassGenerator.rotateBlock(); + fieldId++; + outputFieldId++; + } + } + + // generate emitRight + fieldId = 0; + nLJClassGenerator.setMappingSet(emitRightMapping); + JExpression rightIndex = JExpr.direct("rightIndex"); + + if (rightSchema != null) { + // Set the input and output value vector references corresponding to the right batch + for (MaterializedField field : rightSchema) { + + final TypeProtos.MajorType 
inputType = field.getType(); + TypeProtos.MajorType outputType; + // if join type is LEFT, make sure right batch output fields data mode is optional + if (popConfig.getJoinType() == JoinRelType.LEFT && inputType.getMode() == TypeProtos.DataMode.REQUIRED) { + outputType = Types.overrideMode(inputType, TypeProtos.DataMode.OPTIONAL); + } else { + outputType = inputType; + } + + MaterializedField newField = MaterializedField.create(field.getName(), outputType); + container.addOrGet(newField); + + JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("rightBatch", + new TypedFieldId(inputType, false, fieldId)); + JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing", + new TypedFieldId(outputType, false, outputFieldId)); + nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe") + .arg(rightIndex) + .arg(outIndex) + .arg(inVV)); + nLJClassGenerator.rotateBlock(); + fieldId++; + outputFieldId++; + } + } + + try { + return context.getImplementationClass(lateralCG); + } catch (IOException | ClassTransformationException ex) { + throw new SchemaChangeException("Failed while setting up generated class with new schema information", ex); + } + } + + /** + * Simple method to allocate space for all the vectors in the container. + */ + private void allocateVectors() { + for (final VectorWrapper<?> vw : container) { + AllocationHelper.allocateNew(vw.getValueVector(), MAX_BATCH_SIZE); + } + } + + /** + * Prefetch a batch from left and right branch to know about the schema of each side. Then adds value vector in + * output container based on those schemas. For this phase LATERAL always expect's an empty batch from right side + * which UNNEST should abide by. + * + * @throws SchemaChangeException if batch schema was changed during execution + */ + @Override + protected void buildSchema() throws SchemaChangeException { + // Prefetch a RecordBatch from both left and right branch + if (!prefetchFirstBatchFromBothSides()) { + return; + } + + Preconditions.checkState(right.getRecordCount() == 0, "Unexpected non-empty first right batch received"); + + // Setup output container schema based on known left and right schema + setupNewSchema(); + + // Release the vectors received from right side + VectorAccessibleUtilities.clear(right); + + + // We should not allocate memory for all the value vectors inside output batch + // since this is buildSchema phase and we are sending empty batches downstream + lateralJoiner = setupWorker(); + + // Set join index as invalid (-1) if the left side is empty, else set it to 0 + leftJoinIndex = (left.getRecordCount() <= 0) ? -1 : 0; + rightJoinIndex = -1; + + // Reset the left side of the IterOutcome since for this call, OK_NEW_SCHEMA will be returned correctly + // by buildSchema caller and we should treat the batch as received with OK outcome. + leftUpstream = OK; + rightUpstream = OK; + } + + @Override + public void close() { + super.close(); + } + + @Override + protected void killIncoming(boolean sendUpstream) { + this.left.kill(sendUpstream); + this.right.kill(sendUpstream); + } + + @Override + public int getRecordCount() { + return container.getRecordCount(); + } + + /** + * Returns the left side incoming for the Lateral Join. Used by right branch leaf operator of Lateral + * to process the records at leftJoinIndex. + * + * @return - RecordBatch received as left side incoming + */ + @Override + public RecordBatch getIncoming() { + Preconditions.checkState (left != null, "Retuning null left batch. 
It's unexpected since right side will only be " + + "called iff there is any valid left batch"); + return left; + } + + /** + * Returns the current row index which the calling operator should process in current incoming left record batch. + * LATERAL should never return it as -1 since that indicated current left batch is empty and LATERAL will never + * call next on right side with empty left batch + * + * @return - int - index of row to process. + */ + @Override + public int getRecordIndex() { + Preconditions.checkState (leftJoinIndex < left.getRecordCount(), + String.format("Left join index: %d is out of bounds: %d", leftJoinIndex, left.getRecordCount())); + return leftJoinIndex; + } + + /** + * Returns the current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left incoming batch + */ + public IterOutcome getLeftOutcome() { + return leftUpstream; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java new file mode 100644 index 0000000..019b095 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.join; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.ExecutorFragmentContext; +import org.apache.drill.exec.physical.config.LateralJoinPOP; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.RecordBatch; + +import java.util.List; + +public class LateralJoinBatchCreator implements BatchCreator<LateralJoinPOP> { + @Override + public LateralJoinBatch getBatch(ExecutorFragmentContext context, LateralJoinPOP config, List<RecordBatch> children) + throws ExecutionSetupException { + return new LateralJoinBatch(config, context, children.get(0), children.get(1)); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java new file mode 100644 index 0000000..6e756a4 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.join; + +import com.google.common.base.Preconditions; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.record.RecordBatch; + +import javax.inject.Named; + +/* + * Template class that combined with the runtime generated source implements the LateralJoin interface. This + * class contains the main lateral join logic. + */ +public abstract class LateralJoinTemplate implements LateralJoin { + + // Current right input batch being processed + private RecordBatch right = null; + + // Index in outgoing container where new record will be inserted + private int outputIndex = 0; + + // Keep the join type at setup phase + private JoinRelType joinType; + + /** + * Method initializes necessary state and invokes the doSetup() to set the input and output value vector references. + * + * @param context Fragment context + * @param left Current left input batch being processed + * @param right Current right input batch being processed + * @param outgoing Output batch + */ + public void setupLateralJoin(FragmentContext context, + RecordBatch left, RecordBatch right, + LateralJoinBatch outgoing, JoinRelType joinType) { + this.right = right; + this.joinType = joinType; + doSetup(context, this.right, left, outgoing); + } + + /** + * Main entry point for producing the output records. 
This method populates the output batch after cross join of + * the record in a given left batch at left index and all the corresponding right batches produced for + * this left index. The right container is copied starting from rightIndex until number of records in the container. + * + * @return the number of records produced in the output batch + */ + public int crossJoinAndOutputRecords(int leftIndex, int rightIndex) { + + final int rightRecordCount = right.getRecordCount(); + int currentOutputIndex = outputIndex; + + // If there is no record in right batch just return current index in output batch + if (rightRecordCount <= 0) { + return currentOutputIndex; + } + + // Check if right batch is empty since we have to handle left join case + Preconditions.checkState(rightIndex != -1, "Right batch record count is >0 but index is -1"); + // For every record in right side just emit left and right records in output container + for (; rightIndex < rightRecordCount; ++rightIndex) { + emitLeft(leftIndex, currentOutputIndex); + emitRight(rightIndex, currentOutputIndex); + ++currentOutputIndex; + + if (currentOutputIndex >= LateralJoinBatch.MAX_BATCH_SIZE) { + break; + } + } + + updateOutputIndex(currentOutputIndex); + return currentOutputIndex; + } + + /** + * If current output batch is full then reset the output index for next output batch + * Otherwise it means we still have space left in output batch, so next call will continue populating from + * newOutputIndex + * @param newOutputIndex - new output index of outgoing batch after copying the records + */ + private void updateOutputIndex(int newOutputIndex) { + outputIndex = (newOutputIndex >= LateralJoinBatch.MAX_BATCH_SIZE) ? + 0 : newOutputIndex; + } + + /** + * Method to copy just the left batch record at given leftIndex, the right side records will be NULL. This is + * used in case when Join Type is LEFT and we have only seen empty batches from right side + * @param leftIndex - index in left batch to copy record from + */ + public void generateLeftJoinOutput(int leftIndex) { + int currentOutputIndex = outputIndex; + + if (JoinRelType.LEFT == joinType) { + emitLeft(leftIndex, currentOutputIndex++); + updateOutputIndex(currentOutputIndex); + } + } + + /** + * Generated method to setup vector references in rightBatch, leftBatch and outgoing batch. It should be called + * after initial schema build phase, when the schema for outgoing container is known. This method should also be + * called after each New Schema discovery during execution. 
+ * @param context + * @param rightBatch - right incoming batch + * @param leftBatch - left incoming batch + * @param outgoing - output batch + */ + public abstract void doSetup(@Named("context") FragmentContext context, + @Named("rightBatch") RecordBatch rightBatch, + @Named("leftBatch") RecordBatch leftBatch, + @Named("outgoing") RecordBatch outgoing); + + /** + * Generated method to copy the record from right batch at rightIndex to outgoing batch at outIndex + * @param rightIndex - index to copy record from the right batch + * @param outIndex - index to copy record to a outgoing batch + */ + public abstract void emitRight(@Named("rightIndex") int rightIndex, + @Named("outIndex") int outIndex); + + /** + * Generated method to copy the record from left batch at leftIndex to outgoing batch at outIndex + * @param leftIndex - index to copy record from the left batch + * @param outIndex - index to copy record to a outgoing batch + */ + public abstract void emitLeft(@Named("leftIndex") int leftIndex, + @Named("outIndex") int outIndex); +} http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java index 6de4df6..881954c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java @@ -56,6 +56,7 @@ public class RecordBatchData { throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch."); } TransferPair tp = v.getValueVector().getTransferPair(allocator); + // Transfer make sure of releasing memory for value vector in source container. tp.transfer(); vectors.add(tp.getTo()); } http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java index 94aef07..b671915 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java @@ -51,7 +51,11 @@ public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> exte * false if caller should stop and exit from processing. 
*/ protected boolean prefetchFirstBatchFromBothSides() { + // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome as first batch leftUpstream = next(0, left); + + // Right will always get zero records with OK_NEW_SCHEMA outcome as first batch, since right + // now Lateral will always be tied up with UNNEST rightUpstream = next(1, right); if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) { @@ -69,6 +73,13 @@ public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> exte return false; } + // EMIT outcome is not expected as part of first batch from either side + if (leftUpstream == IterOutcome.EMIT || rightUpstream == IterOutcome.EMIT) { + state = BatchState.STOP; + throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from left or right side in " + + "buildSchema phase"); + } + return true; } http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index 015d078..8bf1856 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -126,6 +126,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements stats.batchReceived(inputIndex, b.getRecordCount(), true); break; case OK: + case EMIT: stats.batchReceived(inputIndex, b.getRecordCount(), false); break; default:
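Editor's note (again, not part of the patch): the LEFT join handling in produceOutputBatch() and LateralJoinTemplate.generateLeftJoinOutput() above means a left row whose right branch produces no rows is still emitted once, with the right-side columns set to NULL; this is also why setupNewSchema() forces right-side output fields to OPTIONAL mode for LEFT joins. A small self-contained sketch of that behavior, with invented data and plain collections standing in for batches:

import java.util.ArrayList;
import java.util.List;

// Illustration only: models matchedRecordFound / generateLeftJoinOutput() with plain lists.
public class LateralLeftJoinSketch {
  public static void main(String[] args) {
    // Right-branch output per left row: left row 0 unnests to two rows, left row 1 to none.
    List<List<Integer>> rightRowsPerLeftRow = List.of(List.of(10, 20), List.of());
    boolean leftJoin = true;                    // stands in for popConfig.getJoinType() == JoinRelType.LEFT

    List<String> outgoing = new ArrayList<>();
    for (int leftIndex = 0; leftIndex < rightRowsPerLeftRow.size(); leftIndex++) {
      boolean matchedRecordFound = false;
      for (int right : rightRowsPerLeftRow.get(leftIndex)) {   // crossJoinAndOutputRecords()
        outgoing.add("left=" + leftIndex + ", right=" + right);
        matchedRecordFound = true;
      }
      if (leftJoin && !matchedRecordFound) {                   // generateLeftJoinOutput()
        outgoing.add("left=" + leftIndex + ", right=NULL");
      }
    }
    outgoing.forEach(System.out::println);  // left=0,right=10  left=0,right=20  left=1,right=NULL
  }
}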