This is an automated email from the ASF dual-hosted git repository.
sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 779edf8 DRILL-6503: Performance improvements in lateral
779edf8 is described below
commit 779edf880a1e92608b68108f18e79eff6eb4afa5
Author: Sorabh Hamirwasia <[email protected]>
AuthorDate: Tue Jun 26 23:19:25 2018 -0700
DRILL-6503: Performance improvements in lateral
closes #1328
---
.../drill/exec/physical/config/LateralJoinPOP.java | 4 +
.../exec/physical/impl/join/LateralJoinBatch.java | 162 ++++++++++++++-------
.../impl/join/TestLateralJoinCorrectness.java | 16 +-
.../unnest/TestUnnestWithLateralCorrectness.java | 6 +-
4 files changed, 121 insertions(+), 67 deletions(-)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
index fab89a2..a12fed1 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
@@ -43,6 +43,10 @@ public class LateralJoinPOP extends AbstractJoinPop {
@JsonProperty("right") PhysicalOperator right,
@JsonProperty("joinType") JoinRelType joinType) {
super(left, right, joinType, null, null);
+ Preconditions.checkArgument(joinType != JoinRelType.FULL,
+ "Full outer join is currently not supported with Lateral Join");
+ Preconditions.checkArgument(joinType != JoinRelType.RIGHT,
+ "Right join is currently not supported with Lateral Join");
}
@Override
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index a09913f..578cbc8 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -52,11 +52,6 @@ import static
org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract {
private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class);
- // Input indexes to correctly update the stats
- private static final int LEFT_INPUT = 0;
-
- private static final int RIGHT_INPUT = 1;
-
// Maximum number records in the outgoing batch
private int maxOutputRowCount;
@@ -81,8 +76,12 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
// Keep track if any matching right record was found for current left index
record
private boolean matchedRecordFound;
+ // Used only for testing
private boolean useMemoryManager = true;
+ // Flag to keep track of new left batch so that update on memory manager is
called only once per left batch
+ private boolean isNewLeftBatch = false;
+
/*
****************************************************************************************************************
* Public Methods
*
****************************************************************************************************************/
@@ -147,9 +146,16 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
// Setup the references of left, right and outgoing container in generated
operator
state = BatchState.NOT_FIRST;
- // Update the memory manager
- updateMemoryManager(LEFT_INPUT);
- updateMemoryManager(RIGHT_INPUT);
+ // Update the memory manager only if its a brand new incoming i.e.
leftJoinIndex and rightJoinIndex is 0
+ // Otherwise there will be a case where while filling last output batch,
some records from previous left or
+ // right batch are still left to be sent in output for which we will count
this batch twice. The actual checks
+ // are done in updateMemoryManager
+ updateMemoryManager(LEFT_INDEX);
+
+ // We have to call update on memory manager for empty batches
(rightJoinIndex = -1) as well since other wise while
+ // allocating memory for vectors below it can fail. Since in that case
colSize will not have any info on right side
+ // vectors and throws NPE. The actual checks are done in
updateMemoryManager
+ updateMemoryManager(RIGHT_INDEX);
// allocate space for the outgoing batch
allocateVectors();
@@ -161,21 +167,25 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
public void close() {
updateBatchMemoryManagerStats();
- logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg
bytes : {}, avg row bytes : {}, record count : {}",
-
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
-
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
-
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
- logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg
bytes : {}, avg row bytes : {}, record count : {}",
-
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
-
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
-
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
- logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes
: {}, avg row bytes : {}, record count : {}",
- batchMemoryManager.getNumOutgoingBatches(),
batchMemoryManager.getAvgOutputBatchSize(),
- batchMemoryManager.getAvgOutputRowWidth(),
batchMemoryManager.getTotalOutputRecords());
+ if (logger.isDebugEnabled()) {
+ logger.debug("BATCH_STATS, incoming aggregate left: batch count : {},
avg bytes : {}, avg row bytes : {}, " +
+ "record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+ logger.debug("BATCH_STATS, incoming aggregate right: batch count : {},
avg bytes : {}, avg row bytes : {}, " +
+ "record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+ logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg
bytes : {}, avg row bytes : {}, " +
+ "record count : {}", batchMemoryManager.getNumOutgoingBatches(),
+ batchMemoryManager.getAvgOutputBatchSize(),
+ batchMemoryManager.getAvgOutputRowWidth(),
+ batchMemoryManager.getTotalOutputRecords());
+ }
super.close();
}
@@ -238,6 +248,7 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
boolean validBatch = setBatchState(leftUpstream);
if (validBatch) {
+ isNewLeftBatch = true;
rightUpstream = next(1, right);
validBatch = setBatchState(rightUpstream);
}
@@ -266,10 +277,6 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
}
Preconditions.checkState(right.getRecordCount() == 0, "Unexpected
non-empty first right batch received");
- // Update the record memory manager
- updateMemoryManager(LEFT_INPUT);
- updateMemoryManager(RIGHT_INPUT);
-
// Setup output container schema based on known left and right schema
setupNewSchema();
@@ -337,7 +344,12 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
// If left batch is empty
while (needLeftBatch) {
- leftUpstream = !processLeftBatchInFuture ? next(LEFT_INPUT, left) :
leftUpstream;
+
+ if (!processLeftBatchInFuture) {
+ leftUpstream = next(LEFT_INDEX, left);
+ isNewLeftBatch = true;
+ }
+
final boolean emptyLeftBatch = left.getRecordCount() <=0;
logger.trace("Received a left batch and isEmpty: {}", emptyLeftBatch);
@@ -418,7 +430,7 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
// will be a valid index. When all records are consumed it will be set to
-1.
boolean needNewRightBatch = (leftJoinIndex >= 0) && (rightJoinIndex == -1);
while (needNewRightBatch) {
- rightUpstream = next(RIGHT_INPUT, right);
+ rightUpstream = next(RIGHT_INDEX, right);
switch (rightUpstream) {
case OK_NEW_SCHEMA:
// We should not get OK_NEW_SCHEMA multiple times for the same left
incoming batch. So there won't be a
@@ -503,7 +515,8 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
if (rightUpstream == EMIT) {
if (!matchedRecordFound && JoinRelType.LEFT ==
popConfig.getJoinType()) {
// copy left side in case of LEFT join
- emitLeft(leftJoinIndex, outputIndex++);
+ emitLeft(leftJoinIndex, outputIndex, 1);
+ ++outputIndex;
}
++leftJoinIndex;
// Reset matchedRecord for next left index record
@@ -557,7 +570,7 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
}
// Update the batch memory manager to use new left incoming batch
- updateMemoryManager(LEFT_INPUT);
+ updateMemoryManager(LEFT_INDEX);
}
}
@@ -577,7 +590,7 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
}
// Update the batch memory manager to use new right incoming batch
- updateMemoryManager(RIGHT_INPUT);
+ updateMemoryManager(RIGHT_INDEX);
}
} // output batch is full to its max capacity
@@ -615,9 +628,11 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
batchMemoryManager.updateOutgoingStats(outputIndex);
- logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
- logger.debug("Number of records emitted: {} and Allocator Stats:
[AllocatedMem: {}, PeakMem: {}]", outputIndex,
- container.getAllocator().getAllocatedMemory(),
container.getAllocator().getPeakMemoryAllocation());
+ if (logger.isDebugEnabled()) {
+ logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+ logger.debug("Number of records emitted: {} and Allocator Stats:
[AllocatedMem: {}, PeakMem: {}]",
+ outputIndex, container.getAllocator().getAllocatedMemory(),
container.getAllocator().getPeakMemoryAllocation());
+ }
// Update the output index for next output batch to zero
outputIndex = 0;
@@ -745,8 +760,6 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
* this left index. The right container is copied starting from rightIndex
until number of records in the container.
*/
private void crossJoinAndOutputRecords() {
- logger.trace("Producing output for leftIndex: {}, rightIndex: {},
rightRecordCount: {} and outputIndex: {}",
- leftJoinIndex, rightJoinIndex, right.getRecordCount(), outputIndex);
final int rightRecordCount = right.getRecordCount();
// If there is no record in right batch just return current index in
output batch
@@ -756,16 +769,30 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
// Check if right batch is empty since we have to handle left join case
Preconditions.checkState(rightJoinIndex != -1, "Right batch record count
is >0 but index is -1");
- // For every record in right side just emit left and right records in
output container
- for (int i = rightJoinIndex; i < rightRecordCount; ++i) {
- emitLeft(leftJoinIndex, outputIndex);
- emitRight(i, outputIndex);
- ++outputIndex;
-
- if (isOutgoingBatchFull()) {
- break;
- }
+
+ int currentOutIndex = outputIndex;
+ // Number of rows that can be copied in output batch
+ final int maxAvailableRowSlot = maxOutputRowCount - currentOutIndex;
+ // Number of rows that can be copied inside output batch is minimum of
available slot in
+ // output batch and available data to copy from right side. It can be half
consumed right batch
+ // which has few more rows to be copied to output but output batch has
more to fill.
+ final int rowsToCopy = Math.min(maxAvailableRowSlot, (rightRecordCount -
rightJoinIndex));
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Producing output for leftIndex: {}, rightIndex: {},
rightRecordCount: {}, outputIndex: {} and " +
+ "availableSlotInOutput: {}", leftJoinIndex, rightJoinIndex,
rightRecordCount, outputIndex, maxAvailableRowSlot);
+ logger.debug("Output Batch stats before copying new data: {}", new
RecordBatchSizer(this));
}
+
+ // First copy all the left vectors data. Doing it in this way since it's
the same data being copied over may be
+ // we will have performance gain from JVM
+ emitLeft(leftJoinIndex, currentOutIndex, rowsToCopy);
+
+ // Copy all the right side vectors data
+ emitRight(rightJoinIndex, currentOutIndex, rowsToCopy);
+
+ // Update outputIndex
+ outputIndex += rowsToCopy;
}
/**
@@ -779,9 +806,14 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
* @param startVectorIndex - start index of vector inside source record batch
* @param endVectorIndex - end index of vector inside source record batch
* @param baseVectorIndex - base index to be added to startVectorIndex to
get corresponding vector in outgoing batch
+ * @param numRowsToCopy - Number of rows to copy into output batch
+ * @param moveFromIndex - boolean to indicate if the fromIndex should also
be increased or not. Since in case of
+ * copying data from left vector fromIndex is constant
whereas in case of copying data from right
+ * vector fromIndex will move along with output index.
*/
private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex,
RecordBatch batch,
- int startVectorIndex, int
endVectorIndex, int baseVectorIndex) {
+ int startVectorIndex, int
endVectorIndex, int baseVectorIndex,
+ int numRowsToCopy, boolean
moveFromIndex) {
// Get the vectors using field index rather than Materialized field since
input batch field can be different from
// output container field in case of Left Join. As we rebuild the right
Schema field to be optional for output
// container.
@@ -796,10 +828,14 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
final ValueVector outputVector =
this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector();
logger.trace("Copying data from incoming batch vector to outgoing batch
vector. [IncomingBatch: " +
- "(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {},
VectorType: {}) and BaseIndex: {}]",
- fromRowIndex, inputValueClass, toRowIndex, outputValueClass,
baseVectorIndex);
- // Copy data from input vector to output vector
- outputVector.copyEntry(toRowIndex, inputVector, fromRowIndex);
+ "(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {},
VectorType: {}) and Other: (TimeEachValue: {}," +
+ " NumBaseIndex: {}) ]",
+ fromRowIndex, inputValueClass, toRowIndex, outputValueClass,
numRowsToCopy, baseVectorIndex);
+
+ // Copy data from input vector to output vector for numRowsToCopy times.
+ for (int j = 0; j < numRowsToCopy; ++j) {
+ outputVector.copyEntry(toRowIndex + j, inputVector, (moveFromIndex) ?
fromRowIndex + j : fromRowIndex);
+ }
}
}
@@ -809,8 +845,9 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
* @param leftIndex - index to copy data from left incoming batch vectors
* @param outIndex - index to copy data to in outgoing batch vectors
*/
- private void emitLeft(int leftIndex, int outIndex) {
- copyDataToOutputVectors(leftIndex, outIndex, left, 0,
leftSchema.getFieldCount(), 0);
+ private void emitLeft(int leftIndex, int outIndex, int numRowsToCopy) {
+ copyDataToOutputVectors(leftIndex, outIndex, left, 0,
+ leftSchema.getFieldCount(), 0, numRowsToCopy, false);
}
/**
@@ -819,8 +856,9 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
* @param rightIndex - index to copy data from right incoming batch vectors
* @param outIndex - index to copy data to in outgoing batch vectors
*/
- private void emitRight(int rightIndex, int outIndex) {
- copyDataToOutputVectors(rightIndex, outIndex, right, 0,
rightSchema.getFieldCount(), leftSchema.getFieldCount());
+ private void emitRight(int rightIndex, int outIndex, int numRowsToCopy) {
+ copyDataToOutputVectors(rightIndex, outIndex, right, 0,
+ rightSchema.getFieldCount(), leftSchema.getFieldCount(), numRowsToCopy,
true);
}
/**
@@ -847,12 +885,24 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
}
private void updateMemoryManager(int inputIndex) {
+
+ if (inputIndex == LEFT_INDEX && isNewLeftBatch) {
+ // reset state and continue to update
+ isNewLeftBatch = false;
+ } else if (inputIndex == RIGHT_INDEX && (rightJoinIndex == 0 ||
rightJoinIndex == -1)) {
+ // continue to update
+ } else {
+ return;
+ }
+
// For cases where all the previous input were consumed and send with
previous output batch. But now we are building
// a new output batch with new incoming then it will not cause any problem
since outputIndex will be 0
final int newOutputRowCount = batchMemoryManager.update(inputIndex,
outputIndex);
if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ? "left"
: "right", batchMemoryManager.getRecordBatchSizer(inputIndex));
+ logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == LEFT_INDEX
? "left" : "right",
+ batchMemoryManager.getRecordBatchSizer(inputIndex));
+ logger.debug("Previous OutputRowCount: {}, New OutputRowCount: {}",
maxOutputRowCount, newOutputRowCount);
}
if (useMemoryManager) {
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index e9e9aac..79a7bd4 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -107,7 +107,7 @@ public class TestLateralJoinCorrectness extends
SubOperatorTest {
.buildSchema();
emptyRightRowSet = fixture.rowSetBuilder(rightSchema).build();
- ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.FULL);
+ ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER);
}
@AfterClass
@@ -1754,7 +1754,7 @@ public class TestLateralJoinCorrectness extends
SubOperatorTest {
final CloseableRecordBatch rightMockBatch_1 = new
MockRecordBatch(fixture.getFragmentContext(), operatorContext,
rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
- final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null,
JoinRelType.FULL);
+ final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null,
JoinRelType.INNER);
final LateralJoinBatch ljBatch_1 = new LateralJoinBatch(popConfig_1,
fixture.getFragmentContext(),
leftMockBatch_1, rightMockBatch_1);
@@ -1863,7 +1863,7 @@ public class TestLateralJoinCorrectness extends
SubOperatorTest {
final CloseableRecordBatch rightMockBatch_1 = new
MockRecordBatch(fixture.getFragmentContext(), operatorContext,
rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
- final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null,
JoinRelType.FULL);
+ final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null,
JoinRelType.INNER);
final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1,
fixture.getFragmentContext(),
leftMockBatch_1, rightMockBatch_1);
@@ -1964,7 +1964,7 @@ public class TestLateralJoinCorrectness extends
SubOperatorTest {
final CloseableRecordBatch rightMockBatch_1 = new
MockRecordBatch(fixture.getFragmentContext(), operatorContext,
rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
- final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null,
JoinRelType.FULL);
+ final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null,
JoinRelType.INNER);
final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1,
fixture.getFragmentContext(),
leftMockBatch_1, rightMockBatch_1);
@@ -2091,7 +2091,7 @@ public class TestLateralJoinCorrectness extends
SubOperatorTest {
final CloseableRecordBatch rightMockBatch_1 = new
MockRecordBatch(fixture.getFragmentContext(), operatorContext,
rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
- final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null,
JoinRelType.FULL);
+ final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null,
JoinRelType.INNER);
final LateralJoinBatch lowerLevelLateral = new
LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
leftMockBatch_1, rightMockBatch_1);
@@ -2225,7 +2225,7 @@ public class TestLateralJoinCorrectness extends
SubOperatorTest {
final CloseableRecordBatch rightMockBatch_1 = new
MockRecordBatch(fixture.getFragmentContext(), operatorContext,
rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
- final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null,
JoinRelType.FULL);
+ final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null,
JoinRelType.INNER);
final LateralJoinBatch lowerLevelLateral = new
LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
leftMockBatch_1, rightMockBatch_1);
@@ -2369,7 +2369,7 @@ public class TestLateralJoinCorrectness extends
SubOperatorTest {
final CloseableRecordBatch rightMockBatch_1 = new
MockRecordBatch(fixture.getFragmentContext(), operatorContext,
rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
- final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null,
JoinRelType.FULL);
+ final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null,
JoinRelType.INNER);
final LateralJoinBatch lowerLevelLateral = new
LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
leftMockBatch_1, rightMockBatch_1);
@@ -2723,7 +2723,7 @@ public class TestLateralJoinCorrectness extends
SubOperatorTest {
final CloseableRecordBatch rightMockBatch_1 = new
MockRecordBatch(fixture.getFragmentContext(), operatorContext,
rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
- final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null,
JoinRelType.FULL);
+ final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null,
JoinRelType.INNER);
final LateralJoinBatch lowerLevelLateral = new
LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
leftMockBatch_1, rightMockBatch_1);
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index f281964..03fd1c1 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -69,7 +69,7 @@ public class TestUnnestWithLateralCorrectness extends
SubOperatorTest {
@BeforeClass public static void setUpBeforeClass() throws Exception {
mockPopConfig = new MockStorePOP(null);
- ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.FULL);
+ ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER);
operatorContext = fixture.newOperatorContext(mockPopConfig);
}
@@ -875,8 +875,8 @@ public class TestUnnestWithLateralCorrectness extends
SubOperatorTest {
final ProjectRecordBatch projectBatch2 =
new ProjectRecordBatch(projectPopConfig2, unnestBatch2,
fixture.getFragmentContext());
- final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1,
projectPopConfig2, JoinRelType.FULL);
- final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig,
ljPopConfig2, JoinRelType.FULL);
+ final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1,
projectPopConfig2, JoinRelType.INNER);
+ final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig,
ljPopConfig2, JoinRelType.INNER);
final LateralJoinBatch lateralJoinBatch2 =
new LateralJoinBatch(ljPopConfig2, fixture.getFragmentContext(),
projectBatch1, projectBatch2);