This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 580db936824 [Prism] LP SOME coder and its data elements for TestStream (#36424)
580db936824 is described below

commit 580db9368243eb8b072de99307c60ba625676157
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Oct 8 13:44:26 2025 -0400

    [Prism] LP SOME coder and its data elements for TestStream (#36424)
    
    * Always LP coder and its data elements for TestStream
    
    * Fix the failing tests in the Go SDK caused by side input coders not being set correctly.
    
    * Reduce the change scope. Also avoid LP'ing an already LP'ed coder.
    
    * Mention breaking changes in CHANGES.md
---
 CHANGES.md                                         |  1 +
 runners/prism/java/build.gradle                    |  5 +----
 sdks/go/pkg/beam/core/runtime/graphx/coder.go      | 10 ++++-----
 sdks/go/pkg/beam/runners/prism/internal/coders.go  | 25 ++++++++++++++++++++++
 sdks/go/pkg/beam/runners/prism/internal/execute.go | 23 ++++++++++++++++++--
 5 files changed, 53 insertions(+), 11 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ca1a589ccd0..9aa2346941e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -86,6 +86,7 @@
 * (Python) Fixed transform naming conflict when executing DataTransform on a 
dictionary of PColls ([#30445](https://github.com/apache/beam/issues/30445)).
   This may break update compatibility if you don't provide a 
`--transform_name_mapping`.
 * Removed deprecated Hadoop versions (2.10.2 and 3.2.4) that are no longer 
supported for [Iceberg](https://github.com/apache/iceberg/issues/10940) from 
IcebergIO ([#36282](https://github.com/apache/beam/issues/36282)).
+* (Go) Coder construction on the SDK side now follows the coder specs provided by runners more faithfully and no longer strips their length prefixes. This may break streaming pipeline updates, since the underlying coder could change ([#36424](https://github.com/apache/beam/pull/36424)).
 
 ## Deprecations
 
diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index 5e5ddbe139e..7ce4e4d9061 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -106,14 +106,11 @@ def sickbayTests = [
 
     // Prism doesn't support multiple TestStreams.
     'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams',
-    // Sometimes fails missing a final 'AFTER'. Otherwise, Hangs in 
ElementManager.FailBundle due to a held stageState lock.
-    'org.apache.beam.sdk.testing.TestStreamTest.testMultiStage',
 
     // GroupIntoBatchesTest tests that fail:
-    // Teststream has bad KV encodings due to using an outer context.
+    // Wrong number of elements in windows after GroupIntoBatches.
     'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode',
     
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInFixedWindow',
-    // sdk worker disconnected
     
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInGlobalWindow',
     // ShardedKey not yet implemented.
     
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go 
b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index 99ca5517d3d..2b769c873ec 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -266,9 +266,9 @@ func (b *CoderUnmarshaller) makeCoder(id string, c 
*pipepb.Coder) (*coder.Coder,
 
                // No payload means this coder was length prefixed by the runner
                // but is likely self describing - AKA a beam coder.
-               if len(sub.GetSpec().GetPayload()) == 0 {
-                       return b.makeCoder(components[0], sub)
-               }
+               // if len(sub.GetSpec().GetPayload()) == 0 {
+               //      return b.makeCoder(components[0], sub)
+               // }
                // TODO(lostluck) 2018/10/17: Make this strict again, once 
dataflow can use
                // the portable pipeline model directly (BEAM-2885)
                switch u := sub.GetSpec().GetUrn(); u {
@@ -285,8 +285,8 @@ func (b *CoderUnmarshaller) makeCoder(id string, c 
*pipepb.Coder) (*coder.Coder,
                        t := typex.New(custom.Type)
                        cc := &coder.Coder{Kind: coder.Custom, T: t, Custom: 
custom}
                        return cc, nil
-               case urnBytesCoder, urnStringCoder: // implicitly length 
prefixed types.
-                       return b.makeCoder(components[0], sub)
+               // case urnBytesCoder, urnStringCoder: // implicitly length 
prefixed types.
+               //      return b.makeCoder(components[0], sub)
                default:
                        // Handle Length prefixing dictated by the runner.
                        cc, err := b.makeCoder(components[0], sub)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go 
b/sdks/go/pkg/beam/runners/prism/internal/coders.go
index 9b8e0fe731b..885d0eeef43 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/coders.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go
@@ -198,6 +198,31 @@ func lpUnknownCoders(cID string, bundle, base 
map[string]*pipepb.Coder) (string,
        return cID, nil
 }
 
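+// forceLpCoder ensures base holds a length-prefix (LP) coder wrapping coder cID, stored under the ID cID+"_flp" (added only if absent), and returns that ID; it errors if cID itself is not in base.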
+func forceLpCoder(cID string, base map[string]*pipepb.Coder) (string, error) {
+       // Derive the ID under which the forced LP version of this coder is stored.
+       lpcID := cID + "_flp"
+       // Check if we've done this one before.
+       if _, ok := base[lpcID]; ok {
+               return lpcID, nil
+       }
+       // Look up the canonical location.
+       _, ok := base[cID]
+       if !ok {
+               // We messed up somewhere.
+               return "", fmt.Errorf("forceLpCoders: coder %q not present in 
base map", cID)
+       }
+
+       lpc := &pipepb.Coder{
+               Spec: &pipepb.FunctionSpec{
+                       Urn: urns.CoderLengthPrefix,
+               },
+               ComponentCoderIds: []string{cID},
+       }
+       base[lpcID] = lpc
+       return lpcID, nil
+}
+
 // retrieveCoders recursively ensures that the coder along with all its direct
 // and indirect component coders, are present in the `bundle` map.
 // If a coder is already in `bundle`, it's skipped. Returns an error if any
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go 
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index cad1fb7e547..7c7526b3d4d 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -285,13 +285,25 @@ func executePipeline(ctx context.Context, wks 
map[string]*worker.W, j *jobservic
                                        //slog.Warn("teststream bytes", 
"value", string(v), "bytes", v)
                                        return v
                                }
-                               // Hack for Java Strings in test stream, since 
it doesn't encode them correctly.
-                               forceLP := cID == "StringUtf8Coder" || cID != 
pyld.GetCoderId()
+                               // If the TestStream coder needs to be LP'ed or 
if it is a coder that has different
+                               // behaviors between nested context and outer 
context (in Java SDK), then we must
+                               // LP this coder and the TestStream data 
elements.
+                               forceLP := cID != pyld.GetCoderId() ||
+                                       coders[cID].GetSpec().GetUrn() == 
urns.CoderStringUTF8 ||
+                                       coders[cID].GetSpec().GetUrn() == 
urns.CoderBytes ||
+                                       coders[cID].GetSpec().GetUrn() == 
urns.CoderKV
                                if forceLP {
                                        // slog.Warn("recoding 
TestStreamValue", "cID", cID, "newUrn", coders[cID].GetSpec().GetUrn(), 
"payloadCoder", pyld.GetCoderId(), "oldUrn", 
coders[pyld.GetCoderId()].GetSpec().GetUrn())
                                        // The coder needed length prefixing. 
For simplicity, add a length prefix to each
                                        // encoded element, since we will be 
sending a length prefixed coder to consume
                                        // this anyway. This is simpler than 
trying to find all the re-written coders after the fact.
+                                       // This also registers an LP coder wrapping the original payload coder in comps.
+                                       cID, err := 
forceLpCoder(pyld.GetCoderId(), comps.GetCoders())
+                                       if err != nil {
+                                               panic(err)
+                                       }
+                                       slog.Debug("teststream: add coder", 
"coderId", cID)
+
                                        mayLP = func(v []byte) []byte {
                                                var buf bytes.Buffer
                                                if err := 
coder.EncodeVarInt((int64)(len(v)), &buf); err != nil {
@@ -303,6 +315,13 @@ func executePipeline(ctx context.Context, wks 
map[string]*worker.W, j *jobservic
                                                //slog.Warn("teststream bytes - 
after LP", "value", string(v), "bytes", buf.Bytes())
                                                return buf.Bytes()
                                        }
+
+                                       // Point each TestStream output PCollection in comps at the LP coder directly, before they are used to build descriptors.
+                                       for _, col := range t.GetOutputs() {
+                                               oCID := 
comps.Pcollections[col].CoderId
+                                               comps.Pcollections[col].CoderId 
= cID
+                                               slog.Debug("teststream: rewrite 
coder for output pcoll", "colId", col, "oldId", oCID, "newId", cID)
+                                       }
                                }
 
                                tsb := em.AddTestStream(stage.ID, t.Outputs)

Reply via email to