pabloem commented on a change in pull request #15926:
URL: https://github.com/apache/beam/pull/15926#discussion_r747204260



##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService 
cache.Cache, lc *fs_tool.Life
                }
        }(successChannel, errorChannel, dataChannel)
 
+       processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan, 
successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR, 
pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService 
cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{}, 
errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
        select {
-       case <-ctxWithTimeout.Done():
-               finishByContext(ctxWithTimeout, pipelineId, cacheService)
-               return
-       case ok := <-successChannel:
-               data := <-dataChannel
+       case <-ctx.Done():
+               finishByContext(ctx, pipelineId, cacheService)
+               return false
+       case <-cancelChan:
+               processCancel(ctx, cacheService, pipelineId)
+               return false
+       case ok := <-successChan:
+               var data []byte = nil
+               if dataChan != nil {
+                       temp := <-dataChan
+                       data = temp.([]byte)
+               }
                if !ok {
                        err := <-errorChannel
-                       processError(ctxWithTimeout, err.(error), 
data.([]byte), pipelineId, cacheService, pb.Status_STATUS_RUN_ERROR)
-                       return
+                       processError(ctx, err, data, pipelineId, cacheService, 
errorCaseStatus)
+                       return false
                }
-               processSuccess(ctxWithTimeout, data.([]byte), pipelineId, 
cacheService, pb.Status_STATUS_FINISHED)
+               processSuccess(ctx, data, pipelineId, cacheService, 
successCaseStatus)
        }
+       return true
 }
 
 // finishByContext is used in case of runCode method finished by timeout
 func finishByContext(ctx context.Context, pipelineId uuid.UUID, cacheService 
cache.Cache) {
        logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)
 
+       // set to cache pipelineId: cache.Canceled: false to stop cancelCheck() 
method
+       setToCache(ctx, cacheService, pipelineId, cache.Canceled, false)
+
        // set to cache pipelineId: cache.SubKey_Status: 
Status_STATUS_RUN_TIMEOUT
        setToCache(ctx, cacheService, pipelineId, cache.Status, 
pb.Status_STATUS_RUN_TIMEOUT)
 }
 
+// cancelCheck checks cancel flag for code processing.
+// If cancel flag doesn't exist in cache continue working.
+// If cancel flag exists, and it is true it means that code processing was 
canceled. Set true to cancelChan and return.
+// If cancel flag exists, and it is false it means that code processing was 
finished. Return.
+func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChan chan 
bool, cacheService cache.Cache) {
+       for {
+               cancel, err := cacheService.GetValue(ctx, pipelineId, 
cache.Canceled)

Review comment:
       should we have a timer here so that we won't overwhelm the cache with 
requests?

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -272,6 +282,9 @@ func processCode(ctx context.Context, cacheService 
cache.Cache, lc *fs_tool.Life
        errorChannel := make(chan error, 1)
        dataChannel := make(chan interface{}, 1)
        successChannel := make(chan bool, 1)
+       cancelChan := make(chan bool, 1)

Review comment:
       ```suggestion
        cancelChannel := make(chan bool, 1)
   ```
   
   Let's keep consistent naming for channel variables

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService 
cache.Cache, lc *fs_tool.Life
                }
        }(successChannel, errorChannel, dataChannel)
 
+       processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan, 
successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR, 
pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService 
cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{}, 
errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
        select {
-       case <-ctxWithTimeout.Done():
-               finishByContext(ctxWithTimeout, pipelineId, cacheService)
-               return
-       case ok := <-successChannel:
-               data := <-dataChannel
+       case <-ctx.Done():
+               finishByContext(ctx, pipelineId, cacheService)
+               return false
+       case <-cancelChan:
+               processCancel(ctx, cacheService, pipelineId)
+               return false
+       case ok := <-successChan:
+               var data []byte = nil
+               if dataChan != nil {
+                       temp := <-dataChan
+                       data = temp.([]byte)
+               }
                if !ok {
                        err := <-errorChannel
-                       processError(ctxWithTimeout, err.(error), 
data.([]byte), pipelineId, cacheService, pb.Status_STATUS_RUN_ERROR)
-                       return
+                       processError(ctx, err, data, pipelineId, cacheService, 
errorCaseStatus)
+                       return false
                }
-               processSuccess(ctxWithTimeout, data.([]byte), pipelineId, 
cacheService, pb.Status_STATUS_FINISHED)
+               processSuccess(ctx, data, pipelineId, cacheService, 
successCaseStatus)
        }
+       return true
 }
 
 // finishByContext is used in case of runCode method finished by timeout
 func finishByContext(ctx context.Context, pipelineId uuid.UUID, cacheService 
cache.Cache) {
        logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)
 
+       // set to cache pipelineId: cache.Canceled: false to stop cancelCheck() 
method
+       setToCache(ctx, cacheService, pipelineId, cache.Canceled, false)
+
        // set to cache pipelineId: cache.SubKey_Status: 
Status_STATUS_RUN_TIMEOUT
        setToCache(ctx, cacheService, pipelineId, cache.Status, 
pb.Status_STATUS_RUN_TIMEOUT)
 }
 
+// cancelCheck checks cancel flag for code processing.
+// If cancel flag doesn't exist in cache continue working.
+// If cancel flag exists, and it is true it means that code processing was 
canceled. Set true to cancelChan and return.
+// If cancel flag exists, and it is false it means that code processing was 
finished. Return.
+func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChan chan 
bool, cacheService cache.Cache) {
+       for {
+               cancel, err := cacheService.GetValue(ctx, pipelineId, 
cache.Canceled)

Review comment:
       seems like Go has a ticker utility that could be useful here: 
https://gobyexample.com/tickers

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -447,13 +469,27 @@ func processSuccess(ctx context.Context, output []byte, 
pipelineId uuid.UUID, ca
 
                setToCache(ctx, cacheService, pipelineId, cache.RunOutput, 
string(output))
 
+               // set to cache pipelineId: cache.Canceled: false to stop 
cancelCheck() method
+               setToCache(ctx, cacheService, pipelineId, cache.Canceled, false)
+
+               // set to cache pipelineId: cache.SubKey_Status: 
pb.Status_STATUS_FINISHED
                setToCache(ctx, cacheService, pipelineId, cache.Status, 
pb.Status_STATUS_FINISHED)
        }
 }
 
+// processCancel process case when code processing was canceled
+func processCancel(ctx context.Context, cacheService cache.Cache, pipelineId 
uuid.UUID) {
+       logger.Infof("%s: was canceled\n", pipelineId)
+
+       // set to cache pipelineId: cache.SubKey_Status: 
pb.Status_STATUS_CANCELED
+       setToCache(ctx, cacheService, pipelineId, cache.Status, 
pb.Status_STATUS_CANCELED)

Review comment:
       Should we get a `CancelFunc` for the context? If we receive a cancel 
request, we need to kill the running process (compilation or the pipeline or 
anything), so we need to cancel the context, right?

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService 
cache.Cache, lc *fs_tool.Life
                }
        }(successChannel, errorChannel, dataChannel)
 
+       processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan, 
successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR, 
pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService 
cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{}, 
errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {

Review comment:
       Let's keep consistent naming
   
   ```suggestion
   func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService 
cache.Cache, cancelChan, successChannel chan bool, dataChannel chan 
interface{}, errorChannel chan error, errorCaseStatus, successCaseStatus 
pb.Status) bool {
   ```

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService 
cache.Cache, lc *fs_tool.Life
                }
        }(successChannel, errorChannel, dataChannel)
 
+       processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan, 
successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR, 
pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService 
cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{}, 
errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
        select {
-       case <-ctxWithTimeout.Done():
-               finishByContext(ctxWithTimeout, pipelineId, cacheService)
-               return
-       case ok := <-successChannel:
-               data := <-dataChannel
+       case <-ctx.Done():
+               finishByContext(ctx, pipelineId, cacheService)

Review comment:
       Let's rename this function to `finishByTimeout` since that's the only 
case when this is triggered (you've documented it on the function, which is 
great - but let's name it as well)

##########
File path: playground/backend/cmd/server/controller_test.go
##########
@@ -221,7 +224,9 @@ func TestPlaygroundController_CheckStatus(t *testing.T) {
                        wantErr:    true,
                },
                {
-                       name: "all success",
+                       // Test case with calling CheckStatus method with 
pipelineId which contains status.
+                       // As a result, want to receive an expected status.

Review comment:
       thanks for adding the comments. They're helpful : )

##########
File path: playground/backend/cmd/server/controller_test.go
##########
@@ -448,6 +453,58 @@ func TestPlaygroundController_GetRunError(t *testing.T) {
        }
 }
 
+func TestPlaygroundController_Cancel(t *testing.T) {

Review comment:
       The code in processCode is becoming more complex (by necessity) - I 
think it would make sense to make sure we're not leaking goroutines. Do you 
think we could add checks for that?
   
   I found this library that seems to check this properly: 
https://github.com/uber-go/goleak
   
   (or it may not fit our use case - but worth considering)

##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService 
cache.Cache, lc *fs_tool.Life
                }
        }(successChannel, errorChannel, dataChannel)
 
+       processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan, 
successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR, 
pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService 
cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{}, 
errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
        select {
-       case <-ctxWithTimeout.Done():
-               finishByContext(ctxWithTimeout, pipelineId, cacheService)
-               return
-       case ok := <-successChannel:
-               data := <-dataChannel
+       case <-ctx.Done():
+               finishByContext(ctx, pipelineId, cacheService)
+               return false
+       case <-cancelChan:
+               processCancel(ctx, cacheService, pipelineId)
+               return false
+       case ok := <-successChan:
+               var data []byte = nil
+               if dataChan != nil {
+                       temp := <-dataChan
+                       data = temp.([]byte)
+               }
                if !ok {
                        err := <-errorChannel
-                       processError(ctxWithTimeout, err.(error), 
data.([]byte), pipelineId, cacheService, pb.Status_STATUS_RUN_ERROR)
-                       return
+                       processError(ctx, err, data, pipelineId, cacheService, 
errorCaseStatus)
+                       return false
                }
-               processSuccess(ctxWithTimeout, data.([]byte), pipelineId, 
cacheService, pb.Status_STATUS_FINISHED)
+               processSuccess(ctx, data, pipelineId, cacheService, 
successCaseStatus)
        }
+       return true
 }
 
 // finishByContext is used in case of runCode method finished by timeout
 func finishByContext(ctx context.Context, pipelineId uuid.UUID, cacheService 
cache.Cache) {
        logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)
 
+       // set to cache pipelineId: cache.Canceled: false to stop cancelCheck() 
method
+       setToCache(ctx, cacheService, pipelineId, cache.Canceled, false)
+
        // set to cache pipelineId: cache.SubKey_Status: 
Status_STATUS_RUN_TIMEOUT
        setToCache(ctx, cacheService, pipelineId, cache.Status, 
pb.Status_STATUS_RUN_TIMEOUT)
 }
 
+// cancelCheck checks cancel flag for code processing.
+// If cancel flag doesn't exist in cache continue working.
+// If cancel flag exists, and it is true it means that code processing was 
canceled. Set true to cancelChan and return.
+// If cancel flag exists, and it is false it means that code processing was 
finished. Return.
+func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChan chan 
bool, cacheService cache.Cache) {
+       for {
+               cancel, err := cacheService.GetValue(ctx, pipelineId, 
cache.Canceled)
+               if err != nil {
+                       continue

Review comment:
       I also wonder - should we handle the case where `pipelineId` does not 
exist in the cache?
   
   I think you've correctly set `canceled=false` in the cache for every 
possible pipeline finalization, so this should not be necessary - but if we 
missed it for any reason, we could leak goroutines, so it may be good to let 
the `cancelCheck` exit if the `pipelineId` is no longer in the cache. 
   
   what do you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to