http://git-wip-us.apache.org/repos/asf/vxquery/blob/f2e5fd90/vxquery-rest/src/main/java/org/apache/vxquery/rest/RestServer.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/RestServer.java b/vxquery-rest/src/main/java/org/apache/vxquery/rest/RestServer.java new file mode 100644 index 0000000..5b59c0c --- /dev/null +++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/RestServer.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.vxquery.rest; + +import static org.apache.vxquery.rest.Constants.URLs.QUERY_ENDPOINT; +import static org.apache.vxquery.rest.Constants.URLs.QUERY_RESULT_ENDPOINT; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.hyracks.http.server.HttpServer; +import org.apache.hyracks.http.server.WebManager; +import org.apache.vxquery.exceptions.VXQueryRuntimeException; +import org.apache.vxquery.rest.service.VXQueryService; +import org.apache.vxquery.rest.servlet.QueryAPIServlet; +import org.apache.vxquery.rest.servlet.QueryResultAPIServlet; + +/** + * REST Server class responsible for starting a new server on a given port. 
+ * + * @author Erandi Ganepola + */ +public class RestServer { + + public static final Logger LOGGER = Logger.getLogger(RestServer.class.getName()); + + private WebManager webManager; + private int port; + + public RestServer(VXQueryService vxQueryService, int port) { + if (port == 0) { + throw new IllegalArgumentException("REST Server port cannot be 0"); + } + + this.port = port; + + webManager = new WebManager(); + HttpServer restServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), this.port); + restServer.addServlet(new QueryAPIServlet(vxQueryService, restServer.ctx(), QUERY_ENDPOINT)); + restServer.addServlet(new QueryResultAPIServlet(vxQueryService, restServer.ctx(), QUERY_RESULT_ENDPOINT)); + webManager.add(restServer); + } + + public void start() { + try { + LOGGER.log(Level.FINE, "Starting rest server on port: " + port); + webManager.start(); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Error occurred when starting rest server", e); + throw new VXQueryRuntimeException("Unable to start REST server", e); + } + LOGGER.log(Level.INFO, "Rest server started on port: " + port); + } + + public void stop() { + try { + LOGGER.log(Level.FINE, "Stopping rest server"); + webManager.stop(); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Error occurred when stopping VXQueryService", e); + throw new VXQueryRuntimeException("Error occurred when stopping rest server", e); + } + LOGGER.log(Level.INFO, "Rest server stopped"); + } + + public int getPort() { + return port; + } +}
/**
 * A query request coming to the {@link org.apache.vxquery.rest.RestServer}.
 * Holds the statement to run together with the options that control how it is
 * compiled, executed and reported.
 *
 * @author Erandi Ganepola
 */
public class QueryRequest {

    public static final int DEFAULT_FRAMESIZE = 65536;
    public static final int DEFAULT_OPTIMIZATION = 0;

    private final String statement;
    private boolean async = true;
    private boolean compileOnly;
    private int optimization = DEFAULT_OPTIMIZATION;
    /** Frame size in bytes. (default: 65,536) */
    private int frameSize = DEFAULT_FRAMESIZE;
    private int repeatExecutions = 1;
    private boolean showMetrics = false;
    private boolean showAbstractSyntaxTree = false;
    private boolean showTranslatedExpressionTree = false;
    private boolean showOptimizedExpressionTree = false;
    private boolean showRuntimePlan = false;
    /** A unique UUID to uniquely identify a given request */
    private final String requestId;

    /** An optional map of source files. Required for XTests */
    private Map<String, File> sourceFileMap = new HashMap<>();

    /**
     * Creates a request without a request id.
     *
     * @param statement
     *            statement to execute; must not be {@code null}
     * @throws IllegalArgumentException
     *             if {@code statement} is {@code null}
     */
    public QueryRequest(String statement) {
        this(null, statement);
    }

    /**
     * Creates a request.
     *
     * @param requestId
     *            identifier of this request; may be {@code null}
     * @param statement
     *            statement to execute; must not be {@code null}
     * @throws IllegalArgumentException
     *             if {@code statement} is {@code null}
     */
    public QueryRequest(String requestId, String statement) {
        if (statement == null) {
            throw new IllegalArgumentException("Statement cannot be null");
        }

        this.statement = statement;
        this.requestId = requestId;
    }

    public String getStatement() {
        return statement;
    }

    public boolean isCompileOnly() {
        return compileOnly;
    }

    public void setCompileOnly(boolean compileOnly) {
        this.compileOnly = compileOnly;
    }

    public int getOptimization() {
        return optimization;
    }

    public void setOptimization(int optimization) {
        this.optimization = optimization;
    }

    public int getFrameSize() {
        return frameSize;
    }

    public void setFrameSize(int frameSize) {
        this.frameSize = frameSize;
    }

    public int getRepeatExecutions() {
        return repeatExecutions;
    }

    public void setRepeatExecutions(int repeatExecutions) {
        this.repeatExecutions = repeatExecutions;
    }

    public boolean isShowAbstractSyntaxTree() {
        return showAbstractSyntaxTree;
    }

    public void setShowAbstractSyntaxTree(boolean showAbstractSyntaxTree) {
        this.showAbstractSyntaxTree = showAbstractSyntaxTree;
    }

    public boolean isShowTranslatedExpressionTree() {
        return showTranslatedExpressionTree;
    }

    public void setShowTranslatedExpressionTree(boolean showTranslatedExpressionTree) {
        this.showTranslatedExpressionTree = showTranslatedExpressionTree;
    }

    public boolean isShowOptimizedExpressionTree() {
        return showOptimizedExpressionTree;
    }

    public void setShowOptimizedExpressionTree(boolean showOptimizedExpressionTree) {
        this.showOptimizedExpressionTree = showOptimizedExpressionTree;
    }

    public boolean isShowRuntimePlan() {
        return showRuntimePlan;
    }

    public void setShowRuntimePlan(boolean showRuntimePlan) {
        this.showRuntimePlan = showRuntimePlan;
    }

    public boolean isShowMetrics() {
        return showMetrics;
    }

    public void setShowMetrics(boolean showMetrics) {
        this.showMetrics = showMetrics;
    }

    @Override
    public String toString() {
        return String.format("{ statement : %s }", statement);
    }

    public String getRequestId() {
        return requestId;
    }

    public boolean isAsync() {
        return async;
    }

    public void setAsync(boolean async) {
        this.async = async;
    }

    /**
     * @return the map of source files; never {@code null}
     */
    public Map<String, File> getSourceFileMap() {
        return sourceFileMap;
    }

    /**
     * Replaces the map of source files.
     *
     * @param sourceFileMap
     *            new map; {@code null} is treated as "no source files" and is
     *            stored as an empty map so that the getter never returns
     *            {@code null}
     */
    public void setSourceFileMap(Map<String, File> sourceFileMap) {
        this.sourceFileMap = (sourceFileMap == null) ? new HashMap<String, File>() : sourceFileMap;
    }
}
/**
 * A request for the result of a previously submitted query, coming to the
 * {@link org.apache.vxquery.rest.RestServer}. The result is identified by its
 * result id.
 *
 * @author Erandi Ganepola
 */
public class QueryResultRequest {

    private final long resultId;
    private boolean showMetrics = false;
    private final String requestId;

    /**
     * Creates a request without a request id.
     *
     * @param resultId
     *            id of the result to fetch
     */
    public QueryResultRequest(long resultId) {
        this(resultId, null);
    }

    /**
     * Creates a request.
     *
     * @param resultId
     *            id of the result to fetch
     * @param requestId
     *            identifier of this request; may be {@code null}
     */
    public QueryResultRequest(long resultId, String requestId) {
        this.resultId = resultId;
        this.requestId = requestId;
    }

    public long getResultId() {
        return resultId;
    }

    public boolean isShowMetrics() {
        return showMetrics;
    }

    public void setShowMetrics(boolean showMetrics) {
        this.showMetrics = showMetrics;
    }

    public String getRequestId() {
        return requestId;
    }
}
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.vxquery.rest.response; + +import static org.apache.vxquery.rest.Constants.RESULT_URL_PREFIX; + +import org.apache.hyracks.api.dataset.ResultSetId; +import org.apache.vxquery.rest.request.QueryRequest; +import org.apache.vxquery.rest.service.Status; + +/** + * Base class for any type of response which can be sent by the REST server. + * These responses can be query responses, error responses or query result + * responses. 
+ * + * @author Erandi Ganepola + */ +public class APIResponse { + + private String status; + private String requestId; + + public APIResponse() { + status = Status.SUCCESS.toString(); + } + + public APIResponse(String status) { + this.status = status; + } + + public String getStatus() { + return status; + } + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public static ErrorResponse newErrorResponse(String requestId, Error error) { + ErrorResponse response = new ErrorResponse(); + response.setRequestId(requestId); + response.setError(error); + return response; + } + + public static QueryResponse newQueryResponse(QueryRequest request, ResultSetId resultSetId) { + QueryResponse response; + if (request.isAsync()) { + AsyncQueryResponse asyncQueryResponse = new AsyncQueryResponse(); + if (!request.isCompileOnly()) { + asyncQueryResponse.setResultId(resultSetId.getId()); + asyncQueryResponse.setResultUrl(RESULT_URL_PREFIX + resultSetId.getId()); + } + response = asyncQueryResponse; + } else { + response = new SyncQueryResponse(); + } + + response.setRequestId(request.getRequestId()); + return response; + } + + public static QueryResultResponse newQueryResultResponse(String requestId) { + QueryResultResponse response = new QueryResultResponse(); + response.setRequestId(requestId); + return response; + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/f2e5fd90/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/AsyncQueryResponse.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/AsyncQueryResponse.java b/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/AsyncQueryResponse.java new file mode 100644 index 0000000..3216dce --- /dev/null +++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/AsyncQueryResponse.java @@ -0,0 +1,47 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.vxquery.rest.response; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * Resource class to represent a response to a given user query + * + * @author Erandi Ganepola + */ +@XmlRootElement +public class AsyncQueryResponse extends QueryResponse { + + private long resultId; + private String resultUrl; + + public long getResultId() { + return resultId; + } + + public void setResultId(long resultId) { + this.resultId = resultId; + } + + public String getResultUrl() { + return resultUrl; + } + + public void setResultUrl(String resultUrl) { + this.resultUrl = resultUrl; + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/f2e5fd90/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/Error.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/Error.java b/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/Error.java new file mode 100644 index 0000000..96a709d --- /dev/null +++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/Error.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.vxquery.rest.response; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * Represents the + * + * <pre> + * error + * </pre> + * + * part of an {@link ErrorResponse}. + * + * <pre> + * {@code + * <error> + * <code>405</code> + * <message>Invalid Input</message> + * </error>} + * </pre> + * + * @author Erandi Ganepola + */ +@XmlRootElement +public class Error { + + private int code; + private String message; + + public Error() { + } + + public Error(int code, String message) { + this.code = code; + this.message = message; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private int code = -1; + private String message = null; + + public Builder withCode(int code) { + this.code = code; + return this; + } + + public Builder withMessage(String message) { + this.message = message; + return this; + } + + public Error build() { + if (code == -1) { + code = 500; + } + + if (message == null) { + message = "unexpected Error"; + 
} + + return new Error(code, message); + } + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/f2e5fd90/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/ErrorResponse.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/ErrorResponse.java b/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/ErrorResponse.java new file mode 100644 index 0000000..2de2b25 --- /dev/null +++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/ErrorResponse.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.vxquery.rest.response; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.vxquery.rest.service.Status; + +/** + * <pre> + * {@code + * <errorResponse> + * <status>FATAL</status> + * <requestId>jabsa-jkk77j-hbah45-jknasj-jjlas</requestId> + * <error> + * <code>405</code> + * <message>Invalid Input</message> + * </error> + * </errorResponse> + * } + * </pre> + * + * @author Erandi Ganepola + */ +@XmlRootElement +public class ErrorResponse extends APIResponse { + + private Error error; + + public ErrorResponse() { + super(Status.FATAL.toString()); + } + + public Error getError() { + return error; + } + + public void setError(Error error) { + this.error = error; + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/f2e5fd90/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/Metrics.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/Metrics.java b/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/Metrics.java new file mode 100644 index 0000000..e34e1c6 --- /dev/null +++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/Metrics.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.vxquery.rest.response; + +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement +public class Metrics { + private long compileTime; + private long elapsedTime; + + public long getCompileTime() { + return compileTime; + } + + public void setCompileTime(long compileTime) { + this.compileTime = compileTime; + } + + public long getElapsedTime() { + return elapsedTime; + } + + public void setElapsedTime(long elapsedTime) { + this.elapsedTime = elapsedTime; + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/f2e5fd90/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/QueryResponse.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/QueryResponse.java b/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/QueryResponse.java new file mode 100644 index 0000000..4916ffe --- /dev/null +++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/QueryResponse.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.vxquery.rest.response; + +import org.apache.vxquery.rest.service.Status; + +/** + * The base class of the query response (the response returned when a query is + * sent for execution) + * + * @author Erandi Ganepola + */ +public class QueryResponse extends APIResponse { + + private String statement; + private String abstractSyntaxTree; + private String translatedExpressionTree; + private String optimizedExpressionTree; + private String runtimePlan; + private Metrics metrics = new Metrics(); + + public QueryResponse() { + super(Status.SUCCESS.toString()); + } + + public String getStatement() { + return statement; + } + + public void setStatement(String statement) { + this.statement = statement; + } + + public String getAbstractSyntaxTree() { + return abstractSyntaxTree; + } + + public void setAbstractSyntaxTree(String abstractSyntaxTree) { + this.abstractSyntaxTree = abstractSyntaxTree; + } + + public String getTranslatedExpressionTree() { + return translatedExpressionTree; + } + + public void setTranslatedExpressionTree(String translatedExpressionTree) { + this.translatedExpressionTree = translatedExpressionTree; + } + + public String getOptimizedExpressionTree() { + return optimizedExpressionTree; + } + + public void setOptimizedExpressionTree(String optimizedExpressionTree) { + this.optimizedExpressionTree = optimizedExpressionTree; + } + + public String getRuntimePlan() { + return runtimePlan; + } + + public void setRuntimePlan(String runtimePlan) { + this.runtimePlan = runtimePlan; + } + + public Metrics getMetrics() { + return metrics; + } + + public void setMetrics(Metrics metrics) { + this.metrics = metrics; + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/f2e5fd90/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/QueryResultResponse.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/QueryResultResponse.java 
b/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/QueryResultResponse.java new file mode 100644 index 0000000..2f74865 --- /dev/null +++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/QueryResultResponse.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.vxquery.rest.response; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.vxquery.rest.service.Status; + +@XmlRootElement +public class QueryResultResponse extends APIResponse { + + private String results; + private Metrics metrics = new Metrics(); + + public QueryResultResponse() { + super(Status.SUCCESS.toString()); + } + + public String getResults() { + return results; + } + + public void setResults(String results) { + this.results = results; + } + + public Metrics getMetrics() { + return metrics; + } + + public void setMetrics(Metrics metrics) { + this.metrics = metrics; + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/f2e5fd90/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/SyncQueryResponse.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/SyncQueryResponse.java b/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/SyncQueryResponse.java new file mode 100644 index 0000000..b42a912 --- /dev/null +++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/response/SyncQueryResponse.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.vxquery.rest.response; + +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement +public class SyncQueryResponse extends QueryResponse { + + private String results; + + public String getResults() { + return results; + } + + public void setResults(String results) { + this.results = results; + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/f2e5fd90/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/HyracksJobContext.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/HyracksJobContext.java b/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/HyracksJobContext.java new file mode 100644 index 0000000..9dc967f --- /dev/null +++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/HyracksJobContext.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.vxquery.rest.service; + +import org.apache.hyracks.api.dataset.ResultSetId; +import org.apache.hyracks.api.job.JobId; + +/** + * A class to map {@link ResultSetId} with {@link JobId} when a job is submitted + * to hyracks. This mapping will later be used to determine the {@link JobId} + * instance of the corresponding {@link ResultSetId} + * + * @author Erandi Ganepola + */ +public class HyracksJobContext { + + private JobId jobId; + private int frameSize; + private ResultSetId resultSetId; + + public HyracksJobContext(JobId jobId, int frameSize, ResultSetId resultSetId) { + this.jobId = jobId; + this.frameSize = frameSize; + this.resultSetId = resultSetId; + } + + public JobId getJobId() { + return jobId; + } + + public int getFrameSize() { + return frameSize; + } + + public ResultSetId getResultSetId() { + return resultSetId; + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/f2e5fd90/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/State.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/State.java b/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/State.java new file mode 100644 index 0000000..d6b4b3c --- /dev/null +++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/service/State.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Lifecycle states of the {@link VXQueryService} class.
 *
 * @author Erandi Ganepola
 */
public enum State {

    /** Set while {@code start()} is creating the hyracks connection. */
    STARTING,

    /** Set once {@code start()} has completed; queries are only executed in this state. */
    STARTED,

    /** Set at the beginning of {@code stop()}. */
    STOPPING,

    /** Initial state, and the state after {@code stop()} has completed. */
    STOPPED
}
/**
 * Outcome of a response produced by the
 * {@link org.apache.vxquery.rest.RestServer}. There are exactly two possible
 * outcomes, {@link #SUCCESS} and {@link #FATAL}.
 *
 * @author Erandi Ganepola
 */
public enum Status {
    /** The request was handled without error; rendered as {@code "success"}. */
    SUCCESS("success"),
    /** The request failed; rendered as {@code "fatal"}. */
    FATAL("fatal");

    /** Lower-case text returned by {@link #toString()}. */
    private final String name;

    Status(String name) {
        this.name = name;
    }

    /**
     * @return the lower-case text of this status ({@code "success"} or
     *         {@code "fatal"}), not the constant name
     */
    @Override
    public String toString() {
        return name;
    }
}
/**
 * Mutable holder for the default/user specified settings that the
 * {@link VXQueryService} class reads at runtime. Values are supplied by the
 * caller through the setters below.
 *
 * @author Erandi Ganepola
 */
public class VXQueryConfig {

    // ---- hyracks connection -------------------------------------------------

    /** Host/IP of the hyracks cluster controller to connect to (unset: {@code null}). */
    private String hyracksClientIp;
    /** Port of the hyracks cluster controller to connect to (unset: 0). */
    private int hyracksClientPort;

    // ---- compilation / execution tuning -------------------------------------

    /** Number of available processors. (default: java's available processors) */
    private int availableProcessors = Runtime.getRuntime().availableProcessors();
    /** Frame size. (default: 65,536) */
    private int frameSize = 65536;
    /**
     * Join hash size in bytes. Initialised to -1 here.
     * NOTE(review): presumably -1 means "let the compiler use its own default" - confirm.
     */
    private long joinHashSize = -1;
    /**
     * Maximum possible data size in bytes. Initialised to -1 here.
     * NOTE(review): presumably -1 means "let the compiler use its own default" - confirm.
     */
    private long maximumDataSize = -1;
    /** Directory path to Hadoop configuration files (unset: {@code null}). */
    private String hdfsConf;

    public String getHyracksClientIp() {
        return this.hyracksClientIp;
    }

    public void setHyracksClientIp(String hyracksClientIp) {
        this.hyracksClientIp = hyracksClientIp;
    }

    public int getHyracksClientPort() {
        return this.hyracksClientPort;
    }

    public void setHyracksClientPort(int hyracksClientPort) {
        this.hyracksClientPort = hyracksClientPort;
    }

    public int getAvailableProcessors() {
        return this.availableProcessors;
    }

    /**
     * Updates the processor count. Zero and negative values are ignored, so the
     * previously configured (or default) count is kept in that case.
     *
     * @param availableProcessors
     *            new processor count; only applied when greater than zero
     */
    public void setAvailableProcessors(int availableProcessors) {
        if (availableProcessors <= 0) {
            return;
        }
        this.availableProcessors = availableProcessors;
    }

    public int getFrameSize() {
        return this.frameSize;
    }

    public void setFrameSize(int frameSize) {
        this.frameSize = frameSize;
    }

    public long getJoinHashSize() {
        return this.joinHashSize;
    }

    public void setJoinHashSize(long joinHashSize) {
        this.joinHashSize = joinHashSize;
    }

    public long getMaximumDataSize() {
        return this.maximumDataSize;
    }

    public void setMaximumDataSize(long maximumDataSize) {
        this.maximumDataSize = maximumDataSize;
    }

    public String getHdfsConf() {
        return this.hdfsConf;
    }

    public void setHdfsConf(String hdfsConf) {
        this.hdfsConf = hdfsConf;
    }
}
+ */ +package org.apache.vxquery.rest.service; + +import static java.util.logging.Level.SEVERE; +import static org.apache.vxquery.rest.Constants.ErrorCodes.NOT_FOUND; +import static org.apache.vxquery.rest.Constants.ErrorCodes.PROBLEM_WITH_QUERY; +import static org.apache.vxquery.rest.Constants.ErrorCodes.UNFORSEEN_PROBLEM; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable; +import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor; +import org.apache.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter; +import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor; +import org.apache.hyracks.api.client.HyracksConnection; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.client.NodeControllerInfo; +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.dataset.DatasetJobRecord; +import org.apache.hyracks.api.dataset.IHyracksDatasetReader; +import org.apache.hyracks.api.dataset.ResultSetId; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.job.JobId; +import 
org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.client.dataset.HyracksDataset; +import org.apache.hyracks.control.nc.resources.memory.FrameManager; +import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor; +import org.apache.vxquery.compiler.CompilerControlBlock; +import org.apache.vxquery.compiler.algebricks.VXQueryGlobalDataFactory; +import org.apache.vxquery.compiler.algebricks.prettyprint.VXQueryLogicalExpressionPrettyPrintVisitor; +import org.apache.vxquery.context.DynamicContext; +import org.apache.vxquery.context.DynamicContextImpl; +import org.apache.vxquery.context.RootStaticContextImpl; +import org.apache.vxquery.context.StaticContextImpl; +import org.apache.vxquery.exceptions.ErrorCode; +import org.apache.vxquery.exceptions.SystemException; +import org.apache.vxquery.exceptions.VXQueryRuntimeException; +import org.apache.vxquery.rest.request.QueryRequest; +import org.apache.vxquery.rest.request.QueryResultRequest; +import org.apache.vxquery.rest.response.APIResponse; +import org.apache.vxquery.rest.response.Error; +import org.apache.vxquery.rest.response.QueryResponse; +import org.apache.vxquery.rest.response.QueryResultResponse; +import org.apache.vxquery.rest.response.SyncQueryResponse; +import org.apache.vxquery.result.ResultUtils; +import org.apache.vxquery.xmlquery.ast.ModuleNode; +import org.apache.vxquery.xmlquery.query.Module; +import org.apache.vxquery.xmlquery.query.XMLQueryCompiler; +import org.apache.vxquery.xmlquery.query.XQueryCompilationListener; + +import com.thoughtworks.xstream.XStream; +import com.thoughtworks.xstream.io.xml.DomDriver; + +/** + * Main class responsible for handling query requests. This class will first + * compile, then submit query to hyracks and finally fetch results for a given + * query. 
+ * + * @author Erandi Ganepola + */ +public class VXQueryService { + + private static final Logger LOGGER = Logger.getLogger(VXQueryService.class.getName()); + + private static final Pattern EMBEDDED_SYSERROR_PATTERN = Pattern.compile("(\\p{javaUpperCase}{4}\\d{4})"); + + private volatile State state = State.STOPPED; + private VXQueryConfig vxQueryConfig; + private AtomicLong atomicLong = new AtomicLong(0); + private Map<Long, HyracksJobContext> jobContexts = new ConcurrentHashMap<>(); + private IHyracksClientConnection hyracksClientConnection; + private HyracksDataset hyracksDataset; + + public VXQueryService(VXQueryConfig config) { + vxQueryConfig = config; + } + + /** + * Starts VXQueryService class by creating a {@link IHyracksClientConnection} + * which will later be used to submit and retrieve queries and results to/from + * hyracks. + */ + public synchronized void start() { + if (!State.STOPPED.equals(state)) { + throw new IllegalStateException("VXQueryService is at state : " + state); + } + + if (vxQueryConfig.getHyracksClientIp() == null) { + throw new IllegalArgumentException("hyracksClientIp is required to connect to hyracks"); + } + + setState(State.STARTING); + + try { + hyracksClientConnection = + new HyracksConnection(vxQueryConfig.getHyracksClientIp(), vxQueryConfig.getHyracksClientPort()); + } catch (Exception e) { + LOGGER.log(SEVERE, String.format("Unable to create a hyracks client connection to %s:%d", + vxQueryConfig.getHyracksClientIp(), vxQueryConfig.getHyracksClientPort())); + throw new VXQueryRuntimeException("Unable to create a hyracks client connection", e); + } + + LOGGER.log(Level.FINE, String.format("Using hyracks connection to %s:%d", vxQueryConfig.getHyracksClientIp(), + vxQueryConfig.getHyracksClientPort())); + + setState(State.STARTED); + LOGGER.log(Level.INFO, "VXQueryService started successfully"); + } + + private synchronized void setState(State newState) { + state = newState; + } + + /** + * Submits a query to hyracks to be 
run after compiling. Required intermediate + * results and metrics are also calculated according to the + * {@link QueryRequest}. Checks if this class has started before moving further. + * + * @param request + * {@link QueryRequest} containing information about the query to be + * executed and the merics required along with the results + * @return AsyncQueryResponse if no error occurs | ErrorResponse else + */ + public APIResponse execute(final QueryRequest request) { + QueryRequest indexingRequest = new QueryRequest("show-indexes()"); + indexingRequest.setAsync(false); + SyncQueryResponse indexingResponse = (SyncQueryResponse) execute(indexingRequest, new ArrayList<>()); + LOGGER.log(Level.FINE, String.format("Found indexes: %s", indexingResponse.getResults())); + + List<String> collections = Arrays.asList(indexingResponse.getResults().split("\n")); + return execute(request, collections); + } + + private APIResponse execute(final QueryRequest request, List<String> collections) { + if (!State.STARTED.equals(state)) { + throw new IllegalStateException("VXQueryService is at state : " + state); + } + + String query = request.getStatement(); + final ResultSetId resultSetId = createResultSetId(); + + QueryResponse response = APIResponse.newQueryResponse(request, resultSetId); + response.setStatement(query); + + // Obtaining the node controller information from hyracks client connection + Map<String, NodeControllerInfo> nodeControllerInfos = null; + try { + nodeControllerInfos = hyracksClientConnection.getNodeControllerInfos(); + } catch (HyracksException e) { + LOGGER.log(Level.SEVERE, String.format("Error occurred when obtaining NC info: '%s'", e.getMessage())); + return APIResponse.newErrorResponse(request.getRequestId(), Error.builder().withCode(UNFORSEEN_PROBLEM) + .withMessage("Hyracks connection problem: " + e.getMessage()).build()); + } + + // Adding a query compilation listener + VXQueryCompilationListener listener = new VXQueryCompilationListener(response, + 
request.isShowAbstractSyntaxTree(), request.isShowTranslatedExpressionTree(), + request.isShowOptimizedExpressionTree(), request.isShowRuntimePlan()); + + Date start = new Date(); + // Compiling the XQuery given + final XMLQueryCompiler compiler = new XMLQueryCompiler(listener, nodeControllerInfos, request.getFrameSize(), + vxQueryConfig.getAvailableProcessors(), vxQueryConfig.getJoinHashSize(), + vxQueryConfig.getMaximumDataSize(), vxQueryConfig.getHdfsConf()); + CompilerControlBlock compilerControlBlock = new CompilerControlBlock( + new StaticContextImpl(RootStaticContextImpl.INSTANCE), resultSetId, request.getSourceFileMap()); + try { + compiler.compile(null, new StringReader(query), compilerControlBlock, request.getOptimization(), + collections); + } catch (AlgebricksException e) { + LOGGER.log(Level.SEVERE, String.format("Error occurred when compiling query: '%s' with message: '%s'", + query, e.getMessage())); + return APIResponse.newErrorResponse(request.getRequestId(), Error.builder().withCode(PROBLEM_WITH_QUERY) + .withMessage("Query compilation failure: " + e.getMessage()).build()); + } catch (SystemException e) { + LOGGER.log(Level.SEVERE, String.format("Error occurred when compiling query: '%s' with message: '%s'", + query, e.getMessage())); + return APIResponse.newErrorResponse(request.getRequestId(), + new Error(PROBLEM_WITH_QUERY, "Query compilation failure: " + e.getCode())); + } + + if (request.isShowMetrics()) { + response.getMetrics().setCompileTime(new Date().getTime() - start.getTime()); + } + + if (request.isCompileOnly()) { + return response; + } + + Module module = compiler.getModule(); + JobSpecification js = module.getHyracksJobSpecification(); + DynamicContext dCtx = new DynamicContextImpl(module.getModuleContext()); + js.setGlobalJobDataFactory(new VXQueryGlobalDataFactory(dCtx.createFactory())); + + HyracksJobContext hyracksJobContext; + start = new Date(); + if (!request.isAsync()) { + for (int i = 0; i < request.getRepeatExecutions(); 
i++) { + try { + hyracksJobContext = executeJob(js, resultSetId, request); + + } catch (Exception e) { + LOGGER.log(SEVERE, "Error occurred when submitting job to hyracks for query: " + query, e); + return APIResponse.newErrorResponse(request.getRequestId(), + Error.builder().withCode(UNFORSEEN_PROBLEM) + .withMessage("Error occurred when starting hyracks job").build()); + } + try { + String results = readResults(hyracksJobContext); + ((SyncQueryResponse) response).setResults(results); + } catch (HyracksException e) { + LOGGER.log(Level.SEVERE, "Error occurred when reading results", e); + SystemException se = getSystemException(e); + return APIResponse.newErrorResponse(request.getRequestId(), new Error(UNFORSEEN_PROBLEM, + String.format("Error occurred when reading results: %s", se != null ? se.getCode() : ""))); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Error occurred when reading results", e); + return APIResponse.newErrorResponse(request.getRequestId(), + new Error(UNFORSEEN_PROBLEM, "Error occurred when reading results: " + e.getMessage())); + } + } + } else { + try { + hyracksJobContext = executeJob(js, resultSetId, request); + } catch (Exception e) { + LOGGER.log(SEVERE, "Error occurred when submitting job to hyracks for query: " + query, e); + return APIResponse.newErrorResponse(request.getRequestId(), Error.builder().withCode(UNFORSEEN_PROBLEM) + .withMessage("Error occurred when starting hyracks job").build()); + } + jobContexts.put(resultSetId.getId(), hyracksJobContext); + } + + if (request.isShowMetrics()) { + response.getMetrics().setElapsedTime(new Date().getTime() - start.getTime()); + } + + return response; + } + + private HyracksJobContext executeJob(JobSpecification js, ResultSetId resultSetId, QueryRequest request) + throws Exception { + HyracksJobContext hyracksJobContext; + JobId jobId = hyracksClientConnection.startJob(js, EnumSet.of(JobFlag.PROFILE_RUNTIME)); + hyracksJobContext = new HyracksJobContext(jobId, js.getFrameSize(), 
resultSetId); + + return hyracksJobContext; + } + + private static SystemException getSystemException(HyracksException e) { + Throwable t = e; + Throwable candidate = t instanceof SystemException ? t : null; + while (t.getCause() != null) { + t = t.getCause(); + if (t instanceof SystemException) { + candidate = t; + } + } + + t = candidate == null ? t : candidate; + final String message = t.getMessage(); + if (message != null) { + Matcher m = EMBEDDED_SYSERROR_PATTERN.matcher(message); + if (m.find()) { + String eCode = m.group(1); + return new SystemException(ErrorCode.valueOf(eCode), e); + } + } + return null; + } + + /** + * Returns the query results for a given result set id. + * + * @param request + * {@link QueryResultRequest} with result ID required + * @return Either a {@link QueryResultResponse} if no error occurred | + * {@link org.apache.vxquery.rest.response.ErrorResponse} else. + */ + public APIResponse getResult(QueryResultRequest request) { + if (jobContexts.containsKey(request.getResultId())) { + QueryResultResponse resultResponse = APIResponse.newQueryResultResponse(request.getRequestId()); + Date start = new Date(); + try { + String results = readResults(jobContexts.get(request.getResultId())); + resultResponse.setResults(results); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Error occurred when reading results for id : " + request.getResultId()); + return APIResponse.newErrorResponse(request.getRequestId(), new Error(UNFORSEEN_PROBLEM, + "Error occurred when reading results for: " + request.getResultId())); + } + + if (request.isShowMetrics()) { + resultResponse.getMetrics().setElapsedTime(new Date().getTime() - start.getTime()); + } + + return resultResponse; + } else { + return APIResponse.newErrorResponse(request.getRequestId(), Error.builder().withCode(NOT_FOUND) + .withMessage("No query found for result ID : " + request.getResultId()).build()); + } + } + + /** + * Reads results from hyracks given the {@link HyracksJobContext} 
containing + * {@link ResultSetId} and {@link JobId} mapping. + * + * @param jobContext + * mapoing between the {@link ResultSetId} and corresponding hyracks + * {@link JobId} + * @return Results of the given query + * @throws Exception + * IOErrors and etc + */ + private String readResults(HyracksJobContext jobContext) throws Exception { + int nReaders = 1; + + if (hyracksDataset == null) { + hyracksDataset = new HyracksDataset(hyracksClientConnection, jobContext.getFrameSize(), nReaders); + } + + FrameManager resultDisplayFrameMgr = new FrameManager(jobContext.getFrameSize()); + IFrame frame = new VSizeFrame(resultDisplayFrameMgr); + IHyracksDatasetReader reader = hyracksDataset.createReader(jobContext.getJobId(), jobContext.getResultSetId()); + OutputStream resultStream = new ByteArrayOutputStream(); + + // This loop is required for XTests to reliably identify the error code of + // SystemException. + while (reader.getResultStatus() == DatasetJobRecord.Status.RUNNING) { + Thread.sleep(100); + } + + IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(); + try (PrintWriter writer = new PrintWriter(resultStream, true)) { + while (reader.read(frame) > 0) { + writer.print(ResultUtils.getStringFromBuffer(frame.getBuffer(), frameTupleAccessor)); + writer.flush(); + frame.getBuffer().clear(); + } + } + + hyracksClientConnection.waitForCompletion(jobContext.getJobId()); + LOGGER.log(Level.FINE, String.format("Result for resultId %d completed", jobContext.getResultSetId().getId())); + return resultStream.toString(); + } + + /** + * Create a unique result set id to get the correct query back from the cluster. + * + * @return Result Set id generated with current system time. 
+ */ + protected ResultSetId createResultSetId() { + long resultSetId = atomicLong.incrementAndGet(); + LOGGER.log(Level.FINE, String.format("Creating result set with ID : %d", resultSetId)); + return new ResultSetId(resultSetId); + } + + public synchronized void stop() { + if (!State.STOPPED.equals(state)) { + setState(State.STOPPING); + LOGGER.log(Level.FINE, "Stooping VXQueryService"); + setState(State.STOPPED); + LOGGER.log(Level.INFO, "VXQueryService stopped successfully"); + } else { + LOGGER.log(Level.INFO, "VXQueryService is already in state : " + state); + } + } + + public State getState() { + return state; + } + + /** + * A {@link XQueryCompilationListener} implementation to be used to add + * AbstractSyntaxTree, RuntimePlan and etc to the {@link QueryResponse} if + * requested by the user. + */ + private class VXQueryCompilationListener implements XQueryCompilationListener { + private QueryResponse response; + private boolean showAbstractSyntaxTree; + private boolean showTranslatedExpressionTree; + private boolean showOptimizedExpressionTree; + private boolean showRuntimePlan; + + public VXQueryCompilationListener(QueryResponse response, boolean showAbstractSyntaxTree, + boolean showTranslatedExpressionTree, boolean showOptimizedExpressionTree, boolean showRuntimePlan) { + this.response = response; + this.showAbstractSyntaxTree = showAbstractSyntaxTree; + this.showTranslatedExpressionTree = showTranslatedExpressionTree; + this.showOptimizedExpressionTree = showOptimizedExpressionTree; + this.showRuntimePlan = showRuntimePlan; + } + + @Override + public void notifyParseResult(ModuleNode moduleNode) { + if (showAbstractSyntaxTree) { + response.setAbstractSyntaxTree(new XStream(new DomDriver()).toXML(moduleNode)); + } + } + + @Override + public void notifyTranslationResult(Module module) { + if (showTranslatedExpressionTree) { + response.setTranslatedExpressionTree(appendPrettyPlan(new StringBuilder(), module).toString()); + } + } + + @Override + public 
void notifyTypecheckResult(Module module) { + } + + @Override + public void notifyCodegenResult(Module module) { + if (showRuntimePlan) { + JobSpecification jobSpec = module.getHyracksJobSpecification(); + try { + response.setRuntimePlan(jobSpec.toJSON().toString()); + } catch (IOException e) { + LOGGER.log(SEVERE, + "Error occurred when obtaining runtime plan from job specification : " + jobSpec.toString(), + e); + } + } + } + + @Override + public void notifyOptimizedResult(Module module) { + if (showOptimizedExpressionTree) { + response.setOptimizedExpressionTree(appendPrettyPlan(new StringBuilder(), module).toString()); + } + } + + @SuppressWarnings("Duplicates") + private StringBuilder appendPrettyPlan(StringBuilder sb, Module module) { + try { + ILogicalExpressionVisitor<String, Integer> ev = + new VXQueryLogicalExpressionPrettyPrintVisitor(module.getModuleContext()); + AlgebricksAppendable buffer = new AlgebricksAppendable(); + LogicalOperatorPrettyPrintVisitor v = new LogicalOperatorPrettyPrintVisitor(buffer, ev); + PlanPrettyPrinter.printPlan(module.getBody(), v, 0); + sb.append(buffer.toString()); + } catch (AlgebricksException e) { + LOGGER.log(SEVERE, "Error occurred when pretty printing expression : " + e.getMessage()); + } + return sb; + } + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/f2e5fd90/vxquery-rest/src/main/java/org/apache/vxquery/rest/servlet/QueryAPIServlet.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/servlet/QueryAPIServlet.java b/vxquery-rest/src/main/java/org/apache/vxquery/rest/servlet/QueryAPIServlet.java new file mode 100644 index 0000000..45ef910 --- /dev/null +++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/servlet/QueryAPIServlet.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.vxquery.rest.servlet; + +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; +import static org.apache.vxquery.rest.Constants.MODE_ASYNC; +import static org.apache.vxquery.rest.Constants.MODE_SYNC; +import static org.apache.vxquery.rest.Constants.Parameters.COMPILE_ONLY; +import static org.apache.vxquery.rest.Constants.Parameters.FRAME_SIZE; +import static org.apache.vxquery.rest.Constants.Parameters.METRICS; +import static org.apache.vxquery.rest.Constants.Parameters.MODE; +import static org.apache.vxquery.rest.Constants.Parameters.OPTIMIZATION; +import static org.apache.vxquery.rest.Constants.Parameters.REPEAT_EXECUTIONS; +import static org.apache.vxquery.rest.Constants.Parameters.SHOW_AST; +import static org.apache.vxquery.rest.Constants.Parameters.SHOW_OET; +import static org.apache.vxquery.rest.Constants.Parameters.SHOW_RP; +import static org.apache.vxquery.rest.Constants.Parameters.SHOW_TET; +import static org.apache.vxquery.rest.Constants.Parameters.STATEMENT; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.AbstractMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.logging.Level; +import 
java.util.stream.Collectors; + +import javax.xml.bind.JAXBException; + +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.vxquery.app.util.RestUtils; +import org.apache.vxquery.rest.Constants; +import org.apache.vxquery.rest.request.QueryRequest; +import org.apache.vxquery.rest.response.APIResponse; +import org.apache.vxquery.rest.response.Error; +import org.apache.vxquery.rest.service.VXQueryService; + +/** + * Servlet to handle query requests. + * + * @author Erandi Ganepola + */ +public class QueryAPIServlet extends RestAPIServlet { + + private VXQueryService vxQueryService; + + public QueryAPIServlet(VXQueryService vxQueryService, ConcurrentMap<String, Object> ctx, String... paths) { + super(ctx, paths); + this.vxQueryService = vxQueryService; + } + + @Override + protected APIResponse doHandle(IServletRequest request) { + LOGGER.log(Level.INFO, + String.format("Received a query request with query : %s", request.getParameter("statement"))); + + QueryRequest queryRequest; + try { + queryRequest = getQueryRequest(request); + } catch (Exception e) { + return APIResponse.newErrorResponse(null, + Error.builder().withCode(Constants.ErrorCodes.INVALID_INPUT).withMessage("Invalid input").build()); + } + + try { + return vxQueryService.execute(queryRequest); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "Error occurred when trying to execute query : " + queryRequest.getStatement(), e); + return APIResponse.newErrorResponse(queryRequest.getRequestId(), Error.builder() + .withCode(Constants.ErrorCodes.UNFORSEEN_PROBLEM).withMessage(e.getMessage()).build()); + } + } + + private QueryRequest getQueryRequest(IServletRequest request) throws IOException, JAXBException { + if (request.getParameter(STATEMENT) == null || request.getParameter(STATEMENT).trim().isEmpty()) { + throw new IllegalArgumentException("Parameter 'statement' is required to handle the request"); + } + + QueryRequest queryRequest = new QueryRequest(UUID.randomUUID().toString(), 
request.getParameter(STATEMENT)); + queryRequest.setCompileOnly(Boolean.parseBoolean(request.getParameter(COMPILE_ONLY))); + queryRequest.setShowMetrics(Boolean.parseBoolean(request.getParameter(METRICS))); + + queryRequest.setShowAbstractSyntaxTree(Boolean.parseBoolean(request.getParameter(SHOW_AST))); + queryRequest.setShowTranslatedExpressionTree(Boolean.parseBoolean(request.getParameter(SHOW_TET))); + queryRequest.setShowOptimizedExpressionTree(Boolean.parseBoolean(request.getParameter(SHOW_OET))); + queryRequest.setShowRuntimePlan(Boolean.parseBoolean(request.getParameter(SHOW_RP))); + + if (request.getParameter(OPTIMIZATION) != null) { + queryRequest.setOptimization(Integer.parseInt(request.getParameter(OPTIMIZATION))); + } + if (request.getParameter(FRAME_SIZE) != null) { + queryRequest.setFrameSize(Integer.parseInt(request.getParameter(FRAME_SIZE))); + } + if (request.getParameter(REPEAT_EXECUTIONS) != null) { + queryRequest.setRepeatExecutions(Integer.parseInt(request.getParameter(REPEAT_EXECUTIONS))); + } + + String sourceFileMap = request.getHttpRequest().content().toString(StandardCharsets.UTF_8); + if (sourceFileMap != null && !sourceFileMap.isEmpty()) { + Map<String, String> map = (Map<String, String>) RestUtils.mapEntity(sourceFileMap, Map.class, + request.getHeader(CONTENT_TYPE)); + LOGGER.log(Level.FINE, "Found source file map"); + Map<String, File> fileMap = map.entrySet().stream() + .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new File(entry.getValue()))) + .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)); + queryRequest.setSourceFileMap(fileMap); + } + + if (request.getParameter(MODE) != null) { + switch (request.getParameter(MODE)) { + case MODE_SYNC: + queryRequest.setAsync(false); + break; + case MODE_ASYNC: + default: + queryRequest.setAsync(true); + break; + } + } + + return queryRequest; + } +} 
http://git-wip-us.apache.org/repos/asf/vxquery/blob/f2e5fd90/vxquery-rest/src/main/java/org/apache/vxquery/rest/servlet/QueryResultAPIServlet.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/servlet/QueryResultAPIServlet.java b/vxquery-rest/src/main/java/org/apache/vxquery/rest/servlet/QueryResultAPIServlet.java new file mode 100644 index 0000000..84b0187 --- /dev/null +++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/servlet/QueryResultAPIServlet.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.vxquery.rest.servlet; + +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.logging.Level; + +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.vxquery.rest.Constants; +import org.apache.vxquery.rest.request.QueryResultRequest; +import org.apache.vxquery.rest.response.APIResponse; +import org.apache.vxquery.rest.response.Error; +import org.apache.vxquery.rest.service.VXQueryService; + +import io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Servlet to handle query results requests. 
+ * + * @author Erandi Ganepola + */ +public class QueryResultAPIServlet extends RestAPIServlet { + + private VXQueryService vxQueryService; + + public QueryResultAPIServlet(VXQueryService vxQueryService, ConcurrentMap<String, Object> ctx, String... paths) { + super(ctx, paths); + this.vxQueryService = vxQueryService; + } + + @Override + protected APIResponse doHandle(IServletRequest request) { + String uri = request.getHttpRequest().uri(); + long resultId; + try { + String pathParam = uri.substring(uri.lastIndexOf("/") + 1); + pathParam = pathParam.contains("?") ? pathParam.split("\\?")[0] : pathParam; + resultId = Long.parseLong(pathParam); + } catch (NumberFormatException e) { + LOGGER.log(Level.SEVERE, "Result ID could not be retrieved from URL"); + return APIResponse.newErrorResponse(null, Error.builder().withCode(HttpResponseStatus.BAD_REQUEST.code()) + .withMessage("Result ID couldn't be retrieved from URL").build()); + } + + QueryResultRequest resultRequest = new QueryResultRequest(resultId, UUID.randomUUID().toString()); + resultRequest.setShowMetrics(Boolean.parseBoolean(request.getParameter(Constants.Parameters.METRICS))); + LOGGER.log(Level.INFO, + String.format("Received a result request with resultId : %d", resultRequest.getResultId())); + return vxQueryService.getResult(resultRequest); + } +} http://git-wip-us.apache.org/repos/asf/vxquery/blob/f2e5fd90/vxquery-rest/src/main/java/org/apache/vxquery/rest/servlet/RestAPIServlet.java ---------------------------------------------------------------------- diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/rest/servlet/RestAPIServlet.java b/vxquery-rest/src/main/java/org/apache/vxquery/rest/servlet/RestAPIServlet.java new file mode 100644 index 0000000..bc93dfc --- /dev/null +++ b/vxquery-rest/src/main/java/org/apache/vxquery/rest/servlet/RestAPIServlet.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.vxquery.rest.servlet; + +import static org.apache.vxquery.rest.Constants.HttpHeaderValues.CONTENT_TYPE_JSON; +import static org.apache.vxquery.rest.Constants.HttpHeaderValues.CONTENT_TYPE_XML; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.concurrent.ConcurrentMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; + +import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException; +import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.AbstractServlet; +import org.apache.hyracks.http.server.utils.HttpUtil; +import org.apache.vxquery.exceptions.VXQueryRuntimeException; +import org.apache.vxquery.exceptions.VXQueryServletRuntimeException; +import org.apache.vxquery.rest.response.APIResponse; +import org.apache.vxquery.rest.response.AsyncQueryResponse; +import org.apache.vxquery.rest.response.ErrorResponse; +import org.apache.vxquery.rest.response.QueryResultResponse; +import org.apache.vxquery.rest.response.SyncQueryResponse; +import 
org.apache.vxquery.rest.service.Status;

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;

/**
 * Abstract servlet to handle REST API requests.
 * <p>
 * GET and POST requests are treated alike: the subclass produces an
 * {@link APIResponse} in {@link #doHandle(IServletRequest)}, and this class
 * sets the HTTP status and writes the entity as XML or JSON depending on the
 * request's Accept header.
 *
 * @author Erandi Ganepola
 */
public abstract class RestAPIServlet extends AbstractServlet {

    /** Shared JSON mapper; it is never reconfigured after creation, so one instance serves all requests. */
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    protected final Logger LOGGER;

    private final JAXBContext jaxbContext;

    public RestAPIServlet(ConcurrentMap<String, Object> ctx, String... paths) {
        super(ctx, paths);
        LOGGER = Logger.getLogger(this.getClass().getName());
        try {
            jaxbContext = JAXBContext.newInstance(QueryResultResponse.class, AsyncQueryResponse.class,
                    SyncQueryResponse.class, ErrorResponse.class);
        } catch (JAXBException e) {
            LOGGER.log(Level.SEVERE, "Error occurred when creating JAXB context", e);
            throw new VXQueryRuntimeException("Unable to load JAXBContext", e);
        }
    }

    @Override
    protected final void post(IServletRequest request, IServletResponse response) {
        getOrPost(request, response);
    }

    @Override
    protected final void get(IServletRequest request, IServletResponse response) {
        getOrPost(request, response);
    }

    /**
     * Common handling for GET and POST: prepares the response headers, lets the
     * subclass build the entity and writes it back. A {@code null} entity is
     * answered with a bad-request status.
     */
    private void getOrPost(IServletRequest request, IServletResponse response) {
        try {
            initResponse(request, response);
            APIResponse entity = doHandle(request);
            if (entity == null) {
                LOGGER.log(Level.WARNING, "No entity found for request : " + request);
                response.setStatus(HttpResponseStatus.BAD_REQUEST);
            } else {
                // Important to set Status OK before setting the entity because the response
                // (chunked) checks it before writing the response to channel.
                setResponseStatus(response, entity);
                setEntity(request, response, entity);
            }
        } catch (IOException e) {
            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
            LOGGER.log(Level.SEVERE, "Error occurred when setting content type", e);
        }
    }

    /**
     * Sets the CORS headers and a plain-text default content type; the content
     * type is replaced once the entity is serialized.
     */
    private void initResponse(IServletRequest request, IServletResponse response) throws IOException {
        // enable cross-origin resource sharing
        response.setHeader("Access-Control-Allow-Origin", "*");
        response.setHeader("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept");

        HttpUtil.setContentType(response, "text/plain");
    }

    /**
     * Serializes the entity and writes it to the response. XML is produced
     * only when the Accept header is exactly the XML content type; every other
     * value, including a missing header, yields JSON.
     *
     * @throws VXQueryServletRuntimeException
     *             if the entity cannot be serialized
     */
    private void setEntity(IServletRequest request, IServletResponse response, APIResponse entity) throws IOException {
        String accept = request.getHeader(HttpHeaderNames.ACCEPT, "");
        String entityString;
        switch (accept) {
            case CONTENT_TYPE_XML:
                try {
                    HttpUtil.setContentType(response, CONTENT_TYPE_XML);

                    // A marshaller is created per call; only the JAXBContext is shared.
                    Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
                    jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
                    StringWriter sw = new StringWriter();
                    jaxbMarshaller.marshal(entity, sw);
                    entityString = sw.toString();
                } catch (JAXBException e) {
                    LOGGER.log(Level.SEVERE, "Error occurred when mapping java object into xml", e);
                    throw new VXQueryServletRuntimeException("Error occurred when marshalling entity", e);
                }
                break;
            case CONTENT_TYPE_JSON:
            default:
                try {
                    HttpUtil.setContentType(response, CONTENT_TYPE_JSON);
                    entityString = JSON_MAPPER.writeValueAsString(entity);
                } catch (JsonProcessingException e) {
                    LOGGER.log(Level.SEVERE, "Error occurred when mapping java object into JSON", e);
                    throw new VXQueryServletRuntimeException("Error occurred when mapping entity", e);
                }
                break;
        }

        response.writer().print(entityString);
    }

    /**
     * Derives the HTTP status from the entity: OK for a successful entity; for
     * a fatal one, the code carried by its error, or internal server error when
     * no error code is available. Any other entity status leaves the response
     * status untouched.
     */
    private void setResponseStatus(IServletResponse response, APIResponse entity) {
        if (Status.SUCCESS.toString().equals(entity.getStatus())) {
            response.setStatus(HttpResponseStatus.OK);
        } else if (Status.FATAL.toString().equals(entity.getStatus())) {
            HttpResponseStatus status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
            if (entity instanceof ErrorResponse) {
                ErrorResponse errorResponse = (ErrorResponse) entity;
                // An error response without an error object keeps the 500 default
                // instead of failing with a NullPointerException.
                if (errorResponse.getError() != null) {
                    status = HttpResponseStatus.valueOf(errorResponse.getError().getCode());
                }
            }
            response.setStatus(status);
        }
    }

    /**
     * This abstract method is supposed to return an object which will be the entity
     * of the response being sent to the client. Implementing classes do not have
     * to worry about how the entity is serialized; returning {@code null} results
     * in a bad-request response.
     *
     * @param request
     *            {@link IServletRequest} received
     * @return Object to be set as the entity of the response
     */
    protected abstract APIResponse doHandle(IServletRequest request);
}
