lostluck commented on code in PR #21776:
URL: https://github.com/apache/beam/pull/21776#discussion_r893952091


##########
sdks/go/pkg/beam/core/runtime/harness/worker_status.go:
##########
@@ -65,20 +70,50 @@ func (w *workerStatusHandler) start(ctx context.Context) 
error {
        return nil
 }
 
+func memoryUsage() string {
+       m := runtime.MemStats{}
+       runtime.ReadMemStats(&m)
+       return fmt.Sprintf("\n Total Alloc: %v bytes \n Sys: %v bytes \n 
Mallocs: %v\n Frees: %v\n HeapAlloc: %v bytes", m.TotalAlloc, m.Sys, m.Mallocs, 
m.Frees, m.HeapAlloc)
+}
+
+func (w *workerStatusHandler) activeProcessBundleStates() string {
+       var states string
+       for bundleID, store := range w.metStore {
+               execStates := ""
+               for bundleID, state := range store.StateRegistry() {
+                       execStates += fmt.Sprintf("ID: %v Execution States: 
%#v,", bundleID, *state)
+
+               }
+               states += fmt.Sprintf("\nBundle ID: %v\nBundle State: 
%#v\nBundle Execution States: %v\n", bundleID, *store.BundleState(), execStates)
+       }
+       return states
+}
+
+func (w *workerStatusHandler) cacheStats() string {
+       return fmt.Sprintf("Cache:\n%v", w.cache.CacheMetrics())

Review Comment:
   ```suggestion
        return fmt.Sprintf("State Cache:\n%+v", w.cache.CacheMetrics())
   ```
   
   It should be clear which cache this is talking about, and `{0 0 0 0 0}` isn't 
useful to anyone, so we use `%+v` to print the field names as well, which makes 
the numbers meaningful.
   
   https://go.dev/play/p/q9db2d3a5Lv



##########
sdks/go/pkg/beam/core/runtime/harness/worker_status.go:
##########
@@ -65,20 +70,50 @@ func (w *workerStatusHandler) start(ctx context.Context) 
error {
        return nil
 }
 
+func memoryUsage() string {
+       m := runtime.MemStats{}
+       runtime.ReadMemStats(&m)
+       return fmt.Sprintf("\n Total Alloc: %v bytes \n Sys: %v bytes \n 
Mallocs: %v\n Frees: %v\n HeapAlloc: %v bytes", m.TotalAlloc, m.Sys, m.Mallocs, 
m.Frees, m.HeapAlloc)
+}
+
+func (w *workerStatusHandler) activeProcessBundleStates() string {
+       var states string
+       for bundleID, store := range w.metStore {
+               execStates := ""
+               for bundleID, state := range store.StateRegistry() {
+                       execStates += fmt.Sprintf("ID: %v Execution States: 
%#v,", bundleID, *state)
+
+               }
+               states += fmt.Sprintf("\nBundle ID: %v\nBundle State: 
%#v\nBundle Execution States: %v\n", bundleID, *store.BundleState(), execStates)
+       }
+       return states
+}
+
+func (w *workerStatusHandler) cacheStats() string {
+       return fmt.Sprintf("Cache:\n%v", w.cache.CacheMetrics())
+}
+
+func goroutineDump() string {
+       buf := make([]byte, 1<<16)
+       runtime.Stack(buf, true)

Review Comment:
   Instead of the raw runtime version, use the pprof version:
   
   `import "runtime/pprof"`
   ```
   profile := pprof.Lookup("goroutine")
   if profile != nil {
     // Use debug=1 to get the human readable consolidated goroutine output.
     profile.WriteTo(b, 1)
   }
   ```
   
   This makes the output easier to read, as repeated goroutines are 
consolidated, with the duplication indicated by a count.
   
   (Note that a `*strings.Builder` implements `io.Writer` and can be passed 
right in; the `Write` method is on the pointer.)



##########
sdks/go/pkg/beam/core/runtime/harness/harness.go:
##########
@@ -37,26 +38,15 @@ import (
        "google.golang.org/protobuf/types/known/durationpb"
 )
 
-// StatusAddress is a type of status endpoint address as an optional argument 
to harness.Main().
-type StatusAddress string
-
 // TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a 
plugin).
 
 // Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
 // "pipeline-construction time" -- on each worker. It is a FnAPI client and
 // ultimately responsible for correctly executing user code.

Review Comment:
   Please add documentation about the "expected" environment variables. TBH, I 
don't mind the options approach for passing them into Main, but fetching them 
from the env vars would then happen in init.go.



##########
sdks/go/pkg/beam/core/runtime/harness/worker_status.go:
##########
@@ -65,20 +70,50 @@ func (w *workerStatusHandler) start(ctx context.Context) 
error {
        return nil
 }
 
+func memoryUsage() string {
+       m := runtime.MemStats{}
+       runtime.ReadMemStats(&m)
+       return fmt.Sprintf("\n Total Alloc: %v bytes \n Sys: %v bytes \n 
Mallocs: %v\n Frees: %v\n HeapAlloc: %v bytes", m.TotalAlloc, m.Sys, m.Mallocs, 
m.Frees, m.HeapAlloc)
+}
+
+func (w *workerStatusHandler) activeProcessBundleStates() string {
+       var states string
+       for bundleID, store := range w.metStore {
+               execStates := ""
+               for bundleID, state := range store.StateRegistry() {
+                       execStates += fmt.Sprintf("ID: %v Execution States: 
%#v,", bundleID, *state)
+
+               }
+               states += fmt.Sprintf("\nBundle ID: %v\nBundle State: 
%#v\nBundle Execution States: %v\n", bundleID, *store.BundleState(), execStates)
+       }
+       return states
+}
+
+func (w *workerStatusHandler) cacheStats() string {
+       return fmt.Sprintf("Cache:\n%v", w.cache.CacheMetrics())
+}
+
+func goroutineDump() string {
+       buf := make([]byte, 1<<16)
+       runtime.Stack(buf, true)
+       return string(buf)
+}
+
 // reader reads the WorkerStatusRequest from the stream and sends a processed 
WorkerStatusResponse to
 // a response channel.
 func (w *workerStatusHandler) reader(ctx context.Context, stub 
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
        defer w.wg.Done()
-       buf := make([]byte, 1<<16)
+
        for w.isAlive() {
                req, err := stub.Recv()
                if err != nil && err != io.EOF {
                        log.Debugf(ctx, "exiting workerStatusHandler.Reader(): 
%v", err)
                        return
                }
                log.Debugf(ctx, "RECV-status: %v", req.GetId())
-               runtime.Stack(buf, true)
-               response := &fnpb.WorkerStatusResponse{Id: req.GetId(), 
StatusInfo: string(buf)}
+
+               statusInfo := fmt.Sprintf("\n============Memory 
Usage============\n%s\n============Active Process Bundle 
States============\n%s\n============Cache 
Stats============\n%s\n============Goroutine Dump============\n%s\n", 
memoryUsage(), w.activeProcessBundleStates(), w.cacheStats(), goroutineDump())

Review Comment:
   Use a [strings.Builder](https://pkg.go.dev/strings#Builder) instead of 
trying to get everything into a single print output. It will be easier to read. 
I'd recommend simply passing the builder to each of the helper methods and 
having each helper contribute its own well-formed output. That way they are 
still easy to unit test.



##########
sdks/go/test/integration/wordcount/wordcount.go:
##########
@@ -18,15 +18,13 @@ package wordcount
 
 import (
        "context"
-       "regexp"

Review Comment:
   We can remove this change. At most, we want to remove the blank line 
between "strings" and "fmt".



##########
sdks/go/pkg/beam/core/runtime/harness/worker_status.go:
##########
@@ -65,20 +70,50 @@ func (w *workerStatusHandler) start(ctx context.Context) 
error {
        return nil
 }
 
+func memoryUsage() string {
+       m := runtime.MemStats{}
+       runtime.ReadMemStats(&m)
+       return fmt.Sprintf("\n Total Alloc: %v bytes \n Sys: %v bytes \n 
Mallocs: %v\n Frees: %v\n HeapAlloc: %v bytes", m.TotalAlloc, m.Sys, m.Mallocs, 
m.Frees, m.HeapAlloc)

Review Comment:
   Aside from the earlier strings.Builder suggestion: we get *so much* 
information in MemStats that we should at least try to match what Java is putting out.
   
   https://pkg.go.dev/runtime#MemStats  -> Everything we get.
   Java's print out: 
https://github.com/apache/beam/blob/ffef8de04a93435e69faf3bf65efe11852cbd8dc/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/MemoryMonitor.java#L622
 
   
   We can do the same level of readability improvement that Java is doing, 
though I don't think we can do "thrashing". We can show the GC target size, and 
`GCCPUFraction` gives us the fraction of available CPU time spent in GC, which we 
can display as a percentage. Not sure about "thrashing" and "pushback", but this 
gets us most of the way there.
   
   Java's for reference:
   
https://github.com/apache/beam/blob/ffef8de04a93435e69faf3bf65efe11852cbd8dc/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/MemoryMonitor.java#L411
   
   `NextGC` is useful since it sets the target of the next GC.
   
   `Mallocs` & `Frees` aren't super useful outside of a full heap dump (we care 
about memory, not counts). `HeapInuse` is more useful, as are `StackInuse` and 
`StackSys` (we'll need both for a complete picture, since Go tries to allocate as 
much as possible on the stack rather than the heap, unlike Java, which is all heap).



##########
sdks/go/pkg/beam/core/runtime/harness/worker_status.go:
##########
@@ -65,20 +70,50 @@ func (w *workerStatusHandler) start(ctx context.Context) 
error {
        return nil
 }
 
+func memoryUsage() string {
+       m := runtime.MemStats{}
+       runtime.ReadMemStats(&m)
+       return fmt.Sprintf("\n Total Alloc: %v bytes \n Sys: %v bytes \n 
Mallocs: %v\n Frees: %v\n HeapAlloc: %v bytes", m.TotalAlloc, m.Sys, m.Mallocs, 
m.Frees, m.HeapAlloc)
+}
+
+func (w *workerStatusHandler) activeProcessBundleStates() string {
+       var states string
+       for bundleID, store := range w.metStore {
+               execStates := ""
+               for bundleID, state := range store.StateRegistry() {
+                       execStates += fmt.Sprintf("ID: %v Execution States: 
%#v,", bundleID, *state)
+
+               }
+               states += fmt.Sprintf("\nBundle ID: %v\nBundle State: 
%#v\nBundle Execution States: %v\n", bundleID, *store.BundleState(), execStates)

Review Comment:
   Aside from making it use a strings.Builder, the only concern is to validate 
that we are being thread-safe in doing this. We can likely make the printout 
much clearer, though.



##########
sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go:
##########
@@ -272,3 +272,8 @@ func (c *SideInputCache) evictElement(ctx context.Context) {
                }
        }
 }
+
+// CacheMetrics returns the cache metrics for current side input cache.
+func (c *SideInputCache) CacheMetrics() CacheMetrics {
+       return c.metrics

Review Comment:
   This method needs to be under the lock's critical section.
   
   Add
   
   ```
        c.mu.Lock()
        defer c.mu.Unlock()
   ```
   
   The return will provide a copy, but the copy could happen while things are 
mutating, causing a data race.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to