http://git-wip-us.apache.org/repos/asf/tajo/blob/5d62c409/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java index 57ee74f..4ffedcf 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java @@ -32,6 +32,8 @@ import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto; import java.io.Closeable; import org.apache.tajo.catalog.exception.CatalogException; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto; import java.util.Collection; import java.util.List; @@ -81,8 +83,8 @@ public interface CatalogStore extends Closeable { void alterTable(CatalogProtos.AlterTableDescProto alterTableDescProto) throws CatalogException; List<TableDescriptorProto> getAllTables() throws CatalogException; - - List<TableOptionProto> getAllTableOptions() throws CatalogException; + + List<TableOptionProto> getAllTableProperties() throws CatalogException; List<TableStatsProto> getAllTableStats() throws CatalogException;
http://git-wip-us.apache.org/repos/asf/tajo/blob/5d62c409/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java index c209ce5..d9ec3d3 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.exception.CatalogException; import org.apache.tajo.exception.InternalException; +import org.apache.tajo.exception.TajoInternalError; import java.sql.*; @@ -80,7 +81,7 @@ public class DerbyStore extends AbstractDBStore { stmt = getConnection().createStatement(); stmt.executeUpdate("CREATE SCHEMA " + schemaName); } catch (SQLException e) { - throw new CatalogException(e); + throw new TajoInternalError(e); } finally { CatalogUtil.closeQuietly(stmt); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5d62c409/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java index 8f1ac95..5763f31 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java @@ -76,7 +76,7 @@ public class MemStore implements CatalogStore { @Override public void createTablespace(String spaceName, String spaceUri) throws CatalogException { if (tablespaces.containsKey(spaceName)) { - throw new AlreadyExistsTablespaceException(spaceName); + throw new DuplicateTablespaceException(spaceName); } tablespaces.put(spaceName, spaceUri); @@ -90,7 +90,7 @@ public class MemStore implements CatalogStore { @Override public void dropTablespace(String spaceName) throws CatalogException { if (!tablespaces.containsKey(spaceName)) { - throw new NoSuchTablespaceException(spaceName); + throw new UndefinedTablespaceException(spaceName); } tablespaces.remove(spaceName); } @@ -119,7 +119,7 @@ public class MemStore implements CatalogStore { @Override public TablespaceProto getTablespace(String spaceName) throws CatalogException { if (!tablespaces.containsKey(spaceName)) { - throw new NoSuchTablespaceException(spaceName); + throw new UndefinedTablespaceException(spaceName); } TablespaceProto.Builder builder = TablespaceProto.newBuilder(); @@ -131,7 +131,7 @@ public class MemStore implements CatalogStore { @Override public void alterTablespace(CatalogProtos.AlterTablespaceProto alterProto) throws CatalogException { if (!tablespaces.containsKey(alterProto.getSpaceName())) { - throw new NoSuchTablespaceException(alterProto.getSpaceName()); + throw new UndefinedTablespaceException(alterProto.getSpaceName()); } if (alterProto.getCommandList().size() > 0) { @@ -147,7 +147,7 @@ public class MemStore implements CatalogStore { @Override public void createDatabase(String databaseName, String tablespaceName) throws CatalogException { if (databases.containsKey(databaseName)) { - throw new AlreadyExistsDatabaseException(databaseName); + throw new DuplicateDatabaseException(databaseName); } databases.put(databaseName, new HashMap<String, CatalogProtos.TableDescProto>()); @@ -161,7 +161,7 @@ public class MemStore implements CatalogStore { @Override public void dropDatabase(String databaseName) throws CatalogException { if (!databases.containsKey(databaseName)) { - throw new NoSuchDatabaseException(databaseName); + throw new UndefinedDatabaseException(databaseName); } databases.remove(databaseName); } @@ -197,7 +197,7 @@ public class MemStore implements CatalogStore { if (databaseMap.containsKey(databaseName)) { return databaseMap.get(databaseName); } else { - throw new NoSuchDatabaseException(databaseName); + throw new UndefinedDatabaseException(databaseName); } } @@ -215,7 +215,7 @@ public class MemStore implements CatalogStore { String tbName = tableName; if (database.containsKey(tbName)) { - throw new AlreadyExistsTableException(tbName); + throw new DuplicateTableException(tbName); } database.put(tbName, request); } @@ -251,7 +251,7 @@ public class MemStore implements CatalogStore { if (database.containsKey(tbName)) { database.remove(tbName); } else { - throw new NoSuchTableException(tbName); + throw new UndefinedTableException(tbName); } } @@ -280,7 +280,7 @@ public class MemStore implements CatalogStore { switch (alterTableDescProto.getAlterTableType()) { case RENAME_TABLE: if (database.containsKey(alterTableDescProto.getNewTableName())) { - throw new AlreadyExistsTableException(alterTableDescProto.getNewTableName()); + throw new DuplicateTableException(alterTableDescProto.getNewTableName()); } // Currently, we only use the default table space (i.e., WAREHOUSE directory). String spaceUri = tablespaces.get(TajoConstants.DEFAULT_TABLESPACE_NAME); @@ -315,7 +315,7 @@ public class MemStore implements CatalogStore { partitionName = partitionDesc.getPartitionName(); if (partitions.containsKey(tableName) && partitions.get(tableName).containsKey(partitionName)) { - throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName); + throw new DuplicatePartitionException(partitionName); } else { CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder(); builder.setPartitionName(partitionName); @@ -344,7 +344,7 @@ public class MemStore implements CatalogStore { partitionDesc = alterTableDescProto.getPartitionDesc(); partitionName = partitionDesc.getPartitionName(); if(!partitions.containsKey(tableName)) { - throw new NoSuchPartitionException(databaseName, tableName, partitionName); + throw new UndefinedPartitionException(partitionName); } else { partitions.get(tableName).remove(partitionName); } @@ -395,7 +395,7 @@ public class MemStore implements CatalogStore { builder.setSchema(schemaProto); return builder.build(); } else { - throw new NoSuchTableException(tableName); + throw new UndefinedTableException(tableName); } } @@ -439,7 +439,7 @@ public class MemStore implements CatalogStore { } @Override - public List<TableOptionProto> getAllTableOptions() throws CatalogException { + public List<TableOptionProto> getAllTableProperties() throws CatalogException { List<TableOptionProto> optionList = new ArrayList<CatalogProtos.TableOptionProto>(); int tid = 0; @@ -534,7 +534,7 @@ public class MemStore implements CatalogStore { CatalogProtos.TableDescProto table = database.get(tableName); return table.hasPartition() ? table.getPartition() : null; } else { - throw new NoSuchTableException(tableName); + throw new UndefinedTableException(tableName); } } @@ -547,7 +547,7 @@ public class MemStore implements CatalogStore { CatalogProtos.TableDescProto table = database.get(tableName); return table.hasPartition(); } else { - throw new NoSuchTableException(tableName); + throw new UndefinedTableException(tableName); } } @@ -574,7 +574,7 @@ public class MemStore implements CatalogStore { if (partitions.containsKey(tableName) && partitions.get(tableName).containsKey(partitionName)) { return partitions.get(tableName).get(partitionName); } else { - throw new NoSuchPartitionException(partitionName); + throw new UndefinedPartitionException(partitionName); } } @@ -614,7 +614,7 @@ public class MemStore implements CatalogStore { Map<String, IndexDescProto> indexByColumn = checkAndGetDatabaseNS(indexesByColumn, databaseName); if (index.containsKey(proto.getIndexName())) { - throw new AlreadyExistsIndexException(proto.getIndexName()); + throw new DuplicateIndexException(proto.getIndexName()); } index.put(proto.getIndexName(), proto); @@ -629,7 +629,7 @@ public class MemStore implements CatalogStore { public void dropIndex(String databaseName, String indexName) throws CatalogException { Map<String, IndexDescProto> index = checkAndGetDatabaseNS(indexes, databaseName); if (!index.containsKey(indexName)) { - throw new NoSuchIndexException(indexName); + throw new UndefinedIndexException(indexName); } index.remove(indexName); } @@ -641,7 +641,7 @@ public class MemStore implements CatalogStore { public IndexDescProto getIndexByName(String databaseName, String indexName) throws CatalogException { Map<String, IndexDescProto> index = checkAndGetDatabaseNS(indexes, databaseName); if (!index.containsKey(indexName)) { - throw new NoSuchIndexException(indexName); + throw new UndefinedIndexException(indexName); } return index.get(indexName); @@ -656,7 +656,7 @@ public class MemStore implements CatalogStore { Map<String, IndexDescProto> indexByColumn = checkAndGetDatabaseNS(indexesByColumn, databaseName); if (!indexByColumn.containsKey(columnName)) { - throw new NoSuchIndexException(columnName); + throw new UndefinedIndexException(CatalogUtil.buildFQName(databaseName, tableName), columnName); } return indexByColumn.get(columnName); http://git-wip-us.apache.org/repos/asf/tajo/blob/5d62c409/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java index 9173cb8..dd8e2a2 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java @@ -55,6 +55,7 @@ import org.apache.tajo.catalog.CatalogConstants; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.exception.CatalogException; import org.apache.tajo.catalog.store.object.*; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.util.TUtil; public class XMLCatalogSchemaManager { @@ -71,7 +72,7 @@ public class XMLCatalogSchemaManager { loadFromXmlFiles(); } catch (Exception e) { - throw new CatalogException(e.getMessage(), e); + throw new TajoInternalError(e); } } @@ -102,7 +103,7 @@ public class XMLCatalogSchemaManager { public void dropBaseSchema(Connection conn) throws CatalogException { if (!isLoaded()) { - throw new CatalogException("Schema files are not loaded yet."); + throw new TajoInternalError("Schema files are not loaded yet."); } List<DatabaseObject> failedObjects = new ArrayList<DatabaseObject>(); @@ -111,7 +112,7 @@ public class XMLCatalogSchemaManager { try { stmt = conn.createStatement(); } catch (SQLException e) { - throw new CatalogException(e.getMessage(), e); + throw new TajoInternalError(e); } for (DatabaseObject object: catalogStore.getSchema().getObjects()) { @@ -230,7 +231,7 @@ public class XMLCatalogSchemaManager { pstmt = getExistQuery(conn, type); if (pstmt == null) { - throw new CatalogException("Finding " + type + throw new TajoInternalError("Finding " + type + " type of database object is not supported on this database system."); } @@ -274,13 +275,13 @@ public class XMLCatalogSchemaManager { Statement stmt; if (!isLoaded()) { - throw new CatalogException("Database schema files are not loaded."); + throw new TajoInternalError("Database schema files are not loaded."); } try { stmt = conn.createStatement(); } catch (SQLException e) { - throw new CatalogException(e.getMessage(), e); + throw new TajoInternalError(e); } for (DatabaseObject object: catalogStore.getSchema().getObjects()) { @@ -304,7 +305,7 @@ public class XMLCatalogSchemaManager { LOG.info(object.getName() + " " + object.getType() + " is created."); } } catch (SQLException e) { - throw new CatalogException(e.getMessage(), e); + throw new TajoInternalError(e); } } @@ -313,7 +314,7 @@ public class XMLCatalogSchemaManager { public void upgradeBaseSchema(Connection conn, int currentVersion) { if (!isLoaded()) { - throw new CatalogException("Database schema files are not loaded."); + throw new TajoInternalError("Database schema files are not loaded."); } final List<SchemaPatch> candidatePatches = new ArrayList<SchemaPatch>(); @@ -329,7 +330,7 @@ public class XMLCatalogSchemaManager { try { stmt = conn.createStatement(); } catch (SQLException e) { - throw new CatalogException(e.getMessage(), e); + throw new TajoInternalError(e); } for (SchemaPatch patch: candidatePatches) { @@ -338,7 +339,7 @@ public class XMLCatalogSchemaManager { stmt.executeUpdate(object.getSql()); LOG.info(object.getName() + " " + object.getType() + " was created or altered."); } catch (SQLException e) { - throw new CatalogException(e.getMessage(), e); + throw new TajoInternalError(e); } } } @@ -369,14 +370,14 @@ public class XMLCatalogSchemaManager { } } catch (SQLException e) { - throw new CatalogException(e.getMessage(), e); + throw new TajoInternalError(e); } return result; } public boolean isInitialized(Connection conn) throws CatalogException { if (!isLoaded()) { - throw new CatalogException("Database schema files are not loaded."); + throw new TajoInternalError("Database schema files are not loaded."); } boolean result = true; @@ -390,7 +391,7 @@ public class XMLCatalogSchemaManager { result &= checkExistence(conn, object.getType(), object.getName()); } } catch (SQLException e) { - throw new CatalogException(e.getMessage(), e); + throw new TajoInternalError(e); } if (!result) { @@ -490,7 +491,7 @@ public class XMLCatalogSchemaManager { protected void mergeXmlSchemas(final List<StoreObject> storeObjects) throws CatalogException { if (storeObjects.size() <= 0) { - throw new CatalogException("Unable to find a schema file."); + throw new TajoInternalError("Unable to find a schema file."); } this.catalogStore = new StoreObjectsMerger(storeObjects).merge(); @@ -571,7 +572,7 @@ public class XMLCatalogSchemaManager { !targetStore.getSchema().getSchemaName().isEmpty() && !targetStore.getSchema().getSchemaName().equalsIgnoreCase( sourceStore.getSchema().getSchemaName())) { - throw new CatalogException("different schema names are specified. One is " + + throw new TajoInternalError("different schema names are specified. One is " + sourceStore.getSchema().getSchemaName() + " and other is " + targetStore.getSchema().getSchemaName()); } @@ -610,7 +611,7 @@ public class XMLCatalogSchemaManager { int objIdx = object.getOrder(); if (objIdx < orderedObjects.size() && orderedObjects.get(objIdx) != null) { - throw new CatalogException("This catalog configuration contains duplicated order of DatabaseObject"); + throw new TajoInternalError("This catalog configuration contains duplicated order of DatabaseObject"); } orderedObjects.add(objIdx, object); @@ -637,7 +638,7 @@ public class XMLCatalogSchemaManager { protected void validatePatch(List<SchemaPatch> patches, SchemaPatch testPatch) { if (testPatch.getPriorVersion() > testPatch.getNextVersion()) { - throw new CatalogException("Prior version cannot proceed to next version of patch."); + throw new TajoInternalError("Prior version cannot proceed to next version of patch."); } for (SchemaPatch patch: patches) { @@ -649,7 +650,7 @@ public class XMLCatalogSchemaManager { LOG.warn("It has the same prior version (" + testPatch.getPriorVersion() + ") of patch."); if (testPatch.getNextVersion() == patch.getNextVersion()) { - throw new CatalogException("Duplicate versions of patch found. It will terminate Catalog Store. "); + throw new TajoInternalError("Duplicate versions of patch found. It will terminate Catalog Store. "); } } @@ -687,7 +688,7 @@ public class XMLCatalogSchemaManager { } if (occurredCount > 1) { - throw new CatalogException("Duplicate Query type (" + testQuery.getType() + ") has found."); + throw new TajoInternalError("Duplicate Query type (" + testQuery.getType() + ") has found."); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5d62c409/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java index 2d52e9c..6020e83 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary; import org.apache.tajo.catalog.exception.CatalogException; -import org.apache.tajo.catalog.exception.NoSuchFunctionException; +import org.apache.tajo.catalog.exception.UndefinedFunctionException; import org.apache.tajo.catalog.partition.PartitionDesc; import org.apache.tajo.catalog.store.PostgreSQLStore; import org.apache.tajo.catalog.partition.PartitionMethodDesc; @@ -39,6 +39,7 @@ import org.apache.tajo.catalog.store.OracleStore; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.function.Function; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; @@ -93,7 +94,7 @@ public class TestCatalog { // MySQLStore/MariaDB/PostgreSQL requires username (and password). if (isConnectionIdRequired(driverClass)) { if (connectionId == null) { - throw new CatalogException(String.format("%s driver requires %s", driverClass, CatalogConstants.CONNECTION_ID)); + throw new TajoInternalError(String.format("%s driver requires %s", driverClass, CatalogConstants.CONNECTION_ID)); } conf.set(CatalogConstants.CONNECTION_ID, connectionId); if (password != null) { @@ -694,7 +695,7 @@ public class TestCatalog { assertFalse(catalog.containFunction("test123", CatalogUtil.newSimpleDataTypeArray(Type.INT4))); catalog.getFunction("test123", CatalogUtil.newSimpleDataTypeArray(Type.INT4)); fail(); - } catch (NoSuchFunctionException nsfe) { + } catch (UndefinedFunctionException nsfe) { // succeed test } catch (Throwable e) { fail(e.getMessage()); @@ -1102,7 +1103,7 @@ public class TestCatalog { assertEquals(retrieved.getParamTypes()[1] , CatalogUtil.newSimpleDataType(Type.INT4)); } - @Test(expected=NoSuchFunctionException.class) + @Test(expected=UndefinedFunctionException.class) public final void testFindIntInvalidFunc() throws Exception { assertFalse(catalog.containFunction("testintinvalid", FunctionType.GENERAL)); FunctionDesc meta = new FunctionDesc("testintinvalid", TestIntFunc.class, FunctionType.GENERAL, @@ -1131,7 +1132,7 @@ public class TestCatalog { assertEquals(retrieved.getParamTypes()[1] , CatalogUtil.newSimpleDataType(Type.INT4)); } - @Test(expected=NoSuchFunctionException.class) + @Test(expected=UndefinedFunctionException.class) public final void testFindFloatInvalidFunc() throws Exception { assertFalse(catalog.containFunction("testfloatinvalid", FunctionType.GENERAL)); FunctionDesc meta = new FunctionDesc("testfloatinvalid", TestFloatFunc.class, FunctionType.GENERAL, http://git-wip-us.apache.org/repos/asf/tajo/blob/5d62c409/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java index 76ba7a9..77a7000 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java @@ -405,13 +405,19 @@ public class TajoAdmin { public void processKill(Writer writer, String queryIdStr) throws IOException, ServiceException { - QueryStatus status = tajoClient.killQuery(TajoIdUtils.parseQueryId(queryIdStr)); - if (status.getState() == TajoProtos.QueryState.QUERY_KILLED) { - writer.write(queryIdStr + " is killed successfully.\n"); - } else if (status.getState() == TajoProtos.QueryState.QUERY_KILL_WAIT) { - writer.write(queryIdStr + " will be finished after a while.\n"); - } else { - writer.write("ERROR:" + status.getErrorMessage()); + + try { + QueryStatus status = tajoClient.killQuery(TajoIdUtils.parseQueryId(queryIdStr)); + + if (status.getState() == TajoProtos.QueryState.QUERY_KILLED) { + writer.write(queryIdStr + " is killed successfully.\n"); + } else if (status.getState() == TajoProtos.QueryState.QUERY_KILL_WAIT) { + writer.write(queryIdStr + " will be finished after a while.\n"); + } else { + writer.write("ERROR:" + status.getErrorMessage()); + } + } catch (SQLException e) { + writer.write("ERROR:" + e.getMessage()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5d62c409/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java index 86046c4..bc5fa7a 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java @@ -31,6 +31,7 @@ import org.apache.tajo.cli.tsql.ParsedResult.StatementType; import org.apache.tajo.cli.tsql.SimpleParser.ParsingState; import org.apache.tajo.cli.tsql.commands.*; import org.apache.tajo.client.*; +import org.apache.tajo.exception.ReturnStateUtil; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.ipc.ClientProtos; @@ -279,7 +280,7 @@ public class TajoCli { addShutdownHook(); } - private void processConfVarCommand(String[] confCommands) throws ServiceException { + private void processConfVarCommand(String[] confCommands) throws SQLException { for (String eachParam: confCommands) { String[] tokens = eachParam.split("="); if (tokens.length != 2) { @@ -292,7 +293,7 @@ public class TajoCli { } } - private void processSessionVarCommand(String[] confCommands) throws ServiceException { + private void processSessionVarCommand(String[] confCommands) throws SQLException { for (String eachParam: confCommands) { String[] tokens = eachParam.split("="); if (tokens.length != 2) { @@ -484,13 +485,13 @@ public class TajoCli { return 0; } - private void executeJsonQuery(String json) throws ServiceException, IOException { + private void executeJsonQuery(String json) throws SQLException { long startTime = System.currentTimeMillis(); ClientProtos.SubmitQueryResponse response = client.executeQueryWithJson(json); if (response == null) { onError("response is null", null); - } else if (response.getResultCode() == ClientProtos.ResultCode.OK) { + } else if (ReturnStateUtil.isSuccess(response.getState())) { if (response.getIsForwarded()) { QueryId queryId = new QueryId(response.getQueryId()); waitForQueryCompleted(queryId); @@ -503,13 +504,13 @@ public class TajoCli { } } } else { - if (response.hasErrorMessage()) { - onError(response.getErrorMessage(), null); + if (ReturnStateUtil.isError(response.getState())) { + onError(response.getState().getMessage(), null); } } } - private int executeQuery(String statement) throws ServiceException, IOException { + private int executeQuery(String statement) throws SQLException { long startTime = System.currentTimeMillis(); ClientProtos.SubmitQueryResponse response = null; @@ -519,22 +520,22 @@ public class TajoCli { onError(null, te); } - if (response == null) { - onError("response is null", null); - } else if (response.getResultCode() == ClientProtos.ResultCode.OK) { - if (response.getIsForwarded()) { - QueryId queryId = new QueryId(response.getQueryId()); - waitForQueryCompleted(queryId); - } else { - if (!response.hasTableDesc() && !response.hasResultSet()) { - displayFormatter.printMessage(sout, "OK"); + if (response != null) { + if (ReturnStateUtil.isSuccess(response.getState())) { + if (response.getIsForwarded()) { + QueryId queryId = new QueryId(response.getQueryId()); + waitForQueryCompleted(queryId); } else { - localQueryCompleted(response, startTime); + if (!response.hasTableDesc() && !response.hasResultSet()) { + displayFormatter.printMessage(sout, "OK"); + } else { + localQueryCompleted(response, startTime); + } + } + } else { + if (ReturnStateUtil.isError(response.getState())) { + onError(response.getState().getMessage(), null); } - } - } else { - if (response.hasErrorMessage()) { - onError(response.getErrorMessage(), null); } } @@ -570,7 +571,7 @@ public class TajoCli { } } - private void waitForQueryCompleted(QueryId queryId) { + private void waitForQueryCompleted(QueryId queryId) throws SQLException { // if query is empty string if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { return; @@ -661,7 +662,11 @@ public class TajoCli { if (t == null) { displayFormatter.printErrorMessage(sout, message); } else { - displayFormatter.printErrorMessage(sout, t); + if (t instanceof SQLException) { + displayFormatter.printErrorMessage(sout, t.getMessage()); + } else { + displayFormatter.printErrorMessage(sout, t); + } } if (reconnect && (t instanceof InvalidClientSessionException || (message != null && message.startsWith("org.apache.tajo.session.InvalidSessionException")))) { http://git-wip-us.apache.org/repos/asf/tajo/blob/5d62c409/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/ConnectDatabaseCommand.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/ConnectDatabaseCommand.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/ConnectDatabaseCommand.java index ae644bd..e75171d 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/ConnectDatabaseCommand.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/ConnectDatabaseCommand.java @@ -21,6 +21,8 @@ package org.apache.tajo.cli.tsql.commands; import com.google.protobuf.ServiceException; import org.apache.tajo.cli.tsql.TajoCli; +import java.sql.SQLException; + public class ConnectDatabaseCommand extends TajoShellCommand { public ConnectDatabaseCommand(TajoCli.TajoCliContext context) { @@ -49,7 +51,7 @@ public class ConnectDatabaseCommand extends TajoShellCommand { context.getOutput().write(String.format("You are now connected to database \"%s\" as user \"%s\".%n", context.getCurrentDatabase(), client.getUserInfo().getUserName())); } - } catch (ServiceException se) { + } catch (SQLException se) { if (se.getMessage() != null) { context.getOutput().write(se.getMessage()); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/5d62c409/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/SetCommand.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/SetCommand.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/SetCommand.java index 21c4be5..c1c286d 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/SetCommand.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/SetCommand.java @@ -23,6 +23,7 @@ import org.apache.tajo.SessionVars; import org.apache.tajo.cli.tsql.TajoCli; import org.apache.tajo.util.StringUtils; +import java.sql.SQLException; import java.util.HashMap; import java.util.Map; @@ -39,20 +40,20 @@ public class SetCommand extends TajoShellCommand { return "\\set"; } - private void showAllSessionVars() throws ServiceException { + private void showAllSessionVars() throws SQLException { for (Map.Entry<String, String> entry: client.getAllSessionVariables().entrySet()) { context.getOutput().println(StringUtils.quote(entry.getKey()) + "=" + StringUtils.quote(entry.getValue())); } } - private void updateSessionVariable(String key, String val) throws ServiceException { + private void updateSessionVariable(String key, String val) throws SQLException { Map<String, String> variables = new HashMap<String, String>(); variables.put(key, val); client.updateSessionVariables(variables); } - public void set(String key, String val) throws ServiceException { - SessionVars sessionVar = null; + public void set(String key, String val) throws SQLException { + SessionVars sessionVar; if (SessionVars.exists(key)) { // if the variable is one of the session variables sessionVar = SessionVars.get(key); http://git-wip-us.apache.org/repos/asf/tajo/blob/5d62c409/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java index 1512b24..c020ef5 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java @@ -18,7 +18,6 @@ package org.apache.tajo.client; -import com.google.protobuf.ServiceException; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; @@ -37,27 +36,27 @@ public interface CatalogAdminClient extends Closeable { * * @param databaseName The database name to be created. This name is case sensitive. * @return True if created successfully. - * @throws com.google.protobuf.ServiceException + * @throws java.sql.SQLException */ - public boolean createDatabase(final String databaseName) throws ServiceException; + boolean createDatabase(final String databaseName) throws SQLException; /** * Does the database exist? * * @param databaseName The database name to be checked. This name is case sensitive. * @return True if so. - * @throws ServiceException + * @throws java.sql.SQLException */ - public boolean existDatabase(final String databaseName) throws ServiceException; + boolean existDatabase(final String databaseName) throws SQLException; /** * Drop the database * * @param databaseName The database name to be dropped. This name is case sensitive. * @return True if the database is dropped successfully. - * @throws ServiceException + * @throws java.sql.SQLException */ - public boolean dropDatabase(final String databaseName) throws ServiceException; + boolean dropDatabase(final String databaseName) throws SQLException; - public List<String> getAllDatabaseNames() throws ServiceException; + List<String> getAllDatabaseNames() throws SQLException; /** * Does the table exist? @@ -65,7 +64,7 @@ public interface CatalogAdminClient extends Closeable { * @param tableName The table name to be checked. This name is case sensitive. * @return True if so. */ - public boolean existTable(final String tableName) throws ServiceException; + boolean existTable(final String tableName) throws SQLException; /** * Create an external table. @@ -77,10 +76,9 @@ public interface CatalogAdminClient extends Closeable { * @param meta Table meta * @return the created table description. * @throws java.sql.SQLException - * @throws ServiceException */ - public TableDesc createExternalTable(final String tableName, final Schema schema, final URI path, - final TableMeta meta) throws SQLException, ServiceException; + TableDesc createExternalTable(final String tableName, final Schema schema, final URI path, + final TableMeta meta) throws SQLException; /** * Create an external table. @@ -92,20 +90,20 @@ public interface CatalogAdminClient extends Closeable { * @param meta Table meta * @param partitionMethodDesc Table partition description * @return the created table description. - * @throws SQLException - * @throws ServiceException + * @throws java.sql.SQLException */ - public TableDesc createExternalTable(final String tableName, final Schema schema, final URI path, + TableDesc createExternalTable(final String tableName, final Schema schema, final URI path, final TableMeta meta, final PartitionMethodDesc partitionMethodDesc) - throws SQLException, ServiceException; + throws SQLException; /** * Drop a table * * @param tableName The table name to be dropped. This name is case sensitive. * @return True if the table is dropped successfully. + * @throws java.sql.SQLException */ - public boolean dropTable(final String tableName) throws ServiceException; + boolean dropTable(final String tableName) throws SQLException; /** * Drop a table. @@ -113,8 +111,9 @@ public interface CatalogAdminClient extends Closeable { * @param tableName The table name to be dropped. This name is case sensitive. * @param purge If purge is true, this call will remove the entry in catalog as well as the table contents. * @return True if the table is dropped successfully. + * @throws java.sql.SQLException */ - public boolean dropTable(final String tableName, final boolean purge) throws ServiceException; + boolean dropTable(final String tableName, final boolean purge) throws SQLException; /** * Get a list of table names. @@ -122,16 +121,18 @@ public interface CatalogAdminClient extends Closeable { * @param databaseName The database name to show all tables. This name is case sensitive. * If it is null, this method will show all tables * in the current database of this session. + * @throws java.sql.SQLException */ - public List<String> getTableList(@Nullable final String databaseName) throws ServiceException; + List<String> getTableList(@Nullable final String databaseName) throws SQLException; /** * Get a table description * * @param tableName The table name to get. This name is case sensitive. * @return Table description + * @throws java.sql.SQLException */ - public TableDesc getTableDesc(final String tableName) throws ServiceException; + TableDesc getTableDesc(final String tableName) throws SQLException; - public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException; + List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws SQLException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5d62c409/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java index 1fe856a..e73a032 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java @@ -26,169 +26,197 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.FunctionDescProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TableResponse; +import org.apache.tajo.exception.SQLExceptionUtil; import org.apache.tajo.ipc.ClientProtos; -import org.apache.tajo.ipc.ClientProtos.SessionedStringProto; -import org.apache.tajo.jdbc.SQLStates; +import org.apache.tajo.ipc.ClientProtos.DropTableRequest; import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListResponse; import java.io.IOException; import java.net.URI; import java.sql.SQLException; import java.util.List; +import static org.apache.tajo.exception.ReturnStateUtil.isSuccess; +import static org.apache.tajo.exception.SQLExceptionUtil.throwIfError; import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService.BlockingInterface; public class CatalogAdminClientImpl implements CatalogAdminClient { - private final SessionConnection connection; + private final SessionConnection conn; - public CatalogAdminClientImpl(SessionConnection connection) { - this.connection = connection; + public CatalogAdminClientImpl(SessionConnection conn) { + this.conn = conn; } @Override - public boolean createDatabase(final String databaseName) throws ServiceException { - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); - BlockingInterface tajoMaster = client.getStub(); - return tajoMaster.createDatabase(null, connection.convertSessionedString(databaseName)).getValue(); + public boolean createDatabase(final String databaseName) throws SQLException { + + final BlockingInterface stub = conn.getTMStub(); + + try { + return isSuccess(stub.createDatabase(null, conn.getSessionedString(databaseName))); + } catch (ServiceException e) { + throw new RuntimeException(e); + } } @Override - public boolean existDatabase(final String databaseName) throws ServiceException { + public boolean existDatabase(final String databaseName) throws SQLException { + + final BlockingInterface stub = conn.getTMStub(); - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); - BlockingInterface tajoMaster = client.getStub(); - return tajoMaster.existDatabase(null, connection.convertSessionedString(databaseName)).getValue(); + try { + return isSuccess(stub.existDatabase(null, conn.getSessionedString(databaseName))); + } catch (ServiceException e) { + throw new RuntimeException(e); + } } @Override - public boolean dropDatabase(final String databaseName) throws ServiceException { + public boolean dropDatabase(final String databaseName) throws SQLException { - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.dropDatabase(null, connection.convertSessionedString(databaseName)).getValue(); + final BlockingInterface stub = conn.getTMStub(); + + try { + return isSuccess(stub.dropDatabase(null, conn.getSessionedString(databaseName))); + } catch (ServiceException e) { + throw new RuntimeException(e); + } } @Override - public List<String> getAllDatabaseNames() throws ServiceException { + public List<String> getAllDatabaseNames() throws SQLException { - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.getAllDatabases(null, connection.sessionId).getValuesList(); + final BlockingInterface stub = conn.getTMStub(); + + try { + return stub.getAllDatabases(null, conn.sessionId).getValuesList(); + } catch (ServiceException e) { + throw new RuntimeException(e); + } } - public boolean existTable(final String tableName) throws ServiceException { + public boolean existTable(final String tableName) throws SQLException { - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.existTable(null, connection.convertSessionedString(tableName)).getValue(); + final BlockingInterface stub = conn.getTMStub(); + + try { + return isSuccess(stub.existTable(null, conn.getSessionedString(tableName))); + } catch (ServiceException e) { + throw new RuntimeException(e); + } } @Override public TableDesc createExternalTable(String tableName, Schema schema, URI path, TableMeta meta) - throws SQLException, ServiceException { + throws SQLException { return createExternalTable(tableName, schema, path, meta, null); } public TableDesc createExternalTable(final String tableName, final Schema schema, final URI path, final TableMeta meta, final PartitionMethodDesc partitionMethodDesc) - throws SQLException, ServiceException { + throws SQLException { - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); + NettyClientBase client = conn.getTajoMasterConnection(); + conn.checkSessionAndGet(client); BlockingInterface tajoMasterService = client.getStub(); ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder(); - builder.setSessionId(connection.sessionId); + builder.setSessionId(conn.sessionId); builder.setName(tableName); builder.setSchema(schema.getProto()); builder.setMeta(meta.getProto()); builder.setPath(path.toString()); + if (partitionMethodDesc != null) { builder.setPartition(partitionMethodDesc.getProto()); } - ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build()); - if (res.getResultCode() == ClientProtos.ResultCode.OK) { - return CatalogUtil.newTableDesc(res.getTableDesc()); + + TableResponse res; + try { + res = tajoMasterService.createExternalTable(null, builder.build()); + } catch (ServiceException e) { + throw new RuntimeException(e); + } + + if (isSuccess(res.getState())) { + return CatalogUtil.newTableDesc(res.getTable()); } else { - throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); + throw SQLExceptionUtil.toSQLException(res.getState()); } } @Override - public boolean dropTable(String tableName) throws ServiceException { + public boolean dropTable(String tableName) throws SQLException { return dropTable(tableName, false); } @Override - public boolean dropTable(final String tableName, final boolean purge) throws ServiceException { - - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); - - ClientProtos.DropTableRequest.Builder builder = ClientProtos.DropTableRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setName(tableName); - builder.setPurge(purge); - return tajoMasterService.dropTable(null, builder.build()).getValue(); - + public boolean dropTable(final String tableName, final boolean purge) throws SQLException { + + final BlockingInterface stub = conn.getTMStub(); + final DropTableRequest request = DropTableRequest.newBuilder() + .setSessionId(conn.sessionId) + .setName(tableName) + .setPurge(purge) + .build(); + + try { + return isSuccess(stub.dropTable(null, request)); + } catch (ServiceException e) { + throw new RuntimeException(e); + } } @Override - public List<String> getTableList(@Nullable final String databaseName) throws ServiceException { + public List<String> getTableList(@Nullable final String databaseName) throws SQLException { - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); + final BlockingInterface stub = conn.getTMStub(); - SessionedStringProto.Builder builder = SessionedStringProto.newBuilder(); - builder.setSessionId(connection.sessionId); - if (databaseName != null) { - builder.setValue(databaseName); + StringListResponse response; + try { + response = stub.getTableList(null, conn.getSessionedString(databaseName)); + } catch (ServiceException e) { + throw new RuntimeException(e); } - PrimitiveProtos.StringListProto res = tajoMasterService.getTableList(null, builder.build()); - return res.getValuesList(); + + throwIfError(response.getState()); + return response.getValuesList(); } @Override - public TableDesc getTableDesc(final String tableName) throws ServiceException { + public TableDesc getTableDesc(final String tableName) throws SQLException { - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); + final BlockingInterface stub = conn.getTMStub(); - SessionedStringProto.Builder builder = SessionedStringProto.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setValue(tableName); - ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build()); - if (res.getResultCode() == ClientProtos.ResultCode.OK) { - return CatalogUtil.newTableDesc(res.getTableDesc()); - } else { - throw new ServiceException(new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState())); + TableResponse res; + try { + res = stub.getTableDesc(null, conn.getSessionedString(tableName)); + } catch (ServiceException e) { + throw new RuntimeException(e); } + + throwIfError(res.getState()); + return CatalogUtil.newTableDesc(res.getTable()); } @Override - public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException { + public List<FunctionDescProto> getFunctions(final String functionName) throws SQLException { - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); + final BlockingInterface stub = conn.getTMStub(); String paramFunctionName = functionName == null ? "" : functionName; - ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null, - connection.convertSessionedString(paramFunctionName)); - if (res.getResultCode() == ClientProtos.ResultCode.OK) { - return res.getFunctionsList(); - } else { - throw new ServiceException(new SQLException(res.getErrorMessage())); + CatalogProtos.FunctionListResponse res; + try { + res = stub.getFunctionList(null, conn.getSessionedString(paramFunctionName)); + } catch (ServiceException e) { + throw new RuntimeException(e); } + + throwIfError(res.getState()); + return res.getFunctionList(); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/5d62c409/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java index 39b5fc3..ffe3d96 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java @@ -24,11 +24,13 @@ import org.apache.tajo.auth.UserRoleInfo; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.QueryHistoryProto; import org.apache.tajo.ipc.ClientProtos.QueryInfoProto; +import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; import org.apache.tajo.jdbc.TajoMemoryResultSet; import java.io.Closeable; import java.io.IOException; import java.sql.ResultSet; +import java.sql.SQLException; import java.util.List; import java.util.Map; @@ -36,55 +38,53 @@ import static org.apache.tajo.TajoIdProtos.SessionIdProto; public interface QueryClient extends Closeable { - public void setSessionId(SessionIdProto sessionId); + boolean isConnected(); - public boolean isConnected(); + String getSessionId(); - public SessionIdProto getSessionId(); + Map<String, String> getClientSideSessionVars(); - public Map<String, String> getClientSideSessionVars(); - - public String getBaseDatabase(); + String getBaseDatabase(); - public void setMaxRows(int maxRows); + void setMaxRows(int maxRows); - public int getMaxRows(); + int getMaxRows(); @Override - public void close(); + void close(); - public UserRoleInfo getUserInfo(); + UserRoleInfo getUserInfo(); /** * Call to QueryMaster closing query resources * @param queryId */ - public void closeQuery(final QueryId queryId); + void closeQuery(final QueryId queryId) throws SQLException; - public void closeNonForwardQuery(final QueryId queryId); + void closeNonForwardQuery(final QueryId queryId) throws SQLException; - public String getCurrentDatabase() throws ServiceException; + String getCurrentDatabase() throws SQLException; - public Boolean selectDatabase(final String databaseName) throws ServiceException; + Boolean selectDatabase(final String databaseName) throws SQLException; - public Map<String, String> updateSessionVariables(final Map<String, String> variables) throws ServiceException; + Map<String, String> updateSessionVariables(final Map<String, String> variables) throws SQLException; - public Map<String, String> unsetSessionVariables(final List<String> variables) throws ServiceException; + Map<String, String> unsetSessionVariables(final List<String> variables) throws SQLException; - public String getSessionVariable(final String varname) throws ServiceException; + String getSessionVariable(final String varname) throws SQLException; - public Boolean existSessionVariable(final String varname) throws ServiceException; + Boolean existSessionVariable(final String varname) throws SQLException; - public Map<String, String> getAllSessionVariables() throws ServiceException; + Map<String, String> getAllSessionVariables() throws SQLException; /** * It submits a query statement and get a response immediately. * The response only contains a query id, and submission status. * In order to get the result, you should use {@link #getQueryResult(org.apache.tajo.QueryId)}. */ - public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException; + SubmitQueryResponse executeQuery(final String sql) throws SQLException; - public ClientProtos.SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException; + SubmitQueryResponse executeQueryWithJson(final String json) throws SQLException; /** * It submits a query statement and get a response. @@ -94,33 +94,33 @@ public interface QueryClient extends Closeable { * * @return If failed, return null. */ - public ResultSet executeQueryAndGetResult(final String sql) throws ServiceException, IOException; + ResultSet executeQueryAndGetResult(final String sql) throws SQLException; - public ResultSet executeJsonQueryAndGetResult(final String json) throws ServiceException, IOException; + ResultSet executeJsonQueryAndGetResult(final String json) throws SQLException; - public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException; + QueryStatus getQueryStatus(QueryId queryId) throws SQLException; - public ResultSet getQueryResult(QueryId queryId) throws ServiceException, IOException; + ResultSet getQueryResult(QueryId queryId) throws SQLException; - public ResultSet createNullResultSet(QueryId queryId) throws IOException; + ResultSet createNullResultSet(QueryId queryId) throws SQLException; - public ClientProtos.GetQueryResultResponse getResultResponse(QueryId queryId) throws ServiceException; + ClientProtos.GetQueryResultResponse getResultResponse(QueryId queryId) throws SQLException; - public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws ServiceException; + TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws SQLException; - public boolean updateQuery(final String sql) throws ServiceException; + boolean updateQuery(final String sql) throws SQLException; - public boolean updateQueryWithJson(final String json) throws ServiceException; + boolean updateQueryWithJson(final String json) throws SQLException; - public List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws ServiceException; + List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws SQLException; - public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws ServiceException; + List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws SQLException; - public List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws ServiceException; + List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws SQLException; - public QueryStatus killQuery(final QueryId queryId) throws ServiceException, IOException; + QueryStatus killQuery(final QueryId queryId) throws SQLException; - public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException; + QueryInfoProto getQueryInfo(final QueryId queryId) throws SQLException; - public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException; + QueryHistoryProto getQueryHistory(final QueryId queryId) throws SQLException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5d62c409/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index da10f55..80a49c2 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -22,66 +22,67 @@ import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.*; +import org.apache.tajo.TajoIdProtos.SessionIdProto; import org.apache.tajo.auth.UserRoleInfo; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.exception.SQLExceptionUtil; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.QueryMasterClientProtocol; +import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService.BlockingInterface; import org.apache.tajo.jdbc.FetchResultSet; import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.util.ProtoUtil; -import java.io.IOException; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.sql.ResultSet; +import java.sql.SQLException; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.apache.tajo.exception.ReturnStateUtil.isSuccess; +import static org.apache.tajo.exception.ReturnStateUtil.returnError; +import static org.apache.tajo.exception.SQLExceptionUtil.throwIfError; import static org.apache.tajo.ipc.ClientProtos.*; import static org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService; -import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService; public class QueryClientImpl implements QueryClient { private static final Log LOG = LogFactory.getLog(QueryClientImpl.class); - private final SessionConnection connection; + private final SessionConnection conn; private final int defaultFetchRows; -//maxRows number is limit value of resultSet. The value must be >= 0, and 0 means there is not limit. + // maxRows number is limit value of resultSet. The value must be >= 0, and 0 means there is not limit. private int maxRows; - public QueryClientImpl(SessionConnection connection) { - this.connection = connection; - this.defaultFetchRows = this.connection.getProperties().getInt(SessionVars.FETCH_ROWNUM.getConfVars().keyname(), + public QueryClientImpl(SessionConnection conn) { + this.conn = conn; + this.defaultFetchRows = this.conn.getProperties().getInt(SessionVars.FETCH_ROWNUM.getConfVars().keyname(), SessionVars.FETCH_ROWNUM.getConfVars().defaultIntVal); this.maxRows = 0; } @Override - public void setSessionId(TajoIdProtos.SessionIdProto sessionId) { - connection.setSessionId(sessionId); - } - - @Override public boolean isConnected() { - return connection.isConnected(); + return conn.isConnected(); } @Override - public TajoIdProtos.SessionIdProto getSessionId() { - return connection.getSessionId(); + public String getSessionId() { + return conn.getSessionId(); } @Override public Map<String, String> getClientSideSessionVars() { - return connection.getClientSideSessionVars(); + return conn.getClientSideSessionVars(); } @Override public String getBaseDatabase() { - return connection.getBaseDatabase(); + return conn.getBaseDatabase(); } @Override @@ -90,114 +91,95 @@ public class QueryClientImpl implements QueryClient { @Override public UserRoleInfo getUserInfo() { - return connection.getUserInfo(); + return conn.getUserInfo(); } @Override - public void closeQuery(QueryId queryId) { + public void closeQuery(QueryId queryId) throws SQLException { closeNonForwardQuery(queryId); } @Override - public void closeNonForwardQuery(QueryId queryId) { - NettyClientBase tmClient = null; + public void closeNonForwardQuery(QueryId queryId) throws SQLException { try { - tmClient = connection.getTajoMasterConnection(); - TajoMasterClientProtocolService.BlockingInterface tajoMaster = tmClient.getStub(); - connection.checkSessionAndGet(tmClient); - - ClientProtos.QueryIdRequest.Builder builder = ClientProtos.QueryIdRequest.newBuilder(); - - builder.setSessionId(getSessionId()); - builder.setQueryId(queryId.getProto()); - tajoMaster.closeNonForwardQuery(null, builder.build()); - } catch (Exception e) { - LOG.warn("Fail to close a TajoMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e); + throwIfError(conn.getTMStub().closeNonForwardQuery(null, buildQueryIdRequest(queryId))); + } catch (ServiceException e) { + throw new RuntimeException(e); } } @Override - public String getCurrentDatabase() throws ServiceException { - return connection.getCurrentDatabase(); + public String getCurrentDatabase() throws SQLException { + return conn.getCurrentDatabase(); } @Override - public Boolean selectDatabase(String databaseName) throws ServiceException { - return connection.selectDatabase(databaseName); + public Boolean selectDatabase(String databaseName) throws SQLException { + return conn.selectDatabase(databaseName); } @Override - public Map<String, String> updateSessionVariables(Map<String, String> variables) throws ServiceException { - return connection.updateSessionVariables(variables); + public Map<String, String> updateSessionVariables(Map<String, String> variables) throws SQLException { + return conn.updateSessionVariables(variables); } @Override - public Map<String, String> unsetSessionVariables(List<String> variables) throws ServiceException { - return connection.unsetSessionVariables(variables); + public Map<String, String> unsetSessionVariables(List<String> variables) throws SQLException { + return conn.unsetSessionVariables(variables); } @Override - public String getSessionVariable(String varname) throws ServiceException { - return connection.getSessionVariable(varname); + public String getSessionVariable(String varname) throws SQLException { + return conn.getSessionVariable(varname); } @Override - public Boolean existSessionVariable(String varname) throws ServiceException { - return connection.existSessionVariable(varname); + public Boolean existSessionVariable(String varname) throws SQLException { + return conn.existSessionVariable(varname); } @Override - public Map<String, String> getAllSessionVariables() throws ServiceException { - return connection.getAllSessionVariables(); + public Map<String, String> getAllSessionVariables() throws SQLException { + return conn.getAllSessionVariables(); } @Override - public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException { - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); + public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws SQLException { - final QueryRequest.Builder builder = QueryRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQuery(sql); - builder.setIsJson(false); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + final BlockingInterface stub = conn.getTMStub(); + final QueryRequest request = buildQueryRequest(sql, false); + SubmitQueryResponse response; + try { + response = stub.submitQuery(null, request); + } catch (ServiceException e) { + throw new RuntimeException(e); + } - SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build()); - if (response.getResultCode() == ResultCode.OK) { - connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); + if (isSuccess(response.getState())) { + conn.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); } + return response; } @Override - public ClientProtos.SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException { - - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); + public ClientProtos.SubmitQueryResponse executeQueryWithJson(final String json) throws SQLException { + final BlockingInterface stub = conn.getTMStub(); + final QueryRequest request = buildQueryRequest(json, true); - final QueryRequest.Builder builder = QueryRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQuery(json); - builder.setIsJson(true); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - - return tajoMasterService.submitQuery(null, builder.build()); + try { + return stub.submitQuery(null, request); + } catch (ServiceException e) { + throw new RuntimeException(e); + } } @Override - public ResultSet executeQueryAndGetResult(String sql) throws ServiceException, IOException { + public ResultSet executeQueryAndGetResult(String sql) throws SQLException { ClientProtos.SubmitQueryResponse response = executeQuery(sql); - - if (response.getResultCode() == ClientProtos.ResultCode.ERROR) { - if (response.hasErrorMessage()) { - throw new ServiceException(response.getErrorMessage()); - } else if (response.hasErrorTrace()) { - throw new ServiceException(response.getErrorTrace()); - } - } + throwIfError(response.getState()); QueryId queryId = new QueryId(response.getQueryId()); @@ -223,16 +205,12 @@ public class QueryClientImpl implements QueryClient { } @Override - public ResultSet executeJsonQueryAndGetResult(final String json) throws ServiceException, IOException { + public ResultSet executeJsonQueryAndGetResult(final String json) throws SQLException { ClientProtos.SubmitQueryResponse response = executeQueryWithJson(json); - - if (response.getResultCode() == ClientProtos.ResultCode.ERROR) { - throw new ServiceException(response.getErrorTrace()); - } + throwIfError(response.getState()); QueryId queryId = new QueryId(response.getQueryId()); - if (response.getIsForwarded()) { if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { @@ -252,7 +230,7 @@ public class QueryClientImpl implements QueryClient { } } - private ResultSet getQueryResultAndWait(QueryId queryId) throws ServiceException, IOException { + private ResultSet getQueryResultAndWait(QueryId queryId) throws SQLException { if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { return createNullResultSet(queryId); @@ -276,225 +254,210 @@ public class QueryClientImpl implements QueryClient { } @Override - public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException { + public QueryStatus getQueryStatus(QueryId queryId) throws SQLException { - ClientProtos.GetQueryStatusRequest.Builder builder = ClientProtos.GetQueryStatusRequest.newBuilder(); - builder.setQueryId(queryId.getProto()); + final BlockingInterface stub = conn.getTMStub(); + final GetQueryStatusRequest request = GetQueryStatusRequest.newBuilder() + .setSessionId(conn.sessionId) + .setQueryId(queryId.getProto()) + .build(); - GetQueryStatusResponse res = null; - - NettyClientBase tmClient = null; + GetQueryStatusResponse res; try { - tmClient = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(tmClient); - builder.setSessionId(connection.sessionId); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); - - res = tajoMasterService.getQueryStatus(null, builder.build()); - - } catch (Exception e) { - throw new ServiceException(e.getMessage(), e); + res = stub.getQueryStatus(null, request); + } catch (ServiceException t) { + throw new RuntimeException(t); } + + throwIfError(res.getState()); return new QueryStatus(res); } @Override - public ResultSet getQueryResult(QueryId queryId) throws ServiceException, IOException { + public ResultSet getQueryResult(QueryId queryId) throws SQLException { if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { return createNullResultSet(queryId); } GetQueryResultResponse response = getResultResponse(queryId); + throwIfError(response.getState()); TableDesc tableDesc = CatalogUtil.newTableDesc(response.getTableDesc()); return new FetchResultSet(this, tableDesc.getLogicalSchema(), queryId, defaultFetchRows); } @Override - public ResultSet createNullResultSet(QueryId queryId) throws IOException { + public ResultSet createNullResultSet(QueryId queryId) { return TajoClientUtil.createNullResultSet(queryId); } @Override - public GetQueryResultResponse getResultResponse(QueryId queryId) throws ServiceException { + public GetQueryResultResponse getResultResponse(QueryId queryId) throws SQLException { if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { return null; } - NettyClientBase tmClient = null; + final BlockingInterface stub = conn.getTMStub(); + final GetQueryResultRequest request = GetQueryResultRequest.newBuilder() + .setQueryId(queryId.getProto()) + .setSessionId(conn.sessionId) + .build(); + GetQueryResultResponse response; try { - - tmClient = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(tmClient); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); - - GetQueryResultRequest.Builder builder = GetQueryResultRequest.newBuilder(); - builder.setQueryId(queryId.getProto()); - builder.setSessionId(connection.sessionId); - GetQueryResultResponse response = tajoMasterService.getQueryResult(null,builder.build()); - - return response; - - } catch (Exception e) { - throw new ServiceException(e.getMessage(), e); + response = stub.getQueryResult(null, request); + } catch (ServiceException t) { + throw new RuntimeException(t); } + + throwIfError(response.getState()); + return response; } @Override - public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) - throws ServiceException { + public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws SQLException { - try { - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQueryId(queryId.getProto()); - builder.setFetchRowNum(fetchRowNum); - - GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build()); - if (response.getResultCode() == ClientProtos.ResultCode.ERROR) { - throw new ServiceException(response.getErrorMessage()); - } + final BlockingInterface stub = conn.getTMStub(); + final GetQueryResultDataRequest request = GetQueryResultDataRequest.newBuilder() + .setSessionId(conn.sessionId) + .setQueryId(queryId.getProto()) + .setFetchRowNum(fetchRowNum) + .build(); - ClientProtos.SerializedResultSet resultSet = response.getResultSet(); - return new TajoMemoryResultSet(queryId, - new Schema(resultSet.getSchema()), - resultSet.getSerializedTuplesList(), - resultSet.getSerializedTuplesCount(), - getClientSideSessionVars()); + GetQueryResultDataResponse response; + try { + response = stub.getQueryResultData(null, request); } catch (ServiceException e) { - throw e; - } catch (Throwable e) { - throw new ServiceException(e.getMessage(), e); + throw new RuntimeException(e); } + + throwIfError(response.getState()); + + ClientProtos.SerializedResultSet resultSet = response.getResultSet(); + return new TajoMemoryResultSet(queryId, + new Schema(resultSet.getSchema()), + resultSet.getSerializedTuplesList(), + resultSet.getSerializedTuplesCount(), + getClientSideSessionVars()); } @Override - public boolean updateQuery(final String sql) throws ServiceException { - - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + public boolean updateQuery(final String sql) throws SQLException { - QueryRequest.Builder builder = QueryRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQuery(sql); - builder.setIsJson(false); - ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); + final BlockingInterface stub = conn.getTMStub(); + final QueryRequest request = buildQueryRequest(sql, false); - if (response.getResultCode() == ClientProtos.ResultCode.OK) { - connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); - return true; - } else { - if (response.hasErrorMessage()) { - LOG.error("ERROR: " + response.getErrorMessage()); - } - return false; + UpdateQueryResponse response; + try { + response = stub.updateQuery(null, request); + } catch (ServiceException e) { + throw new RuntimeException(e); } + + throwIfError(response.getState()); + conn.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); + + return true; } @Override - public boolean updateQueryWithJson(final String json) throws ServiceException { + public boolean updateQueryWithJson(final String json) throws SQLException { - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + final BlockingInterface stub = conn.getTMStub(); + final QueryRequest request = buildQueryRequest(json, true); - QueryRequest.Builder builder = QueryRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQuery(json); - builder.setIsJson(true); - ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); - if (response.getResultCode() == ClientProtos.ResultCode.OK) { - return true; - } else { - if (response.hasErrorMessage()) { - LOG.error("ERROR: " + response.getErrorMessage()); - } - return false; + UpdateQueryResponse response; + try { + response = stub.updateQuery(null, request); + } catch (ServiceException e) { + throw new RuntimeException(e); } + + throwIfError(response.getState()); + return true; } @Override - public List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws ServiceException { + public List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws SQLException { - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + final BlockingInterface stmb = conn.getTMStub(); - TajoIdProtos.SessionIdProto.Builder builder = TajoIdProtos.SessionIdProto.newBuilder(); - builder.setId(connection.sessionId.getId()); - ClientProtos.GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build()); + GetQueryListResponse res; + try { + res = stmb.getRunningQueryList(null, conn.sessionId); + } catch (ServiceException e) { + throw new RuntimeException(e); + } + + throwIfError(res.getState()); return res.getQueryListList(); } @Override - public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws ServiceException { + public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws SQLException { + + final BlockingInterface stub = conn.getTMStub(); - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + GetQueryListResponse res; + try { + res = stub.getFinishedQueryList(null, conn.sessionId); + } catch (ServiceException e) { + throw new RuntimeException(e); + } - TajoIdProtos.SessionIdProto.Builder builder = TajoIdProtos.SessionIdProto.newBuilder(); - builder.setId(connection.sessionId.getId()); - ClientProtos.GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build()); + throwIfError(res.getState()); return res.getQueryListList(); } @Override - public List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws ServiceException { + public List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws SQLException { - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + final BlockingInterface stub = conn.getTMStub(); + final GetClusterInfoRequest request = GetClusterInfoRequest.newBuilder() + .setSessionId(conn.sessionId) + .build(); - ClientProtos.GetClusterInfoRequest.Builder builder = ClientProtos.GetClusterInfoRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - ClientProtos.GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build()); + GetClusterInfoResponse res; + try { + res = stub.getClusterInfo(null, request); + } catch (ServiceException e) { + throw new RuntimeException(e); + } + + throwIfError(res.getState()); return res.getWorkerListList(); } @Override - public QueryStatus killQuery(final QueryId queryId) - throws ServiceException, IOException { + public QueryStatus killQuery(final QueryId queryId) throws SQLException { + final BlockingInterface stub = conn.getTMStub(); QueryStatus status = getQueryStatus(queryId); - NettyClientBase tmClient = null; + /* send a kill to the TM */ + QueryIdRequest request = buildQueryIdRequest(queryId); try { - /* send a kill to the TM */ - tmClient = connection.getTajoMasterConnection(); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); - - connection.checkSessionAndGet(tmClient); - - ClientProtos.QueryIdRequest.Builder builder = ClientProtos.QueryIdRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQueryId(queryId.getProto()); - tajoMasterService.killQuery(null, builder.build()); - - long currentTimeMillis = System.currentTimeMillis(); - long timeKillIssued = currentTimeMillis; - while ((currentTimeMillis < timeKillIssued + 10000L) - && ((status.getState() != TajoProtos.QueryState.QUERY_KILLED) - || (status.getState() == TajoProtos.QueryState.QUERY_KILL_WAIT))) { - try { - Thread.sleep(100L); - } catch(InterruptedException ie) { - break; - } - currentTimeMillis = System.currentTimeMillis(); - status = getQueryStatus(queryId); - } + stub.killQuery(null, request); + } catch (ServiceException e) { + throw new RuntimeException(e); + } - } catch(Exception e) { - LOG.debug("Error when checking for application status", e); + + long currentTimeMillis = System.currentTimeMillis(); + long timeKillIssued = currentTimeMillis; + while ((currentTimeMillis < timeKillIssued + 10000L) + && ((status.getState() != TajoProtos.QueryState.QUERY_KILLED) + || (status.getState() == TajoProtos.QueryState.QUERY_KILL_WAIT))) { + try { + Thread.sleep(100L); + } catch (InterruptedException ie) { + break; + } + currentTimeMillis = System.currentTimeMillis(); + status = getQueryStatus(queryId); } + return status; } @@ -508,57 +471,90 @@ public class QueryClientImpl implements QueryClient { return this.maxRows; } - public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException { - NettyClientBase client = connection.getTajoMasterConnection(); - connection.checkSessionAndGet(client); - - QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQueryId(queryId.getProto()); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build()); - if (res.getResultCode() == ResultCode.OK) { - return res.getQueryInfo(); - } else { - throw new ServiceException(res.getErrorMessage()); + public QueryInfoProto getQueryInfo(final QueryId queryId) throws SQLException { + + final BlockingInterface stub = conn.getTMStub(); + final QueryIdRequest request = buildQueryIdRequest(queryId); + + GetQueryInfoResponse res; + try { + res = stub.getQueryInfo(null, request); + } catch (ServiceException e) { + throw new RuntimeException(e); } + + throwIfError(res.getState()); + return res.getQueryInfo(); } - public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException { + public QueryHistoryProto getQueryHistory(final QueryId queryId) throws SQLException { final QueryInfoProto queryInfo = getQueryInfo(queryId); if (queryInfo.getHostNameOfQM() == null || queryInfo.getQueryMasterClientPort() == 0) { return null; } + InetSocketAddress qmAddress = new InetSocketAddress( queryInfo.getHostNameOfQM(), queryInfo.getQueryMasterClientPort()); RpcClientManager manager = RpcClientManager.getInstance(); - NettyClientBase queryMasterClient; - try { - queryMasterClient = manager.newClient(qmAddress, QueryMasterClientProtocol.class, false, - manager.getRetries(), manager.getTimeoutSeconds(), TimeUnit.SECONDS, false); - } catch (Exception e) { - throw new ServiceException(e); - } + NettyClientBase qmClient = null; try { - connection.checkSessionAndGet(connection.getTajoMasterConnection()); - - QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQueryId(queryId.getProto()); - QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterClient.getStub(); - GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null, builder.build()); - if (res.getResultCode() == ResultCode.OK) { - return res.getQueryHistory(); - } else { - throw new ServiceException(res.getErrorMessage()); + qmClient = manager.newClient( + qmAddress, + QueryMasterClientProtocol.class, + false, + manager.getRetries(), + manager.getTimeoutSeconds(), + TimeUnit.SECONDS, + false + ); + + conn.checkSessionAndGet(conn.getTajoMasterConnection()); + + QueryIdRequest request = QueryIdRequest.newBuilder() + .setSessionId(conn.sessionId) + .setQueryId(queryId.getProto()) + .build(); + + QueryMasterClientProtocolService.BlockingInterface stub = qmClient.getStub(); + GetQueryHistoryResponse res; + try { + res = stub.getQueryHistory(null, request); + } catch (ServiceException e) { + throw new RuntimeException(e); } + + throwIfError(res.getState()); + return res.getQueryHistory(); + + } catch (ConnectException e) { + throw SQLExceptionUtil.makeUnableToEstablishConnection(e); + } catch (ClassNotFoundException e) { + throw SQLExceptionUtil.makeUnableToEstablishConnection(e); + } catch (NoSuchMethodException e) { + throw SQLExceptionUtil.makeUnableToEstablishConnection(e); + } catch (SQLException e) { + throw e; } finally { - queryMasterClient.close(); + qmClient.close(); } } + + private QueryIdRequest buildQueryIdRequest(QueryId queryId) { + return ClientProtos.QueryIdRequest.newBuilder() + .setSessionId(SessionIdProto.newBuilder().setId(getSessionId())) + .setQueryId(queryId.getProto()) + .build(); + } + + private QueryRequest buildQueryRequest(String query, boolean json) { + return QueryRequest.newBuilder() + .setSessionId(conn.sessionId) + .setQuery(query) + .setIsJson(json) + .build(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5d62c409/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java index 4a38934..809c675 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryStatus.java @@ -36,7 +36,7 @@ public class QueryStatus { public QueryStatus(GetQueryStatusResponse proto) { queryId = new QueryId(proto.getQueryId()); - state = proto.getState(); + state = proto.getQueryState(); progress = proto.getProgress(); submitTime = proto.getSubmitTime(); finishTime = proto.getFinishTime();
