sohami closed pull request #1434: DRILL-6694: NPE in UnnestRecordBatch when
query uses a column name no…
URL: https://github.com/apache/drill/pull/1434
This PR was merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index e1b8acb42de..a00fae67bd5 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -62,6 +62,8 @@
private int remainderIndex = 0;
private int recordCount;
private MaterializedField unnestFieldMetadata;
+ // Reference of TypedFieldId for Unnest column. It's always set in
schemaChanged method and later used by others
+ private TypedFieldId unnestTypedFieldId;
private final UnnestMemoryManager memoryManager;
public enum Metric implements MetricDef {
@@ -95,12 +97,8 @@ public void update() {
// Get sizing information for the batch.
setRecordBatchSizer(new RecordBatchSizer(incoming));
- final TypedFieldId typedFieldId =
incoming.getValueVectorId(popConfig.getColumn());
- final MaterializedField field =
incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
-
// Get column size of unnest column.
-
- RecordBatchSizer.ColumnSize columnSize =
getRecordBatchSizer().getColumn(field.getName());
+ RecordBatchSizer.ColumnSize columnSize =
getRecordBatchSizer().getColumn(unnestFieldMetadata.getName());
final int rowIdColumnSize =
TypeHelper.getSize(rowIdVector.getField().getType());
@@ -213,22 +211,15 @@ public IterOutcome innerNext() {
container.zeroVectors();
// Check if schema has changed
if (lateral.getRecordIndex() == 0) {
- boolean hasNewSchema = schemaChanged();
- stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema);
- if (hasNewSchema) {
- try {
+ try {
+ boolean hasNewSchema = schemaChanged();
+ stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema);
+ if (hasNewSchema) {
setupNewSchema();
hasRemainder = true;
memoryManager.update();
- } catch (SchemaChangeException ex) {
- kill(false);
- logger.error("Failure during query", ex);
- context.getExecutorState().fail(ex);
- return IterOutcome.STOP;
- }
- return OK_NEW_SCHEMA;
- } else { // Unnest field schema didn't changed but new left
empty/nonempty batch might come with OK_NEW_SCHEMA
- try {
+ return OK_NEW_SCHEMA;
+ } else { // Unnest field schema didn't changed but new left
empty/nonempty batch might come with OK_NEW_SCHEMA
// This means even though there is no schema change for unnest
field the reference of unnest field
// ValueVector must have changed hence we should just refresh the
transfer pairs and keep output vector
// same as before. In case when new left batch is received with
SchemaChange but was empty Lateral will
@@ -237,19 +228,18 @@ public IterOutcome innerNext() {
// pair. It should do for each new left incoming batch.
resetUnnestTransferPair();
container.zeroVectors();
- } catch (SchemaChangeException ex) {
- kill(false);
- logger.error("Failure during query", ex);
- context.getExecutorState().fail(ex);
- return IterOutcome.STOP;
- }
- } // else
- unnest.resetGroupIndex();
- memoryManager.update();
+ } // else
+ unnest.resetGroupIndex();
+ memoryManager.update();
+ } catch (SchemaChangeException ex) {
+ kill(false);
+ logger.error("Failure during query", ex);
+ context.getExecutorState().fail(ex);
+ return IterOutcome.STOP;
+ }
}
return doWork();
}
-
}
@Override
@@ -259,11 +249,10 @@ public VectorContainer getOutgoingContainer() {
@SuppressWarnings("resource")
private void setUnnestVector() {
- final TypedFieldId typedFieldId =
incoming.getValueVectorId(popConfig.getColumn());
- final MaterializedField field =
incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
+ final MaterializedField field =
incoming.getSchema().getColumn(unnestTypedFieldId.getFieldIds()[0]);
final RepeatedValueVector vector;
final ValueVector inVV =
- incoming.getValueAccessorById(field.getValueClass(),
typedFieldId.getFieldIds()).getValueVector();
+ incoming.getValueAccessorById(field.getValueClass(),
unnestTypedFieldId.getFieldIds()).getValueVector();
if (!(inVV instanceof RepeatedValueVector)) {
if (incoming.getRecordCount() != 0) {
@@ -333,10 +322,11 @@ protected IterOutcome doWork() {
* the end of one of the other vectors while we are copying the data of the
other vectors alongside each new unnested
* value coming out of the repeated field.)
*/
- @SuppressWarnings("resource") private TransferPair
getUnnestFieldTransferPair(FieldReference reference) {
- final TypedFieldId fieldId =
incoming.getValueVectorId(popConfig.getColumn());
- final Class<?> vectorClass =
incoming.getSchema().getColumn(fieldId.getFieldIds()[0]).getValueClass();
- final ValueVector unnestField = incoming.getValueAccessorById(vectorClass,
fieldId.getFieldIds()).getValueVector();
+ @SuppressWarnings("resource")
+ private TransferPair getUnnestFieldTransferPair(FieldReference reference) {
+ final int[] typeFieldIds = unnestTypedFieldId.getFieldIds();
+ final Class<?> vectorClass =
incoming.getSchema().getColumn(typeFieldIds[0]).getValueClass();
+ final ValueVector unnestField = incoming.getValueAccessorById(vectorClass,
typeFieldIds).getValueVector();
TransferPair tp = null;
if (unnestField instanceof RepeatedMapVector) {
@@ -398,9 +388,9 @@ protected boolean setupNewSchema() throws
SchemaChangeException {
*
* @return true if the schema has changed, false otherwise
*/
- private boolean schemaChanged() {
- final TypedFieldId fieldId =
incoming.getValueVectorId(popConfig.getColumn());
- final MaterializedField thisField =
incoming.getSchema().getColumn(fieldId.getFieldIds()[0]);
+ private boolean schemaChanged() throws SchemaChangeException {
+ unnestTypedFieldId = checkAndGetUnnestFieldId();
+ final MaterializedField thisField =
incoming.getSchema().getColumn(unnestTypedFieldId.getFieldIds()[0]);
final MaterializedField prevField = unnestFieldMetadata;
Preconditions.checkNotNull(thisField);
@@ -440,6 +430,17 @@ private void updateStats() {
}
+ private TypedFieldId checkAndGetUnnestFieldId() throws SchemaChangeException
{
+ final TypedFieldId fieldId =
incoming.getValueVectorId(popConfig.getColumn());
+ if (fieldId == null) {
+ throw new SchemaChangeException(String.format("Unnest column %s not
found inside the incoming record batch. " +
+ "This may happen if a wrong Unnest column name is used in the query.
Please rerun query after fixing that.",
+ popConfig.getColumn()));
+ }
+
+ return fieldId;
+ }
+
@Override
public void close() {
updateStats();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services