lostluck commented on code in PR #33438:
URL: https://github.com/apache/beam/pull/33438#discussion_r1899811178


##########
sdks/go/pkg/beam/runners/prism/internal/execute.go:
##########
@@ -88,28 +91,41 @@ func RunPipeline(j *jobservices.Job) {
 
 // makeWorker creates a worker for that environment.
 func makeWorker(env string, j *jobservices.Job) (*worker.W, error) {
-       wk := worker.New(j.String()+"_"+env, env)
+       wk := worker.Pool.NewWorker(j.String()+"_"+env, env)
 
        wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env]
        wk.PipelineOptions = j.PipelineOptions()
        wk.JobKey = j.JobKey()
        wk.ArtifactEndpoint = j.ArtifactEndpoint()
-
-       go wk.Serve()
+       wk.WorkerPoolEndpoint = j.WorkerPoolEndpoint
 
        if err := runEnvironment(j.RootCtx, j, env, wk); err != nil {
                return nil, fmt.Errorf("failed to start environment %v for job 
%v: %w", env, j, err)
        }
        // Check for connection succeeding after we've created the environment 
successfully.
        timeout := 1 * time.Minute
-       time.AfterFunc(timeout, func() {
-               if wk.Connected() || wk.Stopped() {
-                       return
+       ctx, cancel := context.WithTimeout(context.Background(), timeout)
+       defer cancel()
+       go func() {
+               <-ctx.Done()
+               if errors.Is(ctx.Err(), context.DeadlineExceeded) {
+                       err := fmt.Errorf("prism %v didn't get control connection to %v after %v", wk, j.WorkerPoolEndpoint, timeout)

Review Comment:
   This error message no longer describes what's happening.
   
   Previously it was validating that the *SDK* was making a connection properly.
   
   AFAICT, now, this message will happen if the context isn't cancelled before the timeout, meaning we couldn't make a (new) health connection to the worker endpoint in time, which isn't what we were checking for.
   



##########
sdks/go/pkg/beam/runners/prism/internal/worker/pool.go:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+       "context"
+       "fmt"
+       "log/slog"
+       "math"
+       "sync"
+
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+       "google.golang.org/grpc"
+       healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+var (
+       mu sync.Mutex
+
+       // Pool stores *W.
+       Pool = make(MapW)
+)
+
+// MapW manages the creation and querying of *W.
+type MapW map[string]*W
+
+func (m MapW) workerFromMetadataCtx(ctx context.Context) (*W, error) {
+       id, err := grpcx.ReadWorkerID(ctx)
+       if err != nil {
+               return nil, err
+       }
+       if id == "" {
+               return nil, fmt.Errorf("worker id in ctx metadata is an empty string")
+       }
+       mu.Lock()

Review Comment:
   This is a package-level lock being used to lock all instances of MapW, even though each instance mutates distinct state. That means that if we did for some reason have multiple MapW's, they would all get serialized behind that package-level lock whenever they're used.
   
   The way you get around this is by having a struct, and having the lock and map be fields of that struct, as we do elsewhere in prism. A rough sketch of that shape follows.
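   To be concrete, something along these lines inside the worker package (a minimal sketch; Pool, NewPool, add, and lookup are placeholder names, not prescriptive):
   
   ```go
   // Pool owns its own lock and worker map, so independent pools don't
   // serialize behind a shared package-level mutex.
   type Pool struct {
           mu      sync.Mutex
           workers map[string]*W
   }
   
   func NewPool() *Pool {
           return &Pool{workers: make(map[string]*W)}
   }
   
   // add registers an already-constructed worker under its ID.
   func (p *Pool) add(wk *W) {
           p.mu.Lock()
           defer p.mu.Unlock()
           p.workers[wk.ID] = wk
   }
   
   // lookup returns the worker registered under id, if any.
   func (p *Pool) lookup(id string) (*W, bool) {
           p.mu.Lock()
           defer p.mu.Unlock()
           w, ok := p.workers[id]
           return w, ok
   }
   ```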



##########
sdks/go/cmd/prism/prism.go:
##########
@@ -37,6 +38,7 @@ import (
 var (
        jobPort             = flag.Int("job_port", 8073, "specify the job management service port")
        webPort             = flag.Int("web_port", 8074, "specify the web ui port")
+       workerPoolPort      = flag.Int("worker_pool_port", 8075, "specify the worker pool port")

Review Comment:
   Part of the request for #32167 was to use the *same* port as Job Management, not simply a single port. This is a step in the right direction though.
   
   Short term: add a comment that this flag is an internal implementation detail that must not be depended on and will be removed at some point.
   
   Multiple gRPC services can happily live on a single port; they just need to all live on the same gRPC "server". Sadly we can't *also* serve the UI from the same port, but we can come close. A sketch of what the shared registration could look like is below.
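   Roughly (an illustrative sketch only; serveOnOnePort and fnServices are placeholder names, and the real wiring would live inside the prism packages rather than the CLI):
   
   ```go
   package prism
   
   import (
           "log"
           "math"
           "net"
   
           fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
           jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
           "google.golang.org/grpc"
           healthpb "google.golang.org/grpc/health/grpc_health_v1"
   )
   
   // fnServices is whatever implements the multiplexed FnAPI services
   // (the MultiplexW / pool type in this PR).
   type fnServices interface {
           fnpb.BeamFnControlServer
           fnpb.BeamFnDataServer
           fnpb.BeamFnLoggingServer
           fnpb.BeamFnStateServer
           fnpb.ProvisionServiceServer
           healthpb.HealthServer
   }
   
   // serveOnOnePort registers job management and the worker pool services
   // on a single grpc.Server, so both share one listener and one port.
   func serveOnOnePort(addr string, jobSvc jobpb.JobServiceServer, pool fnServices) {
           g := grpc.NewServer(grpc.MaxSendMsgSize(math.MaxInt32))
   
           jobpb.RegisterJobServiceServer(g, jobSvc) // existing job management service
           fnpb.RegisterBeamFnControlServer(g, pool) // FnAPI services for SDK workers
           fnpb.RegisterBeamFnDataServer(g, pool)
           fnpb.RegisterBeamFnLoggingServer(g, pool)
           fnpb.RegisterBeamFnStateServer(g, pool)
           fnpb.RegisterProvisionServiceServer(g, pool)
           healthpb.RegisterHealthServer(g, pool)
   
           lis, err := net.Listen("tcp", addr)
           if err != nil {
                   log.Fatalf("listen %v: %v", addr, err)
           }
           g.Serve(lis) // one listener, one port, many services
   }
   ```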



##########
sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go:
##########
@@ -80,6 +82,23 @@ func executeWithT(ctx context.Context, t testing.TB, p *beam.Pipeline) (beam.Pip
        s1 := rand.NewSource(time.Now().UnixNano())
        r1 := rand.New(s1)
        *jobopts.JobName = fmt.Sprintf("%v-%v", strings.ToLower(t.Name()), r1.Intn(1000))
+       lis, err := net.Listen("tcp", ":0")
+       if err != nil {
+               t.Fatal(err)
+       }
+       _, port, _ := net.SplitHostPort(lis.Addr().String())
+       addr := "localhost:" + port
+       g := worker.NewMultiplexW()
+       t.Cleanup(g.Stop)
+       go g.Serve(lis)
+       s := jobservices.NewServer(0, internal.RunPipeline)
+       s.WorkerPoolEndpoint = addr
+       *jobopts.Endpoint = s.Endpoint()
+       go s.Serve()
+       t.Cleanup(func() {
+               *jobopts.Endpoint = ""
+               s.Stop()
+       })

Review Comment:
   It's not clear why the job test setup helper suddenly needs to do this additional amount of setup. The engine package shouldn't need to care about this level of detail. This indicates there's something wrong with the abstraction that could be improved.



##########
sdks/go/pkg/beam/runners/prism/internal/execute.go:
##########
@@ -88,28 +91,41 @@ func RunPipeline(j *jobservices.Job) {
 
 // makeWorker creates a worker for that environment.
 func makeWorker(env string, j *jobservices.Job) (*worker.W, error) {
-       wk := worker.New(j.String()+"_"+env, env)
+       wk := worker.Pool.NewWorker(j.String()+"_"+env, env)
 
        wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env]
        wk.PipelineOptions = j.PipelineOptions()
        wk.JobKey = j.JobKey()
        wk.ArtifactEndpoint = j.ArtifactEndpoint()
-
-       go wk.Serve()
+       wk.WorkerPoolEndpoint = j.WorkerPoolEndpoint
 
        if err := runEnvironment(j.RootCtx, j, env, wk); err != nil {
                return nil, fmt.Errorf("failed to start environment %v for job 
%v: %w", env, j, err)
        }
        // Check for connection succeeding after we've created the environment 
successfully.
        timeout := 1 * time.Minute
-       time.AfterFunc(timeout, func() {
-               if wk.Connected() || wk.Stopped() {
-                       return
+       ctx, cancel := context.WithTimeout(context.Background(), timeout)
+       defer cancel()
+       go func() {
+               <-ctx.Done()
+               if errors.Is(ctx.Err(), context.DeadlineExceeded) {
+                       err := fmt.Errorf("prism %v didn't get control connection to %v after %v", wk, j.WorkerPoolEndpoint, timeout)
+                       j.Failed(err)
+                       j.CancelFn(err)
                }
-               err := fmt.Errorf("prism %v didn't get control connection to %v after %v", wk, wk.Endpoint(), timeout)
+       }()
+       conn, err := grpc.NewClient(j.WorkerPoolEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
+       if err != nil {
                j.Failed(err)
                j.CancelFn(err)
-       })
+       }
+       health := healthpb.NewHealthClient(conn)
+       _, err = health.Check(ctx, &healthpb.HealthCheckRequest{})
+       if err != nil {
+               j.Failed(err)
+               j.CancelFn(err)
+       }

Review Comment:
   This block could use more comments, as it's not clear how it's supposed to work without a few re-reads.
   
   It's not immediately obvious at this point in my reviews why:
   
   1. We can no longer use the previous "wk.Connected() || wk.Stopped()" status functions to check for issues after a minute.
   2. We are making a new health client and connection that is then thrown away.



##########
sdks/go/pkg/beam/runners/prism/internal/execute_test.go:
##########
@@ -43,8 +45,18 @@ func TestMain(m *testing.M) {
 
 func initRunner(t testing.TB) {
        t.Helper()
+       lis, err := net.Listen("tcp", ":0")
+       if err != nil {
+               t.Fatal(err)
+       }
+       _, port, _ := net.SplitHostPort(lis.Addr().String())
+       addr := "localhost:" + port
+       g := worker.NewMultiplexW()
+       t.Cleanup(g.Stop)
+       go g.Serve(lis)

Review Comment:
   Another sign of a poor abstraction: needing to pepper a new bit of boilerplate at multiple locations to get it to work.
   
   The internal integration tests / pipeline tests shouldn't need to know about these details for setting up and executing pipelines.
   
   The fix is to move this work one level deeper than it currently is. That avoids needing to add it in several places.



##########
sdks/go/pkg/beam/runners/prism/internal/worker/pool.go:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+       "context"
+       "fmt"
+       "log/slog"
+       "math"
+       "sync"
+
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+       "google.golang.org/grpc"
+       healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+var (
+       mu sync.Mutex
+
+       // Pool stores *W.
+       Pool = make(MapW)
+)
+
+// MapW manages the creation and querying of *W.
+type MapW map[string]*W
+
+func (m MapW) workerFromMetadataCtx(ctx context.Context) (*W, error) {
+       id, err := grpcx.ReadWorkerID(ctx)
+       if err != nil {
+               return nil, err
+       }
+       if id == "" {
+               return nil, fmt.Errorf("worker id in ctx metadata is an empty string")
+       }
+       mu.Lock()
+       defer mu.Unlock()
+       if w, ok := m[id]; ok {
+               return w, nil
+       }
+       return nil, fmt.Errorf("worker id: '%s' read from ctx but not registered in worker pool", id)
+}
+
+// NewWorker instantiates and registers new *W instances.
+func (m MapW) NewWorker(id, env string) *W {
+       wk := &W{
+               ID:  id,
+               Env: env,
+
+               InstReqs:    make(chan *fnpb.InstructionRequest, 10),
+               DataReqs:    make(chan *fnpb.Elements, 10),
+               StoppedChan: make(chan struct{}),
+
+               activeInstructions: make(map[string]controlResponder),
+               Descriptors:        make(map[string]*fnpb.ProcessBundleDescriptor),
+       }
+       mu.Lock()
+       defer mu.Unlock()
+       m[wk.ID] = wk
+       return wk
+}
+
+// NewMultiplexW instantiates a grpc.Server for multiplexing worker FnAPI requests.
+func NewMultiplexW(opts ...grpc.ServerOption) *grpc.Server {
+       opts = append(opts, grpc.MaxSendMsgSize(math.MaxInt32))
+
+       g := grpc.NewServer(opts...)
+       wk := &MultiplexW{
+               logger: slog.Default(),
+       }
+
+       fnpb.RegisterBeamFnControlServer(g, wk)
+       fnpb.RegisterBeamFnDataServer(g, wk)
+       fnpb.RegisterBeamFnLoggingServer(g, wk)
+       fnpb.RegisterBeamFnStateServer(g, wk)
+       fnpb.RegisterProvisionServiceServer(g, wk)
+       healthpb.RegisterHealthServer(g, wk)
+
+       return g
+}
+
+// MultiplexW multiplexes FnAPI gRPC requests to *W stored in the Pool.
+type MultiplexW struct {

Review Comment:
   Any reason this can't be the pool type itself?



##########
sdks/go/cmd/prism/prism.go:
##########
@@ -110,6 +113,18 @@ func main() {
                        log.Fatalf("error creating web server: %v", err)
                }
        }
+       g := prism.CreateWorkerPoolServer(ctx)
+       addr := fmt.Sprintf(":%d", *workerPoolPort)
+       lis, err := net.Listen("tcp", addr)
+       if err != nil {
+               log.Fatalf("error creating worker pool server: %v", err)
+       }
+       slog.Info("Serving Worker Pool", "endpoint", fmt.Sprintf("localhost:%d", *workerPoolPort))
+       go g.Serve(lis)
+       go func() {
+               <-ctx.Done()
+               g.GracefulStop()
+       }()

Review Comment:
   This is exposing the dialing details, which doesn't match the existing pattern for the Job Management service and the web server. These should be migrated into the main prism packages, and not live in the CLI binary sidecar.



##########
sdks/go/pkg/beam/runners/prism/internal/worker/pool.go:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+       "context"
+       "fmt"
+       "log/slog"
+       "math"
+       "sync"
+
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+       "google.golang.org/grpc"
+       healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+var (
+       mu sync.Mutex
+
+       // Pool stores *W.
+       Pool = make(MapW)
+)
+
+// MapW manages the creation and querying of *W.
+type MapW map[string]*W
+
+func (m MapW) workerFromMetadataCtx(ctx context.Context) (*W, error) {
+       id, err := grpcx.ReadWorkerID(ctx)
+       if err != nil {
+               return nil, err
+       }
+       if id == "" {
+               return nil, fmt.Errorf("worker id in ctx metadata is an empty string")
+       }
+       mu.Lock()
+       defer mu.Unlock()
+       if w, ok := m[id]; ok {
+               return w, nil
+       }
+       return nil, fmt.Errorf("worker id: '%s' read from ctx but not registered in worker pool", id)
+}
+
+// NewWorker instantiates and registers new *W instances.
+func (m MapW) NewWorker(id, env string) *W {
+       wk := &W{
+               ID:  id,
+               Env: env,
+
+               InstReqs:    make(chan *fnpb.InstructionRequest, 10),
+               DataReqs:    make(chan *fnpb.Elements, 10),
+               StoppedChan: make(chan struct{}),
+
+               activeInstructions: make(map[string]controlResponder),
+               Descriptors:        make(map[string]*fnpb.ProcessBundleDescriptor),
+       }
+       mu.Lock()
+       defer mu.Unlock()
+       m[wk.ID] = wk
+       return wk
+}
+
+// NewMultiplexW instantiates a grpc.Server for multiplexing worker FnAPI requests.
+func NewMultiplexW(opts ...grpc.ServerOption) *grpc.Server {
+       opts = append(opts, grpc.MaxSendMsgSize(math.MaxInt32))
+
+       g := grpc.NewServer(opts...)

Review Comment:
   If this function is turned into a method on some distinct Pool type, and receives the gRPC server instead of options, then the Pool can register its services onto that grpc.Server instance instead of making its own. Roughly as sketched below.
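   Something like this (a sketch, assuming the struct-based pool from the earlier comment, with the pool type embedding the Unimplemented* servers and carrying the multiplexing methods the way MultiplexW does today; RegisterServices is a placeholder name):
   
   ```go
   // RegisterServices hangs the pool's FnAPI services off a grpc.Server the
   // caller already owns, rather than constructing a new server here.
   func (p *Pool) RegisterServices(g *grpc.Server) {
           fnpb.RegisterBeamFnControlServer(g, p)
           fnpb.RegisterBeamFnDataServer(g, p)
           fnpb.RegisterBeamFnLoggingServer(g, p)
           fnpb.RegisterBeamFnStateServer(g, p)
           fnpb.RegisterProvisionServiceServer(g, p)
           healthpb.RegisterHealthServer(g, p)
   }
   ```
   
   Then whoever owns the Job Management grpc.Server just calls pool.RegisterServices(g) on it.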



##########
sdks/go/pkg/beam/runners/prism/internal/worker/pool.go:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+       "context"
+       "fmt"
+       "log/slog"
+       "math"
+       "sync"
+
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+       "google.golang.org/grpc"
+       healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+var (
+       mu sync.Mutex
+
+       // Pool stores *W.
+       Pool = make(MapW)

Review Comment:
   It's poor design to have package-level lookups. This would be the first one in prism.
   
   Consider instead making the pool a struct type that is hosted in the Job Management server. That way we can have multiple instances. The instances can then be tested independently, instead of mutating a global pool state.
   
   That type can have methods. We can avoid a direct dependency between the "management" and the "worker" packages by having an interface in the Job Management server to be able to host the worker pool, roughly as sketched after this comment.
   
   That type can have a method to be able to register the FnAPI services onto the same port/grpc.Server as the Job Management services. This allows us to have a single port.
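   For the dependency direction, something along these lines on the jobservices side (illustrative names only):
   
   ```go
   // In the jobservices package: a narrow interface lets the job management
   // server host a worker pool without importing the worker package.
   type fnRegistrar interface {
           RegisterServices(g *grpc.Server)
   }
   
   // hostWorkerPool registers the pool's services on the server that already
   // serves Job Management, keeping everything on one port.
   func hostWorkerPool(g *grpc.Server, pool fnRegistrar) {
           pool.RegisterServices(g)
   }
   ```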



##########
sdks/go/pkg/beam/runners/prism/internal/worker/pool.go:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+       "context"
+       "fmt"
+       "log/slog"
+       "math"
+       "sync"
+
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+       "google.golang.org/grpc"
+       healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+var (
+       mu sync.Mutex
+
+       // Pool stores *W.
+       Pool = make(MapW)
+)
+
+// MapW manages the creation and querying of *W.
+type MapW map[string]*W

Review Comment:
   This type is exported from the worker package, but isn't used anywhere outside of the worker package. It doesn't need to be exported as written.



##########
sdks/go/pkg/beam/runners/prism/internal/worker/pool.go:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+       "context"
+       "fmt"
+       "log/slog"
+       "math"
+       "sync"
+
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+       "google.golang.org/grpc"
+       healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+var (
+       mu sync.Mutex
+
+       // Pool stores *W.
+       Pool = make(MapW)
+)
+
+// MapW manages the creation and querying of *W.
+type MapW map[string]*W
+
+func (m MapW) workerFromMetadataCtx(ctx context.Context) (*W, error) {
+       id, err := grpcx.ReadWorkerID(ctx)
+       if err != nil {
+               return nil, err
+       }
+       if id == "" {
+               return nil, fmt.Errorf("worker id in ctx metadata is an empty string")
+       }
+       mu.Lock()
+       defer mu.Unlock()
+       if w, ok := m[id]; ok {
+               return w, nil
+       }
+       return nil, fmt.Errorf("worker id: '%s' read from ctx but not registered in worker pool", id)
+}
+
+// NewWorker instantiates and registers new *W instances.
+func (m MapW) NewWorker(id, env string) *W {
+       wk := &W{
+               ID:  id,
+               Env: env,
+
+               InstReqs:    make(chan *fnpb.InstructionRequest, 10),
+               DataReqs:    make(chan *fnpb.Elements, 10),
+               StoppedChan: make(chan struct{}),
+
+               activeInstructions: make(map[string]controlResponder),
+               Descriptors:        make(map[string]*fnpb.ProcessBundleDescriptor),
+       }
+       mu.Lock()
+       defer mu.Unlock()
+       m[wk.ID] = wk
+       return wk
+}
+
+// NewMultiplexW instantiates a grpc.Server for multiplexing worker FnAPI requests.
+func NewMultiplexW(opts ...grpc.ServerOption) *grpc.Server {
+       opts = append(opts, grpc.MaxSendMsgSize(math.MaxInt32))
+
+       g := grpc.NewServer(opts...)
+       wk := &MultiplexW{
+               logger: slog.Default(),
+       }
+
+       fnpb.RegisterBeamFnControlServer(g, wk)
+       fnpb.RegisterBeamFnDataServer(g, wk)
+       fnpb.RegisterBeamFnLoggingServer(g, wk)
+       fnpb.RegisterBeamFnStateServer(g, wk)
+       fnpb.RegisterProvisionServiceServer(g, wk)
+       healthpb.RegisterHealthServer(g, wk)
+
+       return g
+}
+
+// MultiplexW multiplexes FnAPI gRPC requests to *W stored in the Pool.
+type MultiplexW struct {
+       fnpb.UnimplementedBeamFnControlServer
+       fnpb.UnimplementedBeamFnDataServer
+       fnpb.UnimplementedBeamFnStateServer
+       fnpb.UnimplementedBeamFnLoggingServer
+       fnpb.UnimplementedProvisionServiceServer
+       healthpb.UnimplementedHealthServer
+
+       logger *slog.Logger
+}
+
+func (mw *MultiplexW) Check(_ context.Context, _ *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
+       return &healthpb.HealthCheckResponse{Status: healthpb.HealthCheckResponse_SERVING}, nil
+}
+
+func (mw *MultiplexW) GetProvisionInfo(ctx context.Context, req *fnpb.GetProvisionInfoRequest) (*fnpb.GetProvisionInfoResponse, error) {
+       w, err := Pool.workerFromMetadataCtx(ctx)
+       if err != nil {
+               return nil, err
+       }
+       return w.GetProvisionInfo(ctx, req)

Review Comment:
   Generics can help with a bunch of this boilerplate:
   https://go.dev/play/p/0RImWo7U_kx
   
   It's typesafe, and it reduces the 5 lines of boilerplate per method to 1 each, which shrinks the surface that needs to be tested. Once the types are worked out by the compiler, it's not really necessary to call any of the multiplex methods here, in favour of just testing the generic boilerplate reducers (streamHelper and singleHelper in the example playground).
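   The unary half is roughly this shape (approximating the playground; names here aren't copied verbatim from it):
   
   ```go
   // unary looks up the worker from the request metadata and forwards the
   // call, so every unary multiplex method collapses to one line.
   func unary[Req, Resp any](ctx context.Context, req Req, call func(*W, context.Context, Req) (Resp, error)) (Resp, error) {
           w, err := Pool.workerFromMetadataCtx(ctx)
           if err != nil {
                   var zero Resp
                   return zero, err
           }
           return call(w, ctx, req)
   }
   
   // For example:
   func (mw *MultiplexW) GetProvisionInfo(ctx context.Context, req *fnpb.GetProvisionInfoRequest) (*fnpb.GetProvisionInfoResponse, error) {
           return unary(ctx, req, (*W).GetProvisionInfo)
   }
   ```
   
   A streamHelper for the streaming services follows the same pattern, pulling the worker out of stream.Context() and delegating to the per-worker handler.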



##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -151,20 +105,10 @@ func (wk *W) shutdown() {
        }
 }
 
-// Stop the GRPC server.
+// Stop the worker and delete it from the Pool.
 func (wk *W) Stop() {
        wk.shutdown()
-
-       // Give the SDK side 5 seconds to gracefully stop, before
-       // hard stopping all RPCs.
-       tim := time.AfterFunc(5*time.Second, func() {
-               wk.server.Stop()
-       })
-       wk.server.GracefulStop()
-       tim.Stop()
-
-       wk.lis.Close()
-       slog.Debug("stopped", "worker", wk)
+       delete(Pool, wk.ID)

Review Comment:
   At minimum, this needs to take the lock before deleting. Deletes to maps are writes, and map writes are never safe to run concurrently with any other map operation. See the sketch below.
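   With the package-level pool as written in this PR, that would look roughly like:
   
   ```go
   // Stop the worker and delete it from the Pool, holding the pool lock so
   // the map write can't race with NewWorker or workerFromMetadataCtx.
   func (wk *W) Stop() {
           wk.shutdown()
           mu.Lock()
           defer mu.Unlock()
           delete(Pool, wk.ID)
           slog.Debug("stopped", "worker", wk)
   }
   ```
   
   (With a struct-based pool, this naturally becomes a method on the pool instead.)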



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
