daria-malkova commented on a change in pull request #16477:
URL: https://github.com/apache/beam/pull/16477#discussion_r783072505



##########
File path: playground/backend/internal/setup_tools/builder/setup_builder_test.go
##########
@@ -23,100 +23,293 @@ import (
        "beam.apache.org/playground/backend/internal/utils"
        "fmt"
        "github.com/google/uuid"
+       "reflect"
        "strings"
        "testing"
 )
 
-func TestSetupExecutor(t *testing.T) {
+var lc *fs_tool.LifeCycle
+var sdkEnv *environment.BeamEnvs
+
+func TestMain(m *testing.M) {
+       setup()
+       m.Run()
+}
+
+func setup() {
        pipelineId := uuid.New()
-       sdk := pb.Sdk_SDK_JAVA
-       lc, err := fs_tool.NewLifeCycle(sdk, pipelineId, "")
-       if err != nil {
-               t.Error(err)
-       }
-       pipelineOptions := ""
+       sdk := pb.Sdk_SDK_PYTHON
+       lc, _ = fs_tool.NewLifeCycle(sdk, pipelineId, "")
        executorConfig := &environment.ExecutorConfig{
                CompileCmd:  "MOCK_COMPILE_CMD",
-               RunCmd:      "MOCK_RUN_CMD",
-               TestCmd:     "MOCK_TEST_CMD",
                CompileArgs: []string{"MOCK_COMPILE_ARG"},
-               RunArgs:     []string{"MOCK_RUN_ARG"},
-               TestArgs:    []string{"MOCK_TEST_ARG"},
        }
+       sdkEnv = environment.NewBeamEnvs(sdk, executorConfig, "", 0)
+}
+
+func TestSetupValidatorBuilder(t *testing.T) {
+       vals, err := utils.GetValidators(sdkEnv.ApacheBeamSdk, 
lc.GetAbsoluteSourceFilePath())
        if err != nil {
                panic(err)
        }
+       wantExecutor := executors.NewExecutorBuilder().
+               WithValidator().
+               WithSdkValidators(vals)
 
-       srcFilePath := lc.GetAbsoluteSourceFilePath()
+       wrongSdkEnv := environment.NewBeamEnvs(pb.Sdk_SDK_UNSPECIFIED, 
sdkEnv.ExecutorConfig, "", 0)
 
-       sdkEnv := environment.NewBeamEnvs(sdk, executorConfig, "", 0)
-       val, err := utils.GetValidators(sdk, srcFilePath)
-       if err != nil {
-               panic(err)
+       type args struct {
+               lc     *fs_tool.LifeCycle
+               sdkEnv *environment.BeamEnvs
+       }
+       tests := []struct {
+               name    string
+               args    args
+               want    *executors.ExecutorBuilder
+               wantErr bool
+       }{
+               {
+                       // Test case with calling Setup with correct data.
+                       // As a result, want to receive an expected validator 
builder.
+                       name: "Test correct validator builder",
+                       args: args{
+                               lc:     lc,
+                               sdkEnv: sdkEnv,
+                       },
+                       want:    &wantExecutor.ExecutorBuilder,
+                       wantErr: false,
+               },
+               {
+                       // Test case with calling Setup with incorrect SDK.
+                       // As a result, want to receive an error.
+                       name: "incorrect sdk",
+                       args: args{
+                               lc:     lc,
+                               sdkEnv: wrongSdkEnv,
+                       },
+                       want:    nil,
+                       wantErr: true,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       got, err := SetupValidatorBuilder(tt.args.lc, 
tt.args.sdkEnv)
+                       if (err != nil) != tt.wantErr {
+                               t.Errorf("SetupValidatorBuilder() error = %v, 
wantErr %v", err, tt.wantErr)
+                               return
+                       }
+                       if err != nil && !reflect.DeepEqual(got, tt.want) {
+                               t.Errorf("SetupValidatorBuilder() got = %v, 
want %v", got, tt.want)
+                               return
+                       }
+                       if err == nil && 
!reflect.DeepEqual(fmt.Sprint(got.Build()), fmt.Sprint(tt.want.Build())) {
+                               t.Errorf("SetupValidatorBuilder() got = %v\n, 
want %v", got.Build(), tt.want.Build())
+                       }
+               })
        }
-       prep, err := utils.GetPreparators(sdk, srcFilePath)
+}
+
+func TestSetupPreparatorBuilder(t *testing.T) {
+       prep, err := utils.GetPreparators(sdkEnv.ApacheBeamSdk, 
lc.GetAbsoluteSourceFilePath())
        if err != nil {
                panic(err)
        }
-
        wantExecutor := executors.NewExecutorBuilder().
-               WithExecutableFileName(lc.GetAbsoluteExecutableFilePath()).
-               WithWorkingDir(lc.GetAbsoluteBaseFolderPath()).
-               WithValidator().
-               WithSdkValidators(val).
                WithPreparator().
-               WithSdkPreparators(prep).
-               WithCompiler().
-               WithCommand(executorConfig.CompileCmd).
-               WithArgs(executorConfig.CompileArgs).
-               WithFileName(srcFilePath).
-               WithRunner().
-               WithCommand(executorConfig.RunCmd).
-               WithArgs(executorConfig.RunArgs).
-               WithPipelineOptions(strings.Split(pipelineOptions, " ")).
-               WithTestRunner().
-               WithCommand(executorConfig.TestCmd).
-               WithArgs(executorConfig.TestArgs).
-               WithWorkingDir(lc.GetAbsoluteBaseFolderPath()).
-               ExecutorBuilder
+               WithSdkPreparators(prep)
+
+       wrongSdkEnv := environment.NewBeamEnvs(pb.Sdk_SDK_UNSPECIFIED, 
sdkEnv.ExecutorConfig, "", 0)
 
        type args struct {
-               lc              *fs_tool.LifeCycle
-               pipelineOptions string
-               sdkEnv          *environment.BeamEnvs
+               lc     *fs_tool.LifeCycle
+               sdkEnv *environment.BeamEnvs
        }
        tests := []struct {
                name    string
                args    args
                want    *executors.ExecutorBuilder
                wantErr bool
        }{
+               {
+                       // Test case with calling Setup with correct data.
+                       // As a result, want to receive an expected preparator 
builder.
+                       name: "Test correct preparator builder",
+                       args: args{
+                               lc:     lc,
+                               sdkEnv: sdkEnv,
+                       },
+                       want:    &wantExecutor.ExecutorBuilder,
+                       wantErr: false,
+               },
                {
                        // Test case with calling Setup with incorrect SDK.
                        // As a result, want to receive an error.
-                       name:    "incorrect sdk",
-                       args:    args{lc, pipelineOptions, 
environment.NewBeamEnvs(pb.Sdk_SDK_UNSPECIFIED, executorConfig, "", 0)},
+                       name: "incorrect sdk",
+                       args: args{
+                               lc:     lc,
+                               sdkEnv: wrongSdkEnv,
+                       },
                        want:    nil,
                        wantErr: true,
                },
-               {
-                       // Test case with calling Setup with correct SDK.
-                       // As a result, want to receive an expected builder.
-                       name:    "correct sdk",
-                       args:    args{lc, pipelineOptions, sdkEnv},
-                       want:    &wantExecutor,
-                       wantErr: false,
-               },
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       got, err := SetupExecutorBuilder(tt.args.lc, 
tt.args.pipelineOptions, tt.args.sdkEnv)
+                       got, err := SetupPreparatorBuilder(tt.args.lc, 
tt.args.sdkEnv)
                        if (err != nil) != tt.wantErr {
-                               t.Errorf("SetupExecutorBuilder() error = %v, 
wantErr %v", err, tt.wantErr)
+                               t.Errorf("SetupPreparatorBuilder() error = %v, 
wantErr %v", err, tt.wantErr)
                                return
                        }
-                       if err == nil && fmt.Sprint(got.Build()) != 
fmt.Sprint(tt.want.Build()) {
-                               t.Errorf("SetupExecutorBuilder() got = %v\n, 
want %v", got.Build(), tt.want.Build())
+                       if err != nil && !reflect.DeepEqual(got, tt.want) {
+                               t.Errorf("SetupPreparatorBuilder() got = %v, 
want %v", got, tt.want)
+                               return
+                       }
+                       if err == nil && 
!reflect.DeepEqual(fmt.Sprint(got.Build()), fmt.Sprint(tt.want.Build())) {
+                               t.Errorf("SetupPreparatorBuilder() got = %v, 
want %v", got.Build(), tt.want.Build())
+                       }
+               })
+       }
+}
+
+func TestSetupCompilerBuilder(t *testing.T) {
+       wantExecutor := executors.NewExecutorBuilder().
+               WithCompiler().
+               WithCommand(sdkEnv.ExecutorConfig.CompileCmd).
+               WithWorkingDir(lc.GetAbsoluteBaseFolderPath()).
+               WithArgs(sdkEnv.ExecutorConfig.CompileArgs).
+               WithFileName(lc.GetAbsoluteSourceFilePath())
+
+       type args struct {
+               lc     *fs_tool.LifeCycle
+               sdkEnv *environment.BeamEnvs
+       }
+       tests := []struct {
+               name string
+               args args
+               want *executors.ExecutorBuilder
+       }{
+               {
+                       // Test case with calling Setup with correct data.
+                       // As a result, want to receive an expected compiler 
builder.
+                       name: "Test correct compiler builder",
+                       args: args{
+                               lc:     lc,
+                               sdkEnv: sdkEnv,
+                       },
+                       want: &wantExecutor.ExecutorBuilder,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       got := SetupCompilerBuilder(tt.args.lc, tt.args.sdkEnv)
+                       if !reflect.DeepEqual(fmt.Sprint(got.Build()), 
fmt.Sprint(tt.want.Build())) {
+                               t.Errorf("SetupCompilerBuilder() = %v, want 
%v", got.Build(), tt.want.Build())
+                       }
+               })
+       }
+}
+
+func TestSetupRunBuilder(t *testing.T) {
+       wantExecutor := executors.NewExecutorBuilder().
+               WithRunner().
+               WithExecutableFileName(lc.GetAbsoluteExecutableFilePath()).
+               WithWorkingDir(lc.GetAbsoluteBaseFolderPath()).
+               WithCommand(sdkEnv.ExecutorConfig.RunCmd).
+               WithArgs(sdkEnv.ExecutorConfig.RunArgs).
+               WithPipelineOptions(strings.Split("", " "))
+
+       type args struct {
+               lc              *fs_tool.LifeCycle
+               pipelineOptions string
+               sdkEnv          *environment.BeamEnvs
+       }
+       tests := []struct {
+               name string
+               args args
+               want *executors.ExecutorBuilder
+       }{
+               {
+                       // Test case with calling Setup with correct data.
+                       // As a result, want to receive an expected run builder.
+                       name: "Test correct run builder",
+                       args: args{
+                               lc:     lc,
+                               sdkEnv: sdkEnv,
+                       },
+                       want: &wantExecutor.ExecutorBuilder,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       got, _ := SetupRunBuilder(tt.args.lc, 
tt.args.pipelineOptions, tt.args.sdkEnv)
+                       if !reflect.DeepEqual(fmt.Sprint(got.Build()), 
fmt.Sprint(tt.want.Build())) {
+                               t.Errorf("SetupRunBuilder() got = %v, want %v", 
got.Build(), tt.want.Build())
+                       }
+               })
+       }
+}
+
+func TestSetupTestBuilder(t *testing.T) {
+       wantExecutor := executors.NewExecutorBuilder().
+               WithTestRunner().
+               WithExecutableFileName(lc.GetAbsoluteExecutableFilePath()).
+               WithCommand(sdkEnv.ExecutorConfig.TestCmd).
+               WithArgs(sdkEnv.ExecutorConfig.TestArgs).
+               WithWorkingDir(lc.GetAbsoluteSourceFolderPath())
+
+       type args struct {
+               lc     *fs_tool.LifeCycle
+               sdkEnv *environment.BeamEnvs
+       }
+       tests := []struct {
+               name string
+               args args
+               want *executors.ExecutorBuilder
+       }{
+               {
+                       // Test case with calling Setup with correct data.
+                       // As a result, want to receive an expected test 
builder.
+                       name: "Test correct test builder",
+                       args: args{
+                               lc:     lc,
+                               sdkEnv: sdkEnv,
+                       },
+                       want: &wantExecutor.ExecutorBuilder,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       got, _ := SetupTestBuilder(tt.args.lc, tt.args.sdkEnv)
+                       if !reflect.DeepEqual(fmt.Sprint(got.Build()), 
fmt.Sprint(tt.want.Build())) {
+                               t.Errorf("SetupTestBuilder() got = %v, want 
%v", got.Build(), tt.want.Build())
+                       }
+               })
+       }
+}
+
+func Test_replaceLogPlaceholder(t *testing.T) {
+       type args struct {
+               lc             *fs_tool.LifeCycle
+               executorConfig *environment.ExecutorConfig
+       }
+       tests := []struct {
+               name string
+               args args
+               want []string
+       }{
+               {
+                       name: "",

Review comment:
       I will :) 

##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -210,30 +154,122 @@ func Process(ctx context.Context, cacheService 
cache.Cache, lc *fs_tool.LifeCycl
        _ = processRunSuccess(pipelineLifeCycleCtx, pipelineId, cacheService, 
stopReadLogsChannel, finishReadLogsChannel)
 }
 
+func compileStep(ctx context.Context, cacheService cache.Cache, lc 
*fs_tool.LifeCycle, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, 
isUnitTest bool, pipelineLifeCycleCtx context.Context, cancelChannel chan bool) 
*executors.Executor {
+       errorChannel, successChannel := createStatusChannels()
+       var executor = executors.Executor{}
+       // This condition is used for cases when the playground doesn't compile 
source files. For the Python code and the Go Unit Tests
+       if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_PYTHON || (sdkEnv.ApacheBeamSdk 
== pb.Sdk_SDK_GO && isUnitTest) {
+               if err := processCompileSuccess(pipelineLifeCycleCtx, 
[]byte(""), pipelineId, cacheService); err != nil {
+                       return nil
+               }
+       } else { // in case of Java, Go (not unit test), Scala - need compile 
step
+               // Compile
+               executorBuilder := builder.SetupCompilerBuilder(lc, sdkEnv)
+               executor := executorBuilder.Build()
+               logger.Infof("%s: Compile() ...\n", pipelineId)
+               compileCmd := executor.Compile(pipelineLifeCycleCtx)
+               var compileError bytes.Buffer
+               var compileOutput bytes.Buffer
+               runCmdWithOutput(compileCmd, &compileOutput, &compileError, 
successChannel, errorChannel)
+
+               // Start of the monitoring of background tasks (compile 
step/cancellation/timeout)
+               ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, 
pipelineId, cacheService, cancelChannel, successChannel)
+               if err != nil {
+                       return nil
+               }
+               if !ok { // Compile step is finished, but code couldn't be 
compiled (some typos for example)
+                       err := <-errorChannel
+                       _ = processErrorWithSavingOutput(pipelineLifeCycleCtx, 
err, compileError.Bytes(), pipelineId, cache.CompileOutput, cacheService, 
"Compile", pb.Status_STATUS_COMPILE_ERROR)
+                       return nil
+               } // Compile step is finished and code is compiled
+               if err := processCompileSuccess(pipelineLifeCycleCtx, 
compileOutput.Bytes(), pipelineId, cacheService); err != nil {
+                       return nil
+               }
+       }
+       return &executor
+}
+
+func prepareStep(ctx context.Context, cacheService cache.Cache, lc 
*fs_tool.LifeCycle, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, 
pipelineLifeCycleCtx context.Context, validationResults *sync.Map, 
cancelChannel chan bool) *executors.Executor {
+       errorChannel, successChannel := createStatusChannels()
+       executorBuilder, err := builder.SetupPreparatorBuilder(lc, sdkEnv)
+       if err != nil {
+               _ = processSetupError(err, pipelineId, cacheService, 
pipelineLifeCycleCtx)
+               return nil
+       }
+       executor := executorBuilder.Build()
+       // Prepare
+       logger.Infof("%s: Prepare() ...\n", pipelineId)
+       prepareFunc := executor.Prepare()
+       // Run prepare function
+       go prepareFunc(successChannel, errorChannel, validationResults)
+
+       // Start of the monitoring of background tasks (prepare 
function/cancellation/timeout)
+       ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, 
pipelineId, cacheService, cancelChannel, successChannel)
+       if err != nil {
+               return nil
+       }
+       if !ok {
+               err := <-errorChannel
+               // Prepare step is finished, but code couldn't be prepared 
(some error during prepare step)
+               _ = processErrorWithSavingOutput(pipelineLifeCycleCtx, err, 
[]byte(err.Error()), pipelineId, cache.PreparationOutput, cacheService, 
"Prepare", pb.Status_STATUS_PREPARATION_ERROR)
+               return nil
+       }
+       // Prepare step is finished and code is prepared
+       if err := processSuccess(pipelineLifeCycleCtx, pipelineId, 
cacheService, "Prepare", pb.Status_STATUS_COMPILING); err != nil {
+               return nil
+       }
+       return &executor
+}
+
+func validateStep(ctx context.Context, cacheService cache.Cache, lc 
*fs_tool.LifeCycle, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, 
pipelineLifeCycleCtx context.Context, validationResults *sync.Map, 
cancelChannel chan bool) *executors.Executor {
+       errorChannel, successChannel := createStatusChannels()
+       executorBuilder, err := builder.SetupValidatorBuilder(lc, sdkEnv)
+       if err != nil {
+               _ = processSetupError(err, pipelineId, cacheService, 
pipelineLifeCycleCtx)
+               return nil
+       }
+       executor := executorBuilder.Build()
+       // Validate

Review comment:
       Done

##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -210,30 +154,122 @@ func Process(ctx context.Context, cacheService 
cache.Cache, lc *fs_tool.LifeCycl
        _ = processRunSuccess(pipelineLifeCycleCtx, pipelineId, cacheService, 
stopReadLogsChannel, finishReadLogsChannel)
 }
 
+func compileStep(ctx context.Context, cacheService cache.Cache, lc 
*fs_tool.LifeCycle, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, 
isUnitTest bool, pipelineLifeCycleCtx context.Context, cancelChannel chan bool) 
*executors.Executor {
+       errorChannel, successChannel := createStatusChannels()
+       var executor = executors.Executor{}
+       // This condition is used for cases when the playground doesn't compile 
source files. For the Python code and the Go Unit Tests
+       if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_PYTHON || (sdkEnv.ApacheBeamSdk 
== pb.Sdk_SDK_GO && isUnitTest) {
+               if err := processCompileSuccess(pipelineLifeCycleCtx, 
[]byte(""), pipelineId, cacheService); err != nil {
+                       return nil
+               }
+       } else { // in case of Java, Go (not unit test), Scala - need compile 
step
+               // Compile
+               executorBuilder := builder.SetupCompilerBuilder(lc, sdkEnv)
+               executor := executorBuilder.Build()
+               logger.Infof("%s: Compile() ...\n", pipelineId)
+               compileCmd := executor.Compile(pipelineLifeCycleCtx)
+               var compileError bytes.Buffer
+               var compileOutput bytes.Buffer
+               runCmdWithOutput(compileCmd, &compileOutput, &compileError, 
successChannel, errorChannel)
+
+               // Start of the monitoring of background tasks (compile 
step/cancellation/timeout)
+               ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, 
pipelineId, cacheService, cancelChannel, successChannel)
+               if err != nil {
+                       return nil
+               }
+               if !ok { // Compile step is finished, but code couldn't be 
compiled (some typos for example)
+                       err := <-errorChannel
+                       _ = processErrorWithSavingOutput(pipelineLifeCycleCtx, 
err, compileError.Bytes(), pipelineId, cache.CompileOutput, cacheService, 
"Compile", pb.Status_STATUS_COMPILE_ERROR)
+                       return nil
+               } // Compile step is finished and code is compiled
+               if err := processCompileSuccess(pipelineLifeCycleCtx, 
compileOutput.Bytes(), pipelineId, cacheService); err != nil {
+                       return nil
+               }
+       }
+       return &executor
+}
+
+func prepareStep(ctx context.Context, cacheService cache.Cache, lc 
*fs_tool.LifeCycle, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, 
pipelineLifeCycleCtx context.Context, validationResults *sync.Map, 
cancelChannel chan bool) *executors.Executor {
+       errorChannel, successChannel := createStatusChannels()
+       executorBuilder, err := builder.SetupPreparatorBuilder(lc, sdkEnv)
+       if err != nil {
+               _ = processSetupError(err, pipelineId, cacheService, 
pipelineLifeCycleCtx)
+               return nil
+       }
+       executor := executorBuilder.Build()
+       // Prepare

Review comment:
       No, done

##########
File path: playground/backend/internal/setup_tools/builder/setup_builder.go
##########
@@ -31,73 +31,134 @@ const (
        javaLogConfigFilePlaceholder = "{logConfigFile}"
 )
 
-// SetupExecutorBuilder return executor with set args for validator, 
preparator, compiler and runner
-func SetupExecutorBuilder(lc *fs_tool.LifeCycle, pipelineOptions string, 
sdkEnv *environment.BeamEnvs) (*executors.ExecutorBuilder, error) {
+// SetupValidatorBuilder return executor with set args for validator
+func SetupValidatorBuilder(lc *fs_tool.LifeCycle, sdkEnv 
*environment.BeamEnvs) (*executors.ExecutorBuilder, error) {
        sdk := sdkEnv.ApacheBeamSdk
-
-       if sdk == pb.Sdk_SDK_JAVA {
-               pipelineOptions = utils.ReplaceSpacesWithEquals(pipelineOptions)
-       }
-
        val, err := utils.GetValidators(sdk, lc.GetAbsoluteSourceFilePath())
        if err != nil {
                return nil, err
        }
+       builder := executors.NewExecutorBuilder().
+               WithValidator().
+               WithSdkValidators(val).
+               ExecutorBuilder
+       return &builder, err
+}
+
+// SetupPreparatorBuilder return executor with set args for preparator
+func SetupPreparatorBuilder(lc *fs_tool.LifeCycle, sdkEnv 
*environment.BeamEnvs) (*executors.ExecutorBuilder, error) {
+       sdk := sdkEnv.ApacheBeamSdk
        prep, err := utils.GetPreparators(sdk, lc.GetAbsoluteSourceFilePath())
        if err != nil {
                return nil, err
        }
-       executorConfig := sdkEnv.ExecutorConfig
        builder := executors.NewExecutorBuilder().
-               WithExecutableFileName(lc.GetAbsoluteExecutableFilePath()).
-               WithWorkingDir(lc.GetAbsoluteBaseFolderPath()).
-               WithValidator().
-               WithSdkValidators(val).
                WithPreparator().
                WithSdkPreparators(prep).
+               ExecutorBuilder
+       return &builder, err
+}
+
+// SetupCompilerBuilder return executor with set args for compiler
+func SetupCompilerBuilder(lc *fs_tool.LifeCycle, sdkEnv *environment.BeamEnvs) 
*executors.ExecutorBuilder {

Review comment:
       Done

##########
File path: playground/backend/internal/code_processing/code_processing.go
##########
@@ -210,30 +154,122 @@ func Process(ctx context.Context, cacheService 
cache.Cache, lc *fs_tool.LifeCycl
        _ = processRunSuccess(pipelineLifeCycleCtx, pipelineId, cacheService, 
stopReadLogsChannel, finishReadLogsChannel)
 }
 
+func compileStep(ctx context.Context, cacheService cache.Cache, lc 
*fs_tool.LifeCycle, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, 
isUnitTest bool, pipelineLifeCycleCtx context.Context, cancelChannel chan bool) 
*executors.Executor {
+       errorChannel, successChannel := createStatusChannels()
+       var executor = executors.Executor{}
+       // This condition is used for cases when the playground doesn't compile 
source files. For the Python code and the Go Unit Tests
+       if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_PYTHON || (sdkEnv.ApacheBeamSdk 
== pb.Sdk_SDK_GO && isUnitTest) {
+               if err := processCompileSuccess(pipelineLifeCycleCtx, 
[]byte(""), pipelineId, cacheService); err != nil {
+                       return nil
+               }
+       } else { // in case of Java, Go (not unit test), Scala - need compile 
step
+               // Compile
+               executorBuilder := builder.SetupCompilerBuilder(lc, sdkEnv)
+               executor := executorBuilder.Build()
+               logger.Infof("%s: Compile() ...\n", pipelineId)
+               compileCmd := executor.Compile(pipelineLifeCycleCtx)
+               var compileError bytes.Buffer
+               var compileOutput bytes.Buffer
+               runCmdWithOutput(compileCmd, &compileOutput, &compileError, 
successChannel, errorChannel)
+
+               // Start of the monitoring of background tasks (compile 
step/cancellation/timeout)
+               ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, 
pipelineId, cacheService, cancelChannel, successChannel)
+               if err != nil {
+                       return nil
+               }
+               if !ok { // Compile step is finished, but code couldn't be 
compiled (some typos for example)
+                       err := <-errorChannel
+                       _ = processErrorWithSavingOutput(pipelineLifeCycleCtx, 
err, compileError.Bytes(), pipelineId, cache.CompileOutput, cacheService, 
"Compile", pb.Status_STATUS_COMPILE_ERROR)
+                       return nil
+               } // Compile step is finished and code is compiled
+               if err := processCompileSuccess(pipelineLifeCycleCtx, 
compileOutput.Bytes(), pipelineId, cacheService); err != nil {
+                       return nil
+               }
+       }
+       return &executor
+}
+
+func prepareStep(ctx context.Context, cacheService cache.Cache, lc 
*fs_tool.LifeCycle, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, 
pipelineLifeCycleCtx context.Context, validationResults *sync.Map, 
cancelChannel chan bool) *executors.Executor {
+       errorChannel, successChannel := createStatusChannels()
+       executorBuilder, err := builder.SetupPreparatorBuilder(lc, sdkEnv)
+       if err != nil {
+               _ = processSetupError(err, pipelineId, cacheService, 
pipelineLifeCycleCtx)
+               return nil
+       }
+       executor := executorBuilder.Build()
+       // Prepare
+       logger.Infof("%s: Prepare() ...\n", pipelineId)
+       prepareFunc := executor.Prepare()
+       // Run prepare function
+       go prepareFunc(successChannel, errorChannel, validationResults)
+
+       // Start of the monitoring of background tasks (prepare 
function/cancellation/timeout)
+       ok, err := reconcileBackgroundTask(pipelineLifeCycleCtx, ctx, 
pipelineId, cacheService, cancelChannel, successChannel)
+       if err != nil {
+               return nil
+       }
+       if !ok {
+               err := <-errorChannel
+               // Prepare step is finished, but code couldn't be prepared 
(some error during prepare step)
+               _ = processErrorWithSavingOutput(pipelineLifeCycleCtx, err, 
[]byte(err.Error()), pipelineId, cache.PreparationOutput, cacheService, 
"Prepare", pb.Status_STATUS_PREPARATION_ERROR)
+               return nil
+       }
+       // Prepare step is finished and code is prepared
+       if err := processSuccess(pipelineLifeCycleCtx, pipelineId, 
cacheService, "Prepare", pb.Status_STATUS_COMPILING); err != nil {
+               return nil
+       }
+       return &executor
+}
+
+func validateStep(ctx context.Context, cacheService cache.Cache, lc 
*fs_tool.LifeCycle, pipelineId uuid.UUID, sdkEnv *environment.BeamEnvs, 
pipelineLifeCycleCtx context.Context, validationResults *sync.Map, 
cancelChannel chan bool) *executors.Executor {
+       errorChannel, successChannel := createStatusChannels()
+       executorBuilder, err := builder.SetupValidatorBuilder(lc, sdkEnv)
+       if err != nil {
+               _ = processSetupError(err, pipelineId, cacheService, 
pipelineLifeCycleCtx)
+               return nil
+       }
+       executor := executorBuilder.Build()
+       // Validate
+       logger.Infof("%s: Validate() ...\n", pipelineId)
+       validateFunc := executor.Validate()
+       // Run validate function

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to