This is an automated email from the ASF dual-hosted git repository.

riteshghorse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 52b604076b8 [Go SDK] add retries to connect with expansion service (#25237)
52b604076b8 is described below

commit 52b604076b8e82e8453df5b3cfbda1ab78090e3c
Author: Ritesh Ghorse <[email protected]>
AuthorDate: Wed Feb 15 09:50:14 2023 -0500

    [Go SDK] add retries to connect with expansion service (#25237)
    
    * add retries to connect with expansion service
    
    * prevent override of res
    
    * change retries
    
    * remove unnecessary module
    
    * Revert "remove unnecessary module"
    
    This reverts commit 15081850699f29a895e0f27899431c60409a617f.
    
    * remove unnecessary pkg
---
 sdks/go/pkg/beam/core/runtime/xlangx/expand.go | 33 ++++++++++++++++++++++----
 1 file changed, 28 insertions(+), 5 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
index 9076b93e1f8..1a3040575c1 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
@@ -21,6 +21,7 @@ import (
        "context"
        "fmt"
        "strings"
+       "time"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
@@ -32,8 +33,13 @@ import (
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang"
        "google.golang.org/grpc"
+       "gopkg.in/retry.v1"
 )
 
+// maxRetries is the maximum number of retries to attempt connecting to
+// an expansion service endpoint.
+const maxRetries = 5
+
 // Expand expands an unexpanded graph.ExternalTransform as a
 // graph.ExpandedTransform and assigns it to the ExternalTransform's Expanded
 // field. This requires querying an expansion service based on the configuration
@@ -163,16 +169,33 @@ func QueryExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.Expans
        client := jobpb.NewExpansionServiceClient(conn)
 
        // Handling ExpansionResponse
-       res, err := client.Expand(ctx, req)
-       if err != nil {
-               err = errors.Wrapf(err, "expansion failed")
-               return nil, errors.WithContextf(err, "expanding transform with 
ExpansionRequest: %v", req)
+       strategy := retry.LimitCount(
+               maxRetries,
+               retry.Exponential{
+                       Initial: time.Second,
+                       Factor:  2,
+               },
+       )
+       var res *jobpb.ExpansionResponse
+       for attempt := retry.Start(strategy, nil); attempt.Next(); {
+               res, err = client.Expand(ctx, req)
+               if err == nil {
+                       break
+               }
+
+               if attempt.Count() == maxRetries {
+                       if err != nil {
+                               err = errors.Wrap(err, "expansion failed")
+                               return nil, errors.WithContextf(err, "expanding 
transform with ExpansionRequest: %v", req)
+                       }
+               }
        }
        if len(res.GetError()) != 0 { // ExpansionResponse includes an error.
                err := errors.New(res.GetError())
-               err = errors.Wrapf(err, "expansion failed")
+               err = errors.Wrap(err, "expansion response error")
                return nil, errors.WithContextf(err, "expanding transform with 
ExpansionRequest: %v", req)
        }
+
        return res, nil
 }
 

Reply via email to