lostluck commented on code in PR #16957:
URL: https://github.com/apache/beam/pull/16957#discussion_r842077050


##########
sdks/go/pkg/beam/core/runtime/harness/harness.go:
##########
@@ -98,6 +111,18 @@ func Main(ctx context.Context, loggingEndpoint, 
controlEndpoint string) error {
                log.Debugf(ctx, "control response channel closed")
        }()
 
+       // if the runner supports worker status api then expose SDK harness 
status
+       if statusEndpoint != "" {
+               statusHandler, err := newWorkerStatusHandler(ctx, 
statusEndpoint)
+               if err != nil {
+                       log.Errorf(ctx, "error establishing connection to 
worker status API: %v", err)
+               } else {
+                       statusHandler.wg.Add(1)

Review Comment:
   Consider calling this in the `start` method instead. It should go right 
before the `go w.reader(...)` call. It can't be called in the `reader` method, 
because that runs on a different goroutine, and there's no guarantee that it 
will be scheduled promptly (though in practice it usually is). Calling it 
beforehand guarantees there is always something to wait on before it's time 
to stop, so there's no race condition.
   
   (Unless I'm missing something, of course, in which case we should add a 
comment here explaining why the `wg.Add` needs to happen here.)



##########
sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go:
##########
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+       "context"
+       "fmt"
+       "log"
+       "net"
+       "testing"
+
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/test/bufconn"
+)
+
+type BeamFnWorkerStatusServicer struct {
+       fnpb.UnimplementedBeamFnWorkerStatusServer
+       response chan string
+       lis      *bufconn.Listener
+}
+
+func (w *BeamFnWorkerStatusServicer) WorkerStatus(b 
fnpb.BeamFnWorkerStatus_WorkerStatusServer) error {
+       b.Send(&fnpb.WorkerStatusRequest{Id: "1"})
+       resp, err := b.Recv()
+       if err != nil {
+               return fmt.Errorf("error receiving response b.recv: %v", err)
+       }
+       w.response <- resp.GetStatusInfo()
+       return nil
+}
+
+const buffsize = 1024 * 1024
+
+var lis *bufconn.Listener
+
+func setup(t *testing.T, srv *BeamFnWorkerStatusServicer) {
+       server := grpc.NewServer()
+       lis = bufconn.Listen(buffsize)
+       fnpb.RegisterBeamFnWorkerStatusServer(server, srv)
+       go func() {
+               if err := server.Serve(lis); err != nil {
+                       log.Fatalf("failed to serve: %v", err)
+               }
+       }()
+       t.Cleanup(func() {
+               server.Stop()
+       })
+}
+
+func dialer(context.Context, string) (net.Conn, error) {
+       return lis.Dial()
+}
+
+func TestSendStatusResponse(t *testing.T) {
+       ctx := context.Background()
+       srv := &BeamFnWorkerStatusServicer{response: make(chan string)}
+       setup(t, srv)
+       conn, err := grpc.DialContext(ctx, "bufnet", 
grpc.WithContextDialer(dialer), grpc.WithInsecure())
+       if err != nil {
+               t.Fatalf("unable to start test server: %v", err)
+       }
+       statusHandler := workerStatusHandler{conn: conn}
+       statusHandler.wg.Add(1)

Review Comment:
   Don't forget to remove this line once the `wg.Add(1)` call has been moved into `start`.



##########
sdks/go/pkg/beam/core/runtime/harness/worker_status.go:
##########
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+       "context"
+       "io"
+       "runtime"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "google.golang.org/grpc"
+)
+
+// workerStatusHandler stores the communication information of WorkerStatus 
API.
+type workerStatusHandler struct {
+       conn           *grpc.ClientConn
+       shouldShutdown int32
+       wg             sync.WaitGroup
+}
+
+func newWorkerStatusHandler(ctx context.Context, endpoint string) 
(*workerStatusHandler, error) {
+       sconn, err := dial(ctx, endpoint, 60*time.Second)
+       if err != nil {
+               return nil, errors.Wrapf(err, "failed to connect: %v\n", 
endpoint)
+       }
+       return &workerStatusHandler{conn: sconn, shouldShutdown: 0}, nil
+}
+
+func (w *workerStatusHandler) isAlive() bool {
+       return atomic.LoadInt32(&w.shouldShutdown) == 0
+}
+
+func (w *workerStatusHandler) shutdown() {
+       atomic.StoreInt32(&w.shouldShutdown, 1)
+}
+
+// start starts the reader to accept WorkerStatusRequest and send 
WorkerStatusResponse with WorkerStatus API.
+func (w *workerStatusHandler) start(ctx context.Context) {
+       statusClient := fnpb.NewBeamFnWorkerStatusClient(w.conn)
+       stub, err := statusClient.WorkerStatus(ctx)
+       if err != nil {
+               log.Errorf(ctx, "status client not established: %v", err)
+               return
+       }
+       go w.reader(ctx, stub)
+}
+
+// reader reads the WorkerStatusRequest from the stream and sends a processed 
WorkerStatusResponse to
+// a response channel.
+func (w *workerStatusHandler) reader(ctx context.Context, stub 
fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+       defer w.wg.Done()
+       buf := make([]byte, 1<<16)
+       for w.isAlive() {
+               req, err := stub.Recv()
+               if err != nil {
+                       log.Debugf(ctx, "exiting workerStatusHandler.Reader(): 
%v", err)
+                       return
+               }
+               log.Debugf(ctx, "RECV-status: %v", req.GetId())
+               runtime.Stack(buf, true)
+               response := &fnpb.WorkerStatusResponse{Id: req.GetId(), 
StatusInfo: string(buf)}
+               if err := stub.Send(response); err != nil && err != io.EOF {
+                       log.Errorf(ctx, "workerStatus.Writer: Failed to 
respond: %v", err)
+               }
+       }
+}
+
+// stop stops the reader and closes worker status endpoint connection with the 
runner.
+func (w *workerStatusHandler) stop(ctx context.Context) {

Review Comment:
   Technically, since we're only calling this in the tests (which is fine; it's 
good to have), we should have it return the error as well as log it (just in 
case).
   
   This way, the test can simply verify that the returned error is nil, and 
`stop` will be properly covered.



##########
sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go:
##########
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+       "context"
+       "fmt"
+       "log"
+       "net"
+       "testing"
+
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/test/bufconn"
+)
+
+type BeamFnWorkerStatusServicer struct {
+       fnpb.UnimplementedBeamFnWorkerStatusServer
+       response chan string
+       lis      *bufconn.Listener
+}
+
+func (w *BeamFnWorkerStatusServicer) WorkerStatus(b 
fnpb.BeamFnWorkerStatus_WorkerStatusServer) error {
+       b.Send(&fnpb.WorkerStatusRequest{Id: "1"})
+       resp, err := b.Recv()
+       if err != nil {
+               return fmt.Errorf("error receiving response b.recv: %v", err)
+       }
+       w.response <- resp.GetStatusInfo()
+       return nil
+}
+
+const buffsize = 1024 * 1024
+
+var lis *bufconn.Listener
+
+func setup(t *testing.T, srv *BeamFnWorkerStatusServicer) {
+       server := grpc.NewServer()
+       lis = bufconn.Listen(buffsize)
+       fnpb.RegisterBeamFnWorkerStatusServer(server, srv)
+       go func() {
+               if err := server.Serve(lis); err != nil {
+                       log.Fatalf("failed to serve: %v", err)
+               }
+       }()
+       t.Cleanup(func() {
+               server.Stop()
+       })
+}
+
+func dialer(context.Context, string) (net.Conn, error) {
+       return lis.Dial()
+}
+
+func TestSendStatusResponse(t *testing.T) {
+       ctx := context.Background()
+       srv := &BeamFnWorkerStatusServicer{response: make(chan string)}
+       setup(t, srv)
+       conn, err := grpc.DialContext(ctx, "bufnet", 
grpc.WithContextDialer(dialer), grpc.WithInsecure())
+       if err != nil {
+               t.Fatalf("unable to start test server: %v", err)
+       }
+       statusHandler := workerStatusHandler{conn: conn}
+       statusHandler.wg.Add(1)
+       statusHandler.start(ctx)
+       t.Cleanup(func() {
+               statusHandler.stop(ctx)

Review Comment:
   I thought I mentioned this in my previous review, but apparently I didn't.
   
   We shouldn't be calling `stop` here in `Cleanup`. We need a clearer signal 
that the stop actually happens.
   
   Call it after checking the response, and then verify that 
`shouldShutdown` is set to 1.
   
   Effectively, because `Cleanup` isn't called until *after* the test has 
finished, it's not considered part of the test. So we need to call `stop` 
inline in the test and then verify its behavior (technically, by the test not 
timing out, since otherwise the WaitGroup would block forever).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to