This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ec152e28355 [#32139] Fail pipelines with Stateful SDFs. (#32140)
ec152e28355 is described below
commit ec152e283557a7b9ba273dac4b6fc6400786d2cf
Author: Robert Burke <[email protected]>
AuthorDate: Fri Aug 9 13:31:17 2024 -0700
[#32139] Fail pipelines with Stateful SDFs. (#32140)
* [#32139] Fail pipelines with Stateful SDFs.
* rm debug print
---------
Co-authored-by: lostluck <[email protected]>
---
.../go/pkg/beam/runners/prism/internal/jobservices/management.go | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
index 7676d958031..2b03eddff05 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -158,17 +158,26 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
return nil, fmt.Errorf("unable to unmarshal
ParDoPayload for %v - %q: %w", tid, t.GetUniqueName(), err)
}
+ isStateful := false
+
// Validate all the state features
for _, spec := range pardo.GetStateSpecs() {
+ isStateful = true
check("StateSpec.Protocol.Urn",
spec.GetProtocol().GetUrn(), urns.UserStateBag, urns.UserStateMultiMap)
}
// Validate all the timer features
for _, spec := range pardo.GetTimerFamilySpecs() {
+ isStateful = true
check("TimerFamilySpecs.TimeDomain.Urn",
spec.GetTimeDomain(), pipepb.TimeDomain_EVENT_TIME,
pipepb.TimeDomain_PROCESSING_TIME)
}
check("OnWindowExpirationTimerFamily",
pardo.GetOnWindowExpirationTimerFamilySpec(), "") // Unsupported for now.
+ // Check for a stateful SDF and direct user to https://github.com/apache/beam/issues/32139
+ if pardo.GetRestrictionCoderId() != "" && isStateful {
+ check("Splittable+Stateful DoFn", "See
https://github.com/apache/beam/issues/32139 for information.", "")
+ }
+
case urns.TransformTestStream:
var testStream pipepb.TestStreamPayload
if err := proto.Unmarshal(t.GetSpec().GetPayload(),
&testStream); err != nil {