lostluck commented on code in PR #26101:
URL: https://github.com/apache/beam/pull/26101#discussion_r1157512442


##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -1350,6 +1393,42 @@ func validateState(fn *DoFn, numIn mainInputs) error {
        return nil
 }
 
+func validateTimer(fn *DoFn) error {
+       if fn.Fn == nil {
+               return nil
+       }
+
+       pt := fn.PipelineTimers()
+
+       if _, ok := fn.Fn.TimerProvider(); ok {
+               if len(pt) == 0 {
+                       err := errors.Errorf("ProcessElement uses a 
TimerProvider, but no timer struct-tags are attached to the DoFn")
+                       return errors.SetTopLevelMsgf(err, "ProcessElement uses 
a TimerProvider, but no timer struct-tags are attached to the DoFn"+
+                               ", Ensure that you are including the timer 
structs you're using to set/clear global state as uppercase member variables")
+               }
+               timerKeys := make(map[string]PipelineTimer)
+               for _, t := range pt {
+                       k := t.TimerFamily()
+                       if timer, ok := timerKeys[k]; ok {
+                               err := errors.Errorf("Duplicate timer key %v", 
k)
+                               return errors.SetTopLevelMsgf(err, "Duplicate 
timer key %v used by %v and %v. Ensure that keys are unique per DoFn", k, 
timer, t)
+                       } else {
+                               timerKeys[k] = t
+                       }
+               }
+       } else {
+               if len(pt) > 0 {
+                       err := errors.Errorf("ProcessElement doesn't  use a 
TimerProvider, but Timer Struct is attached to the DoFn: %v", pt)
+                       return errors.SetTopLevelMsgf(err, "ProcessElement 
doesn't  use a TimerProvider, but Timer Struct is attached to the DoFn: %v"+

Review Comment:
   "but the DoFn has no Timer fields"...



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -1350,6 +1393,42 @@ func validateState(fn *DoFn, numIn mainInputs) error {
        return nil
 }
 
+func validateTimer(fn *DoFn) error {
+       if fn.Fn == nil {
+               return nil
+       }
+
+       pt := fn.PipelineTimers()
+
+       if _, ok := fn.Fn.TimerProvider(); ok {
+               if len(pt) == 0 {
+                       err := errors.Errorf("ProcessElement uses a 
TimerProvider, but no timer struct-tags are attached to the DoFn")
+                       return errors.SetTopLevelMsgf(err, "ProcessElement uses 
a TimerProvider, but no timer struct-tags are attached to the DoFn"+

Review Comment:
   Technically we're just looking for exported Timer fields, not struct tags. 
struct tags are the fixed strings after the timer fields.  
   
   I'm not against using them instead of needing to name the fields and so 
forth, since then users don't need to initialise the fields themselves to get 
the names but that approach would prevent the "Composite" idea I have, since we 
can't... extrospect? the containing struct from the field. Pros and and Cons...



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -265,6 +265,30 @@ func (n *DataSource) Process(ctx context.Context) 
([]*Checkpoint, error) {
                func(bcr *byteCountReader, ptransformID, timerFamilyID string) 
error {
                        tmap, err := decodeTimer(cp, wc, bcr)
                        log.Infof(ctx, "DEBUGLOG: timer received for: %v and %v 
- %+v  err: %v", ptransformID, timerFamilyID, tmap, err)
+                       log.Infof(ctx, "OnTimerTransforms = %+v", 
n.OnTimerTransforms[ptransformID].Fn)
+                       if fn, ok := 
n.OnTimerTransforms[ptransformID].Fn.OnTimerFn(); ok {
+                               log.Infof(ctx, "found ontimer method, invoking 
callback")
+                               _, err := 
n.OnTimerTransforms[ptransformID].InvokeTimerFn(ctx, fn, timerFamilyID, tmap)
+                               if err != nil {
+                                       return errors.WithContext(err, "ontimer 
callback invocation failed")
+                               }
+                       }
+                       // Check if there's a continuation and return residuals
+                       // Needs to be done immediately after processing to not 
lose the element.
+                       // if c := n.getProcessContinuation(); c != nil {
+                       //      cp, err := n.checkpointThis(ctx, c)
+                       //      if err != nil {
+                       //              // Errors during checkpointing should 
fail a bundle.
+                       //              return err
+                       //      }
+                       //      if cp != nil {
+                       //              checkpoints = append(checkpoints, cp)
+                       //      }
+                       // }
+                       //      We've finished processing an element, check if 
we have finished a split.
+                       // if n.incrementIndexAndCheckSplit() {
+                       //      return splitSuccess
+                       // }

Review Comment:
   I'm 99% sure that OnTimer shouldn't deal with Process Continuations. 
   
   I think we'd just want to be able to set another timer?



##########
sdks/go/pkg/beam/core/runtime/exec/timers.go:
##########
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+       "context"
+       "fmt"
+       "io"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+)
+
+type UserTimerAdapter interface {
+       NewTimerProvider(ctx context.Context, manager DataManager, 
inputTimestamp typex.EventTime, windows []typex.Window, element *MainInput) 
(timerProvider, error)
+}
+
+type userTimerAdapter struct {
+       SID            StreamID
+       wc             WindowEncoder
+       kc             ElementEncoder
+       Dc             ElementDecoder
+       TimerIDToCoder map[string]*coder.Coder
+       C              *coder.Coder
+}
+
+func NewUserTimerAdapter(sID StreamID, c *coder.Coder, timerCoders 
map[string]*coder.Coder) UserTimerAdapter {
+       if !coder.IsW(c) {
+               panic(fmt.Sprintf("expected WV coder for user timer %v: %v", 
sID, c))
+       }
+
+       wc := MakeWindowEncoder(c.Window)
+       var kc ElementEncoder
+       var dc ElementDecoder
+       if coder.IsKV(coder.SkipW(c)) {

Review Comment:
   Note: if it isn't a KV, this should fail at pipeline construction time. 
Timers (and State) are always keyed, so at execution time, we should be able to 
take that as a guarantee and not have to check the coder at every layer WRT 
timers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to