This is an automated email from the ASF dual-hosted git repository. ningyougang pushed a commit to branch support-array-result in repository https://gitbox.apache.org/repos/asf/openwhisk-runtime-go.git
commit bcfa8e8d924cb1247c5f5a4c0fe2f0da1219ebe9 Author: ning.yougang <[email protected]> AuthorDate: Wed Jul 20 11:21:45 2022 +0800 Wait for log guards before return --- common/gobuild.py.launcher.go | 5 +++++ golang1.17/lib/launcher.go | 5 +++++ golang1.18/lib/launcher.go | 5 +++++ openwhisk/_test/hello.sh | 2 ++ openwhisk/_test/pysample/lib/exec.py | 8 +++++++- openwhisk/compiler_test.go | 6 ++++++ openwhisk/executor.go | 1 + openwhisk/executor_test.go | 2 ++ openwhisk/initHandler_test.go | 8 ++++---- openwhisk/runHandler.go | 18 ++++++++++-------- .../actionContainers/ActionLoopBasicGoTests.scala | 2 ++ .../actionContainers/ActionLoopBasicTests.scala | 13 ++++++++++++- .../actionContainers/ActionLoopContainerTests.scala | 2 ++ 13 files changed, 63 insertions(+), 14 deletions(-) diff --git a/common/gobuild.py.launcher.go b/common/gobuild.py.launcher.go index 386c126..efd01ed 100644 --- a/common/gobuild.py.launcher.go +++ b/common/gobuild.py.launcher.go @@ -28,6 +28,9 @@ import ( "strings" ) +// log guard +const OutputGuard = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX\n" + func main() { // debugging var debug = os.Getenv("OW_DEBUG") != "" @@ -106,6 +109,8 @@ func main() { if debug { log.Printf("'<<<%s'<<<", output) } + fmt.Fprint(os.Stdout, OutputGuard) + fmt.Fprint(os.Stderr, OutputGuard) fmt.Fprintf(out, "%s\n", output) } } diff --git a/golang1.17/lib/launcher.go b/golang1.17/lib/launcher.go index 3f95c15..5cb7fea 100644 --- a/golang1.17/lib/launcher.go +++ b/golang1.17/lib/launcher.go @@ -28,6 +28,9 @@ import ( "strings" ) +// log guard +const OutputGuard = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX\n" + // OwExecutionEnv is the execution environment set at compile time var OwExecutionEnv = "" @@ -117,6 +120,8 @@ func main() { if debug { log.Printf("<<<'%s'<<<", output) } + fmt.Fprint(os.Stdout, OutputGuard) + fmt.Fprint(os.Stderr, OutputGuard) fmt.Fprintf(out, "%s\n", output) } } diff --git a/golang1.18/lib/launcher.go b/golang1.18/lib/launcher.go index 3f95c15..5cb7fea 100644 --- a/golang1.18/lib/launcher.go +++ b/golang1.18/lib/launcher.go @@ -28,6 +28,9 @@ import ( "strings" ) +// log guard +const OutputGuard = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX\n" + // OwExecutionEnv is the execution environment set at compile time var OwExecutionEnv = "" @@ -117,6 +120,8 @@ func main() { if debug { log.Printf("<<<'%s'<<<", output) } + fmt.Fprint(os.Stdout, OutputGuard) + fmt.Fprint(os.Stderr, OutputGuard) fmt.Fprintf(out, "%s\n", output) } } diff --git a/openwhisk/_test/hello.sh b/openwhisk/_test/hello.sh index d8ddb7f..e84ff56 100755 --- a/openwhisk/_test/hello.sh +++ b/openwhisk/_test/hello.sh @@ -19,5 +19,7 @@ while read line do name="$(echo $line | jq -r .value.name)" echo msg="hello $name" + echo 'XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX' + echo 'XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX' >&2 echo '{"hello": "'$name'"}' >&3 done diff --git a/openwhisk/_test/pysample/lib/exec.py b/openwhisk/_test/pysample/lib/exec.py index 3bf305e..4c7a521 100644 --- a/openwhisk/_test/pysample/lib/exec.py +++ b/openwhisk/_test/pysample/lib/exec.py @@ -19,11 +19,13 @@ from __future__ import print_function import os +import sys import json from action.main import main inp = os.fdopen(0, "rb") out = os.fdopen(3, "wb") +OutputGuard = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX" while True: while True: line = inp.readline() @@ -33,5 +35,9 @@ while True: payload = args["value"] res = main(payload) out.write(json.dumps(res, ensure_ascii=False).encode('utf-8')) - out.write("\n") + print(OutputGuard, file=sys.stdout) + print(OutputGuard, file=sys.stderr) + out.write(b"\n") + sys.stdout.flush() + sys.stderr.flush() out.flush() diff --git a/openwhisk/compiler_test.go b/openwhisk/compiler_test.go index aa032f6..331292c 100644 --- a/openwhisk/compiler_test.go +++ b/openwhisk/compiler_test.go @@ -63,6 +63,8 @@ func Example_cli_compiler() { // false // _test/compile/0/bin/exec: application/x-executable // name=Mike + // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX + // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX // {"message":"Hello, Mike!"} // true // false @@ -80,6 +82,8 @@ func Example_hello() { // Output: // _test/compile/1/bin/exec: application/x-executable // name=Mike + // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX + // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX // {"hello":"Hello, Mike!"} } @@ -96,6 +100,8 @@ func Example_package() { // _test/compile/2/bin/exec: application/x-executable // Main // Hello, Mike + // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX + // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX // {"greetings":"Hello, Mike"} } diff --git a/openwhisk/executor.go b/openwhisk/executor.go index 8427708..bf81aac 100644 --- a/openwhisk/executor.go +++ b/openwhisk/executor.go @@ -30,6 +30,7 @@ import ( // OutputGuard constant string const OutputGuard = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX\n" +const OutputGuardRaw = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX" // DefaultTimeoutStart to wait for a process to start var DefaultTimeoutStart = 5 * time.Millisecond diff --git a/openwhisk/executor_test.go b/openwhisk/executor_test.go index bfaf24f..4a8251f 100644 --- a/openwhisk/executor_test.go +++ b/openwhisk/executor_test.go @@ -75,6 +75,8 @@ func ExampleNewExecutor_hello() { // <nil> // {"hello": "Mike"} // msg=hello Mike + // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX + // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX } func ExampleNewExecutor_env() { diff --git a/openwhisk/initHandler_test.go b/openwhisk/initHandler_test.go index 6a3fe3e..ba9c044 100644 --- a/openwhisk/initHandler_test.go +++ b/openwhisk/initHandler_test.go @@ -52,7 +52,7 @@ func Example_bininit_nocompiler() { doInit(ts, initBinary("_test/hello_greeting", "")) doRun(ts, "") stopTestServer(ts, cur, log) - // Output: + // Unordered output: // 500 {"error":"no action defined yet"} // 200 {"ok":true} // 200 {"message":"Hello, Mike!"} @@ -76,7 +76,7 @@ func Example_zipinit_nocompiler() { doInit(ts, initBinary("_test/hello_message.zip", "")) doRun(ts, "") stopTestServer(ts, cur, log) - // Output: + // Unordered output: // 500 {"error":"no action defined yet"} // 200 {"ok":true} // 200 {"greetings":"Hello, Mike"} @@ -97,7 +97,7 @@ func Example_shell_nocompiler() { doRun(ts, "") doRun(ts, `{"name":"world"}`) stopTestServer(ts, cur, log) - // Output: + // Unordered output: // 500 {"error":"no action defined yet"} // 200 {"ok":true} // 200 {"hello":"Mike"} @@ -120,7 +120,7 @@ func Example_main_nocompiler() { doInit(ts, initBinary("_test/hello_greeting", "greeting")) doRun(ts, "") stopTestServer(ts, cur, log) - // Output: + // Unordered output: // 500 {"error":"no action defined yet"} // 200 {"ok":true} // 200 {"message":"Hello, Mike!"} diff --git a/openwhisk/runHandler.go b/openwhisk/runHandler.go index 6d9f9dc..096eeeb 100644 --- a/openwhisk/runHandler.go +++ b/openwhisk/runHandler.go @@ -23,6 +23,7 @@ import ( "fmt" "io/ioutil" "net/http" + "strings" ) // field name of user logs @@ -68,18 +69,19 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r *http.Request) { return } - defer ap.theExecutor.cmd.Stdout.Write([]byte(OutputGuard)) - defer ap.theExecutor.cmd.Stderr.Write([]byte(OutputGuard)) - // remove newlines body = bytes.Replace(body, []byte("\n"), []byte(""), -1) - // read logs until "stop" signal, this guarantee that all logs will be captured before send back to user + // read logs until two sentinel logs are got, this guarantee that all logs will be captured before send back to user stopSignal := make(chan bool) var logs []string go func() { + var sentinel = 0 for log := range ap.theExecutor.logger { - if log == "stop" { + if strings.Contains(log, OutputGuardRaw) { + sentinel += 1 + } + if sentinel == 2 { break } logs = append(logs, log) @@ -93,7 +95,8 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r *http.Request) { // check for early termination if err != nil { Debug("WARNING! Command exited") - ap.theExecutor.logger <- "stop" + ap.outFile.Write([]byte(OutputGuard)) + ap.errFile.Write([]byte(OutputGuard)) ap.theExecutor = nil sendError(w, http.StatusBadRequest, fmt.Sprintf("command exited")) return @@ -109,8 +112,7 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r *http.Request) { return } - // send "stop" signal and wait for log reading finished - ap.theExecutor.logger <- "stop" + // wait for log reading finished <-stopSignal objmap[LOG_FIELD] = logs newResponse, _ := json.Marshal(objmap) diff --git a/tests/src/test/scala/runtime/actionContainers/ActionLoopBasicGoTests.scala b/tests/src/test/scala/runtime/actionContainers/ActionLoopBasicGoTests.scala index c283852..9684d77 100644 --- a/tests/src/test/scala/runtime/actionContainers/ActionLoopBasicGoTests.scala +++ b/tests/src/test/scala/runtime/actionContainers/ActionLoopBasicGoTests.scala @@ -55,6 +55,8 @@ abstract class ActionLoopBasicGoTests else ""} | reader.ReadBytes('\\n') | fmt.Fprintln(out, \"a string but not a map\") + | fmt.Fprintln(os.Stdout, "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX") + | fmt.Fprintln(os.Stderr, "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX") | reader.ReadBytes('\\n') |} """.stripMargin) diff --git a/tests/src/test/scala/runtime/actionContainers/ActionLoopBasicTests.scala b/tests/src/test/scala/runtime/actionContainers/ActionLoopBasicTests.scala index e3edffe..60a082a 100644 --- a/tests/src/test/scala/runtime/actionContainers/ActionLoopBasicTests.scala +++ b/tests/src/test/scala/runtime/actionContainers/ActionLoopBasicTests.scala @@ -42,6 +42,8 @@ class ActionLoopBasicTests extends BasicActionRunnerTests with WskActorSystem { override val testNotReturningJson = TestConfig("""#!/bin/bash |read line |echo '"not json"' >&3 + |echo "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX" + |echo "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX" >&2 |read line |""".stripMargin) @@ -50,6 +52,8 @@ class ActionLoopBasicTests extends BasicActionRunnerTests with WskActorSystem { |do | echo "hello stdout" | echo "hello stderr" >&2 + | echo "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX" + | echo "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX" >&2 | echo "$line" | jq -c .value >&3 |done """.stripMargin) @@ -61,6 +65,8 @@ class ActionLoopBasicTests extends BasicActionRunnerTests with WskActorSystem { | delimiter="$(echo "$line" | jq -r ".value.delimiter")" | msg="$delimiter ☃ $delimiter" | echo "$msg" + | echo "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX" + | echo "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX" >&2 | echo "{\"winter\": \"$msg\"}" >&3 |done """.stripMargin) @@ -77,6 +83,8 @@ class ActionLoopBasicTests extends BasicActionRunnerTests with WskActorSystem { | __OW_ACTION_NAME="$(echo "$line" | jq -r .action_name)" | __OW_ACTION_VERSION="$(echo "$line" | jq -r .action_version)" | __OW_DEADLINE="$(echo "$line" | jq -r .deadline)" + | echo "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX" + | echo "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX" >&2 | echo >&3 "{ \ | \"api_host\": \"$__OW_API_HOST\", \ | \"api_key\": \"$__OW_API_KEY\", \ @@ -91,7 +99,10 @@ class ActionLoopBasicTests extends BasicActionRunnerTests with WskActorSystem { val echoSh = """|#!/bin/bash |while read line - |do echo "$line" | jq -c .value >&3 + |do + | echo "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX" + | echo "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX" >&2 + | echo "$line" | jq -c .value >&3 |done """.stripMargin diff --git a/tests/src/test/scala/runtime/actionContainers/ActionLoopContainerTests.scala b/tests/src/test/scala/runtime/actionContainers/ActionLoopContainerTests.scala index 86f69e4..109ba15 100644 --- a/tests/src/test/scala/runtime/actionContainers/ActionLoopContainerTests.scala +++ b/tests/src/test/scala/runtime/actionContainers/ActionLoopContainerTests.scala @@ -50,6 +50,8 @@ class ActionLoopContainerTests | fi | echo "name=$$name" | hello="Hello, $$name" + | echo "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX" + | echo "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX" >&2 | echo '{"${main}":"'$$hello'"}' >&3 |done |""".stripMargin
