lostluck commented on a change in pull request #16980:
URL: https://github.com/apache/beam/pull/16980#discussion_r818257293
##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -39,23 +40,47 @@ type MainInput struct {
RTracker sdf.RTracker
}
+type bundleFinalizationCallback struct {
+ callback func() error
+ validUntil time.Time
+}
+
+// bundleFinalizer holds all the user defined callbacks to be run on bundle finalization.
+// Implements typex.BundleFinalization
+type bundleFinalizer struct {
+ callbacks []bundleFinalizationCallback
+ lastValidCallback time.Time // Used to track when we can safely gc the bundleFinalizer
+}
+
+// RegisterCallback is used to register callbacks during DoFn execution.
+func (bf *bundleFinalizer) RegisterCallback(t time.Duration, cb func() error) {
+ callback := bundleFinalizationCallback{
+ callback: cb,
+ validUntil: time.Now().Add(t),
+ }
+ bf.callbacks = append(bf.callbacks, callback)
+ if bf.lastValidCallback.Before(callback.validUntil) {
+ bf.lastValidCallback = callback.validUntil
+ }
+}
+
// Invoke invokes the fn with the given values. The extra values must match the non-main
// side input and emitters. It returns the direct output, if any.
-func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error) {
+func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) {
Review comment:
But what about adding the optional bundleFinalizer as a field on
MainInput, like we do with the optional RTracker? (To be clear, they aren't
exactly the same notionally, but they're very similar; I can be convinced
either way.)
Don't forget to update the Design Doc with the implementation attempts and
the rationale for why we are or are not going with that approach.
##########
File path: sdks/go/pkg/beam/core/runtime/harness/harness.go
##########
@@ -338,14 +347,36 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
c.cache.CompleteBundle(tokens...)
mons, pylds := monitoring(plan, store)
+ requiresFinalization := false
// Move the plan back to the candidate state
c.mu.Lock()
// Mark the instruction as failed.
if err != nil {
c.failed[instID] = err
} else {
- // Non failure plans can be re-used.
- c.plans[bdID] = append(c.plans[bdID], plan)
+ // Non failure plans should either be moved to the finalized state
+ // or to plans so they can be re-used.
+ expiration := plan.GetExpirationTime(ctx, string(instID))
+ if time.Now().Before(expiration) {
+ // TODO(damccorm) - we can be a little smarter about data structures here by
+ // storing plans awaiting finalization in a heap. That way when we expire plans
+ // here it's O(1) instead of O(n) (though adding/finalizing will still be O(log n))
Review comment:
+1, especially once generic container types begin to drop with Go 1.18.
And I agree that maintaining our own slices sorted by time would be a bit
irritating in the short term and would require its own set of tests. It's not
hard (the sort package does exist), just finicky to test all the edge cases.
I'll note that in streaming jobs there are typically O(100s), if not
O(1000s), of bundles coming through in very quick succession, so we'll need
to keep an eye out in case finalization becomes a bottleneck once we're able
to load test it in earnest.
##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
##########
@@ -99,6 +124,11 @@ func newInvoker(fn *funcx.Fn) *invoker {
if n.outErrIdx, ok = fn.Error(); !ok {
n.outErrIdx = -1
}
+ // TODO(@damccorm) - add this back in once BundleFinalization is implemented
Review comment:
Prefer to annotate TODOs with JIRA tickets instead of people. Most of
the legacy TODOs use people because they were added before the SDK was
externalized.
So here, use TODO(BEAM-10976).
--
This is an automated message from the Apache Git Service.