This is an automated email from the ASF dual-hosted git repository.

imaxon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 0da78d6f840836af98004ad129fcb10b6d166e72
Author: Ali Alsuliman <[email protected]>
AuthorDate: Thu Apr 22 18:29:22 2021 +0300

    [NO ISSUE][OTH] Add request parameter to allow for immediate execution
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
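    Add a request parameter (isSkipAdmissionPolicy) to allow certain
    queries to run immediately. When it is set, query compilation does
    not set a required cluster capacity on the job specification.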
    
    Change-Id: I60208c07200326a4957e908d8e7c9dcdb7bc3204
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11145
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
    Reviewed-by: Michael Blow <[email protected]>
---
 .../asterix/translator/IRequestParameters.java     |  2 ++
 .../asterix/translator/IStatementExecutor.java     |  6 +++++-
 .../apache/asterix/api/common/APIFramework.java    | 24 +++++++++++++---------
 .../message/ExecuteStatementRequestMessage.java    | 20 +++++++++++++++---
 .../asterix/app/translator/QueryTranslator.java    | 16 +++++++--------
 .../asterix/app/translator/RequestParameters.java  | 17 +++++++++++++++
 .../org/apache/asterix/utils/FeedOperations.java   |  2 +-
 7 files changed, 64 insertions(+), 23 deletions(-)

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
index 4ad1040..5c743ee 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
@@ -73,4 +73,6 @@ public interface IRequestParameters extends ICommonRequestParameters {
      * inconsistent.
      */
     boolean isForceDropDataset();
+
+    boolean isSkipAdmissionPolicy();
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 083fa83..d60b791 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -273,6 +273,9 @@ public interface IStatementExecutor {
      * @param statementParameters
      *            Statement parameters
      * @param statementRewriter
+     *            The statement rewriter
+     * @param requestParameters
+     *            The request parameters
      * @return the compiled {@code JobSpecification}
      * @throws AsterixException
      * @throws RemoteException
@@ -281,7 +284,8 @@ public interface IStatementExecutor {
      */
     JobSpecification rewriteCompileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
             Query query, ICompiledDmlStatement dmlStatement, Map<String, IAObject> statementParameters,
-            IStatementRewriter statementRewriter) throws RemoteException, AlgebricksException, ACIDException;
+            IStatementRewriter statementRewriter, IRequestParameters requestParameters)
+            throws RemoteException, AlgebricksException, ACIDException;
 
     /**
      * returns the active dataverse for an entity or a statement
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index b3c7f54..920b0d9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -77,6 +77,7 @@ import org.apache.asterix.optimizer.rules.util.EquivalenceClassUtils;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.ExecutionPlans;
+import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.asterix.translator.SqlppExpressionToPlanTranslator;
@@ -205,7 +206,8 @@ public class APIFramework {
     public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
             Query query, int varCounter, String outputDatasetName, SessionOutput output,
             ICompiledDmlStatement statement, Map<VarIdentifier, IAObject> externalVars, IResponsePrinter printer,
-            IWarningCollector warningCollector) throws AlgebricksException, ACIDException {
+            IWarningCollector warningCollector, IRequestParameters requestParameters)
+            throws AlgebricksException, ACIDException {
 
         // establish facts
         final boolean isQuery = query != null;
@@ -307,15 +309,17 @@ public class APIFramework {
         JobSpecification spec = compiler.createJob(metadataProvider.getApplicationContext(), jobEventListenerFactory);
 
         if (isQuery) {
-            // Sets a required capacity, only for read-only queries.
-            // DDLs and DMLs are considered not that frequent.
-            // limit the computation locations to the locations that will be used in the query
-            final INodeJobTracker nodeJobTracker = metadataProvider.getApplicationContext().getNodeJobTracker();
-            final AlgebricksAbsolutePartitionConstraint jobLocations =
-                    getJobLocations(spec, nodeJobTracker, computationLocations);
-            final IClusterCapacity jobRequiredCapacity =
-                    ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf);
-            spec.setRequiredClusterCapacity(jobRequiredCapacity);
+            if (requestParameters == null || !requestParameters.isSkipAdmissionPolicy()) {
+                // Sets a required capacity, only for read-only queries.
+                // DDLs and DMLs are considered not that frequent.
+                // limit the computation locations to the locations that will be used in the query
+                final INodeJobTracker nodeJobTracker = metadataProvider.getApplicationContext().getNodeJobTracker();
+                final AlgebricksAbsolutePartitionConstraint jobLocations =
+                        getJobLocations(spec, nodeJobTracker, computationLocations);
+                final IClusterCapacity jobRequiredCapacity =
+                        ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf);
+                spec.setRequiredClusterCapacity(jobRequiredCapacity);
+            }
         }
         if (isQuery && conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
             generateJob(spec);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index d3345fb..29ee76d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -87,12 +87,24 @@ public class ExecuteStatementRequestMessage implements ICcAddressedMessage {
     private final ProfileType profileType;
     private final IRequestReference requestReference;
     private final boolean forceDropDataset;
+    private final boolean skipAdmissionPolicy;
 
     public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
             String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties,
             String clientContextID, String handleUrl, Map<String, String> optionalParameters,
             Map<String, byte[]> statementParameters, boolean multiStatement, ProfileType profileType,
             int statementCategoryRestrictionMask, IRequestReference requestReference, boolean forceDropDataset) {
+        this(requestNodeId, requestMessageId, lang, statementsText, sessionConfig, resultProperties, clientContextID,
+                handleUrl, optionalParameters, statementParameters, multiStatement, profileType,
+                statementCategoryRestrictionMask, requestReference, forceDropDataset, false);
+    }
+
+    protected ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
+            String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties,
+            String clientContextID, String handleUrl, Map<String, String> optionalParameters,
+            Map<String, byte[]> statementParameters, boolean multiStatement, ProfileType profileType,
+            int statementCategoryRestrictionMask, IRequestReference requestReference, boolean forceDropDataset,
+            boolean skipAdmissionPolicy) {
         this.requestNodeId = requestNodeId;
         this.requestMessageId = requestMessageId;
         this.lang = lang;
@@ -108,6 +120,7 @@ public class ExecuteStatementRequestMessage implements ICcAddressedMessage {
         this.profileType = profileType;
         this.requestReference = requestReference;
         this.forceDropDataset = forceDropDataset;
+        this.skipAdmissionPolicy = skipAdmissionPolicy;
     }
 
     @Override
@@ -150,9 +163,10 @@ public class ExecuteStatementRequestMessage implements ICcAddressedMessage {
             final IStatementExecutor.StatementProperties statementProperties =
                     new IStatementExecutor.StatementProperties();
             Map<String, IAObject> stmtParams = RequestParameters.deserializeParameterValues(statementParameters);
-            final IRequestParameters requestParameters = new RequestParameters(requestReference, statementsText, null,
-                    resultProperties, stats, statementProperties, outMetadata, clientContextID, optionalParameters,
-                    stmtParams, multiStatement, statementCategoryRestrictionMask, forceDropDataset);
+            final IRequestParameters requestParameters =
+                    new RequestParameters(requestReference, statementsText, null, resultProperties, stats,
+                            statementProperties, outMetadata, clientContextID, optionalParameters, stmtParams,
+                            multiStatement, statementCategoryRestrictionMask, forceDropDataset, skipAdmissionPolicy);
             translator.compileAndExecute(ccApp.getHcc(), requestParameters);
             translator.getWarnings(warnings, maxWarnings - warnings.size());
             stats.updateTotalWarningsCount(parserTotalWarningsCount);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d49649d..754e2b6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2781,7 +2781,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     loadStmt.getDatasetName(), loadStmt.getAdapter(), properties, loadStmt.dataIsAlreadySorted());
             cls.setSourceLocation(stmt.getSourceLocation());
             JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
-                    null, responsePrinter, warningCollector);
+                    null, responsePrinter, warningCollector, null);
             afterCompile();
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -2880,8 +2880,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
                     datasetName, stmtDelete.getCondition(), stmtDelete.getVarCounter(), stmtDelete.getQuery());
             clfrqs.setSourceLocation(stmt.getSourceLocation());
-            JobSpecification jobSpec =
-                    rewriteCompileQuery(hcc, metadataProvider, clfrqs.getQuery(), clfrqs, stmtParams, stmtRewriter);
+            JobSpecification jobSpec = rewriteCompileQuery(hcc, metadataProvider, clfrqs.getQuery(), clfrqs, stmtParams,
+                    stmtRewriter, null);
             afterCompile();
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2904,7 +2904,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
     @Override
     public JobSpecification rewriteCompileQuery(IClusterInfoCollector clusterInfoCollector,
             MetadataProvider metadataProvider, Query query, ICompiledDmlStatement stmt,
-            Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter)
+            Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter, IRequestParameters requestParameters)
             throws AlgebricksException, ACIDException {
 
         Map<VarIdentifier, IAObject> externalVars = createExternalVariables(stmtParams, stmtRewriter);
@@ -2916,7 +2916,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         // Query Compilation (happens under the same ongoing metadata transaction)
         return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, (Query) rewrittenResult.first,
                 rewrittenResult.second, stmt == null ? null : stmt.getDatasetName(), sessionOutput, stmt, externalVars,
-                responsePrinter, warningCollector);
+                responsePrinter, warningCollector, requestParameters);
     }
 
     private JobSpecification rewriteCompileInsertUpsert(IClusterInfoCollector clusterInfoCollector,
@@ -2955,7 +2955,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         // transaction)
         return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, rewrittenInsertUpsert.getQuery(),
                 rewrittenResult.second, datasetName, sessionOutput, clfrqs, externalVars, responsePrinter,
-                warningCollector);
+                warningCollector, null);
     }
 
     protected void handleCreateFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
@@ -3419,8 +3419,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             boolean bActiveTxn = true;
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             try {
-                final JobSpecification jobSpec =
-                        rewriteCompileQuery(hcc, metadataProvider, query, null, stmtParams, stmtRewriter);
+                final JobSpecification jobSpec = rewriteCompileQuery(hcc, metadataProvider, query, null, stmtParams,
+                        stmtRewriter, requestParameters);
                 // update stats with count of compile-time warnings. needs to be adapted for multi-statement.
                 stats.updateTotalWarningsCount(warningCollector.getTotalWarningsCount());
                 afterCompile();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
index d5ea685..6c8f21c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
@@ -59,6 +59,7 @@ public class RequestParameters implements IRequestParameters {
     private final int statementCategoryRestrictionMask;
     private final String statement;
     private final boolean forceDropDataset;
+    private final boolean skipAdmissionPolicy;
 
     public RequestParameters(IRequestReference requestReference, String statement, IResultSet resultSet,
             ResultProperties resultProperties, Stats stats, StatementProperties statementProperties,
@@ -83,6 +84,16 @@ public class RequestParameters implements IRequestParameters {
             IStatementExecutor.ResultMetadata outMetadata, String clientContextId,
             Map<String, String> optionalParameters, Map<String, IAObject> statementParameters, boolean multiStatement,
             int statementCategoryRestrictionMask, boolean forceDropDataset) {
+        this(requestReference, statement, resultSet, resultProperties, stats, statementProperties, outMetadata,
+                clientContextId, optionalParameters, statementParameters, multiStatement,
+                statementCategoryRestrictionMask, forceDropDataset, false);
+    }
+
+    public RequestParameters(IRequestReference requestReference, String statement, IResultSet resultSet,
+            ResultProperties resultProperties, Stats stats, StatementProperties statementProperties,
+            IStatementExecutor.ResultMetadata outMetadata, String clientContextId,
+            Map<String, String> optionalParameters, Map<String, IAObject> statementParameters, boolean multiStatement,
+            int statementCategoryRestrictionMask, boolean forceDropDataset, boolean skipAdmissionPolicy) {
         this.requestReference = requestReference;
         this.statement = statement;
         this.resultSet = resultSet;
@@ -96,6 +107,7 @@ public class RequestParameters implements IRequestParameters {
         this.multiStatement = multiStatement;
         this.statementCategoryRestrictionMask = statementCategoryRestrictionMask;
         this.forceDropDataset = forceDropDataset;
+        this.skipAdmissionPolicy = skipAdmissionPolicy;
     }
 
     @Override
@@ -149,6 +161,11 @@ public class RequestParameters implements IRequestParameters {
     }
 
     @Override
+    public boolean isSkipAdmissionPolicy() {
+        return skipAdmissionPolicy;
+    }
+
+    @Override
     public Map<String, IAObject> getStatementParameters() {
         return statementParameters;
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index dcd52a0..9472da5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -253,7 +253,7 @@ public class FeedOperations {
             clfrqs = new CompiledStatements.CompiledUpsertStatement(feedConn.getDataverseName(),
                     feedConn.getDatasetName(), feedConnQuery, stmtUpsert.getVarCounter(), null, null);
         }
-        return statementExecutor.rewriteCompileQuery(hcc, metadataProvider, feedConnQuery, clfrqs, null, null);
+        return statementExecutor.rewriteCompileQuery(hcc, metadataProvider, feedConnQuery, clfrqs, null, null, null);
     }
 
     private static JobSpecification combineIntakeCollectJobs(MetadataProvider metadataProvider, Feed feed,

Reply via email to