This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 97a2acc  Merge pull request #16121 from [BEAM-13334][Playground] Save 
Go logs to the cache
97a2acc is described below

commit 97a2acc599982f3bbd5ae498b5a8ff5968e84ca6
Author: Aydar Zainutdinov <[email protected]>
AuthorDate: Mon Dec 13 17:27:18 2021 +0300

    Merge pull request #16121 from [BEAM-13334][Playground] Save Go logs to the 
cache
    
    * [BEAM-13334][Playground]
    Implement adding go code processing logs to the cache
    
    * [BEAM-13334][Playground]
    Changed string concatenation to string formatting
    
    * [BEAM-13334][Playground]
    Add of using t.Error() instead of panic(err)
    
    * [BEAM-13334][Playground]
    Added comments;
    Renaming
---
 .../internal/code_processing/code_processing.go    | 31 +++++++++++++++++++---
 .../setup_tools/builder/setup_builder_test.go      |  3 +++
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/playground/backend/internal/code_processing/code_processing.go 
b/playground/backend/internal/code_processing/code_processing.go
index ad4336d..ea7e611 100644
--- a/playground/backend/internal/code_processing/code_processing.go
+++ b/playground/backend/internal/code_processing/code_processing.go
@@ -170,7 +170,22 @@ func Process(ctx context.Context, cacheService 
cache.Cache, lc *fs_tool.LifeCycl
        var runError bytes.Buffer
        runOutput := streaming.RunOutputWriter{Ctx: ctxWithTimeout, 
CacheService: cacheService, PipelineId: pipelineId}
        go readLogFile(ctxWithTimeout, cacheService, 
lc.GetAbsoluteLogFilePath(), pipelineId, stopReadLogsChannel, 
finishReadLogsChannel)
-       runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, 
errorChannel)
+
+       if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
+               // For go SDK all logs are placed to stdErr.
+               file, err := os.Create(lc.GetAbsoluteLogFilePath())
+               if err != nil {
+                       // If some error with creating a log file do the same 
as with other SDK.
+                       logger.Errorf("%s: error during create log file (go 
sdk): %s", pipelineId, err.Error())
+                       runCmdWithOutput(runCmd, &runOutput, &runError, 
successChannel, errorChannel)
+               } else {
+                       // Use the log file to write all stdErr into it.
+                       runCmdWithOutput(runCmd, &runOutput, file, 
successChannel, errorChannel)
+               }
+       } else {
+               // Other SDKs write logs to the log file on their own.
+               runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, 
errorChannel)
+       }
 
        // Start of the monitoring of background tasks (run 
step/cancellation/timeout)
        ok, err = reconcileBackgroundTask(ctxWithTimeout, pipelineId, 
cacheService, cancelChannel, successChannel)
@@ -179,6 +194,14 @@ func Process(ctx context.Context, cacheService 
cache.Cache, lc *fs_tool.LifeCycl
        }
        if !ok {
                // Run step is finished, but code contains some error (divide 
by 0 for example)
+               if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
+                       // For Go SDK stdErr was redirected to the log file.
+                       errData, err := os.ReadFile(lc.GetAbsoluteLogFilePath())
+                       if err != nil {
+                               logger.Errorf("%s: error during read errors 
from log file (go sdk): %s", pipelineId, err.Error())
+                       }
+                       runError.Write(errData)
+               }
                _ = processRunError(ctxWithTimeout, errorChannel, 
runError.Bytes(), pipelineId, cacheService, stopReadLogsChannel, 
finishReadLogsChannel)
                return
        }
@@ -272,7 +295,7 @@ func GetLastIndex(ctx context.Context, cacheService 
cache.Cache, key uuid.UUID,
 }
 
 // runCmdWithOutput runs command with keeping stdOut and stdErr
-func runCmdWithOutput(cmd *exec.Cmd, stdOutput io.Writer, stdError 
*bytes.Buffer, successChannel chan bool, errorChannel chan error) {
+func runCmdWithOutput(cmd *exec.Cmd, stdOutput io.Writer, stdError io.Writer, 
successChannel chan bool, errorChannel chan error) {
        cmd.Stdout = stdOutput
        cmd.Stderr = stdError
        go func(cmd *exec.Cmd, successChannel chan bool, errChannel chan error) 
{
@@ -411,7 +434,7 @@ func processCompileError(ctx context.Context, errorChannel 
chan error, errorOutp
        err := <-errorChannel
        logger.Errorf("%s: Compile(): err: %s, output: %s\n", pipelineId, 
err.Error(), errorOutput)
 
-       if err := utils.SetToCache(ctx, cacheService, pipelineId, 
cache.CompileOutput, "error: "+err.Error()+", output: "+string(errorOutput)); 
err != nil {
+       if err := utils.SetToCache(ctx, cacheService, pipelineId, 
cache.CompileOutput, fmt.Sprintf("error: %s, output: %s", err.Error(), 
string(errorOutput))); err != nil {
                return err
        }
        return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status, 
pb.Status_STATUS_COMPILE_ERROR)
@@ -425,7 +448,7 @@ func processRunError(ctx context.Context, errorChannel chan 
error, errorOutput [
        err := <-errorChannel
        logger.Errorf("%s: Run(): err: %s, output: %s\n", pipelineId, 
err.Error(), errorOutput)
 
-       if err := utils.SetToCache(ctx, cacheService, pipelineId, 
cache.RunError, "error: "+err.Error()+", output: "+string(errorOutput)); err != 
nil {
+       if err := utils.SetToCache(ctx, cacheService, pipelineId, 
cache.RunError, fmt.Sprintf("error: %s, output: %s", err.Error(), 
string(errorOutput))); err != nil {
                return err
        }
 
diff --git 
a/playground/backend/internal/setup_tools/builder/setup_builder_test.go 
b/playground/backend/internal/setup_tools/builder/setup_builder_test.go
index 1dcbf4a..3868369 100644
--- a/playground/backend/internal/setup_tools/builder/setup_builder_test.go
+++ b/playground/backend/internal/setup_tools/builder/setup_builder_test.go
@@ -31,6 +31,9 @@ func TestSetupExecutor(t *testing.T) {
        pipelineId := uuid.New()
        sdk := pb.Sdk_SDK_JAVA
        lc, err := fs_tool.NewLifeCycle(sdk, pipelineId, "")
+       if err != nil {
+               t.Error(err)
+       }
        pipelineOptions := ""
        executorConfig := &environment.ExecutorConfig{
                CompileCmd:  "MOCK_COMPILE_CMD",

Reply via email to