[ 
https://issues.apache.org/jira/browse/BEAM-13110?focusedWorklogId=680232&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-680232
 ]

ASF GitHub Bot logged work on BEAM-13110:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Nov/21 11:17
            Start Date: 11/Nov/21 11:17
    Worklog Time Spent: 10m 
      Work Description: AydarZaynutdinov commented on a change in pull request 
#15926:
URL: https://github.com/apache/beam/pull/15926#discussion_r747414238



##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService 
cache.Cache, lc *fs_tool.Life
                }
        }(successChannel, errorChannel, dataChannel)
 
+       processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan, 
successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR, 
pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService 
cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{}, 
errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
        select {
-       case <-ctxWithTimeout.Done():
-               finishByContext(ctxWithTimeout, pipelineId, cacheService)
-               return
-       case ok := <-successChannel:
-               data := <-dataChannel
+       case <-ctx.Done():
+               finishByContext(ctx, pipelineId, cacheService)
+               return false
+       case <-cancelChan:
+               processCancel(ctx, cacheService, pipelineId)
+               return false
+       case ok := <-successChan:
+               var data []byte = nil
+               if dataChan != nil {
+                       temp := <-dataChan
+                       data = temp.([]byte)
+               }
                if !ok {
                        err := <-errorChannel
-                       processError(ctxWithTimeout, err.(error), 
data.([]byte), pipelineId, cacheService, pb.Status_STATUS_RUN_ERROR)
-                       return
+                       processError(ctx, err, data, pipelineId, cacheService, 
errorCaseStatus)
+                       return false
                }
-               processSuccess(ctxWithTimeout, data.([]byte), pipelineId, 
cacheService, pb.Status_STATUS_FINISHED)
+               processSuccess(ctx, data, pipelineId, cacheService, 
successCaseStatus)
        }
+       return true
 }
 
 // finishByContext is used in case of runCode method finished by timeout
 func finishByContext(ctx context.Context, pipelineId uuid.UUID, cacheService 
cache.Cache) {
        logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)
 
+       // set to cache pipelineId: cache.Canceled: false to stop cancelCheck() 
method
+       setToCache(ctx, cacheService, pipelineId, cache.Canceled, false)
+
        // set to cache pipelineId: cache.SubKey_Status: 
Status_STATUS_RUN_TIMEOUT
        setToCache(ctx, cacheService, pipelineId, cache.Status, 
pb.Status_STATUS_RUN_TIMEOUT)
 }
 
+// cancelCheck checks cancel flag for code processing.
+// If cancel flag doesn't exist in cache continue working.
+// If cancel flag exists, and it is true it means that code processing was 
canceled. Set true to cancelChan and return.
+// If cancel flag exists, and it is false it means that code processing was 
finished. Return.
+func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChan chan 
bool, cacheService cache.Cache) {
+       for {
+               cancel, err := cacheService.GetValue(ctx, pipelineId, 
cache.Canceled)
+               if err != nil {
+                       continue

Review comment:
       I updated this part based on changes with the `timer`.
   So now if the context of the `processCode()` method is done then the 
`cancelCheck()` should stop.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 680232)
    Time Spent: 1h  (was: 50m)

> [Playground] Playground pipeline cancelation
> --------------------------------------------
>
>                 Key: BEAM-13110
>                 URL: https://issues.apache.org/jira/browse/BEAM-13110
>             Project: Beam
>          Issue Type: Sub-task
>          Components: beam-playground
>            Reporter: Aydar Zaynutdinov
>            Assignee: Aydar Zaynutdinov
>            Priority: P3
>              Labels: beam-playground-backend
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> * Add status Canceled
>  * Add new API method Cancel:
>  this method should set pipelinedId:canceled:true to cache
>  * Update code processing:
>  If the pipelinedId:canceled is true, then
>  ** set status Canceled
>  ** stop processes (which were generated to process code)  for pipelineId
>  ** clean resources (folders and files to process code) for pipelineId



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to