parthchandra closed pull request #1362: DRILL-6576: Unnest reports incoming
record counts incorrectly
URL: https://github.com/apache/drill/pull/1362
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index 06713a5164..ffc64f9237 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -103,7 +103,7 @@ public final int unnestRecords(final int recordCount) {
innerValueIndex += count;
return count;
- }
+ }
@Override
public final void setup(FragmentContext context, RecordBatch incoming,
RecordBatch outgoing,
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index e985c4defe..bc01a70477 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -199,6 +199,7 @@ public IterOutcome innerNext() {
schemaChanged(); // checks if schema has changed (redundant in this
case becaause it has) AND saves the
// current field metadata for check in subsequent
iterations
setupNewSchema();
+ stats.batchReceived(0, incoming.getRecordCount(), true);
} catch (SchemaChangeException ex) {
kill(false);
logger.error("Failure during query", ex);
@@ -207,32 +208,30 @@ public IterOutcome innerNext() {
} finally {
stats.stopSetup();
}
- // since we never called next on an upstream operator, incoming stats are
- // not updated. update input stats explicitly.
- stats.batchReceived(0, incoming.getRecordCount(), true);
return IterOutcome.OK_NEW_SCHEMA;
} else {
assert state != BatchState.FIRST : "First batch should be OK_NEW_SCHEMA";
container.zeroVectors();
-
// Check if schema has changed
- if (lateral.getRecordIndex() == 0 && schemaChanged()) {
- hasRemainder = true; // next call to next will handle the actual
data.
- try {
- setupNewSchema();
- } catch (SchemaChangeException ex) {
- kill(false);
- logger.error("Failure during query", ex);
- context.getExecutorState().fail(ex);
- return IterOutcome.STOP;
- }
- stats.batchReceived(0, incoming.getRecordCount(), true);
- return OK_NEW_SCHEMA;
- }
if (lateral.getRecordIndex() == 0) {
- unnest.resetGroupIndex();
+ boolean isNewSchema = schemaChanged();
+ if (isNewSchema) {
+ hasRemainder = true; // next call to next will handle the actual
data.
+ stats.batchReceived(0, incoming.getRecordCount(), isNewSchema);
+ try {
+ setupNewSchema();
+ } catch (SchemaChangeException ex) {
+ kill(false);
+ logger.error("Failure during query", ex);
+ context.getExecutorState().fail(ex);
+ return IterOutcome.STOP;
+ }
+ return OK_NEW_SCHEMA;
+ } else {
+ unnest.resetGroupIndex();
+ stats.batchReceived(0, incoming.getRecordCount(), isNewSchema);
+ }
}
- stats.batchReceived(0, incoming.getRecordCount(), false);
return doWork();
}
@@ -243,7 +242,8 @@ public VectorContainer getOutgoingContainer() {
return this.container;
}
- @SuppressWarnings("resource") private void setUnnestVector() {
+ @SuppressWarnings("resource")
+ private void setUnnestVector() {
final TypedFieldId typedFieldId =
incoming.getValueVectorId(popConfig.getColumn());
final MaterializedField field =
incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
final RepeatedValueVector vector;
@@ -347,7 +347,8 @@ protected IterOutcome doWork() {
return tp;
}
- @Override protected boolean setupNewSchema() throws SchemaChangeException {
+ @Override
+ protected boolean setupNewSchema() throws SchemaChangeException {
Preconditions.checkNotNull(lateral);
container.clear();
recordCount = 0;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services