riteshghorse commented on code in PR #27496:
URL: https://github.com/apache/beam/pull/27496#discussion_r1263830842


##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6435,6 +6433,10 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, 
OutputT>() {
 }));
 {{< /highlight >}}
 
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" 
timer_output_timestamps_bad >}}
+{{< /highlight >}}\

Review Comment:
   ```suggestion
   {{< /highlight >}}
   ```



##########
sdks/go/examples/snippets/04transforms.go:
##########
@@ -644,40 +701,422 @@ type combineFn struct{}
 // combiningStateFn keeps track of the number of elements seen.
 type combiningStateFn struct {
        // types are the types of the accumulator, input, and output 
respectively
-       val state.Combining[int, int, int]
+       Val state.Combining[int, int, int]
 }
 
 func (s *combiningStateFn) ProcessElement(p state.Provider, book string, word 
string, emitWords func(string)) error {
        // Get the value stored in our state
-       val, _, err := s.val.Read(p)
+       val, _, err := s.Val.Read(p)
        if err != nil {
                return err
        }
-       s.val.Add(p, 1)
+       s.Val.Add(p, 1)
 
        if val > 10000 {
                // Example of clearing and starting again with an empty bag
-               s.val.Clear(p)
+               s.Val.Clear(p)
        }
 
        return nil
 }
 
-func main() {
+func combineState(s beam.Scope, input beam.PCollection) beam.PCollection {
        // ...
        // CombineFn param can be a simple fn like this or a structural 
CombineFn
        cFn := state.MakeCombiningState[int, int, int]("stateKey", func(a, b 
int) int {
                return a + b
        })
+       combined := beam.ParDo(s, combiningStateFn{Val: cFn}, input)
+
        // ...
 
        // [END combining_state]
 
-       fmt.Print(cFn)
+       return combined
+}
+
+// [START event_time_timer]
+
+type eventTimerDoFn struct {
+       State state.Value[int64]
+       Timer timers.EventTime
+}
+
+func (fn *eventTimerDoFn) ProcessElement(ts beam.EventTime, sp state.Provider, 
tp timers.Provider, book, word string, emitWords func(string)) {
+       // ...
+
+       // Set an event-time timer to the element timestamp.
+       fn.Timer.Set(tp, ts.ToTime())
+
+       // ...
+}
+
+func (fn *eventTimerDoFn) OnTimer(sp state.Provider, tp timers.Provider, w 
beam.Window, key string, timer timers.Context, emitWords func(string)) {
+       switch timer.Family {
+       case fn.Timer.Family:
+               // process callback for this timer
+       }
+}
+
+func AddEventTimeDoFn(s beam.Scope, in beam.PCollection) beam.PCollection {
+       return beam.ParDo(s, &eventTimerDoFn{
+               // Timers are given family names so their callbacks can be 
handled independantly.
+               Timer: timers.InEventTime("processWatermark"),
+               State: state.MakeValueState[int64]("latest"),
+       }, in)
+}
+
+// [END event_time_timer]
+
+// [START processing_time_timer]
+
+type processingTimerDoFn struct {
+       Timer timers.ProcessingTime
+}
+
+func (fn *processingTimerDoFn) ProcessElement(sp state.Provider, tp 
timers.Provider, book, word string, emitWords func(string)) {
+       // ...
+
+       // Set a timer to go off 30 seconds in the future.
+       fn.Timer.Set(tp, time.Now().Add(30*time.Second))
+
+       // ...
+}
+
+func (fn *processingTimerDoFn) OnTimer(sp state.Provider, tp timers.Provider, 
w beam.Window, key string, timer timers.Context, emitWords func(string)) {
+       switch timer.Family {
+       case fn.Timer.Family:
+               // process callback for this timer
+       }
+}
+
+func AddProcessingTimeDoFn(s beam.Scope, in beam.PCollection) beam.PCollection 
{
+       return beam.ParDo(s, &processingTimerDoFn{
+               // Timers are given family names so their callbacks can be 
handled independantly.
+               Timer: timers.InProcessingTime("timer"),
+       }, in)
+}
+
+// [END processing_time_timer]
+
+// [START dynamic_timer_tags]
+
+type hasAction interface {
+       Action() string
+}
+
+type dynamicTagsDoFn[V hasAction] struct {
+       Timer timers.EventTime
+}
+
+func (fn *dynamicTagsDoFn[V]) ProcessElement(ts beam.EventTime, tp 
timers.Provider, key string, value V, emitWords func(string)) {
+       // ...
+
+       // Set a timer to go off 30 seconds in the future.
+       fn.Timer.Set(tp, ts.ToTime(), timers.WithTag(value.Action()))
+
+       // ...
+}
+
+func (fn *dynamicTagsDoFn[V]) OnTimer(tp timers.Provider, w beam.Window, key 
string, timer timers.Context, emitWords func(string)) {
+       switch timer.Family {
+       case fn.Timer.Family:
+               tag := timer.Tag // Do something with fired tag
+               _ = tag
+       }
+}
+
+func AddDynamicTimerTagsDoFn[V hasAction](s beam.Scope, in beam.PCollection) 
beam.PCollection {
+       return beam.ParDo(s, &dynamicTagsDoFn[V]{
+               Timer: timers.InEventTime("actionTimers"),
+       }, in)
+}
+
+// [END dynamic_timer_tags]
+
+// [START timer_output_timestamps_bad]
+
+type badTimerOutputTimestampsFn[V any] struct {
+       ElementBag  state.Bag[V]
+       TimerSet    state.Value[bool]
+       OutputState timers.ProcessingTime
+}
+
+func (fn *badTimerOutputTimestampsFn[V]) ProcessElement(sp state.Provider, tp 
timers.Provider, key string, value V, emit func(string)) error {
+       // Add the current element to the bag for this key.
+       if err := fn.ElementBag.Add(sp, value); err != nil {
+               return err
+       }
+       set, _, err := fn.TimerSet.Read(sp)
+       if err != nil {
+               return err
+       }
+       if !set {
+               fn.OutputState.Set(tp, time.Now().Add(1*time.Minute))
+               fn.TimerSet.Write(sp, true)
+       }
+       return nil
+}
+
+func (fn *badTimerOutputTimestampsFn[V]) OnTimer(sp state.Provider, tp 
timers.Provider, w beam.Window, key string, timer timers.Context, emit 
func(string)) error {
+       switch timer.Family {
+       case fn.OutputState.Family:
+               vs, _, err := fn.ElementBag.Read(sp)
+               if err != nil {
+                       return err
+               }
+               for _, v := range vs {
+                       // Output each element
+                       emit(fmt.Sprintf("%v", v))
+               }
+
+               fn.ElementBag.Clear(sp)
+               // Note that the timer has now fired.
+               fn.TimerSet.Clear(sp)
+       }
+       return nil
+}
+
+// [END timer_output_timestamps_bad]
+
+// [START timer_output_timestamps_good]
+
+type element[V any] struct {
+       Timestamp int64
+       Value     V
+}
+
+type goodTimerOutputTimestampsFn[V any] struct {
+       ElementBag        state.Bag[element[V]]                // The bag of 
elements accumulated.
+       TimerTimerstamp   state.Value[int64]                   // The timestamp 
of the timer set.
+       MinTimestampInBag state.Combining[int64, int64, int64] // The minimum 
timestamp stored in the bag.
+       OutputState       timers.ProcessingTime                // The timestamp 
of the timer.
+}
+
+func (fn *goodTimerOutputTimestampsFn[V]) ProcessElement(et beam.EventTime, sp 
state.Provider, tp timers.Provider, key string, value V, emit 
func(beam.EventTime, string)) error {
+       // ...
+       // Add the current element to the bag for this key, and preserve the 
event time.
+       if err := fn.ElementBag.Add(sp, element[V]{Timestamp: 
et.Milliseconds(), Value: value}); err != nil {
+               return err
+       }
+
+       // Keep track of the minimum element timestamp currently stored in the 
bag.
+       fn.MinTimestampInBag.Add(sp, et.Milliseconds())
+
+       // If the timer is already set, then reset it at the same time but with 
an updated output timestamp (otherwise
+       // we would keep resetting the timer to the future). If there is no 
timer set, then set one to expire in a minute.
+       ts, ok, _ := fn.TimerTimerstamp.Read(sp)
+       var tsToSet time.Time
+       if ok {
+               tsToSet = time.UnixMilli(ts)
+       } else {
+               tsToSet = time.Now().Add(1 * time.Minute)
+       }
+
+       minTs, _, _ := fn.MinTimestampInBag.Read(sp)
+       outputTs := time.UnixMilli(minTs)
+
+       // Setting the outputTimestamp to the minimum timestamp in the bag 
holds the watermark to that timestamp until the
+       // timer fires. This allows outputting all the elements with their 
timestamp.
+       fn.OutputState.Set(tp, tsToSet, timers.WithOutputTimestamp(outputTs))
+       fn.TimerTimerstamp.Write(sp, tsToSet.UnixMilli())
+
+       return nil
+}
+
+func (fn *goodTimerOutputTimestampsFn[V]) OnTimer(sp state.Provider, tp 
timers.Provider, w beam.Window, key string, timer timers.Context, emit 
func(beam.EventTime, string)) error {
+       switch timer.Family {
+       case fn.OutputState.Family:
+               vs, _, err := fn.ElementBag.Read(sp)
+               if err != nil {
+                       return err
+               }
+               for _, v := range vs {
+                       // Output each element with their timestamp
+                       emit(beam.EventTime(v.Timestamp), fmt.Sprintf("%v", 
v.Value))
+               }
+
+               fn.ElementBag.Clear(sp)
+               // Note that the timer has now fired.
+               fn.TimerTimerstamp.Clear(sp)
+       }
+       return nil
+}
+
+func AddTimedOutputBatching[V any](s beam.Scope, in beam.PCollection) 
beam.PCollection {
+       return beam.ParDo(s, &goodTimerOutputTimestampsFn[V]{
+               ElementBag:      state.MakeBagState[element[V]]("elementBag"),
+               TimerTimerstamp: state.MakeValueState[int64]("timerTimestamp"),
+               MinTimestampInBag: state.MakeCombiningState[int64, int64, 
int64]("minTimestampInBag", func(a, b int64) int64 {
+                       if a < b {
+                               return a
+                       }
+                       return b
+               }),
+               OutputState: timers.InProcessingTime("outputState"),
+       }, in)
+}
+
+// [END timer_output_timestamps_good]
+
+// updateState exists for example purposes only
+func updateState(sp, state, k, v any) {}
+
+// [START timer_garbage_collection]
+
+type timerGarbageCollectionFn[V any] struct {
+       State             state.Value[V]                       // The state for 
the key.
+       MaxTimestampInBag state.Combining[int64, int64, int64] // The maximum 
element timestamp seen so far.
+       GcTimer           timers.EventTime                     // The timestamp 
of the timer.
+}
+
+func (fn *timerGarbageCollectionFn[V]) ProcessElement(et beam.EventTime, sp 
state.Provider, tp timers.Provider, key string, value V, emit 
func(beam.EventTime, string)) {
+       updateState(sp, fn.State, key, value)
+       fn.MaxTimestampInBag.Add(sp, et.Milliseconds())
+
+       // Set the timer to be one hour after the maximum timestamp seen. This 
will keep overwriting the same timer, so
+       // as long as there is activity on this key the state will stay active. 
Once the key goes inactive for one hour's
+       // worth of event time (as measured by the watermark), then the gc 
timer will fire.
+       maxTs, _, _ := fn.MaxTimestampInBag.Read(sp)
+       expirationTime := time.UnixMilli(maxTs).Add(1 * time.Hour)
+       fn.GcTimer.Set(tp, expirationTime)
 }
 
+func (fn *timerGarbageCollectionFn[V]) OnTimer(sp state.Provider, tp 
timers.Provider, w beam.Window, key string, timer timers.Context, emit 
func(beam.EventTime, string)) {
+       switch timer.Family {
+       case fn.GcTimer.Family:
+               // Clear all the state for the key
+               fn.State.Clear(sp)
+               fn.MaxTimestampInBag.Clear(sp)
+       }
+}
+
+func AddTimerGarbageCollection[V any](s beam.Scope, in beam.PCollection) 
beam.PCollection {
+       return beam.ParDo(s, &timerGarbageCollectionFn[V]{
+               State: state.MakeValueState[V]("timerTimestamp"),
+               MaxTimestampInBag: state.MakeCombiningState[int64, int64, 
int64]("maxTimestampInBag", func(a, b int64) int64 {
+                       if a > b {
+                               return a
+                       }
+                       return b
+               }),
+               GcTimer: timers.InEventTime("gcTimer"),
+       }, in)
+}
+
+// [END timer_garbage_collection]
+
+type Event struct{}
+
+func (*Event) isClick() bool {}

Review Comment:
   the precommit build may be failing because we are not returning anything here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to