This is an automated email from the ASF dual-hosted git repository. maxyang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry-go-libs.git
commit ce8cf9a072c22382bab65ee33f349756d8bad47a
Author: Jamie McAtamney <[email protected]>
AuthorDate: Wed Apr 12 21:57:40 2023 -0700

    Miscellaneous mirror-related enhancements

    This commit changes cluster behavior in the following ways in order to make it easier to work with mirrors:

    1) The ContentIDs array is now deduplicated and sorted in the Cluster constructor, so that a cluster with mirrors won't have duplicate values in that array and segments can be added in any order without affecting the order of ContentIDs.

    2) GetSegmentConfiguration can now retrieve only mirror information if desired, so that if some code wants to work with primaries and mirrors separately it can do so without having to retrieve both primary and mirror information at once and then manually separate it out into two Clusters.

    3) TestExecutor now supports returning different values on subsequent calls for both ExecuteLocalCommand and ExecuteClusterCommand, in order to make it easier to test functions that might make one call to deal with primaries and another to deal with mirrors (or any other use case involving multiple calls per function, of course).
--- cluster/cluster.go | 28 ++++++++++++++++++----- cluster/cluster_test.go | 33 +++++++++++++++++++++++++++ testhelper/structs.go | 60 +++++++++++++++++++++++++++++++++++++++++-------- 3 files changed, 107 insertions(+), 14 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 0fd9135..6f406d8 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -9,6 +9,7 @@ import ( "bytes" "fmt" "os/exec" + "sort" "strings" "github.com/cloudberrydb/gp-common-go-libs/dbconn" @@ -218,9 +219,9 @@ func NewCluster(segConfigs []SegConfig) *Cluster { cluster.ByContent = make(map[int][]*SegConfig, 0) cluster.ByHost = make(map[string][]*SegConfig, 0) cluster.Executor = &GPDBExecutor{} + for i := range cluster.Segments { segment := &cluster.Segments[i] - cluster.ContentIDs = append(cluster.ContentIDs, segment.ContentID) cluster.ByContent[segment.ContentID] = append(cluster.ByContent[segment.ContentID], segment) segmentList := cluster.ByContent[segment.ContentID] if len(segmentList) == 2 && segmentList[0].Role == "m" { @@ -236,6 +237,10 @@ func NewCluster(segConfigs []SegConfig) *Cluster { cluster.Hostnames = append(cluster.Hostnames, segment.Hostname) } } + for content := range cluster.ByContent { + cluster.ContentIDs = append(cluster.ContentIDs, content) + } + sort.Ints(cluster.ContentIDs) return &cluster } @@ -493,13 +498,24 @@ func (cluster *Cluster) GetDirsForHost(hostname string) []string { * Helper functions */ +/* + * This function accepts up to two booleans: + * By default, it retrieves only primary and coordinator information. + * If the first boolean is set to true, it also retrieves mirror and standby information. + * If the second is set to true, it retrieves only mirror and standby information, regardless of the value of the first boolean. 
+ */ func GetSegmentConfiguration(connection *dbconn.DBConn, getMirrors ...bool) ([]SegConfig, error) { includeMirrors := len(getMirrors) == 1 && getMirrors[0] + includeOnlyMirrors := len(getMirrors) == 2 && getMirrors[1] query := "" if connection.Version.Before("6") { - whereClause := "WHERE s.role = 'p' AND f.fsname = 'pg_system'" - if includeMirrors { - whereClause = "WHERE f.fsname = 'pg_system'" + whereClause := "WHERE%s f.fsname = 'pg_system'" + if includeOnlyMirrors { + whereClause = fmt.Sprintf(whereClause, " s.role = 'm' AND") + } else if includeMirrors { + whereClause = fmt.Sprintf(whereClause, "") + } else { + whereClause = fmt.Sprintf(whereClause, " s.role = 'p' AND") } query = fmt.Sprintf(` SELECT @@ -516,7 +532,9 @@ JOIN pg_filespace f ON e.fsefsoid = f.oid ORDER BY s.content, s.role DESC;`, whereClause) } else { whereClause := "WHERE role = 'p'" - if includeMirrors { + if includeOnlyMirrors { + whereClause = "WHERE role = 'm'" + } else if includeMirrors { whereClause = "" } query = fmt.Sprintf(` diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 7d737f9..6572d53 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -147,6 +147,39 @@ var _ = Describe("cluster/cluster tests", func() { Expect(results[2].DataDir).To(Equal("/data/gpseg2")) Expect(results[2].Hostname).To(Equal("remotehost")) }) + It("returns mirrors for a single-host, single-segment cluster", func() { + fakeResult := sqlmock.NewRows(header).AddRow(localSegOne...) + mock.ExpectQuery("SELECT (.*)").WillReturnRows(fakeResult) + results, err := cluster.GetSegmentConfiguration(connection, true, true) + Expect(err).ToNot(HaveOccurred()) + Expect(len(results)).To(Equal(1)) + Expect(results[0].DataDir).To(Equal("/data/gpseg0")) + Expect(results[0].Hostname).To(Equal("localhost")) + }) + It("returns mirrors for a single-host, multi-segment cluster", func() { + fakeResult := sqlmock.NewRows(header).AddRow(localSegOne...).AddRow(localSegTwo...) 
+ mock.ExpectQuery("SELECT (.*)").WillReturnRows(fakeResult) + results, err := cluster.GetSegmentConfiguration(connection, true, true) + Expect(err).ToNot(HaveOccurred()) + Expect(len(results)).To(Equal(2)) + Expect(results[0].DataDir).To(Equal("/data/gpseg0")) + Expect(results[0].Hostname).To(Equal("localhost")) + Expect(results[1].DataDir).To(Equal("/data/gpseg1")) + Expect(results[1].Hostname).To(Equal("localhost")) + }) + It("returns mirrors for a multi-host, multi-segment cluster", func() { + fakeResult := sqlmock.NewRows(header).AddRow(localSegOne...).AddRow(localSegTwo...).AddRow(remoteSegOne...) + mock.ExpectQuery("SELECT (.*)").WillReturnRows(fakeResult) + results, err := cluster.GetSegmentConfiguration(connection, true, true) + Expect(err).ToNot(HaveOccurred()) + Expect(len(results)).To(Equal(3)) + Expect(results[0].DataDir).To(Equal("/data/gpseg0")) + Expect(results[0].Hostname).To(Equal("localhost")) + Expect(results[1].DataDir).To(Equal("/data/gpseg1")) + Expect(results[1].Hostname).To(Equal("localhost")) + Expect(results[2].DataDir).To(Equal("/data/gpseg2")) + Expect(results[2].Hostname).To(Equal("remotehost")) + }) }) Describe("GenerateSSHCommandList", func() { diff --git a/testhelper/structs.go b/testhelper/structs.go index e3e4245..6816556 100644 --- a/testhelper/structs.go +++ b/testhelper/structs.go @@ -6,6 +6,7 @@ package testhelper import ( "github.com/cloudberrydb/gp-common-go-libs/cluster" + "github.com/cloudberrydb/gp-common-go-libs/gplog" "github.com/jmoiron/sqlx" ) @@ -42,20 +43,53 @@ func (result TestResult) RowsAffected() (int64, error) { return result.Rows, nil } +/* + * Each output or error type has both a plural form and a singular form. If the plural form is set, it overrides the singular. + * The singular form returns the same output and error on each call; the plural form returns one output per call (first element on the first call, etc.) 
+ * If more calls are made than there are outputs provided a Fatal error is raised, unless UseDefaultOutput or UseLastOutput is set. + * + * The LocalOutputs and LocalErrors arrays are "paired" in that the struct doesn't know how "normal" calls and "error" calls will be interleaved, so if + * N calls are expected then at least N outputs and N errors must be provided; even if UseLastOutput or UseDefaultOutput is set in order to define its + * behavior when more than N calls are made and it runs out of outputs and errors to return, the two array lengths must still be identical. + */ type TestExecutor struct { - LocalOutput string - LocalError error - LocalCommands []string + LocalOutput string + LocalOutputs []string + LocalError error + LocalErrors []error + LocalCommands []string + ClusterOutput *cluster.RemoteOutput + ClusterOutputs []*cluster.RemoteOutput ClusterCommands [][]cluster.ShellCommand - ErrorOnExecNum int // Throw the specified error after this many executions of Execute[...]Command(); 0 means always return error - NumExecutions int + + ErrorOnExecNum int // Return LocalError after this many calls of ExecuteLocalCommand (0 means always return error); has no effect for ExecuteClusterCommand + NumExecutions int // Total of NumLocalExecutions and NumClusterExecutions, for convenience and backwards compatibility + NumLocalExecutions int + NumClusterExecutions int + UseLastOutput bool // If we run out of LocalOutputs/LocalErrors or ClusterOutputs, default to the final items in those arrays + UseDefaultOutput bool // If we run out of LocalOutputs/LocalErrors or ClusterOutputs, default to LocalOutput/LocalError or ClusterOutput } func (executor *TestExecutor) ExecuteLocalCommand(commandStr string) (string, error) { executor.NumExecutions++ + executor.NumLocalExecutions++ executor.LocalCommands = append(executor.LocalCommands, commandStr) - if executor.ErrorOnExecNum == 0 || executor.NumExecutions == executor.ErrorOnExecNum { + if (executor.LocalOutputs == nil 
&& executor.LocalErrors != nil) || (executor.LocalOutputs != nil && executor.LocalErrors == nil) { + gplog.Fatal(nil, "If one of LocalOutputs or LocalErrors is set, both must be set") + } else if executor.LocalOutputs != nil && executor.LocalErrors != nil && len(executor.LocalOutputs) != len(executor.LocalErrors) { + gplog.Fatal(nil, "Found %d LocalOutputs and %d LocalErrors, but one output and one error must be set for each call", len(executor.LocalOutputs), len(executor.LocalErrors)) + } + if executor.LocalOutputs != nil { + if executor.NumLocalExecutions <= len(executor.LocalOutputs) { + return executor.LocalOutputs[executor.NumLocalExecutions-1], executor.LocalErrors[executor.NumLocalExecutions-1] + } else if executor.UseLastOutput { + return executor.LocalOutputs[len(executor.LocalOutputs)-1], executor.LocalErrors[len(executor.LocalErrors)-1] + } else if executor.UseDefaultOutput { + return executor.LocalOutput, executor.LocalError + } + gplog.Fatal(nil, "ExecuteLocalCommand called %d times, but only %d outputs and errors provided", executor.NumLocalExecutions, len(executor.LocalOutputs)) + } else if executor.ErrorOnExecNum == 0 || executor.NumLocalExecutions == executor.ErrorOnExecNum { return executor.LocalOutput, executor.LocalError } return executor.LocalOutput, nil @@ -63,9 +97,17 @@ func (executor *TestExecutor) ExecuteLocalCommand(commandStr string) (string, er func (executor *TestExecutor) ExecuteClusterCommand(scope cluster.Scope, commandList []cluster.ShellCommand) *cluster.RemoteOutput { executor.NumExecutions++ + executor.NumClusterExecutions++ executor.ClusterCommands = append(executor.ClusterCommands, commandList) - if executor.ErrorOnExecNum == 0 || executor.NumExecutions == executor.ErrorOnExecNum { - return executor.ClusterOutput + if executor.ClusterOutputs != nil { + if executor.NumClusterExecutions <= len(executor.ClusterOutputs) { + return executor.ClusterOutputs[executor.NumClusterExecutions-1] + } else if executor.UseLastOutput { + 
return executor.ClusterOutputs[len(executor.ClusterOutputs)-1] + } else if executor.UseDefaultOutput { + return executor.ClusterOutput + } + gplog.Fatal(nil, "ExecuteClusterCommand called %d times, but only %d ClusterOutputs provided", executor.NumClusterExecutions, len(executor.ClusterOutputs)) } - return nil + return executor.ClusterOutput } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
