riteshghorse commented on code in PR #26101:
URL: https://github.com/apache/beam/pull/26101#discussion_r1179682689


##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -1386,35 +1386,54 @@ func validateState(fn *DoFn, numIn mainInputs) error {
 }
 
 func validateOnTimerFn(fn *DoFn) error {
-       if _, ok := fn.OnTimerFn(); !ok {
+       if _, ok := fn.methods[onTimerName]; !ok {
                err := errors.Errorf("OnTimer function not defined for DoFn: 
%v", fn.Name())
                return errors.SetTopLevelMsgf(err, "OnTimer function not 
defined for DoFn: %v. Ensure that OnTimer function is implemented for the 
DoFn.", fn.Name())
        }
 
-       return nil
-}
+       if _, ok := fn.methods[onTimerName].TimerProvider(); !ok {
+               err := errors.Errorf("OnTimer function doesn't use a 
TimerProvider, but Timer field is attached to the DoFn(%v): %v", fn.Name(), 
fn.PipelineTimers())
+               return errors.SetTopLevelMsgf(err, "OnTimer function doesn't 
use a TimerProvider, but Timer field is attached to the DoFn(%v): %v"+
+                       ", Ensure that you are using the TimerProvider to set 
and clear the timers.", fn.Name(), fn.PipelineTimers())
+       }
 
-func validateTimer(fn *DoFn) error {
-       if fn.Fn == nil {
-               return nil
+       _, otNum, otExists := fn.methods[onTimerName].Emits()
+       _, peNum, peExists := fn.methods[processElementName].Emits()
+
+       if otExists == peExists {
+               if otNum != peNum {
+                       return fmt.Errorf("OnTimer and ProcessElement functions 
for DoFn should have exactly same emitters, no. of emitters used in OnTimer: 
%v, no. of emitters used in ProcessElement: %v", otNum, peNum)
+               }
+       } else {
+               return fmt.Errorf("OnTimer and ProcessElement functions for 
DoFn should have exactly same emitters, emitters used in OnTimer: %v, emitters 
used in ProcessElement: %v", otExists, peExists)
        }
 
+       return nil
+}
+
+func validateTimer(fn *DoFn, numIn mainInputs) error {
        pt := fn.PipelineTimers()
 
-       if _, ok := fn.Fn.TimerProvider(); ok {
+       if _, ok := fn.methods[processElementName].TimerProvider(); ok {
+               if numIn == MainSingle {
+                       err := errors.Errorf("ProcessElement uses a 
TimerProvider, but is not keyed")
+                       return errors.SetTopLevelMsgf(err, "ProcessElement uses 
a TimerProvider, but is not keyed. "+
+                               "All stateful DoFns must take a key/value pair 
as an input.")
+               }
                if len(pt) == 0 {
                        err := errors.New("ProcessElement uses a TimerProvider, 
but no Timer fields are defined in the DoFn")
                        return errors.SetTopLevelMsgf(err, "ProcessElement uses 
a TimerProvider, but no timer fields are defined in the DoFn"+
-                               ", Ensure that you are including the exported 
timer field in the DoFn that you're using to set/clear timers.")
+                               ", Ensure that your DoFn exports the Timer 
fields used to set and clear timers.")
                }
                timerKeys := make(map[string]timers.PipelineTimer)
                for _, t := range pt {
-                       k := t.TimerFamily()
-                       if timer, ok := timerKeys[k]; ok {
-                               err := errors.Errorf("Duplicate timer key %v", 
k)
-                               return errors.SetTopLevelMsgf(err, "Duplicate 
timer key %v used by %v and %v. Ensure that keys are unique per DoFn", k, 
timer, t)
-                       } else {
-                               timerKeys[k] = t
+                       for timerFamilyID := range t.Timers() {
+                               if timer, ok := timerKeys[timerFamilyID]; ok {
+                                       err := errors.Errorf("Duplicate timer 
key %v", timerFamilyID)
+                                       return errors.SetTopLevelMsgf(err, 
"Duplicate timer family ID %v used by %v and %v. Ensure that timer family IDs 
are unique per DoFn", timerFamilyID, timer, t)

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to