lostluck commented on a change in pull request #16957:
URL: https://github.com/apache/beam/pull/16957#discussion_r819020717



##########
File path: sdks/go.mod
##########
@@ -23,33 +23,34 @@ module github.com/apache/beam/sdks/v2
 go 1.16
 
 require (
-       cloud.google.com/go/bigquery v1.17.0
-       cloud.google.com/go/datastore v1.5.0
-       cloud.google.com/go/pubsub v1.11.0-beta.schemas
-       cloud.google.com/go/storage v1.15.0
+       cloud.google.com/go/bigquery v1.28.0
+       cloud.google.com/go/compute v1.5.0 // indirect
+       cloud.google.com/go/datastore v1.6.0
+       cloud.google.com/go/iam v0.2.0 // indirect
+       cloud.google.com/go/pubsub v1.18.0
+       cloud.google.com/go/storage v1.21.0

Review comment:
       OK so these are being updated. Did you call `go get` explicitly?
   
   `go mod tidy` shouldn't be updating things AFAICT from the docs, unless we 
are actually changing/adding deps. 
https://go.dev/doc/modules/managing-dependencies
   
   But I might be mis-reading it.

##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+       "context"
+       "io"
+       "runtime"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus 
API.
+type workerStatusHandler struct {
+       conn     *grpc.ClientConn
+       shutdown int32
+       resp     chan *fnpb.WorkerStatusResponse
+}
+
+// newWorkerStatusHandler
+func newWorkerStatusHandler(ctx context.Context, endpoint string) 
(*workerStatusHandler, error) {
+       sconn, err := dial(ctx, endpoint, 60*time.Second)
+       if err != nil {
+               return &workerStatusHandler{}, errors.Wrapf(err, "failed to 
connect: %v\n", endpoint)
+       }
+       return &workerStatusHandler{conn: sconn, shutdown: 0, resp: make(chan 
*fnpb.WorkerStatusResponse)}, nil
+}
+
+// handleRequest manages the WorkerStatus API.
+func (w *workerStatusHandler) handleRequest(ctx context.Context, wg 
*sync.WaitGroup) {
+       defer wg.Done()
+       statusClient := fnpb.NewBeamFnWorkerStatusClient(w.conn)
+       stub, err := statusClient.WorkerStatus(ctx)
+       if err != nil {
+               log.Errorf(ctx, "status client not established: %v", err)
+       }
+       go w.writer(ctx, stub)
+       w.reader(ctx, stub)
+}
+
+// writer writes the WorkerStatusResponse recevied from the response channel.
+func (w *workerStatusHandler) writer(ctx context.Context, stub 
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+       for resp := range w.resp {
+               log.Debugf(ctx, "RESP-status: %v", resp.GetId())
+               if err := stub.Send(resp); err != nil && err != io.EOF {
+                       log.Errorf(ctx, "workerStatus.Writer: Failed to 
respond: %v", err)
+               }
+       }
+       log.Debugf(ctx, "exiting workerStatusHandler.Writer()")
+}
+
+// reader reads the WorkerStatusRequest from the stream and sends a processed 
WorkerStatusResponse to
+// a response channel.
+func (w *workerStatusHandler) reader(ctx context.Context, stub 
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+       for atomic.LoadInt32(&w.shutdown) == 0 {
+               req, err := stub.Recv()
+               if err != nil {
+                       log.Debugf(ctx, "exiting workerStatusHandler.Reader(): 
%v", err)
+                       return
+               }
+               log.Debugf(ctx, "RECV-status: %v", req.GetId())
+               buf := make([]byte, 1<<16)
+               runtime.Stack(buf, true)
+               response := &fnpb.WorkerStatusResponse{Id: req.GetId(), 
StatusInfo: string(buf)}
+               if w.resp != nil {
+                       w.resp <- response
+               } else {
+                       return
+               }
+       }
+}
+
+// close stops the reader first, closes the response channel thereby stopping 
writer and finally closes the gRPC connection.
+func (w *workerStatusHandler) close(ctx context.Context, wg *sync.WaitGroup) {
+       atomic.StoreInt32(&w.shutdown, 1)
+       close(w.resp)

Review comment:
       It's a bad idea to close a channel from goroutines that aren't doing the 
writing. A race condition exists where this is closed, isn't yet niled out, and 
the reader tries to send on the channel. Have the reader close the channel 
with a defer, after it exits its loop, instead.

##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+       "context"
+       "io"
+       "runtime"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus 
API.
+type workerStatusHandler struct {
+       conn     *grpc.ClientConn
+       shutdown int32
+       resp     chan *fnpb.WorkerStatusResponse
+}
+
+// newWorkerStatusHandler

Review comment:
       Consider either filling out the comment, or removing it, since it's 
unexported and empty.

##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+       "context"
+       "io"
+       "runtime"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus 
API.
+type workerStatusHandler struct {
+       conn     *grpc.ClientConn
+       shutdown int32
+       resp     chan *fnpb.WorkerStatusResponse
+}
+
+// newWorkerStatusHandler
+func newWorkerStatusHandler(ctx context.Context, endpoint string) 
(*workerStatusHandler, error) {
+       sconn, err := dial(ctx, endpoint, 60*time.Second)
+       if err != nil {
+               return &workerStatusHandler{}, errors.Wrapf(err, "failed to 
connect: %v\n", endpoint)
+       }
+       return &workerStatusHandler{conn: sconn, shutdown: 0, resp: make(chan 
*fnpb.WorkerStatusResponse)}, nil
+}
+
+// handleRequest manages the WorkerStatus API.
+func (w *workerStatusHandler) handleRequest(ctx context.Context, wg 
*sync.WaitGroup) {
+       defer wg.Done()
+       statusClient := fnpb.NewBeamFnWorkerStatusClient(w.conn)
+       stub, err := statusClient.WorkerStatus(ctx)
+       if err != nil {
+               log.Errorf(ctx, "status client not established: %v", err)
+       }
+       go w.writer(ctx, stub)
+       w.reader(ctx, stub)
+}
+
+// writer writes the WorkerStatusResponse recevied from the response channel.

Review comment:
       ```suggestion
   // writer sends the WorkerStatusResponse received from the response channel.
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+       "context"
+       "io"
+       "runtime"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus 
API.
+type workerStatusHandler struct {
+       conn     *grpc.ClientConn
+       shutdown int32
+       resp     chan *fnpb.WorkerStatusResponse
+}
+
+// newWorkerStatusHandler
+func newWorkerStatusHandler(ctx context.Context, endpoint string) 
(*workerStatusHandler, error) {
+       sconn, err := dial(ctx, endpoint, 60*time.Second)
+       if err != nil {
+               return &workerStatusHandler{}, errors.Wrapf(err, "failed to 
connect: %v\n", endpoint)
+       }
+       return &workerStatusHandler{conn: sconn, shutdown: 0, resp: make(chan 
*fnpb.WorkerStatusResponse)}, nil
+}
+
+// handleRequest manages the WorkerStatus API.
+func (w *workerStatusHandler) handleRequest(ctx context.Context, wg 
*sync.WaitGroup) {
+       defer wg.Done()
+       statusClient := fnpb.NewBeamFnWorkerStatusClient(w.conn)
+       stub, err := statusClient.WorkerStatus(ctx)
+       if err != nil {
+               log.Errorf(ctx, "status client not established: %v", err)
+       }
+       go w.writer(ctx, stub)
+       w.reader(ctx, stub)
+}
+
+// writer writes the WorkerStatusResponse recevied from the response channel.
+func (w *workerStatusHandler) writer(ctx context.Context, stub 
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+       for resp := range w.resp {
+               log.Debugf(ctx, "RESP-status: %v", resp.GetId())
+               if err := stub.Send(resp); err != nil && err != io.EOF {
+                       log.Errorf(ctx, "workerStatus.Writer: Failed to 
respond: %v", err)
+               }
+       }
+       log.Debugf(ctx, "exiting workerStatusHandler.Writer()")
+}
+
+// reader reads the WorkerStatusRequest from the stream and sends a processed 
WorkerStatusResponse to
+// a response channel.
+func (w *workerStatusHandler) reader(ctx context.Context, stub 
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+       for atomic.LoadInt32(&w.shutdown) == 0 {
+               req, err := stub.Recv()
+               if err != nil {
+                       log.Debugf(ctx, "exiting workerStatusHandler.Reader(): 
%v", err)
+                       return
+               }
+               log.Debugf(ctx, "RECV-status: %v", req.GetId())
+               buf := make([]byte, 1<<16)

Review comment:
       Consider allocating this outside the loop so it's not constantly GC'd 
and re-allocated, and documenting the intended human-readable size cap. 
(Though if it's once every 5 minutes, it's probably a non-issue; in that 
case, comment about that here instead, since moving a buffer out of a loop is a 
common target for optimization passes.) The conversion to string will avoid 
aliasing problems with simultaneous calls.

##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go
##########
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+       "context"
+       "fmt"
+       "log"
+       "net"
+       "sync"
+       "testing"
+
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "google.golang.org/grpc"
+)
+
+type BeamFnWorkerStatusServicer struct {
+       fnpb.UnimplementedBeamFnWorkerStatusServer
+       response chan string
+}
+
+func (w BeamFnWorkerStatusServicer) 
mustEmbedUnimplementedBeamFnWorkerStatusServer() {

Review comment:
       Nit: You shouldn't need to implement this, since you've embedded it in 
the status servicer.
   
   You're running into this because you're using a value, rather than a 
pointer, method receiver for WorkerStatus. 
   
   Change `(w BeamFnWorkerStatusServicer)` to `(w *BeamFnWorkerStatusServicer)` 
 and   `srv := BeamFnWorkerStatusServicer` to `srv := 
&BeamFnWorkerStatusServicer` and it should clear up.  (It also saves you from 
needing to do `&srv` later. Either will be stack or heap allocated as needed.)

##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+       "context"
+       "io"
+       "runtime"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus 
API.
+type workerStatusHandler struct {
+       conn     *grpc.ClientConn
+       shutdown int32
+       resp     chan *fnpb.WorkerStatusResponse
+}
+
+// newWorkerStatusHandler
+func newWorkerStatusHandler(ctx context.Context, endpoint string) 
(*workerStatusHandler, error) {
+       sconn, err := dial(ctx, endpoint, 60*time.Second)
+       if err != nil {
+               return &workerStatusHandler{}, errors.Wrapf(err, "failed to 
connect: %v\n", endpoint)
+       }
+       return &workerStatusHandler{conn: sconn, shutdown: 0, resp: make(chan 
*fnpb.WorkerStatusResponse)}, nil
+}
+
+// handleRequest manages the WorkerStatus API.
+func (w *workerStatusHandler) handleRequest(ctx context.Context, wg 
*sync.WaitGroup) {
+       defer wg.Done()
+       statusClient := fnpb.NewBeamFnWorkerStatusClient(w.conn)
+       stub, err := statusClient.WorkerStatus(ctx)
+       if err != nil {
+               log.Errorf(ctx, "status client not established: %v", err)
+       }
+       go w.writer(ctx, stub)
+       w.reader(ctx, stub)

Review comment:
       Consider also launching the reader as a goroutine here instead of 
launching the handleRequest as a goroutine. It doesn't make sense to split the 
launching to multiple levels and will make it harder to track what's going on. 

##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+       "context"
+       "io"
+       "runtime"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus 
API.
+type workerStatusHandler struct {
+       conn     *grpc.ClientConn
+       shutdown int32
+       resp     chan *fnpb.WorkerStatusResponse
+}
+
+// newWorkerStatusHandler
+func newWorkerStatusHandler(ctx context.Context, endpoint string) 
(*workerStatusHandler, error) {
+       sconn, err := dial(ctx, endpoint, 60*time.Second)
+       if err != nil {
+               return &workerStatusHandler{}, errors.Wrapf(err, "failed to 
connect: %v\n", endpoint)
+       }
+       return &workerStatusHandler{conn: sconn, shutdown: 0, resp: make(chan 
*fnpb.WorkerStatusResponse)}, nil
+}
+
+// handleRequest manages the WorkerStatus API.
+func (w *workerStatusHandler) handleRequest(ctx context.Context, wg 
*sync.WaitGroup) {
+       defer wg.Done()
+       statusClient := fnpb.NewBeamFnWorkerStatusClient(w.conn)
+       stub, err := statusClient.WorkerStatus(ctx)
+       if err != nil {
+               log.Errorf(ctx, "status client not established: %v", err)
+       }
+       go w.writer(ctx, stub)
+       w.reader(ctx, stub)
+}
+
+// writer writes the WorkerStatusResponse recevied from the response channel.
+func (w *workerStatusHandler) writer(ctx context.Context, stub 
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+       for resp := range w.resp {
+               log.Debugf(ctx, "RESP-status: %v", resp.GetId())
+               if err := stub.Send(resp); err != nil && err != io.EOF {
+                       log.Errorf(ctx, "workerStatus.Writer: Failed to 
respond: %v", err)
+               }
+       }
+       log.Debugf(ctx, "exiting workerStatusHandler.Writer()")
+}
+
+// reader reads the WorkerStatusRequest from the stream and sends a processed 
WorkerStatusResponse to
+// a response channel.
+func (w *workerStatusHandler) reader(ctx context.Context, stub 
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+       for atomic.LoadInt32(&w.shutdown) == 0 {
+               req, err := stub.Recv()
+               if err != nil {
+                       log.Debugf(ctx, "exiting workerStatusHandler.Reader(): 
%v", err)
+                       return
+               }
+               log.Debugf(ctx, "RECV-status: %v", req.GetId())
+               buf := make([]byte, 1<<16)
+               runtime.Stack(buf, true)
+               response := &fnpb.WorkerStatusResponse{Id: req.GetId(), 
StatusInfo: string(buf)}
+               if w.resp != nil {
+                       w.resp <- response

Review comment:
       I suppose my question here is: Do we need two separate goroutines for 
handling these responses? Could we not have a single receive/send loop that 
receives a status request, processes it, and replies back immediately? IIRC it 
took a several-minute job to even trigger this.

##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+       "context"
+       "io"
+       "runtime"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus 
API.
+type workerStatusHandler struct {
+       conn     *grpc.ClientConn
+       shutdown int32
+       resp     chan *fnpb.WorkerStatusResponse
+}
+
+// newWorkerStatusHandler
+func newWorkerStatusHandler(ctx context.Context, endpoint string) 
(*workerStatusHandler, error) {
+       sconn, err := dial(ctx, endpoint, 60*time.Second)
+       if err != nil {
+               return &workerStatusHandler{}, errors.Wrapf(err, "failed to 
connect: %v\n", endpoint)
+       }
+       return &workerStatusHandler{conn: sconn, shutdown: 0, resp: make(chan 
*fnpb.WorkerStatusResponse)}, nil
+}
+
+// handleRequest manages the WorkerStatus API.
+func (w *workerStatusHandler) handleRequest(ctx context.Context, wg 
*sync.WaitGroup) {
+       defer wg.Done()
+       statusClient := fnpb.NewBeamFnWorkerStatusClient(w.conn)
+       stub, err := statusClient.WorkerStatus(ctx)
+       if err != nil {
+               log.Errorf(ctx, "status client not established: %v", err)

Review comment:
       If an error is returned, the stub is invalid, so this should also return 
out instead of allowing the writer and reader to be launched.

##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+       "context"
+       "io"
+       "runtime"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus 
API.
+type workerStatusHandler struct {
+       conn     *grpc.ClientConn
+       shutdown int32
+       resp     chan *fnpb.WorkerStatusResponse
+}
+
+// newWorkerStatusHandler
+func newWorkerStatusHandler(ctx context.Context, endpoint string) 
(*workerStatusHandler, error) {
+       sconn, err := dial(ctx, endpoint, 60*time.Second)
+       if err != nil {
+               return &workerStatusHandler{}, errors.Wrapf(err, "failed to 
connect: %v\n", endpoint)
+       }
+       return &workerStatusHandler{conn: sconn, shutdown: 0, resp: make(chan 
*fnpb.WorkerStatusResponse)}, nil
+}
+
+// handleRequest manages the WorkerStatus API.
+func (w *workerStatusHandler) handleRequest(ctx context.Context, wg 
*sync.WaitGroup) {
+       defer wg.Done()
+       statusClient := fnpb.NewBeamFnWorkerStatusClient(w.conn)
+       stub, err := statusClient.WorkerStatus(ctx)
+       if err != nil {
+               log.Errorf(ctx, "status client not established: %v", err)
+       }
+       go w.writer(ctx, stub)
+       w.reader(ctx, stub)
+}
+
+// writer writes the WorkerStatusResponse recevied from the response channel.
+func (w *workerStatusHandler) writer(ctx context.Context, stub 
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+       for resp := range w.resp {
+               log.Debugf(ctx, "RESP-status: %v", resp.GetId())
+               if err := stub.Send(resp); err != nil && err != io.EOF {
+                       log.Errorf(ctx, "workerStatus.Writer: Failed to 
respond: %v", err)
+               }
+       }
+       log.Debugf(ctx, "exiting workerStatusHandler.Writer()")
+}
+
+// reader reads the WorkerStatusRequest from the stream and sends a processed 
WorkerStatusResponse to
+// a response channel.
+func (w *workerStatusHandler) reader(ctx context.Context, stub 
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+       for atomic.LoadInt32(&w.shutdown) == 0 {
+               req, err := stub.Recv()
+               if err != nil {
+                       log.Debugf(ctx, "exiting workerStatusHandler.Reader(): 
%v", err)
+                       return
+               }
+               log.Debugf(ctx, "RECV-status: %v", req.GetId())
+               buf := make([]byte, 1<<16)
+               runtime.Stack(buf, true)
+               response := &fnpb.WorkerStatusResponse{Id: req.GetId(), 
StatusInfo: string(buf)}
+               if w.resp != nil {
+                       w.resp <- response
+               } else {
+                       return
+               }
+       }
+}
+
+// close stops the reader first, closes the response channel thereby stopping 
writer and finally closes the gRPC connection.
+func (w *workerStatusHandler) close(ctx context.Context, wg *sync.WaitGroup) {
+       atomic.StoreInt32(&w.shutdown, 1)

Review comment:
       Neat pattern! Consider isolating the atomic stuff into methods for 
documentation purposes. It's opaque to a reader what these are supposed to do, 
but in their own methods, they become documented and implementation details.
   
   The other place we're doing this should likely be doing the same, but that's 
a separate problem.

##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go
##########
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+       "context"
+       "fmt"
+       "log"
+       "net"
+       "sync"
+       "testing"
+
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "google.golang.org/grpc"
+)
+
+type BeamFnWorkerStatusServicer struct {
+       fnpb.UnimplementedBeamFnWorkerStatusServer
+       response chan string
+}
+
+func (w BeamFnWorkerStatusServicer) 
mustEmbedUnimplementedBeamFnWorkerStatusServer() {
+
+}
+
+func (w BeamFnWorkerStatusServicer) WorkerStatus(b 
fnpb.BeamFnWorkerStatus_WorkerStatusServer) error {
+       b.Send(&fnpb.WorkerStatusRequest{Id: "1"})
+       resp, err := b.Recv()
+       if err != nil {
+               return fmt.Errorf("error receiving response b.recv: %v", err)
+       }
+       w.response <- resp.GetStatusInfo()
+       return nil
+}
+
+type serverData struct {

Review comment:
       It doesn't seem like you need this type. Consider just having setUp be a 
function instead, or even a method on your BeamFnWorkerStatusServicer type.

##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go
##########
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+       "context"
+       "fmt"
+       "log"
+       "net"
+       "sync"
+       "testing"
+
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "google.golang.org/grpc"
+)
+
+type BeamFnWorkerStatusServicer struct {
+       fnpb.UnimplementedBeamFnWorkerStatusServer
+       response chan string
+}
+
+func (w BeamFnWorkerStatusServicer) 
mustEmbedUnimplementedBeamFnWorkerStatusServer() {
+
+}
+
+func (w BeamFnWorkerStatusServicer) WorkerStatus(b 
fnpb.BeamFnWorkerStatus_WorkerStatusServer) error {
+       b.Send(&fnpb.WorkerStatusRequest{Id: "1"})
+       resp, err := b.Recv()
+       if err != nil {
+               return fmt.Errorf("error receiving response b.recv: %v", err)
+       }
+       w.response <- resp.GetStatusInfo()
+       return nil
+}
+
+type serverData struct {
+       server *grpc.Server
+}
+
+func (s *serverData) setUp(port string, srv *BeamFnWorkerStatusServicer) {
+       l, err := net.Listen("tcp", ":9000")
+       if err != nil {
+               log.Fatalf("failed to listen on port 9000 %v", err)
+       }
+
+       fnpb.RegisterBeamFnWorkerStatusServer(s.server, srv)
+       if err := s.server.Serve(l); err != nil {
+               log.Fatalf("failed to serve: %v", err)
+       }
+}
+
+func TestSendStatusResponse(t *testing.T) {
+       server := &serverData{server: grpc.NewServer()}
+       srv := BeamFnWorkerStatusServicer{response: make(chan string)}
+       go server.setUp("9000", &srv)
+       defer server.server.Stop()

Review comment:
       In tests, prefer t.Cleanup(...) to defer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to