pabloem commented on a change in pull request #15926:
URL: https://github.com/apache/beam/pull/15926#discussion_r747204260
##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService
cache.Cache, lc *fs_tool.Life
}
}(successChannel, errorChannel, dataChannel)
+ processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan,
successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR,
pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService
cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{},
errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
select {
- case <-ctxWithTimeout.Done():
- finishByContext(ctxWithTimeout, pipelineId, cacheService)
- return
- case ok := <-successChannel:
- data := <-dataChannel
+ case <-ctx.Done():
+ finishByContext(ctx, pipelineId, cacheService)
+ return false
+ case <-cancelChan:
+ processCancel(ctx, cacheService, pipelineId)
+ return false
+ case ok := <-successChan:
+ var data []byte = nil
+ if dataChan != nil {
+ temp := <-dataChan
+ data = temp.([]byte)
+ }
if !ok {
err := <-errorChannel
- processError(ctxWithTimeout, err.(error),
data.([]byte), pipelineId, cacheService, pb.Status_STATUS_RUN_ERROR)
- return
+ processError(ctx, err, data, pipelineId, cacheService,
errorCaseStatus)
+ return false
}
- processSuccess(ctxWithTimeout, data.([]byte), pipelineId,
cacheService, pb.Status_STATUS_FINISHED)
+ processSuccess(ctx, data, pipelineId, cacheService,
successCaseStatus)
}
+ return true
}
// finishByContext is used in case of runCode method finished by timeout
func finishByContext(ctx context.Context, pipelineId uuid.UUID, cacheService
cache.Cache) {
logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)
+ // set to cache pipelineId: cache.Canceled: false to stop cancelCheck()
method
+ setToCache(ctx, cacheService, pipelineId, cache.Canceled, false)
+
// set to cache pipelineId: cache.SubKey_Status:
Status_STATUS_RUN_TIMEOUT
setToCache(ctx, cacheService, pipelineId, cache.Status,
pb.Status_STATUS_RUN_TIMEOUT)
}
+// cancelCheck checks cancel flag for code processing.
+// If cancel flag doesn't exist in cache continue working.
+// If cancel flag exists, and it is true it means that code processing was
canceled. Set true to cancelChan and return.
+// If cancel flag exists, and it is false it means that code processing was
finished. Return.
+func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChan chan
bool, cacheService cache.Cache) {
+ for {
+ cancel, err := cacheService.GetValue(ctx, pipelineId,
cache.Canceled)
Review comment:
Should we have a timer here so that we don't overwhelm the cache with
requests?
##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -272,6 +282,9 @@ func processCode(ctx context.Context, cacheService
cache.Cache, lc *fs_tool.Life
errorChannel := make(chan error, 1)
dataChannel := make(chan interface{}, 1)
successChannel := make(chan bool, 1)
+ cancelChan := make(chan bool, 1)
Review comment:
```suggestion
cancelChannel := make(chan bool, 1)
```
Let's keep consistent naming for channel variables
##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService
cache.Cache, lc *fs_tool.Life
}
}(successChannel, errorChannel, dataChannel)
+ processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan,
successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR,
pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService
cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{},
errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
select {
- case <-ctxWithTimeout.Done():
- finishByContext(ctxWithTimeout, pipelineId, cacheService)
- return
- case ok := <-successChannel:
- data := <-dataChannel
+ case <-ctx.Done():
+ finishByContext(ctx, pipelineId, cacheService)
+ return false
+ case <-cancelChan:
+ processCancel(ctx, cacheService, pipelineId)
+ return false
+ case ok := <-successChan:
+ var data []byte = nil
+ if dataChan != nil {
+ temp := <-dataChan
+ data = temp.([]byte)
+ }
if !ok {
err := <-errorChannel
- processError(ctxWithTimeout, err.(error),
data.([]byte), pipelineId, cacheService, pb.Status_STATUS_RUN_ERROR)
- return
+ processError(ctx, err, data, pipelineId, cacheService,
errorCaseStatus)
+ return false
}
- processSuccess(ctxWithTimeout, data.([]byte), pipelineId,
cacheService, pb.Status_STATUS_FINISHED)
+ processSuccess(ctx, data, pipelineId, cacheService,
successCaseStatus)
}
+ return true
}
// finishByContext is used in case of runCode method finished by timeout
func finishByContext(ctx context.Context, pipelineId uuid.UUID, cacheService
cache.Cache) {
logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)
+ // set to cache pipelineId: cache.Canceled: false to stop cancelCheck()
method
+ setToCache(ctx, cacheService, pipelineId, cache.Canceled, false)
+
// set to cache pipelineId: cache.SubKey_Status:
Status_STATUS_RUN_TIMEOUT
setToCache(ctx, cacheService, pipelineId, cache.Status,
pb.Status_STATUS_RUN_TIMEOUT)
}
+// cancelCheck checks cancel flag for code processing.
+// If cancel flag doesn't exist in cache continue working.
+// If cancel flag exists, and it is true it means that code processing was
canceled. Set true to cancelChan and return.
+// If cancel flag exists, and it is false it means that code processing was
finished. Return.
+func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChan chan
bool, cacheService cache.Cache) {
+ for {
+ cancel, err := cacheService.GetValue(ctx, pipelineId,
cache.Canceled)
Review comment:
It seems like Go has a ticker utility that could be useful here:
https://gobyexample.com/tickers
##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -447,13 +469,27 @@ func processSuccess(ctx context.Context, output []byte,
pipelineId uuid.UUID, ca
setToCache(ctx, cacheService, pipelineId, cache.RunOutput,
string(output))
+ // set to cache pipelineId: cache.Canceled: false to stop
cancelCheck() method
+ setToCache(ctx, cacheService, pipelineId, cache.Canceled, false)
+
+ // set to cache pipelineId: cache.SubKey_Status:
pb.Status_STATUS_FINISHED
setToCache(ctx, cacheService, pipelineId, cache.Status,
pb.Status_STATUS_FINISHED)
}
}
+// processCancel process case when code processing was canceled
+func processCancel(ctx context.Context, cacheService cache.Cache, pipelineId
uuid.UUID) {
+ logger.Infof("%s: was canceled\n", pipelineId)
+
+ // set to cache pipelineId: cache.SubKey_Status:
pb.Status_STATUS_CANCELED
+ setToCache(ctx, cacheService, pipelineId, cache.Status,
pb.Status_STATUS_CANCELED)
Review comment:
Should we get a `CancelFunc` for the context? If we receive a cancel
request, we need to kill the running process (compilation or the pipeline or
anything), so we need to cancel the context, right?
##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService
cache.Cache, lc *fs_tool.Life
}
}(successChannel, errorChannel, dataChannel)
+ processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan,
successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR,
pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService
cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{},
errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
Review comment:
Let's keep consistent naming
```suggestion
func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService
cache.Cache, cancelChan, successChannel chan bool, dataChannel chan
interface{}, errorChannel chan error, errorCaseStatus, successCaseStatus
pb.Status) bool {
```
##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService
cache.Cache, lc *fs_tool.Life
}
}(successChannel, errorChannel, dataChannel)
+ processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan,
successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR,
pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService
cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{},
errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
select {
- case <-ctxWithTimeout.Done():
- finishByContext(ctxWithTimeout, pipelineId, cacheService)
- return
- case ok := <-successChannel:
- data := <-dataChannel
+ case <-ctx.Done():
+ finishByContext(ctx, pipelineId, cacheService)
Review comment:
Let's rename this function to `finishByTimeout` since that's the only
case when this is triggered (you've documented it on the function, which is
great - but let's name it as well)
##########
File path: playground/backend/cmd/server/controller_test.go
##########
@@ -221,7 +224,9 @@ func TestPlaygroundController_CheckStatus(t *testing.T) {
wantErr: true,
},
{
- name: "all success",
+ // Test case with calling CheckStatus method with
pipelineId which contains status.
+ // As a result, want to receive an expected status.
Review comment:
Thanks for adding the comments. They're helpful :)
##########
File path: playground/backend/cmd/server/controller_test.go
##########
@@ -448,6 +453,58 @@ func TestPlaygroundController_GetRunError(t *testing.T) {
}
}
+func TestPlaygroundController_Cancel(t *testing.T) {
Review comment:
The code in processCode is becoming more complex (by necessity) - I
think it would make sense to make sure we're not leaking goroutines. Do you
think we could add checks for that?
I found this library that seems to check this properly:
https://github.com/uber-go/goleak
(or it may not fit our use case - but worth considering)
##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -366,29 +351,64 @@ func processCode(ctx context.Context, cacheService
cache.Cache, lc *fs_tool.Life
}
}(successChannel, errorChannel, dataChannel)
+ processStep(ctxWithTimeout, pipelineId, cacheService, cancelChan,
successChannel, dataChannel, errorChannel, pb.Status_STATUS_RUN_ERROR,
pb.Status_STATUS_FINISHED)
+}
+
+// processStep processes each executor's step with cancel and timeout checks.
+// If finishes by canceling, timeout or error - returns false.
+// If finishes successfully returns true.
+func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService
cache.Cache, cancelChan, successChan chan bool, dataChan chan interface{},
errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) bool {
select {
- case <-ctxWithTimeout.Done():
- finishByContext(ctxWithTimeout, pipelineId, cacheService)
- return
- case ok := <-successChannel:
- data := <-dataChannel
+ case <-ctx.Done():
+ finishByContext(ctx, pipelineId, cacheService)
+ return false
+ case <-cancelChan:
+ processCancel(ctx, cacheService, pipelineId)
+ return false
+ case ok := <-successChan:
+ var data []byte = nil
+ if dataChan != nil {
+ temp := <-dataChan
+ data = temp.([]byte)
+ }
if !ok {
err := <-errorChannel
- processError(ctxWithTimeout, err.(error),
data.([]byte), pipelineId, cacheService, pb.Status_STATUS_RUN_ERROR)
- return
+ processError(ctx, err, data, pipelineId, cacheService,
errorCaseStatus)
+ return false
}
- processSuccess(ctxWithTimeout, data.([]byte), pipelineId,
cacheService, pb.Status_STATUS_FINISHED)
+ processSuccess(ctx, data, pipelineId, cacheService,
successCaseStatus)
}
+ return true
}
// finishByContext is used in case of runCode method finished by timeout
func finishByContext(ctx context.Context, pipelineId uuid.UUID, cacheService
cache.Cache) {
logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)
+ // set to cache pipelineId: cache.Canceled: false to stop cancelCheck()
method
+ setToCache(ctx, cacheService, pipelineId, cache.Canceled, false)
+
// set to cache pipelineId: cache.SubKey_Status:
Status_STATUS_RUN_TIMEOUT
setToCache(ctx, cacheService, pipelineId, cache.Status,
pb.Status_STATUS_RUN_TIMEOUT)
}
+// cancelCheck checks cancel flag for code processing.
+// If cancel flag doesn't exist in cache continue working.
+// If cancel flag exists, and it is true it means that code processing was
canceled. Set true to cancelChan and return.
+// If cancel flag exists, and it is false it means that code processing was
finished. Return.
+func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelChan chan
bool, cacheService cache.Cache) {
+ for {
+ cancel, err := cacheService.GetValue(ctx, pipelineId,
cache.Canceled)
+ if err != nil {
+ continue
Review comment:
I also wonder - should we handle the case where `pipelineId` does not
exist in the cache?
I think you've correctly set `canceled=false` in the cache for every
possible pipeline finalization, so this should not be necessary - but if we
missed it for any reason, we could leak goroutines, so it may be good to let
the `cancelCheck` exit if the `pipelineId` is no longer in the cache.
What do you think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]