TAJO-1140: Separate TajoClient into fine grained parts. Closes #213
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f80beaf6 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f80beaf6 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f80beaf6 Branch: refs/heads/hbase_storage Commit: f80beaf64d33850ee79e8ae32b33a852f56712f0 Parents: 8741e68 Author: Hyunsik Choi <[email protected]> Authored: Mon Oct 27 22:16:30 2014 -0700 Committer: Hyunsik Choi <[email protected]> Committed: Mon Oct 27 22:16:30 2014 -0700 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/cli/DefaultTajoCliOutputFormatter.java | 6 +- .../org/apache/tajo/cli/DescTableCommand.java | 4 +- .../tajo/cli/InvalidClientSessionException.java | 27 - .../main/java/org/apache/tajo/cli/TajoCli.java | 20 +- .../org/apache/tajo/cli/TajoShellCommand.java | 2 +- .../apache/tajo/client/CatalogAdminClient.java | 137 +++ .../tajo/client/CatalogAdminClientImpl.java | 266 +++++ .../client/InvalidClientSessionException.java | 27 + .../org/apache/tajo/client/QueryClient.java | 116 ++ .../org/apache/tajo/client/QueryClientImpl.java | 622 +++++++++++ .../apache/tajo/client/SessionConnection.java | 326 ++++++ .../java/org/apache/tajo/client/TajoAdmin.java | 8 +- .../java/org/apache/tajo/client/TajoClient.java | 1023 +----------------- .../org/apache/tajo/client/TajoClientImpl.java | 215 ++++ .../org/apache/tajo/client/TajoClientUtil.java | 94 ++ .../java/org/apache/tajo/client/TajoDump.java | 4 +- .../org/apache/tajo/client/TajoGetConf.java | 8 +- .../org/apache/tajo/client/TajoHAAdmin.java | 8 +- .../apache/tajo/client/TajoHAClientUtil.java | 2 +- .../org/apache/tajo/jdbc/FetchResultSet.java | 5 +- .../org/apache/tajo/jdbc/TajoResultSet.java | 7 +- .../org/apache/tajo/benchmark/BenchmarkSet.java | 5 +- .../org/apache/tajo/master/GlobalEngine.java | 4 +- .../tajo/webapp/QueryExecutorServlet.java | 19 +- .../apache/tajo/LocalTajoTestingUtility.java | 3 +- .../java/org/apache/tajo/QueryTestCaseBase.java | 3 +- .../org/apache/tajo/TajoTestingCluster.java | 10 +- .../org/apache/tajo/client/TestTajoClient.java | 4 +- .../java/org/apache/tajo/jdbc/TestTajoJdbc.java | 6 +- .../tajo/master/ha/TestHAServiceHDFSImpl.java | 3 +- .../master/querymaster/TestQueryProgress.java | 6 +- .../tajo/scheduler/TestFifoScheduler.java | 8 +- .../org/apache/tajo/worker/TestHistory.java | 3 +- .../org/apache/tajo/jdbc/JdbcConnection.java | 444 ++++++++ .../org/apache/tajo/jdbc/TajoConnection.java | 437 -------- .../apache/tajo/jdbc/TajoDatabaseMetaData.java | 27 +- .../java/org/apache/tajo/jdbc/TajoDriver.java | 2 +- .../apache/tajo/jdbc/TajoPreparedStatement.java | 4 +- .../org/apache/tajo/jdbc/TajoStatement.java | 5 +- 40 files changed, 2346 insertions(+), 1576 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index d239a19..ba2646d 100644 --- a/CHANGES +++ b/CHANGES @@ -11,6 +11,8 @@ Release 0.9.1 - unreleased IMPROVEMENT + TAJO-1140: Separate TajoClient into fine grained parts. (hyunsik) + TAJO-1132: More detailed version info in tsql. (hyunsik) TAJO-1125: Separate logical plan and optimizer into a maven module. http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java b/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java index 5797778..c583aa2 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java @@ -23,8 +23,8 @@ import org.apache.tajo.QueryId; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.client.QueryClient; import org.apache.tajo.client.QueryStatus; -import org.apache.tajo.client.TajoClient; import org.apache.tajo.util.FileUtil; import java.io.InputStream; @@ -56,10 +56,10 @@ public class DefaultTajoCliOutputFormatter implements TajoCliOutputFormatter { TableStats stat = tableDesc.getStats(); String volume = stat == null ? (endOfTuple ? "0 B" : "unknown bytes") : FileUtil.humanReadableByteCount(stat.getNumBytes(), false); - long resultRows = stat == null ? TajoClient.UNKNOWN_ROW_NUMBER : stat.getNumRows(); + long resultRows = stat == null ? QueryClient.UNKNOWN_ROW_NUMBER : stat.getNumRows(); String displayRowNum; - if (resultRows == TajoClient.UNKNOWN_ROW_NUMBER) { + if (resultRows == QueryClient.UNKNOWN_ROW_NUMBER) { if (endOfTuple) { displayRowNum = totalPrintedRows + " rows"; http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java b/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java index 8fab138..4b34858 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java @@ -23,7 +23,7 @@ import org.apache.commons.lang.StringEscapeUtils; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.partition.PartitionMethodDesc; -import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.QueryClient; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.TUtil; @@ -82,7 +82,7 @@ public class DescTableCommand extends TajoShellCommand { if (desc.getStats() != null) { long row = desc.getStats().getNumRows(); - String rowText = row == TajoClient.UNKNOWN_ROW_NUMBER ? "unknown" : row + ""; + String rowText = row == QueryClient.UNKNOWN_ROW_NUMBER ? "unknown" : row + ""; sb.append("number of rows: ").append(rowText).append("\n"); sb.append("volume: ").append( FileUtil.humanReadableByteCount(desc.getStats().getNumBytes(), http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/cli/InvalidClientSessionException.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/InvalidClientSessionException.java b/tajo-client/src/main/java/org/apache/tajo/cli/InvalidClientSessionException.java deleted file mode 100644 index 5c6c96e..0000000 --- a/tajo-client/src/main/java/org/apache/tajo/cli/InvalidClientSessionException.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.cli; - -import com.google.protobuf.ServiceException; - -public class InvalidClientSessionException extends ServiceException { - public InvalidClientSessionException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java index 05b919c..c732fd9 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java @@ -21,17 +21,13 @@ package org.apache.tajo.cli; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.protobuf.ServiceException; - import jline.UnsupportedTerminal; import jline.console.ConsoleReader; - import org.apache.commons.cli.*; import org.apache.tajo.*; import org.apache.tajo.TajoProtos.QueryState; import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.client.QueryStatus; -import org.apache.tajo.client.TajoClient; -import org.apache.tajo.client.TajoHAClientUtil; +import org.apache.tajo.client.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.ipc.ClientProtos; @@ -233,9 +229,9 @@ public class TajoCli { throw new RuntimeException("cannot find valid Tajo server address"); } else if (hostName != null && port != null) { conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port); - client = new TajoClient(conf, baseDatabase); + client = new TajoClientImpl(conf, baseDatabase); } else if (hostName == null && port == null) { - client = new TajoClient(conf, baseDatabase); + client = new TajoClientImpl(conf, baseDatabase); } try { @@ -565,7 +561,7 @@ public class TajoCli { if (response.getMaxRowNum() < 0 && queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { displayFormatter.printResult(sout, sin, desc, responseTime, res); } else { - res = TajoClient.createResultSet(client, response); + res = TajoClientUtil.createResultSet(conf, client, response); displayFormatter.printResult(sout, sin, desc, responseTime, res); } } catch (Throwable t) { @@ -597,17 +593,17 @@ public class TajoCli { while (true) { // TODO - configurable status = client.getQueryStatus(queryId); - if(TajoClient.isInPreNewState(status.getState())) { + if(TajoClientUtil.isQueryWaitingForSchedule(status.getState())) { Thread.sleep(Math.min(20 * initRetries, 1000)); initRetries++; continue; } - if (TajoClient.isInRunningState(status.getState()) || status.getState() == QueryState.QUERY_SUCCEEDED) { + if (TajoClientUtil.isQueryRunning(status.getState()) || status.getState() == QueryState.QUERY_SUCCEEDED) { displayFormatter.printProgress(sout, status); } - if (TajoClient.isInCompleteState(status.getState()) && status.getState() != QueryState.QUERY_KILL_WAIT) { + if (TajoClientUtil.isQueryComplete(status.getState()) && status.getState() != QueryState.QUERY_KILL_WAIT) { break; } else { Thread.sleep(Math.min(200 * progressRetries, 1000)); @@ -626,7 +622,7 @@ public class TajoCli { float responseTime = ((float)(status.getFinishTime() - status.getSubmitTime()) / 1000.0f); ClientProtos.GetQueryResultResponse response = client.getResultResponse(queryId); if (status.hasResult()) { - res = TajoClient.createResultSet(client, queryId, response); + res = TajoClientUtil.createResultSet(conf, client, queryId, response); TableDesc desc = new TableDesc(response.getTableDesc()); displayFormatter.printResult(sout, sin, desc, responseTime, res); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/cli/TajoShellCommand.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoShellCommand.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoShellCommand.java index 39f5377..138aec4 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoShellCommand.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoShellCommand.java @@ -40,7 +40,7 @@ public abstract class TajoShellCommand { protected int maxColumn; public TajoShellCommand(TajoCli.TajoCliContext context) { - maxColumn = context.getTajoClient().getConf().getIntVar(TajoConf.ConfVars.$CLI_MAX_COLUMN); + maxColumn = context.getConf().getIntVar(TajoConf.ConfVars.$CLI_MAX_COLUMN); this.context = context; client = context.getTajoClient(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java new file mode 100644 index 0000000..a36fc0e --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client; + +import com.google.protobuf.ServiceException; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.annotation.Nullable; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; + +import java.io.Closeable; +import java.sql.SQLException; +import java.util.List; + +public interface CatalogAdminClient extends Closeable { + /** + * Create a database. + * + * @param databaseName The database name to be created. This name is case sensitive. + * @return True if created successfully. + * @throws com.google.protobuf.ServiceException + */ + public boolean createDatabase(final String databaseName) throws ServiceException; + /** + * Does the database exist? + * + * @param databaseName The database name to be checked. This name is case sensitive. + * @return True if so. + * @throws ServiceException + */ + public boolean existDatabase(final String databaseName) throws ServiceException; + /** + * Drop the database + * + * @param databaseName The database name to be dropped. This name is case sensitive. + * @return True if the database is dropped successfully. + * @throws ServiceException + */ + public boolean dropDatabase(final String databaseName) throws ServiceException; + + public List<String> getAllDatabaseNames() throws ServiceException; + + /** + * Does the table exist? + * + * @param tableName The table name to be checked. This name is case sensitive. + * @return True if so. + */ + public boolean existTable(final String tableName) throws ServiceException; + + /** + * Create an external table. + * + * @param tableName The table name to be created. This name is case sensitive. This name can be qualified or not. + * If the table name is not qualified, the current database in the session will be used. + * @param schema The schema + * @param path The external table location + * @param meta Table meta + * @return the created table description. + * @throws java.sql.SQLException + * @throws ServiceException + */ + public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path, + final TableMeta meta) throws SQLException, ServiceException; + + /** + * Create an external table. + * + * @param tableName The table name to be created. This name is case sensitive. This name can be qualified or not. + * If the table name is not qualified, the current database in the session will be used. + * @param schema The schema + * @param path The external table location + * @param meta Table meta + * @param partitionMethodDesc Table partition description + * @return the created table description. + * @throws SQLException + * @throws ServiceException + */ + public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path, + final TableMeta meta, final PartitionMethodDesc partitionMethodDesc) + throws SQLException, ServiceException; + + /** + * Drop a table + * + * @param tableName The table name to be dropped. This name is case sensitive. + * @return True if the table is dropped successfully. + */ + public boolean dropTable(final String tableName) throws ServiceException; + + /** + * Drop a table. + * + * @param tableName The table name to be dropped. This name is case sensitive. + * @param purge If purge is true, this call will remove the entry in catalog as well as the table contents. + * @return True if the table is dropped successfully. + */ + public boolean dropTable(final String tableName, final boolean purge) throws ServiceException; + + /** + * Get a list of table names. + * + * @param databaseName The database name to show all tables. This name is case sensitive. + * If it is null, this method will show all tables + * in the current database of this session. + */ + public List<String> getTableList(@Nullable final String databaseName) throws ServiceException; + + /** + * Get a table description + * + * @param tableName The table name to get. This name is case sensitive. + * @return Table description + */ + public TableDesc getTableDesc(final String tableName) throws ServiceException; + + public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java new file mode 100644 index 0000000..496161d --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client; + +import com.google.protobuf.ServiceException; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.annotation.Nullable; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.ipc.TajoMasterClientProtocol; +import org.apache.tajo.jdbc.SQLStates; +import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.ServerCallable; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; + +import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService; +import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService.BlockingInterface; + +public class CatalogAdminClientImpl implements CatalogAdminClient { + private final SessionConnection connection; + + public CatalogAdminClientImpl(SessionConnection connection) { + this.connection = connection; + } + + @Override + public boolean createDatabase(final String databaseName) throws ServiceException { + return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public Boolean call(NettyClientBase client) throws ServiceException { + + connection.checkSessionAndGet(client); + BlockingInterface tajoMaster = client.getStub(); + return tajoMaster.createDatabase(null, connection.convertSessionedString(databaseName)).getValue(); + } + + }.withRetries(); + } + + @Override + public boolean existDatabase(final String databaseName) throws ServiceException { + + return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public Boolean call(NettyClientBase client) throws ServiceException { + + connection.checkSessionAndGet(client); + BlockingInterface tajoMaster = client.getStub(); + return tajoMaster.existDatabase(null, connection.convertSessionedString(databaseName)).getValue(); + } + + }.withRetries(); + } + + @Override + public boolean dropDatabase(final String databaseName) throws ServiceException { + + return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public Boolean call(NettyClientBase client) throws ServiceException { + + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.dropDatabase(null, connection.convertSessionedString(databaseName)).getValue(); + } + + }.withRetries(); + } + + @Override + public List<String> getAllDatabaseNames() throws ServiceException { + + return new ServerCallable<List<String>>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public List<String> call(NettyClientBase client) throws ServiceException { + + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.getAllDatabases(null, connection.sessionId).getValuesList(); + } + + }.withRetries(); + } + + public boolean existTable(final String tableName) throws ServiceException { + + return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public Boolean call(NettyClientBase client) throws ServiceException { + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.existTable(null, connection.convertSessionedString(tableName)).getValue(); + } + + }.withRetries(); + } + + @Override + public TableDesc createExternalTable(String tableName, Schema schema, Path path, TableMeta meta) + throws SQLException, ServiceException { + return createExternalTable(tableName, schema, path, meta, null); + } + + public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path, + final TableMeta meta, final PartitionMethodDesc partitionMethodDesc) + throws SQLException, ServiceException { + + return new ServerCallable<TableDesc>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public TableDesc call(NettyClientBase client) throws ServiceException, SQLException { + + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + + ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setName(tableName); + builder.setSchema(schema.getProto()); + builder.setMeta(meta.getProto()); + builder.setPath(path.toUri().toString()); + if (partitionMethodDesc != null) { + builder.setPartition(partitionMethodDesc.getProto()); + } + ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build()); + if (res.getResultCode() == ClientProtos.ResultCode.OK) { + return CatalogUtil.newTableDesc(res.getTableDesc()); + } else { + throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); + } + } + + }.withRetries(); + } + + @Override + public boolean dropTable(String tableName) throws ServiceException { + return dropTable(tableName, false); + } + + @Override + public boolean dropTable(final String tableName, final boolean purge) throws ServiceException { + + return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public Boolean call(NettyClientBase client) throws ServiceException { + + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + + ClientProtos.DropTableRequest.Builder builder = ClientProtos.DropTableRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setName(tableName); + builder.setPurge(purge); + return tajoMasterService.dropTable(null, builder.build()).getValue(); + } + + }.withRetries(); + + } + + @Override + public List<String> getTableList(@Nullable final String databaseName) throws ServiceException { + return new ServerCallable<List<String>>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public List<String> call(NettyClientBase client) throws ServiceException { + + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + + ClientProtos.GetTableListRequest.Builder builder = ClientProtos.GetTableListRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + if (databaseName != null) { + builder.setDatabaseName(databaseName); + } + ClientProtos.GetTableListResponse res = tajoMasterService.getTableList(null, builder.build()); + return res.getTablesList(); + } + + }.withRetries(); + } + + @Override + public TableDesc getTableDesc(final String tableName) throws ServiceException { + + return new ServerCallable<TableDesc>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public TableDesc call(NettyClientBase client) throws ServiceException, SQLException { + + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + + ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setTableName(tableName); + ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build()); + if (res.getResultCode() == ClientProtos.ResultCode.OK) { + return CatalogUtil.newTableDesc(res.getTableDesc()); + } else { + throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); + } + } + + }.withRetries(); + } + + @Override + public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException { + + return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.connPool, + connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + + public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException { + + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + + String paramFunctionName = functionName == null ? "" : functionName; + ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null, + connection.convertSessionedString(paramFunctionName)); + if (res.getResultCode() == ClientProtos.ResultCode.OK) { + return res.getFunctionsList(); + } else { + throw new SQLException(res.getErrorMessage()); + } + } + + }.withRetries(); + } + + @Override + public void close() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/InvalidClientSessionException.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/InvalidClientSessionException.java b/tajo-client/src/main/java/org/apache/tajo/client/InvalidClientSessionException.java new file mode 100644 index 0000000..acbc33f --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/InvalidClientSessionException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client; + +import com.google.protobuf.ServiceException; + +public class InvalidClientSessionException extends ServiceException { + public InvalidClientSessionException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java new file mode 100644 index 0000000..dbbafb6 --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client; + +import com.google.protobuf.ServiceException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.tajo.QueryId; +import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.jdbc.TajoMemoryResultSet; + +import java.io.Closeable; +import java.io.IOException; +import java.sql.ResultSet; +import java.util.List; +import java.util.Map; + +import static org.apache.tajo.TajoIdProtos.SessionIdProto; + +public interface QueryClient extends Closeable { + + int UNKNOWN_ROW_NUMBER = -1; + + public void setSessionId(SessionIdProto sessionId); + + public boolean isConnected(); + + public SessionIdProto getSessionId(); + + public String getBaseDatabase(); + + @Override + public void close(); + + public UserGroupInformation getUserInfo(); + + /** + * Call to QueryMaster closing query resources + * @param queryId + */ + public void closeQuery(final QueryId queryId); + + public void closeNonForwardQuery(final QueryId queryId); + + public String getCurrentDatabase() throws ServiceException; + + public Boolean selectDatabase(final String databaseName) throws ServiceException; + + public Boolean updateSessionVariables(final Map<String, String> variables) throws ServiceException; + + public Boolean unsetSessionVariables(final List<String> variables) throws ServiceException; + + public String getSessionVariable(final String varname) throws ServiceException; + + public Boolean existSessionVariable(final String varname) throws ServiceException; + + public Map<String, String> getAllSessionVariables() throws ServiceException; + + /** + * It submits a query statement and get a response immediately. + * The response only contains a query id, and submission status. + * In order to get the result, you should use {@link #getQueryResult(org.apache.tajo.QueryId)}. + */ + public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException; + + public ClientProtos.SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException; + + /** + * It submits a query statement and get a response. + * The main difference from {@link #executeQuery(String)} + * is a blocking method. So, this method is wait for + * the finish of the submitted query. + * + * @return If failed, return null. + */ + public ResultSet executeQueryAndGetResult(final String sql) throws ServiceException, IOException; + + public ResultSet executeJsonQueryAndGetResult(final String json) throws ServiceException, IOException; + + public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException; + + public ResultSet getQueryResult(QueryId queryId) throws ServiceException, IOException; + + public ResultSet createNullResultSet(QueryId queryId) throws IOException; + + public ClientProtos.GetQueryResultResponse getResultResponse(QueryId queryId) throws ServiceException; + + public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws ServiceException; + + public boolean updateQuery(final String sql) throws ServiceException; + + public boolean updateQueryWithJson(final String json) throws ServiceException; + + public List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws ServiceException; + + public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws ServiceException; + + public List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws ServiceException; + + public QueryStatus killQuery(final QueryId queryId) throws ServiceException, IOException; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java new file mode 100644 index 0000000..235ce19 --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -0,0 +1,622 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client; + +import com.google.protobuf.ServiceException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.tajo.QueryId; +import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.ipc.QueryMasterClientProtocol; +import org.apache.tajo.ipc.TajoMasterClientProtocol; +import org.apache.tajo.jdbc.TajoMemoryResultSet; +import org.apache.tajo.jdbc.TajoResultSet; +import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.ServerCallable; +import org.apache.tajo.util.NetUtils; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.sql.ResultSet; +import java.util.List; +import java.util.Map; + +import static org.apache.tajo.conf.TajoConf.ConfVars; +import static org.apache.tajo.ipc.ClientProtos.*; +import static org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService; +import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService; + +public class QueryClientImpl implements QueryClient { + private static final Log LOG = LogFactory.getLog(QueryClientImpl.class); + private final SessionConnection connection; + + public QueryClientImpl(SessionConnection connection) { + this.connection = connection; + } + + @Override + public void setSessionId(TajoIdProtos.SessionIdProto sessionId) { + connection.setSessionId(sessionId); + } + + @Override + public boolean isConnected() { + return connection.isConnected(); + } + + @Override + public TajoIdProtos.SessionIdProto getSessionId() { + return connection.getSessionId(); + } + + @Override + public String getBaseDatabase() { + return connection.getBaseDatabase(); + } + + @Override + public void close() { + + } + + @Override + public UserGroupInformation getUserInfo() { + return connection.getUserInfo(); + } + + @Override + public void closeQuery(QueryId queryId) { + if(connection.queryMasterMap.containsKey(queryId)) { + NettyClientBase qmClient = null; + try { + qmClient = connection.getConnection(queryId, QueryMasterClientProtocol.class, false); + QueryMasterClientProtocolService.BlockingInterface queryMaster = qmClient.getStub(); + queryMaster.closeQuery(null, queryId.getProto()); + } catch (Exception e) { + LOG.warn("Fail to close a QueryMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e); + } finally { + connection.connPool.closeConnection(qmClient); + connection.queryMasterMap.remove(queryId); + } + } + } + + @Override + public void closeNonForwardQuery(QueryId queryId) { + NettyClientBase tmClient = null; + try { + tmClient = connection.getTajoMasterConnection(false); + TajoMasterClientProtocolService.BlockingInterface tajoMaster = tmClient.getStub(); + connection.checkSessionAndGet(tmClient); + + ClientProtos.QueryIdRequest.Builder builder = ClientProtos.QueryIdRequest.newBuilder(); + + builder.setSessionId(getSessionId()); + builder.setQueryId(queryId.getProto()); + tajoMaster.closeNonForwardQuery(null, builder.build()); + } catch (Exception e) { + LOG.warn("Fail to close a TajoMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e); + } finally { + connection.connPool.closeConnection(tmClient); + } + } + + @Override + public String getCurrentDatabase() throws ServiceException { + return connection.getCurrentDatabase(); + } + + @Override + public Boolean selectDatabase(String databaseName) throws ServiceException { + return connection.selectDatabase(databaseName); + } + + @Override + public Boolean updateSessionVariables(Map<String, String> variables) throws ServiceException { + return connection.updateSessionVariables(variables); + } + + @Override + public Boolean unsetSessionVariables(List<String> variables) throws ServiceException { + return connection.unsetSessionVariables(variables); + } + + @Override + public String getSessionVariable(String varname) throws ServiceException { + return getSessionVariable(varname); + } + + @Override + public Boolean existSessionVariable(String varname) throws ServiceException { + return existSessionVariable(varname); + } + + @Override + public Map<String, String> getAllSessionVariables() throws ServiceException { + return getAllSessionVariables(); + } + + @Override + public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException { + + return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException { + + connection.checkSessionAndGet(client); + + final QueryRequest.Builder builder = QueryRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQuery(sql); + builder.setIsJson(false); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + + + return tajoMasterService.submitQuery(null, builder.build()); + } + }.withRetries(); + } + + @Override + public ClientProtos.SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException { + + return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException { + + connection.checkSessionAndGet(client); + + final QueryRequest.Builder builder = QueryRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQuery(json); + builder.setIsJson(true); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + + + return tajoMasterService.submitQuery(null, builder.build()); + } + }.withRetries(); + } + + @Override + public ResultSet executeQueryAndGetResult(String sql) throws ServiceException, IOException { + + ClientProtos.SubmitQueryResponse response = executeQuery(sql); + + if (response.getResultCode() == ClientProtos.ResultCode.ERROR) { + throw new ServiceException(response.getErrorTrace()); + } + + QueryId queryId = new QueryId(response.getQueryId()); + + if (response.getIsForwarded()) { + if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { + return this.createNullResultSet(queryId); + } else { + return this.getQueryResultAndWait(queryId); + } + + } else { + // If a non-forwarded insert into query + if (queryId.equals(QueryIdFactory.NULL_QUERY_ID) && response.getMaxRowNum() == 0) { + return this.createNullResultSet(queryId); + } else { + if (response.hasResultSet() || response.hasTableDesc()) { + return TajoClientUtil.createResultSet(connection.getConf() , this, response); + } else { + return this.createNullResultSet(queryId); + } + } + } + } + + @Override + public ResultSet executeJsonQueryAndGetResult(final String json) throws ServiceException, IOException { + + ClientProtos.SubmitQueryResponse response = executeQueryWithJson(json); + + if (response.getResultCode() == ClientProtos.ResultCode.ERROR) { + throw new ServiceException(response.getErrorTrace()); + } + + QueryId queryId = new QueryId(response.getQueryId()); + + if (response.getIsForwarded()) { + + if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { + return this.createNullResultSet(queryId); + } else { + return this.getQueryResultAndWait(queryId); + } + + } else { + + if (response.hasResultSet() || response.hasTableDesc()) { + return TajoClientUtil.createResultSet(connection.getConf(), this, response); + } else { + return this.createNullResultSet(queryId); + } + + } + } + + private ResultSet getQueryResultAndWait(QueryId queryId) throws ServiceException, IOException { + + if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { + return createNullResultSet(queryId); + } + + QueryStatus status = getQueryStatus(queryId); + + while(status != null && !TajoClientUtil.isQueryComplete(status.getState())) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + status = getQueryStatus(queryId); + } + + if (status.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) { + if (status.hasResult()) { + return getQueryResult(queryId); + } else { + return createNullResultSet(queryId); + } + + } else { + LOG.warn("Query (" + status.getQueryId() + ") failed: " + status.getState()); + + //TODO throw SQLException(?) + return createNullResultSet(queryId); + } + } + + @Override + public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException { + + ClientProtos.GetQueryStatusRequest.Builder builder = ClientProtos.GetQueryStatusRequest.newBuilder(); + builder.setQueryId(queryId.getProto()); + + ClientProtos.GetQueryStatusResponse res = null; + + if(connection.queryMasterMap.containsKey(queryId)) { + NettyClientBase qmClient = null; + + try { + + qmClient = connection.connPool.getConnection(connection.queryMasterMap.get(queryId), + QueryMasterClientProtocol.class, false); + QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub(); + res = queryMasterService.getQueryStatus(null, builder.build()); + + } catch (Exception e) { + throw new ServiceException(e.getMessage(), e); + } finally { + connection.connPool.releaseConnection(qmClient); + } + + } else { + + NettyClientBase tmClient = null; + + try { + tmClient = connection.getTajoMasterConnection(false); + connection.checkSessionAndGet(tmClient); + builder.setSessionId(connection.sessionId); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); + + res = tajoMasterService.getQueryStatus(null, builder.build()); + + String queryMasterHost = res.getQueryMasterHost(); + + if(queryMasterHost != null && !queryMasterHost.isEmpty()) { + NettyClientBase qmClient = null; + + try { + + InetSocketAddress qmAddr = NetUtils.createSocketAddr(queryMasterHost, res.getQueryMasterPort()); + qmClient = connection.connPool.getConnection( + qmAddr, QueryMasterClientProtocol.class, false); + QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub(); + res = queryMasterService.getQueryStatus(null, builder.build()); + + connection.queryMasterMap.put(queryId, qmAddr); + + } catch (Exception e) { + throw new ServiceException(e.getMessage(), e); + } finally { + connection.connPool.releaseConnection(qmClient); + } + } + + } catch (Exception e) { + throw new ServiceException(e.getMessage(), e); + } finally { + connection.connPool.releaseConnection(tmClient); + } + } + return new QueryStatus(res); + } + + @Override + public ResultSet getQueryResult(QueryId queryId) throws ServiceException, IOException { + + if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { + return createNullResultSet(queryId); + } + + GetQueryResultResponse response = getResultResponse(queryId); + TableDesc tableDesc = CatalogUtil.newTableDesc(response.getTableDesc()); + connection.getConf().setVar(ConfVars.USERNAME, response.getTajoUserName()); + + return new TajoResultSet(this, queryId, connection.getConf(), tableDesc); + } + + @Override + public ResultSet createNullResultSet(QueryId queryId) throws IOException { + return new TajoResultSet(this, queryId); + } + + @Override + public GetQueryResultResponse getResultResponse(QueryId queryId) throws ServiceException { + if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { + return null; + } + + NettyClientBase client = null; + + try { + + InetSocketAddress queryMasterAddr = connection.queryMasterMap.get(queryId); + if(queryMasterAddr == null) { + LOG.warn("No Connection to QueryMaster for " + queryId); + return null; + } + + client = connection.getConnection(queryMasterAddr, QueryMasterClientProtocol.class, false); + QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub(); + + GetQueryResultRequest.Builder builder = GetQueryResultRequest.newBuilder(); + builder.setQueryId(queryId.getProto()); + GetQueryResultResponse response = queryMasterService.getQueryResult(null,builder.build()); + + return response; + + } catch (Exception e) { + throw new ServiceException(e.getMessage(), e); + } finally { + connection.connPool.releaseConnection(client); + } + } + + @Override + public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) + throws ServiceException { + + try { + ServerCallable<ClientProtos.SerializedResultSet> callable = + new ServerCallable<ClientProtos.SerializedResultSet>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException { + + connection.checkSessionAndGet(client); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + + GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQueryId(queryId.getProto()); + builder.setFetchRowNum(fetchRowNum); + try { + GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build()); + if (response.getResultCode() == ClientProtos.ResultCode.ERROR) { + throw new ServiceException(response.getErrorTrace()); + } + + return response.getResultSet(); + } catch (ServiceException e) { + abort(); + throw e; + } catch (Throwable t) { + throw new ServiceException(t.getMessage(), t); + } + } + }; + + ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries(); + + return new TajoMemoryResultSet( + new Schema(serializedResultSet.getSchema()), + serializedResultSet.getSerializedTuplesList(), + serializedResultSet.getSerializedTuplesCount()); + } catch (Exception e) { + throw new ServiceException(e.getMessage(), e); + } + } + + @Override + public boolean updateQuery(final String sql) throws ServiceException { + + return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public Boolean call(NettyClientBase client) throws ServiceException { + + connection.checkSessionAndGet(client); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + + QueryRequest.Builder builder = QueryRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQuery(sql); + builder.setIsJson(false); + ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); + + if (response.getResultCode() == ClientProtos.ResultCode.OK) { + return true; + } else { + if (response.hasErrorMessage()) { + System.err.println("ERROR: " + response.getErrorMessage()); + } + return false; + } + } + }.withRetries(); + } + + @Override + public boolean updateQueryWithJson(final String json) throws ServiceException { + + return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public Boolean call(NettyClientBase client) throws ServiceException { + + connection.checkSessionAndGet(client); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + + QueryRequest.Builder builder = QueryRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQuery(json); + builder.setIsJson(true); + ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); + if (response.getResultCode() == ClientProtos.ResultCode.OK) { + return true; + } else { + if (response.hasErrorMessage()) { + System.err.println("ERROR: " + response.getErrorMessage()); + } + return false; + } + } + }.withRetries(); + } + + @Override + public List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws ServiceException { + + return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException { + + connection.checkSessionAndGet(client); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + + ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + ClientProtos.GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build()); + return res.getQueryListList(); + + } + }.withRetries(); + } + + @Override + public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws ServiceException { + + return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException { + + connection.checkSessionAndGet(client); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + + ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + ClientProtos.GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build()); + return res.getQueryListList(); + + } + }.withRetries(); + } + + @Override + public List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws ServiceException { + + return new ServerCallable<List<ClientProtos.WorkerResourceInfo>>(connection.connPool, connection.getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { + + public List<ClientProtos.WorkerResourceInfo> call(NettyClientBase client) throws ServiceException { + + connection.checkSessionAndGet(client); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + + ClientProtos.GetClusterInfoRequest.Builder builder = ClientProtos.GetClusterInfoRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + ClientProtos.GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build()); + return res.getWorkerListList(); + } + + }.withRetries(); + } + + @Override + public QueryStatus killQuery(final QueryId queryId) + throws ServiceException, IOException { + + QueryStatus status = getQueryStatus(queryId); + + NettyClientBase tmClient = null; + try { + /* send a kill to the TM */ + tmClient = connection.getTajoMasterConnection(false); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); + + connection.checkSessionAndGet(tmClient); + + ClientProtos.QueryIdRequest.Builder builder = ClientProtos.QueryIdRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQueryId(queryId.getProto()); + tajoMasterService.killQuery(null, builder.build()); + + long currentTimeMillis = System.currentTimeMillis(); + long timeKillIssued = currentTimeMillis; + while ((currentTimeMillis < timeKillIssued + 10000L) + && ((status.getState() != TajoProtos.QueryState.QUERY_KILLED) + || (status.getState() == TajoProtos.QueryState.QUERY_KILL_WAIT))) { + try { + Thread.sleep(100L); + } catch(InterruptedException ie) { + break; + } + currentTimeMillis = System.currentTimeMillis(); + status = getQueryStatus(queryId); + } + + } catch(Exception e) { + LOG.debug("Error when checking for application status", e); + } finally { + connection.connPool.releaseConnection(tmClient); + } + return status; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java new file mode 100644 index 0000000..42085a2 --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client; + +import com.google.protobuf.ServiceException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.tajo.QueryId; +import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.annotation.Nullable; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.ipc.TajoMasterClientProtocol; +import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.ServerCallable; +import org.apache.tajo.util.HAServiceUtil; +import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.NetUtils; +import org.jboss.netty.channel.ConnectTimeoutException; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest; +import static org.apache.tajo.ipc.ClientProtos.CreateSessionResponse; +import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService; + +public class SessionConnection implements Closeable { + + private final Log LOG = LogFactory.getLog(TajoClientImpl.class); + + private final TajoConf conf; + + final Map<QueryId, InetSocketAddress> queryMasterMap = new ConcurrentHashMap<QueryId, InetSocketAddress>(); + + final InetSocketAddress tajoMasterAddr; + + final RpcConnectionPool connPool; + + private final String baseDatabase; + + private final UserGroupInformation userInfo; + + volatile TajoIdProtos.SessionIdProto sessionId; + + private AtomicBoolean closed = new AtomicBoolean(false); + + + public SessionConnection(TajoConf conf) throws IOException { + this(conf, NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null); + } + + public SessionConnection(TajoConf conf, @Nullable String baseDatabase) throws IOException { + this(conf, NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), baseDatabase); + } + + public SessionConnection(InetSocketAddress addr) throws IOException { + this(new TajoConf(), addr, null); + } + + public SessionConnection(String hostname, int port, String baseDatabase) throws IOException { + this(new TajoConf(), NetUtils.createSocketAddr(hostname, port), baseDatabase); + } + + /** + * Connect to TajoMaster + * + * @param conf TajoConf + * @param addr TajoMaster address + * @param baseDatabase The base database name. It is case sensitive. If it is null, + * the 'default' database will be used. + * @throws java.io.IOException + */ + public SessionConnection(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException { + this.conf = conf; + this.conf.set("tajo.disk.scheduler.report.interval", "0"); + this.tajoMasterAddr = addr; + int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM); + // Don't share connection pool per client + connPool = RpcConnectionPool.newPool(conf, getClass().getSimpleName(), workerNum); + userInfo = UserGroupInformation.getCurrentUser(); + this.baseDatabase = baseDatabase != null ? baseDatabase : null; + } + + public <T> T getStub(QueryId queryId, Class protocolClass, boolean asyncMode) throws NoSuchMethodException, + ConnectTimeoutException, ClassNotFoundException { + InetSocketAddress addr = queryMasterMap.get(queryId); + return connPool.getConnection(addr, protocolClass, asyncMode).getStub(); + } + + public NettyClientBase getTajoMasterConnection(boolean asyncMode) throws NoSuchMethodException, + ConnectTimeoutException, ClassNotFoundException { + return connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode); + } + + public NettyClientBase getConnection(QueryId queryId, Class protocolClass, boolean asyncMode) + throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { + InetSocketAddress addr = queryMasterMap.get(queryId); + return connPool.getConnection(addr, protocolClass, asyncMode); + } + + public NettyClientBase getConnection(InetSocketAddress addr, Class protocolClass, boolean asyncMode) + throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { + return connPool.getConnection(addr, protocolClass, asyncMode); + } + + @SuppressWarnings("unused") + public void setSessionId(TajoIdProtos.SessionIdProto sessionId) { + this.sessionId = sessionId; + } + + public TajoIdProtos.SessionIdProto getSessionId() { + return sessionId; + } + + public String getBaseDatabase() { + return baseDatabase; + } + + public boolean isConnected() { + if(!closed.get()){ + try { + return connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false).isConnected(); + } catch (Throwable e) { + return false; + } + } + return false; + } + + public TajoConf getConf() { + return conf; + } + + public UserGroupInformation getUserInfo() { + return userInfo; + } + + public String getCurrentDatabase() throws ServiceException { + return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + + public String call(NettyClientBase client) throws ServiceException { + checkSessionAndGet(client); + + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.getCurrentDatabase(null, sessionId).getValue(); + } + }.withRetries(); + } + + public Boolean updateSessionVariables(final Map<String, String> variables) throws ServiceException { + return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + + public Boolean call(NettyClientBase client) throws ServiceException { + checkSessionAndGet(client); + + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + KeyValueSet keyValueSet = new KeyValueSet(); + keyValueSet.putAll(variables); + ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() + .setSessionId(sessionId) + .setSetVariables(keyValueSet.getProto()).build(); + + return tajoMasterService.updateSessionVariables(null, request).getValue(); + } + }.withRetries(); + } + + public Boolean unsetSessionVariables(final List<String> variables) throws ServiceException { + return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + + public Boolean call(NettyClientBase client) throws ServiceException { + checkSessionAndGet(client); + + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() + .setSessionId(sessionId) + .addAllUnsetVariables(variables).build(); + return tajoMasterService.updateSessionVariables(null, request).getValue(); + } + }.withRetries(); + } + + public String getSessionVariable(final String varname) throws ServiceException { + return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + + public String call(NettyClientBase client) throws ServiceException { + checkSessionAndGet(client); + + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.getSessionVariable(null, convertSessionedString(varname)).getValue(); + } + }.withRetries(); + } + + public Boolean existSessionVariable(final String varname) throws ServiceException { + return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + + public Boolean call(NettyClientBase client) throws ServiceException { + checkSessionAndGet(client); + + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.existSessionVariable(null, convertSessionedString(varname)).getValue(); + } + }.withRetries(); + } + + public Map<String, String> getAllSessionVariables() throws ServiceException { + return new ServerCallable<Map<String, String>>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, + false, true) { + + public Map<String, String> call(NettyClientBase client) throws ServiceException { + checkSessionAndGet(client); + + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + KeyValueSet keyValueSet = new KeyValueSet(tajoMasterService.getAllSessionVariables(null, sessionId)); + return keyValueSet.getAllKeyValus(); + } + }.withRetries(); + } + + public Boolean selectDatabase(final String databaseName) throws ServiceException { + return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + + public Boolean call(NettyClientBase client) throws ServiceException { + checkSessionAndGet(client); + + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue(); + } + }.withRetries(); + } + + @Override + public void close() { + if(closed.getAndSet(true)){ + return; + } + + // remove session + try { + + NettyClientBase client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false); + TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub(); + tajoMaster.removeSession(null, sessionId); + + } catch (Throwable e) { + } + + if(connPool != null) { + connPool.shutdown(); + } + + queryMasterMap.clear(); + } + + protected InetSocketAddress getTajoMasterAddr() { + if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + return tajoMasterAddr; + } else { + if (!HAServiceUtil.isMasterAlive(tajoMasterAddr, conf)) { + return HAServiceUtil.getMasterClientAddress(conf); + } else { + return tajoMasterAddr; + } + } + } + + protected void checkSessionAndGet(NettyClientBase client) throws ServiceException { + + if (sessionId == null) { + + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder(); + builder.setUsername(userInfo.getUserName()).build(); + + if (baseDatabase != null) { + builder.setBaseDatabaseName(baseDatabase); + } + + CreateSessionResponse response = tajoMasterService.createSession(null, builder.build()); + + if (response.getState() == CreateSessionResponse.ResultState.SUCCESS) { + + sessionId = response.getSessionId(); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Got session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName())); + } + + } else { + throw new InvalidClientSessionException(response.getMessage()); + } + } + } + + ClientProtos.SessionedStringProto convertSessionedString(String str) { + ClientProtos.SessionedStringProto.Builder builder = ClientProtos.SessionedStringProto.newBuilder(); + builder.setSessionId(sessionId); + builder.setValue(str); + return builder.build(); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java index 498402c..817a698 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java @@ -143,9 +143,9 @@ public class TajoAdmin { return; } else if (hostName != null && port != null) { tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port); - tajoClient = new TajoClient(tajoConf); + tajoClient = new TajoClientImpl(tajoConf); } else if (hostName == null && port == null) { - tajoClient = new TajoClient(tajoConf); + tajoClient = new TajoClientImpl(tajoConf); } switch (cmdType) { @@ -196,7 +196,7 @@ public class TajoAdmin { long end = queryInfo.getFinishTime(); long start = queryInfo.getStartTime(); String executionTime = decimalF.format((end-start) / 1000) + " sec"; - if (TajoClient.isInCompleteState(queryInfo.getState())) { + if (TajoClientUtil.isQueryComplete(queryInfo.getState())) { writer.write("Finished Time: " + df.format(queryInfo.getFinishTime())); writer.write("\n"); } @@ -434,7 +434,7 @@ public class TajoAdmin { } writer.write("\n"); } else { - String confMasterServiceAddr = tajoClient.getConf().getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS); + String confMasterServiceAddr = tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS); InetSocketAddress masterAddress = NetUtils.createSocketAddr(confMasterServiceAddr); writer.write(masterAddress.getHostName()); writer.write("\n");
