damondouglas commented on code in PR #32029:
URL: https://github.com/apache/beam/pull/32029#discussion_r1697715863


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -211,6 +211,16 @@ func (em *ElementManager) AddStage(ID string, inputIDs, 
outputIDs []string, side
        for _, input := range inputIDs {
                em.consumers[input] = append(em.consumers[input], ss.ID)
        }
+
+       // In very rare cases, we can have a stage without any inputs, such as 
a flatten.
+       // In that case, there's nothing that will start the watermark refresh 
cycle,
+       // so we must do it here.
+       if len(inputIDs) == 0 {

Review Comment:
   Non-PR blocking comment to validate my understanding. Do I understand 
correctly?
   
   In the PTransform with no PCollection case, there are no inputIDs because 
the result of `internal.transformPreparer.PrepareTransform` returns an 
`internal.prepareResult` where `prepResult.SubbedComps.GetPcollections()` is 
empty.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to