This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 673da546c14 [#30083][prism] Factor out hold tracking to dedicated
structures (#31105)
673da546c14 is described below
commit 673da546c1465c931fdbbc5769e7d566ff55b4d8
Author: Robert Burke <[email protected]>
AuthorDate: Fri Apr 26 15:11:39 2024 -0700
[#30083][prism] Factor out hold tracking to dedicated structures (#31105)
* [prism] Factor out hold tracking to dedicated structures
* review comment-reorder move code out of ladder.
---------
Co-authored-by: lostluck <[email protected]>
---
.../prism/internal/engine/elementmanager.go | 77 +++-----------
.../prism/internal/engine/elementmanager_test.go | 2 +-
.../beam/runners/prism/internal/engine/holds.go | 105 +++++++++++++++++++
.../runners/prism/internal/engine/holds_test.go | 115 +++++++++++++++++++++
.../runners/prism/internal/engine/teststream.go | 12 +--
5 files changed, 236 insertions(+), 75 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index e40f5513dae..5d665edf286 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -414,7 +414,7 @@ func (em *ElementManager) checkForQuiescence(advanced
set[string]) {
outW := ss.OutputWatermark()
upPCol, upW := ss.UpstreamWatermark()
upS := em.pcolParents[upPCol]
- stageState = append(stageState, fmt.Sprintln(id, "watermark
in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending,
"byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle",
ss.inprogressKeysByBundle, "holds", ss.watermarkHoldHeap, "holdCounts",
ss.watermarkHoldsCounts))
+ stageState = append(stageState, fmt.Sprintln(id, "watermark
in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending,
"byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle",
ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts",
ss.watermarkHolds.counts))
}
panic(fmt.Sprintf("nothing in progress and no refreshes with non zero
pending elements: %v\n%v", v, strings.Join(stageState, "")))
}
@@ -706,18 +706,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle,
col2Coders map[string]PCol
delete(stage.inprogressKeysByBundle, rb.BundleID)
for hold, v := range stage.inprogressHoldsByBundle[rb.BundleID] {
- n := stage.watermarkHoldsCounts[hold] - v
- if n == 0 {
- delete(stage.watermarkHoldsCounts, hold)
- for i, h := range stage.watermarkHoldHeap {
- if hold == h {
- heap.Remove(&stage.watermarkHoldHeap, i)
- break
- }
- }
- } else {
- stage.watermarkHoldsCounts[hold] = n
- }
+ stage.watermarkHolds.Drop(hold, v)
}
delete(stage.inprogressHoldsByBundle, rb.BundleID)
@@ -918,8 +907,7 @@ type stageState struct {
// We track the count of timers with the same hold, and clear it from
// the map and heap when the count goes to zero.
// This avoids scanning the heap to remove or access a hold for each
element.
- watermarkHoldsCounts map[mtime.Time]int
- watermarkHoldHeap holdHeap
+ watermarkHolds *holdTracker
inprogressHoldsByBundle map[string]map[mtime.Time]int // bundle to
associated holds.
}
@@ -940,37 +928,15 @@ type dataAndTimers struct {
timers map[timerKey]timerTimes
}
-// holdHeap orders holds based on their timestamps
-// so we can always find the minimum timestamp of pending holds.
-type holdHeap []mtime.Time
-
-func (h holdHeap) Len() int { return len(h) }
-func (h holdHeap) Less(i, j int) bool { return h[i] < h[j] }
-func (h holdHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
-
-func (h *holdHeap) Push(x any) {
- // Push and Pop use pointer receivers because they modify the slice's
length,
- // not just its contents.
- *h = append(*h, x.(mtime.Time))
-}
-
-func (h *holdHeap) Pop() any {
- old := *h
- n := len(old)
- x := old[n-1]
- *h = old[0 : n-1]
- return x
-}
-
// makeStageState produces an initialized stageState.
func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID)
*stageState {
ss := &stageState{
- ID: ID,
- outputIDs: outputIDs,
- sides: sides,
- strat: defaultStrat{},
- state:
map[LinkID]map[typex.Window]map[string]StateData{},
- watermarkHoldsCounts: map[mtime.Time]int{},
+ ID: ID,
+ outputIDs: outputIDs,
+ sides: sides,
+ strat: defaultStrat{},
+ state:
map[LinkID]map[typex.Window]map[string]StateData{},
+ watermarkHolds: newHoldTracker(),
input: mtime.MinTimestamp,
output: mtime.MinTimestamp,
@@ -1016,29 +982,13 @@ func (ss *stageState) AddPending(newPending []element)
int {
// don't increase the count this time,
as "this" timer is already pending.
count--
// clear out the existing hold for
accounting purposes.
- v :=
ss.watermarkHoldsCounts[lastSet.hold] - 1
- if v == 0 {
- delete(ss.watermarkHoldsCounts,
lastSet.hold)
- for i, hold := range
ss.watermarkHoldHeap {
- if hold == lastSet.hold
{
-
heap.Remove(&ss.watermarkHoldHeap, i)
- break
- }
- }
- } else {
-
ss.watermarkHoldsCounts[lastSet.hold] = v
- }
+ ss.watermarkHolds.Drop(lastSet.hold, 1)
}
// Update the last set time on the timer.
dnt.timers[timerKey{family: e.family, tag:
e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold:
e.holdTimestamp}
// Mark the hold in the heap.
- ss.watermarkHoldsCounts[e.holdTimestamp] =
ss.watermarkHoldsCounts[e.holdTimestamp] + 1
-
- if len(ss.watermarkHoldsCounts) !=
len(ss.watermarkHoldHeap) {
- // The hold should not be in the heap,
so we add it.
- heap.Push(&ss.watermarkHoldHeap,
e.holdTimestamp)
- }
+ ss.watermarkHolds.Add(e.holdTimestamp, 1)
}
}
return count
@@ -1308,10 +1258,7 @@ func (ss *stageState) updateWatermarks(em
*ElementManager) set[string] {
defer ss.mu.Unlock()
minPending := ss.minPendingTimestampLocked()
- minWatermarkHold := mtime.MaxTimestamp
- if ss.watermarkHoldHeap.Len() > 0 {
- minWatermarkHold = ss.watermarkHoldHeap[0]
- }
+ minWatermarkHold := ss.watermarkHolds.Min()
// PCollection watermarks are based on their parents's output watermark.
_, newIn := ss.UpstreamWatermark()
diff --git
a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go
index 7235508f164..275dd790d2b 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go
@@ -295,7 +295,7 @@ func TestStageState_updateWatermarks(t *testing.T) {
ss.output = test.initOutput
ss.updateUpstreamWatermark(inputCol, test.upstream)
ss.pending = append(ss.pending, element{timestamp:
test.minPending})
- ss.watermarkHoldHeap = append(ss.watermarkHoldHeap,
test.minStateHold)
+ ss.watermarkHolds.Add(test.minStateHold, 1)
ss.updateWatermarks(em)
if got, want := ss.input, test.wantInput; got != want {
pcol, up := ss.UpstreamWatermark()
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go
new file mode 100644
index 00000000000..9077b3f439d
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package engine
+
+import (
+ "container/heap"
+ "fmt"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+)
+
+// holdHeap is a container/heap.Interface over watermark hold timestamps.
+// Less orders by timestamp, so when manipulated through the heap package the
+// smallest pending hold is always at index 0.
+type holdHeap []mtime.Time
+
+func (h holdHeap) Len() int           { return len(h) }
+func (h holdHeap) Less(i, j int) bool { return h[i] < h[j] }
+func (h holdHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
+
+// Push appends x, which must be an mtime.Time, to the end of the slice.
+// A pointer receiver is required because the slice length changes.
+// Use heap.Push rather than calling this directly.
+func (h *holdHeap) Push(x any) {
+	*h = append(*h, x.(mtime.Time))
+}
+
+// Pop removes and returns the final element of the slice.
+// A pointer receiver is required because the slice length changes.
+// Use heap.Pop rather than calling this directly.
+func (h *holdHeap) Pop() any {
+	last := len(*h) - 1
+	x := (*h)[last]
+	*h = (*h)[:last]
+	return x
+}
+
+// holdTracker tracks the watermark holds for a stage.
+//
+// Timers hold back the watermark until they fire, and several timers may set
+// the same hold time. The tracker therefore keeps a reference count per hold
+// time; as timers are processed their holds are dropped, and a hold time is
+// forgotten only once its count reaches zero.
+//
+// A heap of the distinct hold times gives quick access to the earliest hold
+// when calculating how far the watermark may advance.
+//
+// The zero value is not usable because counts must be non-nil; construct
+// trackers with newHoldTracker.
+type holdTracker struct {
+	heap   holdHeap           // distinct hold times, earliest first
+	counts map[mtime.Time]int // outstanding references per hold time
+}
+
+// newHoldTracker returns an empty holdTracker ready for use.
+func newHoldTracker() *holdTracker {
+	ht := &holdTracker{}
+	ht.counts = make(map[mtime.Time]int)
+	return ht
+}
+
+// Drop the given hold count. When the count of a hold time reaches zero, it's
+// removed from the heap. Drop panics if holds become negative.
+func (ht *holdTracker) Drop(hold mtime.Time, v int) {
+ n := ht.counts[hold] - v
+ if n > 0 {
+ ht.counts[hold] = n
+ return
+ } else if n < 0 {
+ panic(fmt.Sprintf("prism error: negative watermark hold count
%v for time %v", n, hold))
+ }
+ delete(ht.counts, hold)
+ for i, h := range ht.heap {
+ if hold == h {
+ heap.Remove(&ht.heap, i)
+ break
+ }
+ }
+}
+
+// Add records v additional references to the given hold time. A hold time
+// that is not already tracked is pushed onto the heap so that Min observes it.
+//
+// Adding zero is a no-op, so a hold time is never tracked with a zero count.
+// Add panics if v is negative; use Drop to release holds.
+func (ht *holdTracker) Add(hold mtime.Time, v int) {
+	switch {
+	case v < 0:
+		panic(fmt.Sprintf("prism error: negative watermark hold add %v for time %v", v, hold))
+	case v == 0:
+		return
+	}
+	// Check presence directly rather than inferring it from len(counts) versus
+	// len(heap), which only works while that invariant is intact.
+	n, tracked := ht.counts[hold]
+	ht.counts[hold] = n + v
+	if !tracked {
+		heap.Push(&ht.heap, hold)
+	}
+}
+
+// Min reports the earliest outstanding hold, or [mtime.MaxTimestamp] when no
+// holds are tracked, meaning nothing is holding back the watermark.
+func (ht *holdTracker) Min() mtime.Time {
+	if ht.heap.Len() == 0 {
+		return mtime.MaxTimestamp
+	}
+	return ht.heap[0]
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/holds_test.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/holds_test.go
new file mode 100644
index 00000000000..91de51bc1af
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/holds_test.go
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package engine
+
+import (
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+)
+
+// TestHoldTracker applies sequences of Add/Drop operations to a fresh
+// holdTracker and checks the reported minimum hold and the heap size.
+func TestHoldTracker(t *testing.T) {
+	type op func(*holdTracker)
+	add := func(hold mtime.Time, count int) op {
+		return func(ht *holdTracker) {
+			ht.Add(hold, count)
+		}
+	}
+	drop := func(hold mtime.Time, count int) op {
+		return func(ht *holdTracker) {
+			ht.Drop(hold, count)
+		}
+	}
+
+	tests := []struct {
+		name    string
+		ops     []op
+		wantMin mtime.Time
+		wantLen int
+	}{
+		{
+			name:    "zero-max",
+			wantMin: mtime.MaxTimestamp,
+			wantLen: 0,
+		}, {
+			name: "one-min",
+			ops: []op{
+				add(mtime.MinTimestamp, 1),
+			},
+			wantMin: mtime.MinTimestamp,
+			wantLen: 1,
+		}, {
+			name: "cleared-max",
+			ops: []op{
+				add(mtime.MinTimestamp, 1),
+				drop(mtime.MinTimestamp, 1),
+			},
+			wantMin: mtime.MaxTimestamp,
+			wantLen: 0,
+		}, {
+			name: "cleared-non-eogw",
+			ops: []op{
+				add(mtime.MinTimestamp, 1),
+				add(mtime.EndOfGlobalWindowTime, 1),
+				drop(mtime.MinTimestamp, 1),
+			},
+			wantMin: mtime.EndOfGlobalWindowTime,
+			wantLen: 1,
+		}, {
+			name: "uncleared-non-min",
+			ops: []op{
+				add(mtime.MinTimestamp, 2),
+				add(mtime.EndOfGlobalWindowTime, 1),
+				drop(mtime.MinTimestamp, 1),
+			},
+			wantMin: mtime.MinTimestamp,
+			wantLen: 2,
+		}, {
+			// Separate Adds of the same time share one heap entry.
+			name: "shared-hold",
+			ops: []op{
+				add(5, 1),
+				add(5, 1),
+				drop(5, 1),
+			},
+			wantMin: 5,
+			wantLen: 1,
+		}, {
+			name: "interleaved-adds-and-drops",
+			ops: []op{
+				add(1, 1),
+				add(2, 1),
+				add(3, 1),
+				drop(2, 1),
+				add(4, 1),
+				add(3, 1),
+				drop(1, 1),
+				add(2, 1),
+				drop(4, 1),
+			},
+			wantMin: 2,
+			wantLen: 2,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			tracker := newHoldTracker()
+			for _, apply := range test.ops {
+				apply(tracker)
+			}
+			if got, want := tracker.Min(), test.wantMin; got != want {
+				t.Errorf("tracker.Min() = %v, want %v", got, want)
+			}
+			if got, want := tracker.heap.Len(), test.wantLen; got != want {
+				t.Errorf("tracker.heap.Len() = %v, want %v", got, want)
+			}
+			// The count map and the heap must track the same distinct hold times.
+			if got, want := len(tracker.counts), tracker.heap.Len(); got != want {
+				t.Errorf("len(tracker.counts) = %v, want %v to match the heap length", got, want)
+			}
+		})
+	}
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
index f0350064d52..34b79d455ce 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
@@ -16,7 +16,6 @@
package engine
import (
- "container/heap"
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
@@ -139,13 +138,9 @@ func (ts *testStreamHandler) UpdateHold(em
*ElementManager, newHold mtime.Time)
ss.mu.Lock()
defer ss.mu.Unlock()
- if ss.watermarkHoldsCounts[ts.currentHold] > 0 {
- heap.Pop(&ss.watermarkHoldHeap)
- ss.watermarkHoldsCounts[ts.currentHold] =
ss.watermarkHoldsCounts[ts.currentHold] - 1
- }
+ ss.watermarkHolds.Drop(ts.currentHold, 1)
ts.currentHold = newHold
- heap.Push(&ss.watermarkHoldHeap, ts.currentHold)
- ss.watermarkHoldsCounts[ts.currentHold] = 1
+ ss.watermarkHolds.Add(ts.currentHold, 1)
// kick the TestStream and Impulse stages too.
kick := singleSet(ts.ID)
@@ -281,8 +276,7 @@ func (tsi *testStreamImpl) initHandler(id string) {
tsi.em.addPending(1) // We subtrack a pending after event
execution, so add one now for the final event to avoid a race condition.
// Arrest the watermark initially to prevent terminal
advancement.
- heap.Push(&ss.watermarkHoldHeap,
tsi.em.testStreamHandler.currentHold)
- ss.watermarkHoldsCounts[tsi.em.testStreamHandler.currentHold] =
1
+ ss.watermarkHolds.Add(tsi.em.testStreamHandler.currentHold, 1)
}
}