shunping commented on code in PR #36927:
URL: https://github.com/apache/beam/pull/36927#discussion_r2583016760


##########
sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go:
##########
@@ -173,18 +175,40 @@ type tsElementEvent struct {
 // Execute this ElementEvent by routing pending element to their consuming 
stages.
 func (ev tsElementEvent) Execute(em *ElementManager) {
        t := em.testStreamHandler.tagState[ev.Tag]
+       if t.pcollection == "" {
+               panic(fmt.Sprintf("TestStream tag %q not found in tagState", 
ev.Tag))
+       }
+       info, ok := em.pcolInfo[t.pcollection]
+       if !ok {
+               panic(fmt.Sprintf("PColInfo not registered for TestStream 
output PCollection %q (tag %q)", t.pcollection, ev.Tag))
+       }
 
        var pending []element
        for _, e := range ev.Elements {
+               if len(e.Encoded) == 0 {
+                       panic(fmt.Sprintf("TestStream: empty encoded element 
for tag %q", ev.Tag))
+               }
+               buf := bytes.NewBuffer(e.Encoded)
+               elmBytes := info.EDec(buf)
+               if len(elmBytes) == 0 {
+                       panic(fmt.Sprintf("TestStream: decoded element bytes 
are empty for tag %q, encoded length: %d", ev.Tag, len(e.Encoded)))
+               }
+               
+               var keyBytes []byte
+               if info.KeyDec != nil {

Review Comment:
   I think one possible solution is to not length-prefix (LP) the entire KV 
coder during preprocessing, but only its value coder component. 
   
   In that case, we would keep a KV coder for this PCollection, so a fix like 
the current one could pick up the key decoder.
   
   WDYT @lostluck ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to