This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch beam24505emptysplit in repository https://gitbox.apache.org/repos/asf/beam.git
commit 7d058b4ede71dbafa9c47e0efbd72ed2e751122b Author: lostluck <[email protected]> AuthorDate: Fri Dec 2 15:32:50 2022 -0800 Return empty splits if unable to split, not errors --- sdks/go/pkg/beam/core/runtime/exec/datasource.go | 6 ++-- .../pkg/beam/core/runtime/exec/datasource_test.go | 32 +++++++++++----------- .../go/pkg/beam/core/runtime/exec/dynsplit_test.go | 20 +++++++------- sdks/go/pkg/beam/core/runtime/exec/plan.go | 13 +++++---- sdks/go/pkg/beam/core/runtime/harness/harness.go | 13 ++++++++- 5 files changed, 50 insertions(+), 34 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 9fa8df7500a..55b0b0a5cad 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -29,6 +29,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" ) // DataSource is a Root execution unit. @@ -453,7 +454,7 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) { // sent to this DataSource, and is used to be able to perform accurate splits // even if the DataSource has not yet received all its elements. A bufSize of // 0 or less indicates that its unknown, and so uses the current known size. 
-func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (SplitResult, error) { +func (n *DataSource) Split(ctx context.Context, splits []int64, frac float64, bufSize int64) (SplitResult, error) { if n == nil { return SplitResult{}, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits) } @@ -498,7 +499,8 @@ func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (SplitRe } s, fr, err := splitHelper(n.index, bufSize, currProg, splits, frac, su != nil) if err != nil { - return SplitResult{}, err + log.Infof(ctx, "Unsuccessful split: %v", err) + return SplitResult{Unsuccessful: true}, nil } // No fraction returned, perform channel split. diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go index 46c00e68ee8..75a988fd31e 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go @@ -303,7 +303,7 @@ func TestDataSource_Split(t *testing.T) { runOnRoots(ctx, t, p, "StartBundle", func(root Root, ctx context.Context) error { return root.StartBundle(ctx, "1", dc) }) // SDK never splits on 0, so check that every test. - splitRes, err := p.Split(SplitPoints{Splits: []int64{0, test.splitIdx}}) + splitRes, err := p.Split(ctx, SplitPoints{Splits: []int64{0, test.splitIdx}}) if err != nil { t.Fatalf("error in Split: %v", err) } @@ -373,7 +373,7 @@ func TestDataSource_Split(t *testing.T) { <-blockedCh // Validate that we do not split on the element we're blocking on index. // The first valid split is at test.splitIdx. 
- if splitRes, err := source.Split([]int64{0, 1, 2, 3, 4, 5}, -1, 0); err != nil { + if splitRes, err := source.Split(context.Background(), []int64{0, 1, 2, 3, 4, 5}, -1, 0); err != nil { t.Errorf("error in Split: %v", err) } else { if got, want := splitRes.RI, test.splitIdx; got != want { @@ -439,7 +439,7 @@ func TestDataSource_Split(t *testing.T) { // SDK never splits on 0, so check that every test. sp := SplitPoints{Splits: test.splitPts, Frac: test.frac, BufSize: test.bufSize} - splitRes, err := p.Split(sp) + splitRes, err := p.Split(ctx, sp) if err != nil { t.Fatalf("error in Split: %v", err) } @@ -505,7 +505,7 @@ func TestDataSource_Split(t *testing.T) { <-blockedCh // Validate that we either do or do not perform a sub-element split with the // given fraction. - if splitRes, err := source.Split([]int64{0, 1, 2, 3, 4, 5}, test.fraction, int64(len(elements))); err != nil { + if splitRes, err := source.Split(context.Background(), []int64{0, 1, 2, 3, 4, 5}, test.fraction, int64(len(elements))); err != nil { t.Errorf("error in Split: %v", err) } else { // For sub-element splits, check sub-element split only results. 
@@ -566,8 +566,8 @@ func TestDataSource_Split(t *testing.T) { dc := DataContext{Data: &TestDataManager{R: pr}} ctx := context.Background() - if _, err := p.Split(SplitPoints{Splits: []int64{0, 3}, Frac: -1}); err == nil { - t.Fatal("plan uninitialized, expected error when splitting, got nil") + if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0, 3}, Frac: -1}); err != nil || !sr.Unsuccessful { + t.Fatalf("p.Split(before active) = %v,%v want unsuccessful split & nil err", sr, err) } for i, root := range p.units { if err := root.Up(ctx); err != nil { @@ -575,30 +575,30 @@ func TestDataSource_Split(t *testing.T) { } } p.status = Active - if _, err := p.Split(SplitPoints{Splits: []int64{0, 3}, Frac: -1}); err == nil { - t.Fatal("plan not started, expected error when splitting, got nil") + if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0, 3}, Frac: -1}); err != nil || !sr.Unsuccessful { + t.Fatalf("p.Split(active, not started) = %v,%v want unsuccessful split & nil err", sr, err) } runOnRoots(ctx, t, p, "StartBundle", func(root Root, ctx context.Context) error { return root.StartBundle(ctx, "1", dc) }) - if _, err := p.Split(SplitPoints{Splits: []int64{0}, Frac: -1}); err == nil { - t.Fatal("plan started, expected error when splitting, got nil") + if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, Frac: -1}); err != nil || !sr.Unsuccessful { + t.Fatalf("p.Split(active) = %v,%v want unsuccessful split & nil err", sr, err) } runOnRoots(ctx, t, p, "Process", Root.Process) - if _, err := p.Split(SplitPoints{Splits: []int64{0}, Frac: -1}); err == nil { - t.Fatal("plan in progress, expected error when unable to get a desired split, got nil") + if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, Frac: -1}); err != nil || !sr.Unsuccessful { + t.Fatalf("p.Split(active, unable to get desired split) = %v,%v want unsuccessful split & nil err", sr, err) } runOnRoots(ctx, t, p, "FinishBundle", Root.FinishBundle) - if _, err := p.Split(SplitPoints{Splits: 
[]int64{0}, Frac: -1}); err == nil { - t.Fatal("plan finished, expected error when splitting, got nil") + if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, Frac: -1}); err != nil || !sr.Unsuccessful { + t.Fatalf("p.Split(finished) = %v,%v want unsuccessful split & nil err", sr, err) } validateSource(t, out, source, makeValues(elements...)) }) t.Run("sanity_errors", func(t *testing.T) { var source *DataSource - if _, err := source.Split([]int64{0}, -1, 0); err == nil { + if _, err := source.Split(context.Background(), []int64{0}, -1, 0); err == nil { t.Fatal("expected error splitting nil *DataSource") } - if _, err := source.Split(nil, -1, 0); err == nil { + if _, err := source.Split(context.Background(), nil, -1, 0); err == nil { t.Fatal("expected error splitting nil desired splits") } }) diff --git a/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go b/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go index cb7a6c48cb5..16c2c902f49 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go @@ -44,7 +44,7 @@ func TestDynamicSplit(t *testing.T) { name string // driver is a function determining how the processing and splitting // threads are created and coordinated. - driver func(*Plan, DataContext, *splitTestSdf) (splitResult, error) + driver func(context.Context, *Plan, DataContext, *splitTestSdf) (splitResult, error) }{ { // Complete a split before beginning processing. @@ -81,7 +81,7 @@ func TestDynamicSplit(t *testing.T) { dc := DataContext{Data: &TestDataManager{R: pr}} // Call driver to coordinate processing & splitting threads. - splitRes, procRes := test.driver(plan, dc, sdf) + splitRes, procRes := test.driver(context.Background(), plan, dc, sdf) // Validate we get a valid split result, aside from split elements. 
if splitRes.err != nil { @@ -141,7 +141,7 @@ func TestDynamicSplit(t *testing.T) { // nonBlockingDriver performs a split before starting processing, so no thread // is forced to wait on a mutex. -func nonBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) { +func nonBlockingDriver(ctx context.Context, plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) { // Begin processing pipeline. procResCh := make(chan error) go processPlan(plan, dc, procResCh) @@ -149,7 +149,7 @@ func nonBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes // Complete a split before unblocking processing. splitResCh := make(chan splitResult) - go splitPlan(plan, splitResCh) + go splitPlan(ctx, plan, splitResCh) <-rt.split <-rt.blockSplit splitRes = <-splitResCh @@ -166,7 +166,7 @@ func nonBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes // splitBlockingDriver blocks on a split request so that the SDF attempts to // claim while the split is occurring. -func splitBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) { +func splitBlockingDriver(ctx context.Context, plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) { // Begin processing pipeline. procResCh := make(chan error) go processPlan(plan, dc, procResCh) @@ -174,7 +174,7 @@ func splitBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRe // Start a split, but block on it so it holds the mutex. splitResCh := make(chan splitResult) - go splitPlan(plan, splitResCh) + go splitPlan(ctx, plan, splitResCh) <-rt.split // Start processing and start a claim, that'll be waiting for the mutex. @@ -195,7 +195,7 @@ func splitBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRe // claimBlockingDriver blocks on a claim request so that the SDF attempts to // split while the claim is occurring. 
-func claimBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) { +func claimBlockingDriver(ctx context.Context, plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) { // Begin processing pipeline. procResCh := make(chan error) go processPlan(plan, dc, procResCh) @@ -207,7 +207,7 @@ func claimBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRe // Start a split that'll be waiting for the mutex. splitResCh := make(chan splitResult) - go splitPlan(plan, splitResCh) + go splitPlan(ctx, plan, splitResCh) <-rt.split // Unblock the claim, freeing the mutex (but not finishing processing yet). @@ -333,8 +333,8 @@ type splitResult struct { // splitPlan is meant to be the goroutine representing the thread handling a // split request for the SDF. -func splitPlan(plan *Plan, result chan splitResult) { - split, err := plan.Split(SplitPoints{Frac: 0.5, BufSize: 1}) +func splitPlan(ctx context.Context, plan *Plan, result chan splitResult) { + split, err := plan.Split(ctx, SplitPoints{Frac: 0.5, BufSize: 1}) result <- splitResult{split: split, err: err} } diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index f1a6f998e5b..0189de51c7f 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -251,6 +251,8 @@ type SplitPoints struct { // SplitResult contains the result of performing a split on a Plan. type SplitResult struct { + Unsuccessful bool // Indicates the split was unsuccessful. + // Indices are always included, for both channel and sub-element splits. PI int64 // Primary index, last element of the primary. RI int64 // Residual index, first element of the residual. @@ -268,13 +270,14 @@ type SplitResult struct { // Split takes a set of potential split indexes, and if successful returns // the split result. // Returns an error when unable to split. 
-func (p *Plan) Split(s SplitPoints) (SplitResult, error) { +func (p *Plan) Split(ctx context.Context, s SplitPoints) (SplitResult, error) { + // Can't split inactive plans. + if p.status != Active { + return SplitResult{Unsuccessful: true}, nil + } // TODO: When bundles with multiple sources, are supported, perform splits // on all sources. - if p.source != nil { - return p.source.Split(s.Splits, s.Frac, s.BufSize) - } - return SplitResult{}, fmt.Errorf("failed to split at requested splits: {%v}, Source not initialized", s) + return p.source.Split(ctx, s.Splits, s.Frac, s.BufSize) } // Checkpoint attempts to split an SDF if the DoFn self-checkpointed. diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 95c1729166b..1e4bdc30806 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -573,7 +573,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe if ds == nil { return fail(ctx, instID, "failed to split: desired splits for root of %v was empty.", ref) } - sr, err := plan.Split(exec.SplitPoints{ + sr, err := plan.Split(ctx, exec.SplitPoints{ Splits: ds.GetAllowedSplitPoints(), Frac: ds.GetFractionOfRemainder(), BufSize: ds.GetEstimatedInputElements(), @@ -583,6 +583,17 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe return fail(ctx, instID, "unable to split %v: %v", ref, err) } + // Unsuccessful splits without errors indicate we should return an empty response, + // as processing can continue. + if sr.Unsuccessful { + return &fnpb.InstructionResponse{ + InstructionId: string(instID), + Response: &fnpb.InstructionResponse_ProcessBundleSplit{ + ProcessBundleSplit: &fnpb.ProcessBundleSplitResponse{}, + }, + } + } + var pRoots []*fnpb.BundleApplication var rRoots []*fnpb.DelayedBundleApplication if sr.PS != nil && len(sr.PS) > 0 && sr.RS != nil && len(sr.RS) > 0 {
