This is an automated email from the ASF dual-hosted git repository.

ningyougang pushed a commit to branch support-array-result
in repository https://gitbox.apache.org/repos/asf/openwhisk-runtime-go.git

commit ba71f1beb8a129230f47217a8e2eec1dd3cd5566
Author: ning.yougang <[email protected]>
AuthorDate: Wed Jul 20 11:00:37 2022 +0800

    return user logs to run api
---
 openwhisk/executor.go         | 12 ++++++++----
 openwhisk/executor_test.go    |  8 --------
 openwhisk/initHandler_test.go |  8 ++++----
 openwhisk/logWriter.go        | 33 +++++++++++++++++++++++++++++++++
 openwhisk/runHandler.go       | 35 +++++++++++++++++++++++++++++++----
 openwhisk/util_test.go        | 11 +++++++++--
 6 files changed, 85 insertions(+), 22 deletions(-)

diff --git a/openwhisk/executor.go b/openwhisk/executor.go
index 233f3cf..8427708 100644
--- a/openwhisk/executor.go
+++ b/openwhisk/executor.go
@@ -41,15 +41,20 @@ type Executor struct {
        input  io.WriteCloser
        output *bufio.Reader
        exited chan bool
+       logger chan string
 }
 
 // NewExecutor creates a child subprocess using the provided command line,
 // writing the logs in the given file.
 // You can then start it getting a communication channel
 func NewExecutor(logout *os.File, logerr *os.File, command string, env 
map[string]string, args ...string) (proc *Executor) {
+       stringChan := make(chan string, 100)
        cmd := exec.Command(command, args...)
-       cmd.Stdout = logout
-       cmd.Stderr = logerr
+       stdoutWritter := NewLogWriter("stdout", stringChan, logout)
+       cmd.Stdout = stdoutWritter
+
+       stderrWritter := NewLogWriter("stderr", stringChan, logerr)
+       cmd.Stderr = stderrWritter
        cmd.Env = []string{}
        for k, v := range env {
                cmd.Env = append(cmd.Env, k+"="+v)
@@ -73,6 +78,7 @@ func NewExecutor(logout *os.File, logerr *os.File, command 
string, env map[strin
                input,
                output,
                make(chan bool),
+               stringChan,
        }
 }
 
@@ -101,8 +107,6 @@ func (proc *Executor) Interact(in []byte) ([]byte, error) {
        case <-proc.exited:
                err = errors.New("command exited")
        }
-       proc.cmd.Stdout.Write([]byte(OutputGuard))
-       proc.cmd.Stderr.Write([]byte(OutputGuard))
        return out, err
 }
 
diff --git a/openwhisk/executor_test.go b/openwhisk/executor_test.go
index 597643d..bfaf24f 100644
--- a/openwhisk/executor_test.go
+++ b/openwhisk/executor_test.go
@@ -60,8 +60,6 @@ func ExampleNewExecutor_bc() {
        // Output:
        // <nil>
        // 4
-       // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
-       // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
 }
 
 func ExampleNewExecutor_hello() {
@@ -77,8 +75,6 @@ func ExampleNewExecutor_hello() {
        // <nil>
        // {"hello": "Mike"}
        // msg=hello Mike
-       // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
-       // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
 }
 
 func ExampleNewExecutor_env() {
@@ -93,8 +89,6 @@ func ExampleNewExecutor_env() {
        // Output:
        // <nil>
        // { "env": "TEST_HELLO=WORLD TEST_HI=ALL"}
-       // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
-       // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
 }
 
 func ExampleNewExecutor_ack() {
@@ -144,6 +138,4 @@ func ExampleNewExecutor_helloack() {
        // <nil>
        // {"hello": "Mike"}
        // msg=hello Mike
-       // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
-       // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
 }
diff --git a/openwhisk/initHandler_test.go b/openwhisk/initHandler_test.go
index d176874..6a3fe3e 100644
--- a/openwhisk/initHandler_test.go
+++ b/openwhisk/initHandler_test.go
@@ -100,8 +100,8 @@ func Example_shell_nocompiler() {
        // Output:
        // 500 {"error":"no action defined yet"}
        // 200 {"ok":true}
-       // 200 {"hello": "Mike"}
-       // 200 {"hello": "world"}
+       // 200 {"hello":"Mike"}
+       // 200 {"hello":"world"}
        // msg=hello Mike
        // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
        // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
@@ -237,8 +237,8 @@ func Example_zip_init() {
        stopTestServer(ts, cur, log)
        // Output:
        // 200 {"ok":true}
-       // 200 {"python": "Hello, Mike"}
-       // 200 {"python": "Hello, World"}
+       // 200 {"python":"Hello, Mike"}
+       // 200 {"python":"Hello, World"}
        // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
        // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
        // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
diff --git a/openwhisk/logWriter.go b/openwhisk/logWriter.go
new file mode 100644
index 0000000..485f0db
--- /dev/null
+++ b/openwhisk/logWriter.go
@@ -0,0 +1,33 @@
+package openwhisk
+
+import (
+       "fmt"
+       "io"
+       "strings"
+       "time"
+)
+
+type LogWriter struct {
+       sender chan string
+       writer io.Writer
+       stream string
+}
+
+func (b *LogWriter) Write(p []byte) (n int, err error) {
+       size, err := b.writer.Write(p)
+       for _, str := range strings.Split(string(p), "\n") {
+               if len(str) != 0 {
+                       log := fmt.Sprintf("%s %s: %s", 
time.Now().Format(time.RFC3339Nano), b.stream, str)
+                       b.sender <- log
+               }
+       }
+       return size, err
+}
+
+func NewLogWriter(stream string, sender chan string, writer io.Writer) (proc 
*LogWriter) {
+       return &LogWriter{
+               sender,
+               writer,
+               stream,
+       }
+}
diff --git a/openwhisk/runHandler.go b/openwhisk/runHandler.go
index 3a16781..6d9f9dc 100644
--- a/openwhisk/runHandler.go
+++ b/openwhisk/runHandler.go
@@ -25,6 +25,9 @@ import (
        "net/http"
 )
 
+// field name of user logs
+const LOG_FIELD = "__OW_LOGS"
+
 // ErrResponse is the response when there are errors
 type ErrResponse struct {
        Error string `json:"error"`
@@ -65,15 +68,32 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r 
*http.Request) {
                return
        }
 
+       defer ap.theExecutor.cmd.Stdout.Write([]byte(OutputGuard))
+       defer ap.theExecutor.cmd.Stderr.Write([]byte(OutputGuard))
+
        // remove newlines
        body = bytes.Replace(body, []byte("\n"), []byte(""), -1)
 
+       // read logs until "stop" signal, this guarantee that all logs will be 
captured before send back to user
+       stopSignal := make(chan bool)
+       var logs []string
+       go func() {
+               for log := range ap.theExecutor.logger {
+                       if log == "stop" {
+                               break
+                       }
+                       logs = append(logs, log)
+               }
+               stopSignal <- true
+       }()
+
        // execute the action
        response, err := ap.theExecutor.Interact(body)
 
        // check for early termination
        if err != nil {
                Debug("WARNING! Command exited")
+               ap.theExecutor.logger <- "stop"
                ap.theExecutor = nil
                sendError(w, http.StatusBadRequest, fmt.Sprintf("command 
exited"))
                return
@@ -81,16 +101,23 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r 
*http.Request) {
        DebugLimit("received:", response, 120)
 
        // check if the answer is an object map
-       var objmap map[string]*json.RawMessage
+       var objmap map[string]interface{}
        err = json.Unmarshal(response, &objmap)
        if err != nil {
+               ap.theExecutor.logger <- "stop"
                sendError(w, http.StatusBadGateway, "The action did not return 
a dictionary.")
                return
        }
 
+       // send "stop" signal and wait for log reading finished
+       ap.theExecutor.logger <- "stop"
+       <-stopSignal
+       objmap[LOG_FIELD] = logs
+       newResponse, _ := json.Marshal(objmap)
+
        w.Header().Set("Content-Type", "application/json")
-       w.Header().Set("Content-Length", fmt.Sprintf("%d", len(response)))
-       numBytesWritten, err := w.Write(response)
+       w.Header().Set("Content-Length", fmt.Sprintf("%d", len(newResponse)))
+       numBytesWritten, err := w.Write(newResponse)
 
        // flush output
        if f, ok := w.(http.Flusher); ok {
@@ -102,7 +129,7 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r 
*http.Request) {
                sendError(w, http.StatusInternalServerError, fmt.Sprintf("Error 
writing response: %v", err))
                return
        }
-       if numBytesWritten != len(response) {
+       if numBytesWritten != len(newResponse) {
                sendError(w, http.StatusInternalServerError, fmt.Sprintf("Only 
wrote %d of %d bytes to response", numBytesWritten, len(response)))
                return
        }
diff --git a/openwhisk/util_test.go b/openwhisk/util_test.go
index fc0d810..156d00c 100644
--- a/openwhisk/util_test.go
+++ b/openwhisk/util_test.go
@@ -81,12 +81,19 @@ func doRun(ts *httptest.Server, message string) {
                message = `{"name":"Mike"}`
        }
        resp, status, err := doPost(ts.URL+"/run", `{ "value": `+message+`}`)
+
+       // remove `__OW_LOGS` field
+       var objmap map[string]*json.RawMessage
+       json.Unmarshal([]byte(resp), &objmap)
+       delete(objmap, "__OW_LOGS")
+       newResp, _ := json.Marshal(objmap)
+
        if err != nil {
                fmt.Println(err)
        } else {
-               fmt.Printf("%d %s", status, resp)
+               fmt.Printf("%d %s", status, newResp)
        }
-       if !strings.HasSuffix(resp, "\n") {
+       if !strings.HasSuffix(string(newResp), "\n") {
                fmt.Println()
        }
 }

Reply via email to