AydarZaynutdinov commented on a change in pull request #16121:
URL: https://github.com/apache/beam/pull/16121#discussion_r763027811
##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -261,69 +368,92 @@ func DeleteFolders(pipelineId uuid.UUID, lc
*fs_tool.LifeCycle) {
}
// finishByTimeout is used in case of runCode method finished by timeout
-func finishByTimeout(ctx context.Context, pipelineId uuid.UUID, cacheService
cache.Cache) {
+func finishByTimeout(ctx context.Context, pipelineId uuid.UUID, cacheService
cache.Cache) error {
logger.Errorf("%s: code processing finishes because of timeout\n",
pipelineId)
// set to cache pipelineId: cache.SubKey_Status:
Status_STATUS_RUN_TIMEOUT
- cacheService.SetValue(ctx, pipelineId, cache.Status,
pb.Status_STATUS_RUN_TIMEOUT)
+ return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status,
pb.Status_STATUS_RUN_TIMEOUT)
}
-// processError processes error received during processing code via setting a
corresponding status and output to cache
-func processError(ctx context.Context, err error, data []byte, pipelineId
uuid.UUID, cacheService cache.Cache, status pb.Status) {
- switch status {
- case pb.Status_STATUS_VALIDATION_ERROR:
- logger.Errorf("%s: Validate: %s\n", pipelineId, err.Error())
-
- cacheService.SetValue(ctx, pipelineId, cache.Status,
pb.Status_STATUS_VALIDATION_ERROR)
- case pb.Status_STATUS_PREPARATION_ERROR:
- logger.Errorf("%s: Prepare: %s\n", pipelineId, err.Error())
+// processError processes error received during processing validation or
preparation steps.
+// This method sets corresponding status to the cache.
+func processError(ctx context.Context, errorChannel chan error, pipelineId
uuid.UUID, cacheService cache.Cache, errorTitle string, newStatus pb.Status)
error {
+ err := <-errorChannel
+ logger.Errorf("%s: %s(): %s\n", pipelineId, errorTitle, err.Error())
- cacheService.SetValue(ctx, pipelineId, cache.Status,
pb.Status_STATUS_PREPARATION_ERROR)
- case pb.Status_STATUS_COMPILE_ERROR:
- logger.Errorf("%s: Compile: err: %s, output: %s\n", pipelineId,
err.Error(), data)
+ return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status,
newStatus)
+}
- cacheService.SetValue(ctx, pipelineId, cache.CompileOutput,
"error: "+err.Error()+", output: "+string(data))
+// processCompileError processes error received during processing compile step.
+// This method sets error output and corresponding status to the cache.
+func processCompileError(ctx context.Context, errorChannel chan error,
errorOutput []byte, pipelineId uuid.UUID, cacheService cache.Cache) error {
+ err := <-errorChannel
+ logger.Errorf("%s: Compile(): err: %s, output: %s\n", pipelineId,
err.Error(), errorOutput)
- cacheService.SetValue(ctx, pipelineId, cache.Status,
pb.Status_STATUS_COMPILE_ERROR)
- case pb.Status_STATUS_RUN_ERROR:
- logger.Errorf("%s: Run: err: %s, output: %s\n", pipelineId,
err.Error(), data)
+ if err := utils.SetToCache(ctx, cacheService, pipelineId,
cache.CompileOutput, "error: "+err.Error()+", output: "+string(errorOutput));
err != nil {
+ return err
+ }
+ return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status,
pb.Status_STATUS_COMPILE_ERROR)
+}
- cacheService.SetValue(ctx, pipelineId, cache.RunError, "error:
"+err.Error()+", output: "+string(data))
+// processRunError processes error received during processing run step.
+// This method sets error output to the cache and after that sets value to
channel to stop goroutine which writes logs.
+// After receiving a signal that goroutine was finished (read value from
finishReadLogsChannel) this method
+// sets corresponding status to the cache.
+func processRunError(ctx context.Context, errorChannel chan error, errorOutput
[]byte, pipelineId uuid.UUID, cacheService cache.Cache, stopReadLogsChannel,
finishReadLogsChannel chan bool) error {
+ err := <-errorChannel
+ logger.Errorf("%s: Run(): err: %s, output: %s\n", pipelineId,
err.Error(), errorOutput)
- cacheService.SetValue(ctx, pipelineId, cache.Status,
pb.Status_STATUS_RUN_ERROR)
+ if err := utils.SetToCache(ctx, cacheService, pipelineId,
cache.RunError, "error: "+err.Error()+", output: "+string(errorOutput)); err !=
nil {
Review comment:
Changed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org