lostluck commented on code in PR #36927:
URL: https://github.com/apache/beam/pull/36927#discussion_r2582822414


##########
sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go:
##########
@@ -173,18 +175,40 @@ type tsElementEvent struct {
 // Execute this ElementEvent by routing pending element to their consuming 
stages.
 func (ev tsElementEvent) Execute(em *ElementManager) {
        t := em.testStreamHandler.tagState[ev.Tag]
+       if t.pcollection == "" {
+               panic(fmt.Sprintf("TestStream tag %q not found in tagState", 
ev.Tag))
+       }
+       info, ok := em.pcolInfo[t.pcollection]
+       if !ok {
+               panic(fmt.Sprintf("PColInfo not registered for TestStream 
output PCollection %q (tag %q)", t.pcollection, ev.Tag))
+       }
 
        var pending []element
        for _, e := range ev.Elements {
+               if len(e.Encoded) == 0 {
+                       panic(fmt.Sprintf("TestStream: empty encoded element 
for tag %q", ev.Tag))
+               }
+               buf := bytes.NewBuffer(e.Encoded)
+               elmBytes := info.EDec(buf)
+               if len(elmBytes) == 0 {
+                       panic(fmt.Sprintf("TestStream: decoded element bytes 
are empty for tag %q, encoded length: %d", ev.Tag, len(e.Encoded)))
+               }
+               
+               var keyBytes []byte
+               if info.KeyDec != nil {
+                       kbuf := bytes.NewBuffer(elmBytes)
+                       keyBytes = info.KeyDec(kbuf)
+               }
+               
                pending = append(pending, element{
                        window:    window.GlobalWindow{},
                        timestamp: e.EventTime,
-                       elmBytes:  e.Encoded,
+                       elmBytes:  elmBytes,
+                       keyBytes:  keyBytes,
                        pane:      typex.NoFiringPane(),
                })
        }
 
-       // Update the consuming state.

Review Comment:
   Please keep the comment unless there's a good reason to remove it. It 
indicates intent of the following code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to