shunping commented on code in PR #36424:
URL: https://github.com/apache/beam/pull/36424#discussion_r2414591347


##########
sdks/go/pkg/beam/runners/prism/internal/execute.go:
##########
@@ -285,13 +285,25 @@ func executePipeline(ctx context.Context, wks 
map[string]*worker.W, j *jobservic
                                        //slog.Warn("teststream bytes", 
"value", string(v), "bytes", v)
                                        return v
                                }
-                               // Hack for Java Strings in test stream, since 
it doesn't encode them correctly.
-                               forceLP := cID == "StringUtf8Coder" || cID != 
pyld.GetCoderId()
+                               // If the TestStream coder needs to be LP'ed or 
if it is a coder that has different
+                               // behaviors between nested context and outer 
context (in Java SDK), then we must
+                               // LP this coder and the TestStream data 
elements.
+                               forceLP := cID != pyld.GetCoderId() ||
+                                       coders[cID].GetSpec().GetUrn() == 
urns.CoderStringUTF8 ||
+                                       coders[cID].GetSpec().GetUrn() == 
urns.CoderBytes ||
+                                       coders[cID].GetSpec().GetUrn() == 
urns.CoderKV
                                if forceLP {
                                        // slog.Warn("recoding 
TestStreamValue", "cID", cID, "newUrn", coders[cID].GetSpec().GetUrn(), 
"payloadCoder", pyld.GetCoderId(), "oldUrn", 
coders[pyld.GetCoderId()].GetSpec().GetUrn())
                                        // The coder needed length prefixing. 
For simplicity, add a length prefix to each
                                        // encoded element, since we will be 
sending a length prefixed coder to consume
                                        // this anyway. This is simpler than 
trying to find all the re-written coders after the fact.
+                                       // This also adds a LP-coder for the 
original coder in comps.

Review Comment:
   Will handle that in a follow-up PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to