lostluck commented on code in PR #17432:
URL: https://github.com/apache/beam/pull/17432#discussion_r863119527
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -199,6 +201,18 @@ var requiredSdfNames = []string{
createTrackerName,
}
+var optionalSdfNames = []string{
+ truncateRestrictionName,
+}
+
+var sdfNames = []string{
Review Comment:
Consider not needing to re-specify explicitly, so we don't end up with
additional places to update: eg https://go.dev/play/p/YAa2Xtx5aba
##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,88 @@ func (n *cweInvoker) Reset() {
n.args[i] = nil
}
}
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+ fn *funcx.Fn
+ args []interface{}
+ call func(rest interface{}, elms *FullValue) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
Review Comment:
This isn't used AFAICT, please delete it.
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -847,6 +876,7 @@ func validateSdfSigNumbers(fn *Fn, num int) error {
func validateSdfSigTypes(fn *Fn, num int) error {
restrictionT := fn.methods[createInitialRestrictionName].Ret[0].T
rTrackerT := reflect.TypeOf((*sdf.RTracker)(nil)).Elem()
+ bRTrackerT := fn.methods[createTrackerName].Ret[0].T
Review Comment:
Nits:
Please move this closer to it's use , right before the for loop that it's
using.
Consider a more descriptive short name. "b" as a prefix doesn't denote
anything here, a better name for this would be `rTrackerImplT` since
`rTrackerT` represents the interface type.
##########
sdks/go/pkg/beam/core/sdf/lock.go:
##########
@@ -80,3 +80,73 @@ func (rt *LockRTracker) GetRestriction() interface{} {
defer rt.Mu.Unlock()
return rt.Rt.GetRestriction()
}
+
+// NewLockBoundableRTracker creates a LockBoundableRTracker initialized with
the specified
+// restriction tracker as its underlying restriction tracker.
+func NewLockBoundableRTracker(rt BoundableRTracker) *LockRTracker {
+ return &LockRTracker{Rt: rt}
+}
+
+// LockBoundableRTracker is a restriction tracker that wraps another
restriction
+// tracker and adds thread safety to it by locking a mutex in each method,
+// before delegating to the underlying tracker.
+type LockBoundableRTracker struct {
Review Comment:
Is it necessary to have a new type, and duplicating all the methods, instead
of adding to the existing RTracker?
Assume as default, if there is no IsBounded method (determinable with a type
assertion), that the restriction is bounded, like we would if the
restriction/tracker didn't have the method.
Type assertions are fast, and we can always delegate to the underlying
tracker when available in the one method.
##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,88 @@ func (n *cweInvoker) Reset() {
n.args[i] = nil
}
}
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+ fn *funcx.Fn
+ args []interface{}
+ call func(rest interface{}, elms *FullValue) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{})
{
+ if tracker, ok := restTracker.(sdf.BoundableRTracker); ok &&
tracker.IsBounded() {
+ tracker.GetRestriction()
Review Comment:
Shouldn't this be
```suggestion
return tracker.GetRestriction()
```
?
Also, shouldn't that be for `!tracker.IsBounded()` so we're passing
unbounded restrictions out without change, not for bounded restrictions?
Either way, please add a unit test for the default behavior for both bounded
and unbounded and ensure we have the correct behavior for each.
##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,88 @@ func (n *cweInvoker) Reset() {
n.args[i] = nil
}
}
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+ fn *funcx.Fn
+ args []interface{}
+ call func(rest interface{}, elms *FullValue) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{})
{
Review Comment:
This function isn't used outside of the exec package, please unexport it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]