johannaojeling commented on code in PR #25478:
URL: https://github.com/apache/beam/pull/25478#discussion_r1111310249


##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -0,0 +1,424 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package worker handles interactions with SDK side workers: representing
+// the worker services, communicating with those services, and managing SDK
+// environments.
+package worker
+
+import (
+       "bytes"
+       "context"
+       "fmt"
+       "io"
+       "net"
+       "sync"
+       "sync/atomic"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+       "golang.org/x/exp/slog"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/encoding/prototext"
+)
+
+// A W manages worker environments, sending them work
+// that they're able to execute, and manages the server-side
+// handlers for FnAPI RPCs.
+type W struct {
+       fnpb.UnimplementedBeamFnControlServer
+       fnpb.UnimplementedBeamFnDataServer
+       fnpb.UnimplementedBeamFnStateServer
+       fnpb.UnimplementedBeamFnLoggingServer
+
+       ID string
+
+       // Server management
+       lis    net.Listener
+       server *grpc.Server
+
+       // These are the ID sources
+       inst, bund uint64
+
+       // descs map[string]*fnpb.ProcessBundleDescriptor
+
+       InstReqs chan *fnpb.InstructionRequest
+       DataReqs chan *fnpb.Elements
+
+       mu          sync.Mutex
+       bundles     map[string]*B                            // Bundles keyed by InstructionID
+       Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID
+
+       D *DataService
+}
+
+// New starts the worker server components of FnAPI Execution.
+func New(id string) *W {
+       lis, err := net.Listen("tcp", ":0")
+       if err != nil {
+               panic(fmt.Sprintf("failed to listen: %v", err))
+       }
+       var opts []grpc.ServerOption
+       wk := &W{
+               ID:     id,
+               lis:    lis,
+               server: grpc.NewServer(opts...),
+
+               InstReqs: make(chan *fnpb.InstructionRequest, 10),
+               DataReqs: make(chan *fnpb.Elements, 10),
+
+               bundles:     make(map[string]*B),
+               Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor),
+
+               D: &DataService{},
+       }
+       slog.Info("Serving Worker components", slog.String("endpoint", wk.Endpoint()))
+       fnpb.RegisterBeamFnControlServer(wk.server, wk)
+       fnpb.RegisterBeamFnDataServer(wk.server, wk)
+       fnpb.RegisterBeamFnLoggingServer(wk.server, wk)
+       fnpb.RegisterBeamFnStateServer(wk.server, wk)
+       return wk
+}
+
+// Endpoint returns the address the worker's gRPC server is listening on.
+func (wk *W) Endpoint() string {
+       return wk.lis.Addr().String()
+}
+
+// Serve serves on the started listener. Blocks.
+func (wk *W) Serve() {
+       wk.server.Serve(wk.lis)
+}
+
+func (wk *W) String() string {
+       return "worker[" + wk.ID + "]"
+}
+
+func (wk *W) LogValue() slog.Value {
+       return slog.GroupValue(
+               slog.String("ID", wk.ID),
+               slog.String("endpoint", wk.Endpoint()),
+       )
+}
+
+// Stop closes the request channels and stops the gRPC server.
+func (wk *W) Stop() {
+       slog.Debug("stopping", "worker", wk)
+       close(wk.InstReqs)
+       close(wk.DataReqs)
+       wk.server.Stop()
+       wk.lis.Close()
+       slog.Debug("stopped", "worker", wk)
+}
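
A minimal sketch of how a caller might drive the lifecycle API above (the
worker ID and the usage are hypothetical, not from this PR):

    wk := New("worker-1")
    go wk.Serve() // Serve blocks until Stop is called.
    slog.Info("worker ready", slog.String("endpoint", wk.Endpoint()))
    // ... submit work via InstReqs and DataReqs ...
    wk.Stop() // Closes the request channels and shuts down the server.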
+
+// NextInst returns the next available instruction ID.
+func (wk *W) NextInst() string {
+       return fmt.Sprintf("inst%03d", atomic.AddUint64(&wk.inst, 1))
+}
+
+// NextStage returns the next available stage ID.
+func (wk *W) NextStage() string {
+       return fmt.Sprintf("stage%03d", atomic.AddUint64(&wk.bund, 1))
+}
+
+// TODO set logging level.
+var minsev = fnpb.LogEntry_Severity_DEBUG
+
+// Logging relates SDK worker messages back to the job that spawned them.
+// Messages are received from the SDK over the logging stream.
+func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
+       for {
+               in, err := stream.Recv()
+               if err == io.EOF {
+                       return nil
+               }
+               if err != nil {
+                       slog.Error("logging.Recv", err, "worker", wk)
+                       return err
+               }
+               for _, l := range in.GetLogEntries() {
+                       if l.Severity >= minsev {
+                               // TODO: Connect to the associated Job for this worker
+                               // instead of logging locally for SDK side logging.
+                               slog.Log(toSlogSev(l.GetSeverity()), l.GetMessage(),
+                                       slog.String(slog.SourceKey, l.GetLogLocation()),
+                                       slog.Time(slog.TimeKey, l.GetTimestamp().AsTime()),
+                                       "worker", wk,
+                               )
+                       }
+               }
+       }
+}
+
+func toSlogSev(sev fnpb.LogEntry_Severity_Enum) slog.Level {
+       switch sev {
+       case fnpb.LogEntry_Severity_TRACE:
+               return slog.Level(-8) // -8
+       case fnpb.LogEntry_Severity_DEBUG:
+               return slog.LevelDebug // -4
+       case fnpb.LogEntry_Severity_INFO:
+               return slog.LevelInfo // 0
+       case fnpb.LogEntry_Severity_NOTICE:
+               return slog.Level(2)
+       case fnpb.LogEntry_Severity_WARN:
+               return slog.LevelWarn // 4
+       case fnpb.LogEntry_Severity_ERROR:
+               return slog.LevelError // 8
+       case fnpb.LogEntry_Severity_CRITICAL:
+               return slog.Level(10)
+       }
+       return slog.LevelInfo
+}
+
+// GetProcessBundleDescriptor returns the requested descriptor, if registered.
+func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error) {
+       desc, ok := wk.Descriptors[req.GetProcessBundleDescriptorId()]
+       if !ok {
+               return nil, fmt.Errorf("descriptor %v not found", req.GetProcessBundleDescriptorId())
+       }
+       return desc, nil
+}
+
+// Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
+//
+// Requests come from the runner, and are sent to the client in the SDK.
+func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
+       done := make(chan bool)
+       go func() {
+               for {
+                       resp, err := ctrl.Recv()
+                       if err == io.EOF {
+                               slog.Debug("ctrl.Recv finished; marking done", "worker", wk)
+                               done <- true // means stream is finished
+                               return
+                       }
+                       if err != nil {
+                               switch status.Code(err) {
+                               case codes.Canceled: // Might ignore this all the time instead.
+                                       slog.Error("ctrl.Recv Canceled", err, "worker", wk)
+                                       done <- true // means stream is finished
+                                       return
+                               default:
+                                       slog.Error("ctrl.Recv failed", err, "worker", wk)
+                                       panic(err)
+                               }
+                       }
+
+                       // TODO: Do more than assume these are ProcessBundleResponses.
+                       wk.mu.Lock()
+                       if b, ok := wk.bundles[resp.GetInstructionId()]; ok {
+                               // TODO: Better pipeline error handling.
+                               if resp.Error != "" {
+                                       slog.Log(slog.LevelError, "ctrl.Recv pipeline error", slog.ErrorKey, resp.GetError())
+                                       panic(resp.GetError())
+                               }
+                               b.Resp <- resp.GetProcessBundle()
+                       } else {
+                               slog.Debug(fmt.Sprintf("ctrl.Recv: %v", resp))
+                       }
+                       wk.mu.Unlock()
+               }
+       }()
+
+       for req := range wk.InstReqs {
+               ctrl.Send(req)
+       }
+       slog.Debug("ctrl.Send finished; waiting on done")
+       <-done
+       slog.Debug("Control done")
+       return nil
+}
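
As a sketch of the runner side of this contract (runBundle and pbdID are
hypothetical names; it assumes B's Resp channel carries
*fnpb.ProcessBundleResponse, as used above):

    func runBundle(wk *W, b *B, pbdID string) *fnpb.ProcessBundleResponse {
            instID := wk.NextInst()
            wk.mu.Lock()
            wk.bundles[instID] = b // Lets the Recv goroutine route the response.
            wk.mu.Unlock()
            wk.InstReqs <- &fnpb.InstructionRequest{
                    InstructionId: instID,
                    Request: &fnpb.InstructionRequest_ProcessBundle{
                            ProcessBundle: &fnpb.ProcessBundleRequest{
                                    ProcessBundleDescriptorId: pbdID,
                            },
                    },
            }
            return <-b.Resp // Sent by the Recv goroutine in Control.
    }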
+
+// Data relays elements and timer bytes to SDKs and back again, coordinated via
+// ProcessBundle instructionIDs, and receiving input transforms.
+//
+// Data is multiplexed on a single stream for all active bundles on a worker.
+func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
+       go func() {
+               for {
+                       resp, err := data.Recv()
+                       if err == io.EOF {
+                               return
+                       }
+                       if err != nil {
+                               switch status.Code(err) {
+                               case codes.Canceled:
+                                       slog.Error("data.Recv Canceled", err, "worker", wk)
+                                       return
+                               default:
+                                       slog.Error("data.Recv failed", err, "worker", wk)
+                                       panic(err)
+                               }
+                       }
+                       wk.mu.Lock()
+                       for _, d := range resp.GetData() {
+                               b, ok := wk.bundles[d.GetInstructionId()]
+                               if !ok {
+                                       slog.Info("data.Recv for unknown bundle", "response", resp)
+                                       continue
+                               }
+                               colID := b.SinkToPCollection[d.GetTransformId()]
+
+                               // There might not be data, e.g. for side inputs, so we
+                               // need to reconcile this elsewhere for downstream side inputs.
+                               if len(d.GetData()) > 0 {
+                                       b.OutputData.WriteData(colID, d.GetData())
+                               }
+                               if d.GetIsLast() {
+                                       b.dataWait.Done()
+                               }
+                       }
+                       wk.mu.Unlock()
+               }
+       }()
+
+       for req := range wk.DataReqs {
+               if err := data.Send(req); err != nil {
+                       slog.Log(slog.LevelDebug, "data.Send error", slog.ErrorKey, err)
+               }
+       }
+       return nil
+}
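
A sketch of how the runner side might feed input elements over this
multiplexed stream (sendInput and its parameters are hypothetical):

    func sendInput(wk *W, instID, transformID string, payload []byte) {
            wk.DataReqs <- &fnpb.Elements{
                    Data: []*fnpb.Elements_Data{{
                            InstructionId: instID,      // The bundle being processed.
                            TransformId:   transformID, // The SDK-side transform to feed.
                            Data:          payload,
                            IsLast:        true, // Final chunk for this instruction.
                    }},
            }
    }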
+
+// State handles state requests from SDK workers, coordinated via
+// ProcessBundle instructionIDs.
+//
+// State requests come from SDKs, and the runner responds.
+func (wk *W) State(state fnpb.BeamFnState_StateServer) error {
+       responses := make(chan *fnpb.StateResponse)
+       go func() {
+               // This goroutine creates all responses to state requests from the worker,
+               // so we want to close the State handler when it's all done.
+               defer close(responses)
+               for {
+                       req, err := state.Recv()
+                       if err == io.EOF {
+                               return
+                       }
+                       if err != nil {
+                               switch status.Code(err) {
+                               case codes.Canceled:
+                                       slog.Error("state.Recv Canceled", err, "worker", wk)
+                                       return
+                               default:
+                                       slog.Error("state.Recv failed", err, "worker", wk)
+                                       panic(err)
+                               }
+                       }
+                       switch req.GetRequest().(type) {
+                       case *fnpb.StateRequest_Get:
+                               // TODO: move data handling to be pcollection based.
+                               b := wk.bundles[req.GetInstructionId()]
+                               key := req.GetStateKey()
+                               slog.Debug("StateRequest_Get", "request", prototext.Format(req), "bundle", b)
+
+                               var data [][]byte
+                               switch key.GetType().(type) {
+                               case *fnpb.StateKey_IterableSideInput_:
+                                       ikey := key.GetIterableSideInput()
+                                       wKey := ikey.GetWindow()
+                                       var w typex.Window
+                                       if len(wKey) == 0 {
+                                               w = window.GlobalWindow{}
+                                       } else {
+                                               w, err = exec.MakeWindowDecoder(coder.NewIntervalWindow()).DecodeSingle(bytes.NewBuffer(wKey))
+                                               if err != nil {
+                                                       panic(fmt.Sprintf("error decoding iterable side input window key %v: %v", wKey, err))
+                                               }
+                                       }
+                                       winMap := b.IterableSideInputData[ikey.GetTransformId()][ikey.GetSideInputId()]
+                                       var wins []typex.Window
+                                       for w := range winMap {
+                                               wins = append(wins, w)
+                                       }
+                                       slog.Debug(fmt.Sprintf("side input[%v][%v] I Key: %v Windows: %v", req.GetId(), req.GetInstructionId(), w, wins))
+                                       data = winMap[w]
+
+                               case *fnpb.StateKey_MultimapSideInput_:
+                                       mmkey := key.GetMultimapSideInput()
+                                       wKey := mmkey.GetWindow()
+                                       var w typex.Window
+                                       if len(wKey) == 0 {
+                                               w = window.GlobalWindow{}
+                                       } else {
+                                               w, err = exec.MakeWindowDecoder(coder.NewIntervalWindow()).DecodeSingle(bytes.NewBuffer(wKey))
+                                               if err != nil {
+                                                       panic(fmt.Sprintf("error decoding multimap side input window key %v: %v", wKey, err))
+                                               }
+                                       }
+                                       dKey := mmkey.GetKey()
+                                       winMap := b.MultiMapSideInputData[mmkey.GetTransformId()][mmkey.GetSideInputId()]
+                                       var wins []typex.Window
+                                       for w := range winMap {
+                                               wins = append(wins, w)
+                                       }
+                                       slog.Debug(fmt.Sprintf("side input[%v][%v] MM Key: %v Windows: %v", req.GetId(), req.GetInstructionId(), w, wins))
+
+                                       data = winMap[w][string(dKey)]
+
+                               default:
+                                       panic(fmt.Sprintf("unsupported StateKey Access type: %T: %v", key.GetType(), prototext.Format(key)))
+                               }
+
+                               // Encode the runner iterable (no length, just consecutive
+                               // elements), and send it out. This is also where we can
+                               // handle things like State Backed Iterables.
+                               var buf bytes.Buffer
+                               for _, value := range data {
+                                       buf.Write(value)
+                               }
+                               responses <- &fnpb.StateResponse{
+                                       Id: req.GetId(),
+                                       Response: &fnpb.StateResponse_Get{
+                                               Get: &fnpb.StateGetResponse{
+                                                       Data: buf.Bytes(),
+                                               },
+                                       },
+                               }
+                       default:
+                               panic(fmt.Sprintf("unsupported StateRequest kind %T: %v", req.GetRequest(), prototext.Format(req)))
+                       }
+               }
+       }()
+       for resp := range responses {
+               if err := state.Send(resp); err != nil {
+                       slog.Error("state.Send error", err)
+               }
+       }
+       return nil
+}
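
For reference, a sketch of the SDK-side request shape that the Get path
above serves (all IDs here are hypothetical):

    req := &fnpb.StateRequest{
            Id:            "state-1",
            InstructionId: "inst001",
            Request:       &fnpb.StateRequest_Get{Get: &fnpb.StateGetRequest{}},
            StateKey: &fnpb.StateKey{
                    Type: &fnpb.StateKey_IterableSideInput_{
                            IterableSideInput: &fnpb.StateKey_IterableSideInput{
                                    TransformId: "transform-1",
                                    SideInputId: "side-1",
                                    // An empty window key selects the global window.
                            },
                    },
            },
    }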
+
+// DataService is slated to be deleted in favour of stage based state
+// management for side inputs.
+type DataService struct {
+       // TODO: actually process the data into windows here as well.
+       raw map[string][][]byte
+}
+
+// Commit tentative data to the datastore.
+func (d *DataService) Commit(tent engine.TentativeData) {
+       if d.raw == nil {
+               d.raw = map[string][][]byte{}

Review Comment:
   Ok, understood. Thank you!


