DRILL-1333: Flatten operator for allowing more complex queryies against 
repeated data.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2871c3ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2871c3ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2871c3ba

Branch: refs/heads/master
Commit: 2871c3ba515e06cd4bae98742139a99e12e42407
Parents: 7309506
Author: Jason Altekruse <altekruseja...@gmail.com>
Authored: Thu Oct 9 17:55:45 2014 -0700
Committer: Jason Altekruse <altekruseja...@gmail.com>
Committed: Wed Oct 29 08:44:11 2014 -0700

----------------------------------------------------------------------
 .../src/main/codegen/templates/BaseWriter.java  |   3 +-
 .../codegen/templates/RepeatedValueVectors.java |  17 +-
 .../drill/exec/expr/EvaluationVisitor.java      |   1 +
 .../exec/expr/fn/impl/conv/DummyFlatten.java    |  44 +++
 .../org/apache/drill/exec/ops/QueryContext.java |   2 +-
 .../physical/base/AbstractPhysicalVisitor.java  |   6 +
 .../exec/physical/base/PhysicalVisitor.java     |   2 +
 .../drill/exec/physical/config/FlattenPOP.java  |  73 ++++
 .../impl/flatten/FlattenBatchCreator.java       |  40 ++
 .../impl/flatten/FlattenRecordBatch.java        | 374 +++++++++++++++++++
 .../physical/impl/flatten/FlattenTemplate.java  | 119 ++++++
 .../exec/physical/impl/flatten/Flattener.java   |  38 ++
 .../exec/planner/physical/DrillFlattenPrel.java |  75 ++++
 .../exec/planner/physical/PlannerSettings.java  |   5 +-
 .../exec/planner/physical/ProjectPrel.java      |   2 +-
 .../physical/visitor/BasePrelVisitor.java       |   6 +
 .../planner/physical/visitor/PrelVisitor.java   |   2 +
 .../visitor/RewriteProjectToFlatten.java        | 114 ++++++
 .../visitor/RexVisitorComplexExprSplitter.java  | 119 ++++++
 .../visitor/SplitUpComplexExpressions.java      | 146 ++++++++
 .../planner/sql/handlers/DefaultSqlHandler.java |   9 +
 .../exec/vector/RepeatedFixedWidthVector.java   |   7 +-
 .../vector/RepeatedVariableWidthVector.java     |   4 +-
 .../drill/exec/vector/RepeatedVector.java       |  26 ++
 .../exec/vector/complex/RepeatedListVector.java |  19 +-
 .../exec/vector/complex/RepeatedMapVector.java  | 137 +++++--
 .../vector/complex/impl/AbstractBaseWriter.java |   3 +-
 .../java/org/apache/drill/PlanningBase.java     |   2 +-
 .../org/apache/drill/TestExampleQueries.java    |  12 +
 .../resources/flatten/corrected_physical.json   |  81 ++++
 .../flatten/test_flatten_physical.json          |  78 ++++
 .../resources/jsoninput/input2_modified.json    | 106 ++++++
 .../resources/jsoninput/repeated_list_bug.json  |   6 +
 .../test/resources/store/json/nested_json.json  |   8 +
 .../store/json/test_flatten_mapify.json         |  26 ++
 35 files changed, 1665 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/codegen/templates/BaseWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/BaseWriter.java 
b/exec/java-exec/src/main/codegen/templates/BaseWriter.java
index 224148c..979d4ac 100644
--- a/exec/java-exec/src/main/codegen/templates/BaseWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/BaseWriter.java
@@ -32,7 +32,8 @@ public interface BaseWriter extends Positionable{
   boolean ok();
   WriteState getState();
   int getValueCapacity();
-  
+  void resetState();
+
   public interface MapWriter extends BaseWriter{
 
     MaterializedField getField();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java 
b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index 2132212..2853e83 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -45,7 +45,7 @@ package org.apache.drill.exec.vector;
  * NB: this class is automatically generated from ValueVectorTypes.tdd using 
FreeMarker.
  */
 
- public final class Repeated${minor.class}Vector extends BaseValueVector 
implements Repeated<#if type.major == 
"VarLen">VariableWidth<#else>FixedWidth</#if>Vector {
+public final class Repeated${minor.class}Vector extends BaseValueVector 
implements Repeated<#if type.major == 
"VarLen">VariableWidth<#else>FixedWidth</#if>Vector {
 
   private int parentValueCount;
   private int childValueCount;
@@ -59,7 +59,8 @@ package org.apache.drill.exec.vector;
   public Repeated${minor.class}Vector(MaterializedField field, BufferAllocator 
allocator) {
     super(field, allocator);
     this.offsets = new UInt4Vector(null, allocator);
-    this.values = new ${minor.class}Vector(null, allocator);
+    MaterializedField mf = MaterializedField.create(field.getPath(), 
Types.required(field.getType().getMinorType()));
+    this.values = new ${minor.class}Vector(mf, allocator);
   }
 
   public int getValueCapacity(){
@@ -106,7 +107,7 @@ package org.apache.drill.exec.vector;
     int endPos = offsets.getAccessor().get(startIndex+length);
     values.splitAndTransferTo(startIndex, endPos-startPos, target.values);
     target.offsets.clear();
-    target.offsets.allocateNew(length+1);
+    target.offsets.allocateNew(endPos - startPos + 1);
     int normalizedPos = 0;
     for (int i=0; i<length+1;i++) {
       normalizedPos = offsets.getAccessor().get(startIndex+i) - startPos;
@@ -303,7 +304,11 @@ package org.apache.drill.exec.vector;
     public int getCount(int index) {
       return offsets.getAccessor().get(index+1) - 
offsets.getAccessor().get(index);
     }
-    
+
+    public ValueVector getAllChildValues() {
+      return values;
+    }
+
     public List<${friendlyType}> getObject(int index) {
       List<${friendlyType}> vals = new JsonStringArrayList();
       int start = offsets.getAccessor().get(index);
@@ -313,6 +318,10 @@ package org.apache.drill.exec.vector;
       }
       return vals;
     }
+
+    public int getGroupSizeAtIndex(int index){
+      return offsets.getAccessor().get(index+1) - 
offsets.getAccessor().get(index);
+    }
     
     public ${friendlyType} getSingleObject(int index, int arrayIndex){
       int start = offsets.getAccessor().get(index);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index d1e10ae..5cf4a35 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -301,6 +301,7 @@ public class EvaluationVisitor {
             TypeHelper.getWriterInterface(inputContainer.getMinorType(), 
inputContainer.getMajorType().getMode()));
         JVar writer = generator.declareClassField("writer", writerIFace);
         generator.getSetupBlock().assign(writer, 
JExpr._new(writerImpl).arg(vv).arg(JExpr._null()));
+        generator.getEvalBlock().add(writer.invoke("resetState"));
         
generator.getEvalBlock().add(writer.invoke("setPosition").arg(outIndex));
         String copyMethod = inputContainer.isSingularRepeated() ? 
"copyAsValueSingle" : "copyAsValue";
         
generator.getEvalBlock().add(inputContainer.getHolder().invoke(copyMethod).arg(writer));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyFlatten.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyFlatten.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyFlatten.java
new file mode 100644
index 0000000..d4e3115
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyFlatten.java
@@ -0,0 +1,44 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.expr.fn.impl.conv;
+
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+
+/**
+ * This and {@link DummyConvertTo} class merely act as a placeholder so that 
Optiq
+ * allows the 'flatten()' function in SQL.
+ */
+@FunctionTemplate(name = "flatten", scope = FunctionScope.SIMPLE, nulls = 
NullHandling.NULL_IF_NULL)
+public class DummyFlatten implements DrillSimpleFunc {
+
+  @Output BaseWriter.ComplexWriter out;
+
+  @Override
+  public void setup(RecordBatch incoming) { }
+
+  @Override
+  public void eval() { }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 4b290a5..ea48b05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -58,7 +58,7 @@ public class QueryContext{
     this.session = session;
     this.timer = new Multitimer<>(QuerySetup.class);
     this.queryOptions = new QueryOptionManager(session.getOptions());
-    this.plannerSettings = new PlannerSettings(queryOptions);
+    this.plannerSettings = new PlannerSettings(queryOptions, 
getFunctionRegistry());
     this.plannerSettings.setNumEndPoints(this.getActiveEndpoints().size());
     this.table = new DrillOperatorTable(getFunctionRegistry());
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 031ab10..27b0ecb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.base;
 
 import org.apache.drill.exec.physical.config.BroadcastSender;
 import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
@@ -105,6 +106,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E 
extends Throwable> impleme
   }
 
   @Override
+  public T visitFlatten(FlattenPOP flatten, X value) throws E {
+    return visitOp(flatten, value);
+  }
+
+  @Override
   public T visitReceiver(Receiver receiver, X value) throws E {
     return visitOp(receiver, value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index f9a9c21..e6a89d0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.base;
 
 import org.apache.drill.exec.physical.config.BroadcastSender;
 import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
@@ -62,6 +63,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends 
Throwable> {
   public RETURN visitTrace(Trace trace, EXTRA value) throws EXCEP;
   public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
   public RETURN visitLimit(Limit limit, EXTRA value) throws EXCEP;
+  public RETURN visitFlatten(FlattenPOP flatten, EXTRA value) throws EXCEP;
   public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP;
   public RETURN visitHashJoin(HashJoinPOP join, EXTRA value) throws EXCEP;
   public RETURN visitSender(Sender sender, EXTRA value) throws EXCEP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java
new file mode 100644
index 0000000..42e6870
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/FlattenPOP.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.Iterators;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+
+import java.util.Iterator;
+import java.util.List;
+
+@JsonTypeName("flatten")
+public class FlattenPOP extends AbstractSingle {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FlattenPOP.class);
+
+  private SchemaPath column;
+
+  @JsonCreator
+  public FlattenPOP(
+      @JsonProperty("child") PhysicalOperator child,
+      @JsonProperty("column") SchemaPath column) {
+    super(child);
+    this.column = column;
+  }
+
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.singletonIterator(child);
+  }
+
+  public SchemaPath getColumn() {
+    return column;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> 
physicalVisitor, X value) throws E {
+    return physicalVisitor.visitFlatten(this, value);
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new FlattenPOP(child, column);
+  }
+
+  @Override
+  public int getOperatorType() {
+    // TODO - add this operator to the protobuf definition
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
new file mode 100644
index 0000000..6f02824
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.flatten;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.FlattenPOP;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+
+public class FlattenBatchCreator implements BatchCreator<FlattenPOP>{
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FlattenBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, FlattenPOP config, 
List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() == 1);
+    return new FlattenRecordBatch(config, children.iterator().next(), context);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
new file mode 100644
index 0000000..5171a25
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.flatten;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.carrotsearch.hppc.IntOpenHashSet;
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.DrillFuncHolderExpr;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.expr.fn.DrillComplexWriterFuncHolder;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.FlattenPOP;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.RepeatedVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+
+import com.google.common.collect.Lists;
+import com.sun.codemodel.JExpr;
+
+public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FlattenRecordBatch.class);
+
+  private Flattener flattener;
+  private List<ValueVector> allocationVectors;
+  private List<ComplexWriter> complexWriters;
+  private boolean hasRemainder = false;
+  private int remainderIndex = 0;
+  private int recordCount;
+  // the buildSchema method is always called first by a short circuit path to 
return schema information to the client
+  // this information is not entirely accurate as Drill determines schema on 
the fly, so here it needs to have modified
+  // behavior for that call to setup the schema for the flatten operation
+  private boolean fastSchemaCalled;
+
+  private static final String EMPTY_STRING = "";
+
+  private class ClassifierResult {
+    public boolean isStar = false;
+    public List<String> outputNames;
+    public String prefix = "";
+
+    private void clear() {
+      isStar = false;
+      prefix = "";
+      if (outputNames != null) {
+        outputNames.clear();
+      }
+
+      // note:  don't clear the internal maps since they have cumulative data..
+    }
+  }
+
+  public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, 
FragmentContext context) throws OutOfMemoryException {
+    super(pop, context, incoming);
+    fastSchemaCalled = false;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return recordCount;
+  }
+
+
+  @Override
+  protected void killIncoming(boolean sendUpstream) {
+    super.killIncoming(sendUpstream);
+    hasRemainder = false;
+  }
+
+
+  @Override
+  public IterOutcome innerNext() {
+    if (hasRemainder) {
+      handleRemainder();
+      return IterOutcome.OK;
+    }
+    return super.innerNext();
+  }
+
+  @Override
+  public VectorContainer getOutgoingContainer() {
+    return this.container;
+  }
+
+  private void setFlattenVector() {
+    try {
+      flattener.setFlattenField((RepeatedVector) incoming.getValueAccessorById(
+          incoming.getSchema().getColumn(
+              incoming.getValueVectorId(
+                  popConfig.getColumn()).getFieldIds()[0]).getValueClass(),
+          
incoming.getValueVectorId(popConfig.getColumn()).getFieldIds()).getValueVector());
+    } catch (Exception ex) {
+      throw new DrillRuntimeException("Trying to flatten a non-repeated 
filed.");
+    }
+  }
+
+  @Override
+  protected IterOutcome doWork() {
+    int incomingRecordCount = incoming.getRecordCount();
+
+    if (!doAlloc()) {
+      outOfMemory = true;
+      return IterOutcome.OUT_OF_MEMORY;
+    }
+
+    // we call this in setupSchema, but we also need to call it here so we 
have a reference to the appropriate vector
+    // inside of the the flattener for the current batch
+    setFlattenVector();
+
+    int childCount = flattener.getFlattenField().getAccessor().getValueCount();
+    int outputRecords = flattener.flattenRecords(0, incomingRecordCount, 0);
+    // TODO - change this to be based on the repeated vector length
+    if (outputRecords < childCount) {
+      setValueCount(outputRecords);
+      hasRemainder = true;
+      remainderIndex = outputRecords;
+      this.recordCount = remainderIndex;
+    } else {
+      setValueCount(outputRecords);
+      for(VectorWrapper<?> v: incoming) {
+        v.clear();
+      }
+      this.recordCount = outputRecords;
+    }
+    // In case of complex writer expression, vectors would be added to batch 
run-time.
+    // We have to re-build the schema.
+    if (complexWriters != null) {
+      container.buildSchema(SelectionVectorMode.NONE);
+    }
+
+    return IterOutcome.OK;
+  }
+
+  private void handleRemainder() {
+    int remainingRecordCount = 
flattener.getFlattenField().getAccessor().getValueCount() - remainderIndex;
+    if (!doAlloc()) {
+      outOfMemory = true;
+      return;
+    }
+
+    int projRecords = flattener.flattenRecords(remainderIndex, 
remainingRecordCount, 0);
+    if (projRecords < remainingRecordCount) {
+      setValueCount(projRecords);
+      this.recordCount = projRecords;
+      remainderIndex += projRecords;
+    } else {
+      setValueCount(remainingRecordCount);
+      hasRemainder = false;
+      remainderIndex = 0;
+      for (VectorWrapper<?> v : incoming) {
+        v.clear();
+      }
+      this.recordCount = remainingRecordCount;
+    }
+    // In case of complex writer expression, vectors would be added to batch 
run-time.
+    // We have to re-build the schema.
+    if (complexWriters != null) {
+      container.buildSchema(SelectionVectorMode.NONE);
+    }
+  }
+
+  public void addComplexWriter(ComplexWriter writer) {
+    complexWriters.add(writer);
+  }
+
+  private boolean doAlloc() {
+    //Allocate vv in the allocationVectors.
+    for (ValueVector v : this.allocationVectors) {
+      if (!v.allocateNewSafe()) {
+        return false;
+      }
+    }
+
+    //Allocate vv for complexWriters.
+    if (complexWriters == null) {
+      return true;
+    }
+
+    for (ComplexWriter writer : complexWriters) {
+      writer.allocate();
+    }
+
+    return true;
+  }
+
+  private void setValueCount(int count) {
+    for (ValueVector v : allocationVectors) {
+      ValueVector.Mutator m = v.getMutator();
+      m.setValueCount(count);
+    }
+
+    if (complexWriters == null) {
+      return;
+    }
+
+    for (ComplexWriter writer : complexWriters) {
+      writer.setValueCount(count);
+    }
+  }
+
+  private FieldReference getRef(NamedExpression e) {
+    FieldReference ref = e.getRef();
+    PathSegment seg = ref.getRootSegment();
+
+    return ref;
+  }
+
+  @Override
+  public IterOutcome buildSchema() throws SchemaChangeException {
+    incoming.buildSchema();
+    if ( ! fastSchemaCalled ) {
+      for (VectorWrapper vw : incoming) {
+        ValueVector vector = container.addOrGet(vw.getField());
+        container.add(vector);
+      }
+      fastSchemaCalled = true;
+      container.buildSchema(SelectionVectorMode.NONE);
+      return IterOutcome.OK_NEW_SCHEMA;
+    }
+    else {
+      setupNewSchema();
+      return IterOutcome.OK_NEW_SCHEMA;
+    }
+  }
+
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+    this.allocationVectors = Lists.newArrayList();
+    container.clear();
+    final List<NamedExpression> exprs = getExpressionList();
+    final ErrorCollector collector = new ErrorCollectorImpl();
+    final List<TransferPair> transfers = Lists.newArrayList();
+
+    final ClassGenerator<Flattener> cg = 
CodeGenerator.getRoot(Flattener.TEMPLATE_DEFINITION, 
context.getFunctionRegistry());
+    IntOpenHashSet transferFieldIds = new IntOpenHashSet();
+
+    RepeatedVector flattenField = ((RepeatedVector) 
incoming.getValueAccessorById(
+          incoming.getSchema().getColumn(
+              incoming.getValueVectorId(
+                  popConfig.getColumn()).getFieldIds()[0]).getValueClass(),
+          
incoming.getValueVectorId(popConfig.getColumn()).getFieldIds()).getValueVector());
+
+    NamedExpression namedExpression = new 
NamedExpression(popConfig.getColumn(), new 
FieldReference(popConfig.getColumn()));
+    LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, 
collector, context.getFunctionRegistry(), true);
+    ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
+    TypedFieldId id = vectorRead.getFieldId();
+    Preconditions.checkNotNull(incoming);
+
+    TransferPair tp = null;
+    if (flattenField instanceof RepeatedMapVector) {
+      tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap();
+    } else {
+      ValueVector vvIn = flattenField.getAccessor().getAllChildValues();
+      tp = vvIn.getTransferPair();
+    }
+    transfers.add(tp);
+    container.add(tp.getTo());
+    transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
+
+    logger.debug("Added transfer for project expression.");
+
+    ClassifierResult result = new ClassifierResult();
+
+    for (int i = 0; i < exprs.size(); i++) {
+      namedExpression = exprs.get(i);
+      result.clear();
+
+      String outputName = getRef(namedExpression).getRootSegment().getPath();
+      if (result != null && result.outputNames != null && 
result.outputNames.size() > 0) {
+        for (int j = 0; j < result.outputNames.size(); j++) {
+          if (!result.outputNames.get(j).equals(EMPTY_STRING)) {
+            outputName = result.outputNames.get(j);
+            break;
+          }
+        }
+      }
+
+      expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), 
incoming, collector, context.getFunctionRegistry(), true);
+      final MaterializedField outputField = 
MaterializedField.create(outputName, expr.getMajorType());
+      if (collector.hasErrors()) {
+        throw new SchemaChangeException(String.format("Failure while trying to 
materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
+      }
+      if (expr instanceof DrillFuncHolderExpr &&
+          ((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder())  {
+        // Need to process ComplexWriter function evaluation.
+        // Lazy initialization of the list of complex writers, if not done yet.
+        if (complexWriters == null) {
+          complexWriters = Lists.newArrayList();
+        }
+
+        // The reference name will be passed to ComplexWriter, used as the 
name of the output vector from the writer.
+        ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) 
expr).getHolder()).setReference(namedExpression.getRef());
+        cg.addExpr(expr);
+      } else{
+        // need to do evaluation.
+        ValueVector vector = TypeHelper.getNewVector(outputField, 
oContext.getAllocator());
+        allocationVectors.add(vector);
+        TypedFieldId fid = container.add(vector);
+        ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, 
expr, true);
+        HoldingContainer hc = cg.addExpr(write);
+
+        
cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+        logger.debug("Added eval for project expression.");
+      }
+    }
+
+    cg.rotateBlock();
+    cg.getEvalBlock()._return(JExpr.TRUE);
+
+    container.buildSchema(SelectionVectorMode.NONE);
+
+    try {
+      this.flattener = context.getImplementationClass(cg.getCodeGenerator());
+      flattener.setup(context, incoming, this, transfers);
+    } catch (ClassTransformationException | IOException e) {
+      throw new SchemaChangeException("Failure while attempting to load 
generated class", e);
+    }
+    return true;
+  }
+
+  private List<NamedExpression> getExpressionList() {
+
+    List<NamedExpression> exprs = Lists.newArrayList();
+    for (MaterializedField field : incoming.getSchema()) {
+      if (field.getPath().equals(popConfig.getColumn())) {
+        continue;
+      }
+      exprs.add(new NamedExpression(field.getPath(), new 
FieldReference(field.getPath())));
+    }
+    return exprs;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
new file mode 100644
index 0000000..af4cead
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.flatten;
+
+import java.util.List;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.drill.exec.vector.RepeatedVector;
+
+public abstract class FlattenTemplate implements Flattener {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FlattenTemplate.class);
+
+  private ImmutableList<TransferPair> transfers;
+  private SelectionVector2 vector2;
+  private SelectionVector4 vector4;
+  private SelectionVectorMode svMode;
+  RepeatedVector fieldToFlatten;
+  private int groupIndex;
+  // this allows for groups to be written between batches if we run out of 
space, for cases where we have finished
+  // a batch on the boundary it will be set to 0
+  private int childIndexWithinCurrGroup;
+  // calculating the current group size requires reading the start and end out 
of the offset vector, this only happens
+  // once and is stored here for faster reference
+  private int currGroupSize;
+  private int childIndex;
+
+  public FlattenTemplate() throws SchemaChangeException {
+    childIndexWithinCurrGroup = -1;
+  }
+
+  @Override
+  public void setFlattenField(RepeatedVector flattenField) {
+    this.fieldToFlatten = flattenField;
+  }
+
+  public RepeatedVector getFlattenField() {
+    return fieldToFlatten;
+  }
+
+  @Override
+  public final int flattenRecords(int startIndex, final int recordCount, int 
firstOutputIndex) {
+    startIndex = childIndex;
+    switch (svMode) {
+      case FOUR_BYTE:
+        throw new UnsupportedOperationException("Flatten does not support 
selection vector inputs.");
+
+      case TWO_BYTE:
+        throw new UnsupportedOperationException("Flatten does not support 
selection vector inputs.");
+
+      case NONE:
+        if (childIndexWithinCurrGroup == -1) {
+          childIndexWithinCurrGroup = 0;
+        }
+        outer:
+        for ( ; groupIndex < fieldToFlatten.getAccessor().getGroupCount(); 
groupIndex++) {
+          currGroupSize = 
fieldToFlatten.getAccessor().getGroupSizeAtIndex(groupIndex);
+          for ( ; childIndexWithinCurrGroup < currGroupSize; 
childIndexWithinCurrGroup++) {
+            if (!doEval(groupIndex, firstOutputIndex)) {
+              break outer;
+            }
+            firstOutputIndex++;
+            childIndex++;
+          }
+          childIndexWithinCurrGroup = 0;
+        }
+
+        for (TransferPair t : transfers) {
+          t.splitAndTransfer(startIndex, childIndex - startIndex);
+        }
+        return childIndex - startIndex;
+
+      default:
+        throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public final void setup(FragmentContext context, RecordBatch incoming, 
RecordBatch outgoing, List<TransferPair> transfers)  throws 
SchemaChangeException{
+
+    this.svMode = incoming.getSchema().getSelectionVectorMode();
+    switch (svMode) {
+      case FOUR_BYTE:
+        throw new UnsupportedOperationException("Flatten does not support 
selection vector inputs.");
+      case TWO_BYTE:
+        throw new UnsupportedOperationException("Flatten does not support 
selection vector inputs.");
+    }
+    this.transfers = ImmutableList.copyOf(transfers);
+    doSetup(context, incoming, outgoing);
+  }
+
+  public abstract void doSetup(@Named("context") FragmentContext context, 
@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch 
outgoing);
+  public abstract boolean doEval(@Named("inIndex") int inIndex, 
@Named("outIndex") int outIndex);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
new file mode 100644
index 0000000..49b9c1b
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.flatten;
+
+import java.util.List;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.RepeatedVector;
+
+public interface Flattener {
+
+  public abstract void setup(FragmentContext context, RecordBatch incoming,  
RecordBatch outgoing, List<TransferPair> transfers)  throws 
SchemaChangeException;
+  public abstract int flattenRecords(int startIndex, int recordCount, int 
firstOutputIndex);
+  public void setFlattenField(RepeatedVector repeatedColumn);
+  public RepeatedVector getFlattenField();
+
+  public static TemplateClassDefinition<Flattener> TEMPLATE_DEFINITION = new 
TemplateClassDefinition<Flattener>(Flattener.class, FlattenTemplate.class);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillFlattenPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillFlattenPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillFlattenPrel.java
new file mode 100644
index 0000000..1409347
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillFlattenPrel.java
@@ -0,0 +1,75 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.FlattenPOP;
+import org.apache.drill.exec.planner.common.DrillFilterRelBase;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.rex.RexNode;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public class DrillFlattenPrel extends SinglePrel implements Prel {
+
+  RexNode toFlatten;
+
+  public DrillFlattenPrel(RelOptCluster cluster, RelTraitSet traits, RelNode 
child, RexNode toFlatten) {
+    super(cluster, traits, child);
+    this.toFlatten = toFlatten;
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new DrillFlattenPrel(getCluster(), traitSet, sole(inputs), 
toFlatten);
+  }
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getChild());
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) 
throws IOException {
+    Prel child = (Prel) this.getChild();
+
+    PhysicalOperator childPOP = child.getPhysicalOperator(creator);
+    FlattenPOP f = new FlattenPOP(childPOP, (SchemaPath) 
getFlattenExpression(new DrillParseContext()));
+    return creator.addMetadata(this, f);
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode getEncoding() {
+    return BatchSchema.SelectionVectorMode.NONE;
+  }
+
+  protected LogicalExpression getFlattenExpression(DrillParseContext context){
+    return DrillOptiq.toDrill(context, getChild(), toFlatten);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index a036a46..38274c7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.planner.physical;
 
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValidator;
 import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
@@ -48,9 +49,11 @@ public class PlannerSettings implements Context{
   public static final OptionValidator HASH_SINGLE_KEY = new 
BooleanValidator("planner.enable_hash_single_key", true);
 
   public OptionManager options = null;
+  public FunctionImplementationRegistry functionImplementationRegistry = null;
 
-  public PlannerSettings(OptionManager options){
+  public PlannerSettings(OptionManager options, FunctionImplementationRegistry 
functionImplementationRegistry){
     this.options = options;
+    this.functionImplementationRegistry = functionImplementationRegistry;
   }
 
   public OptionManager getOptions() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
index 1f621da..310e18c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
@@ -66,7 +66,7 @@ public class ProjectPrel extends DrillProjectRelBase 
implements Prel{
 
   @Override
   public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> 
logicalVisitor, X value) throws E {
-    return logicalVisitor.visitPrel(this, value);
+    return logicalVisitor.visitProject(this, value);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java
index 81f247d..b84b345 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/BasePrelVisitor.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.physical.visitor;
 import org.apache.drill.exec.planner.physical.ExchangePrel;
 import org.apache.drill.exec.planner.physical.JoinPrel;
 import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.planner.physical.ScreenPrel;
 import org.apache.drill.exec.planner.physical.WriterPrel;
@@ -43,6 +44,11 @@ public class BasePrelVisitor<RETURN, EXTRA, EXCEP extends 
Throwable> implements
   }
 
   @Override
+  public RETURN visitProject(ProjectPrel prel, EXTRA value) throws EXCEP {
+    return visitPrel(prel, value);
+  }
+
+  @Override
   public RETURN visitScreen(ScreenPrel prel, EXTRA value) throws EXCEP {
     return visitPrel(prel, value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
index f519220..b3a25c1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.physical.visitor;
 import org.apache.drill.exec.planner.physical.ExchangePrel;
 import org.apache.drill.exec.planner.physical.JoinPrel;
 import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.planner.physical.ScreenPrel;
 import org.apache.drill.exec.planner.physical.WriterPrel;
@@ -33,6 +34,7 @@ public interface PrelVisitor<RETURN, EXTRA, EXCEP extends 
Throwable> {
   public RETURN visitWriter(WriterPrel prel, EXTRA value) throws EXCEP;
   public RETURN visitScan(ScanPrel prel, EXTRA value) throws EXCEP;
   public RETURN visitJoin(JoinPrel prel, EXTRA value) throws EXCEP;
+  public RETURN visitProject(ProjectPrel prel, EXTRA value) throws EXCEP;
 
   public RETURN visitPrel(Prel prel, EXTRA value) throws EXCEP;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RewriteProjectToFlatten.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RewriteProjectToFlatten.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RewriteProjectToFlatten.java
new file mode 100644
index 0000000..d4b3573
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RewriteProjectToFlatten.java
@@ -0,0 +1,114 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.planner.physical.visitor;
+
+import com.google.common.collect.Lists;
+import net.hydromatic.optiq.tools.RelConversionException;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.planner.physical.DrillFlattenPrel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.visitor.BasePrelVisitor;
+import org.apache.drill.exec.planner.types.RelDataTypeDrillImpl;
+import org.apache.drill.exec.planner.types.RelDataTypeHolder;
+import org.eigenbase.rel.ProjectRelBase;
+import org.eigenbase.rel.RelShuttleImpl;
+import org.apache.drill.exec.planner.sql.DrillOperatorTable;
+import org.eigenbase.rel.ProjectRel;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.reltype.RelDataTypeField;
+import org.eigenbase.reltype.RelDataTypeFieldImpl;
+import org.eigenbase.reltype.RelRecordType;
+import org.eigenbase.rex.RexBuilder;
+import org.eigenbase.rex.RexCall;
+import org.eigenbase.rex.RexLiteral;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.sql.SqlFunction;
+import org.eigenbase.sql.SqlOperator;
+import org.eigenbase.util.NlsString;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class RewriteProjectToFlatten extends BasePrelVisitor<Prel, Object, 
RelConversionException> {
+
+  RelDataTypeFactory factory;
+  DrillOperatorTable table;
+
+  public RewriteProjectToFlatten(RelDataTypeFactory factory, 
DrillOperatorTable table) {
+    super();
+    this.factory = factory;
+    this.table = table;
+  }
+
+  @Override
+  public Prel visitPrel(Prel prel, Object value) throws RelConversionException 
{
+    List<RelNode> children = Lists.newArrayList();
+    for(Prel child : prel){
+      child = child.accept(this, null);
+      children.add(child);
+    }
+    return (Prel) prel.copy(prel.getTraitSet(), children);
+  }
+
+
+  @Override
+  public Prel visitProject(ProjectPrel node, Object unused) throws 
RelConversionException {
+    ProjectPrel project = node;
+    List<RexNode> exprList = new ArrayList<>();
+    boolean rewrite = false;
+
+    List<RelDataTypeField> relDataTypes = new ArrayList();
+    int i = 0;
+    RexNode flatttenExpr = null;
+    for (RexNode rex : project.getChildExps()) {
+      RexNode newExpr = rex;
+      if (rex instanceof RexCall) {
+        RexCall function = (RexCall) rex;
+        String functionName = function.getOperator().getName();
+        int nArgs = function.getOperands().size();
+
+        if (functionName.equalsIgnoreCase("flatten") ) {
+          rewrite = true;
+          if (function.getOperands().size() != 1) {
+            throw new RelConversionException("Flatten expression expects a 
single input.");
+          }
+          newExpr = function.getOperands().get(0);
+          RexBuilder builder = new RexBuilder(factory);
+          flatttenExpr = builder.makeInputRef( new RelDataTypeDrillImpl(new 
RelDataTypeHolder(), factory), i);
+        }
+      }
+      relDataTypes.add(project.getRowType().getFieldList().get(i));
+      i++;
+      exprList.add(newExpr);
+    }
+    if (rewrite == true) {
+      // TODO - figure out what is the right setting for the traits
+      Prel newChild = ((Prel)project.getInput(0)).accept(this, null);
+      ProjectPrel newProject = new ProjectPrel(node.getCluster(), 
project.getTraitSet(), newChild, exprList, new RelRecordType(relDataTypes));
+      DrillFlattenPrel flatten = new DrillFlattenPrel(project.getCluster(), 
project.getTraitSet(), newProject, flatttenExpr);
+      return flatten;
+    }
+
+    Prel child = ((Prel)project.getChild()).accept(this, null);
+    return new ProjectPrel(node.getCluster(), project.getTraitSet(), child, 
exprList, new RelRecordType(relDataTypes));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RexVisitorComplexExprSplitter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RexVisitorComplexExprSplitter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RexVisitorComplexExprSplitter.java
new file mode 100644
index 0000000..73242d5
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RexVisitorComplexExprSplitter.java
@@ -0,0 +1,119 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.planner.physical.visitor;
+
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.types.RelDataTypeDrillImpl;
+import org.apache.drill.exec.planner.types.RelDataTypeHolder;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.rex.RexBuilder;
+import org.eigenbase.rex.RexCall;
+import org.eigenbase.rex.RexCorrelVariable;
+import org.eigenbase.rex.RexDynamicParam;
+import org.eigenbase.rex.RexFieldAccess;
+import org.eigenbase.rex.RexInputRef;
+import org.eigenbase.rex.RexLiteral;
+import org.eigenbase.rex.RexLocalRef;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.rex.RexOver;
+import org.eigenbase.rex.RexRangeRef;
+import org.eigenbase.rex.RexVisitorImpl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class RexVisitorComplexExprSplitter extends RexVisitorImpl<RexNode> {
+
+  RelDataTypeFactory factory;
+  FunctionImplementationRegistry funcReg;
+  List<RexNode> complexExprs;
+  List<ProjectPrel> projects;
+  int lastUsedIndex;
+
+  public RexVisitorComplexExprSplitter(RelDataTypeFactory factory, 
FunctionImplementationRegistry funcReg, int firstUnused) {
+    super(true);
+    this.factory = factory;
+    this.funcReg = funcReg;
+    this.complexExprs = new ArrayList();
+    this.lastUsedIndex = firstUnused;
+  }
+
+  public  List<RexNode> getComplexExprs() {
+    return complexExprs;
+  }
+
+  @Override
+  public RexNode visitInputRef(RexInputRef inputRef) {
+    return inputRef;
+  }
+
+  @Override
+  public RexNode visitLocalRef(RexLocalRef localRef) {
+    return localRef;
+  }
+
+  @Override
+  public RexNode visitLiteral(RexLiteral literal) {
+    return literal;
+  }
+
+  @Override
+  public RexNode visitOver(RexOver over) {
+    return over;
+  }
+
+  @Override
+  public RexNode visitCorrelVariable(RexCorrelVariable correlVariable) {
+    return correlVariable;
+  }
+
+  public RexNode visitCall(RexCall call) {
+
+    String functionName = call.getOperator().getName();
+
+    List<RexNode> newOps = new ArrayList();
+    for (RexNode operand : call.operands) {
+      newOps.add(operand.accept(this));
+    }
+    if (funcReg.isFunctionComplexOutput(functionName) ) {
+      RexBuilder builder = new RexBuilder(factory);
+      RexNode ret = builder.makeInputRef( new RelDataTypeDrillImpl(new 
RelDataTypeHolder(), factory), lastUsedIndex);
+      lastUsedIndex++;
+      complexExprs.add(call.clone(new RelDataTypeDrillImpl(new 
RelDataTypeHolder(),factory), newOps));
+      return ret;
+    }
+    return call.clone(new RelDataTypeDrillImpl(new 
RelDataTypeHolder(),factory), newOps);
+  }
+
+  @Override
+  public RexNode visitDynamicParam(RexDynamicParam dynamicParam) {
+    return dynamicParam;
+  }
+
+  @Override
+  public RexNode visitRangeRef(RexRangeRef rangeRef) {
+    return rangeRef;
+  }
+
+  @Override
+  public RexNode visitFieldAccess(RexFieldAccess fieldAccess) {
+    return fieldAccess;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java
new file mode 100644
index 0000000..f53b228
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SplitUpComplexExpressions.java
@@ -0,0 +1,146 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.planner.physical.visitor;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import net.hydromatic.optiq.tools.RelConversionException;
+
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.physical.DrillFlattenPrel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.PrelUtil.ProjectPushInfo;
+import org.apache.drill.exec.planner.physical.visitor.BasePrelVisitor;
+import org.apache.drill.exec.planner.types.RelDataTypeDrillImpl;
+import org.apache.drill.exec.planner.types.RelDataTypeHolder;
+import org.eigenbase.rel.ProjectRelBase;
+import org.eigenbase.rel.RelShuttleImpl;
+import org.apache.drill.exec.planner.sql.DrillOperatorTable;
+import org.eigenbase.rel.ProjectRel;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.reltype.RelDataTypeField;
+import org.eigenbase.reltype.RelDataTypeFieldImpl;
+import org.eigenbase.reltype.RelRecordType;
+import org.eigenbase.rex.RexBuilder;
+import org.eigenbase.rex.RexCall;
+import org.eigenbase.rex.RexLiteral;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.sql.SqlFunction;
+import org.eigenbase.sql.SqlOperator;
+import org.eigenbase.sql.type.SqlTypeName;
+import org.eigenbase.util.NlsString;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SplitUpComplexExpressions extends BasePrelVisitor<Prel, Object, 
RelConversionException> {
+
+  RelDataTypeFactory factory;
+  DrillOperatorTable table;
+  FunctionImplementationRegistry funcReg;
+
+  public SplitUpComplexExpressions(RelDataTypeFactory factory, 
DrillOperatorTable table, FunctionImplementationRegistry funcReg) {
+    super();
+    this.factory = factory;
+    this.table = table;
+    this.funcReg = funcReg;
+  }
+
+  @Override
+  public Prel visitPrel(Prel prel, Object value) throws RelConversionException 
{
+    List<RelNode> children = Lists.newArrayList();
+    for(Prel child : prel){
+      child = child.accept(this, null);
+      children.add(child);
+    }
+    return (Prel) prel.copy(prel.getTraitSet(), children);
+  }
+
+
+  @Override
+  public Prel visitProject(ProjectPrel project, Object unused) throws 
RelConversionException {
+
+    List<RexNode> exprList = new ArrayList<>();
+
+    List<RelDataTypeField> relDataTypes = new ArrayList();
+    List<RelDataTypeField> origRelDataTypes = new ArrayList();
+    int i = 0;
+
+    ProjectPushInfo columnInfo = 
PrelUtil.getColumns(project.getInput(0).getRowType(), project.getProjects());
+
+    List<RexNode> newProjects = Lists.newArrayList();
+    if (columnInfo == null ) {
+      return project;
+    }
+    int lastRexInput = columnInfo.columns.size();
+    RexVisitorComplexExprSplitter exprSplitter = new 
RexVisitorComplexExprSplitter(factory, funcReg, lastRexInput);
+
+    for (RexNode n : project.getChildExps()) {
+      newProjects.add(n.accept(columnInfo.getInputRewriter()));
+    }
+
+    for (RexNode rex : project.getChildExps()) {
+      origRelDataTypes.add(project.getRowType().getFieldList().get(i));
+      i++;
+      exprList.add(rex.accept(exprSplitter));
+    }
+    List<RexNode> complexExprs = exprSplitter.getComplexExprs();
+
+    RelNode originalInput = project.getInput(0);
+    ProjectPrel childProject;
+
+    List<RexNode> allExprs = new ArrayList();
+    for (int index = 0; index < lastRexInput; index++) {
+      RexBuilder builder = new RexBuilder(factory);
+      allExprs.add(builder.makeInputRef( new RelDataTypeDrillImpl(new 
RelDataTypeHolder(), factory), index));
+      relDataTypes.add(new RelDataTypeFieldImpl("EXPR$" + index, 
allExprs.size(), factory.createSqlType(SqlTypeName.ANY) ));
+    }
+    RexNode currRexNode;
+    int index = lastRexInput - 1;
+
+    // if the projection expressions contained complex outputs, split them 
into their own individual projects
+    if (complexExprs.size() > 0 ) {
+      while (complexExprs.size() > 0) {
+        if ( index >= lastRexInput ) {
+          allExprs.remove(allExprs.size() - 1);
+          RexBuilder builder = new RexBuilder(factory);
+          allExprs.add(builder.makeInputRef( new RelDataTypeDrillImpl(new 
RelDataTypeHolder(), factory), index));
+        }
+        index++;
+        currRexNode = complexExprs.remove(0);
+        allExprs.add(currRexNode);
+        relDataTypes.add(new RelDataTypeFieldImpl("EXPR$" + index, 
allExprs.size(), factory.createSqlType(SqlTypeName.ANY) ));
+        childProject = new ProjectPrel(project.getCluster(), 
project.getTraitSet(), originalInput, ImmutableList.copyOf(allExprs), new 
RelRecordType(relDataTypes));
+        originalInput = childProject;
+      }
+      // copied from above, find a better way to do this
+      allExprs.remove(allExprs.size() - 1);
+      RexBuilder builder = new RexBuilder(factory);
+      allExprs.add(builder.makeInputRef( new RelDataTypeDrillImpl(new 
RelDataTypeHolder(), factory), index));
+      relDataTypes.add(new RelDataTypeFieldImpl("EXPR$" + index, 
allExprs.size(), factory.createSqlType(SqlTypeName.ANY) ));
+    }
+    return new ProjectPrel(project.getCluster(), project.getTraitSet(), 
originalInput, exprList, new RelRecordType(origRelDataTypes));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 0bb59bf..ca444a3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillScreenRel;
 import org.apache.drill.exec.planner.logical.DrillStoreRel;
 import org.apache.drill.exec.planner.logical.RewriteProjectRel;
+import org.apache.drill.exec.planner.physical.visitor.RewriteProjectToFlatten;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -52,6 +53,7 @@ import 
org.apache.drill.exec.planner.physical.visitor.MemoryEstimationVisitor;
 import 
org.apache.drill.exec.planner.physical.visitor.ProducerConsumerPrelVisitor;
 import org.apache.drill.exec.planner.physical.visitor.RelUniqifier;
 import 
org.apache.drill.exec.planner.physical.visitor.SelectionVectorPrelVisitor;
+import 
org.apache.drill.exec.planner.physical.visitor.SplitUpComplexExpressions;
 import org.apache.drill.exec.planner.physical.visitor.StarColumnConverter;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
 import org.apache.drill.exec.server.options.OptionManager;
@@ -194,6 +196,13 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
      */
     phyRelNode = JoinPrelRenameVisitor.insertRenameProject(phyRelNode);
 
+    // TODO - re-enable once execution issues are resolved. This rule allows 
for several flatten operations to appear
+    // within a single query, it also breaks up all expressions with complex 
outputs into their own project operations.
+    // It currently appears to be producing good plans, but for the flatten 
case it is revealing execution errors in the
+    // project operator.
+//    phyRelNode = ((Prel) phyRelNode).accept(new 
SplitUpComplexExpressions(planner.getTypeFactory(), 
context.getDrillOperatorTable(), 
context.getPlannerSettings().functionImplementationRegistry), null);
+    phyRelNode = ((Prel) phyRelNode).accept(new 
RewriteProjectToFlatten(planner.getTypeFactory(), 
context.getDrillOperatorTable()), null);
+    // Definitely before this one
     /*
      * 2.)
      * Since our operators work via names rather than indices, we have to make 
to reorder any

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
index a28caf1..aadc563 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.vector;
 
 import io.netty.buffer.DrillBuf;
 
-public interface RepeatedFixedWidthVector extends ValueVector{
+public interface RepeatedFixedWidthVector extends ValueVector, RepeatedVector {
   /**
    * Allocate a new memory space for this vector.  Must be called prior to 
using the ValueVector.
    *
@@ -37,12 +37,13 @@ public interface RepeatedFixedWidthVector extends 
ValueVector{
    */
   public int load(int parentValueCount, int childValueCount, DrillBuf buf);
 
-  public abstract RepeatedAccessor getAccessor();
-
   public abstract RepeatedMutator getMutator();
 
   public interface RepeatedAccessor extends Accessor {
     public int getGroupCount();
+    public int getValueCount();
+    public int getGroupSizeAtIndex(int index);
+    public ValueVector getAllChildValues();
   }
   public interface RepeatedMutator extends Mutator {
     public void setValueCounts(int parentValueCount, int childValueCount);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
index 2e7689c..a499341 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.vector;
 
 import io.netty.buffer.DrillBuf;
 
-public interface RepeatedVariableWidthVector extends ValueVector{
+public interface RepeatedVariableWidthVector extends ValueVector, 
RepeatedVector {
   /**
    * Allocate a new memory space for this vector.  Must be called prior to 
using the ValueVector.
    *
@@ -35,8 +35,6 @@ public interface RepeatedVariableWidthVector extends 
ValueVector{
    */
   public int getByteCapacity();
 
-  public abstract RepeatedFixedWidthVector.RepeatedAccessor getAccessor();
-
   /**
    * Load the records in the provided buffer based on the given number of 
values.
    * @param dataBytes   The number of bytes associated with the data array.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java
new file mode 100644
index 0000000..b23ee02
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+import io.netty.buffer.DrillBuf;
+
+public interface RepeatedVector {
+
+  public RepeatedFixedWidthVector.RepeatedAccessor getAccessor();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index 9870dd5..c75b359 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -202,9 +202,22 @@ public class RepeatedListVector extends 
AbstractContainerVector implements Repea
       return l;
     }
 
+    public int getGroupSizeAtIndex(int index) {
+      return offsets.getAccessor().get(index+1) - 
offsets.getAccessor().get(index);
+    }
+
+    @Override
+    public ValueVector getAllChildValues() {
+      return vector;
+    }
+
     @Override
     public int getValueCount() {
-      return offsets.getAccessor().getValueCount() - 1;
+//      if (offsets.getAccessor().getValueCount() == 0 ) {
+//        return 0;
+//      } else {
+        return offsets.getAccessor().get(offsets.getAccessor().getValueCount() 
- 1);
+//      }
     }
 
     public void get(int index, RepeatedListHolder holder) {
@@ -249,7 +262,7 @@ public class RepeatedListVector extends 
AbstractContainerVector implements Repea
 
     @Override
     public int getGroupCount() {
-      return size();
+      return offsets.getAccessor().getValueCount() - 1;
     }
   }
 
@@ -402,7 +415,7 @@ public class RepeatedListVector extends 
AbstractContainerVector implements Repea
     return getField() //
         .getAsBuilder() //
         .setBufferLength(getBufferSize()) //
-        .setValueCount(accessor.getValueCount()) //
+        .setValueCount(accessor.getGroupCount()) //
         .addChild(vector.getMetadata()) //
         .build();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index 2612924..beb2475 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.ComplexHolder;
 import org.apache.drill.exec.expr.holders.RepeatedMapHolder;
@@ -57,7 +58,7 @@ public class RepeatedMapVector extends 
AbstractContainerVector implements Repeat
 
   public final static MajorType TYPE = 
MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(DataMode.REPEATED).build();
 
-  private final UInt4Vector offsets;   // offsets to start of each record
+  private final UInt4Vector offsets;   // offsets to start of each record 
(considering record indices are 0-indexed)
   private final Map<String, ValueVector> vectors = Maps.newLinkedHashMap();
   private final Map<String, VectorWithOrdinal> vectorIds = Maps.newHashMap();
   private final RepeatedMapReaderImpl reader = new 
RepeatedMapReaderImpl(RepeatedMapVector.this);
@@ -66,7 +67,7 @@ public class RepeatedMapVector extends 
AbstractContainerVector implements Repeat
   private final Mutator mutator = new Mutator();
   private final BufferAllocator allocator;
   private final MaterializedField field;
-  private int lastSet = -1;
+  private int lastPopulatedValueIndex = -1;
 
   public RepeatedMapVector(MaterializedField field, BufferAllocator allocator) 
{
     this.field = field;
@@ -76,12 +77,12 @@ public class RepeatedMapVector extends 
AbstractContainerVector implements Repeat
   }
 
   @Override
-  public void allocateNew(int parentValueCount, int childValueCount) {
+  public void allocateNew(int topLevelValueCount, int childValueCount) {
     clear();
-    offsets.allocateNew(parentValueCount+1);
+    offsets.allocateNew(topLevelValueCount+1);
     offsets.zeroVector();
     for (ValueVector v : vectors.values()) {
-      AllocationHelper.allocatePrecomputedChildCount(v, parentValueCount, 50, 
childValueCount);
+      AllocationHelper.allocatePrecomputedChildCount(v, topLevelValueCount, 
50, childValueCount);
     }
     mutator.reset();
     accessor.reset();
@@ -204,6 +205,10 @@ public class RepeatedMapVector extends 
AbstractContainerVector implements Repeat
     }
   }
 
+  public TransferPair getTransferPairToSingleMap() {
+    return new SingleMapTransferPair(field.getPath());
+  }
+
   @Override
   public TransferPair getTransferPair(FieldReference ref) {
     return new MapTransferPair(ref);
@@ -230,6 +235,74 @@ public class RepeatedMapVector extends 
AbstractContainerVector implements Repeat
     return true;
   }
 
+  private class SingleMapTransferPair implements TransferPair{
+    private RepeatedMapVector from = RepeatedMapVector.this;
+    private TransferPair[] pairs;
+    private MapVector to;
+
+    public SingleMapTransferPair(SchemaPath path) {
+
+      MaterializedField mf = MaterializedField.create(path, 
Types.required(field.getType().getMinorType()));
+      MapVector v = new MapVector(mf, allocator);
+      pairs = new TransferPair[vectors.size()];
+      int i =0;
+      for (Map.Entry<String, ValueVector> e : vectors.entrySet()) {
+        TransferPair otherSide = e.getValue().getTransferPair();
+        v.put(e.getKey(), otherSide.getTo());
+        pairs[i++] = otherSide;
+      }
+      this.to = v;
+    }
+
+    public SingleMapTransferPair(MapVector to) {
+      this.to = to;
+      pairs = new TransferPair[vectors.size()];
+      int i =0;
+      for (Map.Entry<String, ValueVector> e : vectors.entrySet()) {
+        int preSize = to.vectors.size();
+        ValueVector v = to.addOrGet(e.getKey(), 
e.getValue().getField().getType(), e.getValue().getClass());
+        if (to.vectors.size() != preSize) {
+          v.allocateNew();
+        }
+        pairs[i++] = e.getValue().makeTransferPair(v);
+      }
+    }
+
+
+    @Override
+    public void transfer() {
+      for (TransferPair p : pairs) {
+        p.transfer();
+      }
+      to.getMutator().setValueCount(from.getAccessor().getValueCount());
+      clear();
+    }
+
+    @Override
+    public ValueVector getTo() {
+      return to;
+    }
+
+    @Override
+    public boolean copyValueSafe(int from, int to) {
+      for (TransferPair p : pairs) {
+        if (!p.copyValueSafe(from, to)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public void splitAndTransfer(int startIndex, int length) {
+      for (TransferPair p : pairs) {
+        p.splitAndTransfer(startIndex, length);
+      }
+      to.getMutator().setValueCount(length);
+    }
+
+  }
+
   private class MapTransferPair implements TransferPair{
 
     private final TransferPair[] pairs;
@@ -278,10 +351,10 @@ public class RepeatedMapVector extends 
AbstractContainerVector implements Repeat
     }
 
     @Override
-    public boolean copyValueSafe(int from, int to) {
+    public boolean copyValueSafe(int srcIndex, int destIndex) {
       RepeatedMapHolder holder = new RepeatedMapHolder();
-      accessor.get(from, holder);
-      int newIndex = this.to.offsets.getAccessor().get(to);
+      accessor.get(srcIndex, holder);
+      int newIndex = to.offsets.getAccessor().get(destIndex);
       //todo: make these bulk copies
       for (int i = holder.start; i < holder.end; i++, newIndex++) {
         for (TransferPair p : pairs) {
@@ -290,10 +363,10 @@ public class RepeatedMapVector extends 
AbstractContainerVector implements Repeat
           }
         }
       }
-      if (!this.to.offsets.getMutator().setSafe(to+1, newIndex)) {
+      if (!to.offsets.getMutator().setSafe(destIndex+1, newIndex)) {
         return false;
       }
-      this.to.lastSet++;
+      to.lastPopulatedValueIndex = destIndex;
       return true;
     }
 
@@ -413,6 +486,15 @@ public class RepeatedMapVector extends 
AbstractContainerVector implements Repeat
       return offsets.getAccessor().getValueCount() - 1;
     }
 
+    public int getGroupSizeAtIndex(int index) {
+      return offsets.getAccessor().get(index+1) - 
offsets.getAccessor().get(index);
+    }
+
+    @Override
+    public ValueVector getAllChildValues() {
+      throw new UnsupportedOperationException("Cannot retrieve inner vector 
from repeated map.");
+    }
+
     public void get(int index, RepeatedMapHolder holder) {
       assert index < getValueCapacity()-1;
       holder.start = offsets.getAccessor().get(index);
@@ -458,44 +540,43 @@ public class RepeatedMapVector extends 
AbstractContainerVector implements Repeat
     }
   }
 
-  private void populateEmpties(int groupCount) {
-    int previousEnd = offsets.getAccessor().get(lastSet + 1);
-    for (int i = lastSet + 2; i <= groupCount; i++) {
-      offsets.getMutator().setSafe(i, previousEnd);
+  private void populateEmpties(int topLevelValueCount) {
+    int previousEnd = offsets.getAccessor().get(lastPopulatedValueIndex + 1);
+    for (int i = lastPopulatedValueIndex + 1; i < topLevelValueCount; i++) {
+      offsets.getMutator().setSafe(i+1, previousEnd);
     }
-    lastSet = groupCount - 1;
+    lastPopulatedValueIndex = topLevelValueCount - 1;
   }
 
   public class Mutator implements ValueVector.Mutator, RepeatedMutator {
 
     public void startNewGroup(int index) {
-      populateEmpties(index);
-      lastSet = index;
+      populateEmpties(index+1);
       offsets.getMutator().set(index+1, offsets.getAccessor().get(index));
     }
 
     public int add(int index) {
-      int nextOffset = offsets.getAccessor().get(index+1);
-      boolean success = offsets.getMutator().setSafe(index+1, nextOffset+1);
+      int prevEnd = offsets.getAccessor().get(index+1);
+      boolean success = offsets.getMutator().setSafe(index+1, prevEnd+1);
       if (!success) {
         return -1;
       }
-      return nextOffset;
+      return prevEnd;
     }
 
-    @Override
-    public void setValueCount(int groupCount) {
-      populateEmpties(groupCount);
-      offsets.getMutator().setValueCount(groupCount+1);
-      int valueCount = offsets.getAccessor().get(groupCount);
+    public void setValueCount(int topLevelValueCount) {
+      populateEmpties(topLevelValueCount);
+      offsets.getMutator().setValueCount(topLevelValueCount+1);
+      int childValueCount = offsets.getAccessor().get(topLevelValueCount);
       for (ValueVector v : vectors.values()) {
-        v.getMutator().setValueCount(valueCount);
+        v.getMutator().setValueCount(childValueCount);
       }
     }
 
     @Override
     public void reset() {
-      lastSet = 0;
+      // the last non empty element index starts from -1
+      lastPopulatedValueIndex = -1;
     }
 
     @Override
@@ -521,7 +602,7 @@ public class RepeatedMapVector extends 
AbstractContainerVector implements Repeat
 
   @Override
   public void clear() {
-    lastSet = 0;
+    getMutator().reset();
     offsets.clear();
     for(ValueVector v : vectors.values()) {
       v.clear();;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2871c3ba/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java
index 7aa9846..5d85d0a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java
@@ -49,7 +49,8 @@ abstract class AbstractBaseWriter implements FieldWriter{
   int idx(){
     return index;
   }
-  protected void resetState(){
+
+  public void resetState(){
     state.reset();
   }
 

Reply via email to