This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ec152e28355 [#32139] Fail pipelines with Stateful SDFs. (#32140)
ec152e28355 is described below

commit ec152e283557a7b9ba273dac4b6fc6400786d2cf
Author: Robert Burke <[email protected]>
AuthorDate: Fri Aug 9 13:31:17 2024 -0700

    [#32139] Fail pipelines with Stateful SDFs. (#32140)
    
    * [#32139] Fail pipelines with Stateful SDFs.
    
    * rm debug print
    
    ---------
    
    Co-authored-by: lostluck <[email protected]>
---
 .../go/pkg/beam/runners/prism/internal/jobservices/management.go | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
index 7676d958031..2b03eddff05 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -158,17 +158,26 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
                                return nil, fmt.Errorf("unable to unmarshal 
ParDoPayload for %v - %q: %w", tid, t.GetUniqueName(), err)
                        }
 
+                       isStateful := false
+
                        // Validate all the state features
                        for _, spec := range pardo.GetStateSpecs() {
+                               isStateful = true
                                check("StateSpec.Protocol.Urn", 
spec.GetProtocol().GetUrn(), urns.UserStateBag, urns.UserStateMultiMap)
                        }
                        // Validate all the timer features
                        for _, spec := range pardo.GetTimerFamilySpecs() {
+                               isStateful = true
                                check("TimerFamilySpecs.TimeDomain.Urn", 
spec.GetTimeDomain(), pipepb.TimeDomain_EVENT_TIME, 
pipepb.TimeDomain_PROCESSING_TIME)
                        }
 
                        check("OnWindowExpirationTimerFamily", 
pardo.GetOnWindowExpirationTimerFamilySpec(), "") // Unsupported for now.
 
+                       // Check for a stateful SDF and direct user to 
https://github.com/apache/beam/issues/32139
+                       if pardo.GetRestrictionCoderId() != "" && isStateful {
+                               check("Splittable+Stateful DoFn", "See 
https://github.com/apache/beam/issues/32139 for information.", "")
+                       }
+
                case urns.TransformTestStream:
                        var testStream pipepb.TestStreamPayload
                        if err := proto.Unmarshal(t.GetSpec().GetPayload(), 
&testStream); err != nil {

Reply via email to