This is an automated email from the ASF dual-hosted git repository. linkinstar pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/answer.git
commit 56f36ec5e605bfc74fd17e86198d657a1a3f45ce Author: Sonui <m...@sonui.cn> AuthorDate: Mon Mar 10 00:28:57 2025 +0800 refactor(plugin): improve KV storage with better caching and param handling --- plugin/kv_storage.go | 198 +++++++++++++++++++++++++++++---------------------- 1 file changed, 112 insertions(+), 86 deletions(-) diff --git a/plugin/kv_storage.go b/plugin/kv_storage.go index 0ec3d640..17617f6c 100644 --- a/plugin/kv_storage.go +++ b/plugin/kv_storage.go @@ -20,32 +20,64 @@ package plugin import ( "context" - "encoding/json" "fmt" - "math/rand" + "math/rand/v2" "time" "github.com/apache/answer/internal/entity" + "github.com/segmentfault/pacman/cache" "github.com/segmentfault/pacman/log" "xorm.io/builder" "xorm.io/xorm" ) -// define error +// Error variables for KV storage operations var ( - ErrKVKeyNotFound = fmt.Errorf("key not found in KV storage") - ErrKVGroupEmpty = fmt.Errorf("group name is empty") - ErrKVKeyEmpty = fmt.Errorf("key name is empty") - ErrKVKeyAndGroupEmpty = fmt.Errorf("both key and group are empty") - ErrKVTransactionFailed = fmt.Errorf("KV storage transaction failed") - ErrKVDataNotInitialized = fmt.Errorf("KV storage data not initialized") - ErrKVDBNotInitialized = fmt.Errorf("KV storage database connection not initialized") + // ErrKVKeyNotFound is returned when the requested key does not exist in the KV storage + ErrKVKeyNotFound = fmt.Errorf("key not found in KV storage") + // ErrKVGroupEmpty is returned when a required group name is empty + ErrKVGroupEmpty = fmt.Errorf("group name is empty") + // ErrKVKeyEmpty is returned when a required key name is empty + ErrKVKeyEmpty = fmt.Errorf("key name is empty") + // ErrKVKeyAndGroupEmpty is returned when both key and group names are empty + ErrKVKeyAndGroupEmpty = fmt.Errorf("both key and group are empty") + // ErrKVTransactionFailed is returned when a KV storage transaction operation fails + ErrKVTransactionFailed = fmt.Errorf("KV storage transaction failed") ) +// KVParams is the parameters for KV storage operations +type KVParams struct { + Group string + Key string + Value string + Page int + PageSize int +} + +// KVOperator provides methods to interact with the key-value storage system for plugins type KVOperator struct { data *Data session *xorm.Session pluginSlugName string + cacheTTL time.Duration +} + +// KVStorageOption defines a function type that configures a KVOperator +type KVStorageOption func(*KVOperator) + +// WithCacheTTL is the option to set the cache TTL; the default value is 30 minutes. +// If ttl is less than 0, the cache will not be used +func WithCacheTTL(ttl time.Duration) KVStorageOption { + return func(kv *KVOperator) { + kv.cacheTTL = ttl + } +} + +// Option is used to set the options for the KV storage +func (kv *KVOperator) Option(opts ...KVStorageOption) { + for _, opt := range opts { + opt(kv) + } } func (kv *KVOperator) getSession(ctx context.Context) (*xorm.Session, func()) { @@ -62,28 +94,53 @@ func (kv *KVOperator) getSession(ctx context.Context) (*xorm.Session, func()) { return session, cleanup } -func (kv *KVOperator) getCacheTTL() time.Duration { - return 30*time.Minute + time.Duration(rand.Intn(300))*time.Second +func (kv *KVOperator) getCacheKey(params KVParams) string { + return fmt.Sprintf("plugin_kv_storage:%s:group:%s:key:%s", kv.pluginSlugName, params.Group, params.Key) +} + +func (kv *KVOperator) setCache(ctx context.Context, params KVParams) { + if kv.cacheTTL < 0 { + return + } + + ttl := kv.cacheTTL + if ttl > 10 { + ttl += time.Duration(float64(ttl) * 0.1 * (1 - rand.Float64())) + } + + cacheKey := kv.getCacheKey(params) + if err := kv.data.Cache.SetString(ctx, cacheKey, params.Value, ttl); err != nil { + log.Warnf("cache set failed: %v, key: %s", err, cacheKey) + } +} + +func (kv *KVOperator) getCache(ctx context.Context, params KVParams) (string, bool, error) { + if kv.cacheTTL < 0 { + return "", false, nil + } + + cacheKey := kv.getCacheKey(params) + return kv.data.Cache.GetString(ctx, cacheKey) } -func (kv *KVOperator) getCacheKey(group, key string) string { - if group == "" { - return fmt.Sprintf("plugin_kv_storage:%s:key:%s", kv.pluginSlugName, key) +func (kv *KVOperator) cleanCache(ctx context.Context, params KVParams) { + if kv.cacheTTL < 0 { + return } - if key == "" { - return fmt.Sprintf("plugin_kv_storage:%s:group:%s", kv.pluginSlugName, group) + + if err := kv.data.Cache.Del(ctx, kv.getCacheKey(params)); err != nil { + log.Warnf("Failed to delete cache for key %s: %v", params.Key, err) } - return fmt.Sprintf("plugin_kv_storage:%s:group:%s:key:%s", kv.pluginSlugName, group, key) } -func (kv *KVOperator) Get(ctx context.Context, group, key string) (string, error) { - // validate - if key == "" { +// Get retrieves a value from KV storage by group and key. +// Returns the value as a string or an error if the key is not found. +func (kv *KVOperator) Get(ctx context.Context, params KVParams) (string, error) { + if params.Key == "" { return "", ErrKVKeyEmpty } - cacheKey := kv.getCacheKey(group, key) - if value, exist, err := kv.data.Cache.GetString(ctx, cacheKey); err == nil && exist { + if value, exist, err := kv.getCache(ctx, params); err == nil && exist { return value, nil } @@ -94,8 +151,8 @@ func (kv *KVOperator) Get(ctx context.Context, group, key string) (string, error query.Where(builder.Eq{ "plugin_slug_name": kv.pluginSlugName, - "`group`": group, - "`key`": key, + "`group`": params.Group, + "`key`": params.Key, }) has, err := query.Get(&data) @@ -106,15 +163,15 @@ func (kv *KVOperator) Get(ctx context.Context, group, key string) (string, error return "", ErrKVKeyNotFound } - if err := kv.data.Cache.SetString(ctx, cacheKey, data.Value, kv.getCacheTTL()); err != nil { - log.Error(err) - } + kv.setCache(ctx, params) return data.Value, nil } -func (kv *KVOperator) Set(ctx context.Context, group, key, value string) error { - if key == "" { +// Set stores a value in KV storage with the specified group and key. +// Updates the value if it already exists. +func (kv *KVOperator) Set(ctx context.Context, params KVParams) error { + if params.Key == "" { return ErrKVKeyEmpty } @@ -123,17 +180,17 @@ func (kv *KVOperator) Set(ctx context.Context, group, key, value string) error { data := &entity.PluginKVStorage{ PluginSlugName: kv.pluginSlugName, - Group: group, - Key: key, - Value: value, + Group: params.Group, + Key: params.Key, + Value: params.Value, } - kv.cleanCache(ctx, group, key) + kv.cleanCache(ctx, params) affected, err := query.Where(builder.Eq{ "plugin_slug_name": kv.pluginSlugName, - "`group`": group, - "`key`": key, + "`group`": params.Group, + "`key`": params.Key, }).Cols("value").Update(data) if err != nil { return err @@ -148,12 +205,16 @@ func (kv *KVOperator) Set(ctx context.Context, group, key, value string) error { return nil } -func (kv *KVOperator) Del(ctx context.Context, group, key string) error { - if key == "" && group == "" { +// Del removes values from KV storage by group and/or key. +// If both group and key are provided, only that specific entry is deleted. +// If only group is provided, all entries in that group are deleted. +// At least one of group or key must be provided. +func (kv *KVOperator) Del(ctx context.Context, params KVParams) error { + if params.Key == "" && params.Group == "" { return ErrKVKeyAndGroupEmpty } - kv.cleanCache(ctx, group, key) + kv.cleanCache(ctx, params) session, cleanup := kv.getSession(ctx) defer cleanup() @@ -161,63 +222,35 @@ func (kv *KVOperator) Del(ctx context.Context, group, key string) error { session.Where(builder.Eq{ "plugin_slug_name": kv.pluginSlugName, }) - if group != "" { - session.Where(builder.Eq{"`group`": group}) + if params.Group != "" { + session.Where(builder.Eq{"`group`": params.Group}) } - if key != "" { - session.Where(builder.Eq{"`key`": key}) + if params.Key != "" { + session.Where(builder.Eq{"`key`": params.Key}) } _, err := session.Delete(&entity.PluginKVStorage{}) return err } -func (kv *KVOperator) cleanCache(ctx context.Context, group, key string) { - if key != "" { - if err := kv.data.Cache.Del(ctx, kv.getCacheKey("", key)); err != nil { - log.Warnf("Failed to delete cache for key %s: %v", key, err) - } - - if group != "" { - if err := kv.data.Cache.Del(ctx, kv.getCacheKey(group, key)); err != nil { - log.Warnf("Failed to delete cache for group %s, key %s: %v", group, key, err) - } - } - } - - if group != "" { - if err := kv.data.Cache.Del(ctx, kv.getCacheKey(group, "")); err != nil { - log.Warnf("Failed to delete cache for group %s: %v", group, err) - } - } -} - -func (kv *KVOperator) GetByGroup(ctx context.Context, group string, page, pageSize int) (map[string]string, error) { - if group == "" { +func (kv *KVOperator) GetByGroup(ctx context.Context, params KVParams) (map[string]string, error) { + if params.Group == "" { return nil, ErrKVGroupEmpty } - if page < 1 { - page = 1 + if params.Page < 1 { + params.Page = 1 } - if pageSize < 1 { - pageSize = 10 - } - - cacheKey := kv.getCacheKey(group, "") - if value, exist, err := kv.data.Cache.GetString(ctx, cacheKey); err == nil && exist { - result := make(map[string]string) - if err := json.Unmarshal([]byte(value), &result); err == nil { - return result, nil - } + if params.PageSize < 1 { + params.PageSize = 10 } query, cleanup := kv.getSession(ctx) defer cleanup() var items []entity.PluginKVStorage - err := query.Where(builder.Eq{"plugin_slug_name": kv.pluginSlugName, "`group`": group}). - Limit(pageSize, (page-1)*pageSize). + err := query.Where(builder.Eq{"plugin_slug_name": kv.pluginSlugName, "`group`": params.Group}). + Limit(params.PageSize, (params.Page-1)*params.PageSize). OrderBy("id ASC"). Find(&items) if err != nil { @@ -227,13 +260,6 @@ func (kv *KVOperator) GetByGroup(ctx context.Context, group string, page, pageSi result := make(map[string]string, len(items)) for _, item := range items { result[item.Key] = item.Value - if err := kv.data.Cache.SetString(ctx, kv.getCacheKey(group, item.Key), item.Value, kv.getCacheTTL()); err != nil { - log.Warnf("Failed to set cache for group %s, key %s: %v", group, item.Key, err) - } - } - - if resultJSON, err := json.Marshal(result); err == nil { - _ = kv.data.Cache.SetString(ctx, cacheKey, string(resultJSON), kv.getCacheTTL()) } return result, nil