DRILL-303: RecordBatchLoader.load always creates new schema
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b12c0b15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b12c0b15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b12c0b15

Branch: refs/heads/master
Commit: b12c0b155698a3d2ecd5cf3bfdf994fccd65f8d6
Parents: 2c811a8
Author: Steven Phillips <[email protected]>
Authored: Sun Dec 1 20:06:05 2013 -0800
Committer: Jacques Nadeau <[email protected]>
Committed: Sun Dec 1 20:06:05 2013 -0800

----------------------------------------------------------------------
 .../drill/exec/record/RecordBatchLoader.java    | 17 +++---
 .../exec/physical/impl/TestUnionExchange.java   | 62 ++++++++++++++++++++
 .../test/resources/sender/union_exchange.json   | 47 +++++++++++++++
 3 files changed, 116 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b12c0b15/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 016f340..f19184f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -62,10 +62,10 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     this.valueCount = def.getRecordCount();
     boolean schemaChanged = schema == null;
 
-    Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
+    Map<FieldDef, ValueVector> oldFields = Maps.newHashMap();
     for(VectorWrapper<?> w : container){
       ValueVector v = w.getValueVector();
-      oldFields.put(v.getField(), v);
+      oldFields.put(v.getField().getDef(), v);
     }
 
     VectorContainer newVectors = new VectorContainer();
@@ -76,15 +76,12 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     for (FieldMetadata fmd : fields) {
       FieldDef fieldDef = fmd.getDef();
       ValueVector v = oldFields.remove(fieldDef);
-      if(v != null){
-        container.add(v);
-        continue;
+      if(v == null) {
+        // if we arrive here, we didn't have a matching vector.
+        schemaChanged = true;
+        MaterializedField m = new MaterializedField(fieldDef);
+        v = TypeHelper.getNewVector(m, allocator);
       }
-
-      // if we arrive here, we didn't have a matching vector.
-      schemaChanged = true;
-      MaterializedField m = new MaterializedField(fieldDef);
-      v = TypeHelper.getNewVector(m, allocator);
       if (fmd.getValueCount() == 0){
         v.clear();
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b12c0b15/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java
new file mode 100644
index 0000000..2e16b47
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestUnionExchange extends PopUnitTestBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionExchange.class);
+
+  @Test
+  public void twoBitTwoExchangeTwoEntryRun() throws Exception {
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
+        Drillbit bit2 = new Drillbit(CONFIG, serviceSet);
+        DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) {
+
+      bit1.run();
+      bit2.run();
+      client.connect();
+      List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
+              Files.toString(FileUtils.getResourceAsFile("/sender/union_exchange.json"),
+                      Charsets.UTF_8));
+      int count = 0;
+      for(QueryResultBatch b : results) {
+        if (b.getHeader().getRowCount() != 0)
+          count += b.getHeader().getRowCount();
+      }
+      assertEquals(150, count);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b12c0b15/exec/java-exec/src/test/resources/sender/union_exchange.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/sender/union_exchange.json b/exec/java-exec/src/test/resources/sender/union_exchange.json
new file mode 100644
index 0000000..76963bd
--- /dev/null
+++ b/exec/java-exec/src/test/resources/sender/union_exchange.json
@@ -0,0 +1,47 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+    graph:[
+        {
+            @id:1,
+            pop:"mock-scan",
+            url: "http://apache.org",
+            entries:[
+                {records: 100, types: [
+                  {name: "blue", type: "INT", mode: "REQUIRED"},
+                  {name: "red", type: "BIGINT", mode: "REQUIRED"}
+                ]},
+                {records: 200, types: [
+                  {name: "blue", type: "INT", mode: "REQUIRED"},
+                  {name: "red", type: "BIGINT", mode: "REQUIRED"}
+                ]}
+            ]
+        },
+        {
+            @id: 2,
+            child: 1,
+            pop: "union-exchange"
+        },
+        {
+            @id: 3,
+            child: 2,
+            pop: "filter",
+            expr: "alternate()"
+        },
+        {
+            @id: 4,
+            child: 3,
+            pop:"selection-vector-remover"
+        },
+        {
+            @id: 5,
+            child: 4,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file
