shunping commented on code in PR #36927:
URL: https://github.com/apache/beam/pull/36927#discussion_r2583011547


##########
sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go:
##########
@@ -173,18 +175,40 @@ type tsElementEvent struct {
 // Execute this ElementEvent by routing pending element to their consuming stages.
 func (ev tsElementEvent) Execute(em *ElementManager) {
        t := em.testStreamHandler.tagState[ev.Tag]
+       if t.pcollection == "" {
+               panic(fmt.Sprintf("TestStream tag %q not found in tagState", ev.Tag))
+       }
+       info, ok := em.pcolInfo[t.pcollection]
+       if !ok {
+               panic(fmt.Sprintf("PColInfo not registered for TestStream output PCollection %q (tag %q)", t.pcollection, ev.Tag))
+       }
 
        var pending []element
        for _, e := range ev.Elements {
+               if len(e.Encoded) == 0 {
+                       panic(fmt.Sprintf("TestStream: empty encoded element for tag %q", ev.Tag))
+               }
+               buf := bytes.NewBuffer(e.Encoded)
+               elmBytes := info.EDec(buf)
+               if len(elmBytes) == 0 {
+                       panic(fmt.Sprintf("TestStream: decoded element bytes are empty for tag %q, encoded length: %d", ev.Tag, len(e.Encoded)))
+               }
+               
+               var keyBytes []byte
+               if info.KeyDec != nil {

Review Comment:
   Unfortunately, `KeyDec` will always be nil in the current code, because we
   override the coder of this PCollection. Specifically, we length-prefix its
   coder here:

   https://github.com/apache/beam/blob/e3afe6207d74d481886e1c7b0d78db4d9fb59ecf/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L315

   As a result, the coder is an LP (length-prefix) coder, and we won't get a
   key decoder from the following line:

   https://github.com/apache/beam/blob/e3afe6207d74d481886e1c7b0d78db4d9fb59ecf/sdks/go/pkg/beam/runners/prism/internal/execute.go#L218
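
   To illustrate the shape of the problem, here is a minimal toy sketch. It
   does not use Prism's real types: `coder`, `lengthPrefix`, and `keyCoderOf`
   are hypothetical names, and the sketch assumes the key decoder is only
   derived when the top-level coder is a KV. Under that assumption, wrapping
   the KV coder in a length-prefix hides the key component:

```go
package main

import "fmt"

// coder is a toy stand-in for a coder proto: a URN plus component coders.
// It is NOT Prism's real type; it only models the shape of the problem.
type coder struct {
	urn        string
	components []*coder
}

// Standard Beam model coder URNs.
const (
	urnKV           = "beam:coder:kv:v1"
	urnLengthPrefix = "beam:coder:length_prefix:v1"
	urnBytes        = "beam:coder:bytes:v1"
	urnVarInt       = "beam:coder:varint:v1"
)

// lengthPrefix wraps c in a length-prefix coder, modelling what happens
// when a PCollection's coder is overridden to be length-prefixed.
func lengthPrefix(c *coder) *coder {
	return &coder{urn: urnLengthPrefix, components: []*coder{c}}
}

// keyCoderOf is a hypothetical helper. It returns the key component only
// when the top-level coder is a KV, and nil otherwise (assumed behavior).
func keyCoderOf(c *coder) *coder {
	if c.urn != urnKV || len(c.components) != 2 {
		return nil
	}
	return c.components[0]
}

func main() {
	kv := &coder{urn: urnKV, components: []*coder{{urn: urnBytes}, {urn: urnVarInt}}}

	fmt.Println(keyCoderOf(kv) != nil)               // true: bare KV exposes its key coder
	fmt.Println(keyCoderOf(lengthPrefix(kv)) != nil) // false: LP wrapper hides it
}
```

   If this model matches the real code path, the `info.KeyDec != nil` branch
   in the diff would never be taken for TestStream output.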
   
   


