This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 47d9ff4583a18c5814968eb1e7d2d8006774945a
Author: Ali Alsuliman <[email protected]>
AuthorDate: Mon May 13 13:53:20 2024 +0300

    [ASTERIXDB-3343][API] Add redact param to redact active/completed requests
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    - Add "redact" param to redact active/completed requests.
    - Make sure a completed request info is updated even if
      the request was cancelled.
    
    Change-Id: Ied406f4d803c5ca717e7ed6280550a3e96142fe4
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18283
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../asterix/translator/BaseClientRequest.java      |  9 ++++
 .../apache/asterix/translator/ClientRequest.java   | 25 +++++++----
 .../api/http/server/AbstractRequestsServlet.java   | 12 +++++-
 .../active_requests.2.pollquery.sqlpp              |  2 +-
 .../completed_requests.4.query.sqlpp}              |  8 ++--
 .../misc/active_requests/active_requests.2.regex   |  2 +-
 .../completed_requests.4.regexjson                 | 19 +++++++++
 .../apache/asterix/common/api/IClientRequest.java  |  5 +++
 .../asterix/runtime/utils/RequestTracker.java      | 48 +++++++++++++++-------
 9 files changed, 100 insertions(+), 30 deletions(-)

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index 25fcbf59e4..0fafc47fbd 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -94,6 +94,15 @@ public abstract class BaseClientRequest implements 
IClientRequest {
 
     @Override
     public ObjectNode asJson() {
+        return putJson();
+    }
+
+    @Override
+    public ObjectNode asRedactedJson() {
+        return putJson();
+    }
+
+    private ObjectNode putJson() {
         ObjectNode json = JSONUtil.createObject();
         json.put("uuid", requestReference.getUuid());
         json.put("requestTime", new 
ADateTime(requestReference.getTime()).toSimpleString());
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 9f2cbdd9d1..31f197968d 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -33,6 +33,7 @@ import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
 import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.util.LogRedactionUtil;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -95,11 +96,21 @@ public class ClientRequest extends BaseClientRequest {
     @Override
     public ObjectNode asJson() {
         ObjectNode json = super.asJson();
-        putJobDetails(json);
-        json.put("statement", statement);
+        return asJson(json, false);
+    }
+
+    @Override
+    public ObjectNode asRedactedJson() {
+        ObjectNode json = super.asRedactedJson();
+        return asJson(json, true);
+    }
+
+    private ObjectNode asJson(ObjectNode json, boolean redact) {
+        putJobDetails(json, redact);
+        json.put("statement", redact ? LogRedactionUtil.statement(statement) : 
statement);
         json.put("clientContextID", clientContextId);
         if (plan != null) {
-            json.put("plan", plan);
+            json.put("plan", redact ? LogRedactionUtil.userData(plan) : plan);
         }
         return json;
     }
@@ -132,16 +143,16 @@ public class ClientRequest extends BaseClientRequest {
         return ExceptionUtils.unwrap(e).getMessage();
     }
 
-    private void putJobDetails(ObjectNode json) {
+    private void putJobDetails(ObjectNode json, boolean redact) {
         try {
             json.put("jobId", jobId != null ? jobId.toString() : null);
-            putJobState(json, jobState);
+            putJobState(json, jobState, redact);
         } catch (Throwable th) {
             // ignore
         }
     }
 
-    private static void putJobState(ObjectNode json, JobState state) {
+    private static void putJobState(ObjectNode json, JobState state, boolean 
redact) {
         AMutableDateTime dateTime = new AMutableDateTime(0);
         putTime(json, state.createTime, "jobCreateTime", dateTime);
         putTime(json, state.startTime, "jobStartTime", dateTime);
@@ -152,7 +163,7 @@ public class ClientRequest extends BaseClientRequest {
         json.put("jobRequiredCPUs", state.requiredCPUs);
         json.put("jobRequiredMemory", state.requiredMemoryInBytes);
         if (state.errorMsg != null) {
-            json.put("error", state.errorMsg);
+            json.put("error", redact ? 
LogRedactionUtil.userData(state.errorMsg) : state.errorMsg);
         }
     }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
index 285c4c837b..b6c6a713fd 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
@@ -35,6 +35,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 
 public abstract class AbstractRequestsServlet extends AbstractServlet {
 
+    public static final String REDACT_PARAM = "redact";
     protected final ICcApplicationContext appCtx;
 
     public AbstractRequestsServlet(ConcurrentMap<String, Object> ctx, 
ICcApplicationContext appCtx, String... paths) {
@@ -46,8 +47,15 @@ public abstract class AbstractRequestsServlet extends 
AbstractServlet {
     protected void get(IServletRequest request, IServletResponse response) 
throws Exception {
         ArrayNode requestsJson = JSONUtil.createArray();
         Collection<IClientRequest> requests = getRequests();
-        for (IClientRequest req : requests) {
-            requestsJson.add(req.asJson());
+        String redact = request.getParameter(REDACT_PARAM);
+        if (Boolean.parseBoolean(redact)) {
+            for (IClientRequest req : requests) {
+                requestsJson.add(req.asRedactedJson());
+            }
+        } else {
+            for (IClientRequest req : requests) {
+                requestsJson.add(req.asJson());
+            }
         }
         HttpUtil.setContentType(response, 
HttpUtil.ContentType.APPLICATION_JSON, request);
         response.setStatus(HttpResponseStatus.OK);
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
index 7eee42e6b1..07f8eea144 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
@@ -18,5 +18,5 @@
  */
  -- param client_context_id=ensure_running_query
  -- polltimeoutsecs=15
-SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid FROM 
active_requests() rqst
+SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid, rqst.jobStatus 
FROM active_requests() rqst
 WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp
similarity index 78%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp
index 7eee42e6b1..de35939f09 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
- -- param client_context_id=ensure_running_query
- -- polltimeoutsecs=15
-SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid FROM 
active_requests() rqst
-WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
+ -- param client_context_id=ensure_completed_query
+ -- param ignoreextrafields=true
+SELECT VALUE (SELECT VALUE r FROM completed_requests() r
+WHERE r.state="completed" AND r.clientContextID = 
"completed_requests_query")[0];
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
index e31fe3b3c6..170838e5bf 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
@@ -1 +1 @@
-/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*" 
\}/
\ No newline at end of file
+/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*", 
"jobStatus": "RUNNING" \}/
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson
new file mode 100644
index 0000000000..15affd15a9
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson
@@ -0,0 +1,19 @@
+{
+    "cancellable": true,
+    "clientContextID": "completed_requests_query",
+    "elapsedTime": "R{.*}",
+    "jobCreateTime": "R{.*}",
+    "jobEndTime": "R{.*}",
+    "jobId": "R{.*}",
+    "jobQueueTime": "R{.*}",
+    "jobRequiredCPUs": "R{.*}",
+    "jobRequiredMemory": "R{.*}",
+    "jobStartTime": "R{.*}",
+    "jobStatus": "TERMINATED",
+    "node": "R{.*}",
+    "remoteAddr": "R{.*}",
+    "requestTime": "R{.*}",
+    "state": "completed",
+    "userAgent": "R{.*}",
+    "uuid": "R{.*}"
+}
\ No newline at end of file
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 8d2c91aeaa..ee518dd664 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -103,6 +103,11 @@ public interface IClientRequest {
      */
     ObjectNode asJson();
 
+    /**
+     * @return A redacted json node representation of this request
+     */
+    ObjectNode asRedactedJson();
+
     /**
      * Called when the job is created.
      *
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index 5771201c98..9875651817 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -18,9 +18,9 @@
  */
 package org.apache.asterix.runtime.utils;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -30,7 +30,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
@@ -42,14 +41,22 @@ public class RequestTracker implements IRequestTracker {
 
     private final Map<String, IClientRequest> runningRequests = new 
ConcurrentHashMap<>();
     private final Map<String, IClientRequest> clientIdRequests = new 
ConcurrentHashMap<>();
-    private final CircularFifoQueue<IClientRequest> completedRequests;
+    private final Map<String, IClientRequest> completedRequests;
     private final ICcApplicationContext ccAppCtx;
     private final AtomicLong numRequests;
     private final AtomicLong numOfFailedRequests;
 
     public RequestTracker(ICcApplicationContext ccAppCtx) {
         this.ccAppCtx = ccAppCtx;
-        completedRequests = new 
CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize());
+        int archiveSize = 
ccAppCtx.getExternalProperties().getRequestsArchiveSize();
+        completedRequests = new LinkedHashMap<>(archiveSize) {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<String, 
IClientRequest> eldest) {
+                return size() > archiveSize;
+            }
+        };
         numRequests = new AtomicLong(0);
         numOfFailedRequests = new AtomicLong(0);
     }
@@ -102,7 +109,7 @@ public class RequestTracker implements IRequestTracker {
 
     @Override
     public synchronized Collection<IClientRequest> getCompletedRequests() {
-        return Collections.unmodifiableCollection(new 
ArrayList<>(completedRequests));
+        return Collections.unmodifiableCollection(completedRequests.values());
     }
 
     private void cancel(IClientRequest request) throws HyracksDataException {
@@ -122,7 +129,7 @@ public class RequestTracker implements IRequestTracker {
     }
 
     private synchronized void archive(IClientRequest request) {
-        completedRequests.add(request);
+        completedRequests.put(request.getId(), request);
     }
 
     public long getTotalNumberOfRequests() {
@@ -139,13 +146,14 @@ public class RequestTracker implements IRequestTracker {
         return numOfFailedRequests.get();
     }
 
+    @Override
     public void notifyJobCreation(JobId jobId, JobSpecification spec, 
IJobCapacityController.JobSubmissionStatus status)
             throws HyracksException {
         String requestId = spec.getRequestId();
         if (requestId != null) {
-            IClientRequest clientRequest = runningRequests.get(requestId);
-            if (clientRequest != null) {
-                clientRequest.jobCreated(jobId, 
spec.getRequiredClusterCapacity(), status);
+            IClientRequest request = getRequest(requestId);
+            if (request != null) {
+                request.jobCreated(jobId, spec.getRequiredClusterCapacity(), 
status);
             }
         }
     }
@@ -154,9 +162,9 @@ public class RequestTracker implements IRequestTracker {
     public void notifyJobStart(JobId jobId, JobSpecification spec) throws 
HyracksException {
         String requestId = spec.getRequestId();
         if (requestId != null) {
-            IClientRequest clientRequest = runningRequests.get(requestId);
-            if (clientRequest != null) {
-                clientRequest.jobStarted(jobId);
+            IClientRequest request = getRequest(requestId);
+            if (request != null) {
+                request.jobStarted(jobId);
             }
         }
     }
@@ -166,10 +174,20 @@ public class RequestTracker implements IRequestTracker {
             throws HyracksException {
         String requestId = spec.getRequestId();
         if (requestId != null) {
-            IClientRequest clientRequest = runningRequests.get(requestId);
-            if (clientRequest != null) {
-                clientRequest.jobFinished(jobId, jobStatus, exceptions);
+            IClientRequest request = getRequest(requestId);
+            if (request != null) {
+                request.jobFinished(jobId, jobStatus, exceptions);
             }
         }
     }
+
+    private IClientRequest getRequest(String requestId) {
+        IClientRequest clientRequest = runningRequests.get(requestId);
+        if (clientRequest != null) {
+            return clientRequest;
+        }
+        synchronized (this) {
+            return completedRequests.get(requestId);
+        }
+    }
 }

Reply via email to