http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java index 82bd855..376f63f 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java @@ -18,1031 +18,10 @@ package org.apache.tajo.client; -import com.google.protobuf.ServiceException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.tajo.QueryId; -import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.SessionVars; -import org.apache.tajo.TajoIdProtos; -import org.apache.tajo.TajoProtos.QueryState; -import org.apache.tajo.annotation.Nullable; import org.apache.tajo.annotation.ThreadSafe; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.partition.PartitionDesc; -import org.apache.tajo.catalog.partition.PartitionMethodDesc; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.proto.CatalogProtos.PartitionMethodProto; -import org.apache.tajo.cli.InvalidClientSessionException; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.ipc.ClientProtos; -import org.apache.tajo.ipc.ClientProtos.*; -import org.apache.tajo.ipc.QueryMasterClientProtocol; -import org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService; -import org.apache.tajo.ipc.TajoMasterClientProtocol; -import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService; -import org.apache.tajo.jdbc.FetchResultSet; -import org.apache.tajo.jdbc.SQLStates; -import org.apache.tajo.jdbc.TajoMemoryResultSet; -import org.apache.tajo.jdbc.TajoResultSet; -import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcConnectionPool; -import org.apache.tajo.rpc.ServerCallable; -import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto; -import org.apache.tajo.util.HAServiceUtil; -import org.apache.tajo.util.KeyValueSet; -import org.apache.tajo.util.NetUtils; import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.apache.tajo.ipc.ClientProtos.SerializedResultSet; @ThreadSafe -public class TajoClient implements Closeable { - public static final int UNKNOWN_ROW_NUMBER = -1; - - private final Log LOG = LogFactory.getLog(TajoClient.class); - - private final TajoConf conf; - - private final Map<QueryId, InetSocketAddress> queryMasterMap = new ConcurrentHashMap<QueryId, InetSocketAddress>(); - - private final InetSocketAddress tajoMasterAddr; - - private final RpcConnectionPool connPool; - - private final String baseDatabase; - - private final UserGroupInformation userInfo; - - private volatile TajoIdProtos.SessionIdProto sessionId; - - private AtomicBoolean closed = new AtomicBoolean(false); - - public TajoClient(TajoConf conf) throws IOException { - this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null); - } - - public TajoClient(TajoConf conf, @Nullable String baseDatabase) throws IOException { - this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), baseDatabase); - } - - /** - * Connect to TajoMaster - * - * @param conf TajoConf - * @param addr TajoMaster address - * @param baseDatabase The base database name. It is case sensitive. If it is null, - * the 'default' database will be used. - * @throws IOException - */ - public TajoClient(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException { - this.conf = conf; - this.conf.set("tajo.disk.scheduler.report.interval", "0"); - this.tajoMasterAddr = addr; - int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM); - // Don't share connection pool per client - connPool = RpcConnectionPool.newPool(conf, getClass().getSimpleName(), workerNum); - userInfo = UserGroupInformation.getCurrentUser(); - this.baseDatabase = baseDatabase != null ? baseDatabase : null; - } - - public void setSessionId(TajoIdProtos.SessionIdProto sessionId) { - this.sessionId = sessionId; - } - - public boolean isConnected() { - if(!closed.get()){ - try { - return connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false).isConnected(); - } catch (Throwable e) { - return false; - } - } - return false; - } - - public TajoClient(InetSocketAddress addr) throws IOException { - this(new TajoConf(), addr, null); - } - - public TajoClient(String hostname, int port, String baseDatabase) throws IOException { - this(new TajoConf(), NetUtils.createSocketAddr(hostname, port), baseDatabase); - } - - public TajoIdProtos.SessionIdProto getSessionId() { - return sessionId; - } - - private InetSocketAddress getTajoMasterAddr() { - if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - return tajoMasterAddr; - } else { - if (!HAServiceUtil.isMasterAlive(tajoMasterAddr, conf)) { - return HAServiceUtil.getMasterClientAddress(conf); - } else { - return tajoMasterAddr; - } - } - } - - public String getBaseDatabase() { - return baseDatabase; - } - - @Override - public void close() { - if(closed.getAndSet(true)){ - return; - } - - // remove session - try { - NettyClientBase client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false); - TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub(); - tajoMaster.removeSession(null, sessionId); - } catch (Throwable e) { - } - - if(connPool != null) { - connPool.shutdown(); - } - queryMasterMap.clear(); - } - - public TajoConf getConf() { - return conf; - } - - public UserGroupInformation getUserInfo() { - return userInfo; - } - - /** - * Call to QueryMaster closing query resources - * @param queryId - */ - public void closeQuery(final QueryId queryId) { - if(queryMasterMap.containsKey(queryId)) { - NettyClientBase qmClient = null; - try { - qmClient = connPool.getConnection(queryMasterMap.get(queryId), QueryMasterClientProtocol.class, false); - QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub(); - queryMasterService.closeQuery(null, queryId.getProto()); - } catch (Exception e) { - LOG.warn("Fail to close a QueryMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e); - } finally { - connPool.closeConnection(qmClient); - queryMasterMap.remove(queryId); - } - } - } - - public void closeNonForwardQuery(final QueryId queryId) { - NettyClientBase tmClient = null; - try { - tmClient = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); - checkSessionAndGet(tmClient); - - QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); - - builder.setSessionId(getSessionId()); - builder.setQueryId(queryId.getProto()); - tajoMasterService.closeNonForwardQuery(null, builder.build()); - } catch (Exception e) { - LOG.warn("Fail to close a TajoMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e); - } finally { - connPool.closeConnection(tmClient); - } - } - - private void checkSessionAndGet(NettyClientBase client) throws ServiceException { - if (sessionId == null) { - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder(); - builder.setUsername(userInfo.getUserName()).build(); - if (baseDatabase != null) { - builder.setBaseDatabaseName(baseDatabase); - } - CreateSessionResponse response = tajoMasterService.createSession(null, builder.build()); - if (response.getState() == CreateSessionResponse.ResultState.SUCCESS) { - sessionId = response.getSessionId(); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Got session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName())); - } - } else { - throw new InvalidClientSessionException(response.getMessage()); - } - } - } - - private SessionedStringProto convertSessionedString(String str) { - SessionedStringProto.Builder builder = SessionedStringProto.newBuilder(); - builder.setSessionId(sessionId); - builder.setValue(str); - return builder.build(); - } - - public String getCurrentDatabase() throws ServiceException { - return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { - - public String call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.getCurrentDatabase(null, sessionId).getValue(); - } - }.withRetries(); - } - - public Boolean selectDatabase(final String databaseName) throws ServiceException { - return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { - - public Boolean call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue(); - } - }.withRetries(); - } - - public Boolean updateSessionVariables(final Map<String, String> variables) throws ServiceException { - return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { - - public Boolean call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - KeyValueSet keyValueSet = new KeyValueSet(); - keyValueSet.putAll(variables); - UpdateSessionVariableRequest request = UpdateSessionVariableRequest.newBuilder() - .setSessionId(sessionId) - .setSetVariables(keyValueSet.getProto()).build(); - - return tajoMasterService.updateSessionVariables(null, request).getValue(); - } - }.withRetries(); - } - - public Boolean unsetSessionVariables(final List<String> variables) throws ServiceException { - return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { - - public Boolean call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - UpdateSessionVariableRequest request = UpdateSessionVariableRequest.newBuilder() - .setSessionId(sessionId) - .addAllUnsetVariables(variables).build(); - return tajoMasterService.updateSessionVariables(null, request).getValue(); - } - }.withRetries(); - } - - public String getSessionVariable(final String varname) throws ServiceException { - return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { - - public String call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.getSessionVariable(null, convertSessionedString(varname)).getValue(); - } - }.withRetries(); - } - - public Boolean existSessionVariable(final String varname) throws ServiceException { - return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { - - public Boolean call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.existSessionVariable(null, convertSessionedString(varname)).getValue(); - } - }.withRetries(); - } - - public Map<String, String> getAllSessionVariables() throws ServiceException { - return new ServerCallable<Map<String, String>>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, - false, true) { - - public Map<String, String> call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - KeyValueSet keyValueSet = new KeyValueSet(tajoMasterService.getAllSessionVariables(null, sessionId)); - return keyValueSet.getAllKeyValus(); - } - }.withRetries(); - } - - /** - * It submits a query statement and get a response immediately. - * The response only contains a query id, and submission status. - * In order to get the result, you should use {@link #getQueryResult(org.apache.tajo.QueryId)} - * or {@link #getQueryResultAndWait(org.apache.tajo.QueryId)}. - */ - public SubmitQueryResponse executeQuery(final String sql) throws ServiceException { - return new ServerCallable<SubmitQueryResponse>(connPool, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { - public SubmitQueryResponse call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - final QueryRequest.Builder builder = QueryRequest.newBuilder(); - builder.setSessionId(sessionId); - builder.setQuery(sql); - builder.setIsJson(false); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.submitQuery(null, builder.build()); - } - }.withRetries(); - } - - public SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException { - return new ServerCallable<SubmitQueryResponse>(connPool, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { - public SubmitQueryResponse call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - final QueryRequest.Builder builder = QueryRequest.newBuilder(); - builder.setSessionId(sessionId); - builder.setQuery(json); - builder.setIsJson(true); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.submitQuery(null, builder.build()); - } - }.withRetries(); - } - - /** - * It submits a query statement and get a response. - * The main difference from {@link #executeQuery(String)} - * is a blocking method. So, this method is wait for - * the finish of the submitted query. - * - * @return If failed, return null. - */ - public ResultSet executeQueryAndGetResult(final String sql) - throws ServiceException, IOException { - SubmitQueryResponse response = executeQuery(sql); - - if (response.getResultCode() == ClientProtos.ResultCode.ERROR) { - throw new ServiceException(response.getErrorTrace()); - } - QueryId queryId = new QueryId(response.getQueryId()); - if (response.getIsForwarded()) { - if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { - return this.createNullResultSet(queryId); - } else { - return this.getQueryResultAndWait(queryId); - } - } else { - // If a non-forwarded insert into query - if (queryId.equals(QueryIdFactory.NULL_QUERY_ID) && response.getMaxRowNum() == 0) { - return this.createNullResultSet(queryId); - } else { - if (response.hasResultSet() || response.hasTableDesc()) { - return createResultSet(this, response); - } else { - return this.createNullResultSet(queryId); - } - } - } - } - - public ResultSet executeJsonQueryAndGetResult(final String json) throws ServiceException, IOException { - SubmitQueryResponse response = executeQueryWithJson(json); - if (response.getResultCode() == ClientProtos.ResultCode.ERROR) { - throw new ServiceException(response.getErrorTrace()); - } - QueryId queryId = new QueryId(response.getQueryId()); - if (response.getIsForwarded()) { - if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { - return this.createNullResultSet(queryId); - } else { - return this.getQueryResultAndWait(queryId); - } - } else { - if (response.hasResultSet() || response.hasTableDesc()) { - return createResultSet(this, response); - } else { - return this.createNullResultSet(queryId); - } - } - } - - public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException { - GetQueryStatusRequest.Builder builder = GetQueryStatusRequest.newBuilder(); - builder.setQueryId(queryId.getProto()); - - GetQueryStatusResponse res = null; - if(queryMasterMap.containsKey(queryId)) { - NettyClientBase qmClient = null; - try { - qmClient = connPool.getConnection(queryMasterMap.get(queryId), - QueryMasterClientProtocol.class, false); - QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub(); - res = queryMasterService.getQueryStatus(null, builder.build()); - } catch (Exception e) { - throw new ServiceException(e.getMessage(), e); - } finally { - connPool.releaseConnection(qmClient); - } - } else { - NettyClientBase tmClient = null; - try { - tmClient = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false); - - checkSessionAndGet(tmClient); - builder.setSessionId(sessionId); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); - res = tajoMasterService.getQueryStatus(null, builder.build()); - - String queryMasterHost = res.getQueryMasterHost(); - if(queryMasterHost != null && !queryMasterHost.isEmpty()) { - NettyClientBase qmClient = null; - try { - InetSocketAddress qmAddr = NetUtils.createSocketAddr(queryMasterHost, res.getQueryMasterPort()); - qmClient = connPool.getConnection( - qmAddr, QueryMasterClientProtocol.class, false); - QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub(); - res = queryMasterService.getQueryStatus(null, builder.build()); - - queryMasterMap.put(queryId, qmAddr); - } catch (Exception e) { - throw new ServiceException(e.getMessage(), e); - } finally { - connPool.releaseConnection(qmClient); - } - } - } catch (Exception e) { - throw new ServiceException(e.getMessage(), e); - } finally { - connPool.releaseConnection(tmClient); - } - } - return new QueryStatus(res); - } - - /* query submit */ - public static boolean isInPreNewState(QueryState state) { - return state == QueryState.QUERY_NOT_ASSIGNED || - state == QueryState.QUERY_MASTER_INIT || - state == QueryState.QUERY_MASTER_LAUNCHED; - } - - /* query submitted. but is not running */ - public static boolean isInInitState(QueryState state) { - return state == QueryState.QUERY_NEW || state == QueryState.QUERY_INIT; - } - - /* query started. but is not complete */ - public static boolean isInRunningState(QueryState state) { - return isInInitState(state) || state == QueryState.QUERY_RUNNING; - } - - /* query complete */ - public static boolean isInCompleteState(QueryState state) { - return !isInPreNewState(state) && !isInRunningState(state); - } - - public ResultSet getQueryResult(QueryId queryId) - throws ServiceException, IOException { - if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { - return createNullResultSet(queryId); - } - GetQueryResultResponse response = getResultResponse(queryId); - TableDesc tableDesc = CatalogUtil.newTableDesc(response.getTableDesc()); - conf.setVar(ConfVars.USERNAME, response.getTajoUserName()); - return new TajoResultSet(this, queryId, conf, tableDesc); - } - - public static ResultSet createResultSet(TajoClient client, QueryId queryId, GetQueryResultResponse response) - throws IOException { - TableDesc desc = CatalogUtil.newTableDesc(response.getTableDesc()); - TajoConf conf = new TajoConf(client.getConf()); - conf.setVar(ConfVars.USERNAME, response.getTajoUserName()); - return new TajoResultSet(client, queryId, conf, desc); - } - - public static ResultSet createResultSet(TajoClient client, SubmitQueryResponse response) throws IOException { - if (response.hasTableDesc()) { - // non-forward query - // select * from table1 [limit 10] - int fetchRowNum = client.getConf().getIntVar(ConfVars.$RESULT_SET_FETCH_ROWNUM); - if (response.hasSessionVariables()) { - for (KeyValueProto eachKeyValue: response.getSessionVariables().getKeyvalList()) { - if (eachKeyValue.getKey().equals(SessionVars.FETCH_ROWNUM.keyname())) { - fetchRowNum = Integer.parseInt(eachKeyValue.getValue()); - } - } - } - TableDesc tableDesc = new TableDesc(response.getTableDesc()); - return new FetchResultSet(client, tableDesc.getLogicalSchema(), new QueryId(response.getQueryId()), fetchRowNum); - } else { - // simple eval query - // select substr('abc', 1, 2) - SerializedResultSet serializedResultSet = response.getResultSet(); - return new TajoMemoryResultSet( - new Schema(serializedResultSet.getSchema()), - serializedResultSet.getSerializedTuplesList(), - response.getMaxRowNum()); - } - } - - private ResultSet getQueryResultAndWait(QueryId queryId) - throws ServiceException, IOException { - if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { - return createNullResultSet(queryId); - } - QueryStatus status = getQueryStatus(queryId); - - while(status != null && !isInCompleteState(status.getState())) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - status = getQueryStatus(queryId); - } - - if (status.getState() == QueryState.QUERY_SUCCEEDED) { - if (status.hasResult()) { - return getQueryResult(queryId); - } else { - return createNullResultSet(queryId); - } - - } else { - LOG.warn("Query (" + status.getQueryId() + ") failed: " + status.getState()); - - //TODO throw SQLException(?) - return createNullResultSet(queryId); - } - } - - public ResultSet createNullResultSet(QueryId queryId) throws IOException { - return new TajoResultSet(this, queryId); - } - - public GetQueryResultResponse getResultResponse(QueryId queryId) throws ServiceException { - if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { - return null; - } - - NettyClientBase client = null; - try { - InetSocketAddress queryMasterAddr = queryMasterMap.get(queryId); - if(queryMasterAddr == null) { - LOG.warn("No Connection to QueryMaster for " + queryId); - return null; - } - client = connPool.getConnection(queryMasterAddr, QueryMasterClientProtocol.class, false); - QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub(); - GetQueryResultRequest.Builder builder = GetQueryResultRequest.newBuilder(); - builder.setQueryId(queryId.getProto()); - GetQueryResultResponse response = queryMasterService.getQueryResult(null, - builder.build()); - - return response; - } catch (Exception e) { - throw new ServiceException(e.getMessage(), e); - } finally { - connPool.releaseConnection(client); - } - } - - public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws ServiceException { - try { - ServerCallable<SerializedResultSet> callable = - new ServerCallable<SerializedResultSet>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { - public SerializedResultSet call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder(); - builder.setSessionId(sessionId); - builder.setQueryId(queryId.getProto()); - builder.setFetchRowNum(fetchRowNum); - try { - GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build()); - if (response.getResultCode() == ResultCode.ERROR) { - throw new ServiceException(response.getErrorTrace()); - } - - return response.getResultSet(); - } catch (ServiceException e) { - abort(); - throw e; - } catch (Throwable t) { - throw new ServiceException(t.getMessage(), t); - } - } - }; - - SerializedResultSet serializedResultSet = callable.withRetries(); - - return new TajoMemoryResultSet( - new Schema(serializedResultSet.getSchema()), - serializedResultSet.getSerializedTuplesList(), - serializedResultSet.getSerializedTuplesCount()); - } catch (Exception e) { - throw new ServiceException(e.getMessage(), e); - } - } - - public boolean updateQuery(final String sql) throws ServiceException { - return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { - public Boolean call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - QueryRequest.Builder builder = QueryRequest.newBuilder(); - builder.setSessionId(sessionId); - builder.setQuery(sql); - builder.setIsJson(false); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); - if (response.getResultCode() == ResultCode.OK) { - return true; - } else { - if (response.hasErrorMessage()) { - System.err.println("ERROR: " + response.getErrorMessage()); - } - return false; - } - } - }.withRetries(); - } - - public boolean updateQueryWithJson(final String json) throws ServiceException { - return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { - public Boolean call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - QueryRequest.Builder builder = QueryRequest.newBuilder(); - builder.setSessionId(sessionId); - builder.setQuery(json); - builder.setIsJson(true); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); - if (response.getResultCode() == ResultCode.OK) { - return true; - } else { - if (response.hasErrorMessage()) { - System.err.println("ERROR: " + response.getErrorMessage()); - } - return false; - } - } - }.withRetries(); - } - - /** - * Create a database. - * - * @param databaseName The database name to be created. This name is case sensitive. - * @return True if created successfully. - * @throws ServiceException - */ - public boolean createDatabase(final String databaseName) throws ServiceException { - return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { - public Boolean call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.createDatabase(null, convertSessionedString(databaseName)).getValue(); - } - }.withRetries(); - } - - /** - * Does the database exist? - * - * @param databaseName The database name to be checked. This name is case sensitive. - * @return True if so. - * @throws ServiceException - */ - public boolean existDatabase(final String databaseName) throws ServiceException { - return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { - public Boolean call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.existDatabase(null, convertSessionedString(databaseName)).getValue(); - } - }.withRetries(); - } - - /** - * Drop the database - * - * @param databaseName The database name to be dropped. This name is case sensitive. - * @return True if the database is dropped successfully. - * @throws ServiceException - */ - public boolean dropDatabase(final String databaseName) throws ServiceException { - return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { - public Boolean call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.dropDatabase(null, convertSessionedString(databaseName)).getValue(); - } - }.withRetries(); - } - - public List<String> getAllDatabaseNames() throws ServiceException { - return new ServerCallable<List<String>>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { - public List<String> call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.getAllDatabases(null, sessionId).getValuesList(); - } - }.withRetries(); - } - - /** - * Does the table exist? - * - * @param tableName The table name to be checked. This name is case sensitive. - * @return True if so. - */ - public boolean existTable(final String tableName) throws ServiceException { - return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { - public Boolean call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.existTable(null, convertSessionedString(tableName)).getValue(); - } - }.withRetries(); - } - - /** - * Create an external table. - * - * @param tableName The table name to be created. This name is case sensitive. This name can be qualified or not. - * If the table name is not qualified, the current database in the session will be used. - * @param schema The schema - * @param path The external table location - * @param meta Table meta - * @return the created table description. - * @throws SQLException - * @throws ServiceException - */ - public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path, - final TableMeta meta) - throws SQLException, ServiceException { - return createExternalTable(tableName, schema, path, meta, null); - } - - /** - * Create an external table. - * - * @param tableName The table name to be created. This name is case sensitive. This name can be qualified or not. - * If the table name is not qualified, the current database in the session will be used. - * @param schema The schema - * @param path The external table location - * @param meta Table meta - * @param partitionMethodDesc Table partition description - * @return the created table description. - * @throws SQLException - * @throws ServiceException - */ - public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path, - final TableMeta meta, final PartitionMethodDesc partitionMethodDesc) - throws SQLException, ServiceException { - return new ServerCallable<TableDesc>(connPool, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { - public TableDesc call(NettyClientBase client) throws ServiceException, SQLException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - CreateTableRequest.Builder builder = CreateTableRequest.newBuilder(); - builder.setSessionId(sessionId); - builder.setName(tableName); - builder.setSchema(schema.getProto()); - builder.setMeta(meta.getProto()); - builder.setPath(path.toUri().toString()); - if (partitionMethodDesc != null) { - builder.setPartition(partitionMethodDesc.getProto()); - } - TableResponse res = tajoMasterService.createExternalTable(null, builder.build()); - if (res.getResultCode() == ResultCode.OK) { - return CatalogUtil.newTableDesc(res.getTableDesc()); - } else { - throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); - } - } - }.withRetries(); - } - - /** - * Drop a table - * - * @param tableName The table name to be dropped. This name is case sensitive. - * @return True if the table is dropped successfully. - */ - public boolean dropTable(final String tableName) throws ServiceException { - return dropTable(tableName, false); - } - - /** - * Drop a table. - * - * @param tableName The table name to be dropped. This name is case sensitive. - * @param purge If purge is true, this call will remove the entry in catalog as well as the table contents. - * @return True if the table is dropped successfully. - */ - public boolean dropTable(final String tableName, final boolean purge) throws ServiceException { - return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { - public Boolean call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - DropTableRequest.Builder builder = DropTableRequest.newBuilder(); - builder.setSessionId(sessionId); - builder.setName(tableName); - builder.setPurge(purge); - return tajoMasterService.dropTable(null, builder.build()).getValue(); - } - }.withRetries(); - - } - - public List<BriefQueryInfo> getRunningQueryList() throws ServiceException { - return new ServerCallable<List<BriefQueryInfo>>(connPool, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { - public List<BriefQueryInfo> call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - GetQueryListRequest.Builder builder = GetQueryListRequest.newBuilder(); - builder.setSessionId(sessionId); - GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build()); - return res.getQueryListList(); - } - }.withRetries(); - } - - public List<BriefQueryInfo> getFinishedQueryList() throws ServiceException { - return new ServerCallable<List<BriefQueryInfo>>(connPool, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { - public List<BriefQueryInfo> call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - GetQueryListRequest.Builder builder = GetQueryListRequest.newBuilder(); - builder.setSessionId(sessionId); - GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build()); - return res.getQueryListList(); - } - }.withRetries(); - } - - public List<WorkerResourceInfo> getClusterInfo() throws ServiceException { - return new ServerCallable<List<WorkerResourceInfo>>(connPool, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { - public List<WorkerResourceInfo> call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - GetClusterInfoRequest.Builder builder = GetClusterInfoRequest.newBuilder(); - builder.setSessionId(sessionId); - GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build()); - return res.getWorkerListList(); - } - }.withRetries(); - } - - /** - * Get a list of table names. - * - * @param databaseName The database name to show all tables. This name is case sensitive. - * If it is null, this method will show all tables - * in the current database of this session. - */ - public List<String> getTableList(@Nullable final String databaseName) throws ServiceException { - return new ServerCallable<List<String>>(connPool, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { - public List<String> call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - GetTableListRequest.Builder builder = GetTableListRequest.newBuilder(); - builder.setSessionId(sessionId); - if (databaseName != null) { - builder.setDatabaseName(databaseName); - } - GetTableListResponse res = tajoMasterService.getTableList(null, builder.build()); - return res.getTablesList(); - } - }.withRetries(); - } - - /** - * Get a table description - * - * @param tableName The table name to get. This name is case sensitive. - * @return Table description - */ - public TableDesc getTableDesc(final String tableName) throws ServiceException { - return new ServerCallable<TableDesc>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { - public TableDesc call(NettyClientBase client) throws ServiceException, SQLException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - GetTableDescRequest.Builder builder = GetTableDescRequest.newBuilder(); - builder.setSessionId(sessionId); - builder.setTableName(tableName); - TableResponse res = tajoMasterService.getTableDesc(null, builder.build()); - if (res.getResultCode() == ResultCode.OK) { - return CatalogUtil.newTableDesc(res.getTableDesc()); - } else { - throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); - } - } - }.withRetries(); - } - - public QueryStatus killQuery(final QueryId queryId) - throws ServiceException, IOException { - - QueryStatus status = getQueryStatus(queryId); - - NettyClientBase tmClient = null; - try { - /* send a kill to the TM */ - tmClient = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); - - checkSessionAndGet(tmClient); - - QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); - builder.setSessionId(sessionId); - builder.setQueryId(queryId.getProto()); - tajoMasterService.killQuery(null, builder.build()); - - long currentTimeMillis = System.currentTimeMillis(); - long timeKillIssued = currentTimeMillis; - while ((currentTimeMillis < timeKillIssued + 10000L) - && ((status.getState() != QueryState.QUERY_KILLED) - || (status.getState() == QueryState.QUERY_KILL_WAIT))) { - try { - Thread.sleep(100L); - } catch(InterruptedException ie) { - break; - } - currentTimeMillis = System.currentTimeMillis(); - status = getQueryStatus(queryId); - } - - } catch(Exception e) { - LOG.debug("Error when checking for application status", e); - } finally { - connPool.releaseConnection(tmClient); - } - return status; - } - - public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException { - return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connPool, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false, true) { - public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - String paramFunctionName = functionName == null ? "" : functionName; - FunctionResponse res = tajoMasterService.getFunctionList(null,convertSessionedString(paramFunctionName)); - if (res.getResultCode() == ResultCode.OK) { - return res.getFunctionsList(); - } else { - throw new SQLException(res.getErrorMessage()); - } - } - }.withRetries(); - } +public interface TajoClient extends QueryClient, CatalogAdminClient, Closeable { }
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java new file mode 100644 index 0000000..75de492 --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client; + +import com.google.protobuf.ServiceException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.QueryId; +import org.apache.tajo.annotation.Nullable; +import org.apache.tajo.annotation.ThreadSafe; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo; +import org.apache.tajo.ipc.ClientProtos.GetQueryResultResponse; +import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; +import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo; +import org.apache.tajo.jdbc.TajoMemoryResultSet; +import org.apache.tajo.jdbc.TajoResultSet; +import org.apache.tajo.util.NetUtils; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +@ThreadSafe +public class TajoClientImpl extends SessionConnection implements TajoClient, QueryClient, CatalogAdminClient { + + private final Log LOG = LogFactory.getLog(TajoClientImpl.class); + QueryClient queryClient; + CatalogAdminClient catalogClient; + + public TajoClientImpl(TajoConf conf) throws IOException { + this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null); + } + + public TajoClientImpl(TajoConf conf, @Nullable String baseDatabase) throws IOException { + this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), baseDatabase); + } + + public TajoClientImpl(InetSocketAddress addr) throws IOException { + this(new TajoConf(), addr, null); + } + + /** + * Connect to TajoMaster + * + * @param conf TajoConf + * @param addr TajoMaster address + * @param baseDatabase The base database name. It is case sensitive. If it is null, + * the 'default' database will be used. + * @throws java.io.IOException + */ + public TajoClientImpl(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException { + super(conf, addr, baseDatabase); + this.queryClient = new QueryClientImpl(this); + this.catalogClient = new CatalogAdminClientImpl(this); + } + + public TajoClientImpl(String hostName, int port, @Nullable String baseDatabase) throws IOException { + super(hostName, port, baseDatabase); + this.queryClient = new QueryClientImpl(this); + this.catalogClient = new CatalogAdminClientImpl(this); + } + + /*------------------------------------------------------------------------*/ + // QueryClient wrappers + /*------------------------------------------------------------------------*/ + + public void closeQuery(final QueryId queryId) { + queryClient.closeQuery(queryId); + } + + public void closeNonForwardQuery(final QueryId queryId) { + queryClient.closeNonForwardQuery(queryId); + } + + public SubmitQueryResponse executeQuery(final String sql) throws ServiceException { + return queryClient.executeQuery(sql); + } + + public SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException { + return queryClient.executeQueryWithJson(json); + } + + public ResultSet executeQueryAndGetResult(final String sql) throws ServiceException, IOException { + return queryClient.executeQueryAndGetResult(sql); + } + + public ResultSet executeJsonQueryAndGetResult(final String json) throws ServiceException, IOException { + return queryClient.executeJsonQueryAndGetResult(json); + } + + public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException { + return queryClient.getQueryStatus(queryId); + } + + public ResultSet getQueryResult(QueryId queryId) throws ServiceException, IOException { + return queryClient.getQueryResult(queryId); + } + + public ResultSet createNullResultSet(QueryId queryId) throws IOException { + return new TajoResultSet(this, queryId); + } + + public GetQueryResultResponse getResultResponse(QueryId queryId) throws ServiceException { + return queryClient.getResultResponse(queryId); + } + + public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws ServiceException { + return queryClient.fetchNextQueryResult(queryId, fetchRowNum); + } + + public boolean updateQuery(final String sql) throws ServiceException { + return queryClient.updateQuery(sql); + } + + public boolean updateQueryWithJson(final String json) throws ServiceException { + return queryClient.updateQueryWithJson(json); + } + + public QueryStatus killQuery(final QueryId queryId) throws ServiceException, IOException { + return queryClient.killQuery(queryId); + } + + public List<BriefQueryInfo> getRunningQueryList() throws ServiceException { + return queryClient.getRunningQueryList(); + } + + public List<BriefQueryInfo> getFinishedQueryList() throws ServiceException { + return queryClient.getFinishedQueryList(); + } + + public List<WorkerResourceInfo> getClusterInfo() throws ServiceException { + return queryClient.getClusterInfo(); + } + + /*------------------------------------------------------------------------*/ + // CatalogClient wrappers + /*------------------------------------------------------------------------*/ + + public boolean createDatabase(final String databaseName) throws ServiceException { + return catalogClient.createDatabase(databaseName); + } + + public boolean existDatabase(final String databaseName) throws ServiceException { + return catalogClient.existDatabase(databaseName); + } + + public boolean dropDatabase(final String databaseName) throws ServiceException { + return catalogClient.dropDatabase(databaseName); + } + + public List<String> getAllDatabaseNames() throws ServiceException { + return catalogClient.getAllDatabaseNames(); + } + + public boolean existTable(final String tableName) throws ServiceException { + return catalogClient.existTable(tableName); + } + + public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path, + final TableMeta meta) throws SQLException, ServiceException { + return catalogClient.createExternalTable(tableName, schema, path, meta); + } + + public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path, + final TableMeta meta, final PartitionMethodDesc partitionMethodDesc) + throws SQLException, ServiceException { + return catalogClient.createExternalTable(tableName, schema, path, meta, partitionMethodDesc); + } + + public boolean dropTable(final String tableName) throws ServiceException { + return dropTable(tableName, false); + } + + public boolean dropTable(final String tableName, final boolean purge) throws ServiceException { + return catalogClient.dropTable(tableName, purge); + } + + public List<String> getTableList(@Nullable final String databaseName) throws ServiceException { + return catalogClient.getTableList(databaseName); + } + + public TableDesc getTableDesc(final String tableName) throws ServiceException { + return catalogClient.getTableDesc(tableName); + } + + public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException { + return catalogClient.getFunctions(functionName); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java new file mode 100644 index 0000000..7aed335 --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client; + +import org.apache.tajo.QueryId; +import org.apache.tajo.SessionVars; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.jdbc.FetchResultSet; +import org.apache.tajo.jdbc.TajoMemoryResultSet; +import org.apache.tajo.jdbc.TajoResultSet; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; + +import java.io.IOException; +import java.sql.ResultSet; + +public class TajoClientUtil { + + /* query submit */ + public static boolean isQueryWaitingForSchedule(TajoProtos.QueryState state) { + return state == TajoProtos.QueryState.QUERY_NOT_ASSIGNED || + state == TajoProtos.QueryState.QUERY_MASTER_INIT || + state == TajoProtos.QueryState.QUERY_MASTER_LAUNCHED; + } + + /* query submitted. but is not running */ + public static boolean isQueryInited(TajoProtos.QueryState state) { + return state == TajoProtos.QueryState.QUERY_NEW || state == TajoProtos.QueryState.QUERY_INIT; + } + + /* query started. but is not complete */ + public static boolean isQueryRunning(TajoProtos.QueryState state) { + return isQueryInited(state) || state == TajoProtos.QueryState.QUERY_RUNNING; + } + + /* query complete */ + public static boolean isQueryComplete(TajoProtos.QueryState state) { + return !isQueryWaitingForSchedule(state) && !isQueryRunning(state); + } + + public static ResultSet createResultSet(TajoConf conf, TajoClient client, QueryId queryId, + ClientProtos.GetQueryResultResponse response) + throws IOException { + TableDesc desc = CatalogUtil.newTableDesc(response.getTableDesc()); + conf.setVar(TajoConf.ConfVars.USERNAME, response.getTajoUserName()); + return new TajoResultSet(client, queryId, conf, desc); + } + + public static ResultSet createResultSet(TajoConf conf, QueryClient client, ClientProtos.SubmitQueryResponse response) + throws IOException { + if (response.hasTableDesc()) { + // non-forward query + // select * from table1 [limit 10] + int fetchRowNum = conf.getIntVar(TajoConf.ConfVars.$RESULT_SET_FETCH_ROWNUM); + if (response.hasSessionVariables()) { + for (PrimitiveProtos.KeyValueProto eachKeyValue: response.getSessionVariables().getKeyvalList()) { + if (eachKeyValue.getKey().equals(SessionVars.FETCH_ROWNUM.keyname())) { + fetchRowNum = Integer.parseInt(eachKeyValue.getValue()); + } + } + } + TableDesc tableDesc = new TableDesc(response.getTableDesc()); + return new FetchResultSet(client, tableDesc.getLogicalSchema(), new QueryId(response.getQueryId()), fetchRowNum); + } else { + // simple eval query + // select substr('abc', 1, 2) + ClientProtos.SerializedResultSet serializedResultSet = response.getResultSet(); + return new TajoMemoryResultSet( + new Schema(serializedResultSet.getSchema()), + serializedResultSet.getSerializedTuplesList(), + response.getMaxRowNum()); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoDump.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoDump.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoDump.java index 7628d9d..540f54b 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoDump.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoDump.java @@ -104,9 +104,9 @@ public class TajoDump { System.exit(-1); } else if (hostName != null && port != null) { conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port); - client = new TajoClient(conf); + client = new TajoClientImpl(conf); } else { - client = new TajoClient(conf); + client = new TajoClientImpl(conf); } PrintWriter writer = new PrintWriter(System.out); http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java index 2377427..88ab491 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java @@ -112,9 +112,9 @@ public class TajoGetConf { return; } else if (hostName != null && port != null) { tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port); - tajoClient = new TajoClient(tajoConf); + tajoClient = new TajoClientImpl(tajoConf); } else if (hostName == null && port == null) { - tajoClient = new TajoClient(tajoConf); + tajoClient = new TajoClientImpl(tajoConf); } processConfKey(writer, param); @@ -123,13 +123,13 @@ public class TajoGetConf { private void processConfKey(Writer writer, String param) throws ParseException, IOException, ServiceException, SQLException { - String value = tajoClient.getConf().getTrimmed(param); + String value = tajoConf.getTrimmed(param); // If there is no value in the configuration file, we need to find all ConfVars. if (value == null) { for(TajoConf.ConfVars vars : TajoConf.ConfVars.values()) { if (vars.varname.equalsIgnoreCase(param)) { - value = tajoClient.getConf().getVar(vars); + value = tajoConf.getVar(vars); break; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java index 11cb4ed..5d5cf71 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java @@ -126,9 +126,9 @@ public class TajoHAAdmin { return; } else if (hostName != null && port != null) { tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port); - tajoClient = new TajoClient(tajoConf); + tajoClient = new TajoClientImpl(tajoConf); } else if (hostName == null && port == null) { - tajoClient = new TajoClient(tajoConf); + tajoClient = new TajoClientImpl(tajoConf); } if (!tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { @@ -159,7 +159,7 @@ public class TajoHAAdmin { private void getState(Writer writer, String param) throws ParseException, IOException, ServiceException { tajoClient = TajoHAClientUtil.getTajoClient(tajoConf, tajoClient); - int retValue = HAServiceUtil.getState(param, tajoClient.getConf()); + int retValue = HAServiceUtil.getState(param, tajoConf); switch (retValue) { case 1: @@ -179,7 +179,7 @@ public class TajoHAAdmin { private void formatHA(Writer writer) throws ParseException, IOException, ServiceException { - int retValue = HAServiceUtil.formatHA(tajoClient.getConf()); + int retValue = HAServiceUtil.formatHA(tajoConf); switch (retValue) { case 1: http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java index f22d5ba..b93590c 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java @@ -71,7 +71,7 @@ public class TajoHAClientUtil { conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, HAServiceUtil.getMasterClientName(conf)); client.close(); - tajoClient = new TajoClient(conf, baseDatabase); + tajoClient = new TajoClientImpl(conf, baseDatabase); if (context != null && context.getCurrentDatabase() != null) { tajoClient.selectDatabase(context.getCurrentDatabase()); http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java index 7ebce91..78674b1 100644 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java @@ -20,6 +20,7 @@ package org.apache.tajo.jdbc; import org.apache.tajo.QueryId; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.client.QueryClient; import org.apache.tajo.client.TajoClient; import org.apache.tajo.storage.Tuple; @@ -27,13 +28,13 @@ import java.io.IOException; import java.sql.SQLException; public class FetchResultSet extends TajoResultSetBase { - private TajoClient tajoClient; + private QueryClient tajoClient; private QueryId queryId; private int fetchRowNum; private TajoMemoryResultSet currentResultSet; private boolean finished = false; - public FetchResultSet(TajoClient tajoClient, Schema schema, QueryId queryId, int fetchRowNum) { + public FetchResultSet(QueryClient tajoClient, Schema schema, QueryId queryId, int fetchRowNum) { this.tajoClient = tajoClient; this.queryId = queryId; this.fetchRowNum = fetchRowNum; http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java index 65954f1..d78b04f 100644 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.client.QueryClient; import org.apache.tajo.client.TajoClient; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.storage.FileScanner; @@ -47,20 +48,20 @@ public class TajoResultSet extends TajoResultSetBase { private FileSystem fs; private Scanner scanner; - private TajoClient tajoClient; + private QueryClient tajoClient; private TajoConf conf; private TableDesc desc; private Long maxRowNum = null; private QueryId queryId; private AtomicBoolean closed = new AtomicBoolean(false); - public TajoResultSet(TajoClient tajoClient, QueryId queryId) { + public TajoResultSet(QueryClient tajoClient, QueryId queryId) { this.tajoClient = tajoClient; this.queryId = queryId; init(); } - public TajoResultSet(TajoClient tajoClient, QueryId queryId, TajoConf conf, TableDesc table) throws IOException { + public TajoResultSet(QueryClient tajoClient, QueryId queryId, TajoConf conf, TableDesc table) throws IOException { this.tajoClient = tajoClient; this.queryId = queryId; this.conf = conf; http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java index 91dcea1..b1b6450 100644 --- a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java +++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java @@ -24,6 +24,7 @@ import org.apache.tajo.catalog.CatalogConstants; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.store.MemStore; import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.util.FileUtil; @@ -43,11 +44,11 @@ public abstract class BenchmarkSet { public void init(TajoConf conf, String dataDir) throws IOException { this.dataDir = dataDir; if (System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname) != null) { - tajo = new TajoClient(NetUtils.createSocketAddr( + tajo = new TajoClientImpl(NetUtils.createSocketAddr( System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname))); } else { conf.set(CatalogConstants.STORE_CLASS, MemStore.class.getCanonicalName()); - tajo = new TajoClient(conf); + tajo = new TajoClientImpl(conf); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index cd3be98..f7c7b11 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -41,7 +41,7 @@ import org.apache.tajo.catalog.exception.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.QueryClient; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.DatumFactory; @@ -761,7 +761,7 @@ public class GlobalEngine extends AbstractService { stats.setNumBytes(totalSize); if (isExternal) { // if it is an external table, there is no way to know the exact row number without processing. - stats.setNumRows(TajoClient.UNKNOWN_ROW_NUMBER); + stats.setNumRows(QueryClient.UNKNOWN_ROW_NUMBER); } TableDesc desc = new TableDesc(CatalogUtil.buildFQName(databaseName, simpleTableName), http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java index 7ea2e48..7790ac6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java +++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java @@ -9,13 +9,10 @@ import org.apache.tajo.QueryIdFactory; import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.client.QueryStatus; -import org.apache.tajo.client.TajoClient; -import org.apache.tajo.client.TajoHAClientUtil; +import org.apache.tajo.client.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.jdbc.TajoResultSet; -import org.apache.tajo.util.HAServiceUtil; import org.apache.tajo.util.JSPUtil; import org.apache.tajo.util.TajoIdUtils; import org.codehaus.jackson.map.DeserializationConfig; @@ -67,18 +64,21 @@ public class QueryExecutorServlet extends HttpServlet { //queryRunnerId -> QueryRunner private final Map<String, QueryRunner> queryRunners = new HashMap<String, QueryRunner>(); + private TajoConf tajoConf; private TajoClient tajoClient; private ExecutorService queryRunnerExecutor = Executors.newFixedThreadPool(5); private QueryRunnerCleaner queryRunnerCleaner; + @Override public void init(ServletConfig config) throws ServletException { om.getDeserializationConfig().disable( DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES); try { - tajoClient = new TajoClient(new TajoConf()); + tajoConf = new TajoConf(); + tajoClient = new TajoClientImpl(tajoConf); queryRunnerCleaner = new QueryRunnerCleaner(); queryRunnerCleaner.start(); @@ -273,8 +273,7 @@ public class QueryExecutorServlet extends HttpServlet { public void run() { startTime = System.currentTimeMillis(); try { - TajoConf conf = tajoClient.getConf(); - tajoClient = TajoHAClientUtil.getTajoClient(conf, tajoClient); + tajoClient = TajoHAClientUtil.getTajoClient(tajoConf, tajoClient); response = tajoClient.executeQuery(query); @@ -319,7 +318,7 @@ public class QueryExecutorServlet extends HttpServlet { // non-forwarded INSERT INTO query does not have any query id. // In this case, it just returns succeeded query information without printing the query results. } else { - res = TajoClient.createResultSet(tajoClient, response); + res = TajoClientUtil.createResultSet(tajoConf, tajoClient, response); MakeResultText(res, desc); } progress.set(100); @@ -399,8 +398,8 @@ public class QueryExecutorServlet extends HttpServlet { try { ClientProtos.GetQueryResultResponse response = tajoClient.getResultResponse(tajoQueryId); TableDesc desc = CatalogUtil.newTableDesc(response.getTableDesc()); - tajoClient.getConf().setVar(TajoConf.ConfVars.USERNAME, response.getTajoUserName()); - res = new TajoResultSet(tajoClient, queryId, tajoClient.getConf(), desc); + tajoConf.setVar(TajoConf.ConfVars.USERNAME, response.getTajoUserName()); + res = new TajoResultSet(tajoClient, queryId, tajoConf, desc); MakeResultText(res, desc); http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java index 805fe06..c6cac32 100644 --- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java +++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java @@ -31,6 +31,7 @@ import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; @@ -102,7 +103,7 @@ public class LocalTajoTestingUtility { util = new TajoTestingCluster(); util.startMiniCluster(1); conf = util.getConfiguration(); - client = new TajoClient(conf); + client = new TajoClientImpl(conf); FileSystem fs = util.getDefaultFileSystem(); Path rootDir = util.getMaster().getStorageManager().getWarehouseDir(); http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java index a272b15..becb73e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -32,6 +32,7 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.cli.ParsedResult; import org.apache.tajo.cli.SimpleParser; import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.query.QueryContext; @@ -178,7 +179,7 @@ public class QueryTestCaseBase { @BeforeClass public static void setUpClass() throws IOException { conf = testBase.getTestingCluster().getConfiguration(); - client = new TajoClient(conf); + client = new TajoClientImpl(conf); } @AfterClass http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index aec11f6..452a17e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; +import org.apache.tajo.client.TajoClientUtil; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.master.TajoMaster; @@ -583,7 +585,7 @@ public class TajoTestingCluster { Thread.sleep(1000); } TajoConf conf = util.getConfiguration(); - return new TajoClient(conf); + return new TajoClientImpl(conf); } public static ResultSet run(String[] names, @@ -620,7 +622,7 @@ public class TajoTestingCluster { Thread.sleep(1000); } TajoConf conf = util.getConfiguration(); - TajoClient client = new TajoClient(conf); + TajoClient client = new TajoClientImpl(conf); try { return run(names, schemas, tableOption, tables, query, client); @@ -645,7 +647,7 @@ public class TajoTestingCluster { Thread.sleep(1000); } TajoConf conf = util.getConfiguration(); - TajoClient client = new TajoClient(conf); + TajoClient client = new TajoClientImpl(conf); try { FileSystem fs = util.getDefaultFileSystem(); Path rootDir = util.getMaster(). @@ -721,7 +723,7 @@ public class TajoTestingCluster { QueryMasterTask qmt = null; int i = 0; - while (qmt == null || TajoClient.isInPreNewState(qmt.getState())) { + while (qmt == null || TajoClientUtil.isQueryWaitingForSchedule(qmt.getState())) { try { Thread.sleep(delay); if(qmt == null){ http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java index 4ede88e..719a775 100644 --- a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java +++ b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java @@ -62,7 +62,7 @@ public class TestTajoClient { public static void setUp() throws Exception { cluster = TpchTestBase.getInstance().getTestingCluster(); conf = cluster.getConfiguration(); - client = new TajoClient(conf); + client = new TajoClientImpl(conf); testDir = CommonTestingUtil.getTestDir(); } @@ -658,7 +658,7 @@ public class TestTajoClient { QueryStatus queryStatus = client.getQueryStatus(queryId); assertNotNull(queryStatus); - assertTrue(TajoClient.isInCompleteState(queryStatus.getState())); + assertTrue(TajoClientUtil.isQueryComplete(queryStatus.getState())); TajoResultSet resultSet = (TajoResultSet) client.getQueryResult(queryId); assertNotNull(resultSet); http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java index e477939..08535ef 100644 --- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java +++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java @@ -25,7 +25,7 @@ import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.QueryClient; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -419,7 +419,7 @@ public class TestTajoJdbc extends QueryTestCaseBase { assertNotNull(rsmd); assertEquals(0, rsmd.getColumnCount()); - TajoClient connTajoClient = ((TajoConnection)stmt.getConnection()).getTajoClient(); + QueryClient connTajoClient = ((JdbcConnection)stmt.getConnection()).getQueryClient(); Map<String, String> variables = connTajoClient.getAllSessionVariables(); String value = variables.get("JOIN_TASK_INPUT_SIZE"); assertNotNull(value); @@ -461,7 +461,7 @@ public class TestTajoJdbc extends QueryTestCaseBase { assertNotNull(rsmd); assertEquals(0, rsmd.getColumnCount()); - TajoClient connTajoClient = ((TajoConnection)stmt.getConnection()).getTajoClient(); + QueryClient connTajoClient = ((JdbcConnection)stmt.getConnection()).getQueryClient(); Map<String, String> variables = connTajoClient.getAllSessionVariables(); String value = variables.get("JOIN_TASK_INPUT_SIZE"); assertNotNull(value); http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java index 86d18eb..249afae 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.TajoMaster; import org.junit.Test; @@ -55,7 +56,7 @@ public class TestHAServiceHDFSImpl { cluster.startMiniCluster(1); conf = cluster.getConfiguration(); - client = new TajoClient(conf); + client = new TajoClientImpl(conf); FileSystem fs = cluster.getDefaultFileSystem(); startBackupMasters(); http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java index 0f925bb..4a6ca00 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java @@ -21,6 +21,8 @@ package org.apache.tajo.master.querymaster; import org.apache.tajo.*; import org.apache.tajo.client.QueryStatus; import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; +import org.apache.tajo.client.TajoClientUtil; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos; @@ -41,7 +43,7 @@ public class TestQueryProgress { public static void setUp() throws Exception { cluster = TpchTestBase.getInstance().getTestingCluster(); conf = cluster.getConfiguration(); - client = new TajoClient(conf); + client = new TajoClientImpl(conf); } @AfterClass @@ -67,7 +69,7 @@ public class TestQueryProgress { prevProgress = progress; assertTrue(progress <= 1.0f); - if (TajoClient.isInCompleteState(status.getState())) break; + if (TajoClientUtil.isQueryComplete(status.getState())) break; } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java index 069ee27..18764c2 100644 --- a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java +++ b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java @@ -20,6 +20,8 @@ package org.apache.tajo.scheduler; import org.apache.tajo.*; import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; +import org.apache.tajo.client.TajoClientUtil; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos; import org.junit.AfterClass; @@ -41,7 +43,7 @@ public class TestFifoScheduler { public static void setUp() throws Exception { cluster = TpchTestBase.getInstance().getTestingCluster(); conf = cluster.getConfiguration(); - client = new TajoClient(conf); + client = new TajoClientImpl(conf); } @AfterClass @@ -75,7 +77,7 @@ public class TestFifoScheduler { cluster.waitForQueryRunning(queryId); assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState()); - ResultSet resSet = TajoClient.createResultSet(client, res2); + ResultSet resSet = TajoClientUtil.createResultSet(conf, client, res2); assertNotNull(resSet); client.killQuery(queryId); //cleanup @@ -95,7 +97,7 @@ public class TestFifoScheduler { cluster.waitForQueryRunning(queryId); - assertTrue(TajoClient.isInRunningState(client.getQueryStatus(queryId).getState())); + assertTrue(TajoClientUtil.isQueryRunning(client.getQueryStatus(queryId).getState())); assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState()); assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState()); http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java index 3a85c14..c68d3a4 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java @@ -25,6 +25,7 @@ import org.apache.tajo.TajoProtos; import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.TpchTestBase; import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.querymaster.QueryInProgress; @@ -49,7 +50,7 @@ public class TestHistory { cluster = TpchTestBase.getInstance().getTestingCluster(); master = cluster.getMaster(); conf = cluster.getConfiguration(); - client = new TajoClient(conf); + client = new TajoClientImpl(conf); } @After
