DRILL-6323: Lateral Join - Remove codegen and operator template class. Logic to copy data is moved to LateralJoinBatch itself
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5ed31962 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5ed31962 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5ed31962 Branch: refs/heads/master Commit: 5ed319620b66009f069934bba72f2740ff3cacaf Parents: 74565cc Author: Sorabh Hamirwasia <shamirwa...@maprtech.com> Authored: Fri Mar 9 15:56:22 2018 -0800 Committer: Parth Chandra <par...@apache.org> Committed: Tue Apr 17 18:15:54 2018 -0700 ---------------------------------------------------------------------- .../exec/physical/impl/join/LateralJoin.java | 57 ---- .../physical/impl/join/LateralJoinBatch.java | 313 ++++++++----------- .../physical/impl/join/LateralJoinTemplate.java | 147 --------- .../impl/join/TestLateralJoinCorrectness.java | 67 ++++ 4 files changed, 195 insertions(+), 389 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/5ed31962/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java deleted file mode 100644 index 723c0ef..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.impl.join; - -import org.apache.calcite.rel.core.JoinRelType; -import org.apache.drill.exec.compile.TemplateClassDefinition; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.config.LateralJoinPOP; -import org.apache.drill.exec.record.ExpandableHyperContainer; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.VectorContainer; - -import java.util.LinkedList; - -/** - * Interface for the lateral join operator. - */ -public interface LateralJoin { - public static TemplateClassDefinition<LateralJoin> TEMPLATE_DEFINITION = - new TemplateClassDefinition<>(LateralJoin.class, LateralJoinTemplate.class); - - public void setupLateralJoin(FragmentContext context, RecordBatch left, - RecordBatch right, LateralJoinBatch outgoing, - JoinRelType joinType); - - // Produce output records taking into account join type - public int crossJoinAndOutputRecords(int leftIndex, int rightIndex); - - public int generateLeftJoinOutput(int leftIndex); - - public void updateOutputIndex(int newOutputIndex); - - // Project the record at offset 'leftIndex' in the left input batch into the output container at offset 'outIndex' - public void emitLeft(int leftIndex, int outIndex); - - // Project the record from the hyper container given the batch index and the record within the batch at 'outIndex' - public void emitRight(int rightIndex, int outIndex); - - // Setup the input/output value vector references - public void doSetup(FragmentContext context, RecordBatch 
rightBatch, - RecordBatch leftBatch, RecordBatch outgoing); -} http://git-wip-us.apache.org/repos/asf/drill/blob/5ed31962/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java index 122ff86..f01bf1c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java @@ -18,19 +18,11 @@ package org.apache.drill.exec.physical.impl.join; import com.google.common.base.Preconditions; -import com.sun.codemodel.JExpr; -import com.sun.codemodel.JExpression; -import com.sun.codemodel.JVar; import org.apache.calcite.rel.core.JoinRelType; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; -import org.apache.drill.exec.compile.sig.GeneratorMapping; -import org.apache.drill.exec.compile.sig.MappingSet; -import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.expr.ClassGenerator; -import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.LateralContract; import org.apache.drill.exec.physical.config.LateralJoinPOP; @@ -38,12 +30,10 @@ import org.apache.drill.exec.record.AbstractBinaryRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorAccessibleUtilities; 
import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.AllocationHelper; - -import java.io.IOException; +import org.apache.drill.exec.vector.ValueVector; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; @@ -74,11 +64,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Schema on the right side private BatchSchema rightSchema = null; - // Runtime generated class implementing the LateralJoin interface - private LateralJoin lateralJoiner = null; - - // Number of output records in the current outgoing batch - private int outputRecords = 0; + // Index in output batch to populate next row + private int outputIndex = 0; // Current index of record in left incoming which is being processed private int leftJoinIndex = -1; @@ -92,49 +79,17 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Keep track if any matching right record was found for current left index record private boolean matchedRecordFound = false; - private boolean enableLateralCGDebugging = true; - - // Shared Generator mapping for the left/right side : constant - private static final GeneratorMapping EMIT_CONSTANT = - GeneratorMapping.create("doSetup"/* setup method */,"doSetup" /* eval method */, - null /* reset */, null /* cleanup */); - - // Generator mapping for the right side - private static final GeneratorMapping EMIT_RIGHT = - GeneratorMapping.create("doSetup"/* setup method */,"emitRight" /* eval method */, - null /* reset */,null /* cleanup */); - - // Generator mapping for the left side : scalar - private static final GeneratorMapping EMIT_LEFT = - GeneratorMapping.create("doSetup" /* setup method */, "emitLeft" /* eval method */, - null /* reset */, null /* cleanup */); - - // Mapping set for the right side - private static final MappingSet emitRightMapping = - new MappingSet("rightIndex" /* read index 
*/, "outIndex" /* write index */, - "rightBatch" /* read container */,"outgoing" /* write container */, - EMIT_CONSTANT, EMIT_RIGHT); - - // Mapping set for the left side - private static final MappingSet emitLeftMapping = - new MappingSet("leftIndex" /* read index */, "outIndex" /* write index */, - "leftBatch" /* read container */,"outgoing" /* write container */, - EMIT_CONSTANT, EMIT_LEFT); - protected LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException { super(popConfig, context, left, right); Preconditions.checkNotNull(left); Preconditions.checkNotNull(right); - enableLateralCGDebugging = true;//context.getConfig().getBoolean(ExecConstants.ENABLE_CODE_DUMP_DEBUG_LATERAL); } private boolean handleSchemaChange() { try { stats.startSetup(); setupNewSchema(); - lateralJoiner = setupWorker(); - lateralJoiner.setupLateralJoin(context, left, right, this, popConfig.getJoinType()); return true; } catch (SchemaChangeException ex) { logger.error("Failed to handle schema change hence killing the query"); @@ -184,7 +139,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> } else { leftUpstream = OK; } - } else if (outputRecords > 0) { + } else if (outputIndex > 0) { // This means there is already some records from previous join inside left batch // So we need to pass that downstream and then handle the OK_NEW_SCHEMA in subsequent next call processLeftBatchInFuture = true; @@ -225,7 +180,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> case NONE: case STOP: // Not using =0 since if outgoing container is empty then no point returning anything - if (outputRecords > 0) { + if (outputIndex > 0) { processLeftBatchInFuture = true; } return leftUpstream; @@ -253,19 +208,18 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> private IterOutcome processRightBatch() { // Check if we still have records left 
to process in left incoming from new batch or previously half processed // batch based on indexes. We are making sure to update leftJoinIndex and rightJoinIndex correctly. Like for new - // batch leftJoinIndex will always be set to zero and once leftSide batch is fully processed then - // it will be set to -1. - // Whereas rightJoinIndex is to keep track of record in right batch being joined with - // record in left batch. So when there are cases such that all records in right batch is not consumed - // by the output, then rightJoinIndex will be a valid index. When all records are consumed it will be set to -1. + // batch leftJoinIndex will always be set to zero and once leftSide batch is fully processed then it will be set + // to -1. + // Whereas rightJoinIndex is to keep track of record in right batch being joined with record in left batch. + // So when there are cases such that all records in right batch is not consumed by the output, then rightJoinIndex + // will be a valid index. When all records are consumed it will be set to -1. boolean needNewRightBatch = (leftJoinIndex >= 0) && (rightJoinIndex == -1); while (needNewRightBatch) { rightUpstream = next(RIGHT_INPUT, right); switch (rightUpstream) { case OK_NEW_SCHEMA: // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. 
So there won't be a - // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT - // fall through + // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through // // Right batch with OK_NEW_SCHEMA is always going to be an empty batch, so let's pass the new schema // downstream and later with subsequent next() call the join output will be produced @@ -285,8 +239,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> } case OK: case EMIT: - // Even if there are no records we should not call next() again because in case of LEFT join - // empty batch is of importance too + // Even if there are no records we should not call next() again because in case of LEFT join empty batch is + // of importance too rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1; needNewRightBatch = false; break; @@ -351,17 +305,15 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // output container based on new left schema and old right schema. 
If schema change failed then return STOP // downstream if (leftUpstream == OK_NEW_SCHEMA && !handleSchemaChange()) { - return STOP; + return STOP; } // Setup the references of left, right and outgoing container in generated operator - if (state == BatchState.FIRST) { - lateralJoiner.setupLateralJoin(context, left, right, this, popConfig.getJoinType()); - state = BatchState.NOT_FIRST; - } + state = BatchState.NOT_FIRST; // allocate space for the outgoing batch allocateVectors(); + return produceOutputBatch(); } @@ -379,27 +331,26 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> boolean isLeftProcessed = false; // Try to fully pack the outgoing container - while (outputRecords < LateralJoinBatch.MAX_BATCH_SIZE) { - int previousOutputCount = outputRecords; + while (outputIndex < LateralJoinBatch.MAX_BATCH_SIZE) { + final int previousOutputCount = outputIndex; // invoke the runtime generated method to emit records in the output batch for each leftJoinIndex - outputRecords = lateralJoiner.crossJoinAndOutputRecords(leftJoinIndex, rightJoinIndex); + outputIndex = crossJoinAndOutputRecords(leftJoinIndex, rightJoinIndex, outputIndex); // We have produced some records in outgoing container, hence there must be a match found for left record - if (outputRecords > previousOutputCount) { + if (outputIndex > previousOutputCount) { + // Need this extra flag since there can be left join case where for current leftJoinIndex it receives a right + // batch with data, then an empty batch and again another empty batch with EMIT outcome. If we just use + // outputIndex then we will lose the information that a few rows for leftJoinIndex are already produced using + // first right batch matchedRecordFound = true; } - if (right.getRecordCount() == 0) { - rightJoinIndex = -1; - } else { - // One right batch might span across multiple output batch. So rightIndex will be moving sum of all the - // output records for this record batch until it's fully consumed. 
- // - // Also it can be so that one output batch can contain records from 2 different right batch hence the - // rightJoinIndex should move by number of records in output batch for current right batch only. - rightJoinIndex += outputRecords - previousOutputCount; - } - + // One right batch might span across multiple output batch. So rightIndex will be moving sum of all the + // output records for this record batch until it's fully consumed. + // + // Also it can be so that one output batch can contain records from 2 different right batch hence the + // rightJoinIndex should move by number of records in output batch for current right batch only. + rightJoinIndex += outputIndex - previousOutputCount; final boolean isRightProcessed = rightJoinIndex == -1 || rightJoinIndex >= right.getRecordCount(); // Check if above join to produce output was based on empty right batch or @@ -408,9 +359,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Otherwise it means for the given right batch there is still some record left to be processed. 
if (isRightProcessed) { if (rightUpstream == EMIT) { - if (!matchedRecordFound) { - // will only produce left side in case of LEFT join - outputRecords = lateralJoiner.generateLeftJoinOutput(leftJoinIndex); + if (!matchedRecordFound && JoinRelType.LEFT == popConfig.getJoinType()) { + // copy left side in case of LEFT join + emitLeft(leftJoinIndex, outputIndex++); } ++leftJoinIndex; // Reset matchedRecord for next left index record @@ -430,7 +381,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> } // Check if output batch still has some space - if (outputRecords < MAX_BATCH_SIZE) { + if (outputIndex < MAX_BATCH_SIZE) { // Check if left side still has records or not if (isLeftProcessed) { // The left batch was with EMIT/OK_NEW_SCHEMA outcome, then return output to downstream layer @@ -467,9 +418,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // batch. Now we have got new left batch with OK outcome. Let's get next right batch // // It will not hit OK_NEW_SCHEMA since left side have not seen that outcome - rightUpstream = processRightBatch(); - Preconditions.checkState(rightUpstream != OK_NEW_SCHEMA, "Unexpected schema change in right branch"); if (isTerminalOutcome(rightUpstream)) { @@ -501,19 +450,15 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> * Finalizes the current output container with the records produced so far before sending it downstream */ private void finalizeOutputContainer() { - - VectorAccessibleUtilities.setValueCount(container, outputRecords); + VectorAccessibleUtilities.setValueCount(container, outputIndex); // Set the record count in the container - container.setRecordCount(outputRecords); + container.setRecordCount(outputIndex); container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + logger.debug("Number of records emitted: " + outputIndex); - logger.debug("Number of records emitted: " + outputRecords); - - // We are about to send 
the output batch so reset the outputRecords for future next call - outputRecords = 0; // Update the output index for next output batch to zero - lateralJoiner.updateOutputIndex(0); + outputIndex = 0; } /** @@ -595,96 +540,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> } // Let's build schema for the container - container.setRecordCount(0); + outputIndex = 0; + container.setRecordCount(outputIndex); container.buildSchema(BatchSchema.SelectionVectorMode.NONE); } /** - * Method generates the runtime code needed for LateralJoin. Other than the setup method to set the input and output - * value vector references we implement two more methods - * 1. emitLeft() -> Copy record from the left side to output container - * 2. emitRight() -> Copy record from the right side to output container - * @return the runtime generated class that implements the LateralJoin interface - */ - private LateralJoin setupWorker() throws SchemaChangeException { - final CodeGenerator<LateralJoin> lateralCG = CodeGenerator.get( - LateralJoin.TEMPLATE_DEFINITION, context.getOptions()); - lateralCG.plainJavaCapable(true); - - // To enabled code gen dump for lateral use the setting ExecConstants.ENABLE_CODE_DUMP_DEBUG_LATERAL - lateralCG.saveCodeForDebugging(enableLateralCGDebugging); - final ClassGenerator<LateralJoin> nLJClassGenerator = lateralCG.getRoot(); - - // generate emitLeft - nLJClassGenerator.setMappingSet(emitLeftMapping); - JExpression outIndex = JExpr.direct("outIndex"); - JExpression leftIndex = JExpr.direct("leftIndex"); - - int fieldId = 0; - int outputFieldId = 0; - if (leftSchema != null) { - // Set the input and output value vector references corresponding to the left batch - for (MaterializedField field : leftSchema) { - final TypeProtos.MajorType fieldType = field.getType(); - - // Add the vector to the output container - container.addOrGet(field); - - JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("leftBatch", - new 
TypedFieldId(fieldType, false, fieldId)); - JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing", - new TypedFieldId(fieldType, false, outputFieldId)); - - nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(leftIndex).arg(outIndex).arg(inVV)); - nLJClassGenerator.rotateBlock(); - fieldId++; - outputFieldId++; - } - } - - // generate emitRight - fieldId = 0; - nLJClassGenerator.setMappingSet(emitRightMapping); - JExpression rightIndex = JExpr.direct("rightIndex"); - - if (rightSchema != null) { - // Set the input and output value vector references corresponding to the right batch - for (MaterializedField field : rightSchema) { - - final TypeProtos.MajorType inputType = field.getType(); - TypeProtos.MajorType outputType; - // if join type is LEFT, make sure right batch output fields data mode is optional - if (popConfig.getJoinType() == JoinRelType.LEFT && inputType.getMode() == TypeProtos.DataMode.REQUIRED) { - outputType = Types.overrideMode(inputType, TypeProtos.DataMode.OPTIONAL); - } else { - outputType = inputType; - } - - MaterializedField newField = MaterializedField.create(field.getName(), outputType); - container.addOrGet(newField); - - JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("rightBatch", - new TypedFieldId(inputType, false, fieldId)); - JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing", - new TypedFieldId(outputType, false, outputFieldId)); - nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe") - .arg(rightIndex) - .arg(outIndex) - .arg(inVV)); - nLJClassGenerator.rotateBlock(); - fieldId++; - outputFieldId++; - } - } - - try { - return context.getImplementationClass(lateralCG); - } catch (IOException | ClassTransformationException ex) { - throw new SchemaChangeException("Failed while setting up generated class with new schema information", ex); - } - } - - /** * Simple method to allocate space for all the vectors in the container. 
*/ private void allocateVectors() { @@ -715,8 +576,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> * failure outcome then we don't even call next on right branch, since there is no left incoming. * @return true if both the left/right batch was received without failure outcome. * false if either of batch is received with failure outcome. - * - * @throws SchemaChangeException */ @Override protected boolean prefetchFirstBatchFromBothSides() { @@ -760,10 +619,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Release the vectors received from right side VectorAccessibleUtilities.clear(right); - // We should not allocate memory for all the value vectors inside output batch - // since this is buildSchema phase and we are sending empty batches downstream - lateralJoiner = setupWorker(); - // Set join index as invalid (-1) if the left side is empty, else set it to 0 leftJoinIndex = (left.getRecordCount() <= 0) ? -1 : 0; rightJoinIndex = -1; @@ -825,7 +680,95 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> /** * Returns the current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left incoming batch */ + @Override public IterOutcome getLeftOutcome() { return leftUpstream; } + + /** + * Main entry point for producing the output records. This method populates the output batch after cross join of + * the record in a given left batch at left index and all the corresponding right batches produced for + * this left index. The right container is copied starting from rightIndex until number of records in the container. 
+ * + * @param leftIndex - row index in left incoming batch + * @param rightIndex - row index in right incoming batch + * @param outIndex - row index in output batch + * + * @return - final row index of output batch + */ + private int crossJoinAndOutputRecords(final int leftIndex, final int rightIndex, final int outIndex) { + final int rightRecordCount = right.getRecordCount(); + int outBatchIndex = outIndex; + + // If there is no record in right batch just return current index in output batch + if (rightRecordCount <= 0) { + return outBatchIndex; + } + + // Check if right batch is empty since we have to handle left join case + Preconditions.checkState(rightIndex != -1, "Right batch record count is >0 but index is -1"); + // For every record in right side just emit left and right records in output container + for (int i = rightIndex; i < rightRecordCount; ++i) { + emitLeft(leftIndex, outBatchIndex); + emitRight(i, outBatchIndex); + ++outBatchIndex; + + if (outBatchIndex >= LateralJoinBatch.MAX_BATCH_SIZE) { + break; + } + } + return outBatchIndex; + } + + /** + * Given a record batch, copies data from all its vectors at fromRowIndex to all the vectors in output batch at + * toRowIndex. It iterates over all the vectors from startVectorIndex to endVectorIndex inside the record batch to + * copy the data and copies it inside vectors from startVectorIndex + baseVectorIndex to endVectorIndex + + * baseVectorIndex. 
+ * @param fromRowIndex - row index of all the vectors in batch to copy data from + * @param toRowIndex - row index of all the vectors in outgoing batch to copy data to + * @param batch - source record batch holding vectors with data + * @param startVectorIndex - start index (inclusive) of vector inside source record batch + * @param endVectorIndex - end index (exclusive) of vector inside source record batch + * @param baseVectorIndex - base index to be added to startVectorIndex to get corresponding vector in outgoing batch + */ + private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, RecordBatch batch, + int startVectorIndex, int endVectorIndex, int baseVectorIndex) { + // Get the vectors using field index rather than Materialized field since input batch field can be different from + // output container field in case of Left Join, as we rebuild the right Schema field to be optional for output + // container. + for (int i = startVectorIndex; i < endVectorIndex; ++i) { + // Get input vector + final Class<?> inputValueClass = batch.getSchema().getColumn(i).getValueClass(); + final ValueVector inputVector = batch.getValueAccessorById(inputValueClass, i).getValueVector(); + + // Get output vector + final int outputVectorIndex = i + baseVectorIndex; + final Class<?> outputValueClass = this.getSchema().getColumn(outputVectorIndex).getValueClass(); + final ValueVector outputVector = this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector(); + + // Copy data from input vector to output vector + outputVector.copyEntry(toRowIndex, inputVector, fromRowIndex); + } + } + + /** + * Copies data at leftIndex from each of the vectors in left incoming batch to outIndex at corresponding vectors in + * outgoing record batch + * @param leftIndex - index to copy data from left incoming batch vectors + * @param outIndex - index to copy data to in outgoing batch vectors + */ + private void emitLeft(int leftIndex, int outIndex) { + copyDataToOutputVectors(leftIndex, outIndex, 
left, 0, leftSchema.getFieldCount(), 0); + } + + /** + * Copies data at rightIndex from each of the vectors in right incoming batch to outIndex at corresponding vectors in + * outgoing record batch + * @param rightIndex - index to copy data from right incoming batch vectors + * @param outIndex - index to copy data to in outgoing batch vectors + */ + private void emitRight(int rightIndex, int outIndex) { + copyDataToOutputVectors(rightIndex, outIndex, right, 0, rightSchema.getFieldCount(), leftSchema.getFieldCount()); + } } http://git-wip-us.apache.org/repos/asf/drill/blob/5ed31962/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java deleted file mode 100644 index d7b4f1d..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.physical.impl.join; - -import com.google.common.base.Preconditions; -import org.apache.calcite.rel.core.JoinRelType; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.RecordBatch; - -import javax.inject.Named; - -/* - * Template class that combined with the runtime generated source implements the LateralJoin interface. This - * class contains the main lateral join logic. - */ -public abstract class LateralJoinTemplate implements LateralJoin { - - // Current right input batch being processed - private RecordBatch right = null; - - // Index in outgoing container where new record will be inserted - private int outputIndex = 0; - - // Keep the join type at setup phase - private JoinRelType joinType; - - /** - * Method initializes necessary state and invokes the doSetup() to set the input and output value vector references. - * - * @param context Fragment context - * @param left Current left input batch being processed - * @param right Current right input batch being processed - * @param outgoing Output batch - */ - public void setupLateralJoin(FragmentContext context, - RecordBatch left, RecordBatch right, - LateralJoinBatch outgoing, JoinRelType joinType) { - this.right = right; - this.joinType = joinType; - doSetup(context, this.right, left, outgoing); - } - - /** - * Main entry point for producing the output records. This method populates the output batch after cross join of - * the record in a given left batch at left index and all the corresponding right batches produced for - * this left index. The right container is copied starting from rightIndex until number of records in the container. 
- * - * @return the number of records produced in the output batch - */ - public int crossJoinAndOutputRecords(int leftIndex, int rightIndex) { - - final int rightRecordCount = right.getRecordCount(); - int currentOutputIndex = outputIndex; - - // If there is no record in right batch just return current index in output batch - if (rightRecordCount <= 0) { - return currentOutputIndex; - } - - // Check if right batch is empty since we have to handle left join case - Preconditions.checkState(rightIndex != -1, "Right batch record count is >0 but index is -1"); - // For every record in right side just emit left and right records in output container - for (; rightIndex < rightRecordCount; ++rightIndex) { - emitLeft(leftIndex, currentOutputIndex); - emitRight(rightIndex, currentOutputIndex); - ++currentOutputIndex; - - if (currentOutputIndex >= LateralJoinBatch.MAX_BATCH_SIZE) { - break; - } - } - - updateOutputIndex(currentOutputIndex); - return currentOutputIndex; - } - - /** - * If current output batch is full then reset the output index for next output batch - * Otherwise it means we still have space left in output batch, so next call will continue populating from - * newOutputIndex - * @param newOutputIndex - new output index of outgoing batch after copying the records - */ - public void updateOutputIndex(int newOutputIndex) { - outputIndex = (newOutputIndex >= LateralJoinBatch.MAX_BATCH_SIZE) ? - 0 : newOutputIndex; - } - - /** - * Method to copy just the left batch record at given leftIndex, the right side records will be NULL. 
This is - * used in case when Join Type is LEFT and we have only seen empty batches from right side - * @param leftIndex - index in left batch to copy record from - */ - public int generateLeftJoinOutput(int leftIndex) { - int currentOutputIndex = outputIndex; - - if (JoinRelType.LEFT == joinType) { - emitLeft(leftIndex, currentOutputIndex++); - updateOutputIndex(currentOutputIndex); - } - return currentOutputIndex; - } - - /** - * Generated method to setup vector references in rightBatch, leftBatch and outgoing batch. It should be called - * after initial schema build phase, when the schema for outgoing container is known. This method should also be - * called after each New Schema discovery during execution. - * @param context - * @param rightBatch - right incoming batch - * @param leftBatch - left incoming batch - * @param outgoing - output batch - */ - public abstract void doSetup(@Named("context") FragmentContext context, - @Named("rightBatch") RecordBatch rightBatch, - @Named("leftBatch") RecordBatch leftBatch, - @Named("outgoing") RecordBatch outgoing); - - /** - * Generated method to copy the record from right batch at rightIndex to outgoing batch at outIndex - * @param rightIndex - index to copy record from the right batch - * @param outIndex - index to copy record to a outgoing batch - */ - public abstract void emitRight(@Named("rightIndex") int rightIndex, - @Named("outIndex") int outIndex); - - /** - * Generated method to copy the record from left batch at leftIndex to outgoing batch at outIndex - * @param leftIndex - index to copy record from the left batch - * @param outIndex - index to copy record to a outgoing batch - */ - public abstract void emitLeft(@Named("leftIndex") int leftIndex, - @Named("outIndex") int outIndex); -} http://git-wip-us.apache.org/repos/asf/drill/blob/5ed31962/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java index 8e4d1d3..b237ef7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java @@ -1504,6 +1504,73 @@ public class TestLateralJoinCorrectness extends SubOperatorTest { } /** + * Test to see if there are multiple rows in left batch and for some rows right side produces multiple batches such + * that some are with records and some are empty then we are not duplicating those left side records based on left + * join type. In this case all the output will be produced only in 1 record batch. + * + * @throws Exception + */ + @Test + public void testLeftLateralJoin_WithMatchingAndEmptyBatch() throws Exception { + // Get the left container with dummy data for Lateral Join + final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema) + .addRow(1, 10, "item10") + .addRow(2, 20, "item20") + .build(); + + final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema) + .addRow(6, 60, "item61") + .addRow(7, 70, "item71") + .addRow(8, 80, "item81") + .build(); + + leftContainer.add(leftRowSet2.container()); + + // Get the left IterOutcomes for Lateral Join + leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Create Left MockRecordBatch + final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + leftContainer, leftOutcomes, leftContainer.get(0).getSchema()); + + // Get the right container with dummy data + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(nonEmptyRightRowSet.container());
+ rightContainer.add(nonEmptyRightRowSet2.container()); + rightContainer.add(emptyRightRowSet.container()); + rightContainer.add(emptyRightRowSet.container()); + + rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + rightOutcomes.add(RecordBatch.IterOutcome.OK); + rightOutcomes.add(RecordBatch.IterOutcome.OK); + rightOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext, + rightContainer, rightOutcomes, rightContainer.get(0).getSchema()); + + final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT); + + final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(), + leftMockBatch, rightMockBatch); + + try { + final int expectedOutputRecordCount = 6; // 3 for first left row and 3 for second left row + assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next()); + assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next()); + assertTrue(ljBatch.getRecordCount() == expectedOutputRecordCount); + assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next()); + } catch (AssertionError | Exception error) { + fail(); + } finally { + // Close all the resources for this test case + ljBatch.close(); + leftMockBatch.close(); + rightMockBatch.close(); + } + } + + /** * Test to see if there are multiple rows in left batch and for some rows right side produces batch with records * and for other rows right side produces empty batches then based on left join type we are populating the output * batch correctly. Expectation is that for left rows if we find corresponding right rows then we will output both