DRILL-836: Drill needs to return complex types (e.g., map and array) as a JSON 
string


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/fc00bc4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/fc00bc4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/fc00bc4b

Branch: refs/heads/master
Commit: fc00bc4bc20f34b24df43a30c1340e5f1f96de54
Parents: 5da52cb
Author: Aditya Kishore <[email protected]>
Authored: Thu Jun 12 14:50:28 2014 -0700
Committer: Aditya Kishore <[email protected]>
Committed: Fri Jun 13 03:49:30 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/config/Flatten.java     |  71 ++++++++++++
 .../impl/project/FlattenBatchCreator.java       |  42 +++++++
 .../impl/project/ProjectRecordBatch.java        |  26 ++++-
 .../exec/planner/physical/FlattenPrel.java      |  62 ++++++++++
 .../physical/visitor/FlattenPrelVisitor.java    |  40 +++++++
 .../planner/sql/handlers/DefaultSqlHandler.java |  45 ++++++--
 .../apache/drill/exec/rpc/user/UserClient.java  |   6 +-
 .../apache/drill/exec/rpc/user/UserServer.java  |  29 +++--
 .../apache/drill/exec/rpc/user/UserSession.java |  11 +-
 .../java/org/apache/drill/PlanningBase.java     |   4 +-
 .../exec/physical/impl/TestOptiqPlans.java      |   2 +-
 .../drill/exec/proto/SchemaUserProtos.java      |   7 ++
 .../apache/drill/exec/proto/UserBitShared.java  |  15 ++-
 .../org/apache/drill/exec/proto/UserProtos.java | 114 ++++++++++++++++---
 .../exec/proto/beans/CoreOperatorType.java      |   4 +-
 .../exec/proto/beans/UserToBitHandshake.java    |  23 ++++
 protocol/src/main/protobuf/User.proto           |  49 ++++----
 protocol/src/main/protobuf/UserBitShared.proto  |   1 +
 18 files changed, 471 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Flatten.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Flatten.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Flatten.java
new file mode 100644
index 0000000..2bcaab3
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Flatten.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("flatten")
+public class Flatten extends AbstractSingle {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(Flatten.class);
+
+  @JsonCreator
+  public Flatten(@JsonProperty("child") PhysicalOperator child) {
+    super(child);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> 
physicalVisitor, X value) throws E{
+    return physicalVisitor.visitOp(this, value);
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(0, 0, 1000, child.getSize().getRecordCount());
+  }
+
+  @Override
+  public Size getSize() {
+    return child.getSize();
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new Flatten(child);
+  }
+
+  @Override
+  public SelectionVectorMode getSVMode() {
+    return child.getSVMode();
+  }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.FLATTEN_VALUE;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/FlattenBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/FlattenBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/FlattenBatchCreator.java
new file mode 100644
index 0000000..9bea73c
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/FlattenBatchCreator.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.project;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Flatten;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+
+public class FlattenBatchCreator implements BatchCreator<Flatten> {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FlattenBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, Flatten flatten, 
List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() == 1);
+    return new ProjectRecordBatch(new Project(null, flatten.getChild()),
+                                  children.iterator().next(),
+                                  context);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 93cd19d..61c256b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -18,17 +18,20 @@
 package org.apache.drill.exec.physical.impl.project;
 
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
 
+import org.apache.drill.common.expression.ConvertExpression;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCallFactory;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -48,7 +51,6 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.carrotsearch.hppc.IntOpenHashSet;
@@ -170,7 +172,7 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project>{
   protected void setupNewSchema() throws SchemaChangeException{
     this.allocationVectors = Lists.newArrayList();
     container.clear();
-    final List<NamedExpression> exprs = popConfig.getExprs();
+    final List<NamedExpression> exprs = getExpressionList();
     final ErrorCollector collector = new ErrorCollectorImpl();
     final List<TransferPair> transfers = Lists.newArrayList();
 
@@ -250,5 +252,23 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project>{
     }
   }
 
+  private List<NamedExpression> getExpressionList() {
+    if (popConfig.getExprs() != null) {
+      return popConfig.getExprs();
+    }
+
+    List<NamedExpression> exprs = Lists.newArrayList();
+    for (MaterializedField field : incoming.getSchema()) {
+      if (Types.isComplex(field.getType())) {
+        exprs.add(new NamedExpression(
+            FunctionCallFactory.createConvert(ConvertExpression.CONVERT_TO, 
"JSON", field.getPath(), ExpressionPosition.UNKNOWN),
+            new FieldReference(field.getPath()))
+            );
+      } else {
+        exprs.add(new NamedExpression(field.getPath(), new 
FieldReference(field.getPath())));
+      }
+    }
+    return exprs;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FlattenPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FlattenPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FlattenPrel.java
new file mode 100644
index 0000000..f541422
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FlattenPrel.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.Flatten;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.eigenbase.rel.SingleRel;
+
+public class FlattenPrel extends SingleRel implements Prel {
+
+  public FlattenPrel(Prel phyRelNode) {
+    super(phyRelNode.getCluster(), phyRelNode.getTraitSet(), phyRelNode);
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) 
throws IOException {
+    Flatten p = new Flatten(((Prel) getChild()).getPhysicalOperator(creator));
+    p.setOperatorId(creator.getOperatorId(this));
+    return p;
+  }
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getChild());
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> 
logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FlattenPrelVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FlattenPrelVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FlattenPrelVisitor.java
new file mode 100644
index 0000000..5892782
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FlattenPrelVisitor.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical.visitor;
+
+import java.util.Collections;
+
+import org.apache.drill.exec.planner.physical.FlattenPrel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ScreenPrel;
+import org.eigenbase.rel.RelNode;
+
+public class FlattenPrelVisitor extends BasePrelVisitor<Prel, Void, 
RuntimeException> {
+
+  private static final FlattenPrelVisitor INSTANCE = new FlattenPrelVisitor();
+
+  public static Prel addFlattenPrel(Prel prel) {
+    return prel.accept(INSTANCE, null);
+  }
+
+  @Override
+  public Prel visitScreen(ScreenPrel prel, Void value) throws RuntimeException 
{
+    return prel.copy(prel.getTraitSet(), 
Collections.singletonList((RelNode)new FlattenPrel((Prel)prel.getChild())));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 883b039..aca0ae6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -41,6 +41,7 @@ import 
org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.explain.PrelSequencer;
+import org.apache.drill.exec.planner.physical.visitor.FlattenPrelVisitor;
 import org.apache.drill.exec.planner.physical.visitor.FinalColumnReorderer;
 import org.apache.drill.exec.planner.physical.visitor.JoinPrelRenameVisitor;
 import org.apache.drill.exec.planner.physical.visitor.RelUniqifier;
@@ -140,22 +141,46 @@ public class DefaultSqlHandler extends AbstractSqlHandler 
{
     RelTraitSet traits = 
drel.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
     Prel phyRelNode = (Prel) 
planner.transform(DrillSqlWorker.PHYSICAL_MEM_RULES, traits, drel);
 
-    // Join might cause naming conflicts from its left and right child.
-    // In such case, we have to insert Project to rename the conflicting names.
+    /* The order of the following transformations is important. */
+
+    /*
+     * 1.)
+     * Join might cause naming conflicts from its left and right child.
+     * In such case, we have to insert Project to rename the conflicting names.
+     */
     phyRelNode = JoinPrelRenameVisitor.insertRenameProject(phyRelNode);
 
-    // Since our operators work via names rather than indices, we have to make 
to reorder any output
-    // before we return data to the user as we may have accindentally shuffled 
things.  This adds
-    // a trivial project to reorder columns prior to output.
+    /*
+     * 2.)
+     * Since our operators work via names rather than indices, we have to make 
sure to reorder any
+     * output before we return data to the user as we may have accidentally 
shuffled things.
+     * This adds a trivial project to reorder columns prior to output.
+     */
     phyRelNode = FinalColumnReorderer.addFinalColumnOrdering(phyRelNode);
 
-    // Make sure that the no rels are repeats.  This could happen in the case 
of querying the same table twice as Optiq may canonicalize these.
+    /* 3.)
+     * Next, we add any required selection vector removers given the supported 
encodings of each
+     * operator. This will ultimately move to a new trait but we're managing 
here for now to avoid
+     * introducing new issues in planning before the next release
+     */
+    phyRelNode = 
SelectionVectorPrelVisitor.addSelectionRemoversWhereNecessary(phyRelNode);
+
+    /* 4.)
+     * If the client does not support complex types (Map, Repeated),
+     * insert a project which would convert them to JSON strings.
+     */
+    if (!context.getSession().isSupportComplexTypes()) {
+      logger.debug("Client does not support complex types, add Flatten 
operator.");
+      phyRelNode = FlattenPrelVisitor.addFlattenPrel(phyRelNode);
+    }
+
+    /* 5.)
+     * Finally, make sure that no rels are repeated.
+     * This could happen in the case of querying the same table twice as Optiq 
may canonicalize these.
+     */
     phyRelNode = RelUniqifier.uniqifyGraph(phyRelNode);
 
-    // the last thing we do is add any required selection vector removers 
given the supported encodings of each
-    // operator. This will ultimately move to a new trait but we're managing 
here for now to avoid introducing new
-    // issues in planning before the next release
-    return 
SelectionVectorPrelVisitor.addSelectionRemoversWhereNecessary(phyRelNode);
+    return phyRelNode;
   }
 
   protected PhysicalOperator convertToPop(Prel prel) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 0e3cc6b..277bb0c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -56,10 +56,12 @@ public class UserClient extends 
BasicClientWithConnection<RpcType, UserToBitHand
       throws RpcException, InterruptedException {
     UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder()
         .setRpcVersion(UserRpcConfig.RPC_VERSION)
-        .setSupportListening(true);
+        .setSupportListening(true)
+        .setSupportComplexTypes(true);
 
-    if (props != null)
+    if (props != null) {
       hsBuilder.setProperties(props);
+    }
 
     this.connectAsClient(handler, hsBuilder.build(), endpoint.getAddress(), 
endpoint.getUserPort());
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 2a4c2cc..e96ba6c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -27,12 +27,10 @@ import java.io.IOException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
 import org.apache.drill.exec.proto.UserProtos.RequestResults;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
-import org.apache.drill.exec.proto.UserProtos.UserProperties;
 import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
 import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.BasicServer;
@@ -61,7 +59,7 @@ public class UserServer extends BasicServer<RpcType, 
UserServer.UserClientConnec
 
   @Override
   protected MessageLite getResponseDefaultInstance(int rpcType) throws 
RpcException {
-    // a user server only expects acknowledgements on messages it creates.
+    // a user server only expects acknowledgments on messages it creates.
     switch (rpcType) {
     case RpcType.ACK_VALUE:
       return Ack.getDefaultInstance();
@@ -76,7 +74,7 @@ public class UserServer extends BasicServer<RpcType, 
UserServer.UserClientConnec
     switch (rpcType) {
 
     case RpcType.RUN_QUERY_VALUE:
-      // logger.debug("Received query to run.  Returning query handle.");
+      logger.trace("Received query to run.  Returning query handle.");
       try {
         RunQuery query = RunQuery.PARSER.parseFrom(new 
ByteBufInputStream(pBody));
         return new Response(RpcType.QUERY_HANDLE, 
worker.submitWork(connection, query));
@@ -85,7 +83,7 @@ public class UserServer extends BasicServer<RpcType, 
UserServer.UserClientConnec
       }
 
     case RpcType.REQUEST_RESULTS_VALUE:
-      // logger.debug("Received results requests.  Returning empty query 
result.");
+      logger.trace("Received results requests.  Returning empty query 
result.");
       try {
         RequestResults req = RequestResults.PARSER.parseFrom(new 
ByteBufInputStream(pBody));
         return new Response(RpcType.QUERY_RESULT, worker.getResult(connection, 
req));
@@ -112,8 +110,9 @@ public class UserServer extends BasicServer<RpcType, 
UserServer.UserClientConnec
       super(channel);
     }
 
-    void setUser(UserCredentials credentials, UserProperties props) throws 
IOException{
-      session = new UserSession(worker.getSystemOptions(), credentials, props);
+    void setUser(UserToBitHandshake inbound) throws IOException {
+      session = new UserSession(worker.getSystemOptions(), 
inbound.getCredentials(), inbound.getProperties());
+      session.setSupportComplexTypes(inbound.getSupportComplexTypes());
     }
 
     public UserSession getSession(){
@@ -121,7 +120,7 @@ public class UserServer extends BasicServer<RpcType, 
UserServer.UserClientConnec
     }
 
     public void sendResult(RpcOutcomeListener<Ack> listener, 
QueryWritableBatch result){
-//      logger.debug("Sending result to client with {}", result);
+      logger.trace("Sending result to client with {}", result);
       send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), 
Ack.class, result.getBuffers());
     }
 
@@ -130,7 +129,6 @@ public class UserServer extends BasicServer<RpcType, 
UserServer.UserClientConnec
       return alloc;
     }
 
-
   }
 
   @Override
@@ -140,16 +138,23 @@ public class UserServer extends BasicServer<RpcType, 
UserServer.UserClientConnec
 
   @Override
   protected ServerHandshakeHandler<UserToBitHandshake> 
getHandshakeHandler(final UserClientConnection connection) {
+
     return new ServerHandshakeHandler<UserToBitHandshake>(RpcType.HANDSHAKE, 
UserToBitHandshake.PARSER){
 
       @Override
       public MessageLite getHandshakeResponse(UserToBitHandshake inbound) 
throws Exception {
-//        logger.debug("Handling handshake from user to bit. {}", inbound);
-        if(inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION) throw new 
RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", 
inbound.getRpcVersion(), UserRpcConfig.RPC_VERSION));
-        connection.setUser(inbound.getCredentials(), inbound.getProperties());
+        logger.trace("Handling handshake from user to bit. {}", inbound);
+        if(inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION) {
+          throw new RpcException(String.format("Invalid rpc version. Expected 
%d, actual %d.", inbound.getRpcVersion(), UserRpcConfig.RPC_VERSION));
+        }
+
+        connection.setUser(inbound);
+
         return 
BitToUserHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).build();
       }
+
     };
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index 6ecffaf..18e365e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.rpc.user;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.Map;
 
 import net.hydromatic.optiq.SchemaPlus;
@@ -38,6 +37,7 @@ public class UserSession {
 
   private DrillUser user;
   private boolean enableExchanges = true;
+  private boolean supportComplexTypes = false;
   private UserCredentials credentials;
   private Map<String, String> properties;
   private OptionManager options;
@@ -107,4 +107,13 @@ public class UserSession {
     return schema;
   }
 
+  public boolean isSupportComplexTypes() {
+    return supportComplexTypes;
+  }
+
+  public UserSession setSupportComplexTypes(boolean supportComplexType) {
+    this.supportComplexTypes = supportComplexType;
+    return this;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java 
b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index ad114ab..a819453 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -100,7 +100,7 @@ public class PlanningBase extends ExecTest{
     registry.init();
     final FunctionImplementationRegistry functionRegistry = new 
FunctionImplementationRegistry(config);
     final SchemaPlus root = Frameworks.createRootSchema(false);
-    registry.getSchemaFactory().registerSchemas(new UserSession(null, null, 
null), root);
+    registry.getSchemaFactory().registerSchemas(new UserSession(null, null, 
null).setSupportComplexTypes(true), root);
 
 
 
@@ -113,7 +113,7 @@ public class PlanningBase extends ExecTest{
         context.getFunctionRegistry();
         result = functionRegistry;
         context.getSession();
-        result = new UserSession(null, null, null);
+        result = new UserSession(null, null, 
null).setSupportComplexTypes(true);
         context.getCurrentEndpoint();
         result = DrillbitEndpoint.getDefaultInstance();
         context.getActiveEndpoints();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index 199ecfc..f6200f0 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -110,7 +110,7 @@ public class TestOptiqPlans extends ExecTest {
     };
     RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
     DrillbitContext bitContext = new 
DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, 
controller, com, cache, workBus, new LocalPStoreProvider(DrillConfig.create()));
-    QueryContext qc = new QueryContext(new UserSession(null, null, null), 
QueryId.getDefaultInstance(), bitContext);
+    QueryContext qc = new QueryContext(new UserSession(null, null, 
null).setSupportComplexTypes(true), QueryId.getDefaultInstance(), bitContext);
     PhysicalPlanReader reader = bitContext.getPlanReader();
     LogicalPlan plan = 
reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), 
Charsets.UTF_8));
     PhysicalPlan pp = new BasicOptimizer(DrillConfig.create(), qc, 
connection).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
----------------------------------------------------------------------
diff --git 
a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java 
b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
index c984876..3b056cf 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserProtos.java
@@ -278,6 +278,8 @@ public final class SchemaUserProtos
                 if(message.hasProperties())
                     output.writeObject(5, message.getProperties(), 
org.apache.drill.exec.proto.SchemaUserProtos.UserProperties.WRITE, false);
 
+                if(message.hasSupportComplexTypes())
+                    output.writeBool(6, message.getSupportComplexTypes(), 
false);
             }
             public boolean 
isInitialized(org.apache.drill.exec.proto.UserProtos.UserToBitHandshake message)
             {
@@ -334,6 +336,9 @@ public final class SchemaUserProtos
                             builder.setProperties(input.mergeObject(org.apache.drill.exec.proto.UserProtos.UserProperties.newBuilder(), org.apache.drill.exec.proto.SchemaUserProtos.UserProperties.MERGE));
 
                             break;
+                        case 6:
+                            builder.setSupportComplexTypes(input.readBool());
+                            break;
                         default:
                             input.handleUnknownField(number, this);
                     }
@@ -379,6 +384,7 @@ public final class SchemaUserProtos
                 case 3: return "rpcVersion";
                 case 4: return "credentials";
                 case 5: return "properties";
+                case 6: return "supportComplexTypes";
                 default: return null;
             }
         }
@@ -395,6 +401,7 @@ public final class SchemaUserProtos
             fieldMap.put("rpcVersion", 3);
             fieldMap.put("credentials", 4);
             fieldMap.put("properties", 5);
+            fieldMap.put("supportComplexTypes", 6);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 105281a..c100968 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -454,6 +454,10 @@ public final class UserBitShared {
      * <code>INFO_SCHEMA_SUB_SCAN = 30;</code>
      */
     INFO_SCHEMA_SUB_SCAN(30, 30),
+    /**
+     * <code>FLATTEN = 31;</code>
+     */
+    FLATTEN(31, 31),
     ;
 
     /**
@@ -580,6 +584,10 @@ public final class UserBitShared {
      * <code>INFO_SCHEMA_SUB_SCAN = 30;</code>
      */
     public static final int INFO_SCHEMA_SUB_SCAN_VALUE = 30;
+    /**
+     * <code>FLATTEN = 31;</code>
+     */
+    public static final int FLATTEN_VALUE = 31;
 
 
     public final int getNumber() { return value; }
@@ -617,6 +625,7 @@ public final class UserBitShared {
         case 28: return TEXT_SUB_SCAN;
         case 29: return JSON_SUB_SCAN;
         case 30: return INFO_SCHEMA_SUB_SCAN;
+        case 31: return FLATTEN;
         default: return null;
       }
     }
@@ -16439,7 +16448,7 @@ public final class UserBitShared {
       "*/\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010P" +
       "HYSICAL\020\003*k\n\rFragmentState\022\013\n\007SENDING\020\000\022" +
       "\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n",
-      "\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005*\345" +
+      "\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005*\362" +
       "\004\n\020CoreOperatorType\022\021\n\rSINGLE_SENDER\020\000\022\024" +
       "\n\020BROADCAST_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH" +
       "_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOI" +
@@ -16455,8 +16464,8 @@ public final class UserBitShared {
       "K_SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRE" +
       "CT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_S" +
       "UB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCH" +
-      "EMA_SUB_SCAN\020\036B.\n\033org.apache.drill.exec." +
-      "protoB\rUserBitSharedH\001"
+      "EMA_SUB_SCAN\020\036\022\013\n\007FLATTEN\020\037B.\n\033org.apach" +
+      "e.drill.exec.protoB\rUserBitSharedH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
index d9f4c20..048bd20 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
@@ -1669,6 +1669,16 @@ public final class UserProtos {
      * <code>optional .exec.user.UserProperties properties = 5;</code>
      */
     org.apache.drill.exec.proto.UserProtos.UserPropertiesOrBuilder getPropertiesOrBuilder();
+
+    // optional bool support_complex_types = 6 [default = false];
+    /**
+     * <code>optional bool support_complex_types = 6 [default = false];</code>
+     */
+    boolean hasSupportComplexTypes();
+    /**
+     * <code>optional bool support_complex_types = 6 [default = false];</code>
+     */
+    boolean getSupportComplexTypes();
   }
   /**
    * Protobuf type {@code exec.user.UserToBitHandshake}
@@ -1768,6 +1778,11 @@ public final class UserProtos {
               bitField0_ |= 0x00000010;
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              supportComplexTypes_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1900,12 +1915,29 @@ public final class UserProtos {
       return properties_;
     }
 
+    // optional bool support_complex_types = 6 [default = false];
+    public static final int SUPPORT_COMPLEX_TYPES_FIELD_NUMBER = 6;
+    private boolean supportComplexTypes_;
+    /**
+     * <code>optional bool support_complex_types = 6 [default = false];</code>
+     */
+    public boolean hasSupportComplexTypes() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional bool support_complex_types = 6 [default = false];</code>
+     */
+    public boolean getSupportComplexTypes() {
+      return supportComplexTypes_;
+    }
+
     private void initFields() {
       channel_ = org.apache.drill.exec.proto.UserBitShared.RpcChannel.USER;
       supportListening_ = false;
       rpcVersion_ = 0;
       credentials_ = org.apache.drill.exec.proto.UserBitShared.UserCredentials.getDefaultInstance();
       properties_ = org.apache.drill.exec.proto.UserProtos.UserProperties.getDefaultInstance();
+      supportComplexTypes_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1940,6 +1972,9 @@ public final class UserProtos {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeMessage(5, properties_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeBool(6, supportComplexTypes_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1969,6 +2004,10 @@ public final class UserProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(5, properties_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(6, supportComplexTypes_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -2105,6 +2144,8 @@ public final class UserProtos {
           propertiesBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000010);
+        supportComplexTypes_ = false;
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
 
@@ -2161,6 +2202,10 @@ public final class UserProtos {
         } else {
           result.properties_ = propertiesBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.supportComplexTypes_ = supportComplexTypes_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -2192,6 +2237,9 @@ public final class UserProtos {
         if (other.hasProperties()) {
           mergeProperties(other.getProperties());
         }
+        if (other.hasSupportComplexTypes()) {
+          setSupportComplexTypes(other.getSupportComplexTypes());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -2561,6 +2609,39 @@ public final class UserProtos {
         return propertiesBuilder_;
       }
 
+      // optional bool support_complex_types = 6 [default = false];
+      private boolean supportComplexTypes_ ;
+      /**
+       * <code>optional bool support_complex_types = 6 [default = false];</code>
+       */
+      public boolean hasSupportComplexTypes() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>optional bool support_complex_types = 6 [default = false];</code>
+       */
+      public boolean getSupportComplexTypes() {
+        return supportComplexTypes_;
+      }
+      /**
+       * <code>optional bool support_complex_types = 6 [default = false];</code>
+       */
+      public Builder setSupportComplexTypes(boolean value) {
+        bitField0_ |= 0x00000020;
+        supportComplexTypes_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool support_complex_types = 6 [default = false];</code>
+       */
+      public Builder clearSupportComplexTypes() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        supportComplexTypes_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:exec.user.UserToBitHandshake)
     }
 
@@ -4250,25 +4331,26 @@ public final class UserProtos {
       "\032\023UserBitShared.proto\"&\n\010Property\022\013\n\003key" +
       "\030\001 \002(\t\022\r\n\005value\030\002 \002(\t\"9\n\016UserProperties\022" +
       "\'\n\nproperties\030\001 \003(\0132\023.exec.user.Property" +
-      "\"\326\001\n\022UserToBitHandshake\022.\n\007channel\030\001 \001(\016" +
+      "\"\374\001\n\022UserToBitHandshake\022.\n\007channel\030\001 \001(\016" +
       "2\027.exec.shared.RpcChannel:\004USER\022\031\n\021suppo" +
       "rt_listening\030\002 \001(\010\022\023\n\013rpc_version\030\003 \001(\005\022" +
       "1\n\013credentials\030\004 \001(\0132\034.exec.shared.UserC" +
       "redentials\022-\n\nproperties\030\005 \001(\0132\031.exec.us" +
-      "er.UserProperties\"S\n\016RequestResults\022&\n\010q",
-      "uery_id\030\001 \001(\0132\024.exec.shared.QueryId\022\031\n\021m" +
-      "aximum_responses\030\002 \001(\005\"q\n\010RunQuery\0221\n\014re" +
-      "sults_mode\030\001 \001(\0162\033.exec.user.QueryResult" +
-      "sMode\022$\n\004type\030\002 \001(\0162\026.exec.shared.QueryT" +
-      "ype\022\014\n\004plan\030\003 \001(\t\")\n\022BitToUserHandshake\022" +
-      "\023\n\013rpc_version\030\002 \001(\005*\270\001\n\007RpcType\022\r\n\tHAND" +
-      "SHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\r\n\tRUN_QUE" +
-      "RY\020\003\022\020\n\014CANCEL_QUERY\020\004\022\023\n\017REQUEST_RESULT" +
-      "S\020\005\022\020\n\014QUERY_RESULT\020\006\022\020\n\014QUERY_HANDLE\020\007\022" +
-      "\026\n\022REQ_META_FUNCTIONS\020\010\022\026\n\022RESP_FUNCTION",
-      "_LIST\020\t*#\n\020QueryResultsMode\022\017\n\013STREAM_FU" +
-      "LL\020\001B+\n\033org.apache.drill.exec.protoB\nUse" +
-      "rProtosH\001"
+      "er.UserProperties\022$\n\025support_complex_typ",
+      "es\030\006 \001(\010:\005false\"S\n\016RequestResults\022&\n\010que" +
+      "ry_id\030\001 \001(\0132\024.exec.shared.QueryId\022\031\n\021max" +
+      "imum_responses\030\002 \001(\005\"q\n\010RunQuery\0221\n\014resu" +
+      "lts_mode\030\001 \001(\0162\033.exec.user.QueryResultsM" +
+      "ode\022$\n\004type\030\002 \001(\0162\026.exec.shared.QueryTyp" +
+      "e\022\014\n\004plan\030\003 \001(\t\")\n\022BitToUserHandshake\022\023\n" +
+      "\013rpc_version\030\002 \001(\005*\270\001\n\007RpcType\022\r\n\tHANDSH" +
+      "AKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\r\n\tRUN_QUERY" +
+      "\020\003\022\020\n\014CANCEL_QUERY\020\004\022\023\n\017REQUEST_RESULTS\020" +
+      "\005\022\020\n\014QUERY_RESULT\020\006\022\020\n\014QUERY_HANDLE\020\007\022\026\n",
+      "\022REQ_META_FUNCTIONS\020\010\022\026\n\022RESP_FUNCTION_L" +
+      "IST\020\t*#\n\020QueryResultsMode\022\017\n\013STREAM_FULL" +
+      "\020\001B+\n\033org.apache.drill.exec.protoB\nUserP" +
+      "rotosH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4292,7 +4374,7 @@ public final class UserProtos {
           internal_static_exec_user_UserToBitHandshake_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_user_UserToBitHandshake_descriptor,
-              new java.lang.String[] { "Channel", "SupportListening", "RpcVersion", "Credentials", "Properties", });
+              new java.lang.String[] { "Channel", "SupportListening", "RpcVersion", "Credentials", "Properties", "SupportComplexTypes", });
           internal_static_exec_user_RequestResults_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_exec_user_RequestResults_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index 3690625..abd7b78 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -52,7 +52,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
     TEXT_WRITER(27),
     TEXT_SUB_SCAN(28),
     JSON_SUB_SCAN(29),
-    INFO_SCHEMA_SUB_SCAN(30);
+    INFO_SCHEMA_SUB_SCAN(30),
+    FLATTEN(31);
     
     public final int number;
     
@@ -101,6 +102,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
             case 28: return TEXT_SUB_SCAN;
             case 29: return JSON_SUB_SCAN;
             case 30: return INFO_SCHEMA_SUB_SCAN;
+            case 31: return FLATTEN;
             default: return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java
index 70235b1..67ac4e5 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/UserToBitHandshake.java
@@ -46,12 +46,14 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
 
     static final UserToBitHandshake DEFAULT_INSTANCE = new UserToBitHandshake();
 
+    static final Boolean DEFAULT_SUPPORT_COMPLEX_TYPES = new Boolean(false);
     
     private RpcChannel channel;
     private Boolean supportListening;
     private int rpcVersion;
     private UserCredentials credentials;
     private UserProperties properties;
+    private Boolean supportComplexTypes = DEFAULT_SUPPORT_COMPLEX_TYPES;
 
     public UserToBitHandshake()
     {
@@ -125,6 +127,19 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
         return this;
     }
 
+    // supportComplexTypes
+
+    public Boolean getSupportComplexTypes()
+    {
+        return supportComplexTypes;
+    }
+
+    public UserToBitHandshake setSupportComplexTypes(Boolean supportComplexTypes)
+    {
+        this.supportComplexTypes = supportComplexTypes;
+        return this;
+    }
+
     // java serialization
 
     public void readExternal(ObjectInput in) throws IOException
@@ -196,6 +211,9 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
                     message.properties = input.mergeObject(message.properties, UserProperties.getSchema());
                     break;
 
+                case 6:
+                    message.supportComplexTypes = input.readBool();
+                    break;
                 default:
                     input.handleUnknownField(number, this);
             }   
@@ -221,6 +239,9 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
         if(message.properties != null)
              output.writeObject(5, message.properties, UserProperties.getSchema(), false);
 
+
+        if(message.supportComplexTypes != null && message.supportComplexTypes != DEFAULT_SUPPORT_COMPLEX_TYPES)
+            output.writeBool(6, message.supportComplexTypes, false);
     }
 
     public String getFieldName(int number)
@@ -232,6 +253,7 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
             case 3: return "rpcVersion";
             case 4: return "credentials";
             case 5: return "properties";
+            case 6: return "supportComplexTypes";
             default: return null;
         }
     }
@@ -250,6 +272,7 @@ public final class UserToBitHandshake implements Externalizable, Message<UserToB
         __fieldMap.put("rpcVersion", 3);
         __fieldMap.put("credentials", 4);
         __fieldMap.put("properties", 5);
+        __fieldMap.put("supportComplexTypes", 6);
     }
     
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/protocol/src/main/protobuf/User.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/User.proto b/protocol/src/main/protobuf/User.proto
index ea12323..6c41a37 100644
--- a/protocol/src/main/protobuf/User.proto
+++ b/protocol/src/main/protobuf/User.proto
@@ -7,28 +7,23 @@ option optimize_for = SPEED;
 import "SchemaDef.proto";
 import "UserBitShared.proto";
 
-
-
 ////// UserToBit RPC ///////
 enum RpcType {
-    HANDSHAKE = 0;
-    ACK = 1;
-    GOODBYE = 2;
-    
-    // user to bit
-    RUN_QUERY = 3;
-    CANCEL_QUERY = 4;
-    REQUEST_RESULTS = 5;
-    
-    
-    // bit to user
-       QUERY_RESULT = 6;
-       QUERY_HANDLE = 7;
-       
-       REQ_META_FUNCTIONS = 8;
-       RESP_FUNCTION_LIST = 9;
-       
-       
+  HANDSHAKE = 0;
+  ACK = 1;
+  GOODBYE = 2;
+
+  // user to bit
+  RUN_QUERY = 3;
+  CANCEL_QUERY = 4;
+  REQUEST_RESULTS = 5;
+
+  // bit to user
+  QUERY_RESULT = 6;
+  QUERY_HANDLE = 7;
+
+  REQ_META_FUNCTIONS = 8;
+  RESP_FUNCTION_LIST = 9;
 }
 
 message Property {
@@ -46,6 +41,7 @@ message UserToBitHandshake {
   optional int32 rpc_version = 3;
   optional exec.shared.UserCredentials credentials = 4;
   optional UserProperties properties = 5;
+  optional bool support_complex_types = 6 [default = false];
 }
 
 message RequestResults {
@@ -60,16 +56,11 @@ message RunQuery {
 }
 
 enum QueryResultsMode {
-       STREAM_FULL = 1; // Server will inform the client regularly on the status of the query. Once the query is completed, service will inform the client as each query chunk is made available.
-       // STREAM_FIRST = 2; // Server will inform the client regularly on the status of the query.  Once the query is completed, server will inform the client of the first query chunk.
-       // QUERY_FOR_STATUS = 3; // Client will need to query for status of query.
+  STREAM_FULL = 1; // Server will inform the client regularly on the status of the query. Once the query is completed, service will inform the client as each query chunk is made available.
+  // STREAM_FIRST = 2; // Server will inform the client regularly on the status of the query.  Once the query is completed, server will inform the client of the first query chunk.
+  // QUERY_FOR_STATUS = 3; // Client will need to query for status of query.
 }
 
-
 message BitToUserHandshake {
-       optional int32 rpc_version = 2;
+  optional int32 rpc_version = 2;
 }
-
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc00bc4b/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 4bafeb8..b754ee5 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -197,4 +197,5 @@ enum CoreOperatorType {
   TEXT_SUB_SCAN = 28;
   JSON_SUB_SCAN = 29;
   INFO_SCHEMA_SUB_SCAN = 30;
+  FLATTEN = 31;
 }

Reply via email to