lostluck commented on code in PR #22225:
URL: https://github.com/apache/beam/pull/22225#discussion_r918419721


##########
sdks/go/pkg/beam/util/diagnostics/diagnostics.go:
##########
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package diagnostics
+
+import (
+       "bufio"
+       "context"
+       "os"
+
+       "runtime/debug"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Review Comment:
   I think this approach is OK if we file a P3 issue noting that it's currently 
cloud-vendor specific.
   
   ---
   
   This import is where the container starts to assume Google Cloud instead 
of staying flexible. Supporting alternative blob stores would then require 
updating and maintaining the container and releasing a new version (or 
forcing users onto custom containers). It's not very open as a result.
   
   Ideally we would *not* have the `gcs` import here, and would rely on 
whatever filesystems the user program has imported/enabled, so users can write 
to S3, HDFS, a network filesystem mount, or something else.
   
   On the plus side, we could make this dependency explicit in the boot 
container by moving this `_` import into boot.go instead. This code could 
also be reused by any other approach that "restarts" the binary to do the 
upload.
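   
   To illustrate why the blank import matters, here is a minimal, 
self-contained sketch of the scheme-registry pattern that blank imports 
typically rely on. `Register`, `Open`, and `registry` are hypothetical 
stand-ins written for this comment, not the Beam `filesystem` API; the 
assumption is only that each filesystem package registers its scheme when it 
is imported.

```go
package main

import (
	"fmt"
	"strings"
)

// opener is a hypothetical stand-in for a filesystem constructor.
type opener func(path string) string

// registry maps a URL scheme (e.g. "gs") to the opener that handles it.
var registry = map[string]opener{}

// Register makes a scheme available. In a real layout this would be
// called from the init() of a package pulled in by a blank import.
func Register(scheme string, o opener) { registry[scheme] = o }

// Open dispatches on the scheme prefix of path and fails if nothing
// has registered that scheme.
func Open(path string) (string, error) {
	scheme := "default"
	if i := strings.Index(path, "://"); i >= 0 {
		scheme = path[:i]
	}
	o, ok := registry[scheme]
	if !ok {
		return "", fmt.Errorf("no filesystem registered for scheme %q", scheme)
	}
	return o(path), nil
}

func main() {
	// Only "gs" is registered, mirroring a binary that imports only gcs.
	Register("gs", func(p string) string { return "gcs writer for " + p })

	for _, dest := range []string{"gs://bucket/heapDumps/dump", "s3://bucket/heapDumps/dump"} {
		w, err := Open(dest)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(w)
	}
}
```

   Whichever package performs the registration decides which destinations 
work, which is why where the `_` import lives is a design decision rather 
than a detail.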



##########
sdks/go/pkg/beam/util/diagnostics/diagnostics.go:
##########
@@ -0,0 +1,68 @@
+package diagnostics
+
+import (
+       "bufio"
+       "context"
+       "os"
+
+       "runtime/debug"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
+)
+
+func UploadHeapDump(ctx context.Context, dest string, additionalDataToWrite string) error {
+       heapDumpLoc := "heapDump"
+
+       f, err := os.Create(heapDumpLoc)
+       if err != nil {
+               return err
+       }
+       debug.WriteHeapDump(f.Fd())
+
+       heapDump, err := os.Open(heapDumpLoc)
+       if err != nil {
+               return err
+       }
+       defer heapDump.Close()
+       heapDumpReader := bufio.NewReader(heapDump)
+
+       fs, err := filesystem.New(ctx, dest)
+       if err != nil {
+               return err
+       }
+       defer fs.Close()
+       fd, err := fs.OpenWrite(ctx, dest)
+       if err != nil {
+               return err
+       }
+       buf := bufio.NewWriterSize(fd, 1<<20) // use 1MB buffer
+
+       buf.WriteString(additionalDataToWrite)

Review Comment:
   Why are we adding additional information to the heap dump file? Wouldn't 
this corrupt whatever file format the heap dump is in and prevent the usual 
tools from analysing it?



##########
sdks/go/container/boot.go:
##########
@@ -115,7 +119,19 @@ func main() {
                os.Setenv("RUNNER_CAPABILITIES", strings.Join(info.GetRunnerCapabilities(), " "))
        }
 
-       log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
+       err = execx.Execute(prog, args...)
+
+       if err != nil {
+               var opt runtime.RawOptionsWrapper
+               err = json.Unmarshal([]byte(options), &opt)

Review Comment:
   As written, the error from `execx.Execute` is never read, and this 
assignment overwrites it. I suggest changing it to `err := json...` so that 
the new variable shadows the original `err` inside the block, leaving the 
original available for printing in the final `log.Fatal`.



##########
sdks/go/pkg/beam/util/diagnostics/diagnostics.go:
##########
@@ -0,0 +1,68 @@
+package diagnostics
+
+import (
+       "bufio"
+       "context"
+       "os"
+
+       "runtime/debug"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
+)
+
+func UploadHeapDump(ctx context.Context, dest string, additionalDataToWrite string) error {

Review Comment:
   Always document exported functions. In this case, at least point to 
instructions on how to read the file and make use of it.



##########
sdks/go/container/boot.go:
##########
@@ -115,7 +119,19 @@ func main() {
                os.Setenv("RUNNER_CAPABILITIES", strings.Join(info.GetRunnerCapabilities(), " "))
        }
 
-       log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
+       err = execx.Execute(prog, args...)
+
+       if err != nil {
+               var opt runtime.RawOptionsWrapper
+               err = json.Unmarshal([]byte(options), &opt)
+               if err == nil {
+                       if tempLocation, ok := opt.Options.Options["temp_location"]; ok {
+                               diagnostics.UploadHeapDump(ctx, fmt.Sprintf("%v/heapDumps/dump-%v-%d", strings.TrimSuffix(tempLocation, "/"), *id, time.Now().Unix()), fmt.Sprintf("Options %v", opt))

Review Comment:
   Please add a `\n` after the additional data so it's easier to extract a 
valid heap dump from the resulting file. As it stands, I can't take the sample 
file and get pprof to render the heap in question.



##########
sdks/go/pkg/beam/util/diagnostics/diagnostics.go:
##########
@@ -0,0 +1,68 @@
+package diagnostics
+
+import (
+       "bufio"
+       "context"
+       "os"
+
+       "runtime/debug"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
+)
+
+func UploadHeapDump(ctx context.Context, dest string, additionalDataToWrite string) error {
+       heapDumpLoc := "heapDump"
+
+       f, err := os.Create(heapDumpLoc)
+       if err != nil {
+               return err
+       }
+       debug.WriteHeapDump(f.Fd())

Review Comment:
   Now that I'm looking into this, I'm not sure it's getting us what we want.
   
   1. It's not clear to me that we're actually getting the heap from the 
spawned process. So what are we actually getting? The example heap dump is only 
~8MB, which feels closer to what the launcher process is probably using than 
to what the worker was using when it died.
   2. AFAICT we might not want a `debug.WriteHeapDump` dump in general 
anyway. It's the whole heap, every byte. Attempting to actually read the linked 
example dump has been difficult. 
https://pkg.go.dev/runtime/pprof#WriteHeapProfile is likely easier for most 
folks to deal with and will indicate where the leaks are coming from, if any.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
