This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0d58d23f2d9 [#29180][prism] Return total element count to progress loop. Split less aggressively. (#29968)
0d58d23f2d9 is described below
commit 0d58d23f2d9c34ae5f068acddc622fa3ac6b0854
Author: Robert Burke <[email protected]>
AuthorDate: Fri Jan 12 11:13:11 2024 -0800
[#29180][prism] Return total element count to progress loop. Split less aggressively. (#29968)
* [prism] Return total element count to progress loop. Split less aggressively.
* Update comments.
---------
Co-authored-by: lostluck <[email protected]>
---
.../pkg/beam/runners/prism/internal/jobservices/job.go | 2 +-
.../beam/runners/prism/internal/jobservices/metrics.go | 18 ++++++++++++------
sdks/go/pkg/beam/runners/prism/internal/stage.go | 15 ++++++++++-----
3 files changed, 23 insertions(+), 12 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index d6e906bee59..bb5eb88c919 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -100,7 +100,7 @@ func (j *Job) PipelineOptions() *structpb.Struct {
}
// ContributeTentativeMetrics returns the datachannel read index, and any unknown monitoring short ids.
-func (j *Job) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (int64, []string) {
+func (j *Job) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (map[string]int64, []string) {
return j.metrics.ContributeTentativeMetrics(payloads)
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
index e0caec55881..f90efdfa8bd 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
@@ -480,8 +480,8 @@ func (m *metricsStore) AddShortIDs(resp *fnpb.MonitoringInfosMetadataResponse) {
}
}
-func (m *metricsStore) contributeMetrics(d durability, mdata map[string][]byte) (int64, []string) {
- readIndex := int64(-1)
+func (m *metricsStore) contributeMetrics(d durability, mdata map[string][]byte) (map[string]int64, []string) {
+ var index, totalCount int64
if m.accums[d] == nil {
m.accums[d] = map[metricKey]metricAccumulator{}
}
@@ -510,14 +510,20 @@ func (m *metricsStore) contributeMetrics(d durability, mdata map[string][]byte)
panic(fmt.Sprintf("error decoding metrics %v: %+v\n\t%+v", key.Urn(), key, a))
}
accums[key] = a
- if key.Urn() == "beam:metric:data_channel:read_index:v1" {
- readIndex = a.(*sumInt64).sum
+ switch u := key.Urn(); u {
+ case "beam:metric:data_channel:read_index:v1":
+ index = a.(*sumInt64).sum // There should only be one of these per progress response.
+ case "beam:metric:element_count:v1":
+ totalCount += a.(*sumInt64).sum
}
}
- return readIndex, missingShortIDs
+ return map[string]int64{
+ "index": index,
+ "totalCount": totalCount,
+ }, missingShortIDs
}
-func (m *metricsStore) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (int64, []string) {
+func (m *metricsStore) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (map[string]int64, []string) {
m.mu.Lock()
defer m.mu.Unlock()
return m.contributeMetrics(tentative, payloads.GetMonitoringData())
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index d677a0cd4cf..e52031b43d1 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -130,7 +130,9 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c
// Progress + split loop.
previousIndex := int64(-2)
- var splitsDone bool
+ previousTotalCount := int64(-2) // Total count of all pcollection elements.
+
+ unsplit := true
progTick := time.NewTicker(100 * time.Millisecond)
defer progTick.Stop()
var dataFinished, bundleFinished bool
@@ -170,8 +172,10 @@ progress:
j.AddMetricShortIDs(md)
}
slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex)
- // Progress for the bundle hasn't advanced. Try splitting.
- if previousIndex == index && !splitsDone {
+
+ // Check if there has been any measurable progress by the input, or all output pcollections since last report.
+ slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"]
+ if slow && unsplit {
slog.Debug("splitting report", "bundle", rb, "index", index)
sr, err := b.Split(ctx, wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
if err != nil {
@@ -180,7 +184,7 @@ progress:
}
if sr.GetChannelSplits() == nil {
slog.Debug("SDK returned no splits", "bundle", rb)
- splitsDone = true
+ unsplit = false
continue progress
}
// TODO sort out rescheduling primary Roots on bundle failure.
@@ -206,7 +210,8 @@ progress:
em.ReturnResiduals(rb, int(fr), s.inputInfo, residualData)
}
} else {
- previousIndex = index
+ previousIndex = index["index"]
+ previousTotalCount = index["totalCount"]
}
}
}