This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bd47dc90c37 Use component coder when handling nullable coder in prism. (#34615)
bd47dc90c37 is described below

commit bd47dc90c3740b64ba05a306506c018646deb8a8
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Apr 11 23:40:31 2025 -0400

    Use component coder when handling nullable coder in prism. (#34615)
    
    * Invoke component coder in nullable coder.
    
    * Add a check to ensure the correct number of components in other coders.
    
    * Re-enable a previously failed java flatten test.
---
 runners/prism/java/build.gradle                   |  2 --
 sdks/go/pkg/beam/runners/prism/internal/coders.go | 12 +++++++++---
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index 1d58a568c43..93b3bef3ce5 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -110,8 +110,6 @@ def sickbayTests = [
     
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
 
     // Java side dying during execution.
-    // https://github.com/apache/beam/issues/32930
-    'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders',
     // Stream corruption error java side: failed:java.io.StreamCorruptedException: invalid stream header: 206E6F74
     // Likely due to prism't coder changes.
     
'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2',
diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go
index 6b88790521c..d1bd33fdb93 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/coders.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go
@@ -250,6 +250,11 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i
                        ioutilx.ReadN(r, int(l))
                }
        case urns.CoderNullable:
+               ccids := c.GetComponentCoderIds()
+               if len(ccids) != 1 {
+                       panic(fmt.Sprintf("Nullable coder must have only one component: %s", prototext.Format(c)))
+               }
+               ed := pullDecoderNoAlloc(coders[ccids[0]], coders)
                return func(r io.Reader) {
                        b, _ := ioutilx.ReadN(r, 1)
                        if len(b) == 0 {
@@ -260,8 +265,7 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i
                        if prefix == 0 {
                                return
                        }
-                       l, _ := coder.DecodeVarInt(r)
-                       ioutilx.ReadN(r, int(l))
+                       ed(r)
                }
        case urns.CoderVarInt:
                return func(r io.Reader) {
@@ -277,6 +281,9 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i
                }
        case urns.CoderIterable:
                ccids := c.GetComponentCoderIds()
+               if len(ccids) != 1 {
+                       panic(fmt.Sprintf("Iterable coder must have only one component: %s", prototext.Format(c)))
+               }
                ed := pullDecoderNoAlloc(coders[ccids[0]], coders)
                return func(r io.Reader) {
                        l, _ := coder.DecodeInt32(r)
@@ -284,7 +291,6 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i
                                ed(r)
                        }
                }
-
        case urns.CoderKV:
                ccids := c.GetComponentCoderIds()
                if len(ccids) != 2 {

Reply via email to