This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 580db936824 [Prism] LP SOME coder and its data elements for TestStream
(#36424)
580db936824 is described below
commit 580db9368243eb8b072de99307c60ba625676157
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Oct 8 13:44:26 2025 -0400
[Prism] LP SOME coder and its data elements for TestStream (#36424)
* Always LP coder and its data elements for TestStream
* Fix the failed tests in the Go SDK due to side input coders not being set
correctly.
* Reduce the change scope. Also avoid LP'ing an LP'ed coder.
* Mention breaking changes in CHANGES.md
---
CHANGES.md | 1 +
runners/prism/java/build.gradle | 5 +----
sdks/go/pkg/beam/core/runtime/graphx/coder.go | 10 ++++-----
sdks/go/pkg/beam/runners/prism/internal/coders.go | 25 ++++++++++++++++++++++
sdks/go/pkg/beam/runners/prism/internal/execute.go | 23 ++++++++++++++++++--
5 files changed, 53 insertions(+), 11 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index ca1a589ccd0..9aa2346941e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -86,6 +86,7 @@
* (Python) Fixed transform naming conflict when executing DataTransform on a
dictionary of PColls ([#30445](https://github.com/apache/beam/issues/30445)).
This may break update compatibility if you don't provide a
`--transform_name_mapping`.
* Removed deprecated Hadoop versions (2.10.2 and 3.2.4) that are no longer
supported for [Iceberg](https://github.com/apache/iceberg/issues/10940) from
IcebergIO ([#36282](https://github.com/apache/beam/issues/36282)).
+* (Go) Coder construction on SDK side is more faithful to the specs from
runners without stripping length-prefix. This may break streaming pipeline
update as the underlying coder could be changed
([#36424](https://github.com/apache/beam/pull/36424)).
## Deprecations
diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index 5e5ddbe139e..7ce4e4d9061 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -106,14 +106,11 @@ def sickbayTests = [
// Prism doesn't support multiple TestStreams.
'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams',
- // Sometimes fails missing a final 'AFTER'. Otherwise, Hangs in
ElementManager.FailBundle due to a held stageState lock.
- 'org.apache.beam.sdk.testing.TestStreamTest.testMultiStage',
// GroupIntoBatchesTest tests that fail:
- // Teststream has bad KV encodings due to using an outer context.
+ // Wrong number of elements in windows after GroupIntoBatches.
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode',
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInFixedWindow',
- // sdk worker disconnected
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testBufferingTimerInGlobalWindow',
// ShardedKey not yet implemented.
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index 99ca5517d3d..2b769c873ec 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -266,9 +266,9 @@ func (b *CoderUnmarshaller) makeCoder(id string, c
*pipepb.Coder) (*coder.Coder,
// No payload means this coder was length prefixed by the runner
// but is likely self describing - AKA a beam coder.
- if len(sub.GetSpec().GetPayload()) == 0 {
- return b.makeCoder(components[0], sub)
- }
+ // if len(sub.GetSpec().GetPayload()) == 0 {
+ // return b.makeCoder(components[0], sub)
+ // }
// TODO(lostluck) 2018/10/17: Make this strict again, once
dataflow can use
// the portable pipeline model directly (BEAM-2885)
switch u := sub.GetSpec().GetUrn(); u {
@@ -285,8 +285,8 @@ func (b *CoderUnmarshaller) makeCoder(id string, c
*pipepb.Coder) (*coder.Coder,
t := typex.New(custom.Type)
cc := &coder.Coder{Kind: coder.Custom, T: t, Custom:
custom}
return cc, nil
- case urnBytesCoder, urnStringCoder: // implicitly length
prefixed types.
- return b.makeCoder(components[0], sub)
+ // case urnBytesCoder, urnStringCoder: // implicitly length
prefixed types.
+ // return b.makeCoder(components[0], sub)
default:
// Handle Length prefixing dictated by the runner.
cc, err := b.makeCoder(components[0], sub)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go
b/sdks/go/pkg/beam/runners/prism/internal/coders.go
index 9b8e0fe731b..885d0eeef43 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/coders.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go
@@ -198,6 +198,31 @@ func lpUnknownCoders(cID string, bundle, base
map[string]*pipepb.Coder) (string,
return cID, nil
}
+// forceLpCoder always add a new LP-coder for a given coder into the "base" map
+func forceLpCoder(cID string, base map[string]*pipepb.Coder) (string, error) {
+ // First check if we've already added the LP version of this coder to
coders already.
+ lpcID := cID + "_flp"
+ // Check if we've done this one before.
+ if _, ok := base[lpcID]; ok {
+ return lpcID, nil
+ }
+ // Look up the canonical location.
+ _, ok := base[cID]
+ if !ok {
+ // We messed up somewhere.
+ return "", fmt.Errorf("forceLpCoders: coder %q not present in
base map", cID)
+ }
+
+ lpc := &pipepb.Coder{
+ Spec: &pipepb.FunctionSpec{
+ Urn: urns.CoderLengthPrefix,
+ },
+ ComponentCoderIds: []string{cID},
+ }
+ base[lpcID] = lpc
+ return lpcID, nil
+}
+
// retrieveCoders recursively ensures that the coder along with all its direct
// and indirect component coders, are present in the `bundle` map.
// If a coder is already in `bundle`, it's skipped. Returns an error if any
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index cad1fb7e547..7c7526b3d4d 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -285,13 +285,25 @@ func executePipeline(ctx context.Context, wks
map[string]*worker.W, j *jobservic
//slog.Warn("teststream bytes",
"value", string(v), "bytes", v)
return v
}
- // Hack for Java Strings in test stream, since
it doesn't encode them correctly.
- forceLP := cID == "StringUtf8Coder" || cID !=
pyld.GetCoderId()
+ // If the TestStream coder needs to be LP'ed or
if it is a coder that has different
+ // behaviors between nested context and outer
context (in Java SDK), then we must
+ // LP this coder and the TestStream data
elements.
+ forceLP := cID != pyld.GetCoderId() ||
+ coders[cID].GetSpec().GetUrn() ==
urns.CoderStringUTF8 ||
+ coders[cID].GetSpec().GetUrn() ==
urns.CoderBytes ||
+ coders[cID].GetSpec().GetUrn() ==
urns.CoderKV
if forceLP {
// slog.Warn("recoding
TestStreamValue", "cID", cID, "newUrn", coders[cID].GetSpec().GetUrn(),
"payloadCoder", pyld.GetCoderId(), "oldUrn",
coders[pyld.GetCoderId()].GetSpec().GetUrn())
// The coder needed length prefixing.
For simplicity, add a length prefix to each
// encoded element, since we will be
sending a length prefixed coder to consume
// this anyway. This is simpler than
trying to find all the re-written coders after the fact.
+ // This also adds a LP-coder for the
original coder in comps.
+ cID, err :=
forceLpCoder(pyld.GetCoderId(), comps.GetCoders())
+ if err != nil {
+ panic(err)
+ }
+ slog.Debug("teststream: add coder",
"coderId", cID)
+
mayLP = func(v []byte) []byte {
var buf bytes.Buffer
if err :=
coder.EncodeVarInt((int64)(len(v)), &buf); err != nil {
@@ -303,6 +315,13 @@ func executePipeline(ctx context.Context, wks
map[string]*worker.W, j *jobservic
//slog.Warn("teststream bytes -
after LP", "value", string(v), "bytes", buf.Bytes())
return buf.Bytes()
}
+
+ // we need to change Coder and
Pcollection in comps directly before they are used to build descriptors
+ for _, col := range t.GetOutputs() {
+ oCID :=
comps.Pcollections[col].CoderId
+ comps.Pcollections[col].CoderId
= cID
+ slog.Debug("teststream: rewrite
coder for output pcoll", "colId", col, "oldId", oCID, "newId", cID)
+ }
}
tsb := em.AddTestStream(stage.ID, t.Outputs)