This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d6c5c7d77 [ASTERIXDB-3343][API] Refactor and track other requests
8d6c5c7d77 is described below

commit 8d6c5c7d779bbbb71b531569a5377de37dbb6d80
Author: Ali Alsuliman <[email protected]>
AuthorDate: Tue May 14 02:36:32 2024 +0300

    [ASTERIXDB-3343][API] Refactor and track other requests
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    Refactor code and track other requests:
    INSERT/UPSERT/DELETE/COPY TO/COPY FROM and queries.
    
    Change-Id: Iddd26895f0eb6b8008c3512025180ec620a2ca98
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18286
    Reviewed-by: Ali Alsuliman <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../asterix/app/translator/QueryTranslator.java    | 42 +++++++++++-----------
 1 file changed, 22 insertions(+), 20 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 2ef36571c2..340cd57d81 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4007,11 +4007,10 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, 
new GlobalTxInfo(participatingDatasetIds,
                             numParticipatingNodes, 
numParticipatingPartitions));
                 }
-                jobId = JobUtils.runJob(hcc, spec, jobFlags, false);
+                String reqId = 
requestParameters.getRequestReference().getUuid();
                 final IRequestTracker requestTracker = 
appCtx.getRequestTracker();
-                final ClientRequest clientRequest =
-                        (ClientRequest) 
requestTracker.get(requestParameters.getRequestReference().getUuid());
-                clientRequest.setJobId(jobId);
+                final ClientRequest clientRequest = (ClientRequest) 
requestTracker.get(reqId);
+                jobId = runTrackJob(hcc, spec, jobFlags, reqId, 
requestParameters.getClientContextId(), clientRequest);
                 clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
@@ -4152,8 +4151,9 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 throw e;
             }
         };
+        String reqId = reqParams.getRequestReference().getUuid();
         IRequestTracker requestTracker = appCtx.getRequestTracker();
-        ClientRequest clientRequest = (ClientRequest) 
requestTracker.get(reqParams.getRequestReference().getUuid());
+        ClientRequest clientRequest = (ClientRequest) 
requestTracker.get(reqId);
         if (stmtInsertUpsert.getReturnExpression() != null) {
             deliverResult(hcc, resultSet, compiler, metadataProvider, locker, 
resultDelivery, outMetadata, stats,
                     reqParams, true, stmt, clientRequest);
@@ -4179,8 +4179,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
                             participatingDatasetIds, numParticipatingNodes, 
numParticipatingPartitions));
                 }
-                jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
-                clientRequest.setJobId(jobId);
+                jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, 
reqParams.getClientContextId(), clientRequest);
                 clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
@@ -4248,11 +4247,11 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
                             participatingDatasetIds, numParticipatingNodes, 
numParticipatingPartitions));
                 }
-                jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+                String reqId = 
requestParameters.getRequestReference().getUuid();
                 final IRequestTracker requestTracker = 
appCtx.getRequestTracker();
-                final ClientRequest clientRequest =
-                        (ClientRequest) 
requestTracker.get(requestParameters.getRequestReference().getUuid());
-                clientRequest.setJobId(jobId);
+                final ClientRequest clientRequest = (ClientRequest) 
requestTracker.get(reqId);
+                jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, 
requestParameters.getClientContextId(),
+                        clientRequest);
                 clientRequest.markCancellable();
                 String nameBefore = Thread.currentThread().getName();
                 try {
@@ -4280,6 +4279,15 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         }
     }
 
+    private static JobId runTrackJob(IHyracksClientConnection hcc, 
JobSpecification jobSpec, EnumSet<JobFlag> jobFlags,
+            String reqId, String clientCtxId, ClientRequest clientRequest) 
throws Exception {
+        jobSpec.setRequestId(reqId);
+        JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+        LOGGER.info("Created job {} for uuid:{}, clientContextID:{}", jobId, 
reqId, clientCtxId);
+        clientRequest.setJobId(jobId);
+        return jobId;
+    }
+
     @Override
     public JobSpecification rewriteCompileQuery(IClusterInfoCollector 
clusterInfoCollector,
             MetadataProvider metadataProvider, Query query, 
ICompiledDmlStatement stmt,
@@ -5393,9 +5401,9 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             IStatementCompiler compiler, IMetadataLocker locker, 
ResultDelivery resultDelivery, IResultPrinter printer,
             IRequestParameters requestParameters, boolean cancellable, 
ICcApplicationContext appCtx,
             MetadataProvider metadataProvider, Statement atomicStatement) 
throws Exception {
+        String reqId = requestParameters.getRequestReference().getUuid();
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
-        final ClientRequest clientRequest =
-                (ClientRequest) 
requestTracker.get(requestParameters.getRequestReference().getUuid());
+        final ClientRequest clientRequest = (ClientRequest) 
requestTracker.get(reqId);
         if (cancellable) {
             clientRequest.markCancellable();
         }
@@ -5412,7 +5420,6 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
             // ensure request not cancelled before running job
             ensureNotCancelled(clientRequest);
-            jobSpec.setRequestId(clientRequest.getId());
             if (atomicStatement != null) {
                 Dataset ds = metadataProvider.findDataset(((InsertStatement) 
atomicStatement).getDatabaseName(),
                         ((InsertStatement) atomicStatement).getDataverseName(),
@@ -5430,12 +5437,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                             participatingDatasetIds, numParticipatingNodes, 
numParticipatingPartitions));
                 }
             }
-            jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info("Created job {} for query uuid:{}, 
clientContextID:{}", jobId,
-                        requestParameters.getRequestReference().getUuid(), 
requestParameters.getClientContextId());
-            }
-            clientRequest.setJobId(jobId);
+            jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, 
requestParameters.getClientContextId(), clientRequest);
             if (jId != null) {
                 jId.setValue(jobId);
             }

Reply via email to