lostluck commented on code in PR #33438:
URL: https://github.com/apache/beam/pull/33438#discussion_r1901108093
##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -710,3 +660,130 @@ func (wk *W) MonitoringMetadata(ctx context.Context, unknownIDs []string) *fnpb.
},
}).GetMonitoringInfos()
}
+
+type MultiplexW struct {
Review Comment:
Please add a doc comment describing the purpose of MultiplexW,
if only because otherwise we'll get a lint warning about it when importing the code.
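A minimal sketch of what such a doc comment could say, based only on the embedded Unimplemented*Server types and the `pool map[string]*W` field in the diff. The stand-in `W`, keying the pool by worker ID, and the `register`/`workerFor` helpers are hypothetical, added only so the sketch compiles on its own.

```go
package main

import "sync"

// W is a hypothetical stand-in for prism's per-environment worker type.
type W struct {
	ID string
}

// MultiplexW serves the FnAPI services (control, data, state, logging, and
// provisioning) on a single endpoint and routes each request to the W that
// owns it, so that many SDK environments can share one gRPC server.
type MultiplexW struct {
	mu       sync.Mutex
	endpoint string
	pool     map[string]*W // Assumed: workers keyed by worker ID; guarded by mu.
}

// register (hypothetical) adds w to the pool so requests addressed to w.ID reach it.
func (mw *MultiplexW) register(w *W) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	if mw.pool == nil {
		mw.pool = make(map[string]*W)
	}
	mw.pool[w.ID] = w
}

// workerFor (hypothetical) returns the W registered under id, if any.
func (mw *MultiplexW) workerFor(id string) (*W, bool) {
	mw.mu.Lock()
	defer mw.mu.Unlock()
	w, ok := mw.pool[id]
	return w, ok
}
```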
##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -76,51 +71,15 @@ type W struct {
mu sync.Mutex
activeInstructions map[string]controlResponder // Active instructions keyed by InstructionID
Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID
+ mw *MultiplexW
Review Comment:
Please move `mw` to the higher block, possibly right under the Unimplemented
interface embeddings. Consider renaming it `parentPool`, since that describes
how it relates to this W.
How brief a field or variable name should be depends on convention and context.
In this case, a slightly longer name is better, since it's only used in two
places. See https://go.dev/talks/2014/readability.slide#24 for further guidance.
Where it's currently positioned, it's implied that access to `mw` is guarded by
the mutex `mu`. This pattern is known as a `Mutex Hat`. See
https://go.dev/talks/2014/readability.slide#21.
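A rough sketch of the suggested layout: `parentPool` sits above the mutex, and `mu` "wears the hat" over the fields it guards. `MultiplexW`, `controlResponder`, and the two methods are hypothetical stand-ins, not the PR's code; the `Descriptors` field is omitted to avoid depending on `fnpb`.

```go
package main

import "sync"

// MultiplexW is a hypothetical stand-in for the pool type in the PR.
type MultiplexW struct {
	endpoint string
}

// controlResponder is a hypothetical stand-in for the PR's type of the same name.
type controlResponder interface{}

// W sketches the suggested field order. parentPool sits above the mutex, so
// readers don't assume mu guards it: it is set once at construction and only
// read afterwards.
type W struct {
	ID         string
	parentPool *MultiplexW

	mu                 sync.Mutex                  // Guards the fields below (the "mutex hat").
	activeInstructions map[string]controlResponder // Active instructions keyed by InstructionID.
}

// poolEndpoint (hypothetical) reads parentPool without locking, which is safe
// only because parentPool is never reassigned after construction.
func (wk *W) poolEndpoint() string {
	return wk.parentPool.endpoint
}

// addInstruction (hypothetical) mutates a field under the hat, so it holds mu.
func (wk *W) addInstruction(id string, r controlResponder) {
	wk.mu.Lock()
	defer wk.mu.Unlock()
	if wk.activeInstructions == nil {
		wk.activeInstructions = make(map[string]controlResponder)
	}
	wk.activeInstructions[id] = r
}
```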
##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -710,3 +660,130 @@ func (wk *W) MonitoringMetadata(ctx context.Context, unknownIDs []string) *fnpb.
},
}).GetMonitoringInfos()
}
+
+type MultiplexW struct {
+ fnpb.UnimplementedBeamFnControlServer
+ fnpb.UnimplementedBeamFnDataServer
+ fnpb.UnimplementedBeamFnStateServer
+ fnpb.UnimplementedBeamFnLoggingServer
+ fnpb.UnimplementedProvisionServiceServer
+
+ mu sync.Mutex
+ endpoint string
+ logger *slog.Logger
+ pool map[string]*W
+}
+
+// New instantiates a new MultiplexW for multiplexing FnAPI requests to a W.
+func New(lis net.Listener, g *grpc.Server, logger *slog.Logger) *MultiplexW {
+ _, p, _ := net.SplitHostPort(lis.Addr().String())
+ mw := &MultiplexW{
Review Comment:
Note that in *this* context, `mw` is a good name: it's used many times within
a short scope, so a shorter name is more readable. The scope is barely a dozen
lines long.
##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go:
##########
@@ -87,19 +140,22 @@ func serveTestWorker(t *testing.T) (context.Context, *W, *grpc.ClientConn) {
ctx, cancelFn := context.WithCancel(context.Background())
t.Cleanup(cancelFn)
- w := New("test", "testEnv")
- lis := bufconn.Listen(2048)
- w.lis = lis
Review Comment:
I'll note that we used bufconn here to avoid opening an actual network port
during the unit tests. It's not a hard blocker, but it would be nice to keep
that instead of using a new port every time.
What led you to stop using the buffered connection?
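To illustrate the property bufconn provides, here is a dependency-free sketch that swaps in the standard library's `net.Pipe`: each `Dial` hands one end of an in-memory pipe to `Accept`, so no OS port is opened. `pipeListener` and its methods are hypothetical; in the real test you'd use `bufconn.Listen` from `google.golang.org/grpc/test/bufconn` instead.

```go
package main

import (
	"errors"
	"net"
	"sync"
)

// errClosed is returned once the listener has been closed.
var errClosed = errors.New("pipeListener: closed")

// pipeListener is an in-memory net.Listener: each Dial creates a net.Pipe,
// hands the server end to Accept, and returns the client end.
type pipeListener struct {
	conns     chan net.Conn
	done      chan struct{}
	closeOnce sync.Once
}

func newPipeListener() *pipeListener {
	return &pipeListener{conns: make(chan net.Conn), done: make(chan struct{})}
}

// Accept blocks until a client dials or the listener is closed.
func (l *pipeListener) Accept() (net.Conn, error) {
	select {
	case c := <-l.conns:
		return c, nil
	case <-l.done:
		return nil, errClosed
	}
}

// Close unblocks pending and future Accept and Dial calls.
func (l *pipeListener) Close() error {
	l.closeOnce.Do(func() { close(l.done) })
	return nil
}

// Addr reports a placeholder address; there is no real host or port.
func (l *pipeListener) Addr() net.Addr { return pipeAddr{} }

// Dial blocks until Accept takes the server end, then returns the client end.
func (l *pipeListener) Dial() (net.Conn, error) {
	server, client := net.Pipe()
	select {
	case l.conns <- server:
		return client, nil
	case <-l.done:
		server.Close()
		client.Close()
		return nil, errClosed
	}
}

type pipeAddr struct{}

func (pipeAddr) Network() string { return "pipe" }
func (pipeAddr) String() string  { return "pipe" }

// Compile-time check that pipeListener satisfies net.Listener.
var _ net.Listener = (*pipeListener)(nil)
```

Since the new constructor already accepts a `net.Listener`, a bufconn listener could be passed to it directly, with the client connecting via `grpc.WithContextDialer` wrapping the listener's `DialContext`.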
##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -710,3 +660,130 @@ func (wk *W) MonitoringMetadata(ctx context.Context, unknownIDs []string) *fnpb.
},
}).GetMonitoringInfos()
}
+
+type MultiplexW struct {
+ fnpb.UnimplementedBeamFnControlServer
+ fnpb.UnimplementedBeamFnDataServer
+ fnpb.UnimplementedBeamFnStateServer
+ fnpb.UnimplementedBeamFnLoggingServer
+ fnpb.UnimplementedProvisionServiceServer
+
+ mu sync.Mutex
+ endpoint string
+ logger *slog.Logger
+ pool map[string]*W
+}
+
+// New instantiates a new MultiplexW for multiplexing FnAPI requests to a W.
+func New(lis net.Listener, g *grpc.Server, logger *slog.Logger) *MultiplexW {
Review Comment:
Naming is one of the hardest problems in computer science.
Here's the current rundown:
Since the package is named `worker`, naming the type `Worker` leads to
stuttering (`worker.Worker`) when using the type elsewhere, which is visually
unappealing. That's why we named the original abstraction type `W`.
Since external callers would call the function qualified with the package
name, we gave it a shorter name, so callers simply write `worker.New` instead
of `worker.NewWorker`. (This avoids the stuttering.)
But this function no longer creates a direct worker analog; it creates
something else. So instead I'd recommend a new name: `NewMultiplexer`. This
tells users that they aren't getting a worker directly, but a more complex
object.
##########
sdks/go/pkg/beam/runners/prism/internal/execute.go:
##########
@@ -86,33 +87,6 @@ func RunPipeline(j *jobservices.Job) {
j.Done()
}
-// makeWorker creates a worker for that environment.
-func makeWorker(env string, j *jobservices.Job) (*worker.W, error) {
- wk := worker.New(j.String()+"_"+env, env)
-
- wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env]
- wk.PipelineOptions = j.PipelineOptions()
- wk.JobKey = j.JobKey()
- wk.ArtifactEndpoint = j.ArtifactEndpoint()
-
- go wk.Serve()
-
- if err := runEnvironment(j.RootCtx, j, env, wk); err != nil {
- return nil, fmt.Errorf("failed to start environment %v for job %v: %w", env, j, err)
- }
- // Check for connection succeeding after we've created the environment successfully.
- timeout := 1 * time.Minute
- time.AfterFunc(timeout, func() {
- if wk.Connected() || wk.Stopped() {
- return
- }
- err := fmt.Errorf("prism %v didn't get control connection to %v after %v", wk, wk.Endpoint(), timeout)
- j.Failed(err)
- j.CancelFn(err)
- })
Review Comment:
We may still want this block of code moved to the MakeWorker method. It
checks that the SDK has connected to the worker, and serves as a liveness
check for the SDK.
If we don't get this connection, the job *must* not hang forever. It's better
for prism to fail the job than to wait forever, since a missing connection
indicates something weird or bad is going on on the SDK side.
##########
sdks/python/test-suites/portable/common.gradle:
##########
@@ -206,7 +206,10 @@ def createPrismRunnerTestTask(String workerType) {
def taskName = "prismCompatibilityMatrix${workerType}"
def prismBin = "${rootDir}/runners/prism/build/tmp/prism"
- def options = "--prism_bin=${prismBin} --environment_type=${workerType}"
+ def options = "--prism_bin=${prismBin}"
+ if (workerType != 'LOOPBACK') {
+ options += " --environment_type=${workerType}"
Review Comment:
Is this change because something weird was going on when the environment
type was "docker"?
--
This is an automated message from the Apache Git Service.