lostluck commented on a change in pull request #16957:
URL: https://github.com/apache/beam/pull/16957#discussion_r819020717
##########
File path: sdks/go.mod
##########
@@ -23,33 +23,34 @@ module github.com/apache/beam/sdks/v2
go 1.16
require (
- cloud.google.com/go/bigquery v1.17.0
- cloud.google.com/go/datastore v1.5.0
- cloud.google.com/go/pubsub v1.11.0-beta.schemas
- cloud.google.com/go/storage v1.15.0
+ cloud.google.com/go/bigquery v1.28.0
+ cloud.google.com/go/compute v1.5.0 // indirect
+ cloud.google.com/go/datastore v1.6.0
+ cloud.google.com/go/iam v0.2.0 // indirect
+ cloud.google.com/go/pubsub v1.18.0
+ cloud.google.com/go/storage v1.21.0
Review comment:
OK so these are being updated. Did you call `go get` explicitly?
`go mod tidy` shouldn't be updating things, AFAICT from the docs, unless we
are actually changing/adding deps.
https://go.dev/doc/modules/managing-dependencies
But I might be mis-reading it.
##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+ "context"
+ "io"
+ "runtime"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus
API.
+type workerStatusHandler struct {
+ conn *grpc.ClientConn
+ shutdown int32
+ resp chan *fnpb.WorkerStatusResponse
+}
+
+// newWorkerStatusHandler
+func newWorkerStatusHandler(ctx context.Context, endpoint string)
(*workerStatusHandler, error) {
+ sconn, err := dial(ctx, endpoint, 60*time.Second)
+ if err != nil {
+ return &workerStatusHandler{}, errors.Wrapf(err, "failed to
connect: %v\n", endpoint)
+ }
+ return &workerStatusHandler{conn: sconn, shutdown: 0, resp: make(chan
*fnpb.WorkerStatusResponse)}, nil
+}
+
+// handleRequest manages the WorkerStatus API.
+func (w *workerStatusHandler) handleRequest(ctx context.Context, wg
*sync.WaitGroup) {
+ defer wg.Done()
+ statusClient := fnpb.NewBeamFnWorkerStatusClient(w.conn)
+ stub, err := statusClient.WorkerStatus(ctx)
+ if err != nil {
+ log.Errorf(ctx, "status client not established: %v", err)
+ }
+ go w.writer(ctx, stub)
+ w.reader(ctx, stub)
+}
+
+// writer writes the WorkerStatusResponse recevied from the response channel.
+func (w *workerStatusHandler) writer(ctx context.Context, stub
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+ for resp := range w.resp {
+ log.Debugf(ctx, "RESP-status: %v", resp.GetId())
+ if err := stub.Send(resp); err != nil && err != io.EOF {
+ log.Errorf(ctx, "workerStatus.Writer: Failed to
respond: %v", err)
+ }
+ }
+ log.Debugf(ctx, "exiting workerStatusHandler.Writer()")
+}
+
+// reader reads the WorkerStatusRequest from the stream and sends a processed
WorkerStatusResponse to
+// a response channel.
+func (w *workerStatusHandler) reader(ctx context.Context, stub
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+ for atomic.LoadInt32(&w.shutdown) == 0 {
+ req, err := stub.Recv()
+ if err != nil {
+ log.Debugf(ctx, "exiting workerStatusHandler.Reader():
%v", err)
+ return
+ }
+ log.Debugf(ctx, "RECV-status: %v", req.GetId())
+ buf := make([]byte, 1<<16)
+ runtime.Stack(buf, true)
+ response := &fnpb.WorkerStatusResponse{Id: req.GetId(),
StatusInfo: string(buf)}
+ if w.resp != nil {
+ w.resp <- response
+ } else {
+ return
+ }
+ }
+}
+
+// close stops the reader first, closes the response channel thereby stopping
writer and finally closes the gRPC connection.
+func (w *workerStatusHandler) close(ctx context.Context, wg *sync.WaitGroup) {
+ atomic.StoreInt32(&w.shutdown, 1)
+ close(w.resp)
Review comment:
It's a bad idea to close a channel from goroutines that aren't doing the
writing. A race condition exists where this is closed, isn't yet niled out, and
the reader tries to send on the closed channel, which panics. Have the reader
close the channel with a defer after it exits its loop instead.
##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+ "context"
+ "io"
+ "runtime"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus
API.
+type workerStatusHandler struct {
+ conn *grpc.ClientConn
+ shutdown int32
+ resp chan *fnpb.WorkerStatusResponse
+}
+
+// newWorkerStatusHandler
Review comment:
Consider either filling out the comment or removing it, since it's
unexported and empty.
##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+ "context"
+ "io"
+ "runtime"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus
API.
+type workerStatusHandler struct {
+ conn *grpc.ClientConn
+ shutdown int32
+ resp chan *fnpb.WorkerStatusResponse
+}
+
+// newWorkerStatusHandler
+func newWorkerStatusHandler(ctx context.Context, endpoint string)
(*workerStatusHandler, error) {
+ sconn, err := dial(ctx, endpoint, 60*time.Second)
+ if err != nil {
+ return &workerStatusHandler{}, errors.Wrapf(err, "failed to
connect: %v\n", endpoint)
+ }
+ return &workerStatusHandler{conn: sconn, shutdown: 0, resp: make(chan
*fnpb.WorkerStatusResponse)}, nil
+}
+
+// handleRequest manages the WorkerStatus API.
+func (w *workerStatusHandler) handleRequest(ctx context.Context, wg
*sync.WaitGroup) {
+ defer wg.Done()
+ statusClient := fnpb.NewBeamFnWorkerStatusClient(w.conn)
+ stub, err := statusClient.WorkerStatus(ctx)
+ if err != nil {
+ log.Errorf(ctx, "status client not established: %v", err)
+ }
+ go w.writer(ctx, stub)
+ w.reader(ctx, stub)
+}
+
+// writer writes the WorkerStatusResponse recevied from the response channel.
Review comment:
```suggestion
// writer sends the WorkerStatusResponse received from the response channel.
```
##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+ "context"
+ "io"
+ "runtime"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus
API.
+type workerStatusHandler struct {
+ conn *grpc.ClientConn
+ shutdown int32
+ resp chan *fnpb.WorkerStatusResponse
+}
+
+// newWorkerStatusHandler
+func newWorkerStatusHandler(ctx context.Context, endpoint string)
(*workerStatusHandler, error) {
+ sconn, err := dial(ctx, endpoint, 60*time.Second)
+ if err != nil {
+ return &workerStatusHandler{}, errors.Wrapf(err, "failed to
connect: %v\n", endpoint)
+ }
+ return &workerStatusHandler{conn: sconn, shutdown: 0, resp: make(chan
*fnpb.WorkerStatusResponse)}, nil
+}
+
+// handleRequest manages the WorkerStatus API.
+func (w *workerStatusHandler) handleRequest(ctx context.Context, wg
*sync.WaitGroup) {
+ defer wg.Done()
+ statusClient := fnpb.NewBeamFnWorkerStatusClient(w.conn)
+ stub, err := statusClient.WorkerStatus(ctx)
+ if err != nil {
+ log.Errorf(ctx, "status client not established: %v", err)
+ }
+ go w.writer(ctx, stub)
+ w.reader(ctx, stub)
+}
+
+// writer writes the WorkerStatusResponse recevied from the response channel.
+func (w *workerStatusHandler) writer(ctx context.Context, stub
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+ for resp := range w.resp {
+ log.Debugf(ctx, "RESP-status: %v", resp.GetId())
+ if err := stub.Send(resp); err != nil && err != io.EOF {
+ log.Errorf(ctx, "workerStatus.Writer: Failed to
respond: %v", err)
+ }
+ }
+ log.Debugf(ctx, "exiting workerStatusHandler.Writer()")
+}
+
+// reader reads the WorkerStatusRequest from the stream and sends a processed
WorkerStatusResponse to
+// a response channel.
+func (w *workerStatusHandler) reader(ctx context.Context, stub
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+ for atomic.LoadInt32(&w.shutdown) == 0 {
+ req, err := stub.Recv()
+ if err != nil {
+ log.Debugf(ctx, "exiting workerStatusHandler.Reader():
%v", err)
+ return
+ }
+ log.Debugf(ctx, "RECV-status: %v", req.GetId())
+ buf := make([]byte, 1<<16)
Review comment:
Consider allocating this outside the loop so it's not constantly GC'd
and re-allocated, and documenting the intended human-readable size cap.
(Though if it's once every 5 minutes, it's probably a non-issue; in that
case, comment about that here instead, since moving a buffer out of a loop is a
common target for optimization passes.) The conversion to string will avoid
aliasing problems with simultaneous calls.
##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go
##########
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+ "context"
+ "fmt"
+ "log"
+ "net"
+ "sync"
+ "testing"
+
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "google.golang.org/grpc"
+)
+
+type BeamFnWorkerStatusServicer struct {
+ fnpb.UnimplementedBeamFnWorkerStatusServer
+ response chan string
+}
+
+func (w BeamFnWorkerStatusServicer)
mustEmbedUnimplementedBeamFnWorkerStatusServer() {
Review comment:
Nit: You shouldn't need to implement this, since you've embedded it in the
status servicer.
You're running into this because you're using a value, rather than a pointer,
method receiver for WorkerStatus.
Change `(w BeamFnWorkerStatusServicer)` to `(w *BeamFnWorkerStatusServicer)`
and `srv := BeamFnWorkerStatusServicer` to `srv :=
&BeamFnWorkerStatusServicer` and it should clear up. (It also saves you from
needing to do `&srv` later. Either will be stack or heap allocated as needed.)
##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+ "context"
+ "io"
+ "runtime"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus
API.
+type workerStatusHandler struct {
+ conn *grpc.ClientConn
+ shutdown int32
+ resp chan *fnpb.WorkerStatusResponse
+}
+
+// newWorkerStatusHandler
+func newWorkerStatusHandler(ctx context.Context, endpoint string)
(*workerStatusHandler, error) {
+ sconn, err := dial(ctx, endpoint, 60*time.Second)
+ if err != nil {
+ return &workerStatusHandler{}, errors.Wrapf(err, "failed to
connect: %v\n", endpoint)
+ }
+ return &workerStatusHandler{conn: sconn, shutdown: 0, resp: make(chan
*fnpb.WorkerStatusResponse)}, nil
+}
+
+// handleRequest manages the WorkerStatus API.
+func (w *workerStatusHandler) handleRequest(ctx context.Context, wg
*sync.WaitGroup) {
+ defer wg.Done()
+ statusClient := fnpb.NewBeamFnWorkerStatusClient(w.conn)
+ stub, err := statusClient.WorkerStatus(ctx)
+ if err != nil {
+ log.Errorf(ctx, "status client not established: %v", err)
+ }
+ go w.writer(ctx, stub)
+ w.reader(ctx, stub)
Review comment:
Consider also launching the reader as a goroutine here instead of
launching the handleRequest as a goroutine. It doesn't make sense to split the
launching across multiple levels; doing so makes it harder to track what's going on.
##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+ "context"
+ "io"
+ "runtime"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus
API.
+type workerStatusHandler struct {
+ conn *grpc.ClientConn
+ shutdown int32
+ resp chan *fnpb.WorkerStatusResponse
+}
+
+// newWorkerStatusHandler
+func newWorkerStatusHandler(ctx context.Context, endpoint string)
(*workerStatusHandler, error) {
+ sconn, err := dial(ctx, endpoint, 60*time.Second)
+ if err != nil {
+ return &workerStatusHandler{}, errors.Wrapf(err, "failed to
connect: %v\n", endpoint)
+ }
+ return &workerStatusHandler{conn: sconn, shutdown: 0, resp: make(chan
*fnpb.WorkerStatusResponse)}, nil
+}
+
+// handleRequest manages the WorkerStatus API.
+func (w *workerStatusHandler) handleRequest(ctx context.Context, wg
*sync.WaitGroup) {
+ defer wg.Done()
+ statusClient := fnpb.NewBeamFnWorkerStatusClient(w.conn)
+ stub, err := statusClient.WorkerStatus(ctx)
+ if err != nil {
+ log.Errorf(ctx, "status client not established: %v", err)
+ }
+ go w.writer(ctx, stub)
+ w.reader(ctx, stub)
+}
+
+// writer writes the WorkerStatusResponse recevied from the response channel.
+func (w *workerStatusHandler) writer(ctx context.Context, stub
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+ for resp := range w.resp {
+ log.Debugf(ctx, "RESP-status: %v", resp.GetId())
+ if err := stub.Send(resp); err != nil && err != io.EOF {
+ log.Errorf(ctx, "workerStatus.Writer: Failed to
respond: %v", err)
+ }
+ }
+ log.Debugf(ctx, "exiting workerStatusHandler.Writer()")
+}
+
+// reader reads the WorkerStatusRequest from the stream and sends a processed
WorkerStatusResponse to
+// a response channel.
+func (w *workerStatusHandler) reader(ctx context.Context, stub
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+ for atomic.LoadInt32(&w.shutdown) == 0 {
+ req, err := stub.Recv()
+ if err != nil {
+ log.Debugf(ctx, "exiting workerStatusHandler.Reader():
%v", err)
+ return
+ }
+ log.Debugf(ctx, "RECV-status: %v", req.GetId())
+ buf := make([]byte, 1<<16)
+ runtime.Stack(buf, true)
+ response := &fnpb.WorkerStatusResponse{Id: req.GetId(),
StatusInfo: string(buf)}
+ if w.resp != nil {
+ w.resp <- response
Review comment:
I suppose my question here is: Do we need two separate goroutines for
handling these responses? Could we not have a single receive/send loop that
receives a status request, processes it, and replies immediately? IIRC it
took a several-minute job to even trigger this.
##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+ "context"
+ "io"
+ "runtime"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus
API.
+type workerStatusHandler struct {
+ conn *grpc.ClientConn
+ shutdown int32
+ resp chan *fnpb.WorkerStatusResponse
+}
+
+// newWorkerStatusHandler
+func newWorkerStatusHandler(ctx context.Context, endpoint string)
(*workerStatusHandler, error) {
+ sconn, err := dial(ctx, endpoint, 60*time.Second)
+ if err != nil {
+ return &workerStatusHandler{}, errors.Wrapf(err, "failed to
connect: %v\n", endpoint)
+ }
+ return &workerStatusHandler{conn: sconn, shutdown: 0, resp: make(chan
*fnpb.WorkerStatusResponse)}, nil
+}
+
+// handleRequest manages the WorkerStatus API.
+func (w *workerStatusHandler) handleRequest(ctx context.Context, wg
*sync.WaitGroup) {
+ defer wg.Done()
+ statusClient := fnpb.NewBeamFnWorkerStatusClient(w.conn)
+ stub, err := statusClient.WorkerStatus(ctx)
+ if err != nil {
+ log.Errorf(ctx, "status client not established: %v", err)
Review comment:
If an error is returned, the stub is invalid, so this should also return
out instead of allowing the writer and reader to be launched.
##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+ "context"
+ "io"
+ "runtime"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus
API.
+type workerStatusHandler struct {
+ conn *grpc.ClientConn
+ shutdown int32
+ resp chan *fnpb.WorkerStatusResponse
+}
+
+// newWorkerStatusHandler
+func newWorkerStatusHandler(ctx context.Context, endpoint string)
(*workerStatusHandler, error) {
+ sconn, err := dial(ctx, endpoint, 60*time.Second)
+ if err != nil {
+ return &workerStatusHandler{}, errors.Wrapf(err, "failed to
connect: %v\n", endpoint)
+ }
+ return &workerStatusHandler{conn: sconn, shutdown: 0, resp: make(chan
*fnpb.WorkerStatusResponse)}, nil
+}
+
+// handleRequest manages the WorkerStatus API.
+func (w *workerStatusHandler) handleRequest(ctx context.Context, wg
*sync.WaitGroup) {
+ defer wg.Done()
+ statusClient := fnpb.NewBeamFnWorkerStatusClient(w.conn)
+ stub, err := statusClient.WorkerStatus(ctx)
+ if err != nil {
+ log.Errorf(ctx, "status client not established: %v", err)
+ }
+ go w.writer(ctx, stub)
+ w.reader(ctx, stub)
+}
+
+// writer writes the WorkerStatusResponse recevied from the response channel.
+func (w *workerStatusHandler) writer(ctx context.Context, stub
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+ for resp := range w.resp {
+ log.Debugf(ctx, "RESP-status: %v", resp.GetId())
+ if err := stub.Send(resp); err != nil && err != io.EOF {
+ log.Errorf(ctx, "workerStatus.Writer: Failed to
respond: %v", err)
+ }
+ }
+ log.Debugf(ctx, "exiting workerStatusHandler.Writer()")
+}
+
+// reader reads the WorkerStatusRequest from the stream and sends a processed
WorkerStatusResponse to
+// a response channel.
+func (w *workerStatusHandler) reader(ctx context.Context, stub
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+ for atomic.LoadInt32(&w.shutdown) == 0 {
+ req, err := stub.Recv()
+ if err != nil {
+ log.Debugf(ctx, "exiting workerStatusHandler.Reader():
%v", err)
+ return
+ }
+ log.Debugf(ctx, "RECV-status: %v", req.GetId())
+ buf := make([]byte, 1<<16)
+ runtime.Stack(buf, true)
+ response := &fnpb.WorkerStatusResponse{Id: req.GetId(),
StatusInfo: string(buf)}
+ if w.resp != nil {
+ w.resp <- response
+ } else {
+ return
+ }
+ }
+}
+
+// close stops the reader first, closes the response channel thereby stopping
writer and finally closes the gRPC connection.
+func (w *workerStatusHandler) close(ctx context.Context, wg *sync.WaitGroup) {
+ atomic.StoreInt32(&w.shutdown, 1)
Review comment:
Neat pattern! Consider isolating the atomic stuff into methods for
documentation purposes. It's opaque to a reader what these are supposed to do,
but in their own methods, they become documented implementation details.
The other place we're doing this should likely be doing the same, but that's
a separate problem.
##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go
##########
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+ "context"
+ "fmt"
+ "log"
+ "net"
+ "sync"
+ "testing"
+
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "google.golang.org/grpc"
+)
+
+type BeamFnWorkerStatusServicer struct {
+ fnpb.UnimplementedBeamFnWorkerStatusServer
+ response chan string
+}
+
+func (w BeamFnWorkerStatusServicer)
mustEmbedUnimplementedBeamFnWorkerStatusServer() {
+
+}
+
+func (w BeamFnWorkerStatusServicer) WorkerStatus(b
fnpb.BeamFnWorkerStatus_WorkerStatusServer) error {
+ b.Send(&fnpb.WorkerStatusRequest{Id: "1"})
+ resp, err := b.Recv()
+ if err != nil {
+ return fmt.Errorf("error receiving response b.recv: %v", err)
+ }
+ w.response <- resp.GetStatusInfo()
+ return nil
+}
+
+type serverData struct {
Review comment:
It doesn't seem like you need this type. Consider just having setUp be a
plain function instead, or even a method on your BeamFnWorkerStatusServicer
type.
##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go
##########
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+ "context"
+ "fmt"
+ "log"
+ "net"
+ "sync"
+ "testing"
+
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ "google.golang.org/grpc"
+)
+
+type BeamFnWorkerStatusServicer struct {
+ fnpb.UnimplementedBeamFnWorkerStatusServer
+ response chan string
+}
+
+func (w BeamFnWorkerStatusServicer)
mustEmbedUnimplementedBeamFnWorkerStatusServer() {
+
+}
+
+func (w BeamFnWorkerStatusServicer) WorkerStatus(b
fnpb.BeamFnWorkerStatus_WorkerStatusServer) error {
+ b.Send(&fnpb.WorkerStatusRequest{Id: "1"})
+ resp, err := b.Recv()
+ if err != nil {
+ return fmt.Errorf("error receiving response b.recv: %v", err)
+ }
+ w.response <- resp.GetStatusInfo()
+ return nil
+}
+
+type serverData struct {
+ server *grpc.Server
+}
+
+func (s *serverData) setUp(port string, srv *BeamFnWorkerStatusServicer) {
+ l, err := net.Listen("tcp", ":9000")
+ if err != nil {
+ log.Fatalf("failed to listen on port 9000 %v", err)
+ }
+
+ fnpb.RegisterBeamFnWorkerStatusServer(s.server, srv)
+ if err := s.server.Serve(l); err != nil {
+ log.Fatalf("failed to serve: %v", err)
+ }
+}
+
+func TestSendStatusResponse(t *testing.T) {
+ server := &serverData{server: grpc.NewServer()}
+ srv := BeamFnWorkerStatusServicer{response: make(chan string)}
+ go server.setUp("9000", &srv)
+ defer server.server.Stop()
Review comment:
In tests, prefer t.Cleanup(...) to defer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]