This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch beam24505emptysplit
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7d058b4ede71dbafa9c47e0efbd72ed2e751122b
Author: lostluck <[email protected]>
AuthorDate: Fri Dec 2 15:32:50 2022 -0800

    Return empty splits if unable to split, not errors
---
 sdks/go/pkg/beam/core/runtime/exec/datasource.go   |  6 ++--
 .../pkg/beam/core/runtime/exec/datasource_test.go  | 32 +++++++++++-----------
 .../go/pkg/beam/core/runtime/exec/dynsplit_test.go | 20 +++++++-------
 sdks/go/pkg/beam/core/runtime/exec/plan.go         | 13 +++++----
 sdks/go/pkg/beam/core/runtime/harness/harness.go   | 13 ++++++++-
 5 files changed, 50 insertions(+), 34 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go 
b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index 9fa8df7500a..55b0b0a5cad 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -29,6 +29,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
 )
 
 // DataSource is a Root execution unit.
@@ -453,7 +454,7 @@ func (n *DataSource) Checkpoint() (SplitResult, 
time.Duration, bool, error) {
 // sent to this DataSource, and is used to be able to perform accurate splits
 // even if the DataSource has not yet received all its elements. A bufSize of
 // 0 or less indicates that its unknown, and so uses the current known size.
-func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) 
(SplitResult, error) {
+func (n *DataSource) Split(ctx context.Context, splits []int64, frac float64, 
bufSize int64) (SplitResult, error) {
        if n == nil {
                return SplitResult{}, fmt.Errorf("failed to split at requested 
splits: {%v}, DataSource not initialized", splits)
        }
@@ -498,7 +499,8 @@ func (n *DataSource) Split(splits []int64, frac float64, 
bufSize int64) (SplitRe
        }
        s, fr, err := splitHelper(n.index, bufSize, currProg, splits, frac, su 
!= nil)
        if err != nil {
-               return SplitResult{}, err
+               log.Infof(ctx, "Unsuccessful split: %v", err)
+               return SplitResult{Unsuccessful: true}, nil
        }
 
        // No fraction returned, perform channel split.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
index 46c00e68ee8..75a988fd31e 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
@@ -303,7 +303,7 @@ func TestDataSource_Split(t *testing.T) {
                        runOnRoots(ctx, t, p, "StartBundle", func(root Root, 
ctx context.Context) error { return root.StartBundle(ctx, "1", dc) })
 
                        // SDK never splits on 0, so check that every test.
-                       splitRes, err := p.Split(SplitPoints{Splits: []int64{0, 
test.splitIdx}})
+                       splitRes, err := p.Split(ctx, SplitPoints{Splits: 
[]int64{0, test.splitIdx}})
                        if err != nil {
                                t.Fatalf("error in Split: %v", err)
                        }
@@ -373,7 +373,7 @@ func TestDataSource_Split(t *testing.T) {
                                        <-blockedCh
                                        // Validate that we do not split on the 
element we're blocking on index.
                                        // The first valid split is at 
test.splitIdx.
-                                       if splitRes, err := 
source.Split([]int64{0, 1, 2, 3, 4, 5}, -1, 0); err != nil {
+                                       if splitRes, err := 
source.Split(context.Background(), []int64{0, 1, 2, 3, 4, 5}, -1, 0); err != 
nil {
                                                t.Errorf("error in Split: %v", 
err)
                                        } else {
                                                if got, want := splitRes.RI, 
test.splitIdx; got != want {
@@ -439,7 +439,7 @@ func TestDataSource_Split(t *testing.T) {
 
                // SDK never splits on 0, so check that every test.
                sp := SplitPoints{Splits: test.splitPts, Frac: test.frac, 
BufSize: test.bufSize}
-               splitRes, err := p.Split(sp)
+               splitRes, err := p.Split(ctx, sp)
                if err != nil {
                        t.Fatalf("error in Split: %v", err)
                }
@@ -505,7 +505,7 @@ func TestDataSource_Split(t *testing.T) {
                                        <-blockedCh
                                        // Validate that we either do or do not 
perform a sub-element split with the
                                        // given fraction.
-                                       if splitRes, err := 
source.Split([]int64{0, 1, 2, 3, 4, 5}, test.fraction, int64(len(elements))); 
err != nil {
+                                       if splitRes, err := 
source.Split(context.Background(), []int64{0, 1, 2, 3, 4, 5}, test.fraction, 
int64(len(elements))); err != nil {
                                                t.Errorf("error in Split: %v", 
err)
                                        } else {
                                                // For sub-element splits, 
check sub-element split only results.
@@ -566,8 +566,8 @@ func TestDataSource_Split(t *testing.T) {
                dc := DataContext{Data: &TestDataManager{R: pr}}
                ctx := context.Background()
 
-               if _, err := p.Split(SplitPoints{Splits: []int64{0, 3}, Frac: 
-1}); err == nil {
-                       t.Fatal("plan uninitialized, expected error when 
splitting, got nil")
+               if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0, 3}, 
Frac: -1}); err != nil || !sr.Unsuccessful {
+                       t.Fatalf("p.Split(before active) = %v,%v want 
unsuccessful split & nil err", sr, err)
                }
                for i, root := range p.units {
                        if err := root.Up(ctx); err != nil {
@@ -575,30 +575,30 @@ func TestDataSource_Split(t *testing.T) {
                        }
                }
                p.status = Active
-               if _, err := p.Split(SplitPoints{Splits: []int64{0, 3}, Frac: 
-1}); err == nil {
-                       t.Fatal("plan not started, expected error when 
splitting, got nil")
+               if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0, 3}, 
Frac: -1}); err != nil || !sr.Unsuccessful {
+                       t.Fatalf("p.Split(active, not started) = %v,%v want 
unsuccessful split & nil err", sr, err)
                }
                runOnRoots(ctx, t, p, "StartBundle", func(root Root, ctx 
context.Context) error { return root.StartBundle(ctx, "1", dc) })
-               if _, err := p.Split(SplitPoints{Splits: []int64{0}, Frac: 
-1}); err == nil {
-                       t.Fatal("plan started, expected error when splitting, 
got nil")
+               if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, 
Frac: -1}); err != nil || !sr.Unsuccessful {
+                       t.Fatalf("p.Split(active) = %v,%v want unsuccessful 
split & nil err", sr, err)
                }
                runOnRoots(ctx, t, p, "Process", Root.Process)
-               if _, err := p.Split(SplitPoints{Splits: []int64{0}, Frac: 
-1}); err == nil {
-                       t.Fatal("plan in progress, expected error when unable 
to get a desired split, got nil")
+               if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, 
Frac: -1}); err != nil || !sr.Unsuccessful {
+                       t.Fatalf("p.Split(active, unable to get desired split) 
= %v,%v want unsuccessful split & nil err", sr, err)
                }
                runOnRoots(ctx, t, p, "FinishBundle", Root.FinishBundle)
-               if _, err := p.Split(SplitPoints{Splits: []int64{0}, Frac: 
-1}); err == nil {
-                       t.Fatal("plan finished, expected error when splitting, 
got nil")
+               if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, 
Frac: -1}); err != nil || !sr.Unsuccessful {
+                       t.Fatalf("p.Split(finished) = %v,%v want unsuccessful 
split & nil err", sr, err)
                }
                validateSource(t, out, source, makeValues(elements...))
        })
 
        t.Run("sanity_errors", func(t *testing.T) {
                var source *DataSource
-               if _, err := source.Split([]int64{0}, -1, 0); err == nil {
+               if _, err := source.Split(context.Background(), []int64{0}, -1, 
0); err == nil {
                        t.Fatal("expected error splitting nil *DataSource")
                }
-               if _, err := source.Split(nil, -1, 0); err == nil {
+               if _, err := source.Split(context.Background(), nil, -1, 0); 
err == nil {
                        t.Fatal("expected error splitting nil desired splits")
                }
        })
diff --git a/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
index cb7a6c48cb5..16c2c902f49 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
@@ -44,7 +44,7 @@ func TestDynamicSplit(t *testing.T) {
                name string
                // driver is a function determining how the processing and 
splitting
                // threads are created and coordinated.
-               driver func(*Plan, DataContext, *splitTestSdf) (splitResult, 
error)
+               driver func(context.Context, *Plan, DataContext, *splitTestSdf) 
(splitResult, error)
        }{
                {
                        // Complete a split before beginning processing.
@@ -81,7 +81,7 @@ func TestDynamicSplit(t *testing.T) {
                        dc := DataContext{Data: &TestDataManager{R: pr}}
 
                        // Call driver to coordinate processing & splitting 
threads.
-                       splitRes, procRes := test.driver(plan, dc, sdf)
+                       splitRes, procRes := test.driver(context.Background(), 
plan, dc, sdf)
 
                        // Validate we get a valid split result, aside from 
split elements.
                        if splitRes.err != nil {
@@ -141,7 +141,7 @@ func TestDynamicSplit(t *testing.T) {
 
 // nonBlockingDriver performs a split before starting processing, so no thread
 // is forced to wait on a mutex.
-func nonBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) 
(splitRes splitResult, procRes error) {
+func nonBlockingDriver(ctx context.Context, plan *Plan, dc DataContext, sdf 
*splitTestSdf) (splitRes splitResult, procRes error) {
        // Begin processing pipeline.
        procResCh := make(chan error)
        go processPlan(plan, dc, procResCh)
@@ -149,7 +149,7 @@ func nonBlockingDriver(plan *Plan, dc DataContext, sdf 
*splitTestSdf) (splitRes
 
        // Complete a split before unblocking processing.
        splitResCh := make(chan splitResult)
-       go splitPlan(plan, splitResCh)
+       go splitPlan(ctx, plan, splitResCh)
        <-rt.split
        <-rt.blockSplit
        splitRes = <-splitResCh
@@ -166,7 +166,7 @@ func nonBlockingDriver(plan *Plan, dc DataContext, sdf 
*splitTestSdf) (splitRes
 
 // splitBlockingDriver blocks on a split request so that the SDF attempts to
 // claim while the split is occurring.
-func splitBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) 
(splitRes splitResult, procRes error) {
+func splitBlockingDriver(ctx context.Context, plan *Plan, dc DataContext, sdf 
*splitTestSdf) (splitRes splitResult, procRes error) {
        // Begin processing pipeline.
        procResCh := make(chan error)
        go processPlan(plan, dc, procResCh)
@@ -174,7 +174,7 @@ func splitBlockingDriver(plan *Plan, dc DataContext, sdf 
*splitTestSdf) (splitRe
 
        // Start a split, but block on it so it holds the mutex.
        splitResCh := make(chan splitResult)
-       go splitPlan(plan, splitResCh)
+       go splitPlan(ctx, plan, splitResCh)
        <-rt.split
 
        // Start processing and start a claim, that'll be waiting for the mutex.
@@ -195,7 +195,7 @@ func splitBlockingDriver(plan *Plan, dc DataContext, sdf 
*splitTestSdf) (splitRe
 
 // claimBlockingDriver blocks on a claim request so that the SDF attempts to
 // split while the claim is occurring.
-func claimBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) 
(splitRes splitResult, procRes error) {
+func claimBlockingDriver(ctx context.Context, plan *Plan, dc DataContext, sdf 
*splitTestSdf) (splitRes splitResult, procRes error) {
        // Begin processing pipeline.
        procResCh := make(chan error)
        go processPlan(plan, dc, procResCh)
@@ -207,7 +207,7 @@ func claimBlockingDriver(plan *Plan, dc DataContext, sdf 
*splitTestSdf) (splitRe
 
        // Start a split that'll be waiting for the mutex.
        splitResCh := make(chan splitResult)
-       go splitPlan(plan, splitResCh)
+       go splitPlan(ctx, plan, splitResCh)
        <-rt.split
 
        // Unblock the claim, freeing the mutex (but not finishing processing 
yet).
@@ -333,8 +333,8 @@ type splitResult struct {
 
 // splitPlan is meant to be the goroutine representing the thread handling a
 // split request for the SDF.
-func splitPlan(plan *Plan, result chan splitResult) {
-       split, err := plan.Split(SplitPoints{Frac: 0.5, BufSize: 1})
+func splitPlan(ctx context.Context, plan *Plan, result chan splitResult) {
+       split, err := plan.Split(ctx, SplitPoints{Frac: 0.5, BufSize: 1})
        result <- splitResult{split: split, err: err}
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go 
b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index f1a6f998e5b..0189de51c7f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -251,6 +251,8 @@ type SplitPoints struct {
 
 // SplitResult contains the result of performing a split on a Plan.
 type SplitResult struct {
+       Unsuccessful bool // Indicates the split was unsuccessful.
+
        // Indices are always included, for both channel and sub-element splits.
        PI int64 // Primary index, last element of the primary.
        RI int64 // Residual index, first element of the residual.
@@ -268,13 +270,14 @@ type SplitResult struct {
 // Split takes a set of potential split indexes, and if successful returns
 // the split result.
 // Returns an error when unable to split.
-func (p *Plan) Split(s SplitPoints) (SplitResult, error) {
+func (p *Plan) Split(ctx context.Context, s SplitPoints) (SplitResult, error) {
+       // Can't split inactive plans.
+       if p.status != Active {
+               return SplitResult{Unsuccessful: true}, nil
+       }
        // TODO: When bundles with multiple sources, are supported, perform 
splits
        // on all sources.
-       if p.source != nil {
-               return p.source.Split(s.Splits, s.Frac, s.BufSize)
-       }
-       return SplitResult{}, fmt.Errorf("failed to split at requested splits: 
{%v}, Source not initialized", s)
+       return p.source.Split(ctx, s.Splits, s.Frac, s.BufSize)
 }
 
 // Checkpoint attempts to split an SDF if the DoFn self-checkpointed.
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go 
b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 95c1729166b..1e4bdc30806 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -573,7 +573,7 @@ func (c *control) handleInstruction(ctx context.Context, 
req *fnpb.InstructionRe
                if ds == nil {
                        return fail(ctx, instID, "failed to split: desired 
splits for root of %v was empty.", ref)
                }
-               sr, err := plan.Split(exec.SplitPoints{
+               sr, err := plan.Split(ctx, exec.SplitPoints{
                        Splits:  ds.GetAllowedSplitPoints(),
                        Frac:    ds.GetFractionOfRemainder(),
                        BufSize: ds.GetEstimatedInputElements(),
@@ -583,6 +583,17 @@ func (c *control) handleInstruction(ctx context.Context, 
req *fnpb.InstructionRe
                        return fail(ctx, instID, "unable to split %v: %v", ref, 
err)
                }
 
+               // Unsuccessful splits without errors indicate we should return 
an empty response,
+               // as processing can confinue.
+               if sr.Unsuccessful {
+                       return &fnpb.InstructionResponse{
+                               InstructionId: string(instID),
+                               Response: 
&fnpb.InstructionResponse_ProcessBundleSplit{
+                                       ProcessBundleSplit: 
&fnpb.ProcessBundleSplitResponse{},
+                               },
+                       }
+               }
+
                var pRoots []*fnpb.BundleApplication
                var rRoots []*fnpb.DelayedBundleApplication
                if sr.PS != nil && len(sr.PS) > 0 && sr.RS != nil && len(sr.RS) 
> 0 {

Reply via email to