This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 05b1781c6ea [#32221] [prism] Terminate streams for each timerfamily+transform pair. (#32223)
05b1781c6ea is described below

commit 05b1781c6eae2df48bc69572ded7d0fcc6ed3757
Author: Robert Burke <[email protected]>
AuthorDate: Tue Aug 20 15:27:45 2024 -0700

    [#32221] [prism] Terminate streams for each timerfamily+transform pair. (#32223)
    
    * [#32221] Mark is-last for each timer stream correctly.
    
    * Remove test override for clear, since it now passes unmodified.
    
    * delint
    
    ---------
    
    Co-authored-by: lostluck <[email protected]>
---
 sdks/go/pkg/beam/runners/prism/internal/stage.go   | 22 ++++++++-------
 .../beam/runners/prism/internal/worker/bundle.go   |  5 ++--
 .../runners/portability/prism_runner_test.py       | 32 ----------------------
 3 files changed, 15 insertions(+), 44 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 3d1e506f5e3..da23ca8ccce 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -55,15 +55,17 @@ type link struct {
 // account, but all serialization boundaries remain since the pcollections
 // would continue to get serialized.
 type stage struct {
-       ID                   string
-       transforms           []string
-       primaryInput         string          // PCollection used as the parallel input.
-       outputs              []link          // PCollections that must escape this stage.
-       sideInputs           []engine.LinkID // Non-parallel input PCollections and their consumers
-       internalCols         []string        // PCollections that escape. Used for precise coder sending.
-       envID                string
-       stateful             bool
-       hasTimers            []string
+       ID           string
+       transforms   []string
+       primaryInput string          // PCollection used as the parallel input.
+       outputs      []link          // PCollections that must escape this stage.
+       sideInputs   []engine.LinkID // Non-parallel input PCollections and their consumers
+       internalCols []string        // PCollections that escape. Used for precise coder sending.
+       envID        string
+       stateful     bool
+       // hasTimers indicates the transform+timerfamily pairs that need to be waited on for
+       // the stage to be considered complete.
+       hasTimers            []struct{ Transform, TimerFamily string }
        processingTimeTimers map[string]bool
 
        exe              transformExecuter
@@ -396,7 +398,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng
                        }
                }
                for timerID, v := range pardo.GetTimerFamilySpecs() {
-                       stg.hasTimers = append(stg.hasTimers, tid)
+                       stg.hasTimers = append(stg.hasTimers, struct{ Transform, TimerFamily string }{Transform: tid, TimerFamily: timerID})
                        if v.TimeDomain == pipepb.TimeDomain_PROCESSING_TIME {
                                if stg.processingTimeTimers == nil {
                                        stg.processingTimeTimers = map[string]bool{}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
index 842c5fdfc19..50e427ca36f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -42,7 +42,7 @@ type B struct {
        InputTransformID       string
        Input                  []*engine.Block // Data and Timers for this bundle.
        EstimatedInputElements int
-       HasTimers              []string
+       HasTimers              []struct{ Transform, TimerFamily string } // Timer streams to terminate.
 
        // IterableSideInputData is a map from transformID + inputID, to window, to data.
        IterableSideInputData map[SideInputKey]map[typex.Window][][]byte
@@ -175,7 +175,8 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{} {
        for _, tid := range b.HasTimers {
                timers = append(timers, &fnpb.Elements_Timers{
                        InstructionId: b.InstID,
-                       TransformId:   tid,
+                       TransformId:   tid.Transform,
+                       TimerFamilyId: tid.TimerFamily,
                        IsLast:        true,
                })
        }
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
index 387b7ba2bec..04a2dbd4fae 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
@@ -35,7 +35,6 @@ from apache_beam.options.pipeline_options import PortableOptions
 from apache_beam.runners.portability import portable_runner_test
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
-from apache_beam.transforms import userstate
 from apache_beam.transforms import window
 from apache_beam.utils import timestamp
 
@@ -195,37 +194,6 @@ class PrismRunnerTest(portable_runner_test.PortableRunnerTest):
       assert_that(
           res, equal_to([('k', [1, 2]), ('k', [100, 101, 102]), ('k', [123])]))
 
-  # The fn_runner_test.py version of this test doesn't execute the process
-  # method for some reason. Overridden here to validate that the cleared
-  # timer won't re-fire.
-  def test_pardo_timers_clear(self):
-    timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
-
-    class TimerDoFn(beam.DoFn):
-      def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)):
-        unused_key, ts = element
-        timer.set(ts)
-        timer.set(2 * ts)
-
-      @userstate.on_timer(timer_spec)
-      def process_timer(
-          self,
-          ts=beam.DoFn.TimestampParam,
-          timer=beam.DoFn.TimerParam(timer_spec)):
-        timer.set(timestamp.Timestamp(micros=2 * ts.micros))
-        timer.clear()  # Shouldn't fire again
-        yield 'fired'
-
-    with self.create_pipeline() as p:
-      actual = (
-          p
-          | beam.Create([('k1', 10), ('k2', 100)])
-          | beam.ParDo(TimerDoFn())
-          | beam.Map(lambda x, ts=beam.DoFn.TimestampParam: (x, ts)))
-
-      expected = [('fired', ts) for ts in (20, 200)]
-      assert_that(actual, equal_to(expected))
-
   # Can't read host files from within docker, read a "local" file there.
   def test_read(self):
     print('name:', __name__)

Reply via email to