AydarZaynutdinov commented on a change in pull request #15804:
URL: https://github.com/apache/beam/pull/15804#discussion_r746537654



##########
File path: playground/backend/cmd/server/controller.go
##########
@@ -237,58 +237,138 @@ func setupValidators(sdk pb.Sdk, filepath string) *[]validators.Validator {
 
 // processCode validates, compiles and runs code by pipelineId.
 // During each operation updates status of execution and saves it into cache:
+// - In case of processing works more that timeout duration saves playground.Status_STATUS_RUN_TIMEOUT as cache.Status into cache.
 // - In case of validation step is failed saves playground.Status_STATUS_ERROR as cache.Status into cache.
 // - In case of compile step is failed saves playground.Status_STATUS_COMPILE_ERROR as cache.Status and compile logs as cache.CompileOutput into cache.
 // - In case of compile step is completed with no errors saves empty string ("") as cache.CompileOutput into cache.
 // - In case of run step is failed saves playground.Status_STATUS_ERROR as cache.Status and run logs as cache.RunOutput into cache.
 // - In case of run step is completed with no errors saves playground.Status_STATUS_FINISHED as cache.Status and run output as cache.RunOutput into cache.
 // At the end of this method deletes all created folders.
 func processCode(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycle, compileBuilder *executors.CompileBuilder, pipelineId uuid.UUID, env *environment.Environment, sdk pb.Sdk) {
-       defer cleanUp(pipelineId, lc)
+       ctxWithTimeout, cancelByTimeoutFunc := context.WithTimeout(ctx, env.ApplicationEnvs.PipelineExecuteTimeout())
+       defer func(lc *fs_tool.LifeCycle) {
+               cancelByTimeoutFunc()
+               cleanUp(pipelineId, lc)
+       }(lc)
+
+       errorChannel := make(chan error, 1)
+       dataChannel := make(chan interface{}, 1)
+       doneChannel := make(chan bool, 1)
 
        // build executor for validate and compile steps
        exec := compileBuilder.Build()
 
        // validate
        logger.Infof("%s: Validate() ...\n", pipelineId)
        validateFunc := exec.Validate()
-       if err := validateFunc(); err != nil {
-               processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_VALIDATION_ERROR)
+       go validateFunc(doneChannel, errorChannel)
+
+       select {
+       case <-ctxWithTimeout.Done():
+               finishByContext(ctxWithTimeout, pipelineId, cacheService)
+               return
+       case ok := <-doneChannel:
+               if !ok {
+                       err := <-errorChannel
+                       processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_VALIDATION_ERROR)
+                       return
+               }
+               processSuccess(ctx, nil, pipelineId, cacheService, pb.Status_STATUS_PREPARING)
+       }
+
+       // prepare
+       logger.Info("%s: Prepare() ...\n", pipelineId)
+       prepareFunc := exec.Prepare()
+       go prepareFunc(doneChannel, errorChannel)
+
+       select {
+       case <-ctxWithTimeout.Done():
+               finishByContext(ctxWithTimeout, pipelineId, cacheService)
                return
-       } else {
+       case ok := <-doneChannel:
+               if !ok {
+                       err := <-errorChannel
+                       processError(ctx, err, nil, pipelineId, cacheService, pb.Status_STATUS_PREPARATION_ERROR)
+                       return
+               }
                processSuccess(ctx, nil, pipelineId, cacheService, pb.Status_STATUS_COMPILING)
        }
 
        // compile
        logger.Infof("%s: Compile() ...\n", pipelineId)
-       compileCmd := exec.Compile()
-       if data, err := compileCmd.CombinedOutput(); err != nil {
-               processError(ctx, err, data, pipelineId, cacheService, pb.Status_STATUS_COMPILE_ERROR)
+       compileCmd := exec.Compile(ctxWithTimeout)
+       go func(doneCh chan bool, errCh chan error, dataCh chan interface{}) {
+               data, err := compileCmd.CombinedOutput()
+               dataCh <- data
+               if err != nil {
+                       errCh <- err
+                       doneCh <- false
+               } else {
+                       doneCh <- true
+               }
+       }(doneChannel, errorChannel, dataChannel)
+
+       select {
+       case <-ctxWithTimeout.Done():
+               finishByContext(ctxWithTimeout, pipelineId, cacheService)
                return
-       } else {
-               processSuccess(ctx, data, pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
+       case ok := <-doneChannel:
+               data := <-dataChannel
+               if !ok {
+                       err := <-errorChannel
+                       processError(ctxWithTimeout, err, data.([]byte), pipelineId, cacheService, pb.Status_STATUS_COMPILE_ERROR)
+                       return
+               }
+               processSuccess(ctxWithTimeout, data.([]byte), pipelineId, cacheService, pb.Status_STATUS_EXECUTING)
        }
 
        runBuilder, err := setupRunBuilder(pipelineId, lc, sdk, env, compileBuilder)
        if err != nil {
                logger.Errorf("%s: error during setup runBuilder: %s\n", 
pipelineId, err.Error())
-               setToCache(ctx, cacheService, pipelineId, cache.Status, pb.Status_STATUS_ERROR)
+               setToCache(ctxWithTimeout, cacheService, pipelineId, cache.Status, pb.Status_STATUS_ERROR)
                return
        }
 
        // build executor for run step
        exec = runBuilder.Build()
 
+       // run
        logger.Infof("%s: Run() ...\n", pipelineId)
-       runCmd := exec.Run()
-       if data, err := runCmd.CombinedOutput(); err != nil {
-               processError(ctx, err, data, pipelineId, cacheService, pb.Status_STATUS_ERROR)
+       runCmd := exec.Run(ctxWithTimeout)
+       go func(doneCh chan bool, errCh chan error, dataCh chan interface{}) {
+               data, err := runCmd.CombinedOutput()
+               dataCh <- data
+               if err != nil {
+                       errCh <- err
+                       doneCh <- false
+               } else {
+                       doneCh <- true
+               }
+       }(doneChannel, errorChannel, dataChannel)
+
+       select {
+       case <-ctxWithTimeout.Done():
+               finishByContext(ctxWithTimeout, pipelineId, cacheService)
                return
-       } else {
-               processSuccess(ctx, data, pipelineId, cacheService, pb.Status_STATUS_FINISHED)
+       case ok := <-doneChannel:
+               data := <-dataChannel
+               if !ok {
+                       err := <-errorChannel
+                       processError(ctxWithTimeout, err.(error), data.([]byte), pipelineId, cacheService, pb.Status_STATUS_ERROR)
+                       return
+               }
+               processSuccess(ctxWithTimeout, data.([]byte), pipelineId, cacheService, pb.Status_STATUS_FINISHED)
        }
 }
 
+// finishByContext is used in case of runCode method finished by timeout
+func finishByContext(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache) {
+       logger.Errorf("%s: processCode finish because of timeout\n", pipelineId)

Review comment:
   Now before each step (validate/prepare/…) we have a log string like:
   `logger.Infof("%s: Validate() ...\n", pipelineId)` in the `processCode()` method.
   At the end of each step we also have a log string like:
   `logger.Infof("%s: Validate() finish\n", pipelineId)` in the `processSuccess()` method.
   For example:
   ```
   ...
   {pipelineId}: Validate() ...
   {pipelineId}: Validate() finish
   {pipelineId}: Prepare() ...
   {pipelineId}: processCode finish because of timeout
   ...
   ```
   So I guess using these logs we could understand during which step the timeout occurred.
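
   For illustration only, here is a minimal, self-contained Go sketch of that idea (the `runStep` helper, the durations, and the plain-string `pipelineId` are hypothetical stand-ins, not the actual `processCode` code): each step logs `...` before starting and `finish` on success, and the step whose `...` line has no matching `finish` before the timeout message is the one that was interrupted.
   ```go
   package main

   import (
       "context"
       "log"
       "time"
   )

   // runStep is a hypothetical stand-in for one pipeline step
   // (validate/prepare/compile/run); it signals completion on doneCh.
   func runStep(doneCh chan<- bool, d time.Duration) {
       time.Sleep(d)
       doneCh <- true
   }

   func main() {
       pipelineId := "example-pipeline-id" // stand-in for the real uuid.UUID
       ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
       defer cancel()

       doneCh := make(chan bool, 1)

       // Validate finishes before the timeout, so both of its log lines appear.
       log.Printf("%s: Validate() ...", pipelineId)
       go runStep(doneCh, 100*time.Millisecond)
       select {
       case <-ctx.Done():
           log.Printf("%s: processCode finish because of timeout", pipelineId)
           return
       case <-doneCh:
           log.Printf("%s: Validate() finish", pipelineId)
       }

       // Prepare exceeds the remaining time, so only its "..." line is logged
       // before the timeout message, which points at Prepare() as the
       // interrupted step.
       log.Printf("%s: Prepare() ...", pipelineId)
       go runStep(doneCh, time.Second)
       select {
       case <-ctx.Done():
           log.Printf("%s: processCode finish because of timeout", pipelineId)
           return
       case <-doneCh:
           log.Printf("%s: Prepare() finish", pipelineId)
       }
   }
   ```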



