This is an automated email from the ASF dual-hosted git repository.

riteshghorse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e80596f5d41 Use context param and error return value in mongodbio.Read SDF methods (#25536)
e80596f5d41 is described below

commit e80596f5d418d5c7608e05f32e0b309c912749bf
Author: Johanna Öjeling <[email protected]>
AuthorDate: Tue Feb 21 21:28:01 2023 +0100

    Use context param and error return value in mongodbio.Read SDF methods (#25536)
---
 sdks/go/pkg/beam/io/mongodbio/read.go | 29 +++++++++++++++++------------
 1 file changed, 17 insertions(+), 12 deletions(-)

diff --git a/sdks/go/pkg/beam/io/mongodbio/read.go b/sdks/go/pkg/beam/io/mongodbio/read.go
index 59d8cf6aef9..101d1f4af89 100644
--- a/sdks/go/pkg/beam/io/mongodbio/read.go
+++ b/sdks/go/pkg/beam/io/mongodbio/read.go
@@ -159,10 +159,12 @@ func inferProjection(t reflect.Type, tagKey string) bson.D {
        return projection
 }
 
-func (fn *readFn) CreateInitialRestriction(_ []byte) idRangeRestriction {
-       ctx := context.Background()
+func (fn *readFn) CreateInitialRestriction(
+       ctx context.Context,
+       _ []byte,
+) (idRangeRestriction, error) {
        if err := fn.Setup(ctx); err != nil {
-               panic(err)
+               return idRangeRestriction{}, err
        }
 
        outerRange, err := findOuterIDRange(ctx, fn.collection, fn.filter)
@@ -174,10 +176,10 @@ func (fn *readFn) CreateInitialRestriction(_ []byte) idRangeRestriction {
                                fn.Database,
                                fn.Collection,
                        )
-                       return idRangeRestriction{}
+                       return idRangeRestriction{}, nil
                }
 
-               panic(err)
+               return idRangeRestriction{}, err
        }
 
        return newIDRangeRestriction(
@@ -185,7 +187,7 @@ func (fn *readFn) CreateInitialRestriction(_ []byte) idRangeRestriction {
                fn.collection,
                outerRange,
                fn.filter,
-       )
+       ), nil
 }
 
 func findOuterIDRange(
@@ -213,22 +215,25 @@ func findOuterIDRange(
        return outerRange, nil
 }
 
-func (fn *readFn) SplitRestriction(_ []byte, rest idRangeRestriction) []idRangeRestriction {
+func (fn *readFn) SplitRestriction(
+       ctx context.Context,
+       _ []byte,
+       rest idRangeRestriction,
+) ([]idRangeRestriction, error) {
        if rest.Count == 0 {
-               return []idRangeRestriction{rest}
+               return []idRangeRestriction{rest}, nil
        }
 
-       ctx := context.Background()
        if err := fn.Setup(ctx); err != nil {
-               panic(err)
+               return nil, err
        }
 
        splits, err := rest.SizedSplits(ctx, fn.collection, fn.BundleSize, fn.BucketAuto)
        if err != nil {
-               panic(err)
+               return nil, err
        }
 
-       return splits
+       return splits, nil
 }
 
 func (fn *readFn) CreateTracker(rest idRangeRestriction) *sdf.LockRTracker {

Reply via email to