shunping commented on code in PR #36927:
URL: https://github.com/apache/beam/pull/36927#discussion_r2583016760
##########
sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go:
##########
@@ -173,18 +175,40 @@ type tsElementEvent struct {
// Execute this ElementEvent by routing pending element to their consuming stages.
func (ev tsElementEvent) Execute(em *ElementManager) {
t := em.testStreamHandler.tagState[ev.Tag]
+ if t.pcollection == "" {
+ panic(fmt.Sprintf("TestStream tag %q not found in tagState", ev.Tag))
+ }
+ info, ok := em.pcolInfo[t.pcollection]
+ if !ok {
+ panic(fmt.Sprintf("PColInfo not registered for TestStream output PCollection %q (tag %q)", t.pcollection, ev.Tag))
+ }
var pending []element
for _, e := range ev.Elements {
+ if len(e.Encoded) == 0 {
+ panic(fmt.Sprintf("TestStream: empty encoded element for tag %q", ev.Tag))
+ }
+ buf := bytes.NewBuffer(e.Encoded)
+ elmBytes := info.EDec(buf)
+ if len(elmBytes) == 0 {
+ panic(fmt.Sprintf("TestStream: decoded element bytes are empty for tag %q, encoded length: %d", ev.Tag, len(e.Encoded)))
+ }
+
+ var keyBytes []byte
+ if info.KeyDec != nil {
Review Comment:
I think one possible solution is to not length-prefix (LP) the entire KV coder
during preprocessing, and to length-prefix only its value coder component instead.
That way we keep a KV coder for this PCollection, so a fix like
the current one can pick up the key decoder.
WDYT @lostluck?