riteshghorse commented on code in PR #26101:
URL: https://github.com/apache/beam/pull/26101#discussion_r1179682689
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -1386,35 +1386,54 @@ func validateState(fn *DoFn, numIn mainInputs) error {
}
func validateOnTimerFn(fn *DoFn) error {
- if _, ok := fn.OnTimerFn(); !ok {
+ if _, ok := fn.methods[onTimerName]; !ok {
err := errors.Errorf("OnTimer function not defined for DoFn:
%v", fn.Name())
return errors.SetTopLevelMsgf(err, "OnTimer function not
defined for DoFn: %v. Ensure that OnTimer function is implemented for the
DoFn.", fn.Name())
}
- return nil
-}
+ if _, ok := fn.methods[onTimerName].TimerProvider(); !ok {
+ err := errors.Errorf("OnTimer function doesn't use a
TimerProvider, but Timer field is attached to the DoFn(%v): %v", fn.Name(),
fn.PipelineTimers())
+ return errors.SetTopLevelMsgf(err, "OnTimer function doesn't
use a TimerProvider, but Timer field is attached to the DoFn(%v): %v"+
+ ", Ensure that you are using the TimerProvider to set
and clear the timers.", fn.Name(), fn.PipelineTimers())
+ }
-func validateTimer(fn *DoFn) error {
- if fn.Fn == nil {
- return nil
+ _, otNum, otExists := fn.methods[onTimerName].Emits()
+ _, peNum, peExists := fn.methods[processElementName].Emits()
+
+ if otExists == peExists {
+ if otNum != peNum {
+ return fmt.Errorf("OnTimer and ProcessElement functions
for DoFn should have exactly same emitters, no. of emitters used in OnTimer:
%v, no. of emitters used in ProcessElement: %v", otNum, peNum)
+ }
+ } else {
+ return fmt.Errorf("OnTimer and ProcessElement functions for
DoFn should have exactly same emitters, emitters used in OnTimer: %v, emitters
used in ProcessElement: %v", otExists, peExists)
}
+ return nil
+}
+
+func validateTimer(fn *DoFn, numIn mainInputs) error {
pt := fn.PipelineTimers()
- if _, ok := fn.Fn.TimerProvider(); ok {
+ if _, ok := fn.methods[processElementName].TimerProvider(); ok {
+ if numIn == MainSingle {
+ err := errors.Errorf("ProcessElement uses a
TimerProvider, but is not keyed")
+ return errors.SetTopLevelMsgf(err, "ProcessElement uses
a TimerProvider, but is not keyed. "+
+ "All stateful DoFns must take a key/value pair
as an input.")
+ }
if len(pt) == 0 {
err := errors.New("ProcessElement uses a TimerProvider,
but no Timer fields are defined in the DoFn")
return errors.SetTopLevelMsgf(err, "ProcessElement uses
a TimerProvider, but no timer fields are defined in the DoFn"+
- ", Ensure that you are including the exported
timer field in the DoFn that you're using to set/clear timers.")
+ ", Ensure that your DoFn exports the Timer
fields used to set and clear timers.")
}
timerKeys := make(map[string]timers.PipelineTimer)
for _, t := range pt {
- k := t.TimerFamily()
- if timer, ok := timerKeys[k]; ok {
- err := errors.Errorf("Duplicate timer key %v",
k)
- return errors.SetTopLevelMsgf(err, "Duplicate
timer key %v used by %v and %v. Ensure that keys are unique per DoFn", k,
timer, t)
- } else {
- timerKeys[k] = t
+ for timerFamilyID := range t.Timers() {
+ if timer, ok := timerKeys[timerFamilyID]; ok {
+ err := errors.Errorf("Duplicate timer
key %v", timerFamilyID)
+ return errors.SetTopLevelMsgf(err,
"Duplicate timer family ID %v used by %v and %v. Ensure that timer family IDs
are unique per DoFn", timerFamilyID, timer, t)
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]