This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 97a2acc Merge pull request #16121 from [BEAM-13334][Playground] Save
Go logs to the cache
97a2acc is described below
commit 97a2acc599982f3bbd5ae498b5a8ff5968e84ca6
Author: Aydar Zainutdinov <[email protected]>
AuthorDate: Mon Dec 13 17:27:18 2021 +0300
Merge pull request #16121 from [BEAM-13334][Playground] Save Go logs to the
cache
* [BEAM-13334][Playground]
Implement adding go code processing logs to the cache
* [BEAM-13334][Playground]
Changed string concatenation to string formatting
* [BEAM-13334][Playground]
Add of using t.Error() instead of panic(err)
* [BEAM-13334][Playground]
Added comments;
Renaming
---
.../internal/code_processing/code_processing.go | 31 +++++++++++++++++++---
.../setup_tools/builder/setup_builder_test.go | 3 +++
2 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/playground/backend/internal/code_processing/code_processing.go
b/playground/backend/internal/code_processing/code_processing.go
index ad4336d..ea7e611 100644
--- a/playground/backend/internal/code_processing/code_processing.go
+++ b/playground/backend/internal/code_processing/code_processing.go
@@ -170,7 +170,22 @@ func Process(ctx context.Context, cacheService
cache.Cache, lc *fs_tool.LifeCycl
var runError bytes.Buffer
runOutput := streaming.RunOutputWriter{Ctx: ctxWithTimeout,
CacheService: cacheService, PipelineId: pipelineId}
go readLogFile(ctxWithTimeout, cacheService,
lc.GetAbsoluteLogFilePath(), pipelineId, stopReadLogsChannel,
finishReadLogsChannel)
- runCmdWithOutput(runCmd, &runOutput, &runError, successChannel,
errorChannel)
+
+ if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
+ // For go SDK all logs are placed to stdErr.
+ file, err := os.Create(lc.GetAbsoluteLogFilePath())
+ if err != nil {
+ // If some error with creating a log file do the same
as with other SDK.
+ logger.Errorf("%s: error during create log file (go
sdk): %s", pipelineId, err.Error())
+ runCmdWithOutput(runCmd, &runOutput, &runError,
successChannel, errorChannel)
+ } else {
+ // Use the log file to write all stdErr into it.
+ runCmdWithOutput(runCmd, &runOutput, file,
successChannel, errorChannel)
+ }
+ } else {
+ // Other SDKs write logs to the log file on their own.
+ runCmdWithOutput(runCmd, &runOutput, &runError, successChannel,
errorChannel)
+ }
// Start of the monitoring of background tasks (run
step/cancellation/timeout)
ok, err = reconcileBackgroundTask(ctxWithTimeout, pipelineId,
cacheService, cancelChannel, successChannel)
@@ -179,6 +194,14 @@ func Process(ctx context.Context, cacheService
cache.Cache, lc *fs_tool.LifeCycl
}
if !ok {
// Run step is finished, but code contains some error (divide
by 0 for example)
+ if sdkEnv.ApacheBeamSdk == pb.Sdk_SDK_GO {
+ // For Go SDK stdErr was redirected to the log file.
+ errData, err := os.ReadFile(lc.GetAbsoluteLogFilePath())
+ if err != nil {
+ logger.Errorf("%s: error during read errors
from log file (go sdk): %s", pipelineId, err.Error())
+ }
+ runError.Write(errData)
+ }
_ = processRunError(ctxWithTimeout, errorChannel,
runError.Bytes(), pipelineId, cacheService, stopReadLogsChannel,
finishReadLogsChannel)
return
}
@@ -272,7 +295,7 @@ func GetLastIndex(ctx context.Context, cacheService
cache.Cache, key uuid.UUID,
}
// runCmdWithOutput runs command with keeping stdOut and stdErr
-func runCmdWithOutput(cmd *exec.Cmd, stdOutput io.Writer, stdError
*bytes.Buffer, successChannel chan bool, errorChannel chan error) {
+func runCmdWithOutput(cmd *exec.Cmd, stdOutput io.Writer, stdError io.Writer,
successChannel chan bool, errorChannel chan error) {
cmd.Stdout = stdOutput
cmd.Stderr = stdError
go func(cmd *exec.Cmd, successChannel chan bool, errChannel chan error)
{
@@ -411,7 +434,7 @@ func processCompileError(ctx context.Context, errorChannel
chan error, errorOutp
err := <-errorChannel
logger.Errorf("%s: Compile(): err: %s, output: %s\n", pipelineId,
err.Error(), errorOutput)
- if err := utils.SetToCache(ctx, cacheService, pipelineId,
cache.CompileOutput, "error: "+err.Error()+", output: "+string(errorOutput));
err != nil {
+ if err := utils.SetToCache(ctx, cacheService, pipelineId,
cache.CompileOutput, fmt.Sprintf("error: %s, output: %s", err.Error(),
string(errorOutput))); err != nil {
return err
}
return utils.SetToCache(ctx, cacheService, pipelineId, cache.Status,
pb.Status_STATUS_COMPILE_ERROR)
@@ -425,7 +448,7 @@ func processRunError(ctx context.Context, errorChannel chan
error, errorOutput [
err := <-errorChannel
logger.Errorf("%s: Run(): err: %s, output: %s\n", pipelineId,
err.Error(), errorOutput)
- if err := utils.SetToCache(ctx, cacheService, pipelineId,
cache.RunError, "error: "+err.Error()+", output: "+string(errorOutput)); err !=
nil {
+ if err := utils.SetToCache(ctx, cacheService, pipelineId,
cache.RunError, fmt.Sprintf("error: %s, output: %s", err.Error(),
string(errorOutput))); err != nil {
return err
}
diff --git
a/playground/backend/internal/setup_tools/builder/setup_builder_test.go
b/playground/backend/internal/setup_tools/builder/setup_builder_test.go
index 1dcbf4a..3868369 100644
--- a/playground/backend/internal/setup_tools/builder/setup_builder_test.go
+++ b/playground/backend/internal/setup_tools/builder/setup_builder_test.go
@@ -31,6 +31,9 @@ func TestSetupExecutor(t *testing.T) {
pipelineId := uuid.New()
sdk := pb.Sdk_SDK_JAVA
lc, err := fs_tool.NewLifeCycle(sdk, pipelineId, "")
+ if err != nil {
+ t.Error(err)
+ }
pipelineOptions := ""
executorConfig := &environment.ExecutorConfig{
CompileCmd: "MOCK_COMPILE_CMD",