damondouglas commented on code in PR #30072:
URL: https://github.com/apache/beam/pull/30072#discussion_r1492794851


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -320,21 +331,27 @@ func (em *ElementManager) Bundles(ctx context.Context, 
nextBundID func() string)
                                }
                        }
                        if len(em.inprogressBundles) == 0 && 
len(em.watermarkRefreshes) == 0 {
-                               v := em.livePending.Load()
-                               slog.Debug("Bundles: nothing in progress and no 
refreshes", slog.Int64("pendingElementCount", v))
-                               if v > 0 {
-                                       var stageState []string
-                                       ids := maps.Keys(em.stages)
-                                       sort.Strings(ids)
-                                       for _, id := range ids {
-                                               ss := em.stages[id]
-                                               inW := ss.InputWatermark()
-                                               outW := ss.OutputWatermark()
-                                               upPCol, upW := 
ss.UpstreamWatermark()
-                                               upS := em.pcolParents[upPCol]
-                                               stageState = append(stageState, 
fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", 
upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", 
ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", 
ss.watermarkHoldHeap, "holdCounts", ss.watermarkHoldsCounts))
+                               nextEvent := em.testStreamHandler.NextEvent()
+                               if nextEvent == nil {
+                                       v := em.livePending.Load()
+                                       slog.Debug("Bundles: nothing in 
progress and no refreshes", slog.Int64("pendingElementCount", v))
+                                       if v > 0 {
+                                               var stageState []string
+                                               ids := maps.Keys(em.stages)
+                                               sort.Strings(ids)
+                                               for _, id := range ids {
+                                                       ss := em.stages[id]
+                                                       inW := 
ss.InputWatermark()
+                                                       outW := 
ss.OutputWatermark()
+                                                       upPCol, upW := 
ss.UpstreamWatermark()
+                                                       upS := 
em.pcolParents[upPCol]
+                                                       stageState = 
append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, 
"upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, 
"inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, 
"holds", ss.watermarkHoldHeap, "holdCounts", ss.watermarkHoldsCounts))
+                                               }
+                                               panic(fmt.Sprintf("nothing in 
progress and no refreshes with non zero pending elements: %v\n%v", v, 
strings.Join(stageState, "")))
                                        }
-                                       panic(fmt.Sprintf("nothing in progress 
and no refreshes with non zero pending elements: %v\n%v", v, 
strings.Join(stageState, "")))
+                               } else {
+                                       nextEvent.Execute(em)
+                                       em.addPending(-1) // Decrement for the 
event being processed.
                                }
                        } else if len(em.inprogressBundles) == 0 {
                                v := em.livePending.Load()

Review Comment:
   I was wondering if we could refactor to reduce some of the nested if/else 
statements. I gave it a try only to validate for myself whether it might be 
done.
   ```
   defer em.refreshCmd.L.Unlock()
   if len(em.inprogressBundles) > 0 {
      return
   }
   if len(em.watermarkRefreshes) > 0 {
       v := em.livePending.Load()
       slog.Debug("Bundles: nothing in progress after advance",
            slog.Any("advanced", advanced),
            slog.Int("refreshCount", len(em.watermarkRefreshes)),
            slog.Int64("pendingElementCount", v),
       )
      return
   }
   
   nextEvent := em.testStreamHandler.NextEvent()
   if nextEvent != nil {
       nextEvent.Execute(em)
       em.addPending(-1) // Decrement for the event being processed.
       return
   }
   
   v := em.livePending.Load()
   slog.Debug("Bundles: nothing in progress and no refreshes", 
slog.Int64("pendingElementCount", v))
   if v > 0 {
   var stageState []string
   ids := maps.Keys(em.stages)
   sort.Strings(ids)
   for _, id := range ids {
        ss := em.stages[id]
        inW := ss.InputWatermark()
        outW := ss.OutputWatermark()
        upPCol, upW := ss.UpstreamWatermark()
        upS := em.pcolParents[upPCol]
        stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, 
"out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", 
ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", 
ss.inprogressKeysByBundle, "holds", ss.watermarkHoldHeap, "holdCounts", 
ss.watermarkHoldsCounts))
   }
   panic(fmt.Sprintf("nothing in progress and no refreshes with non zero 
pending elements: %v\n%v", v, strings.Join(stageState, "")))
   }
   
   ```



##########
sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go:
##########
@@ -169,3 +169,50 @@ func TestElementManagerCoverage(t *testing.T) {
                })
        }
 }
+
+func TestTestStream(t *testing.T) {
+       initRunner(t)
+
+       tests := []struct {
+               pipeline func(s beam.Scope)
+       }{
+               {pipeline: primitives.TestStreamBoolSequence},
+               {pipeline: primitives.TestStreamByteSliceSequence},
+               {pipeline: primitives.TestStreamFloat64Sequence},
+               {pipeline: primitives.TestStreamInt64Sequence},
+               {pipeline: primitives.TestStreamInt16Sequence},
+               {pipeline: primitives.TestStreamStrings},
+               {pipeline: primitives.TestStreamTwoBoolSequences},
+               {pipeline: primitives.TestStreamTwoFloat64Sequences},
+               {pipeline: primitives.TestStreamTwoInt64Sequences},
+               {pipeline: primitives.TestStreamTwoUserTypeSequences},
+       }
+
+       configs := []struct {
+               name                              string
+               OneElementPerKey, OneKeyPerBundle bool
+       }{
+               {"Greedy", false, false},
+               {"AllElementsPerKey", false, true},
+               {"OneElementPerKey", true, false},
+               {"OneElementPerBundle", true, true},
+       }
+       for _, config := range configs {
+               for _, test := range tests {
+                       t.Run(initTestName(test.pipeline)+"_"+config.name, 
func(t *testing.T) {
+                               t.Cleanup(func() {
+                                       engine.OneElementPerKey = false
+                                       engine.OneKeyPerBundle = false
+                               })
+                               engine.OneElementPerKey = 
config.OneElementPerKey
+                               engine.OneKeyPerBundle = config.OneKeyPerBundle
+                               p, s := beam.NewPipelineWithRoot()
+                               test.pipeline(s)
+                               _, err := executeWithT(context.Background(), t, 
p)
+                               if err != nil {
+                                       t.Fatalf("pipeline failed, but feature 
should be implemented in Prism: %v", err)
+                               }
+                       })
+               }
+       }
+}

Review Comment:
   I realize we have a `primitives/teststream_test.go`, but was wondering if 
`TestTestStream` could be placed in an `engine/teststream_test.go` file?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to