[
https://issues.apache.org/jira/browse/DRILL-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498771#comment-16498771
]
ASF GitHub Bot commented on DRILL-6446:
---------------------------------------
sohami commented on a change in pull request #1293: DRILL-6446: Support for EMIT outcome in TopN
URL: https://github.com/apache/drill/pull/1293#discussion_r192542392
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
##########
@@ -162,56 +173,67 @@ public void buildSchema() throws SchemaChangeException {
return;
case NONE:
state = BatchState.DONE;
+ case EMIT:
+ throw new IllegalStateException("Unexpected EMIT outcome received in buildSchema phase");
default:
- return;
+ throw new IllegalStateException("Unexpected outcome received in buildSchema phase");
}
}
@Override
public IterOutcome innerNext() {
recordCount = 0;
if (state == BatchState.DONE) {
- return IterOutcome.NONE;
+ return NONE;
}
- if (schema != null) {
- if (getSelectionVector4().next()) {
+
+ // If both schema and priorityQueue are non-null and priority queue is not reset, that means we still have data
+ // to be sent downstream for the current record boundary
+ if (schema != null && priorityQueue != null && priorityQueue.isInitialized()) {
+ if (sv4.next()) {
recordCount = sv4.getCount();
- return IterOutcome.OK;
+ container.setRecordCount(recordCount);
} else {
recordCount = 0;
- return IterOutcome.NONE;
+ container.setRecordCount(0);
}
+ return getFinalOutcome();
}
try{
outer: while (true) {
Stopwatch watch = Stopwatch.createStarted();
- IterOutcome upstream;
if (first) {
- upstream = IterOutcome.OK_NEW_SCHEMA;
+ laskKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
first = false;
} else {
- upstream = next(incoming);
+ laskKnownOutcome = next(incoming);
}
- if (upstream == IterOutcome.OK && schema == null) {
- upstream = IterOutcome.OK_NEW_SCHEMA;
+ if (laskKnownOutcome == OK && schema == null) {
+ laskKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
container.clear();
}
logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS));
- switch (upstream) {
+ switch (laskKnownOutcome) {
case NONE:
break outer;
case NOT_YET:
throw new UnsupportedOperationException();
case OUT_OF_MEMORY:
case STOP:
- return upstream;
+ return laskKnownOutcome;
case OK_NEW_SCHEMA:
// only change in the case that the schema truly changes. Artificial schema changes are ignored.
+ // schema change handling in case when EMIT is also seen is same as without EMIT. i.e. only if union type
+ // is enabled it will be handled.
+ container.clear();
+ firstBatchForSchema = true;
if (!incoming.getSchema().equals(schema)) {
Review comment:
I am keeping it as is for now. Don't want to change any behavior related to
schema change handling.
----------------------------------------------------------------
> Support for EMIT outcome in TopN
> --------------------------------
>
> Key: DRILL-6446
> URL: https://issues.apache.org/jira/browse/DRILL-6446
> Project: Apache Drill
> Issue Type: Task
> Components: Execution - Relational Operators
> Reporter: Sorabh Hamirwasia
> Assignee: Sorabh Hamirwasia
> Priority: Major
> Fix For: 1.14.0
>
>
> With Lateral and Unnest, if TopN is present in the sub-query, it needs to
> handle the EMIT outcome correctly. This means that when an EMIT is received,
> TopN must perform the TopN operation on the records buffered so far and
> produce output from them. After the EMIT, TopN should reset its state and
> work on the next batches of incoming records until an EMIT is seen again.
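
The behavior described in the issue can be illustrated with a minimal, self-contained Java sketch. This is not Drill's TopNBatch: the class TopNEmitSketch, its local Outcome enum, and the consume method are hypothetical names invented for the illustration, and plain integers stand in for record batches. It only shows the idea of buffering rows in a bounded heap, producing the top N when an EMIT (or NONE) arrives, and then resetting state for the next record boundary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical, simplified stand-in for a TopN operator; not Drill's TopNBatch. */
public class TopNEmitSketch {
  /** Local enum for the sketch; the names mirror Drill's IterOutcome values. */
  public enum Outcome { OK, EMIT, NONE }

  private final int limit;
  // Min-heap holding the largest `limit` values seen since the last EMIT.
  private final PriorityQueue<Integer> heap = new PriorityQueue<>();

  public TopNEmitSketch(int limit) {
    this.limit = limit;
  }

  /**
   * Buffers one incoming batch. On OK nothing is produced yet. On EMIT or NONE
   * the top-N values (descending) are returned and the buffered state is reset.
   */
  public List<Integer> consume(List<Integer> batch, Outcome outcome) {
    for (int value : batch) {
      heap.offer(value);
      if (heap.size() > limit) {
        heap.poll(); // drop the smallest so at most `limit` values stay buffered
      }
    }
    if (outcome == Outcome.OK) {
      return Collections.emptyList();
    }
    List<Integer> result = new ArrayList<>(heap);
    result.sort(Collections.reverseOrder());
    heap.clear(); // reset so the next record boundary starts from empty state
    return result;
  }

  public static void main(String[] args) {
    TopNEmitSketch topN = new TopNEmitSketch(2);
    System.out.println(topN.consume(Arrays.asList(5, 1, 9), Outcome.OK));   // []
    System.out.println(topN.consume(Arrays.asList(7), Outcome.EMIT));       // [9, 7]
    System.out.println(topN.consume(Arrays.asList(3, 4, 2), Outcome.EMIT)); // [4, 3]
  }
}
```

The second EMIT result contains only values from the third batch, which is the "refresh state after EMIT" requirement in the description.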