This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d58d23f2d9 [#29180][prism] Return total element count to progress loop. Split less aggressively. (#29968)
0d58d23f2d9 is described below

commit 0d58d23f2d9c34ae5f068acddc622fa3ac6b0854
Author: Robert Burke <[email protected]>
AuthorDate: Fri Jan 12 11:13:11 2024 -0800

    [#29180][prism] Return total element count to progress loop. Split less aggressively. (#29968)
    
    * [prism] Return total element count to progress loop. Split less aggressively.
    
    * Update comments.
    
    ---------
    
    Co-authored-by: lostluck <[email protected]>
---
 .../pkg/beam/runners/prism/internal/jobservices/job.go |  2 +-
 .../beam/runners/prism/internal/jobservices/metrics.go | 18 ++++++++++++------
 sdks/go/pkg/beam/runners/prism/internal/stage.go       | 15 ++++++++++-----
 3 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go 
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index d6e906bee59..bb5eb88c919 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -100,7 +100,7 @@ func (j *Job) PipelineOptions() *structpb.Struct {
 }
 
 // ContributeTentativeMetrics returns the datachannel read index, and any 
unknown monitoring short ids.
-func (j *Job) ContributeTentativeMetrics(payloads 
*fnpb.ProcessBundleProgressResponse) (int64, []string) {
+func (j *Job) ContributeTentativeMetrics(payloads 
*fnpb.ProcessBundleProgressResponse) (map[string]int64, []string) {
        return j.metrics.ContributeTentativeMetrics(payloads)
 }
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go 
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
index e0caec55881..f90efdfa8bd 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
@@ -480,8 +480,8 @@ func (m *metricsStore) AddShortIDs(resp 
*fnpb.MonitoringInfosMetadataResponse) {
        }
 }
 
-func (m *metricsStore) contributeMetrics(d durability, mdata 
map[string][]byte) (int64, []string) {
-       readIndex := int64(-1)
+func (m *metricsStore) contributeMetrics(d durability, mdata 
map[string][]byte) (map[string]int64, []string) {
+       var index, totalCount int64
        if m.accums[d] == nil {
                m.accums[d] = map[metricKey]metricAccumulator{}
        }
@@ -510,14 +510,20 @@ func (m *metricsStore) contributeMetrics(d durability, 
mdata map[string][]byte)
                        panic(fmt.Sprintf("error decoding metrics %v: 
%+v\n\t%+v", key.Urn(), key, a))
                }
                accums[key] = a
-               if key.Urn() == "beam:metric:data_channel:read_index:v1" {
-                       readIndex = a.(*sumInt64).sum
+               switch u := key.Urn(); u {
+               case "beam:metric:data_channel:read_index:v1":
+                       index = a.(*sumInt64).sum // There should only be one 
of these per progress response.
+               case "beam:metric:element_count:v1":
+                       totalCount += a.(*sumInt64).sum
                }
        }
-       return readIndex, missingShortIDs
+       return map[string]int64{
+               "index":      index,
+               "totalCount": totalCount,
+       }, missingShortIDs
 }
 
-func (m *metricsStore) ContributeTentativeMetrics(payloads 
*fnpb.ProcessBundleProgressResponse) (int64, []string) {
+func (m *metricsStore) ContributeTentativeMetrics(payloads 
*fnpb.ProcessBundleProgressResponse) (map[string]int64, []string) {
        m.mu.Lock()
        defer m.mu.Unlock()
        return m.contributeMetrics(tentative, payloads.GetMonitoringData())
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go 
b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index d677a0cd4cf..e52031b43d1 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -130,7 +130,9 @@ func (s *stage) Execute(ctx context.Context, j 
*jobservices.Job, wk *worker.W, c
 
        // Progress + split loop.
        previousIndex := int64(-2)
-       var splitsDone bool
+       previousTotalCount := int64(-2) // Total count of all pcollection 
elements.
+
+       unsplit := true
        progTick := time.NewTicker(100 * time.Millisecond)
        defer progTick.Stop()
        var dataFinished, bundleFinished bool
@@ -170,8 +172,10 @@ progress:
                                j.AddMetricShortIDs(md)
                        }
                        slog.Debug("progress report", "bundle", rb, "index", 
index, "prevIndex", previousIndex)
-                       // Progress for the bundle hasn't advanced. Try 
splitting.
-                       if previousIndex == index && !splitsDone {
+
+                       // Check if there has been any measurable progress by 
the input, or all output pcollections since last report.
+                       slow := previousIndex == index["index"] && 
previousTotalCount == index["totalCount"]
+                       if slow && unsplit {
                                slog.Debug("splitting report", "bundle", rb, 
"index", index)
                                sr, err := b.Split(ctx, wk, 0.5 /* fraction of 
remainder */, nil /* allowed splits */)
                                if err != nil {
@@ -180,7 +184,7 @@ progress:
                                }
                                if sr.GetChannelSplits() == nil {
                                        slog.Debug("SDK returned no splits", 
"bundle", rb)
-                                       splitsDone = true
+                                       unsplit = false
                                        continue progress
                                }
                                // TODO sort out rescheduling primary Roots on 
bundle failure.
@@ -206,7 +210,8 @@ progress:
                                        em.ReturnResiduals(rb, int(fr), 
s.inputInfo, residualData)
                                }
                        } else {
-                               previousIndex = index
+                               previousIndex = index["index"]
+                               previousTotalCount = index["totalCount"]
                        }
                }
        }

Reply via email to