lostluck commented on code in PR #26101:
URL: https://github.com/apache/beam/pull/26101#discussion_r1178321047
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -1386,35 +1386,54 @@ func validateState(fn *DoFn, numIn mainInputs) error {
}
func validateOnTimerFn(fn *DoFn) error {
- if _, ok := fn.OnTimerFn(); !ok {
+ if _, ok := fn.methods[onTimerName]; !ok {
err := errors.Errorf("OnTimer function not defined for DoFn:
%v", fn.Name())
return errors.SetTopLevelMsgf(err, "OnTimer function not
defined for DoFn: %v. Ensure that OnTimer function is implemented for the
DoFn.", fn.Name())
}
- return nil
-}
+ if _, ok := fn.methods[onTimerName].TimerProvider(); !ok {
+ err := errors.Errorf("OnTimer function doesn't use a
TimerProvider, but Timer field is attached to the DoFn(%v): %v", fn.Name(),
fn.PipelineTimers())
+ return errors.SetTopLevelMsgf(err, "OnTimer function doesn't
use a TimerProvider, but Timer field is attached to the DoFn(%v): %v"+
+ ", Ensure that you are using the TimerProvider to set
and clear the timers.", fn.Name(), fn.PipelineTimers())
+ }
-func validateTimer(fn *DoFn) error {
- if fn.Fn == nil {
- return nil
+ _, otNum, otExists := fn.methods[onTimerName].Emits()
+ _, peNum, peExists := fn.methods[processElementName].Emits()
+
+ if otExists == peExists {
+ if otNum != peNum {
+ return fmt.Errorf("OnTimer and ProcessElement functions
for DoFn should have exactly same emitters, no. of emitters used in OnTimer:
%v, no. of emitters used in ProcessElement: %v", otNum, peNum)
+ }
+ } else {
+ return fmt.Errorf("OnTimer and ProcessElement functions for
DoFn should have exactly same emitters, emitters used in OnTimer: %v, emitters
used in ProcessElement: %v", otExists, peExists)
}
+ return nil
+}
+
+func validateTimer(fn *DoFn, numIn mainInputs) error {
pt := fn.PipelineTimers()
- if _, ok := fn.Fn.TimerProvider(); ok {
+ if _, ok := fn.methods[processElementName].TimerProvider(); ok {
+ if numIn == MainSingle {
+ err := errors.Errorf("ProcessElement uses a
TimerProvider, but is not keyed")
+ return errors.SetTopLevelMsgf(err, "ProcessElement uses
a TimerProvider, but is not keyed. "+
+ "All stateful DoFns must take a key/value pair
as an input.")
+ }
if len(pt) == 0 {
err := errors.New("ProcessElement uses a TimerProvider,
but no Timer fields are defined in the DoFn")
return errors.SetTopLevelMsgf(err, "ProcessElement uses
a TimerProvider, but no timer fields are defined in the DoFn"+
- ", Ensure that you are including the exported
timer field in the DoFn that you're using to set/clear timers.")
+ ", Ensure that your DoFn exports the Timer
fields used to set and clear timers.")
}
timerKeys := make(map[string]timers.PipelineTimer)
for _, t := range pt {
- k := t.TimerFamily()
- if timer, ok := timerKeys[k]; ok {
- err := errors.Errorf("Duplicate timer key %v",
k)
- return errors.SetTopLevelMsgf(err, "Duplicate
timer key %v used by %v and %v. Ensure that keys are unique per DoFn", k,
timer, t)
- } else {
- timerKeys[k] = t
+ for timerFamilyID := range t.Timers() {
+ if timer, ok := timerKeys[timerFamilyID]; ok {
+ err := errors.Errorf("Duplicate timer
key %v", timerFamilyID)
+ return errors.SetTopLevelMsgf(err,
"Duplicate timer family ID %v used by %v and %v. Ensure that timer family IDs
are unique per DoFn", timerFamilyID, timer, t)
Review Comment:
The value "timer" here is just a different timer. For this check we may want
to store field names in a map, so the error can be sensible. We probably need
to change the `PipelineTimers` method to allow for it, by storing both the
field name and the timer for a given family name.
It's good we have an error at least, but the message should name the conflicting fields.
##########
sdks/go/pkg/beam/core/runtime/exec/translate.go:
##########
@@ -588,6 +592,20 @@ func (b *builder) makeLink(from string, id linkID) (Node,
error) {
}
}
+ if len(userTimers) > 0 {
+ log.Debugf(context.TODO(),
"userTimers %+v", userTimers)
+ timerIDToCoder :=
make(map[string]*coder.Coder)
+ sID := StreamID{Port: Port{URL:
b.desc.GetTimerApiServiceDescriptor().GetUrl()}, PtransformID: id.to}
+ ec, wc, err :=
b.makeCoderForPCollection(input[0])
+ if err != nil {
+ return nil, err
+ }
+ for key := range userTimers {
+ timerIDToCoder[key] =
coder.NewT(ec, wc)
+ }
Review Comment:
2. Yeah, here we don't even use the coderID stored in the spec; instead we
ultimately pass through the whole PCollection coder rather than extracting
the Key component here.
##########
sdks/go/pkg/beam/core/runtime/exec/timers.go:
##########
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+// UserTimerAdapter provides a timer provider to be used for manipulating
timers.
+type UserTimerAdapter interface {
+ NewTimerProvider(ctx context.Context, manager DataManager,
inputTimestamp typex.EventTime, windows []typex.Window, element *MainInput)
(timerProvider, error)
Review Comment:
5. We can thus also pre-create and cache the Key decoder here, and use the
adapter for the coder, instead of doing it at the top of the DataSource.
##########
sdks/go/pkg/beam/core/runtime/exec/timers.go:
##########
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+// UserTimerAdapter provides a timer provider to be used for manipulating
timers.
+type UserTimerAdapter interface {
+ NewTimerProvider(ctx context.Context, manager DataManager,
inputTimestamp typex.EventTime, windows []typex.Window, element *MainInput)
(timerProvider, error)
+}
+
+type userTimerAdapter struct {
+ sID StreamID
+ timerIDToCoder map[string]*coder.Coder
+}
+
+// NewUserTimerAdapter returns a user timer adapter for the given StreamID and
timer coder.
+func NewUserTimerAdapter(sID StreamID, c *coder.Coder, timerCoders
map[string]*coder.Coder) UserTimerAdapter {
+ if !coder.IsW(c) {
+ panic(fmt.Sprintf("expected WV coder for user timer %v: %v",
sID, c))
+ }
+
+ return &userTimerAdapter{sID: sID, timerIDToCoder: timerCoders}
+}
+
+// NewTimerProvider creates and returns a timer provider to set/clear timers.
+func (u *userTimerAdapter) NewTimerProvider(ctx context.Context, manager
DataManager, inputTs typex.EventTime, w []typex.Window, element *MainInput)
(timerProvider, error) {
+ userKey := &FullValue{Elm: element.Key.Elm}
+ tp := timerProvider{
+ ctx: ctx,
+ tm: manager,
+ userKey: userKey,
+ inputTimestamp: inputTs,
+ sID: u.sID,
+ window: w,
+ writersByFamily: make(map[string]io.Writer),
+ codersByFamily: u.timerIDToCoder,
Review Comment:
4. Per this bit of jumping around, since it's always going to be the same
coder for all families in the same DoFn, we can pre-make the Encoder here too,
so it's not created on every call to Set.
##########
sdks/go/pkg/beam/core/runtime/exec/coder.go:
##########
@@ -145,8 +145,15 @@ func MakeElementEncoder(c *coder.Coder) ElementEncoder {
}
case coder.Timer:
+ tc := coder.SkipW(c).Components[0]
+ if coder.IsKV(tc) {
+ return &timerEncoder{
+ elm: MakeElementEncoder(tc.Components[0]),
+ win: MakeWindowEncoder(c.Window),
+ }
+ }
return &timerEncoder{
- elm: MakeElementEncoder(c.Components[0]),
Review Comment:
3. Yeah, not extracting them early leads to issues at this stage where we
only need the key, but we require a KV. Could lead to subtle, weird errors I
think.
##########
sdks/go/pkg/beam/pardo.go:
##########
@@ -116,6 +116,22 @@ func TryParDo(s Scope, dofn any, col PCollection, opts
...Option) ([]PCollection
}
}
+ wc := inWfn.Coder()
+ pipelineTimers := fn.PipelineTimers()
+ if len(pipelineTimers) > 0 {
+ c, err := inferCoder(typex.New(reflect.TypeOf(col.Type())))
+ if err != nil {
+ return nil, addParDoCtx(errors.New("error infering
coder from col"), s)
+ }
+ edge.TimerCoders = make(map[string]*coder.Coder)
Review Comment:
1. If the coder is always going to be the Key of the input PCollection KV
type, do we need to store a different coder for every timer family, since
they're identical?
This means we could also avoid inferring the coder multiple times, since this
could be done in the graphx/translate.go step instead, and we can probably
avoid filtering down to the Key type a few times in the exec side. (I should be
able to comment on those directly to help track them down...)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]