cozos commented on code in PR #38246:
URL: https://github.com/apache/beam/pull/38246#discussion_r3114637931
##########
sdks/python/container/boot.go:
##########
@@ -314,23 +314,47 @@ func launchSDKProcess() error {
var wg sync.WaitGroup
wg.Add(len(workerIds))
- for _, workerId := range workerIds {
- go func(workerId string) {
- defer wg.Done()
-
- bufLogger := tools.NewBufferedLogger(logger)
- errorCount := 0
- for {
- childPids.mu.Lock()
- if childPids.canceled {
- childPids.mu.Unlock()
- return
- }
- logger.Printf(ctx, "Executing Python (worker
%v): python %v", workerId, strings.Join(args, " "))
- cmd :=
StartCommandEnv(map[string]string{"WORKER_ID": workerId}, os.Stdin, bufLogger,
bufLogger, "python", args...)
- childPids.v = append(childPids.v,
cmd.Process.Pid)
- childPids.mu.Unlock()
-
+for _, workerId := range workerIds {
+ go func(workerId string) {
+ defer wg.Done()
+
+ workerCtx := grpcx.WriteWorkerID(ctx, workerId)
Review Comment:
In my experiment I got this working using `context.Background()`, which
creates a new root context, instead of using the existing `ctx`.
1. Looking at the code I believe it will append every time
```go
func WriteWorkerID(ctx context.Context, id string) context.Context {
md := metadata.New(map[string]string{
idKey: id,
})
if old, ok := metadata.FromOutgoingContext(ctx); ok {
md = metadata.Join(md, old)
}
return metadata.NewOutgoingContext(ctx, md)
}
```
```go
// Join joins any number of mds into a single MD.
//
// The order of values for each key is determined by the order in which the
mds
// containing those values are presented to Join.
func Join(mds ...MD) MD {
out := MD{}
for _, md := range mds {
for k, v := range md {
out[k] = append(out[k], v...)
}
}
return out
}
```
2. It seems like `WriteWorkerId` does not mutate the existing `ctx` but
creates a new one via `&valueCtx{parent, key, val}`
```go
func NewOutgoingContext(ctx context.Context, md MD) context.Context {
return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md})
}
func WithValue(parent Context, key, val any) Context {
if parent == nil {
panic("cannot create context from nil parent")
}
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val any
}
```
Having said that, it seems to me creating a new context with
`context.Background()` is the cleanest option.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]