This is an automated email from the ASF dual-hosted git repository.
gehafearless pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new b0e4c596d refactor(go-client): refactor admin APIs for meta by
reflection (#1929)
b0e4c596d is described below
commit b0e4c596dbf096d454846d386567a740d5f87fbd
Author: Dan Wang <[email protected]>
AuthorDate: Fri Mar 8 10:12:22 2024 +0800
refactor(go-client): refactor admin APIs for meta by reflection (#1929)
---
.github/workflows/lint_and_test_go-client.yml | 2 +-
go-client/admin/client.go | 192 +++++++++++++++++++-------
go-client/admin/client_test.go | 57 ++++----
go-client/idl/base/error_code.go | 34 +++++
4 files changed, 211 insertions(+), 74 deletions(-)
diff --git a/.github/workflows/lint_and_test_go-client.yml
b/.github/workflows/lint_and_test_go-client.yml
index 79d9e12b0..4e02501f1 100644
--- a/.github/workflows/lint_and_test_go-client.yml
+++ b/.github/workflows/lint_and_test_go-client.yml
@@ -78,7 +78,7 @@ jobs:
- name: Lint
uses: golangci/golangci-lint-action@v3
with:
- version: v1.55.2
+ version: v1.56.2
working-directory: ./go-client
build_server:
diff --git a/go-client/admin/client.go b/go-client/admin/client.go
index d5848560f..77851f477 100644
--- a/go-client/admin/client.go
+++ b/go-client/admin/client.go
@@ -22,6 +22,7 @@ package admin
import (
"context"
"fmt"
+ "reflect"
"time"
"github.com/apache/incubator-pegasus/go-client/idl/admin"
@@ -33,51 +34,109 @@ import (
// Client provides the administration API to a specific cluster.
// Remember only the superusers configured to the cluster have the admin
privileges.
type Client interface {
- CreateTable(ctx context.Context, tableName string, partitionCount int,
successIfExist_optional ...bool) error
-
- DropTable(ctx context.Context, tableName string) error
-
- ListTables(ctx context.Context) ([]*TableInfo, error)
-}
-
-// TableInfo is the table information.
-type TableInfo struct {
- Name string
-
- // Envs is a set of attributes binding to this table.
- Envs map[string]string
+ Close() error
+
+ // The timeout specifies the max duration that is spent on a client
request. For
+ // example, if the client is based on RPC, it would be the timeout for
the RPC
+ // request.
+ GetTimeout() time.Duration
+ SetTimeout(timeout time.Duration)
+
+ // `maxWaitSeconds` specifies the number of seconds that are spent on
waiting for
+ // the created table to be ready. This method would return an error if
the table
+ // is still not ready after `maxWaitSeconds`. The administrator should
check if
+ // there is something wrong with the table.
+ CreateTable(tableName string, partitionCount int32, replicaCount int32,
envs map[string]string, maxWaitSeconds int32, successIfExistOptional ...bool)
(int32, error)
+
+ // `reserveSeconds` specifies the retention interval for a table before
it is actually dropped.
+ DropTable(tableName string, reserveSeconds int64) error
+
+ // Empty `args` means "list all available tables"; otherwise, the only
parameter would
+ // specify the status of the returned tables.
+ ListTables(args ...interface{}) ([]*replication.AppInfo, error)
}
type Config struct {
MetaServers []string `json:"meta_servers"`
+ Timeout time.Duration
}
// NewClient returns an instance of Client.
func NewClient(cfg Config) Client {
return &rpcBasedClient{
- metaManager: session.NewMetaManager(cfg.MetaServers,
session.NewNodeSession),
+ meta: session.NewMetaManager(cfg.MetaServers,
session.NewNodeSession),
+ rpcTimeout: cfg.Timeout,
}
}
type rpcBasedClient struct {
- metaManager *session.MetaManager
+ meta *session.MetaManager
+ rpcTimeout time.Duration
+}
+
+func (c *rpcBasedClient) Close() error {
+ return c.meta.Close()
+}
+
+func (c *rpcBasedClient) GetTimeout() time.Duration {
+ return c.rpcTimeout
}
-func (c *rpcBasedClient) waitTableReady(ctx context.Context, tableName string,
partitionCount int) error {
- const replicaCount int = 3
+func (c *rpcBasedClient) SetTimeout(timeout time.Duration) {
+ c.rpcTimeout = timeout
+}
+
+// Call RPC methods(go-client/session/admin_rpc_types.go) of
session.MetaManager by reflection.
+// `req` and `resp` are the request and response structs of RPC. `callback`
always accepts
+// non-nil `resp`.
+func (c *rpcBasedClient) callMeta(methodName string, req interface{}, callback
func(resp interface{})) error {
+ ctx, cancel := context.WithTimeout(context.Background(), c.rpcTimeout)
+ defer cancel()
+
+ // There are 2 kinds of structs for the result which could be processed:
+ // * error
+ // * (response, error)
+ result :=
reflect.ValueOf(c.meta).MethodByName(methodName).Call([]reflect.Value{
+ reflect.ValueOf(ctx),
+ reflect.ValueOf(req),
+ })
+
+ // The last element must be error.
+ ierr := result[len(result)-1].Interface()
+
+ var err error
+ if ierr != nil {
+ err = ierr.(error)
+ }
+
+ if len(result) == 1 {
+ return err
+ }
+
+ // The struct of result must be (response, error).
+ if !result[0].IsNil() {
+ callback(result[0].Interface())
+ }
+
+ return err
+}
- for {
- resp, err := c.metaManager.QueryConfig(ctx, tableName)
+func (c *rpcBasedClient) waitTableReady(tableName string, partitionCount
int32, replicaCount int32, maxWaitSeconds int32) error {
+ for ; maxWaitSeconds > 0; maxWaitSeconds-- {
+ var resp *replication.QueryCfgResponse
+ err := c.callMeta("QueryConfig", tableName, func(iresp
interface{}) {
+ resp = iresp.(*replication.QueryCfgResponse)
+ })
if err != nil {
return err
}
if resp.GetErr().Errno != base.ERR_OK.String() {
- return fmt.Errorf("QueryConfig failed: %s",
resp.GetErr().String())
+ return fmt.Errorf("QueryConfig failed: %s",
base.GetResponseError(resp))
}
- readyCount := 0
+ readyCount := int32(0)
for _, part := range resp.Partitions {
- if part.Primary.GetRawAddress() != 0 &&
len(part.Secondaries)+1 == replicaCount {
+ if part.Primary.GetRawAddress() != 0 &&
int32(len(part.Secondaries)+1) == replicaCount {
readyCount++
}
}
@@ -86,55 +145,92 @@ func (c *rpcBasedClient) waitTableReady(ctx
context.Context, tableName string, p
}
time.Sleep(time.Second)
}
+
+ if maxWaitSeconds <= 0 {
+ return fmt.Errorf("After %d seconds, table '%s' is still not
ready", maxWaitSeconds, tableName)
+ }
+
return nil
}
-func (c *rpcBasedClient) CreateTable(ctx context.Context, tableName string,
partitionCount int, successIfExist_optional ...bool) error {
+func (c *rpcBasedClient) CreateTable(tableName string, partitionCount int32,
replicaCount int32, envs map[string]string, maxWaitSeconds int32,
successIfExistOptional ...bool) (int32, error) {
successIfExist := true
- if len(successIfExist_optional) > 0 {
- successIfExist = successIfExist_optional[0]
+ if len(successIfExistOptional) > 0 {
+ successIfExist = successIfExistOptional[0]
}
- _, err := c.metaManager.CreateApp(ctx,
&admin.ConfigurationCreateAppRequest{
+
+ req := &admin.ConfigurationCreateAppRequest{
AppName: tableName,
Options: &admin.CreateAppOptions{
PartitionCount: int32(partitionCount),
- ReplicaCount: 3,
+ ReplicaCount: replicaCount,
SuccessIfExist: successIfExist,
AppType: "pegasus",
- Envs: make(map[string]string),
IsStateful: true,
+ Envs: envs,
+ }}
+
+ var appID int32
+ var respErr error
+ err := c.callMeta("CreateApp", req, func(iresp interface{}) {
+ resp := iresp.(*admin.ConfigurationCreateAppResponse)
+ appID = resp.Appid
+ respErr = base.GetResponseError(resp)
+ })
+ if err != nil {
+ return appID, err
+ }
+
+ err = c.waitTableReady(tableName, partitionCount, replicaCount,
maxWaitSeconds)
+ if err != nil {
+ return appID, err
+ }
+
+ return appID, respErr
+}
+
+func (c *rpcBasedClient) DropTable(tableName string, reserveSeconds int64)
error {
+ req := &admin.ConfigurationDropAppRequest{
+ AppName: tableName,
+ Options: &admin.DropAppOptions{
+ SuccessIfNotExist: true,
+ ReserveSeconds: &reserveSeconds, // Optional for
thrift
},
+ }
+
+ var respErr error
+ err := c.callMeta("DropApp", req, func(iresp interface{}) {
+ respErr =
base.GetResponseError(iresp.(*admin.ConfigurationDropAppResponse))
})
if err != nil {
return err
}
- err = c.waitTableReady(ctx, tableName, partitionCount)
- return err
+
+ return respErr
}
-func (c *rpcBasedClient) DropTable(ctx context.Context, tableName string)
error {
- req := admin.NewConfigurationDropAppRequest()
- req.AppName = tableName
- reserveSeconds := int64(1) // delete immediately. the caller is
responsible for the soft deletion of table.
- req.Options = &admin.DropAppOptions{
- SuccessIfNotExist: true,
- ReserveSeconds: &reserveSeconds,
+func (c *rpcBasedClient) listTables(status replication.AppStatus)
([]*replication.AppInfo, error) {
+ req := &admin.ConfigurationListAppsRequest{
+ Status: status,
}
- _, err := c.metaManager.DropApp(ctx, req)
- return err
-}
-func (c *rpcBasedClient) ListTables(ctx context.Context) ([]*TableInfo, error)
{
- resp, err := c.metaManager.ListApps(ctx,
&admin.ConfigurationListAppsRequest{
- Status: replication.AppStatus_AS_AVAILABLE,
+ var tables []*replication.AppInfo
+ var respErr error
+ err := c.callMeta("ListApps", req, func(iresp interface{}) {
+ resp := iresp.(*admin.ConfigurationListAppsResponse)
+ tables = resp.Infos
+ respErr = base.GetResponseError(resp)
})
if err != nil {
- return nil, err
+ return tables, err
}
- var results []*TableInfo
- for _, app := range resp.Infos {
- results = append(results, &TableInfo{Name: app.AppName, Envs:
app.Envs})
+ return tables, respErr
+}
+
+func (c *rpcBasedClient) ListTables(args ...interface{})
([]*replication.AppInfo, error) {
+ if len(args) == 0 {
+ return c.listTables(replication.AppStatus_AS_AVAILABLE)
}
- return results, nil
+ return c.listTables(args[0].(replication.AppStatus))
}
diff --git a/go-client/admin/client_test.go b/go-client/admin/client_test.go
index 3d6ee3ddf..8fdd13a4f 100644
--- a/go-client/admin/client_test.go
+++ b/go-client/admin/client_test.go
@@ -25,51 +25,62 @@ import (
"testing"
"time"
+ "github.com/apache/incubator-pegasus/go-client/idl/replication"
"github.com/apache/incubator-pegasus/go-client/pegasus"
"github.com/stretchr/testify/assert"
)
-func TestAdmin_Table(t *testing.T) {
- c := NewClient(Config{
+const (
+ replicaCount = 3
+ maxWaitSeconds = 600
+ reserveSeconds = 1
+)
+
+func defaultConfig() Config {
+ return Config{
MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602",
"0.0.0.0:34603"},
- })
+ Timeout: 30 * time.Second,
+ }
+}
+
+func TestAdmin_Table(t *testing.T) {
+ c := NewClient(defaultConfig())
- hasTable := func(tables []*TableInfo, tableName string) bool {
+ hasTable := func(tables []*replication.AppInfo, tableName string) bool {
for _, tb := range tables {
- if tb.Name == tableName {
+ if tb.AppName == tableName {
return true
}
}
return false
}
- err := c.DropTable(context.Background(), "admin_table_test")
+ err := c.DropTable("admin_table_test", reserveSeconds)
assert.Nil(t, err)
// no such table after deletion
- tables, err := c.ListTables(context.Background())
+ tables, err := c.ListTables()
assert.Nil(t, err)
assert.False(t, hasTable(tables, "admin_table_test"))
- err = c.CreateTable(context.Background(), "admin_table_test", 16)
+ _, err = c.CreateTable("admin_table_test", 16, replicaCount,
make(map[string]string), maxWaitSeconds)
assert.Nil(t, err)
- tables, err = c.ListTables(context.Background())
+ tables, err = c.ListTables()
assert.Nil(t, err)
assert.True(t, hasTable(tables, "admin_table_test"))
- err = c.DropTable(context.Background(), "admin_table_test")
+ err = c.DropTable("admin_table_test", reserveSeconds)
assert.Nil(t, err)
}
func TestAdmin_ListTablesTimeout(t *testing.T) {
c := NewClient(Config{
MetaServers: []string{"0.0.0.0:123456"},
+ Timeout: 500 * time.Millisecond,
})
- ctx, cancel := context.WithTimeout(context.Background(),
500*time.Millisecond)
- defer cancel()
- _, err := c.ListTables(ctx)
+ _, err := c.ListTables()
assert.Equal(t, err, context.DeadlineExceeded)
}
@@ -77,14 +88,9 @@ func TestAdmin_ListTablesTimeout(t *testing.T) {
func TestAdmin_CreateTableMustAvailable(t *testing.T) {
const tableName = "admin_table_test"
- c := NewClient(Config{
- MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602",
"0.0.0.0:34603"},
- })
-
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
- defer cancel()
+ c := NewClient(defaultConfig())
- err := c.CreateTable(context.Background(), tableName, 8)
+ _, err := c.CreateTable(tableName, 8, replicaCount,
make(map[string]string), maxWaitSeconds)
if !assert.NoError(t, err) {
assert.Fail(t, err.Error())
}
@@ -115,24 +121,25 @@ func TestAdmin_CreateTableMustAvailable(t *testing.T) {
}
}
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
err = tb.Set(ctx, []byte("a"), []byte("a"), []byte("a"))
if !assert.NoError(t, err) {
assert.Fail(t, err.Error())
}
// cleanup
- err = c.DropTable(context.Background(), tableName)
+ err = c.DropTable(tableName, reserveSeconds)
if !assert.NoError(t, err) {
assert.Fail(t, err.Error())
}
}
func TestAdmin_GetAppEnvs(t *testing.T) {
- c := NewClient(Config{
- MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602",
"0.0.0.0:34603"},
- })
+ c := NewClient(defaultConfig())
- tables, err := c.ListTables(context.Background())
+ tables, err := c.ListTables()
assert.Nil(t, err)
for _, tb := range tables {
assert.Empty(t, tb.Envs)
diff --git a/go-client/idl/base/error_code.go b/go-client/idl/base/error_code.go
index f70404df3..c105cb48b 100644
--- a/go-client/idl/base/error_code.go
+++ b/go-client/idl/base/error_code.go
@@ -21,6 +21,7 @@ package base
import (
"fmt"
+ "reflect"
"github.com/apache/thrift/lib/go/thrift"
)
@@ -136,6 +137,39 @@ func (ec *ErrorCode) String() string {
return fmt.Sprintf("ErrorCode(%+v)", *ec)
}
+type baseError struct {
+ message string
+}
+
+// Implement error interface.
+func (e *baseError) Error() string {
+ if e == nil || e.message == ERR_OK.String() {
+ return ERR_OK.String()
+ }
+ return e.message
+}
+
+// Convert ErrorCode to error.
+func (ec *ErrorCode) AsError() error {
+ if ec == nil || ec.Errno == ERR_OK.String() {
+ return nil
+ }
+ return &baseError{
+ message: ec.Errno,
+ }
+}
+
+// `resp` is the thrift-generated response struct of RPC.
+func GetResponseError(resp interface{}) error {
+ result :=
reflect.ValueOf(resp).MethodByName("GetErr").Call([]reflect.Value{})
+ iec := result[0].Interface()
+ if iec == nil {
+ return nil
+ }
+
+ return iec.(*ErrorCode).AsError()
+}
+
//go:generate enumer -type=RocksDBErrCode -output=rocskdb_err_string.go
type RocksDBErrCode int32
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]