DRILL-5822: The query with "SELECT *" with "ORDER BY" clause and `planner.slice_target`=1 doesn't preserve column order
- The commit for DRILL-847 is outdated. There is no need to canonicalize the batch or container since RecordBatchLoader swallows the "schema change" for now if two batches have different column ordering. closes #1017 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c1118a3d Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c1118a3d Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c1118a3d Branch: refs/heads/master Commit: c1118a3d9a74cf24f28bc69efca2c21d2a6d5b1d Parents: 17ca618 Author: Vitalii Diravka <vitalii.dira...@gmail.com> Authored: Thu Oct 26 18:07:33 2017 +0000 Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com> Committed: Mon Nov 13 11:05:29 2017 +0200 ---------------------------------------------------------------------- .../exec/physical/impl/TopN/TopNBatch.java | 11 +++------ .../impl/mergereceiver/MergingRecordBatch.java | 7 +----- .../physical/impl/sort/RecordBatchData.java | 8 +------ .../impl/sort/SortRecordBatchBuilder.java | 6 ----- .../drill/exec/record/RecordBatchLoader.java | 18 -------------- .../drill/exec/record/VectorContainer.java | 25 -------------------- .../java/org/apache/drill/TestStarQueries.java | 17 +++++++++++++ 7 files changed, 22 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/c1118a3d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 950e1fe..dcf67d4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -238,7 
+238,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } boolean success = false; try { - batch.canonicalize(); if (priorityQueue == null) { assert !schemaChanged; priorityQueue = createNewPriorityQueue(context, config.getOrderings(), new ExpandableHyperContainer(batch.getContainer()), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING); @@ -323,7 +322,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { selectionVector4.clear(); c.clear(); VectorContainer newQueue = new VectorContainer(); - builder.canonicalize(); builder.build(context, newQueue); priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent()); builder.getSv4().clear(); @@ -414,16 +412,13 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { selectionVector4.clear(); c.clear(); final VectorContainer oldSchemaContainer = new VectorContainer(oContext); - builder.canonicalize(); builder.build(context, oldSchemaContainer); oldSchemaContainer.setRecordCount(builder.getSv4().getCount()); final VectorContainer newSchemaContainer = SchemaUtil.coerceContainer(oldSchemaContainer, this.schema, oContext); - // Canonicalize new container since we canonicalize incoming batches before adding to queue. 
- final VectorContainer canonicalizedContainer = VectorContainer.canonicalize(newSchemaContainer); - canonicalizedContainer.buildSchema(SelectionVectorMode.FOUR_BYTE); + newSchemaContainer.buildSchema(SelectionVectorMode.FOUR_BYTE); priorityQueue.cleanup(); - priorityQueue = createNewPriorityQueue(context, config.getOrderings(), canonicalizedContainer, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING); - priorityQueue.resetQueue(canonicalizedContainer, builder.getSv4().createNewWrapperCurrent()); + priorityQueue = createNewPriorityQueue(context, config.getOrderings(), newSchemaContainer, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING); + priorityQueue.resetQueue(newSchemaContainer, builder.getSv4().createNewWrapperCurrent()); } finally { builder.clear(); builder.close(); http://git-wip-us.apache.org/repos/asf/drill/blob/c1118a3d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index eff1ae9..ec945d6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -325,12 +325,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> // after this point all batches have been released and their bytebuf are in batchLoaders - // Canonicalize each incoming batch, so that vectors are alphabetically sorted based on SchemaPath. - for (final RecordBatchLoader loader : batchLoaders) { - loader.canonicalize(); - } - // Ensure all the incoming batches have the identical schema. 
+ // Note: RecordBatchLoader permutes the columns to obtain the same columns order for all batches. if (!isSameSchemaAmongBatches(batchLoaders)) { context.fail(new SchemaChangeException("Incoming batches for merging receiver have different schemas!")); return IterOutcome.STOP; @@ -581,7 +577,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } catch (final IOException e) { throw new DrillRuntimeException(e); } - outgoingContainer = VectorContainer.canonicalize(outgoingContainer); outgoingContainer.buildSchema(SelectionVectorMode.NONE); } http://git-wip-us.apache.org/repos/asf/drill/blob/c1118a3d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java index 0cd55eb..6de4df6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -65,12 +65,6 @@ public class RecordBatchData { container.buildSchema(batch.getSchema().getSelectionVectorMode()); } - public void canonicalize() { - SelectionVectorMode mode = container.getSchema().getSelectionVectorMode(); - container = VectorContainer.canonicalize(container); - container.buildSchema(mode); - } - public int getRecordCount() { return recordCount; } http://git-wip-us.apache.org/repos/asf/drill/blob/c1118a3d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java index 999fb04..6b3de25 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java @@ -132,12 +132,6 @@ public class SortRecordBatchBuilder implements AutoCloseable { recordCount += rbd.getRecordCount(); } - public void canonicalize() { - for (RecordBatchData batch : batches.values()) { - batch.canonicalize(); - } - } - public boolean isEmpty() { return batches.isEmpty(); } http://git-wip-us.apache.org/repos/asf/drill/blob/c1118a3d/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java index 20b5cb5..3e6bf64 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java @@ -270,22 +270,4 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp resetRecordCount(); } - /** - * Sorts vectors into canonical order (by field name). Updates schema and - * internal vector container. - */ - public void canonicalize() { - //logger.debug( "RecordBatchLoader : before schema " + schema); - container = VectorContainer.canonicalize(container); - - // rebuild the schema. - SchemaBuilder b = BatchSchema.newBuilder(); - for(final VectorWrapper<?> v : container){ - b.addField(v.getField()); - } - b.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE); - this.schema = b.build(); - - //logger.debug( "RecordBatchLoader : after schema " + schema); - } } http://git-wip-us.apache.org/repos/asf/drill/blob/c1118a3d/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index abcb846..9564f11 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -201,31 +201,6 @@ public class VectorContainer implements VectorAccessible { return vc; } - /** - * Sorts vectors into canonical order (by field name) in new VectorContainer. - */ - public static VectorContainer canonicalize(VectorContainer original) { - VectorContainer vc = new VectorContainer(); - List<VectorWrapper<?>> canonicalWrappers = new ArrayList<>(original.wrappers); - // Sort list of VectorWrapper alphabetically based on SchemaPath. 
- Collections.sort(canonicalWrappers, new Comparator<VectorWrapper<?>>() { - @Override - public int compare(VectorWrapper<?> v1, VectorWrapper<?> v2) { - return v1.getField().getName().compareTo(v2.getField().getName()); - } - }); - - for (VectorWrapper<?> w : canonicalWrappers) { - if (w.isHyper()) { - vc.add(w.getValueVectors()); - } else { - vc.add(w.getValueVector()); - } - } - vc.allocator = original.allocator; - return vc; - } - private void cloneAndTransfer(VectorWrapper<?> wrapper) { wrappers.add(wrapper.cloneAndTransfer(getAllocator())); } http://git-wip-us.apache.org/repos/asf/drill/blob/c1118a3d/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java index bdb080c..b4ac11f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java @@ -531,4 +531,21 @@ public class TestStarQueries extends BaseTestQuery{ .run(); } + @Test // DRILL-5822 + public void testSchemaForParallelizedStarOrderBy() throws Exception { + final String query = "select * from cp.`tpch/region.parquet` order by r_name"; + final BatchSchema expectedSchema = new SchemaBuilder() + .add("r_regionkey", TypeProtos.MinorType.INT) + .add("r_name",TypeProtos.MinorType.VARCHAR) + .add("r_comment", TypeProtos.MinorType.VARCHAR) + .build(); + + testBuilder() + .sqlQuery(query) + .optionSettingQueriesForTestQuery("alter session set `planner.slice_target`=1") + .schemaBaseLine(expectedSchema) + .build() + .run(); + } + }