This is an automated email from the ASF dual-hosted git repository. sorabh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 4168e1e84d57b15d7667f7a768a0a47a577d0e79 Author: Sorabh Hamirwasia <[email protected]> AuthorDate: Mon Jul 9 17:58:08 2018 -0700 DRILL-6542 : IndexOutOfBoundsException for multilevel lateral queries with schema changed partitioned complex data closes #1374 --- .../drill/exec/physical/impl/unnest/Unnest.java | 2 + .../exec/physical/impl/unnest/UnnestImpl.java | 20 +++++++++ .../physical/impl/unnest/UnnestRecordBatch.java | 50 ++++++++++++++------- .../impl/lateraljoin/TestE2EUnnestAndLateral.java | 51 ++++++++++++++++++++++ 4 files changed, 107 insertions(+), 16 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java index 77a2ffa..1a042b4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java @@ -64,4 +64,6 @@ public interface Unnest { * time a new batch comes in. 
*/ void resetGroupIndex(); + + void close(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java index ffc64f9..1d3b8f2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java @@ -25,6 +25,7 @@ import org.apache.drill.exec.physical.base.LateralContract; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.RepeatedValueVector; import org.slf4j.Logger; @@ -51,6 +52,7 @@ public class UnnestImpl implements Unnest { private SelectionVectorMode svMode; private RepeatedValueVector fieldToUnnest; private RepeatedValueVector.RepeatedAccessor accessor; + private RecordBatch outgoing; /** * The output batch limit starts at OUTPUT_ROW_COUNT, but may be decreased @@ -97,8 +99,16 @@ public class UnnestImpl implements Unnest { logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record count: {}, output limit: {}", innerValueCount, recordCount, outputLimit); + final SchemaChangeCallBack callBack = new SchemaChangeCallBack(); for (TransferPair t : transfers) { t.splitAndTransfer(innerValueIndex, count); + + // Get the corresponding ValueVector in output container and transfer the data + final ValueVector vectorWithData = t.getTo(); + final ValueVector outputVector = outgoing.getContainer().addOrGet(vectorWithData.getField(), callBack); + Preconditions.checkState(!callBack.getSchemaChangedAndReset(), "Outgoing container doesn't have " + + "expected ValueVector of type %s, present in TransferPair of unnest field", 
vectorWithData.getClass()); + vectorWithData.makeTransferPair(outputVector).transfer(); } innerValueIndex += count; return count; @@ -110,6 +120,7 @@ public class UnnestImpl implements Unnest { List<TransferPair> transfers, LateralContract lateral) throws SchemaChangeException { this.svMode = incoming.getSchema().getSelectionVectorMode(); + this.outgoing = outgoing; if (svMode == NONE) { this.transfers = ImmutableList.copyOf(transfers); this.lateral = lateral; @@ -123,4 +134,13 @@ public class UnnestImpl implements Unnest { this.innerValueIndex = 0; } + @Override + public void close() { + if (transfers != null) { + for (TransferPair tp : transfers) { + tp.getTo().close(); + } + transfers = null; + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java index 9c1e702..d985423 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java @@ -24,6 +24,7 @@ import org.apache.drill.common.expression.FieldReference; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.physical.config.UnnestPOP; @@ -48,7 +49,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPOP> { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class); - private Unnest unnest; + private Unnest unnest = new UnnestImpl(); private boolean hasNewSchema = false; // set to 
true if a new schema was encountered and an empty batch was // sent. The next iteration, we need to make sure the record batch sizer // is updated before we process the actual data. @@ -234,8 +235,23 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO return IterOutcome.STOP; } return OK_NEW_SCHEMA; - } - // else + } else { // Unnest field schema didn't change but new left empty/nonempty batch might come with OK_NEW_SCHEMA + try { + // This means even though there is no schema change for unnest field the reference of unnest field + // ValueVector must have changed hence we should just refresh the transfer pairs and keep output vector + // same as before. In case when new left batch is received with SchemaChange but was empty Lateral will + // not call next on unnest and will change its left outcome to OK. Whereas for non-empty batch next will + // be called on unnest by Lateral. Hence UNNEST cannot rely on lateral current outcome to setup transfer + // pair. It should do so for each new left incoming batch. 
+ resetUnnestTransferPair(); + container.zeroVectors(); + } catch (SchemaChangeException ex) { + kill(false); + logger.error("Failure during query", ex); + context.getExecutorState().fail(ex); + return IterOutcome.STOP; + } + } // else unnest.resetGroupIndex(); memoryManager.update(); } @@ -353,26 +369,27 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO return tp; } - @Override - protected boolean setupNewSchema() throws SchemaChangeException { - Preconditions.checkNotNull(lateral); - container.clear(); - recordCount = 0; + private TransferPair resetUnnestTransferPair() throws SchemaChangeException { final List<TransferPair> transfers = Lists.newArrayList(); - final FieldReference fieldReference = new FieldReference(popConfig.getColumn()); - final TransferPair transferPair = getUnnestFieldTransferPair(fieldReference); - - final ValueVector unnestVector = transferPair.getTo(); transfers.add(transferPair); - container.add(unnestVector); logger.debug("Added transfer for unnest expression."); - container.buildSchema(SelectionVectorMode.NONE); - - this.unnest = new UnnestImpl(); + unnest.close(); unnest.setup(context, incoming, this, transfers, lateral); setUnnestVector(); + return transferPair; + } + + @Override + protected boolean setupNewSchema() throws SchemaChangeException { + Preconditions.checkNotNull(lateral); + container.clear(); + recordCount = 0; + unnest = new UnnestImpl(); + final TransferPair tp = resetUnnestTransferPair(); + container.add(TypeHelper.getNewVector(tp.getTo().getField(), oContext.getAllocator())); + container.buildSchema(SelectionVectorMode.NONE); return true; } @@ -428,6 +445,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO @Override public void close() { updateStats(); + unnest.close(); super.close(); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java index 17a9d33..394e732 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java @@ -370,6 +370,37 @@ public class TestE2EUnnestAndLateral extends ClusterTest { } } + /** + * This test is different from {@link TestE2EUnnestAndLateral#testSchemaChangeOnNonUnnestColumn()} because with + * multilevel when the first Lateral sees a schema change it creates a new batch with new vector references. Hence + * the second lateral will receive a new incoming with new vector references with OK_NEW_SCHEMA outcome. Now even + * though there is schema change for non-unnest column the second Unnest has to again set up its transfer pairs since + * vector reference for unnest field has changed for second Unnest. + * Whereas in other test since there is only 1 Lateral followed by Scan, the incoming for lateral which has + * schema change will be handled by Scan in such a way that it only updates vector of affected column. Hence in this + * case vector corresponding to unnest field will not be affected and it will work fine. 
+ * @throws Exception + */ + @Test + public void testSchemaChangeOnNonUnnestColumn_InMultilevelCase() throws Exception { + + try { + dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_1)); + String sql = "SELECT customer.c_custkey, customer.c_name, customer.c_nationkey, orders.orderkey, " + + "orders.totalprice, olineitems.l_partkey, olineitems.l_linenumber, olineitems.l_quantity " + + "FROM dfs.`lateraljoin/multipleFiles` customer, " + + "LATERAL (SELECT t1.o.o_orderkey as orderkey, t1.o.o_totalprice as totalprice, t1.o.o_lineitems as lineitems " + + "FROM UNNEST(customer.c_orders) t1(o)) orders, " + + "LATERAL (SELECT t2.l.l_partkey as l_partkey, t2.l.l_linenumber as l_linenumber, t2.l.l_quantity as l_quantity " + + "FROM UNNEST(orders.lineitems) t2(l)) olineitems"; + test(sql); + } catch (Exception ex) { + fail(); + } finally { + dirTestWatcher.removeFileFromRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_1)); + } + } + @Test public void testSchemaChangeOnUnnestColumn() throws Exception { try { @@ -387,6 +418,26 @@ public class TestE2EUnnestAndLateral extends ClusterTest { } @Test + public void testSchemaChangeOnUnnestColumn_InMultilevelCase() throws Exception { + try { + dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_2)); + + String sql = "SELECT customer.c_custkey, customer.c_name, customer.c_nationkey, orders.orderkey, " + + "orders.totalprice, orders.spriority, olineitems.l_partkey, olineitems.l_linenumber, olineitems.l_quantity " + + "FROM dfs.`lateraljoin/multipleFiles` customer, " + + "LATERAL (SELECT t1.o.o_orderkey as orderkey, t1.o.o_totalprice as totalprice, t1.o.o_lineitems as lineitems," + + " t1.o.o_shippriority as spriority FROM UNNEST(customer.c_orders) t1(o)) orders, " + + "LATERAL (SELECT t2.l.l_partkey as l_partkey, t2.l.l_linenumber as l_linenumber, t2.l.l_quantity as l_quantity " + + "FROM UNNEST(orders.lineitems) t2(l)) olineitems"; + 
test(sql); + } catch (Exception ex) { + fail(); + } finally { + dirTestWatcher.removeFileFromRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_2)); + } + } + + @Test public void testSchemaChangeOnMultipleColumns() throws Exception { try { dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_3));
