lostluck commented on code in PR #33438:
URL: https://github.com/apache/beam/pull/33438#discussion_r1899811178
##########
sdks/go/pkg/beam/runners/prism/internal/execute.go:
##########
@@ -88,28 +91,41 @@ func RunPipeline(j *jobservices.Job) {
// makeWorker creates a worker for that environment.
func makeWorker(env string, j *jobservices.Job) (*worker.W, error) {
- wk := worker.New(j.String()+"_"+env, env)
+ wk := worker.Pool.NewWorker(j.String()+"_"+env, env)
wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env]
wk.PipelineOptions = j.PipelineOptions()
wk.JobKey = j.JobKey()
wk.ArtifactEndpoint = j.ArtifactEndpoint()
-
- go wk.Serve()
+ wk.WorkerPoolEndpoint = j.WorkerPoolEndpoint
if err := runEnvironment(j.RootCtx, j, env, wk); err != nil {
return nil, fmt.Errorf("failed to start environment %v for job
%v: %w", env, j, err)
}
// Check for connection succeeding after we've created the environment
successfully.
timeout := 1 * time.Minute
- time.AfterFunc(timeout, func() {
- if wk.Connected() || wk.Stopped() {
- return
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+ go func() {
+ <-ctx.Done()
+ if errors.Is(ctx.Err(), context.DeadlineExceeded) {
+ err := fmt.Errorf("prism %v didn't get control
connection to %v after %v", wk, j.WorkerPoolEndpoint, timeout)
Review Comment:
This error message no longer describes what's happening.
Previously it was validating that the *SDK* was making a connection properly.
AFAICT, now, this message will happen if the context is not cancelled, and
that we can't make a (new) health connection to the worker endpoint. Which
isn't what we were checking for.
##########
sdks/go/pkg/beam/runners/prism/internal/worker/pool.go:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "math"
+ "sync"
+
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+ "google.golang.org/grpc"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+var (
+ mu sync.Mutex
+
+ // Pool stores *W.
+ Pool = make(MapW)
+)
+
+// MapW manages the creation and querying of *W.
+type MapW map[string]*W
+
+func (m MapW) workerFromMetadataCtx(ctx context.Context) (*W, error) {
+ id, err := grpcx.ReadWorkerID(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if id == "" {
+ return nil, fmt.Errorf("worker id in ctx metadata is an empty
string")
+ }
+ mu.Lock()
Review Comment:
This is a package level lock being used to lock all instances of MapW, while
mutating distinct state. That means that if we did for some reason have
multiple MapW's, they would all get serialized behind that package level lock
whenever they're used.
The way you get around this is by having a struct, and have the lock and map
be fields for that struct, as we do elsewhere in prism.
##########
sdks/go/cmd/prism/prism.go:
##########
@@ -37,6 +38,7 @@ import (
var (
jobPort = flag.Int("job_port", 8073, "specify the job
management service port")
webPort = flag.Int("web_port", 8074, "specify the web ui
port")
+ workerPoolPort = flag.Int("worker_pool_port", 8075, "specify the
worker pool port")
Review Comment:
Part of the request for #32167 was to use the *same* port as Job Management,
not simply a single port. This is a step in the right direction though.
Short term: Add a comment that this flag is an internal implementation
detail, that must not be depended on and will be removed at some point.
Multiple GRPC services can happily live on a single port, they just need to
all live on the same GRPC "server". Sadly we can't *also* serve the UI from the
same port, but we can come close.
##########
sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go:
##########
@@ -80,6 +82,23 @@ func executeWithT(ctx context.Context, t testing.TB, p
*beam.Pipeline) (beam.Pip
s1 := rand.NewSource(time.Now().UnixNano())
r1 := rand.New(s1)
*jobopts.JobName = fmt.Sprintf("%v-%v", strings.ToLower(t.Name()),
r1.Intn(1000))
+ lis, err := net.Listen("tcp", ":0")
+ if err != nil {
+ t.Fatal(err)
+ }
+ _, port, _ := net.SplitHostPort(lis.Addr().String())
+ addr := "localhost:" + port
+ g := worker.NewMultiplexW()
+ t.Cleanup(g.Stop)
+ go g.Serve(lis)
+ s := jobservices.NewServer(0, internal.RunPipeline)
+ s.WorkerPoolEndpoint = addr
+ *jobopts.Endpoint = s.Endpoint()
+ go s.Serve()
+ t.Cleanup(func() {
+ *jobopts.Endpoint = ""
+ s.Stop()
+ })
Review Comment:
It's not clear the job test setup helper suddenly needs to do an additional
amount of set up. The engine package shouldn't need to care about this level of
detail. This indicates there's something wrong with the abstraction, that could
be improved.
##########
sdks/go/pkg/beam/runners/prism/internal/execute.go:
##########
@@ -88,28 +91,41 @@ func RunPipeline(j *jobservices.Job) {
// makeWorker creates a worker for that environment.
func makeWorker(env string, j *jobservices.Job) (*worker.W, error) {
- wk := worker.New(j.String()+"_"+env, env)
+ wk := worker.Pool.NewWorker(j.String()+"_"+env, env)
wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env]
wk.PipelineOptions = j.PipelineOptions()
wk.JobKey = j.JobKey()
wk.ArtifactEndpoint = j.ArtifactEndpoint()
-
- go wk.Serve()
+ wk.WorkerPoolEndpoint = j.WorkerPoolEndpoint
if err := runEnvironment(j.RootCtx, j, env, wk); err != nil {
return nil, fmt.Errorf("failed to start environment %v for job
%v: %w", env, j, err)
}
// Check for connection succeeding after we've created the environment
successfully.
timeout := 1 * time.Minute
- time.AfterFunc(timeout, func() {
- if wk.Connected() || wk.Stopped() {
- return
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+ go func() {
+ <-ctx.Done()
+ if errors.Is(ctx.Err(), context.DeadlineExceeded) {
+ err := fmt.Errorf("prism %v didn't get control
connection to %v after %v", wk, j.WorkerPoolEndpoint, timeout)
+ j.Failed(err)
+ j.CancelFn(err)
}
- err := fmt.Errorf("prism %v didn't get control connection to %v
after %v", wk, wk.Endpoint(), timeout)
+ }()
+ conn, err := grpc.NewClient(j.WorkerPoolEndpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
j.Failed(err)
j.CancelFn(err)
- })
+ }
+ health := healthpb.NewHealthClient(conn)
+ _, err = health.Check(ctx, &healthpb.HealthCheckRequest{})
+ if err != nil {
+ j.Failed(err)
+ j.CancelFn(err)
+ }
Review Comment:
This block could use more comments, as it's not clear how it's supposed to
work without a few re-reads.
It's not immeadiately obvious at this point in my reviews why:
1. We can no longer use the previous "wk.Connected() || wk.Stopped()" status
functions to check for issues after a minute.
2. Why we are making a new health client and connection that is then thrown
away?
##########
sdks/go/pkg/beam/runners/prism/internal/execute_test.go:
##########
@@ -43,8 +45,18 @@ func TestMain(m *testing.M) {
func initRunner(t testing.TB) {
t.Helper()
+ lis, err := net.Listen("tcp", ":0")
+ if err != nil {
+ t.Fatal(err)
+ }
+ _, port, _ := net.SplitHostPort(lis.Addr().String())
+ addr := "localhost:" + port
+ g := worker.NewMultiplexW()
+ t.Cleanup(g.Stop)
+ go g.Serve(lis)
Review Comment:
Another sign of a poor abstraction: Needing to pepper a new bit of
boilerplate at multiple locations to get it to work.
The internal integration tests / pipeline tests, shouldn't be needing to
know about these details for setting up and executing pipelines.
The fix is to move this work one level deeper than it is currently. This
avoids it needing to be added in several places.
##########
sdks/go/pkg/beam/runners/prism/internal/worker/pool.go:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "math"
+ "sync"
+
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+ "google.golang.org/grpc"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+var (
+ mu sync.Mutex
+
+ // Pool stores *W.
+ Pool = make(MapW)
+)
+
+// MapW manages the creation and querying of *W.
+type MapW map[string]*W
+
+func (m MapW) workerFromMetadataCtx(ctx context.Context) (*W, error) {
+ id, err := grpcx.ReadWorkerID(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if id == "" {
+ return nil, fmt.Errorf("worker id in ctx metadata is an empty
string")
+ }
+ mu.Lock()
+ defer mu.Unlock()
+ if w, ok := m[id]; ok {
+ return w, nil
+ }
+ return nil, fmt.Errorf("worker id: '%s' read from ctx but not
registered in worker pool", id)
+}
+
+// NewWorker instantiates and registers new *W instances.
+func (m MapW) NewWorker(id, env string) *W {
+ wk := &W{
+ ID: id,
+ Env: env,
+
+ InstReqs: make(chan *fnpb.InstructionRequest, 10),
+ DataReqs: make(chan *fnpb.Elements, 10),
+ StoppedChan: make(chan struct{}),
+
+ activeInstructions: make(map[string]controlResponder),
+ Descriptors:
make(map[string]*fnpb.ProcessBundleDescriptor),
+ }
+ mu.Lock()
+ defer mu.Unlock()
+ m[wk.ID] = wk
+ return wk
+}
+
+// NewMultiplexW instantiates a grpc.Server for multiplexing worker FnAPI
requests.
+func NewMultiplexW(opts ...grpc.ServerOption) *grpc.Server {
+ opts = append(opts, grpc.MaxSendMsgSize(math.MaxInt32))
+
+ g := grpc.NewServer(opts...)
+ wk := &MultiplexW{
+ logger: slog.Default(),
+ }
+
+ fnpb.RegisterBeamFnControlServer(g, wk)
+ fnpb.RegisterBeamFnDataServer(g, wk)
+ fnpb.RegisterBeamFnLoggingServer(g, wk)
+ fnpb.RegisterBeamFnStateServer(g, wk)
+ fnpb.RegisterProvisionServiceServer(g, wk)
+ healthpb.RegisterHealthServer(g, wk)
+
+ return g
+}
+
+// MultiplexW multiplexes FnAPI gRPC requests to *W stored in the Pool.
+type MultiplexW struct {
Review Comment:
Any reason this can't be the pool type itself?
##########
sdks/go/cmd/prism/prism.go:
##########
@@ -110,6 +113,18 @@ func main() {
log.Fatalf("error creating web server: %v", err)
}
}
+ g := prism.CreateWorkerPoolServer(ctx)
+ addr := fmt.Sprintf(":%d", *workerPoolPort)
+ lis, err := net.Listen("tcp", addr)
+ if err != nil {
+ log.Fatalf("error creating worker pool server: %v", err)
+ }
+ slog.Info("Serving Worker Pool", "endpoint",
fmt.Sprintf("localhost:%d", *workerPoolPort))
+ go g.Serve(lis)
+ go func() {
+ <-ctx.Done()
+ g.GracefulStop()
+ }()
Review Comment:
This is exposing the dialing details, which doesn't match the existing
pattern for the Job management service and the web server. These should be
migrated into the main prism packages, and not live on the cli binary side car.
##########
sdks/go/pkg/beam/runners/prism/internal/worker/pool.go:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "math"
+ "sync"
+
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+ "google.golang.org/grpc"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+var (
+ mu sync.Mutex
+
+ // Pool stores *W.
+ Pool = make(MapW)
+)
+
+// MapW manages the creation and querying of *W.
+type MapW map[string]*W
+
+func (m MapW) workerFromMetadataCtx(ctx context.Context) (*W, error) {
+ id, err := grpcx.ReadWorkerID(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if id == "" {
+ return nil, fmt.Errorf("worker id in ctx metadata is an empty
string")
+ }
+ mu.Lock()
+ defer mu.Unlock()
+ if w, ok := m[id]; ok {
+ return w, nil
+ }
+ return nil, fmt.Errorf("worker id: '%s' read from ctx but not
registered in worker pool", id)
+}
+
+// NewWorker instantiates and registers new *W instances.
+func (m MapW) NewWorker(id, env string) *W {
+ wk := &W{
+ ID: id,
+ Env: env,
+
+ InstReqs: make(chan *fnpb.InstructionRequest, 10),
+ DataReqs: make(chan *fnpb.Elements, 10),
+ StoppedChan: make(chan struct{}),
+
+ activeInstructions: make(map[string]controlResponder),
+ Descriptors:
make(map[string]*fnpb.ProcessBundleDescriptor),
+ }
+ mu.Lock()
+ defer mu.Unlock()
+ m[wk.ID] = wk
+ return wk
+}
+
+// NewMultiplexW instantiates a grpc.Server for multiplexing worker FnAPI
requests.
+func NewMultiplexW(opts ...grpc.ServerOption) *grpc.Server {
+ opts = append(opts, grpc.MaxSendMsgSize(math.MaxInt32))
+
+ g := grpc.NewServer(opts...)
Review Comment:
If this function is turned into a method on some distinct Pool type, and
instead receives the gRPC server instead of options, then the Pool can register
its services onto that grpc server instance instead of making it's own.
##########
sdks/go/pkg/beam/runners/prism/internal/worker/pool.go:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "math"
+ "sync"
+
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+ "google.golang.org/grpc"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+var (
+ mu sync.Mutex
+
+ // Pool stores *W.
+ Pool = make(MapW)
Review Comment:
It's poor design to have package level lookups. This would be the first one
in prism.
Consider instead making the pool a struct type that is hosted in the Job
management server. That way we can have multiple instances. The instances can
then be tested independantly, instead of mutating a global pool state.
That type can have methods. We can avoid a direct dependency between the
"management" and the "worker" packages by having an interface in the Job
Management server to be able to host the worker pool.
That type can have a method to be able to register the FnAPI services onto
the same port/GRPC.Server as the JobManagment services. This allows us to have
a single port.
##########
sdks/go/pkg/beam/runners/prism/internal/worker/pool.go:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "math"
+ "sync"
+
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+ "google.golang.org/grpc"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+var (
+ mu sync.Mutex
+
+ // Pool stores *W.
+ Pool = make(MapW)
+)
+
+// MapW manages the creation and querying of *W.
+type MapW map[string]*W
Review Comment:
This type is exported from the worker package, but isn't used anywhere
outside of the workers package. It doesn't need to be exported as written.
##########
sdks/go/pkg/beam/runners/prism/internal/worker/pool.go:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "math"
+ "sync"
+
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+ "google.golang.org/grpc"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+var (
+ mu sync.Mutex
+
+ // Pool stores *W.
+ Pool = make(MapW)
+)
+
+// MapW manages the creation and querying of *W.
+type MapW map[string]*W
+
+func (m MapW) workerFromMetadataCtx(ctx context.Context) (*W, error) {
+ id, err := grpcx.ReadWorkerID(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if id == "" {
+ return nil, fmt.Errorf("worker id in ctx metadata is an empty
string")
+ }
+ mu.Lock()
+ defer mu.Unlock()
+ if w, ok := m[id]; ok {
+ return w, nil
+ }
+ return nil, fmt.Errorf("worker id: '%s' read from ctx but not
registered in worker pool", id)
+}
+
+// NewWorker instantiates and registers new *W instances.
+func (m MapW) NewWorker(id, env string) *W {
+ wk := &W{
+ ID: id,
+ Env: env,
+
+ InstReqs: make(chan *fnpb.InstructionRequest, 10),
+ DataReqs: make(chan *fnpb.Elements, 10),
+ StoppedChan: make(chan struct{}),
+
+ activeInstructions: make(map[string]controlResponder),
+ Descriptors:
make(map[string]*fnpb.ProcessBundleDescriptor),
+ }
+ mu.Lock()
+ defer mu.Unlock()
+ m[wk.ID] = wk
+ return wk
+}
+
+// NewMultiplexW instantiates a grpc.Server for multiplexing worker FnAPI
requests.
+func NewMultiplexW(opts ...grpc.ServerOption) *grpc.Server {
+ opts = append(opts, grpc.MaxSendMsgSize(math.MaxInt32))
+
+ g := grpc.NewServer(opts...)
+ wk := &MultiplexW{
+ logger: slog.Default(),
+ }
+
+ fnpb.RegisterBeamFnControlServer(g, wk)
+ fnpb.RegisterBeamFnDataServer(g, wk)
+ fnpb.RegisterBeamFnLoggingServer(g, wk)
+ fnpb.RegisterBeamFnStateServer(g, wk)
+ fnpb.RegisterProvisionServiceServer(g, wk)
+ healthpb.RegisterHealthServer(g, wk)
+
+ return g
+}
+
+// MultiplexW multiplexes FnAPI gRPC requests to *W stored in the Pool.
+type MultiplexW struct {
+ fnpb.UnimplementedBeamFnControlServer
+ fnpb.UnimplementedBeamFnDataServer
+ fnpb.UnimplementedBeamFnStateServer
+ fnpb.UnimplementedBeamFnLoggingServer
+ fnpb.UnimplementedProvisionServiceServer
+ healthpb.UnimplementedHealthServer
+
+ logger *slog.Logger
+}
+
+func (mw *MultiplexW) Check(_ context.Context, _ *healthpb.HealthCheckRequest)
(*healthpb.HealthCheckResponse, error) {
+ return &healthpb.HealthCheckResponse{Status:
healthpb.HealthCheckResponse_SERVING}, nil
+}
+
+func (mw *MultiplexW) GetProvisionInfo(ctx context.Context, req
*fnpb.GetProvisionInfoRequest) (*fnpb.GetProvisionInfoResponse, error) {
+ w, err := Pool.workerFromMetadataCtx(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return w.GetProvisionInfo(ctx, req)
Review Comment:
Generics can help with a bunch of this boilerplate:
https://go.dev/play/p/0RImWo7U_kx
Typesafe, reduces the 5 lines of boilerplate per method to 1 each, which
reduces the surface that needs to be tested. That once the types are worked out
by the compiler, it's not necessary to really call any of the multiplex methods
here, in favour of just testing the generic boilerplate reducers.
(streamHelper,and singleHelper in the example playground).
##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -151,20 +105,10 @@ func (wk *W) shutdown() {
}
}
-// Stop the GRPC server.
+// Stop the worker and delete it from the Pool.
func (wk *W) Stop() {
wk.shutdown()
-
- // Give the SDK side 5 seconds to gracefully stop, before
- // hard stopping all RPCs.
- tim := time.AfterFunc(5*time.Second, func() {
- wk.server.Stop()
- })
- wk.server.GracefulStop()
- tim.Stop()
-
- wk.lis.Close()
- slog.Debug("stopped", "worker", wk)
+ delete(Pool, wk.ID)
Review Comment:
At minimum, this needs to use the lock before deleting it. Deletes to maps
are writes, and map writes are never concurrency safe with any operation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]