This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ac83276dbce [Prism] Connect Web UI cancel requests with backend (#31028)
ac83276dbce is described below

commit ac83276dbce9d27ad4795cb850582006b109f920
Author: Damon <[email protected]>
AuthorDate: Thu May 2 10:06:03 2024 -0700

    [Prism] Connect Web UI cancel requests with backend (#31028)
    
    * Connect Web UI requests with backend
    
    * Remove artificial setting of cancelled state
    
    * Only lock when acquiring job
---
 sdks/go/pkg/beam/runners/prism/internal/execute.go |   2 +-
 .../prism/internal/jobservices/management_test.go  |  42 +++++---
 .../prism/internal/jobservices/server_test.go      |   8 +-
 .../prism/internal/web/assets/job-action.js        | 120 ++++++++++++++++++++-
 .../runners/prism/internal/web/jobdetails.html     |   2 +-
 sdks/go/pkg/beam/runners/prism/internal/web/web.go |  21 +++-
 6 files changed, 169 insertions(+), 26 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 504125a2bd6..b218d84b891 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -69,7 +69,7 @@ func RunPipeline(j *jobservices.Job) {
        j.SendMsg("running " + j.String())
        j.Running()
 
-       if err := executePipeline(j.RootCtx, wks, j); err != nil {
+       if err := executePipeline(j.RootCtx, wks, j); err != nil && !errors.Is(err, jobservices.ErrCancel) {
                j.Failed(err)
                return
        }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management_test.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management_test.go
index 176abb8543a..5aad58b4a86 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management_test.go
@@ -46,9 +46,13 @@ func TestServer(t *testing.T) {
 
        cmpOpts := []cmp.Option{protocmp.Transform(), cmpopts.EquateEmpty()}
        tests := []struct {
-               name                       string
+               name         string
+               postRunState jobpb.JobState_Enum
+               // noJobsCheck tests in the setting that the Job doesn't exist
+               // postPrepCheck tests after Server Prepare invoked
                noJobsCheck, postPrepCheck func(context.Context, *testing.T, 
*Server)
-               postRunCheck               func(context.Context, *testing.T, 
*Server, string)
+               // postRunCheck tests after Server Run invoked
+               postRunCheck func(context.Context, *testing.T, *Server, string)
        }{
                {
                        name: "GetJobs",
@@ -170,36 +174,38 @@ func TestServer(t *testing.T) {
                        },
                },
                {
-                       name: "Canceling",
+                       name:         "Canceling",
+                       postRunState: jobpb.JobState_RUNNING,
                        noJobsCheck: func(ctx context.Context, t *testing.T, 
undertest *Server) {
-                               resp, err := undertest.Cancel(ctx, 
&jobpb.CancelJobRequest{JobId: "job-001"})
-                               if resp != nil {
-                                       t.Errorf("Canceling(\"job-001\") = %s, 
want nil", resp)
-                               }
+                               id := "job-001"
+                               _, err := undertest.Cancel(ctx, 
&jobpb.CancelJobRequest{JobId: id})
+                               // Cancel currently returns nil, nil when Job 
not found
                                if err != nil {
-                                       t.Errorf("Canceling(\"job-001\") = %v, 
want nil", err)
+                                       t.Errorf("Cancel(%q) = %v, want not 
found error", id, err)
                                }
                        },
                        postPrepCheck: func(ctx context.Context, t *testing.T, 
undertest *Server) {
-                               resp, err := undertest.Cancel(ctx, 
&jobpb.CancelJobRequest{JobId: "job-001"})
+                               id := "job-001"
+                               resp, err := undertest.Cancel(ctx, 
&jobpb.CancelJobRequest{JobId: id})
                                if err != nil {
-                                       t.Errorf("Canceling(\"job-001\") = %v, 
want nil", err)
+                                       t.Errorf("Cancel(%q) = %v, want not 
found error", id, err)
                                }
                                if diff := cmp.Diff(&jobpb.CancelJobResponse{
                                        State: jobpb.JobState_CANCELLING,
                                }, resp, cmpOpts...); diff != "" {
-                                       t.Errorf("Canceling(\"job-001\") 
(-want, +got):\n%v", diff)
+                                       t.Errorf("Cancel(%q) (-want, 
+got):\n%s", id, diff)
                                }
                        },
                        postRunCheck: func(ctx context.Context, t *testing.T, 
undertest *Server, jobID string) {
-                               resp, err := undertest.Cancel(ctx, 
&jobpb.CancelJobRequest{JobId: jobID})
+                               id := "job-001"
+                               resp, err := undertest.Cancel(ctx, 
&jobpb.CancelJobRequest{JobId: id})
                                if err != nil {
-                                       t.Errorf("Canceling(\"%s\") = %v, want 
nil", jobID, err)
+                                       t.Errorf("Cancel(%q) = %v, want not 
found error", id, err)
                                }
                                if diff := cmp.Diff(&jobpb.CancelJobResponse{
-                                       State: jobpb.JobState_DONE,
+                                       State: jobpb.JobState_CANCELLING,
                                }, resp, cmpOpts...); diff != "" {
-                                       t.Errorf("Canceling(\"%s\") (-want, 
+got):\n%v", jobID, diff)
+                                       t.Errorf("Cancel(%q) (-want, 
+got):\n%s", id, diff)
                                }
                        },
                },
@@ -230,7 +236,11 @@ func TestServer(t *testing.T) {
                                        shortIDSize:  sizeData,
                                },
                        })
-                       j.state.Store(jobpb.JobState_DONE)
+                       state := jobpb.JobState_DONE
+                       if test.postRunState != jobpb.JobState_UNSPECIFIED {
+                               state = test.postRunState
+                       }
+                       j.state.Store(state)
                        called.Done()
                })
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go
index 473c84f958e..fb72048d478 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go
@@ -84,9 +84,12 @@ func TestServer_RunThenCancel(t *testing.T) {
        var called sync.WaitGroup
        called.Add(1)
        undertest := NewServer(0, func(j *Job) {
+               defer called.Done()
+               j.state.Store(jobpb.JobState_RUNNING)
                if errors.Is(context.Cause(j.RootCtx), ErrCancel) {
-                       j.state.Store(jobpb.JobState_CANCELLED)
-                       called.Done()
+                       j.SendMsg("pipeline canceled " + j.String())
+                       j.Canceled()
+                       return
                }
        })
        ctx := context.Background()
@@ -121,6 +124,7 @@ func TestServer_RunThenCancel(t *testing.T) {
        cancelResp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{
                JobId: runResp.GetJobId(),
        })
+
        if err != nil {
                t.Fatalf("server.Canceling() = %v, want nil", err)
        }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/web/assets/job-action.js b/sdks/go/pkg/beam/runners/prism/internal/web/assets/job-action.js
index 999fd22bbf8..4f2e51a7c02 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/web/assets/job-action.js
+++ b/sdks/go/pkg/beam/runners/prism/internal/web/assets/job-action.js
@@ -12,6 +12,12 @@
  limitations under the License.
  */
 
+/**
+ * job-action.js provides UI functionality for taking actions on Prism Jobs 
via:
+ * - jobManager:        Client for Job Management.
+ * - uiStateProvider:   Encapsulates UI state of user interactive elements on 
the page.
+ */
+
 /** Element class for job action container. */
 const JOB_ACTION = '.job-action'
 
@@ -21,6 +27,24 @@ const CANCEL = '.cancel'
 /** Element class assigned to RUNNING Job state. */
 const RUNNING = 'RUNNING'
 
+/** Element class for elements reporting Job state details. */
+const JOB_STATE = '.job-state'
+
+/** PATH holds consts that map to backend endpoints. */
+const PATH = {
+
+    /** ROOT_ is the Job management path prefix for mapped backend endpoints. 
*/
+    ROOT_: '/job',
+
+    /** CANCEL maps to the backend endpoint to cancel a Job. Terminates with 
'/' to prevent ServeMux 301 redirect. */
+    get CANCEL() {
+        return `${this.ROOT_}/cancel/`
+    }
+}
+
+/** HTTP related consts. */
+const HTTP_POST = 'POST'
+
 /**
  * Client for Job Management.
  *
@@ -34,9 +58,28 @@ const jobManager = {
      * @param jobId
      * TODO(https://github.com/apache/beam/issues/29669) Send request to 
backend service.
      */
-    cancel: function(jobId) {
+    cancel: function (jobId) {
         console.debug(`cancel button for Job: ${jobId} clicked`)
-    }
+        const path = PATH.CANCEL
+        const request = {
+            method: HTTP_POST,
+            body: JSON.stringify(new CancelJobRequest(jobId))
+        }
+        fetch(path, request)
+            .then(response => {
+                const requestJson = JSON.stringify(request)
+                const responseJson = JSON.stringify(response)
+                if (response.ok) {
+                    console.debug(`Job cancellation request to ${path} of 
${requestJson} for Job: ${jobId} sent successfully, response: ${responseJson}`)
+                    uiStateProvider.onJobCancel(response)
+                } else {
+                    console.error(`Failed to send job cancellation request to 
${path} of ${requestJson} for Job: ${jobId}, response: ${responseJson}`)
+                }
+            })
+            .catch(error => {
+                console.error(`Error occurred while sending job cancellation 
request for Job: ${jobId}`, error)
+            })
+    },
 }
 
 /**
@@ -84,14 +127,85 @@ const uiStateProvider = {
      */
     get isStateRunning() {
         return this.jobAction.classList.contains(RUNNING)
+    },
+
+    /**
+     * Queries the element containing the {@link JOB_STATE} class.
+     * @return {Element}
+     */
+    get jobStateElement() {
+        let element = document.querySelector(JOB_STATE)
+        if (element === null) {
+            console.error(`no element found at ${JOB_STATE}`)
+        }
+        return element
+    },
+
+    /**
+     * Callback for successful Job Cancel requests.
+     * @param response {Response}
+     */
+    onJobCancel(response) {
+        response.json().then(json => {
+            console.debug(`job cancel response json: ${JSON.stringify(json)}`)
+            uiStateProvider.jobStateElement.textContent = 
JobState_Enum[json.state]
+        })
+            .catch(error => {
+                console.error(`error Response.json() ${error}`)
+            })
     }
 }
 
 /**
  * Attaches an event listener to the window for 'load' events.
  */
-window.addEventListener("load", function(){
+window.addEventListener("load", function () {
     console.debug(JOB_ACTION, uiStateProvider.jobAction)
     console.debug(CANCEL, uiStateProvider.cancelButton)
     uiStateProvider.init()
 })
+
+/**
+ * CancelJobRequest models a request to cancel a Job.
+ *
+ * Models after its proto namesake in:
+ * 
https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_job_api.proto
+ */
+class CancelJobRequest {
+    job_id_;
+
+    constructor(jobId) {
+        this.job_id_ = jobId
+    }
+
+    /**
+     * The ID of the Job to cancel.
+     * @return {string}
+     */
+    get job_id() {
+        return this.job_id_
+    }
+
+    /** toJSON overrides JSON.stringify serialization behavior. */
+    toJSON() {
+        return {job_id: this.job_id}
+    }
+}
+
+/** Maps JobState_Enum from Job Management server response to the Job State 
name. See proto for more details:
+ * 
https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_job_api.proto
+ */
+const JobState_Enum = {
+    0: "UNSPECIFIED",
+    1: "STOPPED",
+    2: "RUNNING",
+    3: "DONE",
+    4: "FAILED",
+    5: "CANCELLED",
+    6: "UPDATED",
+    7: "DRAINING",
+    8: "DRAINED",
+    9: "STARTING",
+    10: "CANCELLING",
+    11: "UPDATING",
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/web/jobdetails.html b/sdks/go/pkg/beam/runners/prism/internal/web/jobdetails.html
index ff87f677e2d..955132a03f4 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/web/jobdetails.html
+++ b/sdks/go/pkg/beam/runners/prism/internal/web/jobdetails.html
@@ -33,7 +33,7 @@ limitations under the License. See accompanying LICENSE file.
                         onclick="if (jobManager !== null) { 
jobManager.cancel('{{.JobID}}') }"
                 >Cancel</button>
             </div>
-            <div>{{.State}}</div>
+            <div class="job-state">{{.State}}</div>
         </header>
         <section class="container">
             {{ if .Error}}<div class="child">{{.Error}}</div>{{end}}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/web/web.go b/sdks/go/pkg/beam/runners/prism/internal/web/web.go
index 33c28209753..baa14428aaa 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/web/web.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/web/web.go
@@ -27,6 +27,7 @@ import (
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
        "html/template"
+       "io"
        "net/http"
        "sort"
        "strings"
@@ -374,8 +375,22 @@ type jobCancelHandler struct {
 
 func (h *jobCancelHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        var cancelRequest *jobpb.CancelJobRequest
-       if err := json.NewDecoder(r.Body).Decode(&cancelRequest); err != nil {
-               err = fmt.Errorf("error parsing JSON of request: %w", err)
+       if r.Method != http.MethodPost {
+               http.Error(w, http.StatusText(http.StatusMethodNotAllowed), 
http.StatusMethodNotAllowed)
+               return
+       }
+       body, err := io.ReadAll(r.Body)
+       if err != nil {
+               err = fmt.Errorf("could not read request body: %w", err)
+               http.Error(w, err.Error(), http.StatusBadRequest)
+               return
+       }
+       if len(body) == 0 {
+               http.Error(w, "empty request body", http.StatusBadRequest)
+               return
+       }
+       if err := json.Unmarshal(body, &cancelRequest); err != nil {
+               err = fmt.Errorf("error parsing JSON: %s of request: %w", body, 
err)
                http.Error(w, err.Error(), http.StatusBadRequest)
                return
        }
@@ -405,10 +420,10 @@ func Initialize(ctx context.Context, port int, jobcli jobpb.JobServiceClient) er
        mux := http.NewServeMux()
 
        mux.Handle("/assets/", assetsFs)
+       mux.Handle("/job/cancel/", &jobCancelHandler{Jobcli: jobcli})
        mux.Handle("/job/", &jobDetailsHandler{Jobcli: jobcli})
        mux.Handle("/debugz", &debugzHandler{})
        mux.Handle("/", &jobsConsoleHandler{Jobcli: jobcli})
-       mux.Handle("/job/cancel", &jobCancelHandler{Jobcli: jobcli})
 
        endpoint := fmt.Sprintf("localhost:%d", port)
 

Reply via email to