Repository: asterixdb Updated Branches: refs/heads/master 6e30eed22 -> ed5fc8172
[ASTERIXDB-2165] Avoid OOM in QueryServiceServlet - user model changes: no - storage format changes: no - interface change: no Change-Id: I74f61941f2e75e10f2accd6b2e6be6c1c0cd1490 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2150 Sonar-Qube: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/ed5fc817 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/ed5fc817 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/ed5fc817 Branch: refs/heads/master Commit: ed5fc81724d31abbc92eeb60ded2518b91544ad0 Parents: 6e30eed Author: Till Westmann <[email protected]> Authored: Fri Nov 17 09:37:13 2017 -0800 Committer: Till Westmann <[email protected]> Committed: Fri Nov 17 16:23:23 2017 -0800 ---------------------------------------------------------------------- .../asterix/translator/SessionOutput.java | 29 ++++++++++++++- .../api/http/server/NCQueryServiceServlet.java | 2 + .../api/http/server/QueryServiceServlet.java | 39 +++++++++----------- .../asterix/app/translator/QueryTranslator.java | 2 + 4 files changed, 50 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed5fc817/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java index b559df8..f7031a4 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java @@ -20,6 +20,7 @@ package org.apache.asterix.translator; import java.io.PrintWriter; +import java.io.StringWriter; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable; @@ -29,6 +30,8 @@ public class SessionOutput { // Output path for primary execution. private final PrintWriter out; + private StringWriter buffer; + private PrintWriter bufferedOut; private final SessionOutput.ResultDecorator preResultDecorator; private final SessionOutput.ResultDecorator postResultDecorator; @@ -53,7 +56,31 @@ public class SessionOutput { * Retrieve the PrintWriter to produce output to. */ public PrintWriter out() { - return this.out; + return this.bufferedOut != null ? this.bufferedOut : this.out; + } + + /** + * buffer the data provided to the PrintWriter returned by out() to be able to set the status of the response + * message when it can be determined. This is a no-op, if data is already buffered. + */ + public void hold() { + if (this.bufferedOut == null) { + this.buffer = new StringWriter(); + this.bufferedOut = new PrintWriter(this.buffer); + } + } + + /** + * release the data that was buffered by calling hold() and remove the buffer from the pipeline. + * This is a no-op, if data is not buffered. + */ + public void release() { + if (this.bufferedOut != null) { + this.bufferedOut.flush(); + this.out.write(buffer.toString()); + this.bufferedOut = null; + this.buffer = null; + } } public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws AlgebricksException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed5fc817/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java index 616c22e..64ea73d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java @@ -116,6 +116,8 @@ public class NCQueryServiceServlet extends QueryServiceServlet { throw new Exception(err.toString(), err); } } + // no errors - stop buffering and allow for streaming result delivery + sessionOutput.release(); IStatementExecutor.ResultMetadata resultMetadata = responseMsg.getMetadata(); if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultMetadata.getResultSets().isEmpty()) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed5fc817/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index f8f5c18..42c9edd 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -381,25 +381,25 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { RequestParameters param = getRequestParameters(request); LOGGER.info(param.toString()); long elapsedStart = System.nanoTime(); - final StringWriter stringWriter = new StringWriter(); - final PrintWriter resultWriter = new PrintWriter(stringWriter); + final PrintWriter httpWriter = response.writer(); ResultDelivery delivery = parseResultDelivery(param.mode); String handleUrl = getHandleUrl(param.host, param.path, delivery); - SessionOutput sessionOutput = createSessionOutput(param, handleUrl, resultWriter); + SessionOutput sessionOutput = createSessionOutput(param, handleUrl, httpWriter); SessionConfig sessionConfig = sessionOutput.config(); HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8); - HttpResponseStatus status = HttpResponseStatus.OK; Stats stats = new Stats(); long[] execStartEnd = new long[] { -1, -1 }; - resultWriter.print("{\n"); - printRequestId(resultWriter); - printClientContextID(resultWriter, param); - printSignature(resultWriter); - printType(resultWriter, sessionConfig); + // buffer the output until we are ready to set the status of the response message correctly + sessionOutput.hold(); + sessionOutput.out().print("{\n"); + printRequestId(sessionOutput.out()); + printClientContextID(sessionOutput.out(), param); + printSignature(sessionOutput.out()); + printType(sessionOutput.out(), sessionConfig); long errorCount = 1; // so far we just return 1 error try { if (param.statement == null || param.statement.isEmpty()) { @@ -410,33 +410,30 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { if (optionalParamProvider != null) { optionalParams = optionalParamProvider.apply(request); } + response.setStatus(HttpResponseStatus.OK); executeStatement(statementsText, sessionOutput, delivery, stats, param, execStartEnd, optionalParams); if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) { ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS); } errorCount = 0; } catch (Exception | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError e) { - status = handleExecuteStatementException(e); - ResultUtil.printError(resultWriter, e); + response.setStatus(handleExecuteStatementException(e)); + ResultUtil.printError(sessionOutput.out(), e); ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL); } finally { + // make sure that we stop buffering and return the result to the http response + sessionOutput.release(); if (execStartEnd[0] == -1) { execStartEnd[1] = -1; } else if (execStartEnd[1] == -1) { execStartEnd[1] = System.nanoTime(); } } - printMetrics(resultWriter, System.nanoTime() - elapsedStart, execStartEnd[1] - execStartEnd[0], + printMetrics(sessionOutput.out(), System.nanoTime() - elapsedStart, execStartEnd[1] - execStartEnd[0], stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount); - resultWriter.print("}\n"); - resultWriter.flush(); - String result = stringWriter.toString(); - - GlobalConfig.ASTERIX_LOGGER.log(Level.FINE, result); - - response.setStatus(status); - response.writer().print(result); - if (response.writer().checkError()) { + sessionOutput.out().print("}\n"); + sessionOutput.out().flush(); + if (sessionOutput.out().checkError()) { LOGGER.warning("Error flushing output writer"); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed5fc817/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index a52d765..05debaa 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -2395,6 +2395,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { final ResultReader resultReader = new ResultReader(hdc, id, resultSetId); updateJobStats(id, stats); + // stop buffering and allow for streaming result delivery + sessionOutput.release(); ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, metadataProvider.findOutputRecordType()); }, clientContextId, ctx);
