shunping commented on code in PR #36927:
URL: https://github.com/apache/beam/pull/36927#discussion_r2583011547
##########
sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go:
##########
@@ -173,18 +175,40 @@ type tsElementEvent struct {
// Execute this ElementEvent by routing pending element to their consuming
stages.
func (ev tsElementEvent) Execute(em *ElementManager) {
t := em.testStreamHandler.tagState[ev.Tag]
+ if t.pcollection == "" {
+ panic(fmt.Sprintf("TestStream tag %q not found in tagState",
ev.Tag))
+ }
+ info, ok := em.pcolInfo[t.pcollection]
+ if !ok {
+ panic(fmt.Sprintf("PColInfo not registered for TestStream
output PCollection %q (tag %q)", t.pcollection, ev.Tag))
+ }
var pending []element
for _, e := range ev.Elements {
+ if len(e.Encoded) == 0 {
+ panic(fmt.Sprintf("TestStream: empty encoded element
for tag %q", ev.Tag))
+ }
+ buf := bytes.NewBuffer(e.Encoded)
+ elmBytes := info.EDec(buf)
+ if len(elmBytes) == 0 {
+ panic(fmt.Sprintf("TestStream: decoded element bytes
are empty for tag %q, encoded length: %d", ev.Tag, len(e.Encoded)))
+ }
+
+ var keyBytes []byte
+ if info.KeyDec != nil {
Review Comment:
Unfortunately, KeyDec will always be nil in the current code because we
override the coder of the pcollection. Specifically, we length-prefix the coder
for the output pcollection of TestStream:
https://github.com/apache/beam/blob/e3afe6207d74d481886e1c7b0d78db4d9fb59ecf/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L315
Therefore, the coder will be a lp coder and we won't get a key decoder in
the following line.
https://github.com/apache/beam/blob/e3afe6207d74d481886e1c7b0d78db4d9fb59ecf/sdks/go/pkg/beam/runners/prism/internal/execute.go#L218
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]