[ 
https://issues.apache.org/jira/browse/BEAM-11106?focusedWorklogId=765154&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765154
 ]

ASF GitHub Bot logged work on BEAM-11106:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/May/22 21:21
            Start Date: 02/May/22 21:21
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on code in PR #17432:
URL: https://github.com/apache/beam/pull/17432#discussion_r863119527


##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -199,6 +201,18 @@ var requiredSdfNames = []string{
        createTrackerName,
 }
 
+var optionalSdfNames = []string{
+       truncateRestrictionName,
+}
+
+var sdfNames = []string{

Review Comment:
   Consider not needing to re-specify explicitly, so we don't end up with 
additional places to update: eg https://go.dev/play/p/YAa2Xtx5aba



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,88 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(rest interface{}, elms *FullValue) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()

Review Comment:
   This isn't used AFAICT, please delete it.



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -847,6 +876,7 @@ func validateSdfSigNumbers(fn *Fn, num int) error {
 func validateSdfSigTypes(fn *Fn, num int) error {
        restrictionT := fn.methods[createInitialRestrictionName].Ret[0].T
        rTrackerT := reflect.TypeOf((*sdf.RTracker)(nil)).Elem()
+       bRTrackerT := fn.methods[createTrackerName].Ret[0].T

Review Comment:
   Nits:
   Please move this closer to it's use , right before the for loop that it's 
using.
   
   Consider a more descriptive short name. "b" as a prefix doesn't denote 
anything here, a better name for this would be `rTrackerImplT` since 
`rTrackerT` represents the interface type.
   



##########
sdks/go/pkg/beam/core/sdf/lock.go:
##########
@@ -80,3 +80,73 @@ func (rt *LockRTracker) GetRestriction() interface{} {
        defer rt.Mu.Unlock()
        return rt.Rt.GetRestriction()
 }
+
+// NewLockBoundableRTracker creates a LockBoundableRTracker initialized with 
the specified
+// restriction tracker as its underlying restriction tracker.
+func NewLockBoundableRTracker(rt BoundableRTracker) *LockRTracker {
+       return &LockRTracker{Rt: rt}
+}
+
+// LockBoundableRTracker is a restriction tracker that wraps another 
restriction
+// tracker and adds thread safety to it by locking a mutex in each method,
+// before delegating to the underlying tracker.
+type LockBoundableRTracker struct {

Review Comment:
   Is it necessary to have a new type, and duplicating all the methods, instead 
of adding to the existing RTracker?
   
   Assume as default, if there is no IsBounded method (determinable with a type 
assertion), that the restriction is bounded, like we would if the 
restriction/tracker didn't have the method.
   
   Type assertions are fast, and we can always delegate to the underlying 
tracker when available in the one method.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,88 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(rest interface{}, elms *FullValue) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) 
{
+       if tracker, ok := restTracker.(sdf.BoundableRTracker); ok && 
tracker.IsBounded() {
+               tracker.GetRestriction()

Review Comment:
   Shouldn't this be 
   ```suggestion
                return tracker.GetRestriction()
   ```
   ?
   
   Also, shouldn't that be for `!tracker.IsBounded()` so we're passing 
unbounded restrictions out without change, not for bounded restrictions?
   
   Either way, please add a unit test for the default behavior for both bounded 
and unbounded and ensure we have the correct behavior for each.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,88 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(rest interface{}, elms *FullValue) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) 
{

Review Comment:
   This function isn't used outside of the exec package, please unexport it.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 765154)
    Time Spent: 3h 20m  (was: 3h 10m)

> [Go SDK] Truncating SDFs during drain
> -------------------------------------
>
>                 Key: BEAM-11106
>                 URL: https://issues.apache.org/jira/browse/BEAM-11106
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Ritesh Ghorse
>            Priority: P3
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Allow SDFs to specify a restriction truncation method to permit faster drains.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and 
> the programming guide updated with SDF content.)



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to