This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bd47dc90c37 Use component coder when handling nullable coder in prism.
(#34615)
bd47dc90c37 is described below
commit bd47dc90c3740b64ba05a306506c018646deb8a8
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Apr 11 23:40:31 2025 -0400
Use component coder when handling nullable coder in prism. (#34615)
* Invoke component coder in nullable coder.
* Add a check to ensure the correct number of components in other coders.
* Re-enable a previously failed java flatten test.
---
runners/prism/java/build.gradle | 2 --
sdks/go/pkg/beam/runners/prism/internal/coders.go | 12 +++++++++---
2 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index 1d58a568c43..93b3bef3ce5 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -110,8 +110,6 @@ def sickbayTests = [
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
// Java side dying during execution.
- // https://github.com/apache/beam/issues/32930
- 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders',
// Stream corruption error java side:
failed:java.io.StreamCorruptedException: invalid stream header: 206E6F74
// Likely due to prism't coder changes.
'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2',
diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go
b/sdks/go/pkg/beam/runners/prism/internal/coders.go
index 6b88790521c..d1bd33fdb93 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/coders.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go
@@ -250,6 +250,11 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders
map[string]*pipepb.Coder) func(i
ioutilx.ReadN(r, int(l))
}
case urns.CoderNullable:
+ ccids := c.GetComponentCoderIds()
+ if len(ccids) != 1 {
+ panic(fmt.Sprintf("Nullable coder must have only one
component: %s", prototext.Format(c)))
+ }
+ ed := pullDecoderNoAlloc(coders[ccids[0]], coders)
return func(r io.Reader) {
b, _ := ioutilx.ReadN(r, 1)
if len(b) == 0 {
@@ -260,8 +265,7 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders
map[string]*pipepb.Coder) func(i
if prefix == 0 {
return
}
- l, _ := coder.DecodeVarInt(r)
- ioutilx.ReadN(r, int(l))
+ ed(r)
}
case urns.CoderVarInt:
return func(r io.Reader) {
@@ -277,6 +281,9 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders
map[string]*pipepb.Coder) func(i
}
case urns.CoderIterable:
ccids := c.GetComponentCoderIds()
+ if len(ccids) != 1 {
+ panic(fmt.Sprintf("Iterable coder must have only one
component: %s", prototext.Format(c)))
+ }
ed := pullDecoderNoAlloc(coders[ccids[0]], coders)
return func(r io.Reader) {
l, _ := coder.DecodeInt32(r)
@@ -284,7 +291,6 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders
map[string]*pipepb.Coder) func(i
ed(r)
}
}
-
case urns.CoderKV:
ccids := c.GetComponentCoderIds()
if len(ccids) != 2 {