This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b4edaf83bf [#24789][prism] V0 Go Direct Runner Replacement, Prism. 
(#25568)
5b4edaf83bf is described below

commit 5b4edaf83bf8abaee79670e58766c91bd203ef46
Author: Robert Burke <[email protected]>
AuthorDate: Tue Feb 21 18:48:42 2023 -0800

    [#24789][prism] V0 Go Direct Runner Replacement, Prism. (#25568)
    
    * [prism] add in execution layer
    
    * [prism] update CHANGES.md and README.md
    
    * [prism] tage and associated code to it's own file. typo fixes
    
    * [prism] register sepHarnessSdf to avoid unused.
    
    * [prism] More register + delinte
    
    ---------
    
    Co-authored-by: lostluck <[email protected]>
---
 CHANGES.md                                         |   2 +
 sdks/go/pkg/beam/runners/prism/README.md           |   1 +
 .../pkg/beam/runners/prism/internal/coders_test.go |   6 +-
 sdks/go/pkg/beam/runners/prism/internal/execute.go | 278 +++++++++-
 .../beam/runners/prism/internal/execute_test.go    | 417 +++++++++++++++
 .../beam/runners/prism/internal/separate_test.go   | 595 +++++++++++++++++++++
 sdks/go/pkg/beam/runners/prism/internal/stage.go   | 400 ++++++++++++++
 sdks/go/pkg/beam/runners/prism/prism.go            |  48 ++
 8 files changed, 1740 insertions(+), 7 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index d8f322755fb..35c14046b0b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -77,6 +77,8 @@
 * Add support for loading TorchScript models with `PytorchModelHandler`. The 
TorchScript model path can be
   passed to PytorchModelHandler using 
`torch_script_model_path=<path_to_model>`. 
([#25321](https://github.com/apache/beam/pull/25321))
 * The Go SDK now requires Go 1.19 to build. 
([#25545](https://github.com/apache/beam/pull/25545))
+* The Go SDK now has an initial native Go implementation of a portable Beam 
Runner called Prism. ([#24789](https://github.com/apache/beam/pull/24789))
+  * For more details and current state see 
https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism.
 
 ## Breaking Changes
 
diff --git a/sdks/go/pkg/beam/runners/prism/README.md 
b/sdks/go/pkg/beam/runners/prism/README.md
index 1e91a3d64f8..0fc6e6e6841 100644
--- a/sdks/go/pkg/beam/runners/prism/README.md
+++ b/sdks/go/pkg/beam/runners/prism/README.md
@@ -29,6 +29,7 @@ It's intended to replace the current Go Direct runner, but 
also be for general
 single machine use.
 
 For Go SDK users:
+  - `import "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"`
   - Short term: set runner to "prism" to use it, or invoke directly.
   - Medium term: switch the default from "direct" to "prism".
   - Long term: alias "direct" to "prism", and delete legacy Go direct runner.
diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
index ad6e3649628..c6e32c895fe 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
@@ -334,7 +334,7 @@ func Test_pullDecoder(t *testing.T) {
                                ComponentCoderIds: []string{"elm"},
                        },
                        map[string]*pipepb.Coder{
-                               "elm": &pipepb.Coder{
+                               "elm": {
                                        Spec: &pipepb.FunctionSpec{
                                                Urn: urns.CoderVarInt,
                                        },
@@ -350,12 +350,12 @@ func Test_pullDecoder(t *testing.T) {
                                ComponentCoderIds: []string{"key", "value"},
                        },
                        map[string]*pipepb.Coder{
-                               "key": &pipepb.Coder{
+                               "key": {
                                        Spec: &pipepb.FunctionSpec{
                                                Urn: urns.CoderVarInt,
                                        },
                                },
-                               "value": &pipepb.Coder{
+                               "value": {
                                        Spec: &pipepb.FunctionSpec{
                                                Urn: urns.CoderBool,
                                        },
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go 
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 7c979ebf730..2329a43d214 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -16,15 +16,112 @@
 package internal
 
 import (
+       "context"
+       "fmt"
+       "io"
+       "sort"
+
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+       
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+       
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
        
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
+       "golang.org/x/exp/maps"
+       "golang.org/x/exp/slog"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/protobuf/proto"
 )
 
-// stage represents a fused subgraph.
-// temporary implementation to break up PRs.
-type stage struct {
-       transforms []string
+// RunPipeline starts the main thread fo executing this job.
+// It's analoguous to the manager side process for a distributed pipeline.
+// It will begin "workers"
+func RunPipeline(j *jobservices.Job) {
+       j.SendMsg("starting " + j.String())
+       j.Start()
+
+       // In a "proper" runner, we'd iterate through all the
+       // environments, and start up docker containers, but
+       // here, we only want and need the go one, operating
+       // in loopback mode.
+       env := "go"
+       wk := worker.New(env) // Cheating by having the worker id match the 
environment id.
+       go wk.Serve()
+
+       // When this function exits, we
+       defer func() {
+               j.CancelFn()
+       }()
+       go runEnvironment(j.RootCtx, j, env, wk)
+
+       j.SendMsg("running " + j.String())
+       j.Running()
+
+       executePipeline(j.RootCtx, wk, j)
+       j.SendMsg("pipeline completed " + j.String())
+
+       // Stop the worker.
+       wk.Stop()
+
+       j.SendMsg("terminating " + j.String())
+       j.Done()
+}
+
+// TODO move environment handling to the worker package.
+
+func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk 
*worker.W) {
+       // TODO fix broken abstraction.
+       // We're starting a worker pool here, because that's the loopback 
environment.
+       // It's sort of a mess, largely because of loopback, which has
+       // a different flow from a provisioned docker container.
+       e := j.Pipeline.GetComponents().GetEnvironments()[env]
+       switch e.GetUrn() {
+       case urns.EnvExternal:
+               ep := &pipepb.ExternalPayload{}
+               if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), 
ep); err != nil {
+                       slog.Error("unmarshing environment payload", err, 
slog.String("envID", wk.ID))
+               }
+               externalEnvironment(ctx, ep, wk)
+               slog.Info("environment stopped", slog.String("envID", 
wk.String()), slog.String("job", j.String()))
+       default:
+               panic(fmt.Sprintf("environment %v with urn %v unimplemented", 
env, e.GetUrn()))
+       }
+}
+
+func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk 
*worker.W) {
+       conn, err := grpc.Dial(ep.GetEndpoint().GetUrl(), 
grpc.WithTransportCredentials(insecure.NewCredentials()))
+       if err != nil {
+               panic(fmt.Sprintf("unable to dial sdk worker %v: %v", 
ep.GetEndpoint().GetUrl(), err))
+       }
+       defer conn.Close()
+       pool := fnpb.NewBeamFnExternalWorkerPoolClient(conn)
+
+       endpoint := &pipepb.ApiServiceDescriptor{
+               Url: wk.Endpoint(),
+       }
+
+       pool.StartWorker(ctx, &fnpb.StartWorkerRequest{
+               WorkerId:          wk.ID,
+               ControlEndpoint:   endpoint,
+               LoggingEndpoint:   endpoint,
+               ArtifactEndpoint:  endpoint,
+               ProvisionEndpoint: endpoint,
+               Params:            nil,
+       })
+
+       // Job processing happens here, but orchestrated by other goroutines
+       // This goroutine blocks until the context is cancelled, signalling
+       // that the pool runner should stop the worker.
+       <-ctx.Done()
+
+       // Previous context cancelled so we need a new one
+       // for this request.
+       pool.StopWorker(context.Background(), &fnpb.StopWorkerRequest{
+               WorkerId: wk.ID,
+       })
 }
 
 type transformExecuter interface {
@@ -32,3 +129,176 @@ type transformExecuter interface {
        ExecuteWith(t *pipepb.PTransform) string
        ExecuteTransform(tid string, t *pipepb.PTransform, comps 
*pipepb.Components, watermark mtime.Time, data [][]byte) *worker.B
 }
+
+type processor struct {
+       transformExecuters map[string]transformExecuter
+}
+
+func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) {
+       pipeline := j.Pipeline
+       comps := proto.Clone(pipeline.GetComponents()).(*pipepb.Components)
+
+       // TODO, configure the preprocessor from pipeline options.
+       // Maybe change these returns to a single struct for convenience and 
further
+       // annotation?
+
+       handlers := []any{
+               Combine(CombineCharacteristic{EnableLifting: true}),
+               ParDo(ParDoCharacteristic{DisableSDF: true}),
+               Runner(RunnerCharacteristic{
+                       SDKFlatten: false,
+               }),
+       }
+
+       proc := processor{
+               transformExecuters: map[string]transformExecuter{},
+       }
+
+       var preppers []transformPreparer
+       for _, h := range handlers {
+               if th, ok := h.(transformPreparer); ok {
+                       preppers = append(preppers, th)
+               }
+               if th, ok := h.(transformExecuter); ok {
+                       for _, urn := range th.ExecuteUrns() {
+                               proc.transformExecuters[urn] = th
+                       }
+               }
+       }
+
+       prepro := newPreprocessor(preppers)
+
+       topo := prepro.preProcessGraph(comps)
+       ts := comps.GetTransforms()
+
+       em := engine.NewElementManager(engine.Config{})
+
+       // TODO move this loop and code into the preprocessor instead.
+       stages := map[string]*stage{}
+       var impulses []string
+       for i, stage := range topo {
+               if len(stage.transforms) != 1 {
+                       panic(fmt.Sprintf("unsupported stage[%d]: contains 
multiple transforms: %v; TODO: implement fusion", i, stage.transforms))
+               }
+               tid := stage.transforms[0]
+               t := ts[tid]
+               urn := t.GetSpec().GetUrn()
+               stage.exe = proc.transformExecuters[urn]
+
+               // Stopgap until everythinng's moved to handlers.
+               stage.envID = t.GetEnvironmentId()
+               if stage.exe != nil {
+                       stage.envID = stage.exe.ExecuteWith(t)
+               }
+               stage.ID = wk.NextStage()
+
+               switch stage.envID {
+               case "": // Runner Transforms
+
+                       var onlyOut string
+                       for _, out := range t.GetOutputs() {
+                               onlyOut = out
+                       }
+                       stage.OutputsToCoders = map[string]engine.PColInfo{}
+                       coders := map[string]*pipepb.Coder{}
+                       makeWindowedValueCoder(onlyOut, comps, coders)
+
+                       col := comps.GetPcollections()[onlyOut]
+                       ed := collectionPullDecoder(col.GetCoderId(), coders, 
comps)
+                       wDec, wEnc := getWindowValueCoders(comps, col, coders)
+
+                       stage.OutputsToCoders[onlyOut] = engine.PColInfo{
+                               GlobalID: onlyOut,
+                               WDec:     wDec,
+                               WEnc:     wEnc,
+                               EDec:     ed,
+                       }
+
+                       // There's either 0, 1 or many inputs, but they should 
be all the same
+                       // so break after the first one.
+                       for _, global := range t.GetInputs() {
+                               col := comps.GetPcollections()[global]
+                               ed := collectionPullDecoder(col.GetCoderId(), 
coders, comps)
+                               wDec, wEnc := getWindowValueCoders(comps, col, 
coders)
+                               stage.inputInfo = engine.PColInfo{
+                                       GlobalID: global,
+                                       WDec:     wDec,
+                                       WEnc:     wEnc,
+                                       EDec:     ed,
+                               }
+                               break
+                       }
+
+                       switch urn {
+                       case urns.TransformGBK:
+                               em.AddStage(stage.ID, 
[]string{getOnlyValue(t.GetInputs())}, nil, 
[]string{getOnlyValue(t.GetOutputs())})
+                               for _, global := range t.GetInputs() {
+                                       col := comps.GetPcollections()[global]
+                                       ed := 
collectionPullDecoder(col.GetCoderId(), coders, comps)
+                                       wDec, wEnc := 
getWindowValueCoders(comps, col, coders)
+                                       stage.inputInfo = engine.PColInfo{
+                                               GlobalID: global,
+                                               WDec:     wDec,
+                                               WEnc:     wEnc,
+                                               EDec:     ed,
+                                       }
+                               }
+                               em.StageAggregates(stage.ID)
+                       case urns.TransformImpulse:
+                               impulses = append(impulses, stage.ID)
+                               em.AddStage(stage.ID, nil, nil, 
[]string{getOnlyValue(t.GetOutputs())})
+                       case urns.TransformFlatten:
+                               inputs := maps.Values(t.GetInputs())
+                               sort.Strings(inputs)
+                               em.AddStage(stage.ID, inputs, nil, 
[]string{getOnlyValue(t.GetOutputs())})
+                       }
+                       stages[stage.ID] = stage
+                       wk.Descriptors[stage.ID] = stage.desc
+               case wk.ID:
+                       // Great! this is for this environment. // Broken 
abstraction.
+                       buildStage(stage, tid, t, comps, wk)
+                       stages[stage.ID] = stage
+                       slog.Debug("pipelineBuild", slog.Group("stage", 
slog.String("ID", stage.ID), slog.String("transformName", t.GetUniqueName())))
+                       outputs := maps.Keys(stage.OutputsToCoders)
+                       sort.Strings(outputs)
+                       em.AddStage(stage.ID, []string{stage.mainInputPCol}, 
stage.sides, outputs)
+               default:
+                       err := fmt.Errorf("unknown environment[%v]", 
t.GetEnvironmentId())
+                       slog.Error("Execute", err)
+                       panic(err)
+               }
+       }
+
+       // Prime the initial impulses, since we now know what consumes them.
+       for _, id := range impulses {
+               em.Impulse(id)
+       }
+
+       // Execute stages here
+       for rb := range em.Bundles(ctx, wk.NextInst) {
+               s := stages[rb.StageID]
+               s.Execute(j, wk, comps, em, rb)
+       }
+       slog.Info("pipeline done!", slog.String("job", j.String()))
+}
+
+func collectionPullDecoder(coldCId string, coders map[string]*pipepb.Coder, 
comps *pipepb.Components) func(io.Reader) []byte {
+       cID := lpUnknownCoders(coldCId, coders, comps.GetCoders())
+       return pullDecoder(coders[cID], coders)
+}
+
+func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, 
coders map[string]*pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) {
+       ws := comps.GetWindowingStrategies()[col.GetWindowingStrategyId()]
+       wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, 
comps.GetCoders())
+       return makeWindowCoders(coders[wcID])
+}
+
+func getOnlyValue[K comparable, V any](in map[K]V) V {
+       if len(in) != 1 {
+               panic(fmt.Sprintf("expected single value map, had %v", len(in)))
+       }
+       for _, v := range in {
+               return v
+       }
+       panic("unreachable")
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
new file mode 100644
index 00000000000..de7247486bb
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
@@ -0,0 +1,417 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+       "context"
+       "os"
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
+       
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
+       "github.com/apache/beam/sdks/v2/go/test/integration/primitives"
+)
+
+func initRunner(t *testing.T) {
+       t.Helper()
+       if *jobopts.Endpoint == "" {
+               s := jobservices.NewServer(0, RunPipeline)
+               *jobopts.Endpoint = s.Endpoint()
+               go s.Serve()
+               t.Cleanup(func() {
+                       *jobopts.Endpoint = ""
+                       s.Stop()
+               })
+       }
+       if !jobopts.IsLoopback() {
+               *jobopts.EnvironmentType = "loopback"
+       }
+       // Since we force loopback, avoid cross-compilation.
+       f, err := os.CreateTemp("", "dummy")
+       if err != nil {
+               t.Fatal(err)
+       }
+       t.Cleanup(func() { os.Remove(f.Name()) })
+       *jobopts.WorkerBinary = f.Name()
+}
+
+func execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, 
error) {
+       return universal.Execute(ctx, p)
+}
+
+func executeWithT(ctx context.Context, t *testing.T, p *beam.Pipeline) 
(beam.PipelineResult, error) {
+       t.Log("startingTest - ", t.Name())
+       return execute(ctx, p)
+}
+
+func init() {
+       // Not actually being used, but explicitly registering
+       // will avoid accidentally using a different runner for
+       // the tests if I change things later.
+       beam.RegisterRunner("testlocal", execute)
+}
+
+func TestRunner_Pipelines(t *testing.T) {
+       initRunner(t)
+
+       tests := []struct {
+               name     string
+               pipeline func(s beam.Scope)
+               metrics  func(t *testing.T, pr beam.PipelineResult)
+       }{
+               {
+                       name: "simple",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               col := beam.ParDo(s, dofn1, imp)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "simple",
+                                       Want: []int{1, 2, 3},
+                               }, col)
+                       },
+               }, {
+                       name: "sequence",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               beam.Seq(s, imp, dofn1, dofn2, dofn2, dofn2, 
&int64Check{Name: "sequence", Want: []int{4, 5, 6}})
+                       },
+               }, {
+                       name: "gbk",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               col := beam.ParDo(s, dofnKV, imp)
+                               gbk := beam.GroupByKey(s, col)
+                               beam.Seq(s, gbk, dofnGBK, &int64Check{Name: 
"gbk", Want: []int{9, 12}})
+                       },
+               }, {
+                       name: "gbk2",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               col := beam.ParDo(s, dofnKV2, imp)
+                               gbk := beam.GroupByKey(s, col)
+                               beam.Seq(s, gbk, dofnGBK2, &stringCheck{Name: 
"gbk2", Want: []string{"aaa", "bbb"}})
+                       },
+               }, {
+                       name: "gbk3",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               col := beam.ParDo(s, dofnKV3, imp)
+                               gbk := beam.GroupByKey(s, col)
+                               beam.Seq(s, gbk, dofnGBK3, &stringCheck{Name: 
"gbk3", Want: []string{"{a 1}: {a 1}"}})
+                       },
+               }, {
+                       name: "sink_nooutputs",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               beam.ParDo0(s, dofnSink, imp)
+                       },
+                       metrics: func(t *testing.T, pr beam.PipelineResult) {
+                               qr := pr.Metrics().Query(func(sr 
metrics.SingleResult) bool {
+                                       return sr.Name() == "sunk"
+                               })
+                               if got, want := qr.Counters()[0].Committed, 
int64(73); got != want {
+                                       t.Errorf("pr.Metrics.Query(Name = 
\"sunk\")).Committed = %v, want %v", got, want)
+                               }
+                       },
+               }, {
+                       name: "fork_impulse",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               col1 := beam.ParDo(s, dofn1, imp)
+                               col2 := beam.ParDo(s, dofn1, imp)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "fork check1",
+                                       Want: []int{1, 2, 3},
+                               }, col1)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "fork check2",
+                                       Want: []int{1, 2, 3},
+                               }, col2)
+                       },
+               }, {
+                       name: "fork_postDoFn",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               col := beam.ParDo(s, dofn1, imp)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "fork check1",
+                                       Want: []int{1, 2, 3},
+                               }, col)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "fork check2",
+                                       Want: []int{1, 2, 3},
+                               }, col)
+                       },
+               }, {
+                       name: "fork_multipleOutputs1",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               col1, col2, col3, col4, col5 := beam.ParDo5(s, 
dofn1x5, imp)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "col1",
+                                       Want: []int{1, 6},
+                               }, col1)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "col2",
+                                       Want: []int{2, 7},
+                               }, col2)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "col3",
+                                       Want: []int{3, 8},
+                               }, col3)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "col4",
+                                       Want: []int{4, 9},
+                               }, col4)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "col5",
+                                       Want: []int{5, 10},
+                               }, col5)
+                       },
+               }, {
+                       name: "fork_multipleOutputs2",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               col1, col2, col3, col4, col5 := beam.ParDo5(s, 
dofn1x5, imp)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "col1",
+                                       Want: []int{1, 6},
+                               }, col1)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "col2",
+                                       Want: []int{2, 7},
+                               }, col2)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "col3",
+                                       Want: []int{3, 8},
+                               }, col3)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "col4",
+                                       Want: []int{4, 9},
+                               }, col4)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "col5",
+                                       Want: []int{5, 10},
+                               }, col5)
+                       },
+               }, {
+                       name: "flatten",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               col1 := beam.ParDo(s, dofn1, imp)
+                               col2 := beam.ParDo(s, dofn1, imp)
+                               flat := beam.Flatten(s, col1, col2)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "flatten check",
+                                       Want: []int{1, 1, 2, 2, 3, 3},
+                               }, flat)
+                       },
+               }, {
+                       name: "sideinput_iterable_oneimpulse",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               col1 := beam.ParDo(s, dofn1, imp)
+                               sum := beam.ParDo(s, dofn2x1, imp, 
beam.SideInput{Input: col1})
+                               beam.ParDo(s, &int64Check{
+                                       Name: "iter sideinput check",
+                                       Want: []int{6},
+                               }, sum)
+                       },
+               }, {
+                       name: "sideinput_iterable_twoimpulse",
+                       pipeline: func(s beam.Scope) {
+                               imp1 := beam.Impulse(s)
+                               col1 := beam.ParDo(s, dofn1, imp1)
+                               imp2 := beam.Impulse(s)
+                               sum := beam.ParDo(s, dofn2x1, imp2, 
beam.SideInput{Input: col1})
+                               beam.ParDo(s, &int64Check{
+                                       Name: "iter sideinput check",
+                                       Want: []int{6},
+                               }, sum)
+                       },
+               }, {
+                       name: "sideinput_iterableKV",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               col1 := beam.ParDo(s, dofnKV, imp)
+                               keys, sum := beam.ParDo2(s, dofn2x2KV, imp, 
beam.SideInput{Input: col1})
+                               beam.ParDo(s, &stringCheck{
+                                       Name: "iterKV sideinput check K",
+                                       Want: []string{"a", "a", "a", "b", "b", 
"b"},
+                               }, keys)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "iterKV sideinput check V",
+                                       Want: []int{21},
+                               }, sum)
+                       },
+               }, {
+                       name: "sideinput_iterableKV",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               col1 := beam.ParDo(s, dofnKV, imp)
+                               keys, sum := beam.ParDo2(s, dofn2x2KV, imp, 
beam.SideInput{Input: col1})
+                               beam.ParDo(s, &stringCheck{
+                                       Name: "iterKV sideinput check K",
+                                       Want: []string{"a", "a", "a", "b", "b", 
"b"},
+                               }, keys)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "iterKV sideinput check V",
+                                       Want: []int{21},
+                               }, sum)
+                       },
+               }, {
+                       name: "sideinput_multimap",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               col1 := beam.ParDo(s, dofnKV, imp)
+                               keys := filter.Distinct(s, beam.DropValue(s, 
col1))
+                               ks, sum := beam.ParDo2(s, dofnMultiMap, keys, 
beam.SideInput{Input: col1})
+                               beam.ParDo(s, &stringCheck{
+                                       Name: "multiMap sideinput check K",
+                                       Want: []string{"a", "b"},
+                               }, ks)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "multiMap sideinput check V",
+                                       Want: []int{9, 12},
+                               }, sum)
+                       },
+               }, {
+                       // Ensures topological sort is correct.
+                       name: "sideinput_2iterable",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               col0 := beam.ParDo(s, dofn1, imp)
+                               col1 := beam.ParDo(s, dofn1, imp)
+                               col2 := beam.ParDo(s, dofn2, col1)
+                               sum := beam.ParDo(s, dofn3x1, col0, 
beam.SideInput{Input: col1}, beam.SideInput{Input: col2})
+                               beam.ParDo(s, &int64Check{
+                                       Name: "iter sideinput check",
+                                       Want: []int{16, 17, 18},
+                               }, sum)
+                       },
+               }, {
+                       name: "combine_perkey",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               in := beam.ParDo(s, dofn1kv, imp)
+                               keyedsum := beam.CombinePerKey(s, 
combineIntSum, in)
+                               sum := beam.DropKey(s, keyedsum)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "combine",
+                                       Want: []int{6},
+                               }, sum)
+                       },
+               }, {
+                       name: "combine_global",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               in := beam.ParDo(s, dofn1, imp)
+                               sum := beam.Combine(s, combineIntSum, in)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "combine",
+                                       Want: []int{6},
+                               }, sum)
+                       },
+               }, {
+                       name: "sdf_single_split",
+                       pipeline: func(s beam.Scope) {
+                               configs := beam.Create(s, 
SourceConfig{NumElements: 10, InitialSplits: 1})
+                               in := beam.ParDo(s, &intRangeFn{}, configs)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "sdf_single",
+                                       Want: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 
10},
+                               }, in)
+                       },
+               }, {
+                       name:     "WindowedSideInputs",
+                       pipeline: primitives.ValidateWindowedSideInputs,
+               }, {
+                       name:     "WindowSums_GBK",
+                       pipeline: primitives.WindowSums_GBK,
+               }, {
+                       name:     "WindowSums_Lifted",
+                       pipeline: primitives.WindowSums_Lifted,
+               }, {
+                       name: "ProcessContinuations_globalCombine",
+                       pipeline: func(s beam.Scope) {
+                               out := beam.ParDo(s, &selfCheckpointingDoFn{}, 
beam.Impulse(s))
+                               passert.Count(s, out, "num ints", 10)
+                       },
+               }, {
+                       name: "flatten_to_sideInput",
+                       pipeline: func(s beam.Scope) {
+                               imp := beam.Impulse(s)
+                               col1 := beam.ParDo(s, dofn1, imp)
+                               col2 := beam.ParDo(s, dofn1, imp)
+                               flat := beam.Flatten(s, col1, col2)
+                               beam.ParDo(s, &int64Check{
+                                       Name: "flatten check",
+                                       Want: []int{1, 1, 2, 2, 3, 3},
+                               }, flat)
+                               passert.NonEmpty(s, flat)
+                       },
+               },
+       }
+       // TODO: Explicit DoFn Failure case.
+       // TODO: Session windows, where some are not merged.
+
+       for _, test := range tests {
+               t.Run(test.name, func(t *testing.T) {
+                       p, s := beam.NewPipelineWithRoot()
+                       test.pipeline(s)
+                       pr, err := executeWithT(context.Background(), t, p)
+                       if err != nil {
+                               t.Fatal(err)
+                       }
+                       if test.metrics != nil {
+                               test.metrics(t, pr)
+                       }
+               })
+       }
+}
+
+func TestRunner_Metrics(t *testing.T) {
+       initRunner(t)
+       t.Run("counter", func(t *testing.T) {
+               p, s := beam.NewPipelineWithRoot()
+               imp := beam.Impulse(s)
+               beam.ParDo(s, dofn1Counter, imp)
+               pr, err := executeWithT(context.Background(), t, p)
+               if err != nil {
+                       t.Fatal(err)
+               }
+               qr := pr.Metrics().Query(func(sr metrics.SingleResult) bool {
+                       return sr.Name() == "count"
+               })
+               if got, want := qr.Counters()[0].Committed, int64(1); got != 
want {
+                       t.Errorf("pr.Metrics.Query(Name = \"count\")).Committed 
= %v, want %v", got, want)
+               }
+       })
+}
+
+// TODO: PCollection metrics tests, in particular for element counts, in multi 
transform pipelines
+// There's a doubling bug since we re-use the same pcollection IDs for the 
source & sink, and
+// don't do any re-writing.
+
+func TestMain(m *testing.M) {
+       ptest.MainWithDefault(m, "testlocal")
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/separate_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/separate_test.go
new file mode 100644
index 00000000000..2e96651bfe9
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/separate_test.go
@@ -0,0 +1,595 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+       "context"
+       "fmt"
+       "net"
+       "net/http"
+       "net/rpc"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
+       "golang.org/x/exp/slog"
+)
+
+// separate_test.go retains structures and tests to ensure the runner can
+// perform separation, and terminate checkpoints.
+
+// Global variable, so only one is registered with the OS.
+var ws = &Watchers{}
+
+// TestSeparation validates that the runner is able to split
+// elements in time and space. Beam has a few mechanisms to
+// do this.
+//
+// First is channel splits, where a slowly processing
+// bundle might have it's remaining buffered elements truncated
+// so they can be processed by another bundle,
+// possibly simultaneously.
+//
+// Second is sub element splitting, where a single element
+// in an SDF might be split into smaller restrictions.
+//
+// Third with Checkpointing or ProcessContinuations,
+// a User DoFn may decide to defer processing of an element
+// until later, permitting a bundle to terminate earlier,
+// delaying processing.
+//
+// All these may be tested locally or in process with a small
+// server the DoFns can connect to. This can then indicate which
+// elements, or positions are considered "sentinels".
+//
+// When a sentinel is to be processed, instead the DoFn blocks.
+// The goal for Splitting tests is to succeed only when all
+// sentinels are blocking waiting to be processed.
+// This indicates the runner has "separated" the sentinels, hence
+// the name "separation harness tests".
+//
+// Delayed Process Continuations can be similiarly tested,
+// as this emulates external processing servers anyway.
+// It's much simpler though, as the request is to determine if
+// a given element should be delayed or not. This could be used
+// for arbitrarily complex splitting patterns, as desired.
+func TestSeparation(t *testing.T) {
+       initRunner(t)
+
+       ws.initRPCServer()
+
+       tests := []struct {
+               name     string
+               pipeline func(s beam.Scope)
+               metrics  func(t *testing.T, pr beam.PipelineResult)
+       }{
+               {
+                       name: "ProcessContinuations_combine_globalWindow",
+                       pipeline: func(s beam.Scope) {
+                               count := 10
+                               imp := beam.Impulse(s)
+                               out := beam.ParDo(s, &sepHarnessSdfStream{
+                                       Base: sepHarnessBase{
+                                               WatcherID:         
ws.newWatcher(3),
+                                               Sleep:             time.Second,
+                                               IsSentinelEncoded: 
beam.EncodedFunc{Fn: reflectx.MakeFunc(allSentinel)},
+                                               LocalService:      
ws.serviceAddress,
+                                       },
+                                       RestSize: int64(count),
+                               }, imp)
+                               passert.Count(s, out, "global num ints", count)
+                       },
+               }, {
+                       name: 
"ProcessContinuations_stepped_combine_globalWindow",
+                       pipeline: func(s beam.Scope) {
+                               count := 10
+                               imp := beam.Impulse(s)
+                               out := beam.ParDo(s, &singleStepSdfStream{
+                                       Sleep:    time.Second,
+                                       RestSize: int64(count),
+                               }, imp)
+                               passert.Count(s, out, "global stepped num 
ints", count)
+                               sum := beam.ParDo(s, dofn2x1, imp, 
beam.SideInput{Input: out})
+                               beam.ParDo(s, &int64Check{Name: "stepped", 
Want: []int{45}}, sum)
+                       },
+               }, {
+                       name: 
"ProcessContinuations_stepped_combine_fixedWindow",
+                       pipeline: func(s beam.Scope) {
+                               elms, mod := 1000, 10
+                               count := int(elms / mod)
+                               imp := beam.Impulse(s)
+                               out := beam.ParDo(s, &eventtimeSDFStream{
+                                       Sleep:    time.Second,
+                                       RestSize: int64(elms),
+                                       Mod:      int64(mod),
+                                       Fixed:    1,
+                               }, imp)
+                               windowed := beam.WindowInto(s, 
window.NewFixedWindows(time.Second*10), out)
+                               sum := stats.Sum(s, windowed)
+                               // We expect each window to be processed ASAP, 
and produced one
+                               // at a time, with the same results.
+                               beam.ParDo(s, &int64Check{Name: "single", Want: 
[]int{55}}, sum)
+                               // But we need to receive the expected number 
of identical results
+                               gsum := beam.WindowInto(s, 
window.NewGlobalWindows(), sum)
+                               passert.Count(s, gsum, "total sums", count)
+                       },
+               },
+       }
+
+       // TODO: Channel Splits
+       // TODO: SubElement/dynamic splits.
+
+       for _, test := range tests {
+               t.Run(test.name, func(t *testing.T) {
+                       p, s := beam.NewPipelineWithRoot()
+                       test.pipeline(s)
+                       pr, err := executeWithT(context.Background(), t, p)
+                       if err != nil {
+                               t.Fatal(err)
+                       }
+                       if test.metrics != nil {
+                               test.metrics(t, pr)
+                       }
+               })
+       }
+}
+
+func init() {
+       register.Function1x1(allSentinel)
+}
+
+// allSentinel indicates that all elements are sentinels.
+func allSentinel(v beam.T) bool {
+       return true
+}
+
+// Watcher is an instance of the counters.
+type watcher struct {
+       id                         int
+       mu                         sync.Mutex
+       sentinelCount, sentinelCap int
+}
+
+func (w *watcher) LogValue() slog.Value {
+       return slog.GroupValue(
+               slog.Int("id", w.id),
+               slog.Int("sentinelCount", w.sentinelCount),
+               slog.Int("sentinelCap", w.sentinelCap),
+       )
+}
+
+// Watchers is a "net/rpc" service.
+type Watchers struct {
+       mu             sync.Mutex
+       nextID         int
+       lookup         map[int]*watcher
+       serviceOnce    sync.Once
+       serviceAddress string
+}
+
+// Args is the set of parameters to the watchers RPC methods.
+type Args struct {
+       WatcherID int
+}
+
+// Block is called once per sentinel, to indicate it will block
+// until all sentinels are blocked.
+func (ws *Watchers) Block(args *Args, _ *bool) error {
+       ws.mu.Lock()
+       defer ws.mu.Unlock()
+       w, ok := ws.lookup[args.WatcherID]
+       if !ok {
+               return fmt.Errorf("no watcher with id %v", args.WatcherID)
+       }
+       w.mu.Lock()
+       w.sentinelCount++
+       w.mu.Unlock()
+       return nil
+}
+
+// Check returns whether the sentinels are unblocked or not.
+func (ws *Watchers) Check(args *Args, unblocked *bool) error {
+       ws.mu.Lock()
+       defer ws.mu.Unlock()
+       w, ok := ws.lookup[args.WatcherID]
+       if !ok {
+               return fmt.Errorf("no watcher with id %v", args.WatcherID)
+       }
+       w.mu.Lock()
+       *unblocked = w.sentinelCount >= w.sentinelCap
+       w.mu.Unlock()
+       slog.Debug("sentinel target for watcher%d is %d/%d. unblocked=%v", 
args.WatcherID, w.sentinelCount, w.sentinelCap, *unblocked)
+       return nil
+}
+
+// Delay returns whether the sentinels should delay.
+// This increments the sentinel cap, and returns unblocked.
+// Intended to validate ProcessContinuation behavior.
+func (ws *Watchers) Delay(args *Args, delay *bool) error {
+       ws.mu.Lock()
+       defer ws.mu.Unlock()
+       w, ok := ws.lookup[args.WatcherID]
+       if !ok {
+               return fmt.Errorf("no watcher with id %v", args.WatcherID)
+       }
+       w.mu.Lock()
+       w.sentinelCount++
+       // Delay as long as the sentinel count is under the cap.
+       *delay = w.sentinelCount < w.sentinelCap
+       w.mu.Unlock()
+       slog.Debug("Delay: sentinel target", "watcher", w, slog.Bool("delay", 
*delay))
+       return nil
+}
+
+func (ws *Watchers) initRPCServer() {
+       ws.serviceOnce.Do(func() {
+               l, err := net.Listen("tcp", ":0")
+               if err != nil {
+                       panic(err)
+               }
+               rpc.Register(ws)
+               rpc.HandleHTTP()
+               go http.Serve(l, nil)
+               ws.serviceAddress = l.Addr().String()
+       })
+}
+
+// newWatcher starts an rpc server to manage state for watching for
+// sentinels across local machines.
+func (ws *Watchers) newWatcher(sentinelCap int) int {
+       ws.mu.Lock()
+       defer ws.mu.Unlock()
+       ws.initRPCServer()
+       if ws.lookup == nil {
+               ws.lookup = map[int]*watcher{}
+       }
+       w := &watcher{id: ws.nextID, sentinelCap: sentinelCap}
+       ws.nextID++
+       ws.lookup[w.id] = w
+       return w.id
+}
+
+// sepHarnessBase contains fields and functions that are shared by all
+// versions of the separation harness.
+type sepHarnessBase struct {
+       WatcherID         int
+       Sleep             time.Duration
+       IsSentinelEncoded beam.EncodedFunc
+       LocalService      string
+}
+
+// One connection per binary.
+var (
+       sepClientOnce sync.Once
+       sepClient     *rpc.Client
+       sepClientMu   sync.Mutex
+       sepWaitMap    map[int]chan struct{}
+)
+
+func (fn *sepHarnessBase) setup() error {
+       sepClientMu.Lock()
+       defer sepClientMu.Unlock()
+       sepClientOnce.Do(func() {
+               client, err := rpc.DialHTTP("tcp", fn.LocalService)
+               if err != nil {
+                       slog.Error("failed to dial sentinels  server", err, 
slog.String("endpoint", fn.LocalService))
+                       panic(fmt.Sprintf("dialing sentinels server %v: %v", 
fn.LocalService, err))
+               }
+               sepClient = client
+               sepWaitMap = map[int]chan struct{}{}
+       })
+
+       // Check if there's already a local channel for this id, and if not
+       // start a watcher goroutine to poll and unblock the harness when
+       // the expected number of sentinels is reached.
+       if _, ok := sepWaitMap[fn.WatcherID]; !ok {
+               return nil
+       }
+       // We need a channel to block on for this watcherID
+       // We use a channel instead of a wait group since the finished
+       // count is hosted in a different process.
+       c := make(chan struct{})
+       sepWaitMap[fn.WatcherID] = c
+       go func(id int, c chan struct{}) {
+               for {
+                       time.Sleep(time.Second * 1) // Check counts every 
second.
+                       sepClientMu.Lock()
+                       var unblock bool
+                       err := sepClient.Call("Watchers.Check", 
&Args{WatcherID: id}, &unblock)
+                       if err != nil {
+                               slog.Error("Watchers.Check: sentinels server 
error", err, slog.String("endpoint", fn.LocalService))
+                               panic("sentinel server error")
+                       }
+                       if unblock {
+                               close(c) // unblock all the local waiters.
+                               slog.Debug("sentinel target for watcher, 
unblocking", slog.Int("watcherID", id))
+                               sepClientMu.Unlock()
+                               return
+                       }
+                       slog.Debug("sentinel target for watcher not met", 
slog.Int("watcherID", id))
+                       sepClientMu.Unlock()
+               }
+       }(fn.WatcherID, c)
+       return nil
+}
+
+func (fn *sepHarnessBase) block() {
+       sepClientMu.Lock()
+       var ignored bool
+       err := sepClient.Call("Watchers.Block", &Args{WatcherID: fn.WatcherID}, 
&ignored)
+       if err != nil {
+               slog.Error("Watchers.Block error", err, slog.String("endpoint", 
fn.LocalService))
+               panic(err)
+       }
+       c := sepWaitMap[fn.WatcherID]
+       sepClientMu.Unlock()
+
+       // Block until the watcher closes the channel.
+       <-c
+}
+
+// delay inform the DoFn whether or not to return a delayed Processing 
continuation for this position.
+func (fn *sepHarnessBase) delay() bool {
+       sepClientMu.Lock()
+       defer sepClientMu.Unlock()
+       var delay bool
+       err := sepClient.Call("Watchers.Delay", &Args{WatcherID: fn.WatcherID}, 
&delay)
+       if err != nil {
+               slog.Error("Watchers.Delay error", err)
+               panic(err)
+       }
+       return delay
+}
+
+// sepHarness is a simple DoFn that blocks when reaching a sentinel.
+// It's useful for testing blocks on channel splits.
+type sepHarness struct {
+       Base sepHarnessBase
+}
+
+func (fn *sepHarness) Setup() error {
+       return fn.Base.setup()
+}
+
+func (fn *sepHarness) ProcessElement(v beam.T) beam.T {
+       if fn.Base.IsSentinelEncoded.Fn.Call([]any{v})[0].(bool) {
+               slog.Debug("blocking on sentinel", slog.Any("sentinel", v))
+               fn.Base.block()
+               slog.Debug("unblocking from sentinel", slog.Any("sentinel", v))
+       } else {
+               time.Sleep(fn.Base.Sleep)
+       }
+       return v
+}
+
+type sepHarnessSdf struct {
+       Base     sepHarnessBase
+       RestSize int64
+}
+
+func (fn *sepHarnessSdf) Setup() error {
+       return fn.Base.setup()
+}
+
+func (fn *sepHarnessSdf) CreateInitialRestriction(v beam.T) 
offsetrange.Restriction {
+       return offsetrange.Restriction{Start: 0, End: fn.RestSize}
+}
+
+func (fn *sepHarnessSdf) SplitRestriction(v beam.T, r offsetrange.Restriction) 
[]offsetrange.Restriction {
+       return r.EvenSplits(2)
+}
+
+func (fn *sepHarnessSdf) RestrictionSize(v beam.T, r offsetrange.Restriction) 
float64 {
+       return r.Size()
+}
+
+func (fn *sepHarnessSdf) CreateTracker(r offsetrange.Restriction) 
*sdf.LockRTracker {
+       return sdf.NewLockRTracker(offsetrange.NewTracker(r))
+}
+
+func (fn *sepHarnessSdf) ProcessElement(rt *sdf.LockRTracker, v beam.T, emit 
func(beam.T)) {
+       i := rt.GetRestriction().(offsetrange.Restriction).Start
+       for rt.TryClaim(i) {
+               if fn.Base.IsSentinelEncoded.Fn.Call([]any{i, v})[0].(bool) {
+                       slog.Debug("blocking on sentinel", 
slog.Group("sentinel", slog.Any("value", v), slog.Int64("pos", i)))
+                       fn.Base.block()
+                       slog.Debug("unblocking from sentinel", 
slog.Group("sentinel", slog.Any("value", v), slog.Int64("pos", i)))
+               } else {
+                       time.Sleep(fn.Base.Sleep)
+               }
+               emit(v)
+               i++
+       }
+}
+
+func init() {
+       register.DoFn1x1[beam.T, beam.T]((*sepHarness)(nil))
+       register.DoFn3x0[*sdf.LockRTracker, beam.T, 
func(beam.T)]((*sepHarnessSdf)(nil))
+       register.Emitter1[beam.T]()
+       register.DoFn3x1[*sdf.LockRTracker, beam.T, func(beam.T), 
sdf.ProcessContinuation]((*sepHarnessSdfStream)(nil))
+       register.DoFn3x1[*sdf.LockRTracker, beam.T, func(int64), 
sdf.ProcessContinuation]((*singleStepSdfStream)(nil))
+       register.Emitter1[int64]()
+       register.DoFn4x1[*CWE, *sdf.LockRTracker, beam.T, func(beam.EventTime, 
int64), sdf.ProcessContinuation]((*eventtimeSDFStream)(nil))
+       register.Emitter2[beam.EventTime, int64]()
+}
+
+type sepHarnessSdfStream struct {
+       Base     sepHarnessBase
+       RestSize int64
+}
+
+func (fn *sepHarnessSdfStream) Setup() error {
+       return fn.Base.setup()
+}
+
+func (fn *sepHarnessSdfStream) CreateInitialRestriction(v beam.T) 
offsetrange.Restriction {
+       return offsetrange.Restriction{Start: 0, End: fn.RestSize}
+}
+
+func (fn *sepHarnessSdfStream) SplitRestriction(v beam.T, r 
offsetrange.Restriction) []offsetrange.Restriction {
+       return r.EvenSplits(2)
+}
+
+func (fn *sepHarnessSdfStream) RestrictionSize(v beam.T, r 
offsetrange.Restriction) float64 {
+       return r.Size()
+}
+
+func (fn *sepHarnessSdfStream) CreateTracker(r offsetrange.Restriction) 
*sdf.LockRTracker {
+       return sdf.NewLockRTracker(offsetrange.NewTracker(r))
+}
+
+func (fn *sepHarnessSdfStream) ProcessElement(rt *sdf.LockRTracker, v beam.T, 
emit func(beam.T)) sdf.ProcessContinuation {
+       if fn.Base.IsSentinelEncoded.Fn.Call([]any{v})[0].(bool) {
+               if fn.Base.delay() {
+                       slog.Debug("delaying on sentinel", 
slog.Group("sentinel", slog.Any("value", v)))
+                       return sdf.ResumeProcessingIn(fn.Base.Sleep)
+               }
+               slog.Debug("cleared to process sentinel", 
slog.Group("sentinel", slog.Any("value", v)))
+       }
+       r := rt.GetRestriction().(offsetrange.Restriction)
+       i := r.Start
+       for rt.TryClaim(i) {
+               emit(v)
+               i++
+       }
+       return sdf.StopProcessing()
+}
+
+// singleStepSdfStream only emits a single position at a time then sleeps.
+// Stops when a restriction of size 0 is provided.
+type singleStepSdfStream struct {
+       RestSize int64
+       Sleep    time.Duration
+}
+
+func (fn *singleStepSdfStream) Setup() error {
+       return nil
+}
+
+func (fn *singleStepSdfStream) CreateInitialRestriction(v beam.T) 
offsetrange.Restriction {
+       return offsetrange.Restriction{Start: 0, End: fn.RestSize}
+}
+
+func (fn *singleStepSdfStream) SplitRestriction(v beam.T, r 
offsetrange.Restriction) []offsetrange.Restriction {
+       return r.EvenSplits(2)
+}
+
+func (fn *singleStepSdfStream) RestrictionSize(v beam.T, r 
offsetrange.Restriction) float64 {
+       return r.Size()
+}
+
+func (fn *singleStepSdfStream) CreateTracker(r offsetrange.Restriction) 
*sdf.LockRTracker {
+       return sdf.NewLockRTracker(offsetrange.NewTracker(r))
+}
+
+func (fn *singleStepSdfStream) ProcessElement(rt *sdf.LockRTracker, v beam.T, 
emit func(int64)) sdf.ProcessContinuation {
+       r := rt.GetRestriction().(offsetrange.Restriction)
+       i := r.Start
+       if r.Size() < 1 {
+               slog.Debug("size 0 restriction, stoping to process sentinel", 
slog.Any("value", v))
+               return sdf.StopProcessing()
+       }
+       slog.Debug("emitting element to restriction", slog.Any("value", v), 
slog.Group("restriction",
+               slog.Any("value", v),
+               slog.Float64("size", r.Size()),
+               slog.Int64("pos", i),
+       ))
+       if rt.TryClaim(i) {
+               emit(i)
+       }
+       return sdf.ResumeProcessingIn(fn.Sleep)
+}
+
+type eventtimeSDFStream struct {
+       RestSize, Mod, Fixed int64
+       Sleep                time.Duration
+}
+
+func (fn *eventtimeSDFStream) Setup() error {
+       return nil
+}
+
+func (fn *eventtimeSDFStream) CreateInitialRestriction(v beam.T) 
offsetrange.Restriction {
+       return offsetrange.Restriction{Start: 0, End: fn.RestSize}
+}
+
+func (fn *eventtimeSDFStream) SplitRestriction(v beam.T, r 
offsetrange.Restriction) []offsetrange.Restriction {
+       // No split
+       return []offsetrange.Restriction{r}
+}
+
+func (fn *eventtimeSDFStream) RestrictionSize(v beam.T, r 
offsetrange.Restriction) float64 {
+       return r.Size()
+}
+
+func (fn *eventtimeSDFStream) CreateTracker(r offsetrange.Restriction) 
*sdf.LockRTracker {
+       return sdf.NewLockRTracker(offsetrange.NewTracker(r))
+}
+
+func (fn *eventtimeSDFStream) ProcessElement(_ *CWE, rt *sdf.LockRTracker, v 
beam.T, emit func(beam.EventTime, int64)) sdf.ProcessContinuation {
+       r := rt.GetRestriction().(offsetrange.Restriction)
+       i := r.Start
+       if r.Size() < 1 {
+               slog.Debug("size 0 restriction, stoping to process sentinel", 
slog.Any("value", v))
+               return sdf.StopProcessing()
+       }
+       slog.Debug("emitting element to restriction", slog.Any("value", v), 
slog.Group("restriction",
+               slog.Any("value", v),
+               slog.Float64("size", r.Size()),
+               slog.Int64("pos", i),
+       ))
+       if rt.TryClaim(i) {
+               timestamp := mtime.FromMilliseconds(int64((i + 1) * 
1000)).Subtract(10 * time.Millisecond)
+               v := (i % fn.Mod) + fn.Fixed
+               emit(timestamp, v)
+       }
+       return sdf.ResumeProcessingIn(fn.Sleep)
+}
+
+func (fn *eventtimeSDFStream) InitialWatermarkEstimatorState(_ beam.EventTime, 
_ offsetrange.Restriction, _ beam.T) int64 {
+       return int64(mtime.MinTimestamp)
+}
+
+func (fn *eventtimeSDFStream) CreateWatermarkEstimator(initialState int64) 
*CWE {
+       return &CWE{Watermark: initialState}
+}
+
+func (fn *eventtimeSDFStream) WatermarkEstimatorState(e *CWE) int64 {
+       return e.Watermark
+}
+
+type CWE struct {
+       Watermark int64 // uses int64, since the SDK prevent mtime.Time from 
serialization.
+}
+
+func (e *CWE) CurrentWatermark() time.Time {
+       return mtime.Time(e.Watermark).ToTime()
+}
+
+func (e *CWE) ObserveTimestamp(ts time.Time) {
+       // We add 10 milliseconds to allow window boundaries to
+       // progress after emitting
+       e.Watermark = int64(mtime.FromTime(ts.Add(-90 * time.Millisecond)))
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go 
b/sdks/go/pkg/beam/runners/prism/internal/stage.go
new file mode 100644
index 00000000000..39c3f5ea5ff
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -0,0 +1,400 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+       "bytes"
+       "fmt"
+       "io"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+       pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+       
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+       
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+       
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
+       "golang.org/x/exp/maps"
+       "golang.org/x/exp/slog"
+       "google.golang.org/protobuf/proto"
+)
+
+// stage represents a fused subgraph.
+//
+// TODO: do we guarantee that they are all
+// the same environment at this point, or
+// should that be handled later?
+type stage struct {
+       ID         string
+       transforms []string
+
+       envID            string
+       exe              transformExecuter
+       outputCount      int
+       inputTransformID string
+       mainInputPCol    string
+       inputInfo        engine.PColInfo
+       desc             *fnpb.ProcessBundleDescriptor
+       sides            []string
+       prepareSides     func(b *worker.B, tid string, watermark mtime.Time)
+
+       SinkToPCollection map[string]string
+       OutputsToCoders   map[string]engine.PColInfo
+}
+
+func (s *stage) Execute(j *jobservices.Job, wk *worker.W, comps 
*pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) {
+       tid := s.transforms[0]
+       slog.Debug("Execute: starting bundle", "bundle", rb, slog.String("tid", 
tid))
+
+       var b *worker.B
+       var send bool
+       inputData := em.InputForBundle(rb, s.inputInfo)
+       switch s.envID {
+       case "": // Runner Transforms
+               // Runner transforms are processed immeadiately.
+               b = s.exe.ExecuteTransform(tid, comps.GetTransforms()[tid], 
comps, rb.Watermark, inputData)
+               b.InstID = rb.BundleID
+               slog.Debug("Execute: runner transform", "bundle", rb, 
slog.String("tid", tid))
+       case wk.ID:
+               send = true
+               b = &worker.B{
+                       PBDID:  s.ID,
+                       InstID: rb.BundleID,
+
+                       InputTransformID: s.inputTransformID,
+
+                       // TODO Here's where we can split data for processing 
in multiple bundles.
+                       InputData: inputData,
+
+                       SinkToPCollection: s.SinkToPCollection,
+                       OutputCount:       s.outputCount,
+               }
+               b.Init()
+
+               s.prepareSides(b, s.transforms[0], rb.Watermark)
+       default:
+               err := fmt.Errorf("unknown environment[%v]", s.envID)
+               slog.Error("Execute", err)
+               panic(err)
+       }
+
+       if send {
+               slog.Debug("Execute: processing", "bundle", rb)
+               b.ProcessOn(wk) // Blocks until finished.
+       }
+       // Tentative Data is ready, commit it to the main datastore.
+       slog.Debug("Execute: commiting data", "bundle", rb, 
slog.Any("outputsWithData", maps.Keys(b.OutputData.Raw)), slog.Any("outputs", 
maps.Keys(s.OutputsToCoders)))
+
+       resp := &fnpb.ProcessBundleResponse{}
+       if send {
+               resp = <-b.Resp
+               // Tally metrics immeadiately so they're available before
+               // pipeline termination.
+               j.ContributeMetrics(resp)
+       }
+       // TODO handle side input data properly.
+       wk.D.Commit(b.OutputData)
+       var residualData [][]byte
+       var minOutputWatermark map[string]mtime.Time
+       for _, rr := range resp.GetResidualRoots() {
+               ba := rr.GetApplication()
+               residualData = append(residualData, ba.GetElement())
+               if len(ba.GetElement()) == 0 {
+                       slog.Log(slog.LevelError, "returned empty residual 
application", "bundle", rb)
+                       panic("sdk returned empty residual application")
+               }
+               for col, wm := range ba.GetOutputWatermarks() {
+                       if minOutputWatermark == nil {
+                               minOutputWatermark = map[string]mtime.Time{}
+                       }
+                       cur, ok := minOutputWatermark[col]
+                       if !ok {
+                               cur = mtime.MaxTimestamp
+                       }
+                       minOutputWatermark[col] = 
mtime.Min(mtime.FromTime(wm.AsTime()), cur)
+               }
+       }
+       if l := len(residualData); l > 0 {
+               slog.Debug("returned empty residual application", "bundle", rb, 
slog.Int("numResiduals", l), slog.String("pcollection", s.mainInputPCol))
+       }
+       em.PersistBundle(rb, s.OutputsToCoders, b.OutputData, s.inputInfo, 
residualData, minOutputWatermark)
+       b.OutputData = engine.TentativeData{} // Clear the data.
+}
+
+func getSideInputs(t *pipepb.PTransform) (map[string]*pipepb.SideInput, error) 
{
+       if t.GetSpec().GetUrn() != urns.TransformParDo {
+               return nil, nil
+       }
+       pardo := &pipepb.ParDoPayload{}
+       if err := 
(proto.UnmarshalOptions{}).Unmarshal(t.GetSpec().GetPayload(), pardo); err != 
nil {
+               return nil, fmt.Errorf("unable to decode ParDoPayload")
+       }
+       return pardo.GetSideInputs(), nil
+}
+
+func portFor(wInCid string, wk *worker.W) []byte {
+       sourcePort := &fnpb.RemoteGrpcPort{
+               CoderId: wInCid,
+               ApiServiceDescriptor: &pipepb.ApiServiceDescriptor{
+                       Url: wk.Endpoint(),
+               },
+       }
+       sourcePortBytes, err := proto.Marshal(sourcePort)
+       if err != nil {
+               slog.Error("bad port", err, slog.String("endpoint", 
sourcePort.ApiServiceDescriptor.GetUrl()))
+       }
+       return sourcePortBytes
+}
+
+func buildStage(s *stage, tid string, t *pipepb.PTransform, comps 
*pipepb.Components, wk *worker.W) {
+       s.inputTransformID = tid + "_source"
+
+       coders := map[string]*pipepb.Coder{}
+       transforms := map[string]*pipepb.PTransform{
+               tid: t, // The Transform to Execute!
+       }
+
+       sis, err := getSideInputs(t)
+       if err != nil {
+               slog.Error("buildStage: getSide Inputs", err, 
slog.String("transformID", tid))
+               panic(err)
+       }
+       var inputInfo engine.PColInfo
+       var sides []string
+       for local, global := range t.GetInputs() {
+               // This id is directly used for the source, but this also copies
+               // coders used by side inputs to the coders map for the bundle, 
so
+               // needs to be run for every ID.
+               wInCid := makeWindowedValueCoder(global, comps, coders)
+               _, ok := sis[local]
+               if ok {
+                       sides = append(sides, global)
+               } else {
+                       // this is the main input
+                       transforms[s.inputTransformID] = 
sourceTransform(s.inputTransformID, portFor(wInCid, wk), global)
+                       col := comps.GetPcollections()[global]
+                       ed := collectionPullDecoder(col.GetCoderId(), coders, 
comps)
+                       wDec, wEnc := getWindowValueCoders(comps, col, coders)
+                       inputInfo = engine.PColInfo{
+                               GlobalID: global,
+                               WDec:     wDec,
+                               WEnc:     wEnc,
+                               EDec:     ed,
+                       }
+               }
+               // We need to process all inputs to ensure we have all input 
coders, so we must continue.
+       }
+
+       prepareSides, err := handleSideInputs(t, comps, coders, wk)
+       if err != nil {
+               slog.Error("buildStage: handleSideInputs", err, 
slog.String("transformID", tid))
+               panic(err)
+       }
+
+       // TODO: We need a new logical PCollection to represent the source
+       // so we can avoid double counting PCollection metrics later.
+       // But this also means replacing the ID for the input in the bundle.
+       sink2Col := map[string]string{}
+       col2Coders := map[string]engine.PColInfo{}
+       for local, global := range t.GetOutputs() {
+               wOutCid := makeWindowedValueCoder(global, comps, coders)
+               sinkID := tid + "_" + local
+               col := comps.GetPcollections()[global]
+               ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
+               wDec, wEnc := getWindowValueCoders(comps, col, coders)
+               sink2Col[sinkID] = global
+               col2Coders[global] = engine.PColInfo{
+                       GlobalID: global,
+                       WDec:     wDec,
+                       WEnc:     wEnc,
+                       EDec:     ed,
+               }
+               transforms[sinkID] = sinkTransform(sinkID, portFor(wOutCid, 
wk), global)
+       }
+
+       reconcileCoders(coders, comps.GetCoders())
+
+       desc := &fnpb.ProcessBundleDescriptor{
+               Id:                  s.ID,
+               Transforms:          transforms,
+               WindowingStrategies: comps.GetWindowingStrategies(),
+               Pcollections:        comps.GetPcollections(),
+               Coders:              coders,
+               StateApiServiceDescriptor: &pipepb.ApiServiceDescriptor{
+                       Url: wk.Endpoint(),
+               },
+       }
+
+       s.desc = desc
+       s.outputCount = len(t.Outputs)
+       s.prepareSides = prepareSides
+       s.sides = sides
+       s.SinkToPCollection = sink2Col
+       s.OutputsToCoders = col2Coders
+       s.mainInputPCol = inputInfo.GlobalID
+       s.inputInfo = inputInfo
+
+       wk.Descriptors[s.ID] = s.desc
+}
+
+// handleSideInputs ensures appropriate coders are available to the bundle, 
and prepares a function to stage the data.
+func handleSideInputs(t *pipepb.PTransform, comps *pipepb.Components, coders 
map[string]*pipepb.Coder, wk *worker.W) (func(b *worker.B, tid string, 
watermark mtime.Time), error) {
+       sis, err := getSideInputs(t)
+       if err != nil {
+               return nil, err
+       }
+       var prepSides []func(b *worker.B, tid string, watermark mtime.Time)
+
+       // Get WindowedValue Coders for the transform's input and output 
PCollections.
+       for local, global := range t.GetInputs() {
+               si, ok := sis[local]
+               if !ok {
+                       continue // This is the main input.
+               }
+
+               // this is a side input
+               switch si.GetAccessPattern().GetUrn() {
+               case urns.SideInputIterable:
+                       slog.Debug("urnSideInputIterable",
+                               slog.String("sourceTransform", 
t.GetUniqueName()),
+                               slog.String("local", local),
+                               slog.String("global", global))
+                       col := comps.GetPcollections()[global]
+                       ed := collectionPullDecoder(col.GetCoderId(), coders, 
comps)
+                       wDec, wEnc := getWindowValueCoders(comps, col, coders)
+                       // May be of zero length, but that's OK. Side inputs 
can be empty.
+
+                       global, local := global, local
+                       prepSides = append(prepSides, func(b *worker.B, tid 
string, watermark mtime.Time) {
+                               data := wk.D.GetAllData(global)
+
+                               if b.IterableSideInputData == nil {
+                                       b.IterableSideInputData = 
map[string]map[string]map[typex.Window][][]byte{}
+                               }
+                               if _, ok := b.IterableSideInputData[tid]; !ok {
+                                       b.IterableSideInputData[tid] = 
map[string]map[typex.Window][][]byte{}
+                               }
+                               b.IterableSideInputData[tid][local] = 
collateByWindows(data, watermark, wDec, wEnc,
+                                       func(r io.Reader) [][]byte {
+                                               return [][]byte{ed(r)}
+                                       }, func(a, b [][]byte) [][]byte {
+                                               return append(a, b...)
+                                       })
+                       })
+
+               case urns.SideInputMultiMap:
+                       slog.Debug("urnSideInputMultiMap",
+                               slog.String("sourceTransform", 
t.GetUniqueName()),
+                               slog.String("local", local),
+                               slog.String("global", global))
+                       col := comps.GetPcollections()[global]
+
+                       kvc := comps.GetCoders()[col.GetCoderId()]
+                       if kvc.GetSpec().GetUrn() != urns.CoderKV {
+                               return nil, fmt.Errorf("multimap side inputs 
needs KV coder, got %v", kvc.GetSpec().GetUrn())
+                       }
+
+                       kd := 
collectionPullDecoder(kvc.GetComponentCoderIds()[0], coders, comps)
+                       vd := 
collectionPullDecoder(kvc.GetComponentCoderIds()[1], coders, comps)
+                       wDec, wEnc := getWindowValueCoders(comps, col, coders)
+
+                       global, local := global, local
+                       prepSides = append(prepSides, func(b *worker.B, tid 
string, watermark mtime.Time) {
+                               // May be of zero length, but that's OK. Side 
inputs can be empty.
+                               data := wk.D.GetAllData(global)
+                               if b.MultiMapSideInputData == nil {
+                                       b.MultiMapSideInputData = 
map[string]map[string]map[typex.Window]map[string][][]byte{}
+                               }
+                               if _, ok := b.MultiMapSideInputData[tid]; !ok {
+                                       b.MultiMapSideInputData[tid] = 
map[string]map[typex.Window]map[string][][]byte{}
+                               }
+                               b.MultiMapSideInputData[tid][local] = 
collateByWindows(data, watermark, wDec, wEnc,
+                                       func(r io.Reader) map[string][][]byte {
+                                               kb := kd(r)
+                                               return map[string][][]byte{
+                                                       string(kb): {vd(r)},
+                                               }
+                                       }, func(a, b map[string][][]byte) 
map[string][][]byte {
+                                               if len(a) == 0 {
+                                                       return b
+                                               }
+                                               for k, vs := range b {
+                                                       a[k] = append(a[k], 
vs...)
+                                               }
+                                               return a
+                                       })
+                       })
+               default:
+                       return nil, fmt.Errorf("local input %v (global %v) uses 
accesspattern %v", local, global, si.GetAccessPattern().GetUrn())
+               }
+       }
+       return func(b *worker.B, tid string, watermark mtime.Time) {
+               for _, prep := range prepSides {
+                       prep(b, tid, watermark)
+               }
+       }, nil
+}
+
+func sourceTransform(parentID string, sourcePortBytes []byte, outPID string) 
*pipepb.PTransform {
+       source := &pipepb.PTransform{
+               UniqueName: parentID,
+               Spec: &pipepb.FunctionSpec{
+                       Urn:     urns.TransformSource,
+                       Payload: sourcePortBytes,
+               },
+               Outputs: map[string]string{
+                       "i0": outPID,
+               },
+       }
+       return source
+}
+
+func sinkTransform(sinkID string, sinkPortBytes []byte, inPID string) 
*pipepb.PTransform {
+       source := &pipepb.PTransform{
+               UniqueName: sinkID,
+               Spec: &pipepb.FunctionSpec{
+                       Urn:     urns.TransformSink,
+                       Payload: sinkPortBytes,
+               },
+               Inputs: map[string]string{
+                       "i0": inPID,
+               },
+       }
+       return source
+}
+
+// collateByWindows takes the data and collates them into window keyed maps.
+// Uses generics to consolidate the repetitive window loops.
+func collateByWindows[T any](data [][]byte, watermark mtime.Time, wDec 
exec.WindowDecoder, wEnc exec.WindowEncoder, ed func(io.Reader) T, join func(T, 
T) T) map[typex.Window]T {
+       windowed := map[typex.Window]T{}
+       for _, datum := range data {
+               inBuf := bytes.NewBuffer(datum)
+               for {
+                       ws, _, _, err := exec.DecodeWindowedValueHeader(wDec, 
inBuf)
+                       if err == io.EOF {
+                               break
+                       }
+                       // Get the element out, and window them properly.
+                       e := ed(inBuf)
+                       for _, w := range ws {
+                               windowed[w] = join(windowed[w], e)
+                       }
+               }
+       }
+       return windowed
+}
diff --git a/sdks/go/pkg/beam/runners/prism/prism.go 
b/sdks/go/pkg/beam/runners/prism/prism.go
new file mode 100644
index 00000000000..dc78e5e6c23
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/prism.go
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package prism contains a local runner for running
+// pipelines in the current process. Useful for testing.
+package prism
+
+import (
+       "context"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal"
+       
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal"
+)
+
+func init() {
+       beam.RegisterRunner("prism", Execute)
+       beam.RegisterRunner("PrismRunner", Execute)
+}
+
+func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, 
error) {
+       if *jobopts.Endpoint == "" {
+               // One hasn't been selected, so lets start one up and set the 
address.
+               // Conveniently, this means that if multiple pipelines are 
executed against
+               // the local runner, they will all use the same server.
+               s := jobservices.NewServer(0, internal.RunPipeline)
+               *jobopts.Endpoint = s.Endpoint()
+               go s.Serve()
+       }
+       if !jobopts.IsLoopback() {
+               *jobopts.EnvironmentType = "loopback"
+       }
+       return universal.Execute(ctx, p)
+}

Reply via email to