Updated Branches: refs/heads/master d162e1338 -> 673a96d13
Fix for DRILL-65. Simple tests for distinct and non-distinct union. To make things simple, just treated all unions as blocking. Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/673a96d1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/673a96d1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/673a96d1 Branch: refs/heads/master Commit: 673a96d136165dfc242e598fd501f5dfba8c5edf Parents: d162e13 Author: Jacques Nadeau <[email protected]> Authored: Thu May 23 09:46:27 2013 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu May 23 09:46:27 2013 -0700 ---------------------------------------------------------------------- .../apache/drill/common/logical/data/Union.java | 10 +- .../org/apache/drill/exec/ref/ROPConverter.java | 3 +- .../org/apache/drill/exec/ref/UnbackedRecord.java | 22 ++++ .../org/apache/drill/exec/ref/rops/UnionROP.java | 79 ++++++++++----- .../apache/drill/exec/ref/values/DataValue.java | 1 + .../drill/exec/ref/values/SimpleMapValue.java | 21 ++++ .../apache/drill/exec/ref/rops/UnionROPTest.java | 46 +++++++++ .../ref/src/test/resources/union/distinct.json | 71 +++++++++++++ .../ref/src/test/resources/union/nondistinct.json | 71 +++++++++++++ 9 files changed, 291 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java index 7c8b88e..487401b 100644 --- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java +++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Union.java @@ -26,10 +26,10 @@ public class Union extends LogicalOperatorBase { private final LogicalOperator[] inputs; private final boolean distinct; - @JsonCreator - public Union(@JsonProperty("inputs") LogicalOperator[] inputs){ - this(inputs, false); - } +// @JsonCreator +// public Union(@JsonProperty("inputs") LogicalOperator[] inputs){ +// this(inputs, false); +// } @JsonCreator public Union(@JsonProperty("inputs") LogicalOperator[] inputs, @JsonProperty("distinct") Boolean distinct){ @@ -37,7 +37,7 @@ public class Union extends LogicalOperatorBase { for (LogicalOperator o : inputs) { o.registerAsSubscriber(this); } - this.distinct = distinct; + this.distinct = distinct == null ? false : distinct; } public LogicalOperator[] getInputs() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java index 06a8690..90f3374 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/ROPConverter.java @@ -35,6 +35,7 @@ import org.apache.drill.exec.ref.rops.ROP; import org.apache.drill.exec.ref.rops.ScanROP; import org.apache.drill.exec.ref.rops.StoreROP; import org.apache.drill.exec.ref.rops.UnionROP; +import org.apache.drill.exec.ref.rops.UnionROP; import org.apache.drill.exec.ref.rse.RSERegistry; import org.apache.drill.exec.ref.rse.ReferenceStorageEngine; import org.apache.drill.exec.ref.rse.ReferenceStorageEngine.ReadEntry; @@ -134,7 +135,7 @@ class ROPConverter { scanner.init(registry, builder); return; default: - Union logOp = new Union(null); + Union logOp = new Union(null, false); ROP parentUnion = new UnionROP(logOp); ScanROP[] scanners = new ScanROP[readEntries.size()]; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java index 6152a32..1fa4348 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java @@ -105,5 +105,27 @@ public class UnbackedRecord implements RecordPointer { return "UnbackedRecord [root=" + root + "]"; } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((root == null) ? 0 : root.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + UnbackedRecord other = (UnbackedRecord) obj; + if (root == null) { + if (other.root != null) return false; + } else if (!root.equals(other.root)) return false; + return true; + } + + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java index aca0b5a..b823e68 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/UnionROP.java @@ -17,66 +17,91 @@ ******************************************************************************/ package org.apache.drill.exec.ref.rops; +import java.util.Collection; +import java.util.Iterator; import java.util.List; import org.apache.drill.common.logical.data.LogicalOperator; import org.apache.drill.common.logical.data.Union; import org.apache.drill.exec.ref.IteratorRegistry; import org.apache.drill.exec.ref.RecordIterator; +import org.apache.drill.exec.ref.RecordIterator.NextOutcome; import org.apache.drill.exec.ref.RecordPointer; -import org.apache.drill.exec.ref.eval.EvaluatorFactory; +import org.apache.drill.exec.ref.UnbackedRecord; +import org.apache.drill.exec.ref.exceptions.SetupException; -public class UnionROP extends ROPBase<LogicalOperator>{ - +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class UnionROP extends ROPBase<Union> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionROP.class); - - private List<RecordIterator> incoming; - private ProxySimpleRecord record; + + private Collection<UnbackedRecord> records; + private List<RecordIterator> incoming = Lists.newArrayList(); + private Iterator<UnbackedRecord> iterator; public UnionROP(Union config) { super(config); + // to make things simple, we'll just always make this a blocking operator. + if(config.isDistinct()){ + records = Sets.newHashSet(); + }else{ + records = Lists.newArrayList(); + } } - + @Override - protected void setupEvals(EvaluatorFactory builder) { + protected void setupIterators(IteratorRegistry registry) throws SetupException { + for(LogicalOperator op : config.getInputs()){ + List<RecordIterator> more = registry.getOperator(op); + if(more.size() != 1) throw new SetupException("Iterator list was incorrect size."); + incoming.addAll(more); + } } - @Override - protected void setupIterators(IteratorRegistry builder) { - incoming = builder.getOperator(config); - record.setRecord(incoming.get(0).getRecordPointer()); + + protected void doWork() { + for(RecordIterator ri : incoming){ + RecordPointer rp = ri.getRecordPointer(); + while(ri.next() != NextOutcome.NONE_LEFT){ + UnbackedRecord r = new UnbackedRecord(); + r.copyFrom(rp); + records.add(r); + } + } + this.iterator = records.iterator(); } @Override protected RecordIterator getIteratorInternal() { - return new MultiIterator(); + return new ProxyIterator(); } - private class MultiIterator implements RecordIterator{ - private int current = 0; + private class ProxyIterator implements RecordIterator{ + private ProxySimpleRecord proxyRecord = new ProxySimpleRecord(); + + @Override + public RecordPointer getRecordPointer() { + return proxyRecord; + } @Override public NextOutcome next() { - for(; current < incoming.size(); current++, record.setRecord(incoming.get(current).getRecordPointer())) - while(current < incoming.size()){ + if(iterator == null) doWork(); - NextOutcome n = incoming.get(current).next(); - if(n != NextOutcome.NONE_LEFT) return n; - + if(iterator.hasNext()){ + proxyRecord.setRecord(iterator.next()); + return NextOutcome.INCREMENTED_SCHEMA_CHANGED; + }else{ + return NextOutcome.NONE_LEFT; } - return NextOutcome.NONE_LEFT; + } @Override public ROP getParent() { return UnionROP.this; } - - @Override - public RecordPointer getRecordPointer() { - return record; - } } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java index 9e40014..c1a2980 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/DataValue.java @@ -22,6 +22,7 @@ public interface DataValue { public BooleanValue getAsBooleanValue(); public BytesValue getAsBytesValue(); public boolean equals(DataValue v); + public boolean equals(Object v); public int hashCode(); public DataValue copy(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java index e16e8c1..1c170f2 100644 --- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java +++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/SimpleMapValue.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.ref.values; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -106,4 +107,24 @@ public class SimpleMapValue extends BaseMapValue{ } return out; } + + @Override + public String toString() { + final int maxLen = 10; + return "SimpleMapValue [map=" + (map != null ? toString(map.entrySet(), maxLen) : null) + "]"; + } + + private String toString(Collection<?> collection, int maxLen) { + StringBuilder builder = new StringBuilder(); + builder.append("["); + int i = 0; + for (Iterator<?> iterator = collection.iterator(); iterator.hasNext() && i < maxLen; i++) { + if (i > 0) builder.append(", "); + builder.append(iterator.next()); + } + builder.append("]"); + return builder.toString(); + } + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/UnionROPTest.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/UnionROPTest.java b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/UnionROPTest.java new file mode 100644 index 0000000..7168a55 --- /dev/null +++ b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/UnionROPTest.java @@ -0,0 +1,46 @@ +package org.apache.drill.exec.ref.rops; + +import static org.junit.Assert.assertEquals; + +import java.util.List; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ref.TestUtils; +import org.apache.drill.exec.ref.UnbackedRecord; +import org.apache.drill.exec.ref.values.DataValue; +import org.apache.drill.exec.ref.values.ScalarValues.LongScalar; +import org.junit.Test; + +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +public class UnionROPTest { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionROPTest.class); + + + @Test + public void checkDistinct() throws Exception{ + TestUtils.assertProduceCount("/union/distinct.json", 5); + } + + @Test + public void checkNonDistinct() throws Exception{ + TestUtils.assertProduceCount("/union/nondistinct.json", 10); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/test/resources/union/distinct.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/test/resources/union/distinct.json b/sandbox/prototype/exec/ref/src/test/resources/union/distinct.json new file mode 100644 index 0000000..b975a77 --- /dev/null +++ b/sandbox/prototype/exec/ref/src/test/resources/union/distinct.json @@ -0,0 +1,71 @@ +{ + "head" : { + "type" : "apache_drill_logical_plan", + "version" : 1, + "generator" : { + "type" : "manual", + "info" : "na" + } + }, + "storage" : [ { + "type" : "queue", + "name" : "queue", + "encoding" : "RECORD" + + }, { + "type" : "classpath", + "name" : "donuts-json" + } ], + "query" : [ { + "op" : "scan", + "@id" : 1, + "memo" : "initial_scan", + "storageengine" : "donuts-json", + "selection" : { + "path" : "/employees.json", + "type" : "JSON" + }, + "ref" : "_MAP" + }, { + "op" : "project", + "input" : 1, + "@id" : 2, + "projections" : [ { + "ref" : "output.deptId", + "expr" : "_MAP.deptId" + } ] + }, { + "op" : "scan", + "@id" : 3, + "memo" : "initial_scan", + "storageengine" : "donuts-json", + "selection" : { + "path" : "/departments.json", + "type" : "JSON" + }, + "ref" : "_MAP" + }, { + "op" : "project", + "input" : 3, + "@id" : 4, + "projections" : [ { + "ref" : "output.deptId", + "expr" : "_MAP.deptId" + } ] + }, { + "op": "union", + "@id" : 5, + "distinct": true, + "inputs": [2, 4] + }, { + "op" : "store", + "input" : 5, + "@id" : 6, + "memo" : "output sink", + "target" : { + "number" : 0 + }, + "partition" : null, + "storageEngine" : "queue" + } ] +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/673a96d1/sandbox/prototype/exec/ref/src/test/resources/union/nondistinct.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/test/resources/union/nondistinct.json b/sandbox/prototype/exec/ref/src/test/resources/union/nondistinct.json new file mode 100644 index 0000000..817ed48 --- /dev/null +++ b/sandbox/prototype/exec/ref/src/test/resources/union/nondistinct.json @@ -0,0 +1,71 @@ +{ + "head" : { + "type" : "apache_drill_logical_plan", + "version" : 1, + "generator" : { + "type" : "manual", + "info" : "na" + } + }, + "storage" : [ { + "type" : "queue", + "name" : "queue", + "encoding" : "RECORD" + + }, { + "type" : "classpath", + "name" : "donuts-json" + } ], + "query" : [ { + "op" : "scan", + "@id" : 1, + "memo" : "initial_scan", + "storageengine" : "donuts-json", + "selection" : { + "path" : "/employees.json", + "type" : "JSON" + }, + "ref" : "_MAP" + }, { + "op" : "project", + "input" : 1, + "@id" : 2, + "projections" : [ { + "ref" : "output.deptId", + "expr" : "_MAP.deptId" + } ] + }, { + "op" : "scan", + "@id" : 3, + "memo" : "initial_scan", + "storageengine" : "donuts-json", + "selection" : { + "path" : "/departments.json", + "type" : "JSON" + }, + "ref" : "_MAP" + }, { + "op" : "project", + "input" : 3, + "@id" : 4, + "projections" : [ { + "ref" : "output.deptId", + "expr" : "_MAP.deptId" + } ] + }, { + "op": "union", + "@id" : 5, + "distinct": false, + "inputs": [2, 4] + }, { + "op" : "store", + "input" : 5, + "@id" : 6, + "memo" : "output sink", + "target" : { + "number" : 0 + }, + "partition" : null, + "storageEngine" : "queue" + } ] +}
