DRILL-1648: Fix for fast schema issue that was causing compilation failures in downstream operators.

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e515e621
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e515e621
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e515e621

Branch: refs/heads/master
Commit: e515e6211201ec764d405c9a04b3ce43e4b3259d
Parents: 2f6efea
Author: Jason Altekruse <altekruseja...@gmail.com>
Authored: Tue Nov 4 15:08:27 2014 -0800
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Tue Nov 11 16:48:45 2014 -0800

----------------------------------------------------------------------
 .../impl/flatten/FlattenRecordBatch.java        | 60 +++++++++++++++-----
 .../org/apache/drill/TestExampleQueries.java    | 18 ------
 .../exec/physical/impl/flatten/TestFlatten.java | 42 ++++++++++++++
 3 files changed, 87 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e515e621/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 5171a25..129174e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -53,12 +53,15 @@ import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.RepeatedVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;

 import com.google.common.collect.Lists;
 import com.sun.codemodel.JExpr;

+// TODO - handle the case where a user tries to flatten a scalar, should just act as a project all of the columns exactly
+// as they come in
 public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenRecordBatch.class);

@@ -251,8 +254,19 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     incoming.buildSchema();
     if ( ! fastSchemaCalled ) {
       for (VectorWrapper vw : incoming) {
-        ValueVector vector = container.addOrGet(vw.getField());
-        container.add(vector);
+        if (vw.getField().getPath().equals(popConfig.getColumn())) {
+          if (vw.getValueVector() instanceof MapVector) {
+            // fast schema upstream did not report a repeated type
+            // assume it will be repeated in the actual results and it will fail in execution if it is not
+            ValueVector vector = container.addOrGet(vw.getField());
+            container.add(vector);
+          } else {
+            container.add(getFlattenFieldTransferPair().getTo());
+          }
+        } else {
+          ValueVector vector = container.addOrGet(vw.getField());
+          container.add(vector);
+        }
       }
       fastSchemaCalled = true;
       container.buildSchema(SelectionVectorMode.NONE);
@@ -264,6 +278,33 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     }
   }

+  /**
+   * The data layout is the same for the actual data within a repeated field, as it is in a scalar vector for
+   * the same sql type. For example, a repeated int vector has a vector of offsets into a regular int vector to
+   * represent the lists. As the data layout for the actual values in the same in the repeated vector as in the
+   * scalar vector of the same type, we can avoid making individual copies for the column being flattened, and just
+   * use vector copies between the inner vector of the repeated field to the resulting scalar vector from the flatten
+   * operation. This is completed after we determine how many records will fit (as we will hit either a batch end, or
+   * the end of one of the other vectors while we are copying the data of the other vectors alongside each new flattened
+   * value coming out of the repeated field.)
+   */
+  private TransferPair getFlattenFieldTransferPair() {
+    ValueVector flattenField = incoming.getValueAccessorById(
+        incoming.getSchema().getColumn(
+            incoming.getValueVectorId(
+                popConfig.getColumn()).getFieldIds()[0]).getValueClass(),
+        incoming.getValueVectorId(popConfig.getColumn()).getFieldIds()).getValueVector();
+
+    TransferPair tp;
+    if (flattenField instanceof RepeatedMapVector) {
+      tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap();
+    } else {
+      ValueVector vvIn = ((RepeatedVector)flattenField).getAccessor().getAllChildValues();
+      tp = vvIn.getTransferPair();
+    }
+    return tp;
+  }
+
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
     this.allocationVectors = Lists.newArrayList();
@@ -275,25 +316,14 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     final ClassGenerator<Flattener> cg = CodeGenerator.getRoot(Flattener.TEMPLATE_DEFINITION, context.getFunctionRegistry());
     IntOpenHashSet transferFieldIds = new IntOpenHashSet();

-    RepeatedVector flattenField = ((RepeatedVector) incoming.getValueAccessorById(
-        incoming.getSchema().getColumn(
-            incoming.getValueVectorId(
-                popConfig.getColumn()).getFieldIds()[0]).getValueClass(),
-        incoming.getValueVectorId(popConfig.getColumn()).getFieldIds()).getValueVector());
-
     NamedExpression namedExpression = new NamedExpression(popConfig.getColumn(), new FieldReference(popConfig.getColumn()));
     LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
     ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
     TypedFieldId id = vectorRead.getFieldId();
     Preconditions.checkNotNull(incoming);

-    TransferPair tp = null;
-    if (flattenField instanceof RepeatedMapVector) {
-      tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap();
-    } else {
-      ValueVector vvIn = flattenField.getAccessor().getAllChildValues();
-      tp = vvIn.getTransferPair();
-    }
+    TransferPair tp = getFlattenFieldTransferPair();
+
     transfers.add(tp);
     container.add(tp.getTo());
     transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e515e621/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 5b64d15..bb411e7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -34,24 +34,6 @@ public class TestExampleQueries extends BaseTestQuery{
     test("select recipe, c.inventor.name as name, c.inventor.age as age from cp.`parquet/complex.parquet` c");
   }

-  @Test
-  public void testFlatten() throws Exception {
-    test("select flatten(kvgen(f1)) as monkey, x " +
-        "from cp.`/store/json/test_flatten_mapify.json`");
-
-    test("select t2.key from (select t.monkey.`value` as val, t.monkey.key as key from (select flatten(kvgen(f1)) as monkey, x " +
-        "from cp.`/store/json/test_flatten_mapify.json`) as t) as t2 where t2.val > 1");
-
-    test("select `integer`, `float`, x, flatten(z), flatten(l) from cp.`/jsoninput/input2_modified.json`");
-
-  }
-
-  @Test
-  @Ignore("Can't be run on classpath since that fs doesn't support glob queries.")
-  public void testWildcard() throws Exception {
-    test("select * from dfs.`/tmp/xx/ab*/*.json`");
-  }
-
   @Test // see DRILL-553
   public void testQueryWithNullValues() throws Exception {
     test("select count(*) from cp.`customer.json` limit 1");

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e515e621/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
new file mode 100644
index 0000000..9514517
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
@@ -0,0 +1,42 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl.flatten;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+public class TestFlatten extends BaseTestQuery {
+
+  @Test
+  public void testKVGenFlatten1() throws Exception {
+    test("select flatten(kvgen(f1)) as monkey, x " +
+        "from cp.`/store/json/test_flatten_mapify.json`");
+  }
+
+  @Test
+  public void testTwoFlattens() throws Exception {
+    test("select `integer`, `float`, x, flatten(z), flatten(l) from cp.`/jsoninput/input2_modified.json`");
+  }
+
+  @Test
+  public void testFilterFlattenedRecords() throws Exception {
+    test("select t2.key from (select t.monkey.`value` as val, t.monkey.key as key from (select flatten(kvgen(f1)) as monkey, x " +
+        "from cp.`/store/json/test_flatten_mapify.json`) as t) as t2 where t2.val > 1");
+  }
+
+}
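
The javadoc added on getFlattenFieldTransferPair() describes the vector layout this change relies on. Below is a minimal, self-contained sketch of that idea in plain Java (no Drill classes; the RepeatedIntColumn type and the sample data are illustrative assumptions, not Drill APIs): a repeated column keeps an offsets array over one contiguous inner values array, so the flattened column can simply reuse the inner array, and only the sibling columns need to be duplicated once per inner value, matching the javadoc's point about copying the other vectors alongside each flattened value.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Toy model of the layout described in the getFlattenFieldTransferPair() javadoc:
 * a repeated int column is an offsets array over one contiguous inner int array,
 * which is the same layout a plain (non-repeated) int column uses for its values.
 */
public class FlattenSketch {

  /** Hypothetical stand-in for a repeated int value vector. */
  static class RepeatedIntColumn {
    final int[] offsets;      // offsets[i]..offsets[i+1] bounds row i's list
    final int[] innerValues;  // contiguous values, same layout as a scalar int column

    RepeatedIntColumn(int[] offsets, int[] innerValues) {
      this.offsets = offsets;
      this.innerValues = innerValues;
    }
  }

  public static void main(String[] args) {
    // Two incoming rows: x=10 with z=[1,2,3], and x=20 with z=[4,5]
    int[] x = {10, 20};
    RepeatedIntColumn z = new RepeatedIntColumn(new int[]{0, 3, 5}, new int[]{1, 2, 3, 4, 5});

    // "Transfer" side: the flattened z column IS the inner values array; no per-value copy.
    int[] flattenedZ = z.innerValues;

    // The sibling column x, however, must be repeated once per inner value of its row.
    List<Integer> flattenedX = new ArrayList<>();
    for (int row = 0; row < x.length; row++) {
      int listLength = z.offsets[row + 1] - z.offsets[row];
      for (int i = 0; i < listLength; i++) {
        flattenedX.add(x[row]);
      }
    }

    System.out.println("flatten(z): " + Arrays.toString(flattenedZ)); // [1, 2, 3, 4, 5]
    System.out.println("x repeated: " + flattenedX);                  // [10, 10, 10, 20, 20]
  }
}

Running the sketch prints flatten(z) as [1, 2, 3, 4, 5] alongside x repeated as [10, 10, 10, 20, 20], the same kind of output the flatten queries in TestFlatten exercise end to end.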