This is an automated email from the ASF dual-hosted git repository. alsuliman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 47d9ff4583a18c5814968eb1e7d2d8006774945a Author: Ali Alsuliman <[email protected]> AuthorDate: Mon May 13 13:53:20 2024 +0300 [ASTERIXDB-3343][API] Add redact param to redact active/completed requests - user model changes: no - storage format changes: no - interface changes: yes Details: - Add "redact" param to redact active/completed requests. - Make sure a completed request info is updated even if the request was cancelled. Change-Id: Ied406f4d803c5ca717e7ed6280550a3e96142fe4 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18283 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Jenkins <[email protected]> --- .../asterix/translator/BaseClientRequest.java | 9 ++++ .../apache/asterix/translator/ClientRequest.java | 25 +++++++---- .../api/http/server/AbstractRequestsServlet.java | 12 +++++- .../active_requests.2.pollquery.sqlpp | 2 +- .../completed_requests.4.query.sqlpp} | 8 ++-- .../misc/active_requests/active_requests.2.regex | 2 +- .../completed_requests.4.regexjson | 19 +++++++++ .../apache/asterix/common/api/IClientRequest.java | 5 +++ .../asterix/runtime/utils/RequestTracker.java | 48 +++++++++++++++------- 9 files changed, 100 insertions(+), 30 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java index 25fcbf59e4..0fafc47fbd 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java @@ -94,6 +94,15 @@ public abstract class BaseClientRequest implements IClientRequest { @Override public ObjectNode asJson() { + return putJson(); + } + + @Override + public ObjectNode asRedactedJson() { + return putJson(); + } + + private ObjectNode putJson() { ObjectNode json = JSONUtil.createObject(); json.put("uuid", requestReference.getUuid()); json.put("requestTime", new ADateTime(requestReference.getTime()).toSimpleString()); diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java index 9f2cbdd9d1..31f197968d 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java @@ -33,6 +33,7 @@ import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity; import org.apache.hyracks.api.util.ExceptionUtils; +import org.apache.hyracks.util.LogRedactionUtil; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -95,11 +96,21 @@ public class ClientRequest extends BaseClientRequest { @Override public ObjectNode asJson() { ObjectNode json = super.asJson(); - putJobDetails(json); - json.put("statement", statement); + return asJson(json, false); + } + + @Override + public ObjectNode asRedactedJson() { + ObjectNode json = super.asRedactedJson(); + return asJson(json, true); + } + + private ObjectNode asJson(ObjectNode json, boolean redact) { + putJobDetails(json, redact); + json.put("statement", redact ? LogRedactionUtil.statement(statement) : statement); json.put("clientContextID", clientContextId); if (plan != null) { - json.put("plan", plan); + json.put("plan", redact ? LogRedactionUtil.userData(plan) : plan); } return json; } @@ -132,16 +143,16 @@ public class ClientRequest extends BaseClientRequest { return ExceptionUtils.unwrap(e).getMessage(); } - private void putJobDetails(ObjectNode json) { + private void putJobDetails(ObjectNode json, boolean redact) { try { json.put("jobId", jobId != null ? jobId.toString() : null); - putJobState(json, jobState); + putJobState(json, jobState, redact); } catch (Throwable th) { // ignore } } - private static void putJobState(ObjectNode json, JobState state) { + private static void putJobState(ObjectNode json, JobState state, boolean redact) { AMutableDateTime dateTime = new AMutableDateTime(0); putTime(json, state.createTime, "jobCreateTime", dateTime); putTime(json, state.startTime, "jobStartTime", dateTime); @@ -152,7 +163,7 @@ public class ClientRequest extends BaseClientRequest { json.put("jobRequiredCPUs", state.requiredCPUs); json.put("jobRequiredMemory", state.requiredMemoryInBytes); if (state.errorMsg != null) { - json.put("error", state.errorMsg); + json.put("error", redact ? LogRedactionUtil.userData(state.errorMsg) : state.errorMsg); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java index 285c4c837b..b6c6a713fd 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java @@ -35,6 +35,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; public abstract class AbstractRequestsServlet extends AbstractServlet { + public static final String REDACT_PARAM = "redact"; protected final ICcApplicationContext appCtx; public AbstractRequestsServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx, String... paths) { @@ -46,8 +47,15 @@ public abstract class AbstractRequestsServlet extends AbstractServlet { protected void get(IServletRequest request, IServletResponse response) throws Exception { ArrayNode requestsJson = JSONUtil.createArray(); Collection<IClientRequest> requests = getRequests(); - for (IClientRequest req : requests) { - requestsJson.add(req.asJson()); + String redact = request.getParameter(REDACT_PARAM); + if (Boolean.parseBoolean(redact)) { + for (IClientRequest req : requests) { + requestsJson.add(req.asRedactedJson()); + } + } else { + for (IClientRequest req : requests) { + requestsJson.add(req.asJson()); + } } HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, request); response.setStatus(HttpResponseStatus.OK); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp index 7eee42e6b1..07f8eea144 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp @@ -18,5 +18,5 @@ */ -- param client_context_id=ensure_running_query -- polltimeoutsecs=15 -SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid FROM active_requests() rqst +SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid, rqst.jobStatus FROM active_requests() rqst WHERE rqst.clientContextID = 'sleep_async_query'; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp similarity index 78% copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp index 7eee42e6b1..de35939f09 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ - -- param client_context_id=ensure_running_query - -- polltimeoutsecs=15 -SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid FROM active_requests() rqst -WHERE rqst.clientContextID = 'sleep_async_query'; \ No newline at end of file + -- param client_context_id=ensure_completed_query + -- param ignoreextrafields=true +SELECT VALUE (SELECT VALUE r FROM completed_requests() r +WHERE r.state="completed" AND r.clientContextID = "completed_requests_query")[0]; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex index e31fe3b3c6..170838e5bf 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex @@ -1 +1 @@ -/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*" \}/ \ No newline at end of file +/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*", "jobStatus": "RUNNING" \}/ \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson new file mode 100644 index 0000000000..15affd15a9 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson @@ -0,0 +1,19 @@ +{ + "cancellable": true, + "clientContextID": "completed_requests_query", + "elapsedTime": "R{.*}", + "jobCreateTime": "R{.*}", + "jobEndTime": "R{.*}", + "jobId": "R{.*}", + "jobQueueTime": "R{.*}", + "jobRequiredCPUs": "R{.*}", + "jobRequiredMemory": "R{.*}", + "jobStartTime": "R{.*}", + "jobStatus": "TERMINATED", + "node": "R{.*}", + "remoteAddr": "R{.*}", + "requestTime": "R{.*}", + "state": "completed", + "userAgent": "R{.*}", + "uuid": "R{.*}" +} \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java index 8d2c91aeaa..ee518dd664 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java @@ -103,6 +103,11 @@ public interface IClientRequest { */ ObjectNode asJson(); + /** + * @return A redacted json node representation of this request + */ + ObjectNode asRedactedJson(); + /** * Called when the job is created. * diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java index 5771201c98..9875651817 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java @@ -18,9 +18,9 @@ */ package org.apache.asterix.runtime.utils; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -30,7 +30,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.asterix.common.api.IClientRequest; import org.apache.asterix.common.api.IRequestTracker; import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; @@ -42,14 +41,22 @@ public class RequestTracker implements IRequestTracker { private final Map<String, IClientRequest> runningRequests = new ConcurrentHashMap<>(); private final Map<String, IClientRequest> clientIdRequests = new ConcurrentHashMap<>(); - private final CircularFifoQueue<IClientRequest> completedRequests; + private final Map<String, IClientRequest> completedRequests; private final ICcApplicationContext ccAppCtx; private final AtomicLong numRequests; private final AtomicLong numOfFailedRequests; public RequestTracker(ICcApplicationContext ccAppCtx) { this.ccAppCtx = ccAppCtx; - completedRequests = new CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize()); + int archiveSize = ccAppCtx.getExternalProperties().getRequestsArchiveSize(); + completedRequests = new LinkedHashMap<>(archiveSize) { + private static final long serialVersionUID = 1L; + + @Override + protected boolean removeEldestEntry(Map.Entry<String, IClientRequest> eldest) { + return size() > archiveSize; + } + }; numRequests = new AtomicLong(0); numOfFailedRequests = new AtomicLong(0); } @@ -102,7 +109,7 @@ public class RequestTracker implements IRequestTracker { @Override public synchronized Collection<IClientRequest> getCompletedRequests() { - return Collections.unmodifiableCollection(new ArrayList<>(completedRequests)); + return Collections.unmodifiableCollection(completedRequests.values()); } private void cancel(IClientRequest request) throws HyracksDataException { @@ -122,7 +129,7 @@ public class RequestTracker implements IRequestTracker { } private synchronized void archive(IClientRequest request) { - completedRequests.add(request); + completedRequests.put(request.getId(), request); } public long getTotalNumberOfRequests() { @@ -139,13 +146,14 @@ public class RequestTracker implements IRequestTracker { return numOfFailedRequests.get(); } + @Override public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status) throws HyracksException { String requestId = spec.getRequestId(); if (requestId != null) { - IClientRequest clientRequest = runningRequests.get(requestId); - if (clientRequest != null) { - clientRequest.jobCreated(jobId, spec.getRequiredClusterCapacity(), status); + IClientRequest request = getRequest(requestId); + if (request != null) { + request.jobCreated(jobId, spec.getRequiredClusterCapacity(), status); } } } @@ -154,9 +162,9 @@ public class RequestTracker implements IRequestTracker { public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException { String requestId = spec.getRequestId(); if (requestId != null) { - IClientRequest clientRequest = runningRequests.get(requestId); - if (clientRequest != null) { - clientRequest.jobStarted(jobId); + IClientRequest request = getRequest(requestId); + if (request != null) { + request.jobStarted(jobId); } } } @@ -166,10 +174,20 @@ public class RequestTracker implements IRequestTracker { throws HyracksException { String requestId = spec.getRequestId(); if (requestId != null) { - IClientRequest clientRequest = runningRequests.get(requestId); - if (clientRequest != null) { - clientRequest.jobFinished(jobId, jobStatus, exceptions); + IClientRequest request = getRequest(requestId); + if (request != null) { + request.jobFinished(jobId, jobStatus, exceptions); } } } + + private IClientRequest getRequest(String requestId) { + IClientRequest clientRequest = runningRequests.get(requestId); + if (clientRequest != null) { + return clientRequest; + } + synchronized (this) { + return completedRequests.get(requestId); + } + } }
