This is an automated email from the ASF dual-hosted git repository. imaxon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 0da78d6f840836af98004ad129fcb10b6d166e72 Author: Ali Alsuliman <[email protected]> AuthorDate: Thu Apr 22 18:29:22 2021 +0300 [NO ISSUE][OTH] Add request parameter to allow for immediate execution - user model changes: no - storage format changes: no - interface changes: yes Details: Add a request parameter to allow certain queries to run immediately. Change-Id: I60208c07200326a4957e908d8e7c9dcdb7bc3204 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11145 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- .../asterix/translator/IRequestParameters.java | 2 ++ .../asterix/translator/IStatementExecutor.java | 6 +++++- .../apache/asterix/api/common/APIFramework.java | 24 +++++++++++++--------- .../message/ExecuteStatementRequestMessage.java | 20 +++++++++++++++--- .../asterix/app/translator/QueryTranslator.java | 16 +++++++-------- .../asterix/app/translator/RequestParameters.java | 17 +++++++++++++++ .../org/apache/asterix/utils/FeedOperations.java | 2 +- 7 files changed, 64 insertions(+), 23 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java index 4ad1040..5c743ee 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java @@ -73,4 +73,6 @@ public interface IRequestParameters extends ICommonRequestParameters { * inconsistent. */ boolean isForceDropDataset(); + + boolean isSkipAdmissionPolicy(); } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java index 083fa83..d60b791 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java @@ -273,6 +273,9 @@ public interface IStatementExecutor { * @param statementParameters * Statement parameters * @param statementRewriter + * The statement rewriter + * @param requestParameters + * The request parameters * @return the compiled {@code JobSpecification} * @throws AsterixException * @throws RemoteException @@ -281,7 +284,8 @@ public interface IStatementExecutor { */ JobSpecification rewriteCompileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider, Query query, ICompiledDmlStatement dmlStatement, Map<String, IAObject> statementParameters, - IStatementRewriter statementRewriter) throws RemoteException, AlgebricksException, ACIDException; + IStatementRewriter statementRewriter, IRequestParameters requestParameters) + throws RemoteException, AlgebricksException, ACIDException; /** * returns the active dataverse for an entity or a statement diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index b3c7f54..920b0d9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -77,6 +77,7 @@ import org.apache.asterix.optimizer.rules.util.EquivalenceClassUtils; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement; import org.apache.asterix.translator.ExecutionPlans; +import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.SessionOutput; import org.apache.asterix.translator.SqlppExpressionToPlanTranslator; @@ -205,7 +206,8 @@ public class APIFramework { public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider, Query query, int varCounter, String outputDatasetName, SessionOutput output, ICompiledDmlStatement statement, Map<VarIdentifier, IAObject> externalVars, IResponsePrinter printer, - IWarningCollector warningCollector) throws AlgebricksException, ACIDException { + IWarningCollector warningCollector, IRequestParameters requestParameters) + throws AlgebricksException, ACIDException { // establish facts final boolean isQuery = query != null; @@ -307,15 +309,17 @@ public class APIFramework { JobSpecification spec = compiler.createJob(metadataProvider.getApplicationContext(), jobEventListenerFactory); if (isQuery) { - // Sets a required capacity, only for read-only queries. - // DDLs and DMLs are considered not that frequent. - // limit the computation locations to the locations that will be used in the query - final INodeJobTracker nodeJobTracker = metadataProvider.getApplicationContext().getNodeJobTracker(); - final AlgebricksAbsolutePartitionConstraint jobLocations = - getJobLocations(spec, nodeJobTracker, computationLocations); - final IClusterCapacity jobRequiredCapacity = - ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf); - spec.setRequiredClusterCapacity(jobRequiredCapacity); + if (requestParameters == null || !requestParameters.isSkipAdmissionPolicy()) { + // Sets a required capacity, only for read-only queries. + // DDLs and DMLs are considered not that frequent. + // limit the computation locations to the locations that will be used in the query + final INodeJobTracker nodeJobTracker = metadataProvider.getApplicationContext().getNodeJobTracker(); + final AlgebricksAbsolutePartitionConstraint jobLocations = + getJobLocations(spec, nodeJobTracker, computationLocations); + final IClusterCapacity jobRequiredCapacity = + ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf); + spec.setRequiredClusterCapacity(jobRequiredCapacity); + } } if (isQuery && conf.is(SessionConfig.OOB_HYRACKS_JOB)) { generateJob(spec); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java index d3345fb..29ee76d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -87,12 +87,24 @@ public class ExecuteStatementRequestMessage implements ICcAddressedMessage { private final ProfileType profileType; private final IRequestReference requestReference; private final boolean forceDropDataset; + private final boolean skipAdmissionPolicy; public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang, String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties, String clientContextID, String handleUrl, Map<String, String> optionalParameters, Map<String, byte[]> statementParameters, boolean multiStatement, ProfileType profileType, int statementCategoryRestrictionMask, IRequestReference requestReference, boolean forceDropDataset) { + this(requestNodeId, requestMessageId, lang, statementsText, sessionConfig, resultProperties, clientContextID, + handleUrl, optionalParameters, statementParameters, multiStatement, profileType, + statementCategoryRestrictionMask, requestReference, forceDropDataset, false); + } + + protected ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang, + String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties, + String clientContextID, String handleUrl, Map<String, String> optionalParameters, + Map<String, byte[]> statementParameters, boolean multiStatement, ProfileType profileType, + int statementCategoryRestrictionMask, IRequestReference requestReference, boolean forceDropDataset, + boolean skipAdmissionPolicy) { this.requestNodeId = requestNodeId; this.requestMessageId = requestMessageId; this.lang = lang; @@ -108,6 +120,7 @@ public class ExecuteStatementRequestMessage implements ICcAddressedMessage { this.profileType = profileType; this.requestReference = requestReference; this.forceDropDataset = forceDropDataset; + this.skipAdmissionPolicy = skipAdmissionPolicy; } @Override @@ -150,9 +163,10 @@ public class ExecuteStatementRequestMessage implements ICcAddressedMessage { final IStatementExecutor.StatementProperties statementProperties = new IStatementExecutor.StatementProperties(); Map<String, IAObject> stmtParams = RequestParameters.deserializeParameterValues(statementParameters); - final IRequestParameters requestParameters = new RequestParameters(requestReference, statementsText, null, - resultProperties, stats, statementProperties, outMetadata, clientContextID, optionalParameters, - stmtParams, multiStatement, statementCategoryRestrictionMask, forceDropDataset); + final IRequestParameters requestParameters = + new RequestParameters(requestReference, statementsText, null, resultProperties, stats, + statementProperties, outMetadata, clientContextID, optionalParameters, stmtParams, + multiStatement, statementCategoryRestrictionMask, forceDropDataset, skipAdmissionPolicy); translator.compileAndExecute(ccApp.getHcc(), requestParameters); translator.getWarnings(warnings, maxWarnings - warnings.size()); stats.updateTotalWarningsCount(parserTotalWarningsCount); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index d49649d..754e2b6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -2781,7 +2781,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen loadStmt.getDatasetName(), loadStmt.getAdapter(), properties, loadStmt.dataIsAlreadySorted()); cls.setSourceLocation(stmt.getSourceLocation()); JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls, - null, responsePrinter, warningCollector); + null, responsePrinter, warningCollector, null); afterCompile(); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; @@ -2880,8 +2880,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName, datasetName, stmtDelete.getCondition(), stmtDelete.getVarCounter(), stmtDelete.getQuery()); clfrqs.setSourceLocation(stmt.getSourceLocation()); - JobSpecification jobSpec = - rewriteCompileQuery(hcc, metadataProvider, clfrqs.getQuery(), clfrqs, stmtParams, stmtRewriter); + JobSpecification jobSpec = rewriteCompileQuery(hcc, metadataProvider, clfrqs.getQuery(), clfrqs, stmtParams, + stmtRewriter, null); afterCompile(); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -2904,7 +2904,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen @Override public JobSpecification rewriteCompileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider, Query query, ICompiledDmlStatement stmt, - Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) + Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter, IRequestParameters requestParameters) throws AlgebricksException, ACIDException { Map<VarIdentifier, IAObject> externalVars = createExternalVariables(stmtParams, stmtRewriter); @@ -2916,7 +2916,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // Query Compilation (happens under the same ongoing metadata transaction) return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, (Query) rewrittenResult.first, rewrittenResult.second, stmt == null ? null : stmt.getDatasetName(), sessionOutput, stmt, externalVars, - responsePrinter, warningCollector); + responsePrinter, warningCollector, requestParameters); } private JobSpecification rewriteCompileInsertUpsert(IClusterInfoCollector clusterInfoCollector, @@ -2955,7 +2955,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // transaction) return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, rewrittenInsertUpsert.getQuery(), rewrittenResult.second, datasetName, sessionOutput, clfrqs, externalVars, responsePrinter, - warningCollector); + warningCollector, null); } protected void handleCreateFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { @@ -3419,8 +3419,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen boolean bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); try { - final JobSpecification jobSpec = - rewriteCompileQuery(hcc, metadataProvider, query, null, stmtParams, stmtRewriter); + final JobSpecification jobSpec = rewriteCompileQuery(hcc, metadataProvider, query, null, stmtParams, + stmtRewriter, requestParameters); // update stats with count of compile-time warnings. needs to be adapted for multi-statement. stats.updateTotalWarningsCount(warningCollector.getTotalWarningsCount()); afterCompile(); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java index d5ea685..6c8f21c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java @@ -59,6 +59,7 @@ public class RequestParameters implements IRequestParameters { private final int statementCategoryRestrictionMask; private final String statement; private final boolean forceDropDataset; + private final boolean skipAdmissionPolicy; public RequestParameters(IRequestReference requestReference, String statement, IResultSet resultSet, ResultProperties resultProperties, Stats stats, StatementProperties statementProperties, @@ -83,6 +84,16 @@ public class RequestParameters implements IRequestParameters { IStatementExecutor.ResultMetadata outMetadata, String clientContextId, Map<String, String> optionalParameters, Map<String, IAObject> statementParameters, boolean multiStatement, int statementCategoryRestrictionMask, boolean forceDropDataset) { + this(requestReference, statement, resultSet, resultProperties, stats, statementProperties, outMetadata, + clientContextId, optionalParameters, statementParameters, multiStatement, + statementCategoryRestrictionMask, forceDropDataset, false); + } + + public RequestParameters(IRequestReference requestReference, String statement, IResultSet resultSet, + ResultProperties resultProperties, Stats stats, StatementProperties statementProperties, + IStatementExecutor.ResultMetadata outMetadata, String clientContextId, + Map<String, String> optionalParameters, Map<String, IAObject> statementParameters, boolean multiStatement, + int statementCategoryRestrictionMask, boolean forceDropDataset, boolean skipAdmissionPolicy) { this.requestReference = requestReference; this.statement = statement; this.resultSet = resultSet; @@ -96,6 +107,7 @@ public class RequestParameters implements IRequestParameters { this.multiStatement = multiStatement; this.statementCategoryRestrictionMask = statementCategoryRestrictionMask; this.forceDropDataset = forceDropDataset; + this.skipAdmissionPolicy = skipAdmissionPolicy; } @Override @@ -149,6 +161,11 @@ public class RequestParameters implements IRequestParameters { } @Override + public boolean isSkipAdmissionPolicy() { + return skipAdmissionPolicy; + } + + @Override public Map<String, IAObject> getStatementParameters() { return statementParameters; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java index dcd52a0..9472da5 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java @@ -253,7 +253,7 @@ public class FeedOperations { clfrqs = new CompiledStatements.CompiledUpsertStatement(feedConn.getDataverseName(), feedConn.getDatasetName(), feedConnQuery, stmtUpsert.getVarCounter(), null, null); } - return statementExecutor.rewriteCompileQuery(hcc, metadataProvider, feedConnQuery, clfrqs, null, null); + return statementExecutor.rewriteCompileQuery(hcc, metadataProvider, feedConnQuery, clfrqs, null, null, null); } private static JobSpecification combineIntakeCollectJobs(MetadataProvider metadataProvider, Feed feed,
