http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java new file mode 100644 index 0000000..0f83189 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java @@ -0,0 +1,415 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.resources; + +import com.google.protobuf.ByteString; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.QueryId; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.ipc.ClientProtos.ResultCode; +import org.apache.tajo.master.QueryInfo; +import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.master.exec.NonForwardQueryResultFileScanner; +import org.apache.tajo.master.exec.NonForwardQueryResultScanner; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.logical.PartitionedTableScanNode; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.session.Session; +import org.apache.tajo.util.TajoIdUtils; +import org.apache.tajo.ws.rs.*; +import org.apache.tajo.ws.rs.responses.GetQueryResultDataResponse; +import org.apache.tajo.ws.rs.responses.ResultSetInfoResponse; + +import javax.ws.rs.*; +import javax.ws.rs.core.*; +import javax.ws.rs.core.Response.Status; +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.List; + +public class QueryResultResource { + + private static final Log LOG = LogFactory.getLog(QueryResultResource.class); + + private UriInfo uriInfo; + + private Application application; + + private String databaseName; + + private String queryId; + + private JerseyResourceDelegateContext context; + + private static final String databaseNameKeyName = "databaseName"; + private static final String queryIdKeyName = "queryId"; + private static final String sessionIdKeyName = "sessionId"; + private static final String cacheIdKeyName = "cacheId"; + private static final String offsetKeyName = "offset"; + private static final String countKeyName = "count"; + + private static final String tajoDigestHeaderName = "X-Tajo-Digest"; + + public UriInfo getUriInfo() { + return uriInfo; + } + + public void setUriInfo(UriInfo uriInfo) { + this.uriInfo = uriInfo; + } + + public Application getApplication() { + return application; + } + + public void setApplication(Application application) { + this.application = application; + } + + public String getDatabaseName() { + return databaseName; + } + + public void setDatabaseName(String databaseName) { + this.databaseName = databaseName; + } + + public String getQueryId() { + return queryId; + } + + public void setQueryId(String queryId) { + this.queryId = queryId; + } + + private void initializeContext() { + context = new JerseyResourceDelegateContext(); + JerseyResourceDelegateContextKey<UriInfo> uriInfoKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, UriInfo.class); + context.put(uriInfoKey, uriInfo); + JerseyResourceDelegateContextKey<String> databaseNameKey = + JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class); + context.put(databaseNameKey, databaseName); + JerseyResourceDelegateContextKey<String> queryIdKey = + JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, String.class); + context.put(queryIdKey, queryId); + } + + private static NonForwardQueryResultScanner getNonForwardQueryResultScanner( + MasterContext masterContext, + Session session, + QueryId queryId) throws IOException { + NonForwardQueryResultScanner resultScanner = session.getNonForwardQueryResultScanner(queryId); + if (resultScanner == null) { + QueryInfo queryInfo = masterContext.getQueryJobManager().getFinishedQuery(queryId); + if (queryInfo == null) { + throw new RuntimeException("QueryInfo isnull."); + } + + TableDesc resultTableDesc = queryInfo.getResultDesc(); + if (resultTableDesc == null) { + throw new RuntimeException("Result Table Desc is null."); + } + + ScanNode scanNode; + if (resultTableDesc.hasPartition()) { + scanNode = LogicalPlan.createNodeWithoutPID(PartitionedTableScanNode.class); + scanNode.init(resultTableDesc); + } else { + scanNode = LogicalPlan.createNodeWithoutPID(ScanNode.class); + scanNode.init(resultTableDesc); + } + + resultScanner = new NonForwardQueryResultFileScanner(masterContext.getConf(), session.getSessionId(), queryId, + scanNode, resultTableDesc, Integer.MAX_VALUE); + resultScanner.init(); + session.addNonForwardQueryResultScanner(resultScanner); + } + + return resultScanner; + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + public Response getQueryResult(@HeaderParam(QueryResource.tajoSessionIdHeaderName) String sessionId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a get query result request."); + } + + Response response = null; + + try { + initializeContext(); + JerseyResourceDelegateContextKey<String> sessionIdKey = + JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, String.class); + context.put(sessionIdKey, sessionId); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new GetQueryResultDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class GetQueryResultDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<String> sessionIdKey = + JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, String.class); + String sessionId = context.get(sessionIdKey); + JerseyResourceDelegateContextKey<String> queryIdKey = + JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, String.class); + String queryId = context.get(queryIdKey); + JerseyResourceDelegateContextKey<ClientApplication> clientApplicationKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.ClientApplicationKey, ClientApplication.class); + ClientApplication clientApplication = context.get(clientApplicationKey); + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + MasterContext masterContext = context.get(masterContextKey); + JerseyResourceDelegateContextKey<UriInfo> uriInfoKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, UriInfo.class); + UriInfo uriInfo = context.get(uriInfoKey); + JerseyResourceDelegateContextKey<String> databaseNameKey = + JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class); + String databaseName = context.get(databaseNameKey); + + try { + masterContext.getSessionManager().touch(sessionId); + Session session = masterContext.getSessionManager().getSession(sessionId); + QueryId queryIdObj = TajoIdUtils.parseQueryId(queryId); + + masterContext.getSessionManager().touch(sessionId); + + QueryInfo queryInfo = masterContext.getQueryJobManager().getFinishedQuery(queryIdObj); + GetQueryResultDataResponse response = new GetQueryResultDataResponse(); + + if (queryInfo == null) { + response.setResultCode(ResultCode.ERROR); + response.setErrorMessage("Unable to find a query info for requested id : " + queryId); + return Response.status(Status.NOT_FOUND).entity(response).build(); + } + + NonForwardQueryResultScanner queryResultScanner = getNonForwardQueryResultScanner(masterContext, session, queryIdObj); + + if (queryInfo.getResultDesc() != null && queryInfo.getResultDesc().getSchema() != null) { + response.setSchema(queryInfo.getResultDesc().getSchema()); + } else { + response.setSchema(queryResultScanner.getLogicalSchema()); + } + + long cacheId = clientApplication.generateCacheIdIfAbsent(queryIdObj); + clientApplication.setCachedNonForwardResultScanner(queryIdObj, cacheId, queryResultScanner); + URI resultSetCacheUri = uriInfo.getBaseUriBuilder() + .path(QueryResource.class) + .path(QueryResource.class, "getQueryResult") + .path(QueryResultResource.class, "getQueryResultSet") + .build(databaseName, queryId, cacheId); + ResultSetInfoResponse resultSetInfoResponse = new ResultSetInfoResponse(); + resultSetInfoResponse.setId(cacheId); + resultSetInfoResponse.setLink(resultSetCacheUri); + response.setResultset(resultSetInfoResponse); + response.setResultCode(ResultCode.OK); + + return Response.status(Status.OK).entity(response).build(); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + GetQueryResultDataResponse response = new GetQueryResultDataResponse(); + response.setResultCode(ResultCode.ERROR); + response.setErrorMessage(e.getMessage()); + response.setErrorTrace(org.apache.hadoop.util.StringUtils.stringifyException(e)); + return Response.status(Status.INTERNAL_SERVER_ERROR).entity(response).build(); + } + } + } + + @GET + @Path("{cacheId}") + @Produces(MediaType.APPLICATION_OCTET_STREAM) + public Response getQueryResultSet(@HeaderParam(QueryResource.tajoSessionIdHeaderName) String sessionId, + @PathParam("cacheId") String cacheId, + @DefaultValue("-1") @QueryParam("offset") int offset, + @DefaultValue("-1") @QueryParam("count") int count) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a get query result set request."); + } + + Response response = null; + try { + initializeContext(); + JerseyResourceDelegateContextKey<String> sessionIdKey = + JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, String.class); + context.put(sessionIdKey, sessionId); + JerseyResourceDelegateContextKey<Long> cacheIdKey = + JerseyResourceDelegateContextKey.valueOf(cacheIdKeyName, Long.class); + context.put(cacheIdKey, Long.valueOf(cacheId)); + JerseyResourceDelegateContextKey<Integer> offsetKey = + JerseyResourceDelegateContextKey.valueOf(offsetKeyName, Integer.class); + context.put(offsetKey, offset); + JerseyResourceDelegateContextKey<Integer> countKey = + JerseyResourceDelegateContextKey.valueOf(countKeyName, Integer.class); + context.put(countKey, count); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new GetQueryResultSetDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class GetQueryResultSetDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<String> sessionIdKey = + JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, String.class); + String sessionId = context.get(sessionIdKey); + JerseyResourceDelegateContextKey<String> queryIdKey = + JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, String.class); + String queryId = context.get(queryIdKey); + JerseyResourceDelegateContextKey<Long> cacheIdKey = + JerseyResourceDelegateContextKey.valueOf(cacheIdKeyName, Long.class); + Long cacheId = context.get(cacheIdKey); + JerseyResourceDelegateContextKey<ClientApplication> clientApplicationKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.ClientApplicationKey, ClientApplication.class); + ClientApplication clientApplication = context.get(clientApplicationKey); + JerseyResourceDelegateContextKey<Integer> offsetKey = + JerseyResourceDelegateContextKey.valueOf(offsetKeyName, Integer.class); + int offset = context.get(offsetKey); + JerseyResourceDelegateContextKey<Integer> countKey = + JerseyResourceDelegateContextKey.valueOf(countKeyName, Integer.class); + int count = context.get(countKey); + + if (sessionId == null || sessionId.isEmpty()) { + return ResourcesUtil.createBadRequestResponse(LOG, "Session id is required. Please refer the header " + + QueryResource.tajoSessionIdHeaderName); + } + + if (queryId == null || queryId.isEmpty()) { + return ResourcesUtil.createBadRequestResponse(LOG, "Query id is required. Please specify the query id"); + } + + QueryId queryIdObj; + try { + queryIdObj = TajoIdUtils.parseQueryId(queryId); + } catch (Throwable e) { + return ResourcesUtil.createExceptionResponse(LOG, "Invalid query id : " + queryId); + } + + if (cacheId == null || cacheId.longValue() == 0) { + return ResourcesUtil.createBadRequestResponse(LOG, "Cache id is null or empty."); + } + + if (count < 0) { + return ResourcesUtil.createBadRequestResponse(LOG, "Invalid count value : " + count); + } + + NonForwardQueryResultScanner cachedQueryResultScanner = + clientApplication.getCachedNonForwardResultScanner(queryIdObj, cacheId.longValue()); + + try { + skipOffsetRow(cachedQueryResultScanner, offset); + + List<ByteString> output = cachedQueryResultScanner.getNextRows(count); + String digestString = getEncodedBase64DigestString(output); + + return Response.ok(new QueryResultStreamingOutput(output)) + .header(tajoDigestHeaderName, digestString) + .build(); + } catch (IOException e) { + LOG.error(e.getMessage(), e); + + return ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } catch (NoSuchAlgorithmException e) { + LOG.error(e.getMessage(), e); + + return ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + } + + private void skipOffsetRow(NonForwardQueryResultScanner queryResultScanner, int offset) throws IOException { + if (offset < 0) { + return; + } + + int currentRow = queryResultScanner.getCurrentRowNumber(); + + if (offset < (currentRow+1)) { + throw new RuntimeException("Offset must be over the current row number"); + } + + queryResultScanner.getNextRows(offset - currentRow - 1); + } + + private String getEncodedBase64DigestString(List<ByteString> outputList) throws NoSuchAlgorithmException { + MessageDigest messageDigest = MessageDigest.getInstance("SHA-1"); + + for (ByteString byteString: outputList) { + messageDigest.update(byteString.toByteArray()); + } + + return Base64.encodeBase64String(messageDigest.digest()); + } + } + + private static class QueryResultStreamingOutput implements StreamingOutput { + + private final List<ByteString> outputList; + + public QueryResultStreamingOutput(List<ByteString> outputList) { + this.outputList = outputList; + } + + @Override + public void write(OutputStream outputStream) throws IOException, WebApplicationException { + DataOutputStream streamingOutputStream = new DataOutputStream(new BufferedOutputStream(outputStream)); + + for (ByteString byteString: outputList) { + byte[] byteStringArray = byteString.toByteArray(); + streamingOutputStream.writeInt(byteStringArray.length); + streamingOutputStream.write(byteStringArray); + } + + streamingOutputStream.flush(); + } + } + +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/SessionsResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/SessionsResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/SessionsResource.java new file mode 100644 index 0000000..25aa798 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/SessionsResource.java @@ -0,0 +1,380 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.resources; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.session.InvalidSessionException; +import org.apache.tajo.session.Session; +import org.apache.tajo.ws.rs.JerseyResourceDelegate; +import org.apache.tajo.ws.rs.JerseyResourceDelegateContext; +import org.apache.tajo.ws.rs.JerseyResourceDelegateContextKey; +import org.apache.tajo.ws.rs.JerseyResourceDelegateUtil; +import org.apache.tajo.ws.rs.ResourcesUtil; +import org.apache.tajo.ws.rs.requests.NewSessionRequest; +import org.apache.tajo.ws.rs.responses.ExceptionResponse; +import org.apache.tajo.ws.rs.responses.NewSessionResponse; + +import javax.ws.rs.*; +import javax.ws.rs.core.*; + +import java.net.URI; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +@Path("/sessions") +public class SessionsResource { + + private static final Log LOG = LogFactory.getLog(SessionsResource.class); + + @Context + UriInfo uriInfo; + + @Context + Application application; + + JerseyResourceDelegateContext context; + + private static final String newSessionRequestKeyName = "NewSessionRequest"; + private static final String sessionIdKeyName = "SessionId"; + private static final String variablesKeyName = "VariablesMap"; + + private static final String variablesOutputKeyName = "variables"; + + private void initializeContext() { + context = new JerseyResourceDelegateContext(); + JerseyResourceDelegateContextKey<UriInfo> uriInfoKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, UriInfo.class); + context.put(uriInfoKey, uriInfo); + } + + /** + * Creates a new client session. + * + * @param request + * @return + */ + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response createNewSession(NewSessionRequest request) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a new session request. : " + request); + } + + Response response = null; + try { + initializeContext(); + JerseyResourceDelegateContextKey<NewSessionRequest> newSessionRequestKey = + JerseyResourceDelegateContextKey.valueOf(newSessionRequestKeyName, NewSessionRequest.class); + context.put(newSessionRequestKey, request); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new CreateNewSessionDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + return response; + } + + private static class CreateNewSessionDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + try { + JerseyResourceDelegateContextKey<NewSessionRequest> sessionRequestKey = + JerseyResourceDelegateContextKey.valueOf(newSessionRequestKeyName, NewSessionRequest.class); + NewSessionRequest request = context.get(sessionRequestKey); + + if (request.getUserName() == null || request.getUserName().isEmpty()) { + return ResourcesUtil.createBadRequestResponse(LOG, "userName is null or empty."); + } + + String userName = request.getUserName(); + String databaseName = request.getDatabaseName(); + if (databaseName == null || databaseName.isEmpty()) { + databaseName = TajoConstants.DEFAULT_DATABASE_NAME; + } + + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + TajoMaster.MasterContext masterContext = context.get(masterContextKey); + + NewSessionResponse sessionResponse = new NewSessionResponse(); + String sessionId = masterContext.getSessionManager().createSession(userName, databaseName); + + LOG.info("Session " + sessionId + " is created. "); + + sessionResponse.setId(sessionId); + sessionResponse.setResultCode(ClientProtos.ResultCode.OK); + sessionResponse.setVariables(masterContext.getSessionManager().getAllVariables(sessionId)); + + JerseyResourceDelegateContextKey<UriInfo> uriInfoKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, UriInfo.class); + UriInfo uriInfo = context.get(uriInfoKey); + + URI newSessionUri = uriInfo.getBaseUriBuilder() + .path(SessionsResource.class).path(sessionId).build(); + + return Response.created(newSessionUri).entity(sessionResponse).build(); + } catch (InvalidSessionException e) { + LOG.error(e.getMessage(), e); + + NewSessionResponse sessionResponse = new NewSessionResponse(); + sessionResponse.setResultCode(ClientProtos.ResultCode.ERROR); + sessionResponse.setMessage(e.getMessage()); + + return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(sessionResponse).build(); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + NewSessionResponse sessionResponse = new NewSessionResponse(); + sessionResponse.setResultCode(ClientProtos.ResultCode.ERROR); + sessionResponse.setMessage(e.getMessage()); + + return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(sessionResponse).build(); + } + } + + } + + /** + * Removes existing sessions. + * + * @param sessionId + * @return + */ + @DELETE + @Path("/{session-id}") + public Response removeSession(@PathParam("session-id") String sessionId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent remove session request : Session Id (" + sessionId + ")"); + } + + Response response = null; + try { + initializeContext(); + JerseyResourceDelegateContextKey<String> sessionIdKey = + JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, String.class); + context.put(sessionIdKey, sessionId); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new RemoveSessionDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class RemoveSessionDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + TajoMaster.MasterContext masterContext = context.get(masterContextKey); + JerseyResourceDelegateContextKey<String> sessionIdKey = + JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, String.class); + String sessionId = context.get(sessionIdKey); + + Session session = masterContext.getSessionManager().removeSession(sessionId); + + if (session != null) { + LOG.info("Session " + sessionId + " is removed."); + + return Response.status(Response.Status.OK).build(); + } else { + ExceptionResponse response = new ExceptionResponse(); + response.setMessage("Unable to find a session (" + sessionId + ")"); + + return Response.status(Response.Status.NOT_FOUND).entity(response).build(); + } + } + + } + + /** + * Retrieves all session variables + * + * @param sessionId + * @return + */ + @GET + @Path("/{session-id}/variables") + @Produces(MediaType.APPLICATION_JSON) + public Response getAllSessionVariables(@PathParam("session-id") String sessionId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a GetAllSessionVariables request for a session : " + sessionId); + } + + Response response = null; + try { + initializeContext(); + JerseyResourceDelegateContextKey<String> sessionIdKey = + JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, String.class); + context.put(sessionIdKey, sessionId); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new GetAllSessionVariablesDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class GetAllSessionVariablesDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + TajoMaster.MasterContext masterContext = context.get(masterContextKey); + JerseyResourceDelegateContextKey<String> sessionIdKey = + JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, String.class); + String sessionId = context.get(sessionIdKey); + + try { + Map<String, Map<String, String>> variablesMap = new HashMap<String, Map<String, String>>(); + variablesMap.put(variablesOutputKeyName, + masterContext.getSessionManager().getAllVariables(sessionId)); + GenericEntity<Map<String, Map<String, String>>> variablesEntity = + new GenericEntity<Map<String, Map<String, String>>>(variablesMap, Map.class); + return Response.ok(variablesEntity).build(); + } catch (InvalidSessionException e) { + LOG.error("Unable to find a session : " + sessionId); + + return Response.status(Response.Status.NOT_FOUND).build(); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + return ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + } + + } + + /** + * Updates the specified session varaible or variables + * + * @param sessionId + * @param variables + * @return + */ + @PUT + @Path("/{session-id}/variables") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response updateSessionVariables(@PathParam("session-id") String sessionId, + Map<String, Object> variables) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a update session variables request for a session : " + sessionId); + } + + Response response = null; + try { + initializeContext(); + JerseyResourceDelegateContextKey<String> sessionIdKey = + JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, String.class); + context.put(sessionIdKey, sessionId); + JerseyResourceDelegateContextKey<Map> variablesMapKey = + JerseyResourceDelegateContextKey.valueOf(variablesKeyName, Map.class); + context.put(variablesMapKey, variables); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new UpdateSessionVariablesDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class UpdateSessionVariablesDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<String> sessionIdKey = + JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, String.class); + String sessionId = context.get(sessionIdKey); + JerseyResourceDelegateContextKey<Map> variablesKey = + JerseyResourceDelegateContextKey.valueOf(variablesKeyName, Map.class); + Map<String, Object> variables = context.get(variablesKey); + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + TajoMaster.MasterContext masterContext = context.get(masterContextKey); + + try { + if (variables.containsKey(variablesOutputKeyName)) { + Map<String, String> variablesMap = (Map<String, String>) variables.get(variablesOutputKeyName); + for (Map.Entry<String, String> variableEntry: variablesMap.entrySet()) { + masterContext.getSessionManager().setVariable(sessionId, variableEntry.getKey(), variableEntry.getValue()); + } + + return Response.ok().build(); + } else { + Iterator<Map.Entry<String, Object>> iterator = variables.entrySet().iterator(); + if (iterator.hasNext()) { + Map.Entry<String, Object> entry = iterator.next(); + + masterContext.getSessionManager().setVariable(sessionId, entry.getKey(), (String) entry.getValue()); + + return Response.ok().build(); + } else { + return ResourcesUtil.createBadRequestResponse(LOG, "At least one variable is required."); + } + } + } catch (InvalidSessionException e) { + LOG.error("Unable to find a session : " + sessionId); + + return Response.status(Response.Status.NOT_FOUND).build(); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + return ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/TablesResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/TablesResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/TablesResource.java new file mode 100644 index 0000000..3e638d6 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/TablesResource.java @@ -0,0 +1,350 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.resources; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriInfo; +import javax.ws.rs.core.Response.Status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.ws.rs.JerseyResourceDelegate; +import org.apache.tajo.ws.rs.JerseyResourceDelegateContext; +import org.apache.tajo.ws.rs.JerseyResourceDelegateContextKey; +import org.apache.tajo.ws.rs.JerseyResourceDelegateUtil; +import org.apache.tajo.ws.rs.ResourcesUtil; + +@Path("/databases/{databaseName}/tables") +public class TablesResource { + + private static final Log LOG = LogFactory.getLog(TablesResource.class); + + @Context + UriInfo uriInfo; + + @Context + Application application; + + @PathParam("databaseName") + String databaseName; + + JerseyResourceDelegateContext context; + + private static final String databaseNameKeyName = "databaseName"; + private static final String tableNameKeyName = "tableName"; + private static final String tableDescKeyName = "tableDesc"; + + private void initializeContext() { + context = new JerseyResourceDelegateContext(); + JerseyResourceDelegateContextKey<UriInfo> uriInfoKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, UriInfo.class); + context.put(uriInfoKey, uriInfo); + } + + public void setDatabaseName(String databaseName) { + this.databaseName = databaseName; + } + + public void setUriInfo(UriInfo uriInfo) { + this.uriInfo = uriInfo; + } + + public void setApplication(Application application) { + this.application = application; + } + + /** + * + * @param databaseName + * @param tableMeta + * @return + */ + @POST + @Consumes(MediaType.APPLICATION_JSON) + public Response createNewTable(TableDesc tableDesc) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a create table request on " + databaseName + " database."); + } + + Response response = null; + try { + initializeContext(); + JerseyResourceDelegateContextKey<String> databaseNameKey = + JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class); + context.put(databaseNameKey, databaseName); + JerseyResourceDelegateContextKey<TableDesc> tableDescKey = + JerseyResourceDelegateContextKey.valueOf(tableDescKeyName, TableDesc.class); + context.put(tableDescKey, tableDesc); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new CreateNewTableDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class CreateNewTableDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<String> databaseNameKey = + JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class); + String databaseName = context.get(databaseNameKey); + JerseyResourceDelegateContextKey<TableDesc> tableDescKey = + JerseyResourceDelegateContextKey.valueOf(tableDescKeyName, TableDesc.class); + TableDesc tableDesc = context.get(tableDescKey); + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + MasterContext masterContext = context.get(masterContextKey); + + if (!CatalogUtil.isFQTableName(tableDesc.getName())) { + tableDesc.setName(CatalogUtil.getCanonicalTableName(databaseName, tableDesc.getName())); + } + + CatalogService catalogService = masterContext.getCatalog(); + boolean tableCreated = catalogService.createTable(tableDesc); + if (tableCreated) { + JerseyResourceDelegateContextKey<UriInfo> uriInfoKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, UriInfo.class); + UriInfo uriInfo = context.get(uriInfoKey); + + URI tableUri = uriInfo.getBaseUriBuilder() + .path(TablesResource.class) + .path(TablesResource.class, "getTable") + .build(databaseName, CatalogUtil.extractSimpleName(tableDesc.getName())); + return Response.created(tableUri).build(); + } else { + return ResourcesUtil.createExceptionResponse(LOG, "Table Creation has been failed."); + } + } + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + public Response getAllTables() { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a get all tables request."); + } + + Response response = null; + + try { + initializeContext(); + JerseyResourceDelegateContextKey<String> databaseNameKey = + JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class); + context.put(databaseNameKey, databaseName); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new GetAllTablesDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class GetAllTablesDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + MasterContext masterContext = context.get(masterContextKey); + JerseyResourceDelegateContextKey<String> databaseNameKey = + JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class); + String databaseName = context.get(databaseNameKey); + + CatalogService catalogService = masterContext.getCatalog(); + + if (!catalogService.existDatabase(databaseName)) { + return Response.status(Status.NOT_FOUND).build(); + } + + Collection<String> tableNames = catalogService.getAllTableNames(databaseName); + List<TableDesc> tables = new ArrayList<TableDesc>(); + + for (String tableName: tableNames) { + tables.add(new TableDesc(tableName, null, null, null)); + } + + return Response.ok(tables).build(); + } + + } + + @GET + @Path("/{tableName}") + @Produces(MediaType.APPLICATION_JSON) + public Response getTable(@PathParam("tableName") String tableName) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a get table request."); + } + + Response response = null; + try { + initializeContext(); + JerseyResourceDelegateContextKey<String> databaseNameKey = + JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class); + context.put(databaseNameKey, databaseName); + JerseyResourceDelegateContextKey<String> tableNameKey = + JerseyResourceDelegateContextKey.valueOf(tableNameKeyName, String.class); + context.put(tableNameKey, tableName); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new GetTableDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class GetTableDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<String> databaseNameKey = + JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class); + String databaseName = context.get(databaseNameKey); + JerseyResourceDelegateContextKey<String> tableNameKey = + JerseyResourceDelegateContextKey.valueOf(tableNameKeyName, String.class); + String tableName = context.get(tableNameKey); + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + MasterContext masterContext = context.get(masterContextKey); + + if (CatalogUtil.isFQTableName(tableName)) { + tableName = CatalogUtil.extractSimpleName(tableName); + } + + CatalogService catalogService = masterContext.getCatalog(); + if (!catalogService.existDatabase(databaseName) || + !catalogService.existsTable(databaseName, tableName)) { + return Response.status(Status.NOT_FOUND).build(); + } + + TableDesc tableDesc = catalogService.getTableDesc(databaseName, tableName); + return Response.ok(tableDesc).build(); + } + } + + @DELETE + @Path("/{tableName}") + public Response deleteTable(@PathParam("tableName") String tableName) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a drop table request for the table " + tableName); + } + + Response response = null; + + try { + initializeContext(); + JerseyResourceDelegateContextKey<String> databaseNameKey = + JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class); + context.put(databaseNameKey, databaseName); + JerseyResourceDelegateContextKey<String> tableNameKey = + JerseyResourceDelegateContextKey.valueOf(tableNameKeyName, String.class); + context.put(tableNameKey, tableName); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new DeleteTableDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class DeleteTableDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<String> databaseNameKey = + JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class); + String databaseName = context.get(databaseNameKey); + JerseyResourceDelegateContextKey<String> tableNameKey = + JerseyResourceDelegateContextKey.valueOf(tableNameKeyName, String.class); + String tableName = context.get(tableNameKey); + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + MasterContext masterContext = context.get(masterContextKey); + + if (CatalogUtil.isFQTableName(tableName)) { + tableName = CatalogUtil.extractSimpleName(tableName); + } + + CatalogService catalogService = masterContext.getCatalog(); + if (!catalogService.existDatabase(databaseName) || + !catalogService.existsTable(databaseName, tableName)) { + return Response.status(Status.NOT_FOUND).build(); + } + + String canonicalTableName = CatalogUtil.getCanonicalTableName(databaseName, tableName); + boolean tableDropped = + catalogService.dropTable(canonicalTableName); + if (tableDropped) { + return Response.ok().build(); + } else { + return ResourcesUtil.createExceptionResponse(LOG, "Unable to drop a " + canonicalTableName + " table."); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/DatabaseInfoResponse.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/DatabaseInfoResponse.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/DatabaseInfoResponse.java new file mode 100644 index 0000000..846ba2d --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/DatabaseInfoResponse.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.responses; + +import com.google.gson.annotations.Expose; + +public class DatabaseInfoResponse { + + @Expose private int id; + @Expose private String tablespace; + @Expose private String name; + + public int getId() { + return id; + } + public void setId(int id) { + this.id = id; + } + public String getTablespace() { + return tablespace; + } + public void setTablespace(String tablespace) { + this.tablespace = tablespace; + } + public String getName() { + return name; + } + public void setName(String name) { + this.name = name; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/ExceptionResponse.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/ExceptionResponse.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/ExceptionResponse.java new file mode 100644 index 0000000..36046b1 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/ExceptionResponse.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.responses; + +import com.google.gson.annotations.Expose; + +public class ExceptionResponse { + @Expose private String message; + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/GetQueryResultDataResponse.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/GetQueryResultDataResponse.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/GetQueryResultDataResponse.java new file mode 100644 index 0000000..512f20f --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/GetQueryResultDataResponse.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.responses; + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.ipc.ClientProtos; + +import com.google.gson.annotations.Expose; + +public class GetQueryResultDataResponse { + + @Expose private ClientProtos.ResultCode resultCode; + @Expose private Schema schema; + @Expose private int bytesNum; + @Expose private ResultSetInfoResponse resultset; + @Expose private String errorMessage; + @Expose private String errorTrace; + + public ClientProtos.ResultCode getResultCode() { + return resultCode; + } + public void setResultCode(ClientProtos.ResultCode resultCode) { + this.resultCode = resultCode; + } + public Schema getSchema() { + return schema; + } + public void setSchema(Schema schema) { + this.schema = schema; + } + public int getBytesNum() { + return bytesNum; + } + public void setBytesNum(int bytesNum) { + this.bytesNum = bytesNum; + } + public ResultSetInfoResponse getResultset() { + return resultset; + } + public void setResultset(ResultSetInfoResponse resultset) { + this.resultset = resultset; + } + public String getErrorMessage() { + return errorMessage; + } + public void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + public String getErrorTrace() { + return errorTrace; + } + public void setErrorTrace(String errorTrace) { + this.errorTrace = errorTrace; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/NewSessionResponse.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/NewSessionResponse.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/NewSessionResponse.java new file mode 100644 index 0000000..67b5b33 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/NewSessionResponse.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.responses; + +import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.util.TUtil; + +import com.google.gson.annotations.Expose; + +import java.util.Map; + +public class NewSessionResponse { + + @Expose private String id; + @Expose private String message; + @Expose private ClientProtos.ResultCode resultCode; + @Expose private Map<String, String> variables; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public ClientProtos.ResultCode getResultCode() { + return resultCode; + } + + public void setResultCode(ClientProtos.ResultCode resultCode) { + this.resultCode = resultCode; + } + + public Map<String, String> getVariables() { + if (variables == null) { + variables = TUtil.newHashMap(); + } + return variables; + } + + public void setVariables(Map<String, String> variables) { + getVariables().putAll(variables); + } + + @Override + public String toString() { + return "NewSessionResponse [id=" + id + ", message=" + message + ", resultCode=" + resultCode + ", variables=" + + variables + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/ResultSetInfoResponse.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/ResultSetInfoResponse.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/ResultSetInfoResponse.java new file mode 100644 index 0000000..93d79c9 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/ResultSetInfoResponse.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.responses; + +import java.net.URI; + +import com.google.gson.annotations.Expose; + +public class ResultSetInfoResponse { + + @Expose private long id; + @Expose private URI link; + + public long getId() { + return id; + } + public void setId(long id) { + this.id = id; + } + public URI getLink() { + return link; + } + public void setLink(URI link) { + this.link = link; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerConnectionInfoResponse.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerConnectionInfoResponse.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerConnectionInfoResponse.java new file mode 100644 index 0000000..3c8cb8b --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerConnectionInfoResponse.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.responses; + +import org.apache.tajo.master.cluster.WorkerConnectionInfo; + +import com.google.gson.annotations.Expose; + +public class WorkerConnectionInfoResponse { + + @Expose private int id; + + @Expose private String host; + + @Expose private int peerRpcPort; + + @Expose private int pullServerPort; + + @Expose private int queryMasterPort; + + @Expose private int clientPort; + + @Expose private int httpInfoPort; + + public WorkerConnectionInfoResponse(WorkerConnectionInfo connectionInfo) { + this.id = connectionInfo.getId(); + this.host = connectionInfo.getHost(); + this.peerRpcPort = connectionInfo.getPeerRpcPort(); + this.pullServerPort = connectionInfo.getPullServerPort(); + this.clientPort = connectionInfo.getClientPort(); + this.queryMasterPort = connectionInfo.getQueryMasterPort(); + this.httpInfoPort = connectionInfo.getHttpInfoPort(); + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java new file mode 100644 index 0000000..84b81f8 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.responses; + +import org.apache.tajo.master.rm.Worker; +import org.apache.tajo.master.rm.WorkerResource; + +import com.google.gson.annotations.Expose; + +public class WorkerResponse { + + @Expose private WorkerConnectionInfoResponse connectionInfo; + + @Expose private float diskSlots; + @Expose private int cpuCoreSlots; + @Expose private int memoryMB; + + @Expose private float usedDiskSlots; + @Expose private int usedMemoryMB; + @Expose private int usedCpuCoreSlots; + + @Expose private long maxHeap; + @Expose private long freeHeap; + @Expose private long totalHeap; + + @Expose private int numRunningTasks; + @Expose private int numQueryMasterTasks; + + @Expose private long lastHeartbeatTime; + + public WorkerResponse(Worker worker) { + this(worker.getResource()); + + this.connectionInfo = new WorkerConnectionInfoResponse(worker.getConnectionInfo()); + + this.lastHeartbeatTime = worker.getLastHeartbeatTime(); + } + + private WorkerResponse(WorkerResource resource) { + this.cpuCoreSlots = resource.getCpuCoreSlots(); + this.memoryMB = resource.getMemoryMB(); + this.usedDiskSlots = resource.getUsedDiskSlots(); + this.usedMemoryMB = resource.getUsedMemoryMB(); + this.usedCpuCoreSlots = resource.getUsedCpuCoreSlots(); + this.maxHeap = resource.getMaxHeap(); + this.freeHeap = resource.getFreeHeap(); + this.totalHeap = resource.getTotalHeap(); + this.numRunningTasks = resource.getNumRunningTasks(); + this.numQueryMasterTasks = resource.getNumQueryMasterTasks(); + } + + public WorkerConnectionInfoResponse getConnectionInfo() { + return connectionInfo; + } + + public void setConnectionInfo(WorkerConnectionInfoResponse connectionInfo) { + this.connectionInfo = connectionInfo; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 9d9310e..f7fb2f2 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -336,6 +336,7 @@ public class TajoTestingCluster { c.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, "localhost:0"); c.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, "localhost:0"); c.setVar(ConfVars.WORKER_TEMPORAL_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo-localdir"); + c.setIntVar(ConfVars.REST_SERVICE_PORT, 0); LOG.info("derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI)); @@ -361,6 +362,10 @@ public class TajoTestingCluster { tajoMasterAddress.getHostName() + ":" + tajoMasterAddress.getPort()); this.conf.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, c.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS)); this.conf.setVar(ConfVars.CATALOG_ADDRESS, c.getVar(ConfVars.CATALOG_ADDRESS)); + + InetSocketAddress tajoRestAddress = tajoMaster.getContext().getRestServer().getBindAddress(); + + this.conf.setIntVar(ConfVars.REST_SERVICE_PORT, tajoRestAddress.getPort()); startTajoWorkers(numSlaves); http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java index 7c91e22..6415588 100644 --- a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java +++ b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java @@ -108,6 +108,8 @@ public class TestHAServiceHDFSImpl { masterAddress + ":" + NetUtils.getFreeSocketPort()); conf.setVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS, masterAddress + ":" + NetUtils.getFreeSocketPort()); + conf.setIntVar(TajoConf.ConfVars.REST_SERVICE_PORT, + NetUtils.getFreeSocketPort()); conf.setBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE, true); http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/RestTestUtils.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/RestTestUtils.java b/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/RestTestUtils.java new file mode 100644 index 0000000..e0d9db3 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/RestTestUtils.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.resources; + +import java.lang.reflect.Type; +import java.util.Map; +import java.util.TimeZone; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.json.FunctionAdapter; +import org.apache.tajo.catalog.json.TableMetaAdapter; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.function.Function; +import org.apache.tajo.json.ClassNameSerializer; +import org.apache.tajo.json.DataTypeAdapter; +import org.apache.tajo.json.DatumAdapter; +import org.apache.tajo.json.GsonSerDerAdapter; +import org.apache.tajo.json.PathSerializer; +import org.apache.tajo.json.TimeZoneGsonSerdeAdapter; +import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.function.AggFunction; +import org.apache.tajo.plan.function.GeneralFunction; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.serder.EvalNodeAdapter; +import org.apache.tajo.plan.serder.LogicalNodeAdapter; +import org.apache.tajo.util.TUtil; + +public class RestTestUtils { + + public static Map<Type, GsonSerDerAdapter<?>> registerTypeAdapterMap() { + Map<Type, GsonSerDerAdapter<?>> adapters = TUtil.newHashMap(); + adapters.put(Path.class, new PathSerializer()); + adapters.put(Class.class, new ClassNameSerializer()); + adapters.put(LogicalNode.class, new LogicalNodeAdapter()); + adapters.put(EvalNode.class, new EvalNodeAdapter()); + adapters.put(TableMeta.class, new TableMetaAdapter()); + adapters.put(Function.class, new FunctionAdapter()); + adapters.put(GeneralFunction.class, new FunctionAdapter()); + adapters.put(AggFunction.class, new FunctionAdapter()); + adapters.put(Datum.class, new DatumAdapter()); + adapters.put(DataType.class, new DataTypeAdapter()); + adapters.put(TimeZone.class, new TimeZoneGsonSerdeAdapter()); + + return adapters; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestClusterResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestClusterResource.java b/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestClusterResource.java new file mode 100644 index 0000000..c572eee --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestClusterResource.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.resources; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.logging.Logger; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.GenericType; + +import org.apache.tajo.QueryTestCaseBase; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.ws.rs.netty.gson.GsonFeature; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.filter.LoggingFilter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestClusterResource extends QueryTestCaseBase { + + private URI restServiceURI; + private URI clusterURI; + private Client restClient; + + public TestClusterResource() { + super(TajoConstants.DEFAULT_DATABASE_NAME); + } + + @Before + public void setUp() throws Exception { + int restPort = testBase.getTestingCluster().getConfiguration().getIntVar(ConfVars.REST_SERVICE_PORT); + restServiceURI = new URI("http", null, "127.0.0.1", restPort, "/rest", null, null); + clusterURI = new URI(restServiceURI + "/cluster"); + restClient = ClientBuilder.newBuilder() + .register(new GsonFeature(RestTestUtils.registerTypeAdapterMap())) + .register(LoggingFilter.class) + .property(ClientProperties.FEATURE_AUTO_DISCOVERY_DISABLE, true) + .property(ClientProperties.METAINF_SERVICES_LOOKUP_DISABLE, true) + .build(); + } + + @After + public void tearDown() throws Exception { + restClient.close(); + } + + @Test + public void testGetCluster() throws Exception { + Map<String, List<Object>> workerMap = + restClient.target(clusterURI) + .request().get(new GenericType<Map<String, List<Object>>>(Map.class)); + + assertNotNull(workerMap); + assertFalse(workerMap.isEmpty()); + assertNotNull(workerMap.get("workers")); + + List<Object> workerList = workerMap.get("workers"); + + assertTrue(workerList.size() > 0); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestDatabasesResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestDatabasesResource.java b/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestDatabasesResource.java new file mode 100644 index 0000000..59c53f1 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestDatabasesResource.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.resources; + +import java.net.URI; +import java.util.Collection; +import java.util.Map; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + +import org.apache.tajo.QueryTestCaseBase; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.ws.rs.netty.gson.GsonFeature; +import org.apache.tajo.ws.rs.requests.NewDatabaseRequest; +import org.apache.tajo.ws.rs.responses.DatabaseInfoResponse; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.filter.LoggingFilter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestDatabasesResource extends QueryTestCaseBase { + + private URI restServiceURI; + private URI databasesURI; + private Client restClient; + + public TestDatabasesResource() { + super(TajoConstants.DEFAULT_DATABASE_NAME); + } + + @Before + public void setUp() throws Exception { + int restPort = testBase.getTestingCluster().getConfiguration().getIntVar(ConfVars.REST_SERVICE_PORT); + restServiceURI = new URI("http", null, "127.0.0.1", restPort, "/rest", null, null); + databasesURI = new URI(restServiceURI + "/databases"); + restClient = ClientBuilder.newBuilder() + .register(new GsonFeature(RestTestUtils.registerTypeAdapterMap())) + .register(LoggingFilter.class) + .property(ClientProperties.FEATURE_AUTO_DISCOVERY_DISABLE, true) + .property(ClientProperties.METAINF_SERVICES_LOOKUP_DISABLE, true) + .build(); + } + + @After + public void tearDown() throws Exception { + restClient.close(); + } + + @Test + public void testGetAllDatabases() throws Exception { + Map<String, Collection<String>> databaseNames = restClient.target(databasesURI) + .request().get(new GenericType<Map<String, Collection<String>>>(Map.class)); + + assertNotNull(databaseNames); + assertFalse(databaseNames.isEmpty()); + assertNotNull(databaseNames.get("databases")); + + Collection<String> databaseNamesCollection = databaseNames.get("databases"); + + assertTrue(databaseNamesCollection.contains(TajoConstants.DEFAULT_DATABASE_NAME)); + } + + @Test + public void testCreateDatabase() throws Exception { + String databaseName = "TestDatabasesResource"; + NewDatabaseRequest request = new NewDatabaseRequest(); + + request.setDatabaseName(databaseName); + + Response response = restClient.target(databasesURI) + .request().post(Entity.entity(request, MediaType.APPLICATION_JSON)); + + assertNotNull(response); + assertEquals(Status.CREATED.getStatusCode(), response.getStatus()); + + Map<String, Collection<String>> databaseNames = restClient.target(databasesURI) + .request().get(new GenericType<Map<String, Collection<String>>>(Map.class)); + + assertNotNull(databaseNames); + assertFalse(databaseNames.isEmpty()); + assertNotNull(databaseNames.get("databases")); + + Collection<String> databaseNamesCollection = databaseNames.get("databases"); + + assertTrue(databaseNamesCollection.contains(databaseName)); + } + + @Test + public void testCreateDatabaseBadRequest() throws Exception { + NewDatabaseRequest request = new NewDatabaseRequest(); + + Response response = restClient.target(databasesURI) + .request().post(Entity.entity(request, MediaType.APPLICATION_JSON)); + + assertNotNull(response); + assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + } + + @Test + public void testGetDatabase() throws Exception { + DatabaseInfoResponse response = + restClient.target(databasesURI).path("/{databaseName}") + .resolveTemplate("databaseName", TajoConstants.DEFAULT_DATABASE_NAME) + .request().get(new GenericType<DatabaseInfoResponse>(DatabaseInfoResponse.class)); + + assertNotNull(response); + assertEquals(TajoConstants.DEFAULT_DATABASE_NAME, response.getName()); + assertTrue(response.getTablespace() != null && !response.getTablespace().isEmpty()); + } + + @Test + public void testGetDatabaseNotFound() throws Exception { + Response response = + restClient.target(databasesURI).path("/{databaseName}") + .resolveTemplate("databaseName", "testGetDatabaseNotFound") + .request().get(); + + assertNotNull(response); + assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus()); + } + + @Test + public void testDropDatabase() throws Exception { + String databaseName = "TestDropDatabase"; + NewDatabaseRequest request = new NewDatabaseRequest(); + + request.setDatabaseName(databaseName); + + Response response = restClient.target(databasesURI) + .request().post(Entity.entity(request, MediaType.APPLICATION_JSON)); + + assertNotNull(response); + assertEquals(Status.CREATED.getStatusCode(), response.getStatus()); + + Map<String, Collection<String>> databaseNames = restClient.target(databasesURI) + .request().get(new GenericType<Map<String, Collection<String>>>(Map.class)); + + assertNotNull(databaseNames); + assertFalse(databaseNames.isEmpty()); + assertNotNull(databaseNames.get("databases")); + + Collection<String> databaseNamesCollection = databaseNames.get("databases"); + + assertTrue(databaseNamesCollection.contains(databaseName)); + + response = restClient.target(databasesURI) + .path("/{databaseName}").resolveTemplate("databaseName", databaseName) + .request().delete(); + + assertNotNull(response); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + } + + @Test + public void testDropDatabaseNotFound() throws Exception { + Response response = restClient.target(databasesURI) + .path("/{databaseName}").resolveTemplate("databaseName", "TestDropDatabaseNotFound") + .request().delete(); + + assertNotNull(response); + assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus()); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestFunctionsResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestFunctionsResource.java b/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestFunctionsResource.java new file mode 100644 index 0000000..2794ded --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/ws/rs/resources/TestFunctionsResource.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.resources; + +import java.net.URI; +import java.util.List; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.GenericType; + +import org.apache.tajo.QueryTestCaseBase; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.function.FunctionSignature; +import org.apache.tajo.ws.rs.netty.gson.GsonFeature; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.filter.LoggingFilter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestFunctionsResource extends QueryTestCaseBase { + + private URI restServiceURI; + private URI functionsURI; + private Client restClient; + + public TestFunctionsResource() { + super(TajoConstants.DEFAULT_DATABASE_NAME); + } + + @Before + public void setUp() throws Exception { + int restPort = testBase.getTestingCluster().getConfiguration().getIntVar(ConfVars.REST_SERVICE_PORT); + restServiceURI = new URI("http", null, "127.0.0.1", restPort, "/rest", null, null); + functionsURI = new URI(restServiceURI + "/databases/" + TajoConstants.DEFAULT_DATABASE_NAME + "/functions"); + restClient = ClientBuilder.newBuilder() + .register(new GsonFeature(RestTestUtils.registerTypeAdapterMap())) + .register(LoggingFilter.class) + .property(ClientProperties.FEATURE_AUTO_DISCOVERY_DISABLE, true) + .property(ClientProperties.METAINF_SERVICES_LOOKUP_DISABLE, true) + .build(); + } + + @After + public void tearDown() throws Exception { + restClient.close(); + } + + @Test + public void testGetAllFunctions() throws Exception { + List<FunctionSignature> functionSignatures = + restClient.target(functionsURI) + .request().get(new GenericType<List<FunctionSignature>>(List.class)); + + assertNotNull(functionSignatures); + assertTrue(functionSignatures.size() > 0); + } +}
