This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch beam24931
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 0aeed95d6a8e6915c1b650bd7c13c1955c5dd19b
Author: Robert Burke <[email protected]>
AuthorDate: Sat Jan 7 00:22:14 2023 -0800

    return checkpoint list
---
 sdks/go/pkg/beam/core/runtime/exec/datasource.go   |  48 ++++++---
 .../pkg/beam/core/runtime/exec/datasource_test.go  | 115 ++++++++++++++++++++-
 sdks/go/pkg/beam/core/runtime/exec/plan.go         |  25 +++--
 sdks/go/pkg/beam/core/runtime/exec/unit.go         |   2 +-
 sdks/go/pkg/beam/core/runtime/exec/unit_test.go    |  14 +--
 sdks/go/pkg/beam/core/runtime/harness/harness.go   |  32 +++---
 sdks/go/pkg/beam/runners/direct/impulse.go         |   4 +-
 7 files changed, 183 insertions(+), 57 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index 55b0b0a5cad..c90f7016ca6 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -127,10 +127,10 @@ func (r *byteCountReader) reset() int {
 }
 
 // Process opens the data source, reads and decodes data, kicking off element processing.
-func (n *DataSource) Process(ctx context.Context) error {
+func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
     r, err := n.source.OpenRead(ctx, n.SID)
     if err != nil {
-        return err
+        return nil, err
     }
     defer r.Close()
     n.PCol.resetSize() // initialize the size distribution for this bundle.
@@ -154,23 +154,24 @@
         cp = MakeElementDecoder(c)
     }
 
+    var checkpoints []*Checkpoint
     for {
         if n.incrementIndexAndCheckSplit() {
-            return nil
+            return checkpoints, nil
         }
         // TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes?
         ws, t, pn, err := DecodeWindowedValueHeader(wc, r)
         if err != nil {
             if err == io.EOF {
-                return nil
+                return nil, nil
             }
-            return errors.Wrap(err, "source failed")
+            return nil, errors.Wrap(err, "source failed")
         }
 
         // Decode key or parallel element.
         pe, err := cp.Decode(&bcr)
         if err != nil {
-            return errors.Wrap(err, "source decode failed")
+            return nil, errors.Wrap(err, "source decode failed")
         }
         pe.Timestamp = t
         pe.Windows = ws
@@ -180,17 +181,30 @@
         for _, cv := range cvs {
             values, err := n.makeReStream(ctx, cv, &bcr, len(cvs) == 1 && n.singleIterate)
             if err != nil {
-                return err
+                return nil, err
             }
             valReStreams = append(valReStreams, values)
         }
 
         if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
-            return err
+            return nil, err
         }
         // Collect the actual size of the element, and reset the bytecounter reader.
         n.PCol.addSize(int64(bcr.reset()))
         bcr.reader = r
+
+        // Check if there's a continuation and return residuals.
+        // Needs to be done immediately after processing to not lose the element.
+        if c := n.getProcessContinuation(); c != nil {
+            cp, err := n.checkpointThis(c)
+            if err != nil {
+                // Errors during checkpointing should fail a bundle.
+                return nil, err
+            }
+            if cp != nil {
+                checkpoints = append(checkpoints, cp)
+            }
+        }
     }
 }
 
@@ -397,18 +411,22 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) {
     return encodeElms
 }
 
+type Checkpoint struct {
+    SR      SplitResult
+    Reapply time.Duration
+}
+
 // Checkpoint attempts to split an SDF that has self-checkpointed (e.g. returned a
 // ProcessContinuation) and needs to be resumed later. If the underlying DoFn is not
 // splittable or has not returned a resuming continuation, the function returns an empty
 // SplitResult, a negative resumption time, and a false boolean to indicate that no split
 // occurred.
-func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
+func (n *DataSource) checkpointThis(pc sdf.ProcessContinuation) (*Checkpoint, error) {
     n.mu.Lock()
     defer n.mu.Unlock()
 
-    pc := n.getProcessContinuation()
     if pc == nil || !pc.ShouldResume() {
-        return SplitResult{}, -1 * time.Minute, false, nil
+        return nil, nil
     }
 
     su := SplittableUnit(n.Out.(*ProcessSizedElementsAndRestrictions))
@@ -418,17 +436,17 @@
     // Checkpointing is functionally a split at fraction 0.0
     rs, err := su.Checkpoint()
     if err != nil {
-        return SplitResult{}, -1 * time.Minute, false, err
+        return nil, err
     }
     if len(rs) == 0 {
-        return SplitResult{}, -1 * time.Minute, false, nil
+        return nil, nil
     }
 
     encodeElms := n.makeEncodeElms()
     rsEnc, err := encodeElms(rs)
     if err != nil {
-        return SplitResult{}, -1 * time.Minute, false, err
+        return nil, err
     }
 
     res := SplitResult{
@@ -437,7 +455,7 @@
         InId: su.GetInputId(),
         OW:   ow,
     }
-    return res, pc.ResumeDelay(), true, nil
+    return &Checkpoint{SR: res, Reapply: pc.ResumeDelay()}, nil
 }
 
 // Split takes a sorted set of potential split indices and a fraction of the
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
index 19f639b31d0..17f59638340 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
@@ -20,14 +20,20 @@ import (
     "fmt"
     "io"
     "math"
+    "reflect"
     "testing"
     "time"
 
+    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
     "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
     "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
     "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/coderx"
+    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
     "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
     "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
     "google.golang.org/protobuf/types/known/timestamppb"
 )
 
@@ -314,7 +320,10 @@ func TestDataSource_Split(t *testing.T) {
             t.Fatalf("error in Split: got primary index = %v, want %v ", got, want)
         }
 
-        runOnRoots(ctx, t, p, "Process", Root.Process)
+        runOnRoots(ctx, t, p, "Process", func(root Root, ctx context.Context) error {
+            _, err := root.Process(ctx)
+            return err
+        })
         runOnRoots(ctx, t, p, "FinishBundle", Root.FinishBundle)
 
         validateSource(t, out, source, makeValues(test.expected...))
@@ -449,7 +458,10 @@
         if got, want := splitRes.PI, test.splitIdx-1; got != want {
             t.Fatalf("error in Split: got primary index = %v, want %v ", got, want)
         }
-        runOnRoots(ctx, t, p, "Process", Root.Process)
+        runOnRoots(ctx, t, p, "Process", func(root Root, ctx context.Context) error {
+            _, err := root.Process(ctx)
+            return err
+        })
         runOnRoots(ctx, t, p, "FinishBundle", Root.FinishBundle)
 
         validateSource(t, out, source, makeValues(test.expected...))
@@ -582,7 +594,10 @@
         if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, Frac: -1}); err != nil || !sr.Unsuccessful {
             t.Fatalf("p.Split(active) = %v,%v want unsuccessful split & nil err", sr, err)
         }
-        runOnRoots(ctx, t, p, "Process", Root.Process)
+        runOnRoots(ctx, t, p, "Process", func(root Root, ctx context.Context) error {
+            _, err := root.Process(ctx)
+            return err
+        })
         if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, Frac: -1}); err != nil || !sr.Unsuccessful {
             t.Fatalf("p.Split(active, unable to get desired split) = %v,%v want unsuccessful split & nil err", sr, err)
         }
@@ -858,6 +873,100 @@ func TestSplitHelper(t *testing.T) {
     })
 }
 
+func TestCheckpointing(t *testing.T) {
+    t.Run("nil", func(t *testing.T) {
+        cps, err := (&DataSource{}).checkpointThis(nil)
+        if err != nil {
+            t.Fatalf("checkpointThis() = %v, %v", cps, err)
+        }
+    })
+    t.Run("Stop", func(t *testing.T) {
+        cps, err := (&DataSource{}).checkpointThis(sdf.StopProcessing())
+        if err != nil {
+            t.Fatalf("checkpointThis() = %v, %v", cps, err)
+        }
+    })
+    t.Run("Delay_no_residuals", func(t *testing.T) {
+        wesInv, _ := newWatermarkEstimatorStateInvoker(nil)
+        root := &DataSource{
+            Out: &ProcessSizedElementsAndRestrictions{
+                PDo:    &ParDo{},
+                wesInv: wesInv,
+                rt:     offsetrange.NewTracker(offsetrange.Restriction{}),
+                elm: &FullValue{
+                    Windows: window.SingleGlobalWindow,
+                },
+            },
+        }
+        cp, err := root.checkpointThis(sdf.ResumeProcessingIn(time.Second * 13))
+        if err != nil {
+            t.Fatalf("checkpointThis() = %v, %v, want nil", cp, err)
+        }
+        if cp != nil {
+            t.Fatalf("checkpointThis() = %v, want nil", cp)
+        }
+    })
+    dfn, err := graph.NewDoFn(&WindowBlockingSdf{claim: -1}, graph.NumMainInputs(graph.MainSingle))
+    if err != nil {
+        t.Fatalf("invalid function: %v", err)
+    }
+    t.Run("Delay_residuals", func(t *testing.T) {
+        wesInv, _ := newWatermarkEstimatorStateInvoker(nil)
+        rest := offsetrange.Restriction{Start: 1, End: 10}
+        intCoder, _ := coderx.NewVarIntZ(reflectx.Int)
+        root := &DataSource{
+            Coder: coder.NewW(
+                coder.NewKV([]*coder.Coder{
+                    coder.NewKV([]*coder.Coder{
+                        coder.CoderFrom(intCoder),                   // Element
+                        coder.NewR(typex.New(reflect.TypeOf(rest))), // Restriction
+                    }),
+                    coder.NewDouble(), // Size
+                }),
+                coder.NewGlobalWindow(),
+            ),
+            Out: &ProcessSizedElementsAndRestrictions{
+                PDo: &ParDo{
+                    Fn: dfn,
+                },
+                TfId:   "testTransformID",
+                wesInv: wesInv,
+                rt:     offsetrange.NewTracker(rest),
+                elm: &FullValue{
+                    Windows: window.SingleGlobalWindow,
+                    Elm: &FullValue{
+                        Elm:  42,
+                        Elm2: rest,
+                    },
+                },
+            },
+        }
+        if err := root.Up(context.Background()); err != nil {
+            t.Fatalf("invalid function: %v", err)
+        }
+        if err := root.Out.Up(context.Background()); err != nil {
+            t.Fatalf("invalid function: %v", err)
+        }
+        wantDelay := time.Second * 13
+        cp, err := root.checkpointThis(sdf.ResumeProcessingIn(wantDelay))
+        if err != nil {
+            t.Fatalf("checkpointThis() = %v, %v, want nil", cp, err)
+        }
+        if cp == nil {
+            t.Fatalf("checkpointThis() = %v, want not nil", cp)
+        }
+        if got, want := cp.Reapply, wantDelay; got != want {
+            t.Errorf("checkpointThis(delay(%v)) delay = %v, want %v", want, got, want)
+        }
+        if got, want := cp.SR.TId, root.Out.(*ProcessSizedElementsAndRestrictions).TfId; got != want {
+            t.Errorf("checkpointThis() transformID = %v, want %v", got, want)
+        }
+        if got, want := cp.SR.InId, "i0"; got != want {
+            t.Errorf("checkpointThis() inputID = %v, want %v", got, want)
+        }
+    })
+}
+
 func runOnRoots(ctx context.Context, t *testing.T, p *Plan, name string, mthd func(Root, context.Context) error) {
     t.Helper()
     for i, root := range p.roots {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index 0189de51c7f..7958cf38238 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -31,11 +31,12 @@ import (
 // from a part of a pipeline. A plan can be used to process multiple bundles
 // serially.
 type Plan struct {
-    id    string // id of the bundle descriptor for this plan
-    roots []Root
-    units []Unit
-    pcols []*PCollection
-    bf    *bundleFinalizer
+    id          string // id of the bundle descriptor for this plan
+    roots       []Root
+    units       []Unit
+    pcols       []*PCollection
+    bf          *bundleFinalizer
+    checkpoints []*Checkpoint
 
     status Status
 
@@ -126,7 +127,11 @@ func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) erro
         }
     }
     for _, root := range p.roots {
-        if err := callNoPanic(ctx, root.Process); err != nil {
+        if err := callNoPanic(ctx, func(ctx context.Context) error {
+            cps, err := root.Process(ctx)
+            p.checkpoints = cps
+            return err
+        }); err != nil {
             p.status = Broken
             return errors.Wrapf(err, "while executing Process for %v", p)
         }
@@ -281,9 +286,7 @@ func (p *Plan) Split(ctx context.Context, s SplitPoints) (SplitResult, error) {
 }
 
 // Checkpoint attempts to split an SDF if the DoFn self-checkpointed.
-func (p *Plan) Checkpoint() (SplitResult, time.Duration, bool, error) {
-    if p.source != nil {
-        return p.source.Checkpoint()
-    }
-    return SplitResult{}, -1 * time.Minute, false, nil
+func (p *Plan) Checkpoint() []*Checkpoint {
+    defer func() { p.checkpoints = nil }()
+    return p.checkpoints
 }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/unit.go b/sdks/go/pkg/beam/core/runtime/exec/unit.go
index 04635086ab1..ac907d98f75 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/unit.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/unit.go
@@ -57,7 +57,7 @@ type Root interface {
 
     // Process processes the entire source, notably emitting elements to
     // downstream nodes.
-    Process(ctx context.Context) error
+    Process(ctx context.Context) ([]*Checkpoint, error)
 }
 
 // ElementProcessor presents a component that can process an element.
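Note for anyone carrying a custom Root implementation outside the SDK: the unit.go change above is a breaking interface change. A minimal sketch of the adaptation, written as if it lived in package exec (nopRoot is illustrative and not part of this patch):

// nopRoot is a hypothetical Root that emits nothing; it exists only to
// show the one method whose signature changed.
type nopRoot struct {
    UID UnitID
    Out Node
}

// Process now also returns any self-checkpoints gathered while running
// the bundle; a root that never self-checkpoints returns a nil slice.
func (n *nopRoot) Process(ctx context.Context) ([]*Checkpoint, error) {
    return nil, nil
}

The remaining Root methods (ID, Up, StartBundle, FinishBundle, Down) are untouched; FixedRoot and BenchRoot in unit_test.go below show complete updated implementations.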
diff --git a/sdks/go/pkg/beam/core/runtime/exec/unit_test.go b/sdks/go/pkg/beam/core/runtime/exec/unit_test.go
index 9d0137a0edd..885294ed397 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/unit_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/unit_test.go
@@ -125,6 +125,8 @@ type FixedRoot struct {
     Out      Node
 }
 
+var _ Root = (*FixedRoot)(nil)
+
 func (n *FixedRoot) ID() UnitID {
     return n.UID
 }
@@ -137,13 +139,13 @@ func (n *FixedRoot) StartBundle(ctx context.Context, id string, data DataContext
     return n.Out.StartBundle(ctx, id, data)
 }
 
-func (n *FixedRoot) Process(ctx context.Context) error {
+func (n *FixedRoot) Process(ctx context.Context) ([]*Checkpoint, error) {
     for _, elm := range n.Elements {
         if err := n.Out.ProcessElement(ctx, &elm.Key, elm.Values...); err != nil {
-            return err
+            return nil, err
         }
     }
-    return nil
+    return nil, nil
 }
 
 func (n *FixedRoot) FinishBundle(ctx context.Context) error {
@@ -186,13 +188,13 @@ func (n *BenchRoot) StartBundle(ctx context.Context, id string, data DataContext
     return n.Out.StartBundle(ctx, id, data)
 }
 
-func (n *BenchRoot) Process(ctx context.Context) error {
+func (n *BenchRoot) Process(ctx context.Context) ([]*Checkpoint, error) {
     for elm := range n.Elements {
         if err := n.Out.ProcessElement(ctx, &elm.Key, elm.Values...); err != nil {
-            return err
+            return nil, err
         }
     }
-    return nil
+    return nil, nil
 }
 
 func (n *BenchRoot) FinishBundle(ctx context.Context) error {
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 6b2386cc2e3..c260a46c80e 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -415,6 +415,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 
     mons, pylds := monitoring(plan, store, c.runnerCapabilities[URNMonitoringInfoShortID])
 
+    checkpoints := plan.Checkpoint()
     requiresFinalization := false
     // Move the plan back to the candidate state
     c.mu.Lock()
@@ -447,21 +448,19 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
         }
     }
 
-    // Check if the underlying DoFn self-checkpointed.
-    sr, delay, checkpointed, checkErr := plan.Checkpoint()
     var rRoots []*fnpb.DelayedBundleApplication
-    if checkpointed {
-        rRoots = make([]*fnpb.DelayedBundleApplication, len(sr.RS))
-        for i, r := range sr.RS {
-            rRoots[i] = &fnpb.DelayedBundleApplication{
-                Application: &fnpb.BundleApplication{
-                    TransformId:      sr.TId,
-                    InputId:          sr.InId,
-                    Element:          r,
-                    OutputWatermarks: sr.OW,
-                },
-                RequestedTimeDelay: durationpb.New(delay),
+    if len(checkpoints) > 0 {
+        for _, cp := range checkpoints {
+            for _, r := range cp.SR.RS {
+                rRoots = append(rRoots, &fnpb.DelayedBundleApplication{
+                    Application: &fnpb.BundleApplication{
+                        TransformId:      cp.SR.TId,
+                        InputId:          cp.SR.InId,
+                        Element:          r,
+                        OutputWatermarks: cp.SR.OW,
+                    },
+                    RequestedTimeDelay: durationpb.New(cp.Reapply),
+                })
             }
         }
     }
@@ -477,11 +476,6 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
     if err != nil {
         return fail(ctx, instID, "process bundle failed for instruction %v using plan %v : %v", instID, bdID, err)
     }
-
-    if checkErr != nil {
-        return fail(ctx, instID, "process bundle failed at checkpointing for instruction %v using plan %v : %v", instID, bdID, checkErr)
-    }
-
     return &fnpb.InstructionResponse{
         InstructionId: string(instID),
         Response: &fnpb.InstructionResponse_ProcessBundle{
diff --git a/sdks/go/pkg/beam/runners/direct/impulse.go b/sdks/go/pkg/beam/runners/direct/impulse.go
index 1d6f78d0620..c2348474a0b 100644
--- a/sdks/go/pkg/beam/runners/direct/impulse.go
+++ b/sdks/go/pkg/beam/runners/direct/impulse.go
@@ -43,13 +43,13 @@ func (n *Impulse) StartBundle(ctx context.Context, id string, data exec.DataCont
     return n.Out.StartBundle(ctx, id, data)
 }
 
-func (n *Impulse) Process(ctx context.Context) error {
+func (n *Impulse) Process(ctx context.Context) ([]*exec.Checkpoint, error) {
     value := &exec.FullValue{
         Windows:   window.SingleGlobalWindow,
         Timestamp: mtime.Now(),
         Elm:       n.Value,
     }
-    return n.Out.ProcessElement(ctx, value)
+    return nil, n.Out.ProcessElement(ctx, value)
 }
 
 func (n *Impulse) FinishBundle(ctx context.Context) error {
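For context on where these checkpoints originate: a splittable DoFn self-checkpoints by returning an sdf.ProcessContinuation from ProcessElement, which DataSource.Process now converts into one Checkpoint per element, and the harness reports as DelayedBundleApplications. A rough sketch of such a DoFn (the type and claimsPerBundle are illustrative; sdf.ResumeProcessingIn, sdf.StopProcessing, and the tracker methods are existing SDK API):

package example

import (
    "time"

    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
)

// selfCheckpointingFn claims a bounded number of positions per bundle and
// then defers the rest of its restriction, producing the residuals that
// flow through the code paths changed in this commit.
type selfCheckpointingFn struct{}

const claimsPerBundle = 100

func (fn *selfCheckpointingFn) ProcessElement(rt *sdf.LockRTracker, _ []byte, emit func(int64)) sdf.ProcessContinuation {
    pos := rt.GetRestriction().(offsetrange.Restriction).Start
    claimed := int64(0)
    for {
        if !rt.TryClaim(pos) {
            // Restriction exhausted (or the tracker errored); no resume needed.
            return sdf.StopProcessing()
        }
        emit(pos)
        pos++
        claimed++
        if claimed >= claimsPerBundle {
            // Self-checkpoint: ask the runner to hand back the residual
            // restriction after a delay.
            return sdf.ResumeProcessingIn(5 * time.Second)
        }
    }
}

(CreateInitialRestriction, CreateTracker, RestrictionSize, and SplitRestriction are omitted for brevity; a real SDF must also provide them.)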
