DRILL-600: Support planning for Union-All.  Added infrastructure for planning 
Union-Distinct (not enabled yet).


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/53e218ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/53e218ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/53e218ef

Branch: refs/heads/master
Commit: 53e218ef6f81eed4c72dcc253ea961348310f199
Parents: d9a2f1c
Author: Aman Sinha <[email protected]>
Authored: Tue Jun 10 14:54:09 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Wed Jun 11 21:25:16 2014 -0700

----------------------------------------------------------------------
 .../physical/base/AbstractPhysicalVisitor.java  |   2 +-
 .../exec/physical/base/PhysicalVisitor.java     |   2 +-
 .../drill/exec/physical/config/Union.java       |  65 --------
 .../drill/exec/physical/config/UnionAll.java    |  65 ++++++++
 .../impl/union/UnionAllBatchCreator.java        |  40 +++++
 .../impl/union/UnionAllRecordBatch.java         | 157 +++++++++++++++++++
 .../physical/impl/union/UnionBatchCreator.java  |  40 -----
 .../physical/impl/union/UnionRecordBatch.java   | 148 -----------------
 .../exec/planner/common/DrillUnionRelBase.java  |   7 +-
 .../exec/planner/logical/DrillRuleSets.java     |   2 +
 .../exec/planner/logical/DrillUnionRel.java     |   8 +-
 .../exec/planner/logical/DrillUnionRule.java    |  10 +-
 .../drill/exec/planner/physical/PrelUtil.java   |   4 +
 .../exec/planner/physical/UnionAllPrel.java     |  94 +++++++++++
 .../exec/planner/physical/UnionAllPrule.java    |  72 +++++++++
 .../planner/physical/UnionDistinctPrel.java     |  95 +++++++++++
 .../planner/physical/UnionDistinctPrule.java    |  72 +++++++++
 .../drill/exec/planner/physical/UnionPrel.java  |  48 ++++++
 .../org/apache/drill/TestExampleQueries.java    |  35 ++++-
 .../src/test/resources/union/test1.json         |   4 +-
 20 files changed, 707 insertions(+), 263 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 0107101..c7c82aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -28,7 +28,7 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
   }
 
   @Override
-  public T visitUnion(Union union, X value) throws E {
+  public T visitUnion(UnionAll union, X value) throws E {
     return visitOp(union, value);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 7a1440a..1798289 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -35,7 +35,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
   public RETURN visitStore(Store store, EXTRA value) throws EXCEP;
 
   public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
-  public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
+  public RETURN visitUnion(UnionAll union, EXTRA value) throws EXCEP;
   public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
   public RETURN visitTrace(Trace trace, EXTRA value) throws EXCEP;
   public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java
deleted file mode 100644
index 522100f..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Union.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.config;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.base.*;
-import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
-
-import java.util.List;
-
-@JsonTypeName("union")
-
-public class Union extends AbstractMultiple {
-
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Filter.class);
-
-  @JsonCreator
-  public Union(@JsonProperty("children") PhysicalOperator[] children) {
-    super(children);
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
-    return physicalVisitor.visitUnion(this, value);
-  }
-
-  @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-    return new Union(children.toArray(new PhysicalOperator[children.size()]));
-  }
-
-  @Override
-  public OperatorCost getCost() {
-    OperatorCost cost = new OperatorCost(0,0,0,0);
-    for (int i = 0; i < children.length; i++) {
-      PhysicalOperator child = children[i];
-      cost.add(child.getCost());
-    }
-    return cost;
-  }
-
-  @Override
-  public int getOperatorType() {
-    return CoreOperatorType.UNION_VALUE;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionAll.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionAll.java
new file mode 100644
index 0000000..f0ca6ad
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionAll.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+
+import java.util.List;
+
+@JsonTypeName("union-all")
+
+public class UnionAll extends AbstractMultiple {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Filter.class);
+
+  @JsonCreator
+  public UnionAll(@JsonProperty("children") PhysicalOperator[] children) {
+    super(children);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitUnion(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    return new UnionAll(children.toArray(new PhysicalOperator[children.size()]));
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    OperatorCost cost = new OperatorCost(0,0,0,0);
+    for (int i = 0; i < children.length; i++) {
+      PhysicalOperator child = children[i];
+      cost.add(child.getCost());
+    }
+    return cost;
+  }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.UNION_VALUE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
new file mode 100644
index 0000000..395cab4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.union;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.UnionAll;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+
+public class UnionAllBatchCreator implements BatchCreator<UnionAll>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, UnionAll config, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() >= 1);
+    return new UnionAllRecordBatch(config, children, context);
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
new file mode 100644
index 0000000..e17c06c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.union;
+
+import com.google.common.collect.Lists;
+
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.UnionAll;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class);
+
+  private final List<RecordBatch> incoming;
+  private SelectionVector2 sv;
+  private Iterator<RecordBatch> incomingIterator = null;
+  private RecordBatch current = null;
+  private ArrayList<TransferPair> transfers;
+  private int outRecordCount;
+
+  public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, FragmentContext context) throws OutOfMemoryException {
+    super(config, context);
+    this.incoming = children;
+    this.incomingIterator = incoming.iterator();
+    current = incomingIterator.next();
+    sv = null;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return outRecordCount;
+  }
+
+  @Override
+  public void kill() {
+    if(current != null){
+      current.kill();
+      current = null;
+    }
+    for(;incomingIterator.hasNext();){
+      incomingIterator.next().kill();
+    }
+  }
+
+  @Override
+  protected void killIncoming() {
+    for (int i = 0; i < incoming.size(); i++) {
+      RecordBatch in = incoming.get(i);
+      in.kill();
+    }
+  }
+
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    return sv;
+  }
+
+  @Override
+  public IterOutcome innerNext() {
+    if (current == null) { // end of iteration
+      return IterOutcome.NONE;
+    }
+    IterOutcome upstream = current.next();
+    logger.debug("Upstream... {}", upstream);
+    while (upstream == IterOutcome.NONE) {
+      if (!incomingIterator.hasNext()) {
+        current = null;
+        return IterOutcome.NONE;
+      }
+      current = incomingIterator.next();
+      upstream = current.next();
+    }
+    switch (upstream) {
+      case NONE:
+        throw new IllegalArgumentException("not possible!");
+      case NOT_YET:
+      case STOP:
+        return upstream;
+      case OK_NEW_SCHEMA:
+        setupSchema();
+        // fall through.
+      case OK:
+        doTransfer();
+        return upstream; // change if upstream changed, otherwise normal.
+      default:
+        throw new UnsupportedOperationException();
+    }
+  }
+
+  private void doTransfer() {
+    outRecordCount = current.getRecordCount();
+    if (container.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
+      this.sv = current.getSelectionVector2();
+    }
+    for (TransferPair transfer : transfers) {
+      transfer.transfer();
+    }
+
+//    for (VectorWrapper<?> vw : this.container) {
+//      ValueVector.Mutator m = vw.getValueVector().getMutator();
+//      m.setValueCount(outRecordCount);
+//    }
+
+  }
+
+  private void setupSchema() {
+    if (container != null) {
+      container.clear();
+    }
+    transfers = Lists.newArrayList();
+
+    for (VectorWrapper<?> vw : current) {
+      TransferPair pair = vw.getValueVector().getTransferPair();
+      container.add(pair.getTo());
+      transfers.add(pair);
+    }
+    container.buildSchema(current.getSchema().getSelectionVectorMode());
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    return WritableBatch.get(this);
+  }
+  
+  @Override
+  public void cleanup() {
+    super.cleanup();
+    for (int i = 0; i < incoming.size(); i++) {
+      RecordBatch in = incoming.get(i);
+      in.cleanup();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionBatchCreator.java
deleted file mode 100644
index d9f8813..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionBatchCreator.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.union;
-
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.Union;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.record.RecordBatch;
-
-import com.google.common.base.Preconditions;
-
-public class UnionBatchCreator implements BatchCreator<Union>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionBatchCreator.class);
-
-  @Override
-  public RecordBatch getBatch(FragmentContext context, Union config, List<RecordBatch> children) throws ExecutionSetupException {
-    Preconditions.checkArgument(children.size() >= 1);
-    return new UnionRecordBatch(config, children, context);
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java
deleted file mode 100644
index d515323..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.union;
-
-import com.google.common.collect.Lists;
-import org.apache.drill.exec.memory.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.config.Union;
-import org.apache.drill.exec.record.*;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.vector.ValueVector;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class UnionRecordBatch extends AbstractRecordBatch<Union> {
-
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionRecordBatch.class);
-
-  private final List<RecordBatch> incoming;
-  private SelectionVector2 sv;
-  private Iterator<RecordBatch> incomingIterator = null;
-  private RecordBatch current = null;
-  private ArrayList<TransferPair> transfers;
-  private int outRecordCount;
-
-  public UnionRecordBatch(Union config, List<RecordBatch> children, FragmentContext context) throws OutOfMemoryException {
-    super(config, context);
-    this.incoming = children;
-    this.incomingIterator = incoming.iterator();
-    current = incomingIterator.next();
-    sv = null;
-  }
-
-  @Override
-  public int getRecordCount() {
-    return outRecordCount;
-  }
-
-  @Override
-  public void kill() {
-    if(current != null){
-      current.kill();
-      current = null;
-    }
-    for(;incomingIterator.hasNext();){
-      incomingIterator.next().kill();
-    }
-  }
-
-  @Override
-  protected void killIncoming() {
-    for (int i = 0; i < incoming.size(); i++) {
-      RecordBatch in = incoming.get(i);
-      in.kill();
-    }
-  }
-
-
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    return sv;
-  }
-
-  @Override
-  public IterOutcome innerNext() {
-    if (current == null) { // end of iteration
-      return IterOutcome.NONE;
-    }
-    IterOutcome upstream = current.next();
-    logger.debug("Upstream... {}", upstream);
-    while (upstream == IterOutcome.NONE) {
-      if (!incomingIterator.hasNext()) {
-        current = null;
-        return IterOutcome.NONE;
-      }
-      current = incomingIterator.next();
-      upstream = current.next();
-    }
-    switch (upstream) {
-      case NONE:
-        throw new IllegalArgumentException("not possible!");
-      case NOT_YET:
-      case STOP:
-        return upstream;
-      case OK_NEW_SCHEMA:
-        setupSchema();
-        // fall through.
-      case OK:
-        doTransfer();
-        return upstream; // change if upstream changed, otherwise normal.
-      default:
-        throw new UnsupportedOperationException();
-    }
-  }
-
-  private void doTransfer() {
-    outRecordCount = current.getRecordCount();
-    if (container.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
-      this.sv = current.getSelectionVector2();
-    }
-    for (TransferPair transfer : transfers) {
-      transfer.transfer();
-    }
-
-//    for (VectorWrapper<?> vw : this.container) {
-//      ValueVector.Mutator m = vw.getValueVector().getMutator();
-//      m.setValueCount(outRecordCount);
-//    }
-
-  }
-
-  private void setupSchema() {
-    if (container != null) {
-      container.clear();
-    }
-    transfers = Lists.newArrayList();
-
-    for (VectorWrapper<?> vw : current) {
-      TransferPair pair = vw.getValueVector().getTransferPair();
-      container.add(pair.getTo());
-      transfers.add(pair);
-    }
-    container.buildSchema(current.getSchema().getSelectionVectorMode());
-  }
-
-  @Override
-  public WritableBatch getWritableBatch() {
-    return WritableBatch.get(this);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnionRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnionRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnionRelBase.java
index a16b8ee..6a828e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnionRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnionRelBase.java
@@ -19,10 +19,10 @@ package org.apache.drill.exec.planner.common;
 
 import java.util.List;
 
+import org.eigenbase.rel.InvalidRelException;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.rel.UnionRelBase;
 import org.eigenbase.relopt.RelOptCluster;
-
 import org.eigenbase.relopt.RelTraitSet;
 
 /**
@@ -31,7 +31,10 @@ import org.eigenbase.relopt.RelTraitSet;
 public abstract class DrillUnionRelBase extends UnionRelBase implements DrillRelNode {
  
   public DrillUnionRelBase(RelOptCluster cluster, RelTraitSet traits,
-      List<RelNode> inputs, boolean all) {
+      List<RelNode> inputs, boolean all) throws InvalidRelException {
     super(cluster, traits, inputs, all);
+    if (! this.isHomogeneous(false /* don't compare names */)) {
+      throw new InvalidRelException("Input row types of the Union are not compatible.");
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index c07fee3..9a256bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -165,6 +165,8 @@ public class DrillRuleSets {
     ruleList.add(LimitPrule.INSTANCE);
     ruleList.add(WriterPrule.INSTANCE);
     ruleList.add(PushLimitToTopN.INSTANCE);
+    ruleList.add(UnionAllPrule.INSTANCE);
+    // ruleList.add(UnionDistinctPrule.INSTANCE);
     
     PlannerSettings ps = qcontext.getPlannerSettings();
     

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
index 93c4ca7..fdf43dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
@@ -39,14 +39,18 @@ import org.eigenbase.relopt.RelTraitSet;
 public class DrillUnionRel extends DrillUnionRelBase implements DrillRel {
   /** Creates a DrillUnionRel. */
   public DrillUnionRel(RelOptCluster cluster, RelTraitSet traits,
-      List<RelNode> inputs, boolean all) {
+      List<RelNode> inputs, boolean all) throws InvalidRelException {
     super(cluster, traits, inputs, all);
   }
 
   @Override
   public DrillUnionRel copy(RelTraitSet traitSet, List<RelNode> inputs,
       boolean all) {
-    return new DrillUnionRel(getCluster(), traitSet, inputs, all);
+    try { 
+      return new DrillUnionRel(getCluster(), traitSet, inputs, all);
+    } catch (InvalidRelException e) {
+      throw new AssertionError(e) ;
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRule.java
index b607605..379d8e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRule.java
@@ -18,18 +18,22 @@
 package org.apache.drill.exec.planner.logical;
 
 import org.apache.drill.exec.planner.common.DrillUnionRelBase;
+import org.eigenbase.rel.InvalidRelException;
 import org.eigenbase.rel.UnionRel;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.*;
+import org.eigenbase.trace.EigenbaseTrace;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.logging.Logger;
 
 /**
  * Rule that converts a {@link UnionRel} to a {@link DrillUnionRelBase}, implemented by a "union" operation.
  */
 public class DrillUnionRule extends RelOptRule {
   public static final RelOptRule INSTANCE = new DrillUnionRule();
+  protected static final Logger tracer = EigenbaseTrace.getPlannerTracer();
 
   private DrillUnionRule() {
     super(RelOptHelper.any(UnionRel.class, Convention.NONE), "DrillUnionRule");
@@ -44,6 +48,10 @@ public class DrillUnionRule extends RelOptRule {
       final RelNode convertedInput = convert(input, input.getTraitSet().plus(DrillRel.DRILL_LOGICAL));
       convertedInputs.add(convertedInput);
     }
-    call.transformTo(new DrillUnionRel(union.getCluster(), traits, convertedInputs, union.all));
+    try { 
+      call.transformTo(new DrillUnionRel(union.getCluster(), traits, convertedInputs, union.all));
+    } catch (InvalidRelException e) {
+      tracer.warning(e.toString()) ;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
index d982647..c66ff5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
@@ -95,6 +95,10 @@ public class PrelUtil {
   public static Iterator<Prel> iter(RelNode... nodes){
     return (Iterator<Prel>) (Object) Arrays.asList(nodes).iterator();
   }
+  
+  public static Iterator<Prel> iter(List<RelNode> nodes) {
+    return (Iterator<Prel>) (Object) nodes.iterator();
+  }
 
   public static PlannerSettings getSettings(RelOptCluster cluster){
     return cluster.getPlanner().getFrameworkContext().unwrap(PlannerSettings.class);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java
new file mode 100644
index 0000000..20722e9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrel.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.UnionAll;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.UnionRelBase;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelTraitSet;
+
+import com.google.common.collect.Lists;
+
+public class UnionAllPrel extends UnionPrel {
+
+  public UnionAllPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> 
inputs) throws InvalidRelException {
+    super(cluster, traits, inputs, true /* all */);
+  }
+
+
+  @Override
+  public UnionRelBase copy(RelTraitSet traitSet, List<RelNode> inputs, boolean 
all) {
+    try {
+      return new UnionAllPrel(this.getCluster(), traitSet, inputs);
+    }catch (InvalidRelException e) {
+      throw new AssertionError(e);
+    }
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner) {
+    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+      return super.computeSelfCost(planner).multiplyBy(.1);
+    }
+    double totalInputRowCount = 0;
+    for (int i = 0; i < this.getInputs().size(); i++) {
+      totalInputRowCount += 
RelMetadataQuery.getRowCount(this.getInputs().get(i));
+    }
+
+    double cpuCost = totalInputRowCount * DrillCostBase.BASE_CPU_COST;
+    DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+    return costFactory.makeCost(totalInputRowCount, cpuCost, 0, 0);
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) 
throws IOException {
+    List<PhysicalOperator> inputPops = Lists.newArrayList();
+    
+    for (int i = 0; i < this.getInputs().size(); i++) {
+      inputPops.add( 
((Prel)this.getInputs().get(i)).getPhysicalOperator(creator));
+    }
+
+    UnionAll unionall = new UnionAll(inputPops.toArray(new 
PhysicalOperator[inputPops.size()]));
+    unionall.setOperatorId(creator.getOperatorId(this));
+
+    return unionall;
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
new file mode 100644
index 0000000..1ce73a4
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionAllPrule.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.drill.exec.planner.logical.DrillUnionRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.trace.EigenbaseTrace;
+
+import com.google.common.collect.Lists;
+
+public class UnionAllPrule extends Prule {
+  public static final RelOptRule INSTANCE = new UnionAllPrule();
+  protected static final Logger tracer = EigenbaseTrace.getPlannerTracer();
+
+  private UnionAllPrule() {
+    super(
+        RelOptHelper.any(DrillUnionRel.class), "Prel.UnionAllPrule");
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    DrillUnionRel union = (DrillUnionRel) call.rel(0);
+    return ((! union.isDistinct()) && union.isHomogeneous(false /* don't 
compare names */)); 
+  }
+  
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final DrillUnionRel union = (DrillUnionRel) call.rel(0);
+    final List<RelNode> inputs = union.getInputs();
+    List<RelNode> convertedInputList = Lists.newArrayList();
+    RelTraitSet traits = 
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL);
+    
+    try {
+      for (int i = 0; i < inputs.size(); i++) {
+        RelNode convertedInput = convert(inputs.get(i), 
PrelUtil.fixTraits(call, traits));
+        convertedInputList.add(convertedInput);    
+      }
+      
+      traits = 
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
+      UnionAllPrel unionAll = new UnionAllPrel(union.getCluster(), traits, 
convertedInputList);
+
+      call.transformTo(unionAll);
+      
+    } catch (InvalidRelException e) {
+      tracer.warning(e.toString());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrel.java
new file mode 100644
index 0000000..ad63906
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrel.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.UnionAll;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.UnionRelBase;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelTraitSet;
+
+import com.google.common.collect.Lists;
+
+public class UnionDistinctPrel extends UnionPrel {
+
+  public UnionDistinctPrel(RelOptCluster cluster, RelTraitSet traits, 
List<RelNode> inputs) throws InvalidRelException {
+    super(cluster, traits, inputs, false /* all = false */);
+  }
+
+
+  @Override
+  public UnionRelBase copy(RelTraitSet traitSet, List<RelNode> inputs, boolean 
all) {
+    try {
+      return new UnionDistinctPrel(this.getCluster(), traitSet, inputs);
+    }catch (InvalidRelException e) {
+      throw new AssertionError(e);
+    }
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner) {
+    if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+      return super.computeSelfCost(planner).multiplyBy(.1);
+    }
+    double totalInputRowCount = 0;
+    for (int i = 0; i < this.getInputs().size(); i++) {
+      totalInputRowCount += 
RelMetadataQuery.getRowCount(this.getInputs().get(i));
+    }
+
+    double cpuCost = totalInputRowCount * DrillCostBase.BASE_CPU_COST;
+    DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+    return costFactory.makeCost(totalInputRowCount, cpuCost, 0, 0);
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) 
throws IOException {
+    List<PhysicalOperator> inputPops = Lists.newArrayList();
+    
+    for (int i = 0; i < this.getInputs().size(); i++) {
+      inputPops.add( 
((Prel)this.getInputs().get(i)).getPhysicalOperator(creator));
+    }
+
+    ///TODO: change this to UnionDistinct once implemented end-to-end..
+    UnionAll unionAll = new UnionAll(inputPops.toArray(new 
PhysicalOperator[inputPops.size()]));
+    unionAll.setOperatorId(creator.getOperatorId(this));
+
+    return unionAll;
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrule.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrule.java
new file mode 100644
index 0000000..7cd5733
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionDistinctPrule.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.drill.exec.planner.logical.DrillUnionRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.trace.EigenbaseTrace;
+
+import com.google.common.collect.Lists;
+
+public class UnionDistinctPrule extends Prule {
+  public static final RelOptRule INSTANCE = new UnionDistinctPrule();
+  protected static final Logger tracer = EigenbaseTrace.getPlannerTracer();
+
+  private UnionDistinctPrule() {
+    super(
+        RelOptHelper.any(DrillUnionRel.class), "Prel.UnionDistinctPrule");
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    DrillUnionRel union = (DrillUnionRel) call.rel(0);
+    return (union.isDistinct() && union.isHomogeneous(false /* don't compare 
names */));
+  }
+  
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final DrillUnionRel union = (DrillUnionRel) call.rel(0);
+    final List<RelNode> inputs = union.getInputs();
+    List<RelNode> convertedInputList = Lists.newArrayList();
+    RelTraitSet traits = 
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL);
+    
+    try {
+      for (int i = 0; i < inputs.size(); i++) {
+        RelNode convertedInput = convert(inputs.get(i), 
PrelUtil.fixTraits(call, traits));
+        convertedInputList.add(convertedInput);    
+      }
+      
+      traits = 
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
+      UnionDistinctPrel unionDistinct = new 
UnionDistinctPrel(union.getCluster(), traits, convertedInputList);
+
+      call.transformTo(unionDistinct);
+      
+    } catch (InvalidRelException e) {
+      tracer.warning(e.toString());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionPrel.java
new file mode 100644
index 0000000..cade2df
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionPrel.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.physical;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.exec.planner.common.DrillUnionRelBase;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+
+public abstract class UnionPrel extends DrillUnionRelBase implements Prel{
+
+  public UnionPrel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> 
inputs, boolean all) throws InvalidRelException{
+    super(cluster, traits, inputs, all);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> 
logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(this.getInputs());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java 
b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 1757290..2fcc4fb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -18,6 +18,7 @@
 package org.apache.drill;
 
 import org.apache.drill.common.util.FileUtils;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestExampleQueries extends BaseTestQuery{
@@ -37,7 +38,7 @@ public class TestExampleQueries extends BaseTestQuery{
     test("select count(*) from cp.`customer.json` limit 1");
     test("select count(*) from cp.`customer.json` limit 1");
   }
-
+  
   @Test
   public void testCaseReturnValueVarChar() throws Exception{
     test("select case when employee_id < 1000 then 'ABC' else 'DEF' end from 
cp.`employee.json` limit 5");
@@ -156,4 +157,36 @@ public class TestExampleQueries extends BaseTestQuery{
     test("select count(*) as mycnt, count(*) + 2 * count(*) as mycnt2 from 
cp.`tpch/nation.parquet` where 1 < 2");
   }
 
+
+  @Test    // Simple Union-All over two scans
+  public void testUnionAll1() throws Exception {
+    test("select n_regionkey from cp.`tpch/nation.parquet` union all select 
r_regionkey from cp.`tpch/region.parquet`");  
+  }
+
+  @Test  // Union-All over inner joins
+  public void testUnionAll2() throws Exception {
+    test("select n1.n_nationkey from cp.`tpch/nation.parquet` n1 inner join 
cp.`tpch/region.parquet` r1 on n1.n_regionkey = r1.r_regionkey where 
n1.n_nationkey in (1, 2)  union all select n2.n_nationkey from 
cp.`tpch/nation.parquet` n2 inner join cp.`tpch/region.parquet` r2 on 
n2.n_regionkey = r2.r_regionkey where n2.n_nationkey in (3, 4)");
+  }
+  
+  @Test  // Union-All over grouped aggregates
+  public void testUnionAll3() throws Exception {
+    test("select n1.n_nationkey from cp.`tpch/nation.parquet` n1 where 
n1.n_nationkey in (1, 2) group by n1.n_nationkey union all select 
r1.r_regionkey from cp.`tpch/region.parquet` r1 group by r1.r_regionkey");
+  }
+  
+  @Test    // Chain of Union-Alls
+  public void testUnionAll4() throws Exception {
+    test("select n_regionkey from cp.`tpch/nation.parquet` union all select 
r_regionkey from cp.`tpch/region.parquet` union all select n_nationkey from 
cp.`tpch/nation.parquet` union all select c_custkey from 
cp.`tpch/customer.parquet` where c_custkey < 5");  
+  }
+  
+  @Test  // Union-All of all columns in the table
+  public void testUnionAll5() throws Exception {
+    test("select * from cp.`tpch/region.parquet` r1 union all select * from 
cp.`tpch/region.parquet` r2");
+  }
+  
+  @Test
+  @Ignore // Produces wrong result
+  public void testUnionAll6() throws Exception {
+    test("select n_nationkey, n_regionkey from cp.`tpch/nation.parquet` where 
n_regionkey = 1 union all select r_regionkey, r_regionkey from 
cp.`tpch/region.parquet` where r_regionkey = 2");
+  }  
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/53e218ef/exec/java-exec/src/test/resources/union/test1.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/union/test1.json 
b/exec/java-exec/src/test/resources/union/test1.json
index 5b33232..a4dcc08 100644
--- a/exec/java-exec/src/test/resources/union/test1.json
+++ b/exec/java-exec/src/test/resources/union/test1.json
@@ -40,7 +40,7 @@
         {
             @id:4,
             children: [1,3],
-            pop:"union"
+            pop:"union-all"
         },
         {
           @id:5,
@@ -54,4 +54,4 @@
             pop: "screen"
         }
     ]
-}
\ No newline at end of file
+}

Reply via email to