This is an automated email from the ASF dual-hosted git repository.

linkinstar pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/answer.git

commit 56f36ec5e605bfc74fd17e86198d657a1a3f45ce
Author: Sonui <m...@sonui.cn>
AuthorDate: Mon Mar 10 00:28:57 2025 +0800

    refactor(plugin): improve KV storage with better caching and param handling
---
 plugin/kv_storage.go | 198 +++++++++++++++++++++++++++++----------------------
 1 file changed, 112 insertions(+), 86 deletions(-)

diff --git a/plugin/kv_storage.go b/plugin/kv_storage.go
index 0ec3d640..17617f6c 100644
--- a/plugin/kv_storage.go
+++ b/plugin/kv_storage.go
@@ -20,32 +20,64 @@ package plugin
 
 import (
        "context"
-       "encoding/json"
        "fmt"
-       "math/rand"
+       "math/rand/v2"
        "time"
 
        "github.com/apache/answer/internal/entity"
+       "github.com/segmentfault/pacman/cache"
        "github.com/segmentfault/pacman/log"
        "xorm.io/builder"
        "xorm.io/xorm"
 )
 
-// define error
+// Error variables for KV storage operations
 var (
-       ErrKVKeyNotFound        = fmt.Errorf("key not found in KV storage")
-       ErrKVGroupEmpty         = fmt.Errorf("group name is empty")
-       ErrKVKeyEmpty           = fmt.Errorf("key name is empty")
-       ErrKVKeyAndGroupEmpty   = fmt.Errorf("both key and group are empty")
-       ErrKVTransactionFailed  = fmt.Errorf("KV storage transaction failed")
-       ErrKVDataNotInitialized = fmt.Errorf("KV storage data not initialized")
-       ErrKVDBNotInitialized   = fmt.Errorf("KV storage database connection 
not initialized")
+       // ErrKVKeyNotFound is returned when the requested key does not exist 
in the KV storage
+       ErrKVKeyNotFound = fmt.Errorf("key not found in KV storage")
+       // ErrKVGroupEmpty is returned when a required group name is empty
+       ErrKVGroupEmpty = fmt.Errorf("group name is empty")
+       // ErrKVKeyEmpty is returned when a required key name is empty
+       ErrKVKeyEmpty = fmt.Errorf("key name is empty")
+       // ErrKVKeyAndGroupEmpty is returned when both key and group names are 
empty
+       ErrKVKeyAndGroupEmpty = fmt.Errorf("both key and group are empty")
+       // ErrKVTransactionFailed is returned when a KV storage transaction 
operation fails
+       ErrKVTransactionFailed = fmt.Errorf("KV storage transaction failed")
 )
 
+// KVParams is the parameters for KV storage operations
+type KVParams struct {
+       Group    string
+       Key      string
+       Value    string
+       Page     int
+       PageSize int
+}
+
+// KVOperator provides methods to interact with the key-value storage system 
for plugins
 type KVOperator struct {
        data           *Data
        session        *xorm.Session
        pluginSlugName string
+       cacheTTL       time.Duration
+}
+
+// KVStorageOption defines a function type that configures a KVOperator
+type KVStorageOption func(*KVOperator)
+
+// WithCacheTTL is the option to set the cache TTL; the default value is 30 
minutes.
+// If ttl is less than 0, the cache will not be used
+func WithCacheTTL(ttl time.Duration) KVStorageOption {
+       return func(kv *KVOperator) {
+               kv.cacheTTL = ttl
+       }
+}
+
+// Option is used to set the options for the KV storage
+func (kv *KVOperator) Option(opts ...KVStorageOption) {
+       for _, opt := range opts {
+               opt(kv)
+       }
 }
 
 func (kv *KVOperator) getSession(ctx context.Context) (*xorm.Session, func()) {
@@ -62,28 +94,53 @@ func (kv *KVOperator) getSession(ctx context.Context) 
(*xorm.Session, func()) {
        return session, cleanup
 }
 
-func (kv *KVOperator) getCacheTTL() time.Duration {
-       return 30*time.Minute + time.Duration(rand.Intn(300))*time.Second
+func (kv *KVOperator) getCacheKey(params KVParams) string {
+       return fmt.Sprintf("plugin_kv_storage:%s:group:%s:key:%s", 
kv.pluginSlugName, params.Group, params.Key)
+}
+
+func (kv *KVOperator) setCache(ctx context.Context, params KVParams) {
+       if kv.cacheTTL < 0 {
+               return
+       }
+
+       ttl := kv.cacheTTL
+       if ttl > 10 {
+               ttl += time.Duration(float64(ttl) * 0.1 * (1 - rand.Float64()))
+       }
+
+       cacheKey := kv.getCacheKey(params)
+       if err := kv.data.Cache.SetString(ctx, cacheKey, params.Value, ttl); 
err != nil {
+               log.Warnf("cache set failed: %v, key: %s", err, cacheKey)
+       }
+}
+
+func (kv *KVOperator) getCache(ctx context.Context, params KVParams) (string, 
bool, error) {
+       if kv.cacheTTL < 0 {
+               return "", false, nil
+       }
+
+       cacheKey := kv.getCacheKey(params)
+       return kv.data.Cache.GetString(ctx, cacheKey)
 }
 
-func (kv *KVOperator) getCacheKey(group, key string) string {
-       if group == "" {
-               return fmt.Sprintf("plugin_kv_storage:%s:key:%s", 
kv.pluginSlugName, key)
+func (kv *KVOperator) cleanCache(ctx context.Context, params KVParams) {
+       if kv.cacheTTL < 0 {
+               return
        }
-       if key == "" {
-               return fmt.Sprintf("plugin_kv_storage:%s:group:%s", 
kv.pluginSlugName, group)
+
+       if err := kv.data.Cache.Del(ctx, kv.getCacheKey(params)); err != nil {
+               log.Warnf("Failed to delete cache for key %s: %v", params.Key, 
err)
        }
-       return fmt.Sprintf("plugin_kv_storage:%s:group:%s:key:%s", 
kv.pluginSlugName, group, key)
 }
 
-func (kv *KVOperator) Get(ctx context.Context, group, key string) (string, 
error) {
-       // validate
-       if key == "" {
+// Get retrieves a value from KV storage by group and key.
+// Returns the value as a string or an error if the key is not found.
+func (kv *KVOperator) Get(ctx context.Context, params KVParams) (string, 
error) {
+       if params.Key == "" {
                return "", ErrKVKeyEmpty
        }
 
-       cacheKey := kv.getCacheKey(group, key)
-       if value, exist, err := kv.data.Cache.GetString(ctx, cacheKey); err == 
nil && exist {
+       if value, exist, err := kv.getCache(ctx, params); err == nil && exist {
                return value, nil
        }
 
@@ -94,8 +151,8 @@ func (kv *KVOperator) Get(ctx context.Context, group, key 
string) (string, error
 
        query.Where(builder.Eq{
                "plugin_slug_name": kv.pluginSlugName,
-               "`group`":          group,
-               "`key`":            key,
+               "`group`":          params.Group,
+               "`key`":            params.Key,
        })
 
        has, err := query.Get(&data)
@@ -106,15 +163,15 @@ func (kv *KVOperator) Get(ctx context.Context, group, key 
string) (string, error
                return "", ErrKVKeyNotFound
        }
 
-       if err := kv.data.Cache.SetString(ctx, cacheKey, data.Value, 
kv.getCacheTTL()); err != nil {
-               log.Error(err)
-       }
+       kv.setCache(ctx, params)
 
        return data.Value, nil
 }
 
-func (kv *KVOperator) Set(ctx context.Context, group, key, value string) error 
{
-       if key == "" {
+// Set stores a value in KV storage with the specified group and key.
+// Updates the value if it already exists.
+func (kv *KVOperator) Set(ctx context.Context, params KVParams) error {
+       if params.Key == "" {
                return ErrKVKeyEmpty
        }
 
@@ -123,17 +180,17 @@ func (kv *KVOperator) Set(ctx context.Context, group, 
key, value string) error {
 
        data := &entity.PluginKVStorage{
                PluginSlugName: kv.pluginSlugName,
-               Group:          group,
-               Key:            key,
-               Value:          value,
+               Group:          params.Group,
+               Key:            params.Key,
+               Value:          params.Value,
        }
 
-       kv.cleanCache(ctx, group, key)
+       kv.cleanCache(ctx, params)
 
        affected, err := query.Where(builder.Eq{
                "plugin_slug_name": kv.pluginSlugName,
-               "`group`":          group,
-               "`key`":            key,
+               "`group`":          params.Group,
+               "`key`":            params.Key,
        }).Cols("value").Update(data)
        if err != nil {
                return err
@@ -148,12 +205,16 @@ func (kv *KVOperator) Set(ctx context.Context, group, 
key, value string) error {
        return nil
 }
 
-func (kv *KVOperator) Del(ctx context.Context, group, key string) error {
-       if key == "" && group == "" {
+// Del removes values from KV storage by group and/or key.
+// If both group and key are provided, only that specific entry is deleted.
+// If only group is provided, all entries in that group are deleted.
+// At least one of group or key must be provided.
+func (kv *KVOperator) Del(ctx context.Context, params KVParams) error {
+       if params.Key == "" && params.Group == "" {
                return ErrKVKeyAndGroupEmpty
        }
 
-       kv.cleanCache(ctx, group, key)
+       kv.cleanCache(ctx, params)
 
        session, cleanup := kv.getSession(ctx)
        defer cleanup()
@@ -161,63 +222,35 @@ func (kv *KVOperator) Del(ctx context.Context, group, key 
string) error {
        session.Where(builder.Eq{
                "plugin_slug_name": kv.pluginSlugName,
        })
-       if group != "" {
-               session.Where(builder.Eq{"`group`": group})
+       if params.Group != "" {
+               session.Where(builder.Eq{"`group`": params.Group})
        }
-       if key != "" {
-               session.Where(builder.Eq{"`key`": key})
+       if params.Key != "" {
+               session.Where(builder.Eq{"`key`": params.Key})
        }
 
        _, err := session.Delete(&entity.PluginKVStorage{})
        return err
 }
 
-func (kv *KVOperator) cleanCache(ctx context.Context, group, key string) {
-       if key != "" {
-               if err := kv.data.Cache.Del(ctx, kv.getCacheKey("", key)); err 
!= nil {
-                       log.Warnf("Failed to delete cache for key %s: %v", key, 
err)
-               }
-
-               if group != "" {
-                       if err := kv.data.Cache.Del(ctx, kv.getCacheKey(group, 
key)); err != nil {
-                               log.Warnf("Failed to delete cache for group %s, 
key %s: %v", group, key, err)
-                       }
-               }
-       }
-
-       if group != "" {
-               if err := kv.data.Cache.Del(ctx, kv.getCacheKey(group, "")); 
err != nil {
-                       log.Warnf("Failed to delete cache for group %s: %v", 
group, err)
-               }
-       }
-}
-
-func (kv *KVOperator) GetByGroup(ctx context.Context, group string, page, 
pageSize int) (map[string]string, error) {
-       if group == "" {
+func (kv *KVOperator) GetByGroup(ctx context.Context, params KVParams) 
(map[string]string, error) {
+       if params.Group == "" {
                return nil, ErrKVGroupEmpty
        }
 
-       if page < 1 {
-               page = 1
+       if params.Page < 1 {
+               params.Page = 1
        }
-       if pageSize < 1 {
-               pageSize = 10
-       }
-
-       cacheKey := kv.getCacheKey(group, "")
-       if value, exist, err := kv.data.Cache.GetString(ctx, cacheKey); err == 
nil && exist {
-               result := make(map[string]string)
-               if err := json.Unmarshal([]byte(value), &result); err == nil {
-                       return result, nil
-               }
+       if params.PageSize < 1 {
+               params.PageSize = 10
        }
 
        query, cleanup := kv.getSession(ctx)
        defer cleanup()
 
        var items []entity.PluginKVStorage
-       err := query.Where(builder.Eq{"plugin_slug_name": kv.pluginSlugName, 
"`group`": group}).
-               Limit(pageSize, (page-1)*pageSize).
+       err := query.Where(builder.Eq{"plugin_slug_name": kv.pluginSlugName, 
"`group`": params.Group}).
+               Limit(params.PageSize, (params.Page-1)*params.PageSize).
                OrderBy("id ASC").
                Find(&items)
        if err != nil {
@@ -227,13 +260,6 @@ func (kv *KVOperator) GetByGroup(ctx context.Context, 
group string, page, pageSi
        result := make(map[string]string, len(items))
        for _, item := range items {
                result[item.Key] = item.Value
-               if err := kv.data.Cache.SetString(ctx, kv.getCacheKey(group, 
item.Key), item.Value, kv.getCacheTTL()); err != nil {
-                       log.Warnf("Failed to set cache for group %s, key %s: 
%v", group, item.Key, err)
-               }
-       }
-
-       if resultJSON, err := json.Marshal(result); err == nil {
-               _ = kv.data.Cache.SetString(ctx, cacheKey, string(resultJSON), 
kv.getCacheTTL())
        }
 
        return result, nil

Reply via email to