This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 99ee1738e2b Add a flag to control whether to allow splitting on sdf. (#36512)
99ee1738e2b is described below

commit 99ee1738e2bf33add4487de2dad4c290f847b4f1
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Oct 15 10:13:49 2025 -0400

    Add a flag to control whether to allow splitting on sdf. (#36512)
---
 .../pkg/beam/runners/prism/internal/engine/elementmanager.go  |  5 +++++
 sdks/go/pkg/beam/runners/prism/internal/execute.go            | 11 ++++++++++-
 sdks/go/pkg/beam/runners/prism/internal/stage.go              |  5 +++--
 3 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index ccc4cfcc69d..f6562a77c39 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -186,6 +186,11 @@ type Config struct {
        EnableRTC bool
        // Whether to process the data in a streaming mode
        StreamingMode bool
+       // Whether to enable splitting on splittable dofn.
+       // This flag is currently used when calling KafkaIO in streaming mode. It prevents an
+       // error ("KafkaConsumer is not safe for multi-threaded access") that can occur
+       // if the SDK allows splitting a single topic.
+       EnableSDFSplit bool
 }
 
 // ElementManager handles elements, watermarks, and related errata to determine
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index cedf0a9a043..05e939411b0 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -152,7 +152,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
        ts := comps.GetTransforms()
        pcols := comps.GetPcollections()
 
-       config := engine.Config{EnableRTC: true}
+       config := engine.Config{EnableRTC: true, EnableSDFSplit: true}
        m := j.PipelineOptions().AsMap()
        if experimentsSlice, ok := 
m["beam:option:experiments:v1"].([]interface{}); ok {
                for _, exp := range experimentsSlice {
@@ -163,6 +163,14 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
                                }
                        }
                }
+               for _, exp := range experimentsSlice {
+                       if expStr, ok := exp.(string); ok {
+                               if expStr == "prism_disable_sdf_split" {
+                                       config.EnableSDFSplit = false
+                                       break // Found it, no need to check the 
rest of the slice
+                               }
+                       }
+               }
        }
 
        if streaming, ok := m["beam:option:streaming:v1"].(bool); ok {
@@ -324,6 +332,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
                        if len(stage.processingTimeTimers) > 0 {
                                em.StageProcessingTimeTimers(stage.ID, 
stage.processingTimeTimers)
                        }
+                       stage.sdfSplittable = config.EnableSDFSplit
                default:
                        return fmt.Errorf("unknown environment[%v]", 
t.GetEnvironmentId())
                }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 918ea45fcd6..c4758984af8 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -88,7 +88,8 @@ type stage struct {
        OutputsToCoders   map[string]engine.PColInfo
 
        // Stage specific progress and splitting interval.
-       baseProgTick atomic.Value // time.Duration
+       baseProgTick  atomic.Value // time.Duration
+       sdfSplittable bool
 }
 
 // The minimum and maximum durations between each ProgressBundleRequest and 
split evaluation.
@@ -234,7 +235,7 @@ progress:
 
                        // Check if there has been any measurable progress by 
the input, or all output pcollections since last report.
                        slow := previousIndex == index["index"] && 
previousTotalCount == index["totalCount"]
-                       if slow && unsplit && b.EstimatedInputElements > 0 {
+                       if slow && unsplit && b.EstimatedInputElements > 0 && 
s.sdfSplittable {
                                slog.Debug("splitting report", "bundle", rb, 
"index", index)
                                sr, err := b.Split(ctx, wk, 0.5 /* fraction of 
remainder */, nil /* allowed splits */)
                                if err != nil {

Reply via email to