[https://issues.apache.org/jira/browse/DRILL-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16560345#comment-16560345]
ASF GitHub Bot commented on DRILL-6616:
---------------------------------------
parthchandra commented on a change in pull request #1401: DRILL-6616: Batch Processing for Lateral/Unnest
URL: https://github.com/apache/drill/pull/1401#discussion_r205897581
##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
##########
@@ -83,55 +109,80 @@ public void setOutputCount(int outputCount) {
outputLimit = outputCount;
}
+ @Override
+ public void setRowIdVector(IntVector v) {
+ this.rowIdVector = v;
+ this.rowIdVectorMutator = rowIdVector.getMutator();
+ }
+
@Override
public final int unnestRecords(final int recordCount) {
Preconditions.checkArgument(svMode == NONE, "Unnest does not support selection vector inputs.");
- if (innerValueIndex == -1) {
- innerValueIndex = 0;
- }
-
- // Current record being processed in the incoming record batch. We could keep
- // track of it ourselves, but it is better to check with the Lateral Join and get the
- // current record being processed thru the Lateral Join Contract.
- final int currentRecord = lateral.getRecordIndex();
- final int innerValueCount = accessor.getInnerValueCountAt(currentRecord);
- final int count = Math.min(Math.min(innerValueCount, outputLimit), recordCount);
- logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record count: {}, output limit: {}", innerValueCount,
- recordCount, outputLimit);
+ final int initialInnerValueIndex = runningInnerValueIndex;
+
+ outer:
+ {
+ int outputIndex = 0; // index in the output vector that we are writing to
+ final int valueCount = accessor.getValueCount();
+
+ for (; valueIndex < valueCount; valueIndex++) {
+ final int innerValueCount = accessor.getInnerValueCountAt(valueIndex);
+ logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record count: {}, output limit: {}",
+ innerValueCount, recordCount, outputLimit);
+
+ for (; innerValueIndex < innerValueCount; innerValueIndex++) {
+ // If we've hit the batch size limit, stop and flush what we've got so far.
+ if (outputIndex == outputLimit) {
+ // Flush this batch.
+ break outer;
+ }
+ try {
+ // rowId starts at 1, so the value for rowId is valueIndex+1
+ rowIdVectorMutator.setSafe(outputIndex, valueIndex + 1);
+
+ } finally {
+ outputIndex++;
+ //currentInnerValueIndexLocal++;
Review comment:
Done
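A simplified, self-contained model of the batched loop in this hunk may help when reading the diff. It is a sketch, not Drill's UnnestImpl: the class name BatchedUnnestSketch, the int[][] input and the List-based outputs are hypothetical stand-ins for Drill's value vectors and accessors. As in the diff, valueIndex and innerValueIndex are fields, so when the output limit is reached the method breaks out of both loops and the next call resumes where it stopped; each output row carries the 1-based row ID of its source row. Resetting innerValueIndex after each finished row is an assumption, since the hunk is cut off before that point. The diff labels a block (outer: { ... }) while this sketch labels the outer for loop; in both cases break outer leaves the two loops without advancing valueIndex.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of batched unnest: each input row holds an
 * array of inner values; output is flattened into batches of at most
 * outputLimit rows, each tagged with the 1-based row ID of its source row.
 */
public class BatchedUnnestSketch {
  private final int[][] input;     // input[row] = repeated values of that row
  private final int outputLimit;   // maximum rows per output batch
  private int valueIndex = 0;      // current input row; persists across calls
  private int innerValueIndex = 0; // position inside the current row's array

  public BatchedUnnestSketch(int[][] input, int outputLimit) {
    this.input = input;
    this.outputLimit = outputLimit;
  }

  /** Appends one batch to values/rowIds and returns the number of rows written. */
  public int unnestRecords(List<Integer> values, List<Integer> rowIds) {
    int outputIndex = 0; // index in the output batch we are writing to
    outer:
    for (; valueIndex < input.length; valueIndex++) {
      int innerValueCount = input[valueIndex].length;
      for (; innerValueIndex < innerValueCount; innerValueIndex++) {
        if (outputIndex == outputLimit) {
          break outer; // batch full: keep both indexes so the next call resumes here
        }
        values.add(input[valueIndex][innerValueIndex]);
        rowIds.add(valueIndex + 1); // row IDs are 1-based, as in the diff
        outputIndex++;
      }
      innerValueIndex = 0; // row finished; the next row starts at its first value
    }
    return outputIndex;
  }

  public static void main(String[] args) {
    int[][] input = {{10, 11, 12}, {}, {20, 21}};
    BatchedUnnestSketch unnest = new BatchedUnnestSketch(input, 2);
    List<Integer> values = new ArrayList<>();
    List<Integer> rowIds = new ArrayList<>();
    while (unnest.unnestRecords(values, rowIds) > 0) {
      System.out.println(values + " rowIds=" + rowIds);
      values.clear();
      rowIds.clear();
    }
  }
}
```

With an output limit of 2, main prints [10, 11] rowIds=[1, 1], then [12, 20] rowIds=[1, 3], then [21] rowIds=[3]: the empty second row produces nothing, and one batch can span several input rows.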
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
> Batch Processing for Lateral/Unnest
> -----------------------------------
>
> Key: DRILL-6616
> URL: https://issues.apache.org/jira/browse/DRILL-6616
> Project: Apache Drill
> Issue Type: Improvement
> Components: Execution - Relational Operators
> Affects Versions: 1.14.0
> Reporter: Sorabh Hamirwasia
> Assignee: Sorabh Hamirwasia
> Priority: Major
> Fix For: 1.15.0
>
>
> Implement the execution-side and planner-side changes for batch processing
> done by Lateral and Unnest. Based on the prototype, we found performance to be
> much better than with the initial row-by-row execution.
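Why batch processing needs a row ID: with row-by-row execution, Unnest handles one incoming row at a time (the removed code asks the Lateral Join for the current record index), so every value it emits belongs to that row. Once a single Unnest call can span many input rows, the consuming side presumably needs each value's row ID to find its parent row. The sketch below is a hypothetical model of that pairing step, not Drill's Lateral Join implementation; the class name, the join method and the List-based inputs are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: pair a batch of outer rows with a batch of unnest output by row ID. */
public class LateralPairingSketch {
  /** Returns "outer:value" strings; rowIds are 1-based indexes into outerRows. */
  public static List<String> join(List<String> outerRows, List<Integer> values, List<Integer> rowIds) {
    List<String> out = new ArrayList<>();
    for (int i = 0; i < values.size(); i++) {
      // Convert the 1-based row ID back to a 0-based index into the outer batch.
      String parent = outerRows.get(rowIds.get(i) - 1);
      out.add(parent + ":" + values.get(i));
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> outer = List.of("a", "b", "c");
    // Unnest output for outer rows 1 and 3; row 2 ("b") had an empty array.
    System.out.println(join(outer, List.of(10, 11, 12, 20, 21), List.of(1, 1, 1, 3, 3)));
  }
}
```

main prints [a:10, a:11, a:12, c:20, c:21]. One pass over the whole unnest batch replaces one Unnest call per outer row, which is the kind of saving the row-by-row comparison above refers to.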
--
This message was sent by Atlassian JIRA (v7.6.3#76005)