csantanapr closed pull request #58: fixed unsynced log bugs
URL: https://github.com/apache/incubator-openwhisk-runtime-go/pull/58
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/openwhisk/compiler.go b/openwhisk/compiler.go
index a948547..412a67b 100644
--- a/openwhisk/compiler.go
+++ b/openwhisk/compiler.go
@@ -24,7 +24,7 @@ import (
"os/exec"
"runtime"
- "github.com/h2non/filetype"
+ "gopkg.in/h2non/filetype.v1"
)
// this is only to let test run on OSX
diff --git a/openwhisk/executor.go b/openwhisk/executor.go
index ad1b549..1658fe8 100644
--- a/openwhisk/executor.go
+++ b/openwhisk/executor.go
@@ -44,14 +44,14 @@ type Executor struct {
_output *bufio.Reader
_logout *bufio.Reader
_logerr *bufio.Reader
- _outbuf *os.File
- _errbuf *os.File
+ _outbuf *bufio.Writer
+ _errbuf *bufio.Writer
}
// NewExecutor creates a child subprocess using the provided command line,
// writing the logs in the given file.
// You can then start it getting a communication channel
-func NewExecutor(outbuf *os.File, errbuf *os.File, command string, args
...string) (proc *Executor) {
+func NewExecutor(logout *os.File, logerr *os.File, command string, args
...string) (proc *Executor) {
cmd := exec.Command(command, args...)
cmd.Env = []string{
"__OW_API_HOST=" + os.Getenv("__OW_API_HOST"),
@@ -85,6 +85,8 @@ func NewExecutor(outbuf *os.File, errbuf *os.File, command
string, args ...strin
pout := bufio.NewReader(pipeOut)
sout := bufio.NewReader(stdout)
serr := bufio.NewReader(stderr)
+ outbuf := bufio.NewWriter(logout)
+ errbuf := bufio.NewWriter(logerr)
return &Executor{
make(chan []byte),
@@ -129,17 +131,19 @@ func (proc *Executor) run() {
Debug("run: end")
}
-func drain(ch chan string, out *os.File) {
+func drain(ch chan string, out *bufio.Writer) {
for loop := true; loop; {
runtime.Gosched()
select {
case buf := <-ch:
fmt.Fprint(out, buf)
+ out.Flush()
case <-time.After(DefaultTimeoutDrain):
loop = false
}
}
fmt.Fprintln(out, OutputGuard)
+ out.Flush()
}
// manage copying stdout and stder in output
@@ -152,15 +156,16 @@ func (proc *Executor) logger() {
chErr := make(chan string)
go _collect(chErr, proc._logerr)
- // wait for the signal
+ // loop draining the loop until asked to exit
for <-proc.log {
- // flush stdout
+ // drain stdout
+ Debug("draining stdout")
drain(chOut, proc._outbuf)
- // flush stderr
+ // drain stderr
+ Debug("draining stderr")
drain(chErr, proc._errbuf)
+ proc.log <- true
}
- proc._outbuf.Sync()
- proc._errbuf.Sync()
Debug("logger: end")
}
diff --git a/openwhisk/executor_test.go b/openwhisk/executor_test.go
index 6a649ed..8504fd9 100644
--- a/openwhisk/executor_test.go
+++ b/openwhisk/executor_test.go
@@ -51,19 +51,18 @@ func ExampleNewExecutor_bc() {
proc := NewExecutor(log, log, "_test/bc.sh")
err := proc.Start()
fmt.Println(err)
- //proc.log <- true
proc.io <- []byte("2+2")
fmt.Printf("%s", <-proc.io)
+ proc.log <- true
+ <-proc.log
// and now, exit detection
proc.io <- []byte("quit")
- proc.log <- true
select {
case in := <-proc.io:
fmt.Printf("%s", in)
case <-proc.exit:
fmt.Println("exit")
}
- waitabit()
proc.Stop()
dump(log)
// Output:
@@ -82,9 +81,8 @@ func ExampleNewExecutor_hello() {
proc.io <- []byte(`{"value":{"name":"Mike"}}`)
fmt.Printf("%s", <-proc.io)
proc.log <- true
- waitabit()
+ <-proc.log
proc.Stop()
- waitabit()
_, ok := <-proc.io
fmt.Printf("io %v\n", ok)
dump(log)
@@ -111,10 +109,9 @@ func ExampleNewExecutor_term() {
exited = true
}
proc.log <- true
+ <-proc.log
fmt.Printf("exit %v\n", exited)
- waitabit()
proc.Stop()
- waitabit()
_, ok := <-proc.io
fmt.Printf("io %v\n", ok)
dump(log)
diff --git a/openwhisk/extractor.go b/openwhisk/extractor.go
index 70b1520..ca751f8 100644
--- a/openwhisk/extractor.go
+++ b/openwhisk/extractor.go
@@ -27,7 +27,7 @@ import (
"path/filepath"
"strconv"
- "github.com/h2non/filetype"
+ "gopkg.in/h2non/filetype.v1"
)
func unzip(src []byte, dest string) error {
diff --git a/openwhisk/initHandler_test.go b/openwhisk/initHandler_test.go
index 803684f..81f00dc 100644
--- a/openwhisk/initHandler_test.go
+++ b/openwhisk/initHandler_test.go
@@ -105,6 +105,8 @@ func Example_shell_nocompiler() {
// 500 {"error":"no action defined yet"}
// msg=hello Mike
// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+ // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+ // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
// Goodbye!
// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
}
diff --git a/openwhisk/runHandler.go b/openwhisk/runHandler.go
index 1495189..2463255 100644
--- a/openwhisk/runHandler.go
+++ b/openwhisk/runHandler.go
@@ -64,8 +64,9 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r
*http.Request) {
body = bytes.Replace(body, []byte("\n"), []byte(""), -1)
// execute the action
- // and check for early termination
ap.theExecutor.io <- body
+
+ // check for early termination
var response []byte
var exited bool
select {
@@ -75,6 +76,10 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r
*http.Request) {
exited = true
}
+ // flush the logs sending the activation message at the end
+ ap.theExecutor.log <- true
+ <-ap.theExecutor.log
+
// check for early termination
if exited {
Debug("WARNING! Command exited")
@@ -84,9 +89,6 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r
*http.Request) {
}
DebugLimit("received:", response, 120)
- // flush the logs sending the activation message at the end
- ap.theExecutor.log <- true
-
// check if the answer is an object map
var objmap map[string]*json.RawMessage
err = json.Unmarshal(response, &objmap)
diff --git a/openwhisk/util_test.go b/openwhisk/util_test.go
index 9ebcbc0..b0d8084 100644
--- a/openwhisk/util_test.go
+++ b/openwhisk/util_test.go
@@ -35,7 +35,7 @@ import (
"testing"
"time"
- "github.com/h2non/filetype"
+ "gopkg.in/h2non/filetype.v1"
)
func startTestServer(compiler string) (*httptest.Server, string, *os.File) {
@@ -157,7 +157,8 @@ func exists(dir, filename string) error {
var pseudoElfForMacType = filetype.NewType("elf", "darwin/mach")
func pseudoElfForMacMatcher(buf []byte) bool {
- return len(buf) > 4 && buf[0] == 0xcf && buf[1] == 0xfa && buf[2] ==
0xed && buf[3] == 0xfe
+ return len(buf) > 4 && ((buf[0] == 0xcf && buf[1] == 0xfa && buf[2] ==
0xed && buf[3] == 0xfe) ||
+ (buf[0] == 0xce && buf[1] == 0xfa && buf[2] == 0xed && buf[3]
== 0xfe))
}
func detect(dir, filename string) string {
@@ -177,7 +178,7 @@ func removeLineNr(out string) string {
return re.ReplaceAllString(out, "::")
}
func TestMain(m *testing.M) {
- //Debugging = true // enable debug
+ // Debugging = true // enable debug
// silence those annoying logs
if !Debugging {
log.SetOutput(ioutil.Discard)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services