This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8d6c5c7d77 [ASTERIXDB-3343][API] Refactor and track other requests
8d6c5c7d77 is described below
commit 8d6c5c7d779bbbb71b531569a5377de37dbb6d80
Author: Ali Alsuliman <[email protected]>
AuthorDate: Tue May 14 02:36:32 2024 +0300
[ASTERIXDB-3343][API] Refactor and track other requests
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
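Refactor job submission into a shared runTrackJob helper so that the request ID
is set on the job and the created job is logged and tracked for
INSERT/UPSERT/DELETE/COPY TO/COPY FROM statements as well as queries.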
Change-Id: Iddd26895f0eb6b8008c3512025180ec620a2ca98
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18286
Reviewed-by: Ali Alsuliman <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
---
.../asterix/app/translator/QueryTranslator.java | 42 +++++++++++-----------
1 file changed, 22 insertions(+), 20 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 2ef36571c2..340cd57d81 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4007,11 +4007,10 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME,
new GlobalTxInfo(participatingDatasetIds,
numParticipatingNodes,
numParticipatingPartitions));
}
- jobId = JobUtils.runJob(hcc, spec, jobFlags, false);
+ String reqId =
requestParameters.getRequestReference().getUuid();
final IRequestTracker requestTracker =
appCtx.getRequestTracker();
- final ClientRequest clientRequest =
- (ClientRequest)
requestTracker.get(requestParameters.getRequestReference().getUuid());
- clientRequest.setJobId(jobId);
+ final ClientRequest clientRequest = (ClientRequest)
requestTracker.get(reqId);
+ jobId = runTrackJob(hcc, spec, jobFlags, reqId,
requestParameters.getClientContextId(), clientRequest);
clientRequest.markCancellable();
String nameBefore = Thread.currentThread().getName();
try {
@@ -4152,8 +4151,9 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
throw e;
}
};
+ String reqId = reqParams.getRequestReference().getUuid();
IRequestTracker requestTracker = appCtx.getRequestTracker();
- ClientRequest clientRequest = (ClientRequest)
requestTracker.get(reqParams.getRequestReference().getUuid());
+ ClientRequest clientRequest = (ClientRequest)
requestTracker.get(reqId);
if (stmtInsertUpsert.getReturnExpression() != null) {
deliverResult(hcc, resultSet, compiler, metadataProvider, locker,
resultDelivery, outMetadata, stats,
reqParams, true, stmt, clientRequest);
@@ -4179,8 +4179,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
participatingDatasetIds, numParticipatingNodes,
numParticipatingPartitions));
}
- jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
- clientRequest.setJobId(jobId);
+ jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId,
reqParams.getClientContextId(), clientRequest);
clientRequest.markCancellable();
String nameBefore = Thread.currentThread().getName();
try {
@@ -4248,11 +4247,11 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
participatingDatasetIds, numParticipatingNodes,
numParticipatingPartitions));
}
- jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+ String reqId =
requestParameters.getRequestReference().getUuid();
final IRequestTracker requestTracker =
appCtx.getRequestTracker();
- final ClientRequest clientRequest =
- (ClientRequest)
requestTracker.get(requestParameters.getRequestReference().getUuid());
- clientRequest.setJobId(jobId);
+ final ClientRequest clientRequest = (ClientRequest)
requestTracker.get(reqId);
+ jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId,
requestParameters.getClientContextId(),
+ clientRequest);
clientRequest.markCancellable();
String nameBefore = Thread.currentThread().getName();
try {
@@ -4280,6 +4279,15 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
}
}
+ private static JobId runTrackJob(IHyracksClientConnection hcc,
JobSpecification jobSpec, EnumSet<JobFlag> jobFlags,
+ String reqId, String clientCtxId, ClientRequest clientRequest)
throws Exception {
+ jobSpec.setRequestId(reqId);
+ JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+ LOGGER.info("Created job {} for uuid:{}, clientContextID:{}", jobId,
reqId, clientCtxId);
+ clientRequest.setJobId(jobId);
+ return jobId;
+ }
+
@Override
public JobSpecification rewriteCompileQuery(IClusterInfoCollector
clusterInfoCollector,
MetadataProvider metadataProvider, Query query,
ICompiledDmlStatement stmt,
@@ -5393,9 +5401,9 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
IStatementCompiler compiler, IMetadataLocker locker,
ResultDelivery resultDelivery, IResultPrinter printer,
IRequestParameters requestParameters, boolean cancellable,
ICcApplicationContext appCtx,
MetadataProvider metadataProvider, Statement atomicStatement)
throws Exception {
+ String reqId = requestParameters.getRequestReference().getUuid();
final IRequestTracker requestTracker = appCtx.getRequestTracker();
- final ClientRequest clientRequest =
- (ClientRequest)
requestTracker.get(requestParameters.getRequestReference().getUuid());
+ final ClientRequest clientRequest = (ClientRequest)
requestTracker.get(reqId);
if (cancellable) {
clientRequest.markCancellable();
}
@@ -5412,7 +5420,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
// ensure request not cancelled before running job
ensureNotCancelled(clientRequest);
- jobSpec.setRequestId(clientRequest.getId());
if (atomicStatement != null) {
Dataset ds = metadataProvider.findDataset(((InsertStatement)
atomicStatement).getDatabaseName(),
((InsertStatement) atomicStatement).getDataverseName(),
@@ -5430,12 +5437,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
participatingDatasetIds, numParticipatingNodes,
numParticipatingPartitions));
}
}
- jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Created job {} for query uuid:{},
clientContextID:{}", jobId,
- requestParameters.getRequestReference().getUuid(),
requestParameters.getClientContextId());
- }
- clientRequest.setJobId(jobId);
+ jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId,
requestParameters.getClientContextId(), clientRequest);
if (jId != null) {
jId.setValue(jobId);
}