parthchandra commented on a change in pull request #1401: DRILL-6616: Batch Processing for Lateral/Unnest
URL: https://github.com/apache/drill/pull/1401#discussion_r205897581
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
##########
@@ -83,55 +109,80 @@ public void setOutputCount(int outputCount) {
outputLimit = outputCount;
}
+ @Override
+ public void setRowIdVector(IntVector v) {
+ this.rowIdVector = v;
+ this.rowIdVectorMutator = rowIdVector.getMutator();
+ }
+
@Override
public final int unnestRecords(final int recordCount) {
Preconditions.checkArgument(svMode == NONE, "Unnest does not support
selection vector inputs.");
- if (innerValueIndex == -1) {
- innerValueIndex = 0;
- }
-
- // Current record being processed in the incoming record batch. We could
keep
- // track of it ourselves, but it is better to check with the Lateral Join
and get the
- // current record being processed thru the Lateral Join Contract.
- final int currentRecord = lateral.getRecordIndex();
- final int innerValueCount = accessor.getInnerValueCountAt(currentRecord);
- final int count = Math.min(Math.min(innerValueCount, outputLimit),
recordCount);
- logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record
count: {}, output limit: {}", innerValueCount,
- recordCount, outputLimit);
+ final int initialInnerValueIndex = runningInnerValueIndex;
+
+ outer:
+ {
+ int outputIndex = 0; // index in the output vector that we are writing to
+ final int valueCount = accessor.getValueCount();
+
+ for (; valueIndex < valueCount; valueIndex++) {
+ final int innerValueCount = accessor.getInnerValueCountAt(valueIndex);
+ logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record
count: {}, output limit: {}",
+ innerValueCount, recordCount, outputLimit);
+
+ for (; innerValueIndex < innerValueCount; innerValueIndex++) {
+ // If we've hit the batch size limit, stop and flush what we've got
so far.
+ if (outputIndex == outputLimit) {
+ // Flush this batch.
+ break outer;
+ }
+ try {
+ // rowId starts at 1, so the value for rowId is valueIndex+1
+ rowIdVectorMutator.setSafe(outputIndex, valueIndex + 1);
+
+ } finally {
+ outputIndex++;
+ //currentInnerValueIndexLocal++;
Review comment:
Done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services