This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 99ee1738e2b Add a flag to control whether to allow splitting on sdf. (#36512)
99ee1738e2b is described below
commit 99ee1738e2bf33add4487de2dad4c290f847b4f1
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Oct 15 10:13:49 2025 -0400
Add a flag to control whether to allow splitting on sdf. (#36512)
---
.../pkg/beam/runners/prism/internal/engine/elementmanager.go | 5 +++++
sdks/go/pkg/beam/runners/prism/internal/execute.go | 11 ++++++++++-
sdks/go/pkg/beam/runners/prism/internal/stage.go | 5 +++--
3 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index ccc4cfcc69d..f6562a77c39 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -186,6 +186,11 @@ type Config struct {
EnableRTC bool
// Whether to process the data in a streaming mode
StreamingMode bool
+ // Whether to enable splitting on splittable dofn.
+ // This flag is currently used when calling KafkaIO in streaming mode. It prevents an
+ // error ("KafkaConsumer is not safe for multi-threaded access") that can occur
+ // if the SDK allows splitting a single topic.
+ EnableSDFSplit bool
}
// ElementManager handles elements, watermarks, and related errata to determine
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index cedf0a9a043..05e939411b0 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -152,7 +152,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
ts := comps.GetTransforms()
pcols := comps.GetPcollections()
- config := engine.Config{EnableRTC: true}
+ config := engine.Config{EnableRTC: true, EnableSDFSplit: true}
m := j.PipelineOptions().AsMap()
if experimentsSlice, ok :=
m["beam:option:experiments:v1"].([]interface{}); ok {
for _, exp := range experimentsSlice {
@@ -163,6 +163,14 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
}
}
}
+ for _, exp := range experimentsSlice {
+ if expStr, ok := exp.(string); ok {
+ if expStr == "prism_disable_sdf_split" {
+ config.EnableSDFSplit = false
+ break // Found it, no need to check the
rest of the slice
+ }
+ }
+ }
}
if streaming, ok := m["beam:option:streaming:v1"].(bool); ok {
@@ -324,6 +332,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
if len(stage.processingTimeTimers) > 0 {
em.StageProcessingTimeTimers(stage.ID,
stage.processingTimeTimers)
}
+ stage.sdfSplittable = config.EnableSDFSplit
default:
return fmt.Errorf("unknown environment[%v]",
t.GetEnvironmentId())
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 918ea45fcd6..c4758984af8 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -88,7 +88,8 @@ type stage struct {
OutputsToCoders map[string]engine.PColInfo
// Stage specific progress and splitting interval.
- baseProgTick atomic.Value // time.Duration
+ baseProgTick atomic.Value // time.Duration
+ sdfSplittable bool
}
// The minimum and maximum durations between each ProgressBundleRequest and split evaluation.
@@ -234,7 +235,7 @@ progress:
// Check if there has been any measurable progress by
the input, or all output pcollections since last report.
slow := previousIndex == index["index"] &&
previousTotalCount == index["totalCount"]
- if slow && unsplit && b.EstimatedInputElements > 0 {
+ if slow && unsplit && b.EstimatedInputElements > 0 &&
s.sdfSplittable {
slog.Debug("splitting report", "bundle", rb,
"index", index)
sr, err := b.Split(ctx, wk, 0.5 /* fraction of
remainder */, nil /* allowed splits */)
if err != nil {