DRILL-399: Support USE SCHEMA. Also fixes bugs found in using default schema in queries.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/18ac7b4e Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/18ac7b4e Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/18ac7b4e Branch: refs/heads/master Commit: 18ac7b4e682656137f3a0d12c2011fa45176f57d Parents: 84fa4f1 Author: vkorukanti <[email protected]> Authored: Mon Apr 21 09:35:52 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Mon Apr 21 11:12:36 2014 -0700 ---------------------------------------------------------------------- exec/java-exec/src/main/codegen/data/Parser.tdd | 4 +- .../src/main/codegen/includes/parserImpls.ftl | 13 + .../apache/drill/exec/client/DrillClient.java | 22 +- .../org/apache/drill/exec/ops/QueryContext.java | 8 +- .../drill/exec/planner/sql/DrillSqlWorker.java | 1 + .../planner/sql/handlers/UseSchemaHandler.java | 51 + .../exec/planner/sql/parser/SqlUseSchema.java | 65 + .../apache/drill/exec/rpc/user/UserClient.java | 14 +- .../apache/drill/exec/rpc/user/UserServer.java | 13 +- .../apache/drill/exec/rpc/user/UserSession.java | 59 +- .../exec/store/dfs/FileSystemSchemaFactory.java | 4 +- .../java/org/apache/drill/PlanningBase.java | 2 +- .../exec/physical/impl/TestOptiqPlans.java | 2 +- .../exec/store/hive/HiveTestDataGenerator.java | 24 +- .../org/apache/drill/exec/proto/UserProtos.java | 1638 +++++++++++++++++- protocol/src/main/protobuf/User.proto | 11 +- .../apache/drill/jdbc/DrillConnectionImpl.java | 4 +- .../apache/drill/jdbc/test/TestJdbcQuery.java | 95 +- 18 files changed, 1941 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/codegen/data/Parser.tdd ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/data/Parser.tdd b/exec/java-exec/src/main/codegen/data/Parser.tdd index b8dba50..4f47795 100644 --- a/exec/java-exec/src/main/codegen/data/Parser.tdd +++ b/exec/java-exec/src/main/codegen/data/Parser.tdd @@ -32,13 +32,15 @@ "SCHEMAS", "SHOW", "TABLES" + "USE" ] # List of methods for parsing custom SQL statements. statementParserMethods: [ "SqlShowTables()", "SqlShowSchemas()", - "SqlDescribeTable()" + "SqlDescribeTable()", + "SqlUseSchema()" ] # List of methods for parsing custom literals. http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/codegen/includes/parserImpls.ftl ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl index 25b240a..cd5ee72 100644 --- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl +++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl @@ -100,3 +100,16 @@ SqlNode SqlDescribeTable() : return new SqlDescribeTable(pos, table, column, columnPattern); } } + +SqlNode SqlUseSchema(): +{ + SqlIdentifier schema; + SqlParserPos pos; +} +{ + <USE> { pos = getPos(); } + schema = CompoundIdentifier() + { + return new SqlUseSchema(pos, schema); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 73f2e29..bbd3e42 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -27,6 +27,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Properties; import java.util.Vector; import org.apache.drill.common.config.DrillConfig; @@ -38,8 +39,10 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserProtos; +import org.apache.drill.exec.proto.UserProtos.Property; import org.apache.drill.exec.proto.UserProtos.QueryType; import org.apache.drill.exec.proto.UserProtos.RpcType; +import org.apache.drill.exec.proto.UserProtos.UserProperties; import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection; import org.apache.drill.exec.rpc.ChannelClosedException; import org.apache.drill.exec.rpc.DrillRpcFuture; @@ -62,6 +65,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{ DrillConfig config; private UserClient client; + private UserProperties props = null; private volatile ClusterCoordinator clusterCoordinator; private volatile boolean connected = false; private final TopLevelAllocator allocator = new TopLevelAllocator(Long.MAX_VALUE); @@ -106,10 +110,14 @@ public class DrillClient implements Closeable, ConnectionThrottle{ * @throws IOException */ public void connect() throws RpcException { - connect((String) null); + connect(null, new Properties()); } - public synchronized void connect(String connect) throws RpcException { + public void connect(Properties props) throws RpcException { + connect(null, props); + } + + public synchronized void connect(String connect, Properties props) throws RpcException { if (connected) return; if (ownsZkConnection) { @@ -121,6 +129,14 @@ public class DrillClient implements Closeable, ConnectionThrottle{ } } + if (props != null) { + UserProperties.Builder upBuilder = UserProperties.newBuilder(); + for(String key : props.stringPropertyNames()) + upBuilder.addProperties(Property.newBuilder().setKey(key).setValue(props.getProperty(key))); + + this.props = upBuilder.build(); + } + Collection<DrillbitEndpoint> endpoints = clusterCoordinator.getAvailableEndpoints(); checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found"); // just use the first endpoint for now @@ -157,7 +173,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{ private void connect(DrillbitEndpoint endpoint) throws RpcException { FutureHandler f = new FutureHandler(); try { - client.connect(f, endpoint); + client.connect(f, endpoint, props); f.checkedGet(); } catch (InterruptedException e) { throw new RpcException(e); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java index d658c13..7e3b63d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java @@ -64,8 +64,7 @@ public class QueryContext{ } public SchemaPlus getNewDefaultSchema(){ - SchemaPlus rootSchema = Frameworks.createRootSchema(); - drillbitContext.getSchemaFactory().registerSchemas(session.getUser(), rootSchema); + SchemaPlus rootSchema = getRootSchema(); SchemaPlus defaultSchema = session.getDefaultSchema(rootSchema); if(defaultSchema == null){ return rootSchema; @@ -74,6 +73,11 @@ public class QueryContext{ } } + public SchemaPlus getRootSchema(){ + SchemaPlus rootSchema = Frameworks.createRootSchema(); + drillbitContext.getSchemaFactory().registerSchemas(session.getUser(), rootSchema); + return rootSchema; + } public DrillbitEndpoint getCurrentEndpoint(){ return drillbitContext.getEndpoint(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java index abba774..8892a8f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java @@ -91,6 +91,7 @@ public class DrillSqlWorker { if (sqlNode instanceof SqlShowTables){ handler = new ShowTablesHandler(planner, context); break; } else if (sqlNode instanceof SqlShowSchemas){ handler = new ShowSchemasHandler(planner, context); break; } else if (sqlNode instanceof SqlDescribeTable){ handler = new DescribeTableHandler(planner, context); break; } + else if (sqlNode instanceof SqlUseSchema){ handler = new UseSchemaHandler(context); break; } // fallthrough default: handler = new DefaultSqlHandler(planner, context); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java new file mode 100644 index 0000000..fc951ed --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/UseSchemaHandler.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.sql.handlers; + +import java.io.IOException; + +import net.hydromatic.optiq.tools.RelConversionException; +import net.hydromatic.optiq.tools.ValidationException; + +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.planner.sql.DirectPlan; +import org.apache.drill.exec.planner.sql.parser.SqlUseSchema; +import org.eigenbase.sql.SqlNode; + +public class UseSchemaHandler implements SqlHandler{ + QueryContext context; + + public UseSchemaHandler(QueryContext context) { + this.context = context; + } + + @Override + public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException { + SqlUseSchema useSchema = DefaultSqlHandler.unwrap(sqlNode, SqlUseSchema.class); + + String defaultSchema = useSchema.getSchema(); + boolean status = context.getSession().setDefaultSchemaPath(defaultSchema, context.getRootSchema()); + + String msg; + if (status) msg = String.format("Default schema changed to '%s'", defaultSchema); + else msg = String.format("Failed to change default schema to '%s'", defaultSchema); + + return DirectPlan.createDirectPlan(context, status, msg); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java new file mode 100644 index 0000000..a48963e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.sql.parser; + +import com.google.common.collect.ImmutableList; +import org.eigenbase.sql.*; +import org.eigenbase.sql.parser.SqlParserPos; + +import java.util.List; + +/** + * Sql parser tree node to represent <code>USE SCHEMA</code> statement. + */ +public class SqlUseSchema extends SqlCall { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("USE_SCHEMA", SqlKind.OTHER); + private SqlIdentifier schema; + + public SqlUseSchema(SqlParserPos pos, SqlIdentifier schema) { + super(pos); + this.schema = schema; + assert schema != null; + } + + @Override + public List<SqlNode> getOperandList() { + return ImmutableList.of((SqlNode)schema); + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("USE"); + schema.unparse(writer, leftPrec, rightPrec); + } + + /** + * Get the schema name. A schema identifier can contain more than one level of schema. + * Ex: "dfs.home" identifier contains two levels "dfs" and "home". + * @return schemas combined with "." + */ + public String getSchema() { + return schema.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java index 8b1bdec..50d456d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java @@ -28,6 +28,7 @@ import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake; import org.apache.drill.exec.proto.UserProtos.QueryResult; import org.apache.drill.exec.proto.UserProtos.RpcType; import org.apache.drill.exec.proto.UserProtos.RunQuery; +import org.apache.drill.exec.proto.UserProtos.UserProperties; import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake; import org.apache.drill.exec.rpc.BasicClientWithConnection; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; @@ -50,9 +51,16 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class); } - public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint) throws RpcException, InterruptedException { - UserToBitHandshake hs = UserToBitHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).setSupportListening(true).build(); - this.connectAsClient(handler, hs, endpoint.getAddress(), endpoint.getUserPort()); + public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint, UserProperties props) + throws RpcException, InterruptedException { + UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder() + .setRpcVersion(UserRpcConfig.RPC_VERSION) + .setSupportListening(true); + + if (props != null) + hsBuilder.setProperties(props); + + this.connectAsClient(handler, hsBuilder.build(), endpoint.getAddress(), endpoint.getUserPort()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java index 0ea00f1..ae4b01a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java @@ -32,6 +32,7 @@ import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake; import org.apache.drill.exec.proto.UserProtos.RequestResults; import org.apache.drill.exec.proto.UserProtos.RpcType; import org.apache.drill.exec.proto.UserProtos.RunQuery; +import org.apache.drill.exec.proto.UserProtos.UserProperties; import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake; import org.apache.drill.exec.rpc.Acks; import org.apache.drill.exec.rpc.BasicServer; @@ -73,10 +74,6 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec throws RpcException { switch (rpcType) { - case RpcType.HANDSHAKE_VALUE: - // logger.debug("Received handshake, responding in kind."); - return new Response(RpcType.HANDSHAKE, BitToUserHandshake.getDefaultInstance()); - case RpcType.RUN_QUERY_VALUE: // logger.debug("Received query to run. Returning query handle."); try { @@ -113,8 +110,8 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec super(channel); } - void setUser(UserCredentials credentials) throws IOException{ - session = new UserSession(this, credentials, worker.getSchemaFactory()); + void setUser(UserCredentials credentials, UserProperties props) throws IOException{ + session = new UserSession(credentials, props); } public UserSession getSession(){ @@ -131,7 +128,6 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec public BufferAllocator getAllocator() { return alloc; } - } @Override @@ -147,10 +143,9 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec public MessageLite getHandshakeResponse(UserToBitHandshake inbound) throws Exception { // logger.debug("Handling handshake from user to bit. {}", inbound); if(inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", inbound.getRpcVersion(), UserRpcConfig.RPC_VERSION)); - connection.setUser(inbound.getCredentials()); + connection.setUser(inbound.getCredentials(), inbound.getProperties()); return BitToUserHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).build(); } - }; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java index 0291978..f27317c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java @@ -18,45 +18,76 @@ package org.apache.drill.exec.rpc.user; import java.io.IOException; +import java.util.Map; +import com.google.common.collect.Maps; import net.hydromatic.optiq.SchemaPlus; import org.apache.drill.exec.proto.UserBitShared.UserCredentials; -import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; -import org.apache.drill.exec.store.SchemaFactory; +import org.apache.drill.exec.proto.UserProtos.Property; +import org.apache.drill.exec.proto.UserProtos.UserProperties; public class UserSession { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserSession.class); + public static final String SCHEMA = "schema"; + private DrillUser user; - private String defaultSchema = ""; - private UserClientConnection connection; + private boolean enableExchanges = true; + private UserCredentials credentials; + private Map<String, String> properties; - public UserSession(UserClientConnection connection, UserCredentials credentials, SchemaFactory factory) throws IOException{ - this.connection = connection; - } + public UserSession(UserCredentials credentials, UserProperties properties) throws IOException{ + this.credentials = credentials; + this.properties = Maps.newHashMap(); + if (properties == null) return; + for (int i=0; i<properties.getPropertiesCount(); i++) { + Property prop = properties.getProperties(i); + this.properties.put(prop.getKey(), prop.getValue()); + } + } public DrillUser getUser(){ return user; } - /** * Update the schema path for the session. * @param fullPath The desired path to set to. * @param schema The root schema to find this path within. - * @return true if the path was set succesfully. false if this path was unavailable. + * @return true if the path was set successfully. false if this path was unavailable. */ public boolean setDefaultSchemaPath(String fullPath, SchemaPlus schema){ - SchemaPlus newDefault = getDefaultSchema(schema); + SchemaPlus newDefault = findSchema(schema, fullPath); if(newDefault == null) return false; - this.defaultSchema = fullPath; + setProp(SCHEMA, fullPath); return true; } + /** + * Get default schema from current default schema path and given schema tree. + * @param rootSchema + * @return A {@link net.hydromatic.optiq.SchemaPlus} object. + */ public SchemaPlus getDefaultSchema(SchemaPlus rootSchema){ - String[] paths = defaultSchema.split("\\."); + return findSchema(rootSchema, getProp(SCHEMA)); + } + + public boolean setSessionOption(String name, String value){ + return true; + } + + private String getProp(String key) { + return properties.get(key) != null ? properties.get(key) : ""; + } + + private void setProp(String key, String value) { + properties.put(key, value); + } + + private SchemaPlus findSchema(SchemaPlus rootSchema, String schemaPath) { + String[] paths = schemaPath.split("\\."); SchemaPlus schema = rootSchema; for(String p : paths){ schema = schema.getSubSchema(p); @@ -64,8 +95,4 @@ public class UserSession { } return schema; } - - public boolean setSessionOption(String name, String value){ - return true; - } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java index b3f240e..93bac0c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java @@ -96,12 +96,12 @@ public class FileSystemSchemaFactory implements SchemaFactory{ @Override public Schema getSubSchema(String name) { - return defaultSchema.getSubSchema(name); + return schemaMap.get(name); } @Override public Set<String> getSubSchemaNames() { - return defaultSchema.getSubSchemaNames(); + return schemaMap.keySet(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java index a9ea6da..a9c8e69 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java @@ -87,7 +87,7 @@ public class PlanningBase { context.getFunctionRegistry(); result = functionRegistry; context.getSession(); - result = new UserSession(null, null, null); + result = new UserSession(null, null); context.getCurrentEndpoint(); result = DrillbitEndpoint.getDefaultInstance(); context.getActiveEndpoints(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java index c6b254a..f1d6c03 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java @@ -108,7 +108,7 @@ public class TestOptiqPlans { }; RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet(); DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus); - QueryContext qc = new QueryContext(new UserSession(null, null, null), QueryId.getDefaultInstance(), bitContext); + QueryContext qc = new QueryContext(new UserSession(null, null), QueryId.getDefaultInstance(), bitContext); PhysicalPlanReader reader = bitContext.getPlanReader(); LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8)); PhysicalPlan pp = new BasicOptimizer(DrillConfig.create(), qc).optimize( http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/18ac7b4e/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java index 8a71e2a..66d5b62 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java @@ -63,15 +63,25 @@ public class HiveTestDataGenerator { conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); conf.set("hive.metastore.warehouse.dir", WH_DIR); - String tableName = "kv"; - SessionState ss = new SessionState(new HiveConf(SessionState.class)); SessionState.start(ss); hiveDriver = new Driver(conf); - executeQuery(String.format("CREATE TABLE IF NOT EXISTS default.kv(key INT, value STRING) "+ - "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE", tableName)); - executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' OVERWRITE INTO TABLE %s", generateTestDataFile(), tableName)); - ss.close(); + + // generate (key, value) test data + String testDataFile = generateTestDataFile(); + + createTableAndLoadData("default", "kv", testDataFile); + executeQuery("CREATE DATABASE IF NOT EXISTS db1"); + createTableAndLoadData("db1", "kv_db1", testDataFile); + + ss.close(); + } + + private void createTableAndLoadData(String dbName, String tblName, String dataFile) { + executeQuery(String.format("USE %s", dbName)); + executeQuery(String.format("CREATE TABLE IF NOT EXISTS %s.%s(key INT, value STRING) "+ + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE", dbName, tblName)); + executeQuery(String.format("LOAD DATA LOCAL INPATH '%s' OVERWRITE INTO TABLE %s.%s", dataFile, dbName, tblName)); } private String generateTestDataFile() throws Exception { @@ -111,4 +121,4 @@ public class HiveTestDataGenerator { throw new RuntimeException(String.format("Failed to execute command '%s', errorMsg = '%s'", query, (response != null ? response.getErrorMessage() : ""))); } -} \ No newline at end of file +}
