This is an automated email from the ASF dual-hosted git repository. sorabh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit cad9aad12ff18a9315b8cce971e27c1b32c48079 Author: Parth Chandra <[email protected]> AuthorDate: Fri Jul 6 16:23:51 2018 -0700 DRILL-6592: Unnest record batch size is called too frequently closes #1376 --- .../exec/physical/impl/unnest/UnnestRecordBatch.java | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java index 3ef547c..9c1e702 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java @@ -49,6 +49,9 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class); private Unnest unnest; + private boolean hasNewSchema = false; // set to true if a new schema was encountered and an empty batch was // sent. The next iteration, we need to make sure the record batch sizer // is updated before we process the actual data. private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want // to keep processing it. Kill may be called by a limit in a subquery that // requires us to stop processing the current row, but not stop processing @@ -180,6 +183,12 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO return nextState; } + if (hasNewSchema) { + memoryManager.update(); + hasNewSchema = false; + return doWork(); + } + if (hasRemainder) { return doWork(); } @@ -194,7 +203,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO state = BatchState.NOT_FIRST; try { stats.startSetup(); - hasRemainder = true; // next call to next will handle the actual data. 
+ hasNewSchema = true; // next call to next will handle the actual data. logger.debug("First batch received"); schemaChanged(); // checks if schema has changed (redundant in this case because it has) AND saves the // current field metadata for check in subsequent iterations @@ -213,10 +222,9 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO container.zeroVectors(); // Check if schema has changed if (lateral.getRecordIndex() == 0) { - boolean isNewSchema = schemaChanged(); - stats.batchReceived(0, incoming.getRecordCount(), isNewSchema); - if (isNewSchema) { - hasRemainder = true; // next call to next will handle the actual data. + hasNewSchema = schemaChanged(); + stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema); + if (hasNewSchema) { try { setupNewSchema(); } catch (SchemaChangeException ex) { @@ -229,6 +237,7 @@ } // else unnest.resetGroupIndex(); + memoryManager.update(); } return doWork(); } @@ -265,7 +274,6 @@ protected IterOutcome doWork() { Preconditions.checkNotNull(lateral); - memoryManager.update(); unnest.setOutputCount(memoryManager.getOutputRowCount()); final int incomingRecordCount = incoming.getRecordCount(); final int currentRecord = lateral.getRecordIndex();
