SamStentz commented on issue #31177:
URL: https://github.com/apache/beam/issues/31177#issuecomment-2093696810

   I generally just want to be able to test timers locally with any runner, not 
too picky.
   
   the file I am trying to run
   
   ```
   package poc
   
   import (
        "context"
        "time"
   
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        "github.com/verily-src/verily1/ingestion/internal/beamutil"
   )
   
   func init() {
        runtime.RegisterFunction(DoNothingTransform)
        beamutil.RegisterTypePointer[doNothingTransformFn]()
        beamutil.RegisterTypePointer[context.Context]()
   }
   
   type doNothingTransformFn struct {
        TimerCount     state.Value[int64]
        ProcessingTime timers.ProcessingTime
   }
   
   func NewDoNothingTransform() *doNothingTransformFn {
        return &doNothingTransformFn{
                TimerCount:     state.MakeValueState[int64]("timerCount"),
                ProcessingTime: timers.InProcessingTime("processingTime"),
        }
   }
   
   func (fn *doNothingTransformFn) Setup(ctx context.Context) {
   }
   
   func (fn *doNothingTransformFn) ProcessElement(ctx context.Context,
        sp state.Provider, tp timers.Provider,
        k int,
        v string,
        emit func(string)) {
        log.Infof(ctx, "ProcessElement <%d, %s>", k, v)
        // Set timer for 1 second processing time.
        tn := time.Now()
        fn.ProcessingTime.Set(tp, tn.Add(time.Second))
        err := fn.TimerCount.Write(sp, 0)
        if err != nil {
                log.Errorf(ctx, "error writing timer time: %v", err)
        }
   }
   
   func (fn *doNothingTransformFn) OnTimer(ctx context.Context,
        sp state.Provider, tp timers.Provider,
        k int, timer timers.Context,
        v string,
        emit func(string)) {
        // Read state.
        tc, _, err := fn.TimerCount.Read(sp)
        if err != nil {
                log.Errorf(ctx, "error reading timer time: %v", err)
                return
        }
        log.Infof(ctx, "OnTimer <%d, %s>: count %d", k, v, tc)
        // terminate at 5 iterations
        if tc == 5 {
                emit(v)
        }
        // Set timer for 1 second processing time.
        tn := time.Now()
        fn.ProcessingTime.Set(tp, tn.Add(time.Second))
        err = fn.TimerCount.Write(sp, tc+1)
        if err != nil {
                log.Errorf(ctx, "error writing timer time: %v", err)
        }
   }
   
   func (fn *doNothingTransformFn) Teardown(ctx context.Context) {
   }
   
   func DoNothingTransform(scope beam.Scope, in beam.PCollection) 
beam.PCollection {
        return beam.ParDo(scope.Scope("DoNothing"), NewDoNothingTransform(), in)
   }
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to