This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ac83276dbce [Prism] Connect Web UI cancel requests with backend
(#31028)
ac83276dbce is described below
commit ac83276dbce9d27ad4795cb850582006b109f920
Author: Damon <[email protected]>
AuthorDate: Thu May 2 10:06:03 2024 -0700
[Prism] Connect Web UI cancel requests with backend (#31028)
* Connect Web UI requests with backend
* Remove artificial setting of cancelled state
* Only lock when acquiring job
---
sdks/go/pkg/beam/runners/prism/internal/execute.go | 2 +-
.../prism/internal/jobservices/management_test.go | 42 +++++---
.../prism/internal/jobservices/server_test.go | 8 +-
.../prism/internal/web/assets/job-action.js | 120 ++++++++++++++++++++-
.../runners/prism/internal/web/jobdetails.html | 2 +-
sdks/go/pkg/beam/runners/prism/internal/web/web.go | 21 +++-
6 files changed, 169 insertions(+), 26 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 504125a2bd6..b218d84b891 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -69,7 +69,7 @@ func RunPipeline(j *jobservices.Job) {
j.SendMsg("running " + j.String())
j.Running()
- if err := executePipeline(j.RootCtx, wks, j); err != nil {
+ if err := executePipeline(j.RootCtx, wks, j); err != nil &&
!errors.Is(err, jobservices.ErrCancel) {
j.Failed(err)
return
}
diff --git
a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management_test.go
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management_test.go
index 176abb8543a..5aad58b4a86 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management_test.go
@@ -46,9 +46,13 @@ func TestServer(t *testing.T) {
cmpOpts := []cmp.Option{protocmp.Transform(), cmpopts.EquateEmpty()}
tests := []struct {
- name string
+ name string
+ postRunState jobpb.JobState_Enum
+ // noJobsCheck tests in the setting that the Job doesn't exist
+ // postPrepCheck tests after Server Prepare invoked
noJobsCheck, postPrepCheck func(context.Context, *testing.T,
*Server)
- postRunCheck func(context.Context, *testing.T,
*Server, string)
+ // postRunCheck tests after Server Run invoked
+ postRunCheck func(context.Context, *testing.T, *Server, string)
}{
{
name: "GetJobs",
@@ -170,36 +174,38 @@ func TestServer(t *testing.T) {
},
},
{
- name: "Canceling",
+ name: "Canceling",
+ postRunState: jobpb.JobState_RUNNING,
noJobsCheck: func(ctx context.Context, t *testing.T,
undertest *Server) {
- resp, err := undertest.Cancel(ctx,
&jobpb.CancelJobRequest{JobId: "job-001"})
- if resp != nil {
- t.Errorf("Canceling(\"job-001\") = %s,
want nil", resp)
- }
+ id := "job-001"
+ _, err := undertest.Cancel(ctx,
&jobpb.CancelJobRequest{JobId: id})
+ // Cancel currently returns nil, nil when Job
not found
if err != nil {
- t.Errorf("Canceling(\"job-001\") = %v,
want nil", err)
+ t.Errorf("Cancel(%q) = %v, want not
found error", id, err)
}
},
postPrepCheck: func(ctx context.Context, t *testing.T,
undertest *Server) {
- resp, err := undertest.Cancel(ctx,
&jobpb.CancelJobRequest{JobId: "job-001"})
+ id := "job-001"
+ resp, err := undertest.Cancel(ctx,
&jobpb.CancelJobRequest{JobId: id})
if err != nil {
- t.Errorf("Canceling(\"job-001\") = %v,
want nil", err)
+ t.Errorf("Cancel(%q) = %v, want not
found error", id, err)
}
if diff := cmp.Diff(&jobpb.CancelJobResponse{
State: jobpb.JobState_CANCELLING,
}, resp, cmpOpts...); diff != "" {
- t.Errorf("Canceling(\"job-001\")
(-want, +got):\n%v", diff)
+ t.Errorf("Cancel(%q) (-want,
+got):\n%s", id, diff)
}
},
postRunCheck: func(ctx context.Context, t *testing.T,
undertest *Server, jobID string) {
- resp, err := undertest.Cancel(ctx,
&jobpb.CancelJobRequest{JobId: jobID})
+ id := "job-001"
+ resp, err := undertest.Cancel(ctx,
&jobpb.CancelJobRequest{JobId: id})
if err != nil {
- t.Errorf("Canceling(\"%s\") = %v, want
nil", jobID, err)
+ t.Errorf("Cancel(%q) = %v, want not
found error", id, err)
}
if diff := cmp.Diff(&jobpb.CancelJobResponse{
- State: jobpb.JobState_DONE,
+ State: jobpb.JobState_CANCELLING,
}, resp, cmpOpts...); diff != "" {
- t.Errorf("Canceling(\"%s\") (-want,
+got):\n%v", jobID, diff)
+ t.Errorf("Cancel(%q) (-want,
+got):\n%s", id, diff)
}
},
},
@@ -230,7 +236,11 @@ func TestServer(t *testing.T) {
shortIDSize: sizeData,
},
})
- j.state.Store(jobpb.JobState_DONE)
+ state := jobpb.JobState_DONE
+ if test.postRunState != jobpb.JobState_UNSPECIFIED {
+ state = test.postRunState
+ }
+ j.state.Store(state)
called.Done()
})
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go
index 473c84f958e..fb72048d478 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go
@@ -84,9 +84,12 @@ func TestServer_RunThenCancel(t *testing.T) {
var called sync.WaitGroup
called.Add(1)
undertest := NewServer(0, func(j *Job) {
+ defer called.Done()
+ j.state.Store(jobpb.JobState_RUNNING)
if errors.Is(context.Cause(j.RootCtx), ErrCancel) {
- j.state.Store(jobpb.JobState_CANCELLED)
- called.Done()
+ j.SendMsg("pipeline canceled " + j.String())
+ j.Canceled()
+ return
}
})
ctx := context.Background()
@@ -121,6 +124,7 @@ func TestServer_RunThenCancel(t *testing.T) {
cancelResp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{
JobId: runResp.GetJobId(),
})
+
if err != nil {
t.Fatalf("server.Canceling() = %v, want nil", err)
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/web/assets/job-action.js
b/sdks/go/pkg/beam/runners/prism/internal/web/assets/job-action.js
index 999fd22bbf8..4f2e51a7c02 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/web/assets/job-action.js
+++ b/sdks/go/pkg/beam/runners/prism/internal/web/assets/job-action.js
@@ -12,6 +12,12 @@
limitations under the License.
*/
+/**
+ * job-action.js provides UI functionality for taking actions on Prism Jobs
via:
+ * - jobManager: Client for Job Management.
+ * - uiStateProvider: Encapsulates UI state of user interactive elements on
the page.
+ */
+
/** Element class for job action container. */
const JOB_ACTION = '.job-action'
@@ -21,6 +27,24 @@ const CANCEL = '.cancel'
/** Element class assigned to RUNNING Job state. */
const RUNNING = 'RUNNING'
+/** Element class for elements reporting Job state details. */
+const JOB_STATE = '.job-state'
+
+/** PATH holds consts that map to backend endpoints. */
+const PATH = {
+
+ /** ROOT_ is the Job management path prefix for mapped backend endpoints.
*/
+ ROOT_: '/job',
+
+ /** CANCEL maps to the backend endpoint to cancel a Job. Terminates with
'/' to prevent ServeMux 301 redirect. */
+ get CANCEL() {
+ return `${this.ROOT_}/cancel/`
+ }
+}
+
+/** HTTP related consts. */
+const HTTP_POST = 'POST'
+
/**
* Client for Job Management.
*
@@ -34,9 +58,28 @@ const jobManager = {
* @param jobId
* TODO(https://github.com/apache/beam/issues/29669) Send request to
backend service.
*/
- cancel: function(jobId) {
+ cancel: function (jobId) {
console.debug(`cancel button for Job: ${jobId} clicked`)
- }
+ const path = PATH.CANCEL
+ const request = {
+ method: HTTP_POST,
+ body: JSON.stringify(new CancelJobRequest(jobId))
+ }
+ fetch(path, request)
+ .then(response => {
+ const requestJson = JSON.stringify(request)
+ const responseJson = JSON.stringify(response)
+ if (response.ok) {
+ console.debug(`Job cancellation request to ${path} of
${requestJson} for Job: ${jobId} sent successfully, response: ${responseJson}`)
+ uiStateProvider.onJobCancel(response)
+ } else {
+ console.error(`Failed to send job cancellation request to
${path} of ${requestJson} for Job: ${jobId}, response: ${responseJson}`)
+ }
+ })
+ .catch(error => {
+ console.error(`Error occurred while sending job cancellation
request for Job: ${jobId}`, error)
+ })
+ },
}
/**
@@ -84,14 +127,85 @@ const uiStateProvider = {
*/
get isStateRunning() {
return this.jobAction.classList.contains(RUNNING)
+ },
+
+ /**
+ * Queries the element containing the {@link JOB_STATE} class.
+ * @return {Element}
+ */
+ get jobStateElement() {
+ let element = document.querySelector(JOB_STATE)
+ if (element === null) {
+ console.error(`no element found at ${JOB_STATE}`)
+ }
+ return element
+ },
+
+ /**
+ * Callback for successful Job Cancel requests.
+ * @param response {Response}
+ */
+ onJobCancel(response) {
+ response.json().then(json => {
+ console.debug(`job cancel response json: ${JSON.stringify(json)}`)
+ uiStateProvider.jobStateElement.textContent =
JobState_Enum[json.state]
+ })
+ .catch(error => {
+ console.error(`error Response.json() ${error}`)
+ })
}
}
/**
* Attaches an event listener to the window for 'load' events.
*/
-window.addEventListener("load", function(){
+window.addEventListener("load", function () {
console.debug(JOB_ACTION, uiStateProvider.jobAction)
console.debug(CANCEL, uiStateProvider.cancelButton)
uiStateProvider.init()
})
+
+/**
+ * CancelJobRequest models a request to cancel a Job.
+ *
+ * Models after its proto namesake in:
+ *
https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_job_api.proto
+ */
+class CancelJobRequest {
+ job_id_;
+
+ constructor(jobId) {
+ this.job_id_ = jobId
+ }
+
+ /**
+ * The ID of the Job to cancel.
+ * @return {string}
+ */
+ get job_id() {
+ return this.job_id_
+ }
+
+ /** toJSON overrides JSON.stringify serialization behavior. */
+ toJSON() {
+ return {job_id: this.job_id}
+ }
+}
+
+/** Maps JobState_Enum from Job Management server response to the Job State
name. See proto for more details:
+ *
https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_job_api.proto
+ */
+const JobState_Enum = {
+ 0: "UNSPECIFIED",
+ 1: "STOPPED",
+ 2: "RUNNING",
+ 3: "DONE",
+ 4: "FAILED",
+ 5: "CANCELLED",
+ 6: "UPDATED",
+ 7: "DRAINING",
+ 8: "DRAINED",
+ 9: "STARTING",
+ 10: "CANCELLING",
+ 11: "UPDATING",
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/web/jobdetails.html
b/sdks/go/pkg/beam/runners/prism/internal/web/jobdetails.html
index ff87f677e2d..955132a03f4 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/web/jobdetails.html
+++ b/sdks/go/pkg/beam/runners/prism/internal/web/jobdetails.html
@@ -33,7 +33,7 @@ limitations under the License. See accompanying LICENSE file.
onclick="if (jobManager !== null) {
jobManager.cancel('{{.JobID}}') }"
>Cancel</button>
</div>
- <div>{{.State}}</div>
+ <div class="job-state">{{.State}}</div>
</header>
<section class="container">
{{ if .Error}}<div class="child">{{.Error}}</div>{{end}}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/web/web.go
b/sdks/go/pkg/beam/runners/prism/internal/web/web.go
index 33c28209753..baa14428aaa 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/web/web.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/web/web.go
@@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"html/template"
+ "io"
"net/http"
"sort"
"strings"
@@ -374,8 +375,22 @@ type jobCancelHandler struct {
func (h *jobCancelHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var cancelRequest *jobpb.CancelJobRequest
- if err := json.NewDecoder(r.Body).Decode(&cancelRequest); err != nil {
- err = fmt.Errorf("error parsing JSON of request: %w", err)
+ if r.Method != http.MethodPost {
+ http.Error(w, http.StatusText(http.StatusMethodNotAllowed),
http.StatusMethodNotAllowed)
+ return
+ }
+ body, err := io.ReadAll(r.Body)
+ if err != nil {
+ err = fmt.Errorf("could not read request body: %w", err)
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+ if len(body) == 0 {
+ http.Error(w, "empty request body", http.StatusBadRequest)
+ return
+ }
+ if err := json.Unmarshal(body, &cancelRequest); err != nil {
+ err = fmt.Errorf("error parsing JSON: %s of request: %w", body,
err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
@@ -405,10 +420,10 @@ func Initialize(ctx context.Context, port int, jobcli
jobpb.JobServiceClient) er
mux := http.NewServeMux()
mux.Handle("/assets/", assetsFs)
+ mux.Handle("/job/cancel/", &jobCancelHandler{Jobcli: jobcli})
mux.Handle("/job/", &jobDetailsHandler{Jobcli: jobcli})
mux.Handle("/debugz", &debugzHandler{})
mux.Handle("/", &jobsConsoleHandler{Jobcli: jobcli})
- mux.Handle("/job/cancel", &jobCancelHandler{Jobcli: jobcli})
endpoint := fmt.Sprintf("localhost:%d", port)