This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c0a58953470 Fix zero key length panic in Prism (#36983)
c0a58953470 is described below

commit c0a589534704cbdf8c43f0d56275332d99820cdf
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Dec 3 19:57:56 2025 -0500

    Fix zero key length panic in Prism (#36983)
    
    * Fix TestStream so kv coder is preserved. Put keyBytes into TestStream 
elements.
    
    * Fix a bug when panic in watermark eval goroutine not triggering job 
failure
    
    * Minor fix.
    
    * Sickbay some failed tests for later investigation.
---
 runners/prism/java/build.gradle                    |  7 ++
 .../prism/internal/engine/elementmanager.go        | 11 ++-
 .../runners/prism/internal/engine/teststream.go    |  9 ++
 sdks/go/pkg/beam/runners/prism/internal/execute.go |  4 +
 .../beam/runners/prism/internal/handlerunner.go    | 97 +++++++++++++++++-----
 5 files changed, 107 insertions(+), 21 deletions(-)

diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index 0754e714dd8..c89974cb6ea 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -115,6 +115,13 @@ def sickbayTests = [
     // ShardedKey not yet implemented.
     
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
 
+    // Some tests failed when using TestStream with keyed elements.
+    // https://github.com/apache/beam/issues/36984
+    
'org.apache.beam.sdk.transforms.ParDoTest$BundleFinalizationTests.testBundleFinalizationWithState',
+    
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testMapStateNoReadOnComputeIfAbsentAndPutIfAbsentInsertsElement',
+    'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestamp',
+    
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestampWithProcessingTime',
+
     // Technically these tests "succeed"
     // the test is just complaining that an AssertionException isn't a 
RuntimeException
     //
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index de7b89e751e..f1ef9dd5028 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -225,6 +225,7 @@ type ElementManager struct {
        sideConsumers map[string][]LinkID // Map from pcollectionID to the 
stage+transform+input that consumes them as side input.
 
        pcolParents map[string]string // Map from pcollectionID to stageIDs 
that produce the pcollection.
+       pcolInfo    map[string]PColInfo
 
        refreshCond       sync.Cond   // refreshCond protects the following 
fields with it's lock, and unblocks bundle scheduling.
        inprogressBundles set[string] // Active bundleIDs
@@ -255,6 +256,7 @@ func NewElementManager(config Config) *ElementManager {
                consumers:         map[string][]string{},
                sideConsumers:     map[string][]LinkID{},
                pcolParents:       map[string]string{},
+               pcolInfo:          map[string]PColInfo{},
                changedStages:     set[string]{},
                inprogressBundles: set[string]{},
                refreshCond:       sync.Cond{L: &sync.Mutex{}},
@@ -324,6 +326,10 @@ func (em *ElementManager) StageProcessingTimeTimers(ID 
string, ptTimers map[stri
        em.stages[ID].processingTimeTimersFamilies = ptTimers
 }
 
+func (em *ElementManager) RegisterPColInfo(pcolID string, info PColInfo) {
+       em.pcolInfo[pcolID] = info
+}
+
 // AddTestStream provides a builder interface for the execution layer to build 
the test stream from
 // the protos.
 func (em *ElementManager) AddTestStream(id string, tagToPCol 
map[string]string) TestStreamBuilder {
@@ -386,6 +392,10 @@ func (em *ElementManager) Bundles(ctx context.Context, 
upstreamCancelFn context.
        }()
        // Watermark evaluation goroutine.
        go func() {
+               // We should defer closing of the channel first, so that when a 
panic happens,
+               // we will handle the panic and trigger a job failure BEFORE 
the job is
+               // prematurely marked as done.
+               defer close(runStageCh)
                defer func() {
                        // In case of panics in bundle generation, fail and 
cancel the job.
                        if e := recover(); e != nil {
@@ -393,7 +403,6 @@ func (em *ElementManager) Bundles(ctx context.Context, 
upstreamCancelFn context.
                                upstreamCancelFn(fmt.Errorf("panic in 
ElementManager.Bundles watermark evaluation goroutine: %v\n%v", e, 
string(debug.Stack())))
                        }
                }()
-               defer close(runStageCh)
 
                for {
                        em.refreshCond.L.Lock()
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
index 90f81d3104b..e934e6a6bb4 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
@@ -16,6 +16,7 @@
 package engine
 
 import (
+       "bytes"
        "log/slog"
        "time"
 
@@ -174,12 +175,20 @@ type tsElementEvent struct {
 func (ev tsElementEvent) Execute(em *ElementManager) {
        t := em.testStreamHandler.tagState[ev.Tag]
 
+       info := em.pcolInfo[t.pcollection]
        var pending []element
        for _, e := range ev.Elements {
+               var keyBytes []byte
+               if info.KeyDec != nil {
+                       kbuf := bytes.NewBuffer(e.Encoded)
+                       keyBytes = info.KeyDec(kbuf)
+               }
+
                pending = append(pending, element{
                        window:    window.GlobalWindow{},
                        timestamp: e.EventTime,
                        elmBytes:  e.Encoded,
+                       keyBytes:  keyBytes,
                        pane:      typex.NoFiringPane(),
                })
        }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go 
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 05e939411b0..853b7974479 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -277,6 +277,10 @@ func executePipeline(ctx context.Context, wks 
map[string]*worker.W, j *jobservic
                                // Add a synthetic stage that should largely be 
unused.
                                em.AddStage(stage.ID, nil, 
maps.Values(t.GetOutputs()), nil)
 
+                               for pcolID, info := range stage.OutputsToCoders 
{
+                                       em.RegisterPColInfo(pcolID, info)
+                               }
+
                                // Decode the test stream, and convert it to 
the various events for the ElementManager.
                                var pyld pipepb.TestStreamPayload
                                if err := 
proto.Unmarshal(t.GetSpec().GetPayload(), &pyld); err != nil {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go 
b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
index 7b1ecee1977..3ac0d98850d 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
@@ -227,7 +227,8 @@ func (h *runner) handleTestStream(tid string, t 
*pipepb.PTransform, comps *pipep
        }
        coders := map[string]*pipepb.Coder{}
        // Ensure awareness of the coder used for the teststream.
-       cID, err := lpUnknownCoders(pyld.GetCoderId(), coders, 
comps.GetCoders())
+       ocID := pyld.GetCoderId()
+       cID, err := lpUnknownCoders(ocID, coders, comps.GetCoders())
        if err != nil {
                panic(err)
        }
@@ -235,10 +236,10 @@ func (h *runner) handleTestStream(tid string, t 
*pipepb.PTransform, comps *pipep
        // If the TestStream coder needs to be LP'ed or if it is a coder that 
has different
        // behaviors between nested context and outer context (in Java SDK), 
then we must
        // LP this coder and the TestStream data elements.
-       forceLP := (cID != pyld.GetCoderId() && 
coders[pyld.GetCoderId()].GetSpec().GetUrn() != "beam:go:coder:custom:v1") ||
-               coders[cID].GetSpec().GetUrn() == urns.CoderStringUTF8 ||
-               coders[cID].GetSpec().GetUrn() == urns.CoderBytes ||
-               coders[cID].GetSpec().GetUrn() == urns.CoderKV
+       forceLP := (cID != ocID && coders[ocID].GetSpec().GetUrn() != 
"beam:go:coder:custom:v1") ||
+               coders[ocID].GetSpec().GetUrn() == urns.CoderStringUTF8 ||
+               coders[ocID].GetSpec().GetUrn() == urns.CoderBytes ||
+               coders[ocID].GetSpec().GetUrn() == urns.CoderKV
 
        if !forceLP {
                return prepareResult{SubbedComps: &pipepb.Components{
@@ -246,25 +247,81 @@ func (h *runner) handleTestStream(tid string, t 
*pipepb.PTransform, comps *pipep
                }}
        }
 
-       // The coder needed length prefixing. For simplicity, add a length 
prefix to each
-       // encoded element, since we will be sending a length prefixed coder to 
consume
-       // this anyway. This is simpler than trying to find all the re-written 
coders after the fact.
-       // This also adds a LP-coder for the original coder in comps.
-       cID, err = forceLpCoder(pyld.GetCoderId(), coders, comps.GetCoders())
-       if err != nil {
-               panic(err)
-       }
-       slog.Debug("teststream: add coder", "coderId", cID)
-
-       mustLP := func(v []byte) []byte {
-               var buf bytes.Buffer
-               if err := coder.EncodeVarInt((int64)(len(v)), &buf); err != nil 
{
+       var mustLP func(v []byte) []byte
+       if coders[ocID].GetSpec().GetUrn() != urns.CoderKV {
+               // The coder needed length prefixing. For simplicity, add a 
length prefix to each
+               // encoded element, since we will be sending a length prefixed 
coder to consume
+               // this anyway. This is simpler than trying to find all the 
re-written coders after the fact.
+               // This also adds a LP-coder for the original coder in comps.
+               cID, err = forceLpCoder(pyld.GetCoderId(), coders, 
comps.GetCoders())
+               if err != nil {
                        panic(err)
                }
-               if _, err := buf.Write(v); err != nil {
+               slog.Debug("teststream: add coder", "coderId", cID)
+
+               mustLP = func(v []byte) []byte {
+                       var buf bytes.Buffer
+                       if err := coder.EncodeVarInt((int64)(len(v)), &buf); 
err != nil {
+                               panic(err)
+                       }
+                       if _, err := buf.Write(v); err != nil {
+                               panic(err)
+                       }
+                       return buf.Bytes()
+               }
+       } else {
+               // For a KV coder, we only length-prefix the value coder 
because we need to
+               // preserve the original structure of the key coder. This 
allows the key
+               // coder to be easily extracted later to retrieve the KeyBytes 
from the
+               // encoded elements.
+
+               c := coders[ocID]
+               kcid := c.GetComponentCoderIds()[0]
+               vcid := c.GetComponentCoderIds()[1]
+
+               var lpvcid string
+               lpvcid, err = forceLpCoder(vcid, coders, comps.GetCoders())
+               if err != nil {
                        panic(err)
                }
-               return buf.Bytes()
+
+               slog.Debug("teststream: add coder", "coderId", lpvcid)
+
+               kvc := &pipepb.Coder{
+                       Spec: &pipepb.FunctionSpec{
+                               Urn: urns.CoderKV,
+                       },
+                       ComponentCoderIds: []string{kcid, lpvcid},
+               }
+
+               kvcID := ocID + "_vlp"
+               coders[kvcID] = kvc
+
+               slog.Debug("teststream: add coder", "coderId", kvcID)
+
+               cID = kvcID
+
+               kd := collectionPullDecoder(kcid, coders, comps)
+               mustLP = func(v []byte) []byte {
+                       elmBuf := bytes.NewBuffer(v)
+                       keyBytes := kd(elmBuf)
+
+                       var buf bytes.Buffer
+                       if _, err := buf.Write(keyBytes); err != nil {
+                               panic(err)
+                       }
+
+                       // put the length of the value
+                       if err := 
coder.EncodeVarInt((int64)(len(v)-len(keyBytes)), &buf); err != nil {
+                               panic(err)
+                       }
+
+                       // write the value aka. the remaining bytes from the 
buffer
+                       if _, err := buf.Write(elmBuf.Bytes()); err != nil {
+                               panic(err)
+                       }
+                       return buf.Bytes()
+               }
        }
 
        // We need to loop over the events.

Reply via email to