AydarZaynutdinov commented on a change in pull request #15926:
URL: https://github.com/apache/beam/pull/15926#discussion_r747414238



##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService 
cache.Cache, lc *fs_tool.Life
                }
        }(successChannel, errorChannel, dataChannel)
 
+       processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan, 
successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR, 
pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService 
cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{}, 
errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
        select {
-       case <-ctxWithTimeout.Done():
-               finishByContext(ctxWithTimeout, pipelineId, cacheService)
-               return
-       case ok := <-successChannel:
-               data := <-dataChannel
+       case <-ctx.Done():
+               finishByContext(ctx, pipelineId, cacheService)
+               return false
+       case <-cancelChan:
+               processCancel(ctx, cacheService, pipelineId)
+               return false
+       case ok := <-successChan:
+               var data []byte = nil
+               if dataChan != nil {
+                       temp := <-dataChan
+                       data = temp.([]byte)
+               }
                if !ok {
                        err := <-errorChannel
-                       processError(ctxWithTimeout, err.(error), 
data.([]byte), pipelineId, cacheService, pb.Status_STATUS_RUN_ERROR)
-                       return
+                       processError(ctx, err, data, pipelineId, cacheService, 
errorCaseStatus)
+                       return false
                }
-               processSuccess(ctxWithTimeout, data.([]byte), pipelineId, 
cacheService, pb.Status_STATUS_FINISHED)
+               processSuccess(ctx, data, pipelineId, cacheService, 
successCaseStatus)
        }
+       return true
 }
 
 // finishByContext is used in case of runCode method finished by timeout
 func finishByContext(ctx context.Context, pipelineId uuid.UUID, cacheService 
cache.Cache) {
        logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)
 
+       // set to cache pipelineId: cache.Canceled: false to stop cancelCheck() 
method
+       setToCache(ctx, cacheService, pipelineId, cache.Canceled, false)
+
        // set to cache pipelineId: cache.SubKey_Status: 
Status_STATUS_RUN_TIMEOUT
        setToCache(ctx, cacheService, pipelineId, cache.Status, 
pb.Status_STATUS_RUN_TIMEOUT)
 }
 
+// cancelCheck checks cancel flag for code processing.
+// If cancel flag doesn't exist in cache continue working.
+// If cancel flag exists, and it is true it means that code processing was 
canceled. Set true to cancelChan and return.
+// If cancel flag exists, and it is false it means that code processing was 
finished. Return.
+func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChan chan 
bool, cacheService cache.Cache) {
+       for {
+               cancel, err := cacheService.GetValue(ctx, pipelineId, 
cache.Canceled)
+               if err != nil {
+                       continue

Review comment:
       I updated this part based on the changes made to the `timer`.
   Now, when the context of the `processCode()` method is done, 
`cancelCheck()` should stop as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to