DRILL-399: Support USE SCHEMA. Also fixes bugs found in using default schema in 
queries.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/18ac7b4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/18ac7b4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/18ac7b4e

Branch: refs/heads/master
Commit: 18ac7b4e682656137f3a0d12c2011fa45176f57d
Parents: 84fa4f1
Author: vkorukanti <[email protected]>
Authored: Mon Apr 21 09:35:52 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Mon Apr 21 11:12:36 2014 -0700

----------------------------------------------------------------------
 exec/java-exec/src/main/codegen/data/Parser.tdd |    4 +-
 .../src/main/codegen/includes/parserImpls.ftl   |   13 +
 .../apache/drill/exec/client/DrillClient.java   |   22 +-
 .../org/apache/drill/exec/ops/QueryContext.java |    8 +-
 .../drill/exec/planner/sql/DrillSqlWorker.java  |    1 +
 .../planner/sql/handlers/UseSchemaHandler.java  |   51 +
 .../exec/planner/sql/parser/SqlUseSchema.java   |   65 +
 .../apache/drill/exec/rpc/user/UserClient.java  |   14 +-
 .../apache/drill/exec/rpc/user/UserServer.java  |   13 +-
 .../apache/drill/exec/rpc/user/UserSession.java |   59 +-
 .../exec/store/dfs/FileSystemSchemaFactory.java |    4 +-
 .../java/org/apache/drill/PlanningBase.java     |    2 +-
 .../exec/physical/impl/TestOptiqPlans.java      |    2 +-
 .../exec/store/hive/HiveTestDataGenerator.java  |   24 +-
 .../org/apache/drill/exec/proto/UserProtos.java | 1638 +++++++++++++++++-
 protocol/src/main/protobuf/User.proto           |   11 +-
 .../apache/drill/jdbc/DrillConnectionImpl.java  |    4 +-
 .../apache/drill/jdbc/test/TestJdbcQuery.java   |   95 +-
 18 files changed, 1941 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/codegen/data/Parser.tdd
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/data/Parser.tdd 
b/exec/java-exec/src/main/codegen/data/Parser.tdd
index b8dba50..4f47795 100644
--- a/exec/java-exec/src/main/codegen/data/Parser.tdd
+++ b/exec/java-exec/src/main/codegen/data/Parser.tdd
@@ -32,13 +32,15 @@
     "SCHEMAS",
     "SHOW",
     "TABLES"
+    "USE"
   ]
 
   # List of methods for parsing custom SQL statements.
   statementParserMethods: [
     "SqlShowTables()",
     "SqlShowSchemas()",
-    "SqlDescribeTable()"
+    "SqlDescribeTable()",
+    "SqlUseSchema()"
   ]
 
   # List of methods for parsing custom literals.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl 
b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
index 25b240a..cd5ee72 100644
--- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
+++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
@@ -100,3 +100,16 @@ SqlNode SqlDescribeTable() :
         return new SqlDescribeTable(pos, table, column, columnPattern);
     }
 }
+
+SqlNode SqlUseSchema():
+{
+    SqlIdentifier schema;
+    SqlParserPos pos;
+}
+{
+    <USE> { pos = getPos(); }
+    schema = CompoundIdentifier()
+    {
+        return new SqlUseSchema(pos, schema);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 73f2e29..bbd3e42 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -27,6 +27,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.Properties;
 import java.util.Vector;
 
 import org.apache.drill.common.config.DrillConfig;
@@ -38,8 +39,10 @@ import 
org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.proto.UserProtos.Property;
 import org.apache.drill.exec.proto.UserProtos.QueryType;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
+import org.apache.drill.exec.proto.UserProtos.UserProperties;
 import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
 import org.apache.drill.exec.rpc.ChannelClosedException;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
@@ -62,6 +65,7 @@ public class DrillClient implements Closeable, 
ConnectionThrottle{
 
   DrillConfig config;
   private UserClient client;
+  private UserProperties props = null;
   private volatile ClusterCoordinator clusterCoordinator;
   private volatile boolean connected = false;
   private final TopLevelAllocator allocator = new 
TopLevelAllocator(Long.MAX_VALUE);
@@ -106,10 +110,14 @@ public class DrillClient implements Closeable, 
ConnectionThrottle{
    * @throws IOException
    */
   public void connect() throws RpcException {
-    connect((String) null);
+    connect(null, new Properties());
   }
 
-  public synchronized void connect(String connect) throws RpcException {
+  public void connect(Properties props) throws RpcException {
+    connect(null, props);
+  }
+
+  public synchronized void connect(String connect, Properties props) throws 
RpcException {
     if (connected) return;
 
     if (ownsZkConnection) {
@@ -121,6 +129,14 @@ public class DrillClient implements Closeable, 
ConnectionThrottle{
       }
     }
 
+    if (props != null) {
+      UserProperties.Builder upBuilder = UserProperties.newBuilder();
+      for(String key : props.stringPropertyNames())
+        
upBuilder.addProperties(Property.newBuilder().setKey(key).setValue(props.getProperty(key)));
+
+      this.props = upBuilder.build();
+    }
+
     Collection<DrillbitEndpoint> endpoints = 
clusterCoordinator.getAvailableEndpoints();
     checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
     // just use the first endpoint for now
@@ -157,7 +173,7 @@ public class DrillClient implements Closeable, 
ConnectionThrottle{
   private void connect(DrillbitEndpoint endpoint) throws RpcException {
     FutureHandler f = new FutureHandler();
     try {
-      client.connect(f, endpoint);
+      client.connect(f, endpoint, props);
       f.checkedGet();
     } catch (InterruptedException e) {
       throw new RpcException(e);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index d658c13..7e3b63d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -64,8 +64,7 @@ public class QueryContext{
   }
 
   public SchemaPlus getNewDefaultSchema(){
-    SchemaPlus rootSchema = Frameworks.createRootSchema();
-    drillbitContext.getSchemaFactory().registerSchemas(session.getUser(), 
rootSchema);
+    SchemaPlus rootSchema = getRootSchema();
     SchemaPlus defaultSchema = session.getDefaultSchema(rootSchema);
     if(defaultSchema == null){
       return rootSchema;
@@ -74,6 +73,11 @@ public class QueryContext{
     }
   }
 
+  public SchemaPlus getRootSchema(){
+    SchemaPlus rootSchema = Frameworks.createRootSchema();
+    drillbitContext.getSchemaFactory().registerSchemas(session.getUser(), 
rootSchema);
+    return rootSchema;
+  }
 
   public DrillbitEndpoint getCurrentEndpoint(){
     return drillbitContext.getEndpoint();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index abba774..8892a8f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -91,6 +91,7 @@ public class DrillSqlWorker {
       if (sqlNode instanceof SqlShowTables){ handler = new 
ShowTablesHandler(planner, context); break; }
       else if (sqlNode instanceof SqlShowSchemas){ handler = new 
ShowSchemasHandler(planner, context); break; }
       else if (sqlNode instanceof SqlDescribeTable){ handler = new 
DescribeTableHandler(planner, context); break; }
+      else if (sqlNode instanceof SqlUseSchema){ handler = new 
UseSchemaHandler(context); break; }
       // fallthrough
     default:
       handler = new DefaultSqlHandler(planner, context);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java
new file mode 100644
index 0000000..fc951ed
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import java.io.IOException;
+
+import net.hydromatic.optiq.tools.RelConversionException;
+import net.hydromatic.optiq.tools.ValidationException;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.planner.sql.DirectPlan;
+import org.apache.drill.exec.planner.sql.parser.SqlUseSchema;
+import org.eigenbase.sql.SqlNode;
+
+public class UseSchemaHandler implements SqlHandler{
+  QueryContext context;
+
+  public UseSchemaHandler(QueryContext context) {
+    this.context = context;
+  }
+
+  @Override
+  public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, 
RelConversionException, IOException {
+    SqlUseSchema useSchema = DefaultSqlHandler.unwrap(sqlNode, 
SqlUseSchema.class);
+
+    String defaultSchema = useSchema.getSchema();
+    boolean status = context.getSession().setDefaultSchemaPath(defaultSchema, 
context.getRootSchema());
+
+    String msg;
+    if (status) msg = String.format("Default schema changed to '%s'", 
defaultSchema);
+    else msg = String.format("Failed to change default schema to '%s'", 
defaultSchema);
+
+    return DirectPlan.createDirectPlan(context, status, msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java
new file mode 100644
index 0000000..a48963e
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.parser;
+
+import com.google.common.collect.ImmutableList;
+import org.eigenbase.sql.*;
+import org.eigenbase.sql.parser.SqlParserPos;
+
+import java.util.List;
+
+/**
+ * Sql parser tree node to represent <code>USE SCHEMA</code> statement.
+ */
+public class SqlUseSchema extends SqlCall {
+
+  public static final SqlSpecialOperator OPERATOR =
+      new SqlSpecialOperator("USE_SCHEMA", SqlKind.OTHER);
+  private SqlIdentifier schema;
+
+  public SqlUseSchema(SqlParserPos pos, SqlIdentifier schema) {
+    super(pos);
+    this.schema = schema;
+    assert schema != null;
+  }
+
+  @Override
+  public List<SqlNode> getOperandList() {
+    return ImmutableList.of((SqlNode)schema);
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    writer.keyword("USE");
+    schema.unparse(writer, leftPrec, rightPrec);
+  }
+
+  /**
+   * Get the schema name. A schema identifier can contain more than one level 
of schema.
+   * Ex: "dfs.home" identifier contains two levels "dfs" and "home".
+   * @return schemas combined with "."
+   */
+  public String getSchema() {
+    return schema.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 8b1bdec..50d456d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -28,6 +28,7 @@ import 
org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
 import org.apache.drill.exec.proto.UserProtos.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.proto.UserProtos.UserProperties;
 import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
 import org.apache.drill.exec.rpc.BasicClientWithConnection;
 import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
@@ -50,9 +51,16 @@ public class UserClient extends 
BasicClientWithConnection<RpcType, UserToBitHand
     send(queryResultHandler.getWrappedListener(resultsListener), 
RpcType.RUN_QUERY, query, QueryId.class);
   }
 
-  public void connect(RpcConnectionHandler<ServerConnection> handler, 
DrillbitEndpoint endpoint) throws RpcException, InterruptedException {
-    UserToBitHandshake hs = 
UserToBitHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).setSupportListening(true).build();
-    this.connectAsClient(handler, hs, endpoint.getAddress(), 
endpoint.getUserPort());
+  public void connect(RpcConnectionHandler<ServerConnection> handler, 
DrillbitEndpoint endpoint, UserProperties props)
+      throws RpcException, InterruptedException {
+    UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder()
+        .setRpcVersion(UserRpcConfig.RPC_VERSION)
+        .setSupportListening(true);
+
+    if (props != null)
+      hsBuilder.setProperties(props);
+
+    this.connectAsClient(handler, hsBuilder.build(), endpoint.getAddress(), 
endpoint.getUserPort());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 0ea00f1..ae4b01a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -32,6 +32,7 @@ import 
org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
 import org.apache.drill.exec.proto.UserProtos.RequestResults;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.proto.UserProtos.UserProperties;
 import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
 import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.BasicServer;
@@ -73,10 +74,6 @@ public class UserServer extends BasicServer<RpcType, 
UserServer.UserClientConnec
       throws RpcException {
     switch (rpcType) {
 
-    case RpcType.HANDSHAKE_VALUE:
-      // logger.debug("Received handshake, responding in kind.");
-      return new Response(RpcType.HANDSHAKE, 
BitToUserHandshake.getDefaultInstance());
-
     case RpcType.RUN_QUERY_VALUE:
       // logger.debug("Received query to run.  Returning query handle.");
       try {
@@ -113,8 +110,8 @@ public class UserServer extends BasicServer<RpcType, 
UserServer.UserClientConnec
       super(channel);
     }
 
-    void setUser(UserCredentials credentials) throws IOException{
-      session = new UserSession(this, credentials, worker.getSchemaFactory());
+    void setUser(UserCredentials credentials, UserProperties props) throws 
IOException{
+      session = new UserSession(credentials, props);
     }
 
     public UserSession getSession(){
@@ -131,7 +128,6 @@ public class UserServer extends BasicServer<RpcType, 
UserServer.UserClientConnec
     public BufferAllocator getAllocator() {
       return alloc;
     }
-
   }
 
   @Override
@@ -147,10 +143,9 @@ public class UserServer extends BasicServer<RpcType, 
UserServer.UserClientConnec
       public MessageLite getHandshakeResponse(UserToBitHandshake inbound) 
throws Exception {
 //        logger.debug("Handling handshake from user to bit. {}", inbound);
         if(inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION) throw new 
RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", 
inbound.getRpcVersion(), UserRpcConfig.RPC_VERSION));
-        connection.setUser(inbound.getCredentials());
+        connection.setUser(inbound.getCredentials(), inbound.getProperties());
         return 
BitToUserHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).build();
       }
-
     };
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index 0291978..f27317c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -18,45 +18,76 @@
 package org.apache.drill.exec.rpc.user;
 
 import java.io.IOException;
+import java.util.Map;
 
+import com.google.common.collect.Maps;
 import net.hydromatic.optiq.SchemaPlus;
 
 import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
-import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
-import org.apache.drill.exec.store.SchemaFactory;
+import org.apache.drill.exec.proto.UserProtos.Property;
+import org.apache.drill.exec.proto.UserProtos.UserProperties;
 
 public class UserSession {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UserSession.class);
 
+  public static final String SCHEMA = "schema";
+
   private DrillUser user;
-  private String defaultSchema = "";
-  private UserClientConnection connection;
+  private boolean enableExchanges = true;
+  private UserCredentials credentials;
+  private Map<String, String> properties;
 
-  public UserSession(UserClientConnection connection, UserCredentials 
credentials, SchemaFactory factory) throws IOException{
-    this.connection = connection;
-  }
+  public UserSession(UserCredentials credentials, UserProperties properties) 
throws IOException{
+    this.credentials = credentials;
 
+    this.properties = Maps.newHashMap();
+    if (properties == null) return;
+    for (int i=0; i<properties.getPropertiesCount(); i++) {
+      Property prop = properties.getProperties(i);
+      this.properties.put(prop.getKey(), prop.getValue());
+    }
+  }
 
   public DrillUser getUser(){
     return user;
   }
 
-
   /**
    * Update the schema path for the session.
    * @param fullPath The desired path to set to.
    * @param schema The root schema to find this path within.
-   * @return true if the path was set succesfully.  false if this path was 
unavailable.
+   * @return true if the path was set successfully.  false if this path was 
unavailable.
    */
   public boolean setDefaultSchemaPath(String fullPath, SchemaPlus schema){
-    SchemaPlus newDefault = getDefaultSchema(schema);
+    SchemaPlus newDefault = findSchema(schema, fullPath);
     if(newDefault == null) return false;
-    this.defaultSchema = fullPath;
+    setProp(SCHEMA, fullPath);
     return true;
   }
 
+  /**
+   * Get default schema from current default schema path and given schema tree.
+   * @param rootSchema
+   * @return A {@link net.hydromatic.optiq.SchemaPlus} object.
+   */
   public SchemaPlus getDefaultSchema(SchemaPlus rootSchema){
-    String[] paths = defaultSchema.split("\\.");
+    return findSchema(rootSchema, getProp(SCHEMA));
+  }
+
+  public boolean setSessionOption(String name, String value){
+    return true;
+  }
+
+  private String getProp(String key) {
+    return properties.get(key) != null ? properties.get(key) : "";
+  }
+
+  private void setProp(String key, String value) {
+    properties.put(key, value);
+  }
+
+  private SchemaPlus findSchema(SchemaPlus rootSchema, String schemaPath) {
+    String[] paths = schemaPath.split("\\.");
     SchemaPlus schema = rootSchema;
     for(String p : paths){
       schema = schema.getSubSchema(p);
@@ -64,8 +95,4 @@ public class UserSession {
     }
     return schema;
   }
-
-  public boolean setSessionOption(String name, String value){
-    return true;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index b3f240e..93bac0c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -96,12 +96,12 @@ public class FileSystemSchemaFactory implements 
SchemaFactory{
 
     @Override
     public Schema getSubSchema(String name) {
-      return defaultSchema.getSubSchema(name);
+      return schemaMap.get(name);
     }
 
     @Override
     public Set<String> getSubSchemaNames() {
-      return defaultSchema.getSubSchemaNames();
+      return schemaMap.keySet();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java 
b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index a9ea6da..a9c8e69 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -87,7 +87,7 @@ public class PlanningBase {
         context.getFunctionRegistry();
         result = functionRegistry;
         context.getSession();
-        result = new UserSession(null, null, null);
+        result = new UserSession(null, null);
         context.getCurrentEndpoint();
         result = DrillbitEndpoint.getDefaultInstance();
         context.getActiveEndpoints();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index c6b254a..f1d6c03 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -108,7 +108,7 @@ public class TestOptiqPlans {
     };
     RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
     DrillbitContext bitContext = new 
DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, 
controller, com, cache, workBus);
-    QueryContext qc = new QueryContext(new UserSession(null, null, null), 
QueryId.getDefaultInstance(), bitContext);
+    QueryContext qc = new QueryContext(new UserSession(null, null), 
QueryId.getDefaultInstance(), bitContext);
     PhysicalPlanReader reader = bitContext.getPlanReader();
     LogicalPlan plan = 
reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), 
Charsets.UTF_8));
     PhysicalPlan pp = new BasicOptimizer(DrillConfig.create(), qc).optimize(

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 8a71e2a..66d5b62 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -63,15 +63,25 @@ public class HiveTestDataGenerator {
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
     conf.set("hive.metastore.warehouse.dir", WH_DIR);
 
-    String tableName = "kv";
-
     SessionState ss = new SessionState(new HiveConf(SessionState.class));
     SessionState.start(ss);
     hiveDriver = new Driver(conf);
-    executeQuery(String.format("CREATE TABLE IF NOT EXISTS default.kv(key INT, 
value STRING) "+
-      "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE", 
tableName));
-    executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' OVERWRITE INTO 
TABLE %s", generateTestDataFile(), tableName));
-      ss.close();
+
+    // generate (key, value) test data
+    String testDataFile = generateTestDataFile();
+
+    createTableAndLoadData("default", "kv", testDataFile);
+    executeQuery("CREATE DATABASE IF NOT EXISTS db1");
+    createTableAndLoadData("db1", "kv_db1", testDataFile);
+
+    ss.close();
+  }
+
+  private void createTableAndLoadData(String dbName, String tblName, String 
dataFile) {
+    executeQuery(String.format("USE %s", dbName));
+    executeQuery(String.format("CREATE TABLE IF NOT EXISTS %s.%s(key INT, 
value STRING) "+
+        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE", 
dbName, tblName));
+    executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' OVERWRITE INTO 
TABLE %s.%s", dataFile, dbName, tblName));
   }
 
   private String generateTestDataFile() throws Exception {
@@ -111,4 +121,4 @@ public class HiveTestDataGenerator {
       throw new RuntimeException(String.format("Failed to execute command 
'%s', errorMsg = '%s'",
         query, (response != null ? response.getErrorMessage() : "")));
   }
-}
\ No newline at end of file
+}

Reply via email to