This is an automated email from the ASF dual-hosted git repository.

maxyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry-go-libs.git

commit 7e1b684694b9bbb02a2a8853678b2e87ad441a3f
Author: Karen Huddleston <[email protected]>
AuthorDate: Wed Jul 31 18:31:15 2024 -0700

    Retry failed ssh commands in ExecuteClusterCommand
    
    We pass in maxAttempts and retry until the command passes. If there is an
    error, we collect it in the command.RetryError variable. We have ways to log
    messages for commands that were retried but eventually passed, along with
    continuing to log failed commands like we used to.
    
    We test the command retry with a script that increments a number and exits 1
    until the number gets high enough to simulate failing and then passing..
    
    Have GenerateAndExecuteCommand use retries by default. This seems to only be
    used by gpbackup and we know we want to use retries in that utility.
---
 cluster/cluster.go      |  91 +++++++++++---
 cluster/cluster_test.go | 314 ++++++++++++++++++++++++++++++++++++++++--------
 go.mod                  |   2 +-
 go.sum                  |   4 +
 testhelper/structs.go   |  18 +++
 5 files changed, 358 insertions(+), 71 deletions(-)

diff --git a/cluster/cluster.go b/cluster/cluster.go
index 0311d0c..9877526 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -9,6 +9,7 @@ import (
        "bufio"
        "bytes"
        "context"
+       joinerrs "errors"
        "fmt"
        "os"
        "os/exec"
@@ -16,6 +17,7 @@ import (
        "sort"
        "strconv"
        "strings"
+       "time"
 
        "github.com/cloudberrydb/gp-common-go-libs/dbconn"
        "github.com/cloudberrydb/gp-common-go-libs/gplog"
@@ -27,6 +29,7 @@ type Executor interface {
        ExecuteLocalCommand(commandStr string) (string, error)
        ExecuteLocalCommandWithContext(commandStr string, ctx context.Context) 
(string, error)
        ExecuteClusterCommand(scope Scope, commandList []ShellCommand) 
*RemoteOutput
+       ExecuteClusterCommandWithRetries(scope Scope, commandList 
[]ShellCommand, maxAttempts int, retrySleep time.Duration) *RemoteOutput
 }
 
 // This type only exists to allow us to mock Execute[...]Command functions for 
testing
@@ -178,6 +181,7 @@ type ShellCommand struct {
        Stdout        string
        Stderr        string
        Error         error
+       RetryError    error
        Completed     bool
 }
 
@@ -196,26 +200,29 @@ func NewShellCommand(scope Scope, content int, host 
string, command []string) Sh
  * of a cluster command and to display the results to the user.
  */
 type RemoteOutput struct {
-       Scope          Scope
-       NumErrors      int
-       Commands       []ShellCommand
-       FailedCommands []*ShellCommand
+       Scope           Scope
+       NumErrors       int
+       Commands        []ShellCommand
+       FailedCommands  []ShellCommand
+       RetriedCommands []ShellCommand
 }
 
 func NewRemoteOutput(scope Scope, numErrors int, commands []ShellCommand) 
*RemoteOutput {
-       failedCommands := make([]*ShellCommand, numErrors)
-       index := 0
-       for i := range commands {
-               if commands[i].Error != nil {
-                       failedCommands[index] = &commands[i]
-                       index++
+       failedCommands := make([]ShellCommand, 0)
+       retriedCommands := make([]ShellCommand, 0)
+       for _, command := range commands {
+               if command.Error != nil {
+                       failedCommands = append(failedCommands, command)
+               } else if command.RetryError != nil {
+                       retriedCommands = append(retriedCommands, command)
                }
        }
        return &RemoteOutput{
-               Scope:          scope,
-               NumErrors:      numErrors,
-               Commands:       commands,
-               FailedCommands: failedCommands,
+               Scope:           scope,
+               NumErrors:       numErrors,
+               Commands:        commands,
+               FailedCommands:  failedCommands,
+               RetriedCommands: retriedCommands,
        }
 }
 
@@ -337,23 +344,54 @@ func (executor *GPDBExecutor) 
ExecuteLocalCommandWithContext(commandStr string,
        return string(output), err
 }
 
+// Create a new exec.Command object so we can run it again
+func resetCmd(cmd *exec.Cmd) *exec.Cmd {
+       args := cmd.Args
+       return exec.Command(args[0], args[1:]...)
+}
+
+/*
+ * ExecuteClusterCommandWithRetries, but only 1 attempt to keep the previous 
functionality
+ */
+func (executor *GPDBExecutor) ExecuteClusterCommand(scope Scope, commandList 
[]ShellCommand) *RemoteOutput {
+       return executor.ExecuteClusterCommandWithRetries(scope, commandList, 1, 
0)
+}
+
 /*
  * This function just executes all of the commands passed to it in parallel; it
  * doesn't care about the scope of the command except to pass that on to the
  * RemoteOutput after execution.
+ *
+ * It will retry the command up to maxAttempts times
  * TODO: Add batching to prevent bottlenecks when executing in a huge cluster.
  */
-func (executor *GPDBExecutor) ExecuteClusterCommand(scope Scope, commandList 
[]ShellCommand) *RemoteOutput {
+func (executor *GPDBExecutor) ExecuteClusterCommandWithRetries(scope Scope, 
commandList []ShellCommand, maxAttempts int, retrySleep time.Duration) 
*RemoteOutput {
        length := len(commandList)
        finished := make(chan int)
        numErrors := 0
        for i := range commandList {
                go func(index int) {
+                       var (
+                               out    []byte
+                               err    error
+                               stderr bytes.Buffer
+                       )
                        command := commandList[index]
-                       var stderr bytes.Buffer
-                       cmd := command.Command
-                       cmd.Stderr = &stderr
-                       out, err := cmd.Output()
+                       for attempt := 1; attempt <= maxAttempts; attempt++ {
+                               stderr.Reset()
+                               cmd := resetCmd(command.Command)
+                               cmd.Stderr = &stderr
+                               out, err = cmd.Output()
+                               if err == nil {
+                                       break
+                               } else {
+                                       newRetryErr := fmt.Errorf("attempt %d: 
error was %w: %s", attempt, err, stderr.String())
+                                       command.RetryError = 
joinerrs.Join(command.RetryError, newRetryErr)
+                                       if attempt != maxAttempts {
+                                               time.Sleep(retrySleep)
+                                       }
+                               }
+                       }
                        command.Stdout = string(out)
                        command.Stderr = stderr.String()
                        command.Error = err
@@ -382,10 +420,23 @@ func (executor *GPDBExecutor) ExecuteClusterCommand(scope 
Scope, commandList []S
 func (cluster *Cluster) GenerateAndExecuteCommand(verboseMsg string, scope 
Scope, generator interface{}) *RemoteOutput {
        gplog.Verbose(verboseMsg)
        commandList := cluster.GenerateSSHCommandList(scope, generator)
-       return cluster.ExecuteClusterCommand(scope, commandList)
+       return cluster.ExecuteClusterCommandWithRetries(scope, commandList, 5, 
1*time.Second)
 }
 
 func (cluster *Cluster) CheckClusterError(remoteOutput *RemoteOutput, 
finalErrMsg string, messageFunc interface{}, noFatal ...bool) {
+       for _, retriedCommand := range remoteOutput.RetriedCommands {
+               switch messageFunc.(type) {
+               case func(content int) string:
+                       content := retriedCommand.Content
+                       host := cluster.GetHostForContent(content)
+                       gplog.Debug("Command failed before passing on segment 
%d on host %s with error:\n%v", content, host, retriedCommand.RetryError)
+               case func(host string) string:
+                       host := retriedCommand.Host
+                       gplog.Debug("Command failed before passing on host %s 
with error:\n%v", host, retriedCommand.RetryError)
+               }
+               gplog.Debug("Command was: %s", retriedCommand.CommandString)
+       }
+
        if remoteOutput.NumErrors == 0 {
                return
        }
diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go
index 60a397d..959b373 100644
--- a/cluster/cluster_test.go
+++ b/cluster/cluster_test.go
@@ -3,6 +3,7 @@ package cluster_test
 import (
        "context"
        "database/sql/driver"
+       joinerrs "errors"
        "fmt"
        "os"
        "os/user"
@@ -10,17 +11,16 @@ import (
        "testing"
        "time"
 
-       sqlmock "github.com/DATA-DOG/go-sqlmock"
-
+       "github.com/DATA-DOG/go-sqlmock"
        "github.com/cloudberrydb/gp-common-go-libs/cluster"
        "github.com/cloudberrydb/gp-common-go-libs/dbconn"
        "github.com/cloudberrydb/gp-common-go-libs/operating"
        "github.com/cloudberrydb/gp-common-go-libs/testhelper"
+       "github.com/onsi/gomega/gbytes"
        "github.com/pkg/errors"
 
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
-       "github.com/onsi/gomega/gbytes"
 )
 
 func TestCluster(t *testing.T) {
@@ -78,6 +78,7 @@ var _ = Describe("cluster/cluster tests", func() {
                testExecutor = &testhelper.TestExecutor{}
                testCluster = 
cluster.NewCluster([]cluster.SegConfig{coordinatorSeg, localSegOne, 
remoteSegOne})
                testCluster.Executor = testExecutor
+               logfile.Clear()
        })
        Describe("ConstructSSHCommand", func() {
                It("constructs a local ssh command", func() {
@@ -556,58 +557,203 @@ var _ = Describe("cluster/cluster tests", func() {
                        }
                })
        })
-       Describe("CheckClusterError", func() {
-               var (
-                       remoteOutput *cluster.RemoteOutput
-                       failedCmd    cluster.ShellCommand
-               )
+       Describe("ExecuteClusterCommandWithRetries", func() {
+               var testDir = "/tmp/gp_common_go_libs_test"
                BeforeEach(func() {
-                       failedCmd = cluster.ShellCommand{
-                               Scope:         0, // The appropriate scope will 
be set in each test
-                               Content:       1,
-                               Host:          "remotehost1",
-                               Command:       nil,
-                               CommandString: "this is the command",
-                               Stderr:        "exit status 1",
-                               Error:         errors.Errorf("command error"),
+                       os.MkdirAll(testDir, 0777)
+               })
+               AfterEach(func() {
+                       os.RemoveAll(testDir)
+               })
+               It("retries a command until it passes", func() {
+                       scriptFile, _ := os.OpenFile(path.Join(testDir, 
"incr.bash"), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0777)
+                       // This script increments a number in a file and 
returns an error until it reaches 3 and returns success
+                       scriptFmt := `#!/bin/bash
+n=$(cat %s/num.txt)
+m=$((n+1))
+if [ "$m" -lt "3" ]; then
+       echo $m > %s/num.txt
+       exit 1
+fi`
+                       scriptFile.WriteString(fmt.Sprintf(scriptFmt, testDir, 
testDir))
+                       scriptFile.Close()
+                       os.WriteFile(path.Join(testDir, "num.txt"), 
[]byte{'0'}, 0777)
+                       testCluster := cluster.Cluster{}
+                       commandList := []cluster.ShellCommand{
+                               
cluster.NewShellCommand(cluster.ON_SEGMENTS|cluster.INCLUDE_COORDINATOR, -1, 
"", []string{"touch", path.Join(testDir, "foo")}),
+                               
cluster.NewShellCommand(cluster.ON_SEGMENTS|cluster.INCLUDE_COORDINATOR, 0, "", 
[]string{"bash", "-c", path.Join(testDir, "incr.bash")}),
                        }
-                       remoteOutput = &cluster.RemoteOutput{
-                               Scope:          0,
-                               NumErrors:      1,
-                               Commands:       
[]cluster.ShellCommand{failedCmd},
-                               FailedCommands: 
[]*cluster.ShellCommand{&failedCmd},
+                       testCluster.Executor = &cluster.GPDBExecutor{}
+                       clusterOutput := 
testCluster.ExecuteClusterCommandWithRetries(cluster.ON_SEGMENTS|cluster.INCLUDE_COORDINATOR,
 commandList, 5, 5*time.Millisecond)
+                       expectPathToExist(path.Join(testDir, "foo"))
+                       Expect(clusterOutput.NumErrors).To(Equal(0))
+                       Expect(clusterOutput.FailedCommands).To(HaveLen(0))
+                       Expect(clusterOutput.RetriedCommands).To(HaveLen(1))
+                       
Expect(clusterOutput.RetriedCommands[0].RetryError.Error()).To(Equal("attempt 
1: error was exit status 1: \nattempt 2: error was exit status 1: "))
+               })
+               It("retries a command until it reaches max retries", func() {
+                       testCluster := cluster.Cluster{}
+                       commandList := []cluster.ShellCommand{
+                               
cluster.NewShellCommand(cluster.ON_SEGMENTS|cluster.INCLUDE_COORDINATOR, -1, 
"", []string{"touch", path.Join(testDir, "foo")}),
+                               
cluster.NewShellCommand(cluster.ON_SEGMENTS|cluster.INCLUDE_COORDINATOR, 0, "", 
[]string{"some-non-existent-command"}),
                        }
+                       testCluster.Executor = &cluster.GPDBExecutor{}
+                       clusterOutput := 
testCluster.ExecuteClusterCommandWithRetries(cluster.ON_SEGMENTS|cluster.INCLUDE_COORDINATOR,
 commandList, 3, 5*time.Millisecond)
+                       expectedErrMsg := "exec: \"some-non-existent-command\": 
executable file not found in $PATH"
+                       expectPathToExist(path.Join(testDir, "foo"))
+                       Expect(clusterOutput.NumErrors).To(Equal(1))
+                       Expect(clusterOutput.FailedCommands).To(HaveLen(1))
+                       Expect(clusterOutput.RetriedCommands).To(HaveLen(0))
+                       
Expect(clusterOutput.FailedCommands[0].Error.Error()).To(Equal(expectedErrMsg))
+                       
Expect(clusterOutput.FailedCommands[0].RetryError.Error()).To(Equal(fmt.Sprintf("attempt
 1: error was %s: \nattempt 2: error was %s: \nattempt 3: error was %s: ", 
expectedErrMsg, expectedErrMsg, expectedErrMsg)))
                })
-               DescribeTable("CheckClusterError", func(scope cluster.Scope, 
includeCoordinator bool, perSegment bool, remote bool) {
-                       remoteOutput.Scope = scope
-                       remoteOutput.Commands[0].Scope = scope
-                       remoteOutput.FailedCommands[0].Scope = scope
-                       errStr := "1 segment"
-                       debugStr := "segment 1 on host remotehost1"
-                       var generatorFunc interface{}
-                       generatorFunc = func(contentID int) string { return 
"Error received" }
-                       if !perSegment {
-                               errStr = "1 host"
-                               debugStr = "host remotehost1"
+       })
+       Describe("CheckClusterError", func() {
+               Context("FailedCommands", func() {
+                       var (
+                               remoteOutput *cluster.RemoteOutput
+                               failedCmd    cluster.ShellCommand
+                       )
+                       BeforeEach(func() {
+                               failedCmd = cluster.ShellCommand{
+                                       Scope:         0, // The appropriate 
scope will be set in each test
+                                       Content:       1,
+                                       Host:          "remotehost1",
+                                       Command:       nil,
+                                       CommandString: "this is the command",
+                                       Stderr:        "exit status 1",
+                                       Error:         fmt.Errorf("command 
error"),
+                               }
+                               remoteOutput = &cluster.RemoteOutput{
+                                       Scope:          0,
+                                       NumErrors:      1,
+                                       Commands:       
[]cluster.ShellCommand{failedCmd},
+                                       FailedCommands: 
[]cluster.ShellCommand{failedCmd},
+                               }
+                       })
+                       DescribeTable("CheckClusterError", func(scope 
cluster.Scope, perSegment bool, remote bool) {
+                               remoteOutput.Scope = scope
+                               remoteOutput.Commands[0].Scope = scope
+                               remoteOutput.FailedCommands[0].Scope = scope
+                               errStr := "1 segment"
+                               debugStr := "segment 1 on host remotehost1"
+                               var generatorFunc interface{}
+                               generatorFunc = func(contentID int) string { 
return "Error received" }
+                               if !perSegment {
+                                       errStr = "1 host"
+                                       debugStr = "host remotehost1"
+                                       generatorFunc = func(host string) 
string { return "Error received" }
+                               }
+                               if !remote {
+                                       errStr = "coordinator for " + errStr
+                               }
+                               defer 
testhelper.ShouldPanicWithMessage(fmt.Sprintf("Got an error on %s. See 
gbytes.Buffer for a complete list of errors.", errStr))
+                               defer 
Expect(logfile).To(gbytes.Say(`\[DEBUG\]:-Command was: this is the command`))
+                               defer 
Expect(logfile).To(gbytes.Say(fmt.Sprintf(`\[ERROR\]:-Error received on %s with 
error command error: exit status 1`, debugStr)))
+                               testCluster.CheckClusterError(remoteOutput, 
"Got an error", generatorFunc)
+                       },
+                               Entry("prints error messages for a per-segment 
command, including coordinator", 
cluster.ON_SEGMENTS|cluster.INCLUDE_COORDINATOR, true, true),
+                               Entry("prints error messages for a per-segment 
command, excluding coordinator", cluster.ON_SEGMENTS, true, true),
+                               Entry("prints error messages for a per-host 
command, including the coordinator host", 
cluster.ON_HOSTS|cluster.INCLUDE_COORDINATOR, false, true),
+                               Entry("prints error messages for a per-host 
command, excluding the coordinator host", cluster.ON_HOSTS, false, true),
+                               Entry("prints error messages for commands 
executed on coordinator to segments, including coordinator", 
cluster.ON_SEGMENTS|cluster.INCLUDE_COORDINATOR|cluster.ON_LOCAL, true, false),
+                               Entry("prints error messages for commands 
executed on coordinator to segments, excluding coordinator", 
cluster.ON_SEGMENTS|cluster.ON_LOCAL, true, false),
+                               Entry("prints error messages for commands 
executed on coordinator to hosts, including coordinator", 
cluster.ON_HOSTS|cluster.INCLUDE_COORDINATOR|cluster.ON_LOCAL, false, false),
+                               Entry("prints error messages for commands 
executed on coordinator to hosts, excluding coordinator", 
cluster.ON_HOSTS|cluster.ON_LOCAL, false, false),
+                       )
+               })
+               Context("RetriedCommands", func() {
+                       var (
+                               remoteOutput  *cluster.RemoteOutput
+                               retriedCmd    cluster.ShellCommand
+                               failedCmd     cluster.ShellCommand
+                               retryErrStr   string
+                               generatorFunc interface{}
+                       )
+                       BeforeEach(func() {
+                               retryErr := joinerrs.Join(errors.New("attempt 
1: this is an error"), errors.New("attempt 2: this is an error"))
+                               retriedCmd = cluster.ShellCommand{
+                                       Scope:         0,
+                                       Content:       1,
+                                       Host:          "remotehost1",
+                                       Command:       nil,
+                                       CommandString: "this is the retry 
command",
+                                       Stderr:        "",
+                                       Error:         nil,
+                                       RetryError:    retryErr,
+                               }
+                               failedCmd = cluster.ShellCommand{
+                                       Scope:         0,
+                                       Content:       1,
+                                       Host:          "remotehost1",
+                                       Command:       nil,
+                                       CommandString: "this is the failed 
command",
+                                       Stderr:        "exit status 1",
+                                       Error:         fmt.Errorf("command 
error"),
+                               }
+                               remoteOutput = &cluster.RemoteOutput{
+                                       Scope:           0,
+                                       NumErrors:       0,
+                                       Commands:        
[]cluster.ShellCommand{retriedCmd},
+                                       RetriedCommands: 
[]cluster.ShellCommand{retriedCmd},
+                               }
+                               retryErrStr = "\nattempt 1: this is an 
error\nattempt 2: this is an error"
+                       })
+                       It("prints retry error messages for a per-segment 
command", func() {
+                               generatorFunc = func(contentID int) string { 
return "Error received" }
+                               testCluster.CheckClusterError(remoteOutput, 
"Got an error", generatorFunc)
+                               
Expect(logfile).To(gbytes.Say(fmt.Sprintf(`\[DEBUG\]:-Command failed before 
passing on segment 1 on host remotehost1 with error:%s`, retryErrStr)))
+                               
Expect(logfile).To(gbytes.Say(`\[DEBUG\]:-Command was: this is the retry 
command`))
+                       })
+                       It("prints retry error messages for a per-host 
command", func() {
                                generatorFunc = func(host string) string { 
return "Error received" }
-                       }
-                       if !remote {
-                               errStr = "coordinator for " + errStr
-                       }
-                       defer 
testhelper.ShouldPanicWithMessage(fmt.Sprintf("Got an error on %s. See 
gbytes.Buffer for a complete list of errors.", errStr))
-                       defer Expect(logfile).To(gbytes.Say(`\[DEBUG\]:-Command 
was: this is the command`))
-                       defer 
Expect(logfile).To(gbytes.Say(fmt.Sprintf(`\[ERROR\]:-Error received on %s with 
error command error: exit status 1`, debugStr)))
-                       testCluster.CheckClusterError(remoteOutput, "Got an 
error", generatorFunc)
-               },
-                       Entry("prints error messages for a per-segment command, 
including coordinator", cluster.ON_SEGMENTS|cluster.INCLUDE_COORDINATOR, true, 
true, true),
-                       Entry("prints error messages for a per-segment command, 
excluding coordinator", cluster.ON_SEGMENTS, false, true, true),
-                       Entry("prints error messages for a per-host command, 
including the coordinator host", cluster.ON_HOSTS|cluster.INCLUDE_COORDINATOR, 
true, false, true),
-                       Entry("prints error messages for a per-host command, 
excluding the coordinator host", cluster.ON_HOSTS, false, false, true),
-                       Entry("prints error messages for commands executed on 
coordinator to segments, including coordinator", 
cluster.ON_SEGMENTS|cluster.INCLUDE_COORDINATOR|cluster.ON_LOCAL, true, true, 
false),
-                       Entry("prints error messages for commands executed on 
coordinator to segments, excluding coordinator", 
cluster.ON_SEGMENTS|cluster.ON_LOCAL, false, true, false),
-                       Entry("prints error messages for commands executed on 
coordinator to hosts, including coordinator", 
cluster.ON_HOSTS|cluster.INCLUDE_COORDINATOR|cluster.ON_LOCAL, true, false, 
false),
-                       Entry("prints error messages for commands executed on 
coordinator to hosts, excluding coordinator", 
cluster.ON_HOSTS|cluster.ON_LOCAL, false, false, false),
-               )
+                               testCluster.CheckClusterError(remoteOutput, 
"Got an error", generatorFunc)
+                               
Expect(logfile).To(gbytes.Say(fmt.Sprintf(`\[DEBUG\]:-Command failed before 
passing on host remotehost1 with error:%s`, retryErrStr)))
+                               
Expect(logfile).To(gbytes.Say(`\[DEBUG\]:-Command was: this is the retry 
command`))
+                       })
+                       It("prints retry error messages before failed error 
messages", func() {
+                               remoteOutput = &cluster.RemoteOutput{
+                                       Scope:           0,
+                                       NumErrors:       1,
+                                       Commands:        
[]cluster.ShellCommand{retriedCmd, failedCmd},
+                                       FailedCommands:  
[]cluster.ShellCommand{failedCmd},
+                                       RetriedCommands: 
[]cluster.ShellCommand{retriedCmd},
+                               }
+                               generatorFunc = func(contentID int) string { 
return "Error received" }
+                               defer testhelper.ShouldPanicWithMessage("Got an 
error on 1 segment. See gbytes.Buffer for a complete list of errors.")
+                               defer 
Expect(logfile).To(gbytes.Say(`\[DEBUG\]:-Command was: this is the failed 
command`))
+                               defer 
Expect(logfile).To(gbytes.Say(`\[ERROR\]:-Error received on segment 1 on host 
remotehost1 with error command error: exit status 1`))
+                               testCluster.CheckClusterError(remoteOutput, 
"Got an error", generatorFunc)
+                               
Expect(logfile).To(gbytes.Say(fmt.Sprintf(`\[DEBUG\]:-Command failed before 
passing on segment 1 on host remotehost1 with error:%s`, retryErrStr)))
+                               
Expect(logfile).To(gbytes.Say(`\[DEBUG\]:-Command was: this is the retry 
command`))
+                       })
+               })
+               Context("No errors", func() {
+                       var (
+                               successfulCmd = cluster.ShellCommand{
+                                       Scope:         0,
+                                       Content:       1,
+                                       Host:          "remotehost1",
+                                       Command:       nil,
+                                       CommandString: "this is the successful 
command",
+                                       Stderr:        "",
+                                       Error:         nil,
+                                       RetryError:    nil,
+                               }
+                               remoteOutput = &cluster.RemoteOutput{
+                                       Scope:           0,
+                                       NumErrors:       0,
+                                       Commands:        
[]cluster.ShellCommand{successfulCmd},
+                                       FailedCommands:  
[]cluster.ShellCommand{},
+                                       RetriedCommands: 
[]cluster.ShellCommand{},
+                               }
+                               generatorFunc = func(contentID int) string { 
return "Error received" }
+                       )
+                       It("prints nothing if there are no retried or failed 
commands", func() {
+                               testCluster.CheckClusterError(remoteOutput, 
"Got an error", generatorFunc)
+                               Expect(logfile).ToNot(gbytes.Say("error"))
+                       })
+               })
        })
        Describe("LogFatalClusterError", func() {
                It("logs an error for 1 segment (with coordinator)", func() {
@@ -627,6 +773,74 @@ var _ = Describe("cluster/cluster tests", func() {
                        cluster.LogFatalClusterError("Error occurred", 
cluster.ON_HOSTS|cluster.INCLUDE_COORDINATOR, 2)
                })
        })
+       Describe("NewRemoteOutput", func() {
+               var (
+                       retryErr   = joinerrs.Join(errors.New("attempt 1: this 
is an error"), errors.New("attempt 2: this is an error"))
+                       retriedCmd = cluster.ShellCommand{
+                               Scope:         0,
+                               Content:       1,
+                               Host:          "remotehost1",
+                               Command:       nil,
+                               CommandString: "this is the retry command",
+                               Stderr:        "",
+                               Error:         nil,
+                               RetryError:    retryErr,
+                       }
+                       failedCmd = cluster.ShellCommand{
+                               Scope:         0,
+                               Content:       1,
+                               Host:          "remotehost1",
+                               Command:       nil,
+                               CommandString: "this is the failed command",
+                               Stderr:        "exit status 1",
+                               Error:         fmt.Errorf("command error"),
+                               RetryError:    retryErr,
+                       }
+                       successfulCmd = cluster.ShellCommand{
+                               Scope:         0,
+                               Content:       1,
+                               Host:          "remotehost1",
+                               Command:       nil,
+                               CommandString: "this is the successful command",
+                               Stderr:        "",
+                               Error:         nil,
+                               RetryError:    nil,
+                       }
+                       commands []cluster.ShellCommand
+               )
+               It("can create a remote output with no failed or retried 
commands", func() {
+                       commands = []cluster.ShellCommand{successfulCmd}
+                       output := cluster.NewRemoteOutput(0, 0, commands)
+                       Expect(output.NumErrors).To(Equal(0))
+                       Expect(output.Commands).To(HaveLen(1))
+                       Expect(output.FailedCommands).To(HaveLen(0))
+                       Expect(output.RetriedCommands).To(HaveLen(0))
+               })
+               It("can create a remote output with failed commands", func() {
+                       commands = []cluster.ShellCommand{successfulCmd, 
failedCmd}
+                       output := cluster.NewRemoteOutput(0, 1, commands)
+                       Expect(output.NumErrors).To(Equal(1))
+                       Expect(output.Commands).To(HaveLen(2))
+                       Expect(output.FailedCommands[0]).To(Equal(failedCmd))
+                       Expect(output.RetriedCommands).To(HaveLen(0))
+               })
+               It("can create a remote output with retried commands", func() {
+                       commands = []cluster.ShellCommand{successfulCmd, 
retriedCmd}
+                       output := cluster.NewRemoteOutput(0, 0, commands)
+                       Expect(output.NumErrors).To(Equal(0))
+                       Expect(output.Commands).To(HaveLen(2))
+                       Expect(output.FailedCommands).To(HaveLen(0))
+                       Expect(output.RetriedCommands[0]).To(Equal(retriedCmd))
+               })
+               It("can create a remote output with failed and retry commands", 
func() {
+                       commands = []cluster.ShellCommand{successfulCmd, 
retriedCmd, failedCmd}
+                       output := cluster.NewRemoteOutput(0, 1, commands)
+                       Expect(output.NumErrors).To(Equal(1))
+                       Expect(output.Commands).To(HaveLen(3))
+                       Expect(output.FailedCommands[0]).To(Equal(failedCmd))
+                       Expect(output.RetriedCommands[0]).To(Equal(retriedCmd))
+               })
+       })
        Describe("NewCluster", func() {
                It("sets up the configuration for a single-host, single-segment 
cluster", func() {
                        newCluster := 
cluster.NewCluster([]cluster.SegConfig{coordinatorSeg, localSegOne})
diff --git a/go.mod b/go.mod
index 02f72c3..097a63a 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
 module github.com/cloudberrydb/gp-common-go-libs
 
-go 1.19
+go 1.21
 
 require (
        github.com/DATA-DOG/go-sqlmock v1.5.0
diff --git a/go.sum b/go.sum
index 8ce0d7f..28663a9 100644
--- a/go.sum
+++ b/go.sum
@@ -29,6 +29,7 @@ github.com/go-task/slim-sprig 
v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4
 github.com/gofrs/uuid v4.0.0+incompatible 
h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
 github.com/gofrs/uuid v4.0.0+incompatible/go.mod 
h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
 github.com/golang/protobuf v1.5.3 
h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
+github.com/golang/protobuf v1.5.3/go.mod 
h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
 github.com/google/go-cmp v0.5.9/go.mod 
h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 
h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
@@ -135,6 +136,7 @@ github.com/stretchr/testify v1.5.1/go.mod 
h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
 github.com/stretchr/testify v1.6.1/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.0/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.8.1 
h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
+github.com/stretchr/testify v1.8.1/go.mod 
h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
 github.com/zenazn/goji v0.9.0/go.mod 
h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
 go.uber.org/atomic v1.3.2/go.mod 
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.4.0/go.mod 
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
@@ -162,6 +164,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod 
h1:6SW0HCj/g11FgYtHl
 golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod 
h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
 golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod 
h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
 golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
+golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
 golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -210,6 +213,7 @@ golang.org/x/xerrors 
v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/protobuf v1.28.0 
h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
+google.golang.org/protobuf v1.28.0/go.mod 
h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 
h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/testhelper/structs.go b/testhelper/structs.go
index 41c9462..6a00eec 100644
--- a/testhelper/structs.go
+++ b/testhelper/structs.go
@@ -9,6 +9,7 @@ import (
        "github.com/cloudberrydb/gp-common-go-libs/cluster"
        "github.com/cloudberrydb/gp-common-go-libs/gplog"
        "github.com/jmoiron/sqlx"
+       "time"
 )
 
 type TestDriver struct {
@@ -138,3 +139,20 @@ func (executor *TestExecutor) ExecuteClusterCommand(scope 
cluster.Scope, command
        }
        return executor.ClusterOutput
 }
+
+func (executor *TestExecutor) ExecuteClusterCommandWithRetries(scope 
cluster.Scope, commandList []cluster.ShellCommand, maxAttempts int, retrySleep 
time.Duration) *cluster.RemoteOutput {
+       executor.NumExecutions++
+       executor.NumClusterExecutions++
+       executor.ClusterCommands = append(executor.ClusterCommands, commandList)
+       if executor.ClusterOutputs != nil {
+               if executor.NumClusterExecutions <= 
len(executor.ClusterOutputs) {
+                       return 
executor.ClusterOutputs[executor.NumClusterExecutions-1]
+               } else if executor.UseLastOutput {
+                       return 
executor.ClusterOutputs[len(executor.ClusterOutputs)-1]
+               } else if executor.UseDefaultOutput {
+                       return executor.ClusterOutput
+               }
+               gplog.Fatal(nil, "ExecuteClusterCommand called %d times, but 
only %d ClusterOutputs provided", executor.NumClusterExecutions, 
len(executor.ClusterOutputs))
+       }
+       return executor.ClusterOutput
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to