This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 05b1781c6ea [#32221] [prism] Terminate streams for each timerfamily+transform pair. (#32223)
05b1781c6ea is described below
commit 05b1781c6eae2df48bc69572ded7d0fcc6ed3757
Author: Robert Burke <[email protected]>
AuthorDate: Tue Aug 20 15:27:45 2024 -0700
[#32221] [prism] Terminate streams for each timerfamily+transform pair. (#32223)
* [#32221] Mark is-last for each timer stream correctly.
* Remove test override for clear, since it now passes unmodified.
* delint
---------
Co-authored-by: lostluck <[email protected]>
---
sdks/go/pkg/beam/runners/prism/internal/stage.go | 22 ++++++++-------
.../beam/runners/prism/internal/worker/bundle.go | 5 ++--
.../runners/portability/prism_runner_test.py | 32 ----------------------
3 files changed, 15 insertions(+), 44 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 3d1e506f5e3..da23ca8ccce 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -55,15 +55,17 @@ type link struct {
// account, but all serialization boundaries remain since the pcollections
// would continue to get serialized.
type stage struct {
- ID string
- transforms []string
- primaryInput string // PCollection used as the
parallel input.
- outputs []link // PCollections that must escape
this stage.
- sideInputs []engine.LinkID // Non-parallel input PCollections
and their consumers
- internalCols []string // PCollections that escape. Used
for precise coder sending.
- envID string
- stateful bool
- hasTimers []string
+ ID string
+ transforms []string
+ primaryInput string // PCollection used as the parallel input.
+ outputs []link // PCollections that must escape this
stage.
+ sideInputs []engine.LinkID // Non-parallel input PCollections and
their consumers
+ internalCols []string // PCollections that escape. Used for
precise coder sending.
+ envID string
+ stateful bool
+ // hasTimers indicates the transform+timerfamily pairs that need to be waited on for
+ // the stage to be considered complete.
+ hasTimers []struct{ Transform, TimerFamily string }
processingTimeTimers map[string]bool
exe transformExecuter
@@ -396,7 +398,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components,
wk *worker.W, em *eng
}
}
for timerID, v := range pardo.GetTimerFamilySpecs() {
- stg.hasTimers = append(stg.hasTimers, tid)
+ stg.hasTimers = append(stg.hasTimers, struct{
Transform, TimerFamily string }{Transform: tid, TimerFamily: timerID})
if v.TimeDomain == pipepb.TimeDomain_PROCESSING_TIME {
if stg.processingTimeTimers == nil {
stg.processingTimeTimers =
map[string]bool{}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
index 842c5fdfc19..50e427ca36f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -42,7 +42,7 @@ type B struct {
InputTransformID string
Input []*engine.Block // Data and Timers for this
bundle.
EstimatedInputElements int
- HasTimers []string
+ HasTimers []struct{ Transform, TimerFamily string } //
Timer streams to terminate.
// IterableSideInputData is a map from transformID + inputID, to
window, to data.
IterableSideInputData map[SideInputKey]map[typex.Window][][]byte
@@ -175,7 +175,8 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan
struct{} {
for _, tid := range b.HasTimers {
timers = append(timers, &fnpb.Elements_Timers{
InstructionId: b.InstID,
- TransformId: tid,
+ TransformId: tid.Transform,
+ TimerFamilyId: tid.TimerFamily,
IsLast: true,
})
}
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
index 387b7ba2bec..04a2dbd4fae 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
@@ -35,7 +35,6 @@ from apache_beam.options.pipeline_options import
PortableOptions
from apache_beam.runners.portability import portable_runner_test
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
-from apache_beam.transforms import userstate
from apache_beam.transforms import window
from apache_beam.utils import timestamp
@@ -195,37 +194,6 @@ class
PrismRunnerTest(portable_runner_test.PortableRunnerTest):
assert_that(
res, equal_to([('k', [1, 2]), ('k', [100, 101, 102]), ('k', [123])]))
- # The fn_runner_test.py version of this test doesn't execute the process
- # method for some reason. Overridden here to validate that the cleared
- # timer won't re-fire.
- def test_pardo_timers_clear(self):
- timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
-
- class TimerDoFn(beam.DoFn):
- def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)):
- unused_key, ts = element
- timer.set(ts)
- timer.set(2 * ts)
-
- @userstate.on_timer(timer_spec)
- def process_timer(
- self,
- ts=beam.DoFn.TimestampParam,
- timer=beam.DoFn.TimerParam(timer_spec)):
- timer.set(timestamp.Timestamp(micros=2 * ts.micros))
- timer.clear() # Shouldn't fire again
- yield 'fired'
-
- with self.create_pipeline() as p:
- actual = (
- p
- | beam.Create([('k1', 10), ('k2', 100)])
- | beam.ParDo(TimerDoFn())
- | beam.Map(lambda x, ts=beam.DoFn.TimestampParam: (x, ts)))
-
- expected = [('fired', ts) for ts in (20, 200)]
- assert_that(actual, equal_to(expected))
-
# Can't read host files from within docker, read a "local" file there.
def test_read(self):
print('name:', __name__)