This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 17298b5572e [#32115] Fix timer support, support timer clears. (#32119)
17298b5572e is described below
commit 17298b5572e9b0d8aa8c4d0ca1e51c3f832c0067
Author: Robert Burke <[email protected]>
AuthorDate: Fri Aug 9 14:21:32 2024 -0700
[#32115] Fix timer support, support timer clears. (#32119)
---
.../prism/internal/engine/elementmanager.go | 22 ++--
.../beam/runners/prism/internal/engine/timers.go | 115 +++++++++++++--------
.../runners/portability/prism_runner_test.py | 32 ++++++
3 files changed, 119 insertions(+), 50 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index bc8449c72b3..c73db507c79 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -869,14 +869,20 @@ func (em *ElementManager) triageTimers(d TentativeData,
inputInfo PColInfo, stag
for tentativeKey, timers := range d.timers {
keyToTimers := map[timerKey]element{}
for _, t := range timers {
- key, tag, elms := decodeTimer(inputInfo.KeyDec, true, t)
- for _, e := range elms {
- keyToTimers[timerKey{key: string(key), tag:
tag, win: e.window}] = e
- }
- if len(elms) == 0 {
- // TODO(lostluck): Determine best way to mark a
timer cleared.
- continue
- }
+ // TODO: Call in a for:range loop when Beam's minimum
Go version hits 1.23.0
+ iter := decodeTimerIter(inputInfo.KeyDec, true, t)
+ iter(func(ret timerRet) bool {
+ for _, e := range ret.elms {
+ keyToTimers[timerKey{key:
string(ret.keyBytes), tag: ret.tag, win: e.window}] = e
+ }
+ if len(ret.elms) == 0 {
+ for _, w := range ret.windows {
+ delete(keyToTimers,
timerKey{key: string(ret.keyBytes), tag: ret.tag, win: w})
+ }
+ }
+ // Indicate we'd like to continue iterating.
+ return true
+ })
}
for _, elm := range keyToTimers {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go
index 787d27858a0..9a3bd6f9682 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go
@@ -31,53 +31,77 @@ import (
"google.golang.org/protobuf/encoding/protowire"
)
-// DecodeTimer extracts timers to elements for insertion into their keyed
queues.
-// Returns the key bytes, tag, window exploded elements, and the hold
timestamp.
+type timerRet struct {
+ keyBytes []byte
+ tag string
+ elms []element
+ windows []typex.Window
+}
+
+// decodeTimerIter extracts timers to elements for insertion into their keyed
queues,
+// through a go iterator function, to be called by the caller with their
processing function.
+//
+// For each timer, a key, tag, windowed elements, and the window set are
returned.
+//
// If the timer has been cleared, no elements will be returned. Any existing
timers
-// for the tag *must* be cleared from the pending queue.
-func decodeTimer(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw
[]byte) ([]byte, string, []element) {
- keyBytes := keyDec(bytes.NewBuffer(raw))
-
- d := decoder{raw: raw, cursor: len(keyBytes)}
- tag := string(d.Bytes())
-
- var ws []typex.Window
- numWin := d.Fixed32()
- if usesGlobalWindow {
- for i := 0; i < int(numWin); i++ {
- ws = append(ws, window.GlobalWindow{})
- }
- } else {
- // Assume interval windows here, since we don't understand
custom windows yet.
- for i := 0; i < int(numWin); i++ {
- ws = append(ws, d.IntervalWindow())
- }
- }
+// for the tag *must* be cleared from the pending queue. The windows
associated with
+// the clear are provided to be able to delete pending timers.
+func decodeTimerIter(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw
[]byte) func(func(timerRet) bool) {
+ return func(yield func(timerRet) bool) {
+ for len(raw) > 0 {
+ keyBytes := keyDec(bytes.NewBuffer(raw))
+ d := decoder{raw: raw, cursor: len(keyBytes)}
+ tag := string(d.Bytes())
+
+ var ws []typex.Window
+ numWin := d.Fixed32()
+ if usesGlobalWindow {
+ for i := 0; i < int(numWin); i++ {
+ ws = append(ws, window.GlobalWindow{})
+ }
+ } else {
+ // Assume interval windows here, since we don't
understand custom windows yet.
+ for i := 0; i < int(numWin); i++ {
+ ws = append(ws, d.IntervalWindow())
+ }
+ }
- clear := d.Bool()
- hold := mtime.MaxTimestamp
- if clear {
- return keyBytes, tag, nil
- }
+ clear := d.Bool()
+ hold := mtime.MaxTimestamp
+ if clear {
+ if !yield(timerRet{keyBytes, tag, nil, ws}) {
+ return // Halt iteration if yield
returns false.
+ }
+ // Otherwise continue handling the remaining
bytes.
+ raw = d.UnusedBytes()
+ continue
+ }
- firing := d.Timestamp()
- hold = d.Timestamp()
- pane := d.Pane()
+ firing := d.Timestamp()
+ hold = d.Timestamp()
+ pane := d.Pane()
+
+ var elms []element
+ for _, w := range ws {
+ elms = append(elms, element{
+ tag: tag,
+ elmBytes: nil, // indicates this
is a timer.
+ keyBytes: keyBytes,
+ window: w,
+ timestamp: firing,
+ holdTimestamp: hold,
+ pane: pane,
+ sequence: len(elms),
+ })
+ }
- var ret []element
- for _, w := range ws {
- ret = append(ret, element{
- tag: tag,
- elmBytes: nil, // indicates this is a timer.
- keyBytes: keyBytes,
- window: w,
- timestamp: firing,
- holdTimestamp: hold,
- pane: pane,
- sequence: len(ret),
- })
+ if !yield(timerRet{keyBytes, tag, elms, ws}) {
+ return // Halt iteration if yield returns false.
+ }
+ // Otherwise continue handling the remaining bytes.
+ raw = d.UnusedBytes()
+ }
}
- return keyBytes, tag, ret
}
type decoder struct {
@@ -140,6 +164,13 @@ func (d *decoder) Bytes() []byte {
return b
}
+// UnusedBytes returns the remainder of bytes in the buffer that weren't yet
used.
+// Multiple timers can be provided in a single timers buffer, since multiple
dynamic
+// timer tags may be set.
+func (d *decoder) UnusedBytes() []byte {
+ return d.raw[d.cursor:]
+}
+
func (d *decoder) Bool() bool {
if b := d.Byte(); b == 0 {
return false
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py
b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
index 324fe5a17b5..b179156877e 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
@@ -40,6 +40,7 @@ from apache_beam.options.pipeline_options import
PortableOptions
from apache_beam.runners.portability import portable_runner_test
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
+from apache_beam.transforms import userstate
from apache_beam.transforms import window
from apache_beam.transforms.sql import SqlTransform
from apache_beam.utils import timestamp
@@ -200,6 +201,37 @@ class
PrismRunnerTest(portable_runner_test.PortableRunnerTest):
assert_that(
res, equal_to([('k', [1, 2]), ('k', [100, 101, 102]), ('k', [123])]))
+ # The fn_runner_test.py version of this test doesn't execute the process
+ # method for some reason. Overridden here to validate that the cleared
+ # timer won't re-fire.
+ def test_pardo_timers_clear(self):
+ timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
+
+ class TimerDoFn(beam.DoFn):
+ def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)):
+ unused_key, ts = element
+ timer.set(ts)
+ timer.set(2 * ts)
+
+ @userstate.on_timer(timer_spec)
+ def process_timer(
+ self,
+ ts=beam.DoFn.TimestampParam,
+ timer=beam.DoFn.TimerParam(timer_spec)):
+ timer.set(timestamp.Timestamp(micros=2 * ts.micros))
+ timer.clear() # Shouldn't fire again
+ yield 'fired'
+
+ with self.create_pipeline() as p:
+ actual = (
+ p
+ | beam.Create([('k1', 10), ('k2', 100)])
+ | beam.ParDo(TimerDoFn())
+ | beam.Map(lambda x, ts=beam.DoFn.TimestampParam: (x, ts)))
+
+ expected = [('fired', ts) for ts in (20, 200)]
+ assert_that(actual, equal_to(expected))
+
# Can't read host files from within docker, read a "local" file there.
def test_read(self):
print('name:', __name__)