damondouglas commented on code in PR #30072:
URL: https://github.com/apache/beam/pull/30072#discussion_r1492794851
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -320,21 +331,27 @@ func (em *ElementManager) Bundles(ctx context.Context,
nextBundID func() string)
}
}
if len(em.inprogressBundles) == 0 &&
len(em.watermarkRefreshes) == 0 {
- v := em.livePending.Load()
- slog.Debug("Bundles: nothing in progress and no
refreshes", slog.Int64("pendingElementCount", v))
- if v > 0 {
- var stageState []string
- ids := maps.Keys(em.stages)
- sort.Strings(ids)
- for _, id := range ids {
- ss := em.stages[id]
- inW := ss.InputWatermark()
- outW := ss.OutputWatermark()
- upPCol, upW :=
ss.UpstreamWatermark()
- upS := em.pcolParents[upPCol]
- stageState = append(stageState,
fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from",
upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys",
ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds",
ss.watermarkHoldHeap, "holdCounts", ss.watermarkHoldsCounts))
+ nextEvent := em.testStreamHandler.NextEvent()
+ if nextEvent == nil {
+ v := em.livePending.Load()
+ slog.Debug("Bundles: nothing in
progress and no refreshes", slog.Int64("pendingElementCount", v))
+ if v > 0 {
+ var stageState []string
+ ids := maps.Keys(em.stages)
+ sort.Strings(ids)
+ for _, id := range ids {
+ ss := em.stages[id]
+ inW :=
ss.InputWatermark()
+ outW :=
ss.OutputWatermark()
+ upPCol, upW :=
ss.UpstreamWatermark()
+ upS :=
em.pcolParents[upPCol]
+ stageState =
append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW,
"upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys,
"inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle,
"holds", ss.watermarkHoldHeap, "holdCounts", ss.watermarkHoldsCounts))
+ }
+ panic(fmt.Sprintf("nothing in
progress and no refreshes with non zero pending elements: %v\n%v", v,
strings.Join(stageState, "")))
}
- panic(fmt.Sprintf("nothing in progress
and no refreshes with non zero pending elements: %v\n%v", v,
strings.Join(stageState, "")))
+ } else {
+ nextEvent.Execute(em)
+ em.addPending(-1) // Decrement for the
event being processed.
}
} else if len(em.inprogressBundles) == 0 {
v := em.livePending.Load()
Review Comment:
I was wondering if we could refactor this to reduce some of the nested if/else
statements. I tried it myself, just to confirm that it could be done (untested
sketch below):
```
defer em.refreshCmd.L.Unlock()
if len(em.inprogressBundles) > 0 {
return
}
if len(em.watermarkRefreshes) > 0 {
v := em.livePending.Load()
slog.Debug("Bundles: nothing in progress after advance",
slog.Any("advanced", advanced),
slog.Int("refreshCount", len(em.watermarkRefreshes)),
slog.Int64("pendingElementCount", v),
)
return
}
nextEvent := em.testStreamHandler.NextEvent()
if nextEvent != nil {
nextEvent.Execute(em)
em.addPending(-1) // Decrement for the event being processed.
return
}
v := em.livePending.Load()
slog.Debug("Bundles: nothing in progress and no refreshes",
slog.Int64("pendingElementCount", v))
if v > 0 {
var stageState []string
ids := maps.Keys(em.stages)
sort.Strings(ids)
for _, id := range ids {
ss := em.stages[id]
inW := ss.InputWatermark()
outW := ss.OutputWatermark()
upPCol, upW := ss.UpstreamWatermark()
upS := em.pcolParents[upPCol]
stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW,
"out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey",
ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle",
ss.inprogressKeysByBundle, "holds", ss.watermarkHoldHeap, "holdCounts",
ss.watermarkHoldsCounts))
}
panic(fmt.Sprintf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, "")))
}
```
##########
sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go:
##########
@@ -169,3 +169,50 @@ func TestElementManagerCoverage(t *testing.T) {
})
}
}
+
+func TestTestStream(t *testing.T) {
+ initRunner(t)
+
+ tests := []struct {
+ pipeline func(s beam.Scope)
+ }{
+ {pipeline: primitives.TestStreamBoolSequence},
+ {pipeline: primitives.TestStreamByteSliceSequence},
+ {pipeline: primitives.TestStreamFloat64Sequence},
+ {pipeline: primitives.TestStreamInt64Sequence},
+ {pipeline: primitives.TestStreamInt16Sequence},
+ {pipeline: primitives.TestStreamStrings},
+ {pipeline: primitives.TestStreamTwoBoolSequences},
+ {pipeline: primitives.TestStreamTwoFloat64Sequences},
+ {pipeline: primitives.TestStreamTwoInt64Sequences},
+ {pipeline: primitives.TestStreamTwoUserTypeSequences},
+ }
+
+ configs := []struct {
+ name string
+ OneElementPerKey, OneKeyPerBundle bool
+ }{
+ {"Greedy", false, false},
+ {"AllElementsPerKey", false, true},
+ {"OneElementPerKey", true, false},
+ {"OneElementPerBundle", true, true},
+ }
+ for _, config := range configs {
+ for _, test := range tests {
+ t.Run(initTestName(test.pipeline)+"_"+config.name,
func(t *testing.T) {
+ t.Cleanup(func() {
+ engine.OneElementPerKey = false
+ engine.OneKeyPerBundle = false
+ })
+ engine.OneElementPerKey =
config.OneElementPerKey
+ engine.OneKeyPerBundle = config.OneKeyPerBundle
+ p, s := beam.NewPipelineWithRoot()
+ test.pipeline(s)
+ _, err := executeWithT(context.Background(), t,
p)
+ if err != nil {
+ t.Fatalf("pipeline failed, but feature
should be implemented in Prism: %v", err)
+ }
+ })
+ }
+ }
+}
Review Comment:
I realize we already have `primitives/teststream_test.go`, but I was wondering
whether `TestTestStream` could be placed in an `engine/teststream_test.go` file.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]