sohami closed pull request #1376: DRILL-6592: Unnest record batch size is called too frequently
URL: https://github.com/apache/drill/pull/1376
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 3ef547c5f46..9c1e702e9ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -49,6 +49,9 @@
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
   private Unnest unnest;
+  private boolean hasNewSchema = false; // set to true if a new schema was encountered and an empty batch was
+                                        // sent. The next iteration, we need to make sure the record batch sizer
+                                        // is updated before we process the actual data.
   private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want
                                         // to keep processing it. Kill may be called by a limit in a subquery that
                                         // requires us to stop processing the current row, but not stop processing
@@ -180,6 +183,12 @@ public IterOutcome innerNext() {
       return nextState;
     }

+    if (hasNewSchema) {
+      memoryManager.update();
+      hasNewSchema = false;
+      return doWork();
+    }
+
     if (hasRemainder) {
       return doWork();
     }
@@ -194,7 +203,7 @@ public IterOutcome innerNext() {
       state = BatchState.NOT_FIRST;
       try {
         stats.startSetup();
-        hasRemainder = true; // next call to next will handle the actual data.
+        hasNewSchema = true; // next call to next will handle the actual data.
         logger.debug("First batch received");
         schemaChanged(); // checks if schema has changed (redundant in this case because it has) AND saves the
                          // current field metadata for check in subsequent iterations
@@ -213,10 +222,9 @@ public IterOutcome innerNext() {
       container.zeroVectors();
       // Check if schema has changed
       if (lateral.getRecordIndex() == 0) {
-        boolean isNewSchema = schemaChanged();
-        stats.batchReceived(0, incoming.getRecordCount(), isNewSchema);
-        if (isNewSchema) {
-          hasRemainder = true; // next call to next will handle the actual data.
+        hasNewSchema = schemaChanged();
+        stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema);
+        if (hasNewSchema) {
           try {
             setupNewSchema();
           } catch (SchemaChangeException ex) {
@@ -229,6 +237,7 @@ public IterOutcome innerNext() {
         }
         // else
         unnest.resetGroupIndex();
+        memoryManager.update();
       }
       return doWork();
     }
@@ -265,7 +274,6 @@ private void setUnnestVector() {
   protected IterOutcome doWork() {
     Preconditions.checkNotNull(lateral);
-    memoryManager.update();
     unnest.setOutputCount(memoryManager.getOutputRowCount());
     final int incomingRecordCount = incoming.getRecordCount();
     final int currentRecord = lateral.getRecordIndex();
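
For readers skimming the diff, here is a minimal, self-contained sketch of the control-flow change: the record batch sizer update moves out of doWork() and now runs once per incoming batch, or on the iteration right after a schema change, instead of on every doWork() call. This is not Drill code; the class UnnestSizerSketch, the SizerStub class, and the newIncomingBatch parameter are invented for illustration.

// Hypothetical, simplified sketch of the pattern in this PR; not Drill's actual classes.
// The sizer stub only counts update() calls so the effect of the change is visible.
public class UnnestSizerSketch {

  /** Stand-in for Drill's record batch memory manager / sizer. */
  static class SizerStub {
    int updateCalls = 0;

    void update() {           // in Drill this walks the incoming batch to compute sizes
      updateCalls++;
    }

    int getOutputRowCount() { // pretend the sizer allows 4 rows per output batch
      return 4;
    }
  }

  private final SizerStub memoryManager = new SizerStub();
  private boolean hasNewSchema = false;
  private boolean hasRemainder = false;

  /** Mirrors the post-patch innerNext() logic at a very high level. */
  int innerNext(boolean newIncomingBatch) {
    if (hasNewSchema) {
      memoryManager.update(); // size once, on the iteration after the schema-only batch
      hasNewSchema = false;
      return doWork();
    }
    if (hasRemainder) {
      return doWork();        // same incoming batch: no need to size it again
    }
    if (newIncomingBatch) {
      memoryManager.update(); // size each new incoming batch exactly once
    }
    return doWork();
  }

  /** After the patch, doWork() no longer calls memoryManager.update(). */
  private int doWork() {
    int outputRows = memoryManager.getOutputRowCount();
    hasRemainder = true;      // pretend rows are still left for this incoming batch
    return outputRows;
  }

  public static void main(String[] args) {
    UnnestSizerSketch sketch = new UnnestSizerSketch();
    sketch.innerNext(true);                // first call: new incoming batch
    for (int i = 0; i < 9; i++) {
      sketch.innerNext(false);             // nine more calls against the same batch
    }
    // Prints 1: one sizer update instead of ten with the pre-patch per-doWork() update.
    System.out.println("sizer updates: " + sketch.memoryManager.updateCalls);
  }
}

The hasNewSchema flag exists because, per the comment added in the diff, the batch that reports a new schema is returned empty; the sizer update is therefore deferred to the following iteration, just before the actual data is processed.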
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services