This is an automated email from the ASF dual-hosted git repository. ningyougang pushed a commit to branch support-array-result in repository https://gitbox.apache.org/repos/asf/openwhisk-runtime-go.git
commit ba71f1beb8a129230f47217a8e2eec1dd3cd5566 Author: ning.yougang <[email protected]> AuthorDate: Wed Jul 20 11:00:37 2022 +0800 return user logs to run api --- openwhisk/executor.go | 12 ++++++++---- openwhisk/executor_test.go | 8 -------- openwhisk/initHandler_test.go | 8 ++++---- openwhisk/logWriter.go | 33 +++++++++++++++++++++++++++++++++ openwhisk/runHandler.go | 35 +++++++++++++++++++++++++++++++---- openwhisk/util_test.go | 11 +++++++++-- 6 files changed, 85 insertions(+), 22 deletions(-) diff --git a/openwhisk/executor.go b/openwhisk/executor.go index 233f3cf..8427708 100644 --- a/openwhisk/executor.go +++ b/openwhisk/executor.go @@ -41,15 +41,20 @@ type Executor struct { input io.WriteCloser output *bufio.Reader exited chan bool + logger chan string } // NewExecutor creates a child subprocess using the provided command line, // writing the logs in the given file. // You can then start it getting a communication channel func NewExecutor(logout *os.File, logerr *os.File, command string, env map[string]string, args ...string) (proc *Executor) { + stringChan := make(chan string, 100) cmd := exec.Command(command, args...) - cmd.Stdout = logout - cmd.Stderr = logerr + stdoutWritter := NewLogWriter("stdout", stringChan, logout) + cmd.Stdout = stdoutWritter + + stderrWritter := NewLogWriter("stderr", stringChan, logerr) + cmd.Stderr = stderrWritter cmd.Env = []string{} for k, v := range env { cmd.Env = append(cmd.Env, k+"="+v) @@ -73,6 +78,7 @@ func NewExecutor(logout *os.File, logerr *os.File, command string, env map[strin input, output, make(chan bool), + stringChan, } } @@ -101,8 +107,6 @@ func (proc *Executor) Interact(in []byte) ([]byte, error) { case <-proc.exited: err = errors.New("command exited") } - proc.cmd.Stdout.Write([]byte(OutputGuard)) - proc.cmd.Stderr.Write([]byte(OutputGuard)) return out, err } diff --git a/openwhisk/executor_test.go b/openwhisk/executor_test.go index 597643d..bfaf24f 100644 --- a/openwhisk/executor_test.go +++ b/openwhisk/executor_test.go @@ -60,8 +60,6 @@ func ExampleNewExecutor_bc() { // Output: // <nil> // 4 - // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX - // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX } func ExampleNewExecutor_hello() { @@ -77,8 +75,6 @@ func ExampleNewExecutor_hello() { // <nil> // {"hello": "Mike"} // msg=hello Mike - // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX - // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX } func ExampleNewExecutor_env() { @@ -93,8 +89,6 @@ func ExampleNewExecutor_env() { // Output: // <nil> // { "env": "TEST_HELLO=WORLD TEST_HI=ALL"} - // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX - // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX } func ExampleNewExecutor_ack() { @@ -144,6 +138,4 @@ func ExampleNewExecutor_helloack() { // <nil> // {"hello": "Mike"} // msg=hello Mike - // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX - // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX } diff --git a/openwhisk/initHandler_test.go b/openwhisk/initHandler_test.go index d176874..6a3fe3e 100644 --- a/openwhisk/initHandler_test.go +++ b/openwhisk/initHandler_test.go @@ -100,8 +100,8 @@ func Example_shell_nocompiler() { // Output: // 500 {"error":"no action defined yet"} // 200 {"ok":true} - // 200 {"hello": "Mike"} - // 200 {"hello": "world"} + // 200 {"hello":"Mike"} + // 200 {"hello":"world"} // msg=hello Mike // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX @@ -237,8 +237,8 @@ func Example_zip_init() { stopTestServer(ts, cur, log) // Output: // 200 {"ok":true} - // 200 {"python": "Hello, Mike"} - // 200 {"python": "Hello, World"} + // 200 {"python":"Hello, Mike"} + // 200 {"python":"Hello, World"} // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX diff --git a/openwhisk/logWriter.go b/openwhisk/logWriter.go new file mode 100644 index 0000000..485f0db --- /dev/null +++ b/openwhisk/logWriter.go @@ -0,0 +1,33 @@ +package openwhisk + +import ( + "fmt" + "io" + "strings" + "time" +) + +type LogWriter struct { + sender chan string + writer io.Writer + stream string +} + +func (b *LogWriter) Write(p []byte) (n int, err error) { + size, err := b.writer.Write(p) + for _, str := range strings.Split(string(p), "\n") { + if len(str) != 0 { + log := fmt.Sprintf("%s %s: %s", time.Now().Format(time.RFC3339Nano), b.stream, str) + b.sender <- log + } + } + return size, err +} + +func NewLogWriter(stream string, sender chan string, writer io.Writer) (proc *LogWriter) { + return &LogWriter{ + sender, + writer, + stream, + } +} diff --git a/openwhisk/runHandler.go b/openwhisk/runHandler.go index 3a16781..6d9f9dc 100644 --- a/openwhisk/runHandler.go +++ b/openwhisk/runHandler.go @@ -25,6 +25,9 @@ import ( "net/http" ) +// field name of user logs +const LOG_FIELD = "__OW_LOGS" + // ErrResponse is the response when there are errors type ErrResponse struct { Error string `json:"error"` @@ -65,15 +68,32 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r *http.Request) { return } + defer ap.theExecutor.cmd.Stdout.Write([]byte(OutputGuard)) + defer ap.theExecutor.cmd.Stderr.Write([]byte(OutputGuard)) + // remove newlines body = bytes.Replace(body, []byte("\n"), []byte(""), -1) + // read logs until "stop" signal, this guarantee that all logs will be captured before send back to user + stopSignal := make(chan bool) + var logs []string + go func() { + for log := range ap.theExecutor.logger { + if log == "stop" { + break + } + logs = append(logs, log) + } + stopSignal <- true + }() + // execute the action response, err := ap.theExecutor.Interact(body) // check for early termination if err != nil { Debug("WARNING! Command exited") + ap.theExecutor.logger <- "stop" ap.theExecutor = nil sendError(w, http.StatusBadRequest, fmt.Sprintf("command exited")) return @@ -81,16 +101,23 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r *http.Request) { DebugLimit("received:", response, 120) // check if the answer is an object map - var objmap map[string]*json.RawMessage + var objmap map[string]interface{} err = json.Unmarshal(response, &objmap) if err != nil { + ap.theExecutor.logger <- "stop" sendError(w, http.StatusBadGateway, "The action did not return a dictionary.") return } + // send "stop" signal and wait for log reading finished + ap.theExecutor.logger <- "stop" + <-stopSignal + objmap[LOG_FIELD] = logs + newResponse, _ := json.Marshal(objmap) + w.Header().Set("Content-Type", "application/json") - w.Header().Set("Content-Length", fmt.Sprintf("%d", len(response))) - numBytesWritten, err := w.Write(response) + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(newResponse))) + numBytesWritten, err := w.Write(newResponse) // flush output if f, ok := w.(http.Flusher); ok { @@ -102,7 +129,7 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r *http.Request) { sendError(w, http.StatusInternalServerError, fmt.Sprintf("Error writing response: %v", err)) return } - if numBytesWritten != len(response) { + if numBytesWritten != len(newResponse) { sendError(w, http.StatusInternalServerError, fmt.Sprintf("Only wrote %d of %d bytes to response", numBytesWritten, len(response))) return } diff --git a/openwhisk/util_test.go b/openwhisk/util_test.go index fc0d810..156d00c 100644 --- a/openwhisk/util_test.go +++ b/openwhisk/util_test.go @@ -81,12 +81,19 @@ func doRun(ts *httptest.Server, message string) { message = `{"name":"Mike"}` } resp, status, err := doPost(ts.URL+"/run", `{ "value": `+message+`}`) + + // remove `__OW_LOGS` field + var objmap map[string]*json.RawMessage + json.Unmarshal([]byte(resp), &objmap) + delete(objmap, "__OW_LOGS") + newResp, _ := json.Marshal(objmap) + if err != nil { fmt.Println(err) } else { - fmt.Printf("%d %s", status, resp) + fmt.Printf("%d %s", status, newResp) } - if !strings.HasSuffix(resp, "\n") { + if !strings.HasSuffix(string(newResp), "\n") { fmt.Println() } }
