This is an automated email from the ASF dual-hosted git repository.

linkinstar pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/answer.git

commit 20d7e4bc10f9625856bae43b4312c4be46c31a71
Author: Sonui <m...@sonui.cn>
AuthorDate: Tue Feb 25 18:18:26 2025 +0800

    feat(plugin): add key-value storage support for plugins
---
 internal/entity/plugin_kv_storage_entity.go        |  32 ++
 internal/migrations/init_data.go                   |   1 +
 internal/migrations/migrations.go                  |   1 +
 .../service/plugin_common/plugin_common_service.go |   5 +
 plugin/kv_storage.go                               | 344 +++++++++++++++++++++
 plugin/plugin.go                                   |  12 +
 6 files changed, 395 insertions(+)

diff --git a/internal/entity/plugin_kv_storage_entity.go 
b/internal/entity/plugin_kv_storage_entity.go
new file mode 100644
index 00000000..c7e6efbe
--- /dev/null
+++ b/internal/entity/plugin_kv_storage_entity.go
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package entity
+
+// PluginKVStorage is one row of the plugin_kv_storage table: a single
+// key-value pair stored on behalf of a plugin.
+//
+// The plugin_slug_name, group and key columns share the unique index uk_psg.
+// The group and key column names are quoted in the xorm tags. Each tag must
+// stay on a single line: a line break inside the raw string would become part
+// of the tag.
+type PluginKVStorage struct {
+       ID             int    `xorm:"not null pk autoincr INT(11) id"`
+       PluginSlugName string `xorm:"not null VARCHAR(128) UNIQUE(uk_psg) plugin_slug_name"`
+       Group          string `xorm:"not null VARCHAR(128) UNIQUE(uk_psg) 'group'"`
+       Key            string `xorm:"not null VARCHAR(128) UNIQUE(uk_psg) 'key'"`
+       Value          string `xorm:"not null TEXT value"`
+}
+
+// TableName returns the name of the table that stores PluginKVStorage rows.
+func (PluginKVStorage) TableName() string {
+       return "plugin_kv_storage"
+}
diff --git a/internal/migrations/init_data.go b/internal/migrations/init_data.go
index 7322f738..8b853c4d 100644
--- a/internal/migrations/init_data.go
+++ b/internal/migrations/init_data.go
@@ -74,6 +74,7 @@ var (
                &entity.Badge{},
                &entity.BadgeGroup{},
                &entity.BadgeAward{},
+               &entity.PluginKVStorage{},
        }
 
        roles = []*entity.Role{
diff --git a/internal/migrations/migrations.go 
b/internal/migrations/migrations.go
index 57f4778e..ca715649 100644
--- a/internal/migrations/migrations.go
+++ b/internal/migrations/migrations.go
@@ -101,6 +101,7 @@ var migrations = []Migration{
        NewMigration("v1.4.1", "add question link", addQuestionLink, true),
        NewMigration("v1.4.2", "add the number of question links", 
addQuestionLinkedCount, true),
        NewMigration("v1.4.5", "add file record", addFileRecord, true),
+       NewMigration("v1.4.6", "add plugin kv storage", addPluginKVStorage, 
true),
 }
 
 func GetMigrations() []Migration {
diff --git a/internal/service/plugin_common/plugin_common_service.go 
b/internal/service/plugin_common/plugin_common_service.go
index c1b0ad44..de6d981d 100644
--- a/internal/service/plugin_common/plugin_common_service.go
+++ b/internal/service/plugin_common/plugin_common_service.go
@@ -135,6 +135,11 @@ func (ps *PluginCommonService) GetUserPluginConfig(ctx 
context.Context, req *sch
 }
 
 func (ps *PluginCommonService) initPluginData() {
+       plugin.SetKVStorageDB(&plugin.Data{
+               DB:    ps.data.DB,
+               Cache: ps.data.Cache,
+       })
+
        // init plugin status
        pluginStatus, err := ps.configService.GetStringValue(context.TODO(), 
constant.PluginStatus)
        if err != nil {
diff --git a/plugin/kv_storage.go b/plugin/kv_storage.go
new file mode 100644
index 00000000..9714487f
--- /dev/null
+++ b/plugin/kv_storage.go
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package plugin
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "math/rand"
+       "time"
+
+       "github.com/apache/answer/internal/entity"
+       "github.com/segmentfault/pacman/log"
+       "xorm.io/builder"
+       "xorm.io/xorm"
+)
+
+// Sentinel errors returned by KVOperator methods.
+var (
+       // ErrKVKeyNotFound is returned by Get when no row matches.
+       ErrKVKeyNotFound        = fmt.Errorf("key not found in KV storage")
+       // ErrKVGroupEmpty is returned by GetByGroup when group is empty.
+       ErrKVGroupEmpty         = fmt.Errorf("group name is empty")
+       // ErrKVKeyEmpty is returned by Get and Set when key is empty.
+       ErrKVKeyEmpty           = fmt.Errorf("key name is empty")
+       // ErrKVKeyAndGroupEmpty is returned by Del when both key and group are empty.
+       ErrKVKeyAndGroupEmpty   = fmt.Errorf("both key and group are empty")
+       // ErrKVTransactionFailed wraps every error returned by Tx.
+       ErrKVTransactionFailed  = fmt.Errorf("KV storage transaction failed")
+       // ErrKVDataNotInitialized is returned by checkDB when the operator has no Data.
+       ErrKVDataNotInitialized = fmt.Errorf("KV storage data not initialized")
+       // ErrKVDBNotInitialized is returned by checkDB when Data has no database engine.
+       ErrKVDBNotInitialized   = fmt.Errorf("KV storage database connection not initialized")
+)
+
+type KVOperator struct { // KVOperator runs key-value storage operations scoped to one plugin.
+       data           *Data // database engine and cache; checkDB validates it before use
+       session        *xorm.Session // when non-nil, getSession and Tx reuse it instead of opening a new session
+       pluginSlugName string // part of every query condition and every cache key
+}
+
+// checkDB reports whether the operator can reach a database.
+//
+// It returns ErrKVDataNotInitialized when the operator has no Data,
+// ErrKVDBNotInitialized when the Data has no database engine, and nil
+// otherwise.
+func (kv *KVOperator) checkDB() error {
+       switch {
+       case kv.data == nil:
+               return ErrKVDataNotInitialized
+       case kv.data.DB == nil:
+               return ErrKVDBNotInitialized
+       default:
+               return nil
+       }
+}
+
+// getSession returns the session a statement should run on, together with a
+// function that releases it.
+//
+// When the operator is bound to a session (the operator handed to fn by Tx),
+// that session is returned and the release function does nothing, because the
+// transaction owns the session. Otherwise a new session bound to ctx is
+// created and the release function closes it.
+//
+// The release function is never nil, so callers can defer it unconditionally;
+// deferring a nil function would panic when the caller returns.
+func (kv *KVOperator) getSession(ctx context.Context) (*xorm.Session, func()) {
+       if kv.session != nil {
+               return kv.session, func() {}
+       }
+
+       session := kv.data.DB.NewSession().Context(ctx)
+       return session, func() {
+               // Best-effort close; the statement result has already been returned.
+               _ = session.Close()
+       }
+}
+
+// getCacheTTL returns the lifetime to use for a cache entry: 30 minutes plus
+// a random extra of 0 to 299 seconds (presumably so entries written together
+// do not all expire at the same moment).
+func (kv *KVOperator) getCacheTTL() time.Duration {
+       jitter := time.Duration(rand.Intn(300)) * time.Second
+       return 30*time.Minute + jitter
+}
+
+// getCacheKey builds the cache key for this plugin's entry.
+//
+// An empty group yields a key-only form, an empty key yields a group-only
+// form, and otherwise both are included. The empty-group case is checked
+// first, so when both are empty the key-only form (with an empty key) is
+// produced.
+func (kv *KVOperator) getCacheKey(group, key string) string {
+       slug := kv.pluginSlugName
+       switch {
+       case group == "":
+               return fmt.Sprintf("plugin_kv_storage:%s:key:%s", slug, key)
+       case key == "":
+               return fmt.Sprintf("plugin_kv_storage:%s:group:%s", slug, group)
+       default:
+               return fmt.Sprintf("plugin_kv_storage:%s:group:%s:key:%s", slug, group, key)
+       }
+}
+
+// Get returns the value stored under key in group for this plugin.
+//
+// The cache is consulted first. On a miss the row is read from the database
+// and its value is written back to the cache; a failed cache write is only
+// logged. group may be empty, key must not be.
+//
+// It returns the error reported by checkDB, ErrKVKeyEmpty for an empty key,
+// ErrKVKeyNotFound when no row matches, or the database error.
+func (kv *KVOperator) Get(ctx context.Context, group, key string) (string, error) {
+       if err := kv.checkDB(); err != nil {
+               return "", err
+       }
+       if key == "" {
+               return "", ErrKVKeyEmpty
+       }
+
+       cacheKey := kv.getCacheKey(group, key)
+       if value, exist, err := kv.data.Cache.GetString(ctx, cacheKey); err == nil && exist {
+               return value, nil
+       }
+
+       session, release := kv.getSession(ctx)
+       // getSession may return a nil release func when the operator is bound to
+       // a transaction session; deferring a nil func panics on return.
+       if release != nil {
+               defer release()
+       }
+
+       data := entity.PluginKVStorage{}
+       has, err := session.Where(builder.Eq{
+               "plugin_slug_name": kv.pluginSlugName,
+               "`group`":          group,
+               "`key`":            key,
+       }).Get(&data)
+       if err != nil {
+               return "", err
+       }
+       if !has {
+               return "", ErrKVKeyNotFound
+       }
+
+       if err := kv.data.Cache.SetString(ctx, cacheKey, data.Value, kv.getCacheTTL()); err != nil {
+               log.Error(err)
+       }
+
+       return data.Value, nil
+}
+
+// Set stores value under key in group for this plugin, updating the existing
+// row or inserting a new one.
+//
+// group may be empty, key must not be. The cache entries for the pair are
+// invalidated after the write has been attempted.
+//
+// It returns the error reported by checkDB, ErrKVKeyEmpty for an empty key,
+// or the database error.
+func (kv *KVOperator) Set(ctx context.Context, group, key, value string) error {
+       if err := kv.checkDB(); err != nil {
+               return err
+       }
+       if key == "" {
+               return ErrKVKeyEmpty
+       }
+
+       session, release := kv.getSession(ctx)
+       if release != nil {
+               defer release()
+       }
+
+       // Invalidate after the write rather than before it, so a concurrent Get
+       // cannot re-cache the old value between invalidation and write.
+       defer kv.cleanCache(ctx, group, key)
+
+       cond := builder.Eq{
+               "plugin_slug_name": kv.pluginSlugName,
+               "`group`":          group,
+               "`key`":            key,
+       }
+       data := &entity.PluginKVStorage{
+               PluginSlugName: kv.pluginSlugName,
+               Group:          group,
+               Key:            key,
+               Value:          value,
+       }
+
+       affected, err := session.Where(cond).Cols("value").Update(data)
+       if err != nil {
+               return err
+       }
+       if affected > 0 {
+               return nil
+       }
+
+       // Zero affected rows is ambiguous: MySQL reports 0 when the row exists
+       // but already holds this value. Inserting in that case would violate the
+       // uk_psg unique index, so check for the row before inserting.
+       exists, err := session.Where(cond).Exist(&entity.PluginKVStorage{})
+       if err != nil {
+               return err
+       }
+       if exists {
+               return nil
+       }
+
+       _, err = session.Insert(data)
+       return err
+}
+
+// Del deletes this plugin's rows matching group and key.
+//
+// An empty group or an empty key is not used as a condition, so Del(ctx, g, "")
+// removes every key in group g and Del(ctx, "", k) removes key k in every
+// group. At least one of the two must be non-empty.
+//
+// It returns the error reported by checkDB, ErrKVKeyAndGroupEmpty when both
+// arguments are empty, or the database error.
+//
+// NOTE(review): only the cache keys derived from the given group/key are
+// invalidated. After a group-only or key-only delete, per-key cache entries of
+// the removed rows appear to stay readable through Get until their TTL
+// expires - confirm whether that is acceptable.
+func (kv *KVOperator) Del(ctx context.Context, group, key string) error {
+       if err := kv.checkDB(); err != nil {
+               return err
+       }
+       if key == "" && group == "" {
+               return ErrKVKeyAndGroupEmpty
+       }
+
+       session, release := kv.getSession(ctx)
+       // getSession may return a nil release func when the operator is bound to
+       // a transaction session; deferring a nil func panics on return.
+       if release != nil {
+               defer release()
+       }
+
+       // Invalidate after the delete rather than before it, so a concurrent Get
+       // cannot re-cache the row between invalidation and delete.
+       defer kv.cleanCache(ctx, group, key)
+
+       cond := builder.Eq{"plugin_slug_name": kv.pluginSlugName}
+       if group != "" {
+               cond["`group`"] = group
+       }
+       if key != "" {
+               cond["`key`"] = key
+       }
+
+       _, err := session.Where(cond).Delete(&entity.PluginKVStorage{})
+       return err
+}
+
+func (kv *KVOperator) cleanCache(ctx context.Context, group, key string) {
+       if key != "" {
+               if err := kv.data.Cache.Del(ctx, kv.getCacheKey("", key)); err 
!= nil {
+                       log.Warnf("Failed to delete cache for key %s: %v", key, 
err)
+               }
+
+               if group != "" {
+                       if err := kv.data.Cache.Del(ctx, kv.getCacheKey(group, 
key)); err != nil {
+                               log.Warnf("Failed to delete cache for group %s, 
key %s: %v", group, key, err)
+                       }
+               }
+       }
+
+       if group != "" {
+               if err := kv.data.Cache.Del(ctx, kv.getCacheKey(group, "")); 
err != nil {
+                       log.Warnf("Failed to delete cache for group %s: %v", 
group, err)
+               }
+       }
+}
+
+// GetByGroup returns one page of this plugin's key-value pairs in group,
+// ordered by row id.
+//
+// page is 1-based; values below 1 are treated as 1. pageSize defaults to 10
+// when below 1 and is capped at 100.
+//
+// The most recently fetched page is cached under the group cache key together
+// with the page and pageSize it was fetched with, and is reused only for a
+// request with the same paging, so a cached page is never returned for a
+// different one. Each returned pair is also written to its per-key cache
+// entry. Cache failures never fail the call.
+//
+// It returns the error reported by checkDB, ErrKVGroupEmpty for an empty
+// group, or the database error.
+func (kv *KVOperator) GetByGroup(ctx context.Context, group string, page, pageSize int) (map[string]string, error) {
+       if err := kv.checkDB(); err != nil {
+               return nil, err
+       }
+       if group == "" {
+               return nil, ErrKVGroupEmpty
+       }
+
+       if page < 1 {
+               page = 1
+       }
+       switch {
+       case pageSize < 1:
+               pageSize = 10
+       case pageSize > 100:
+               pageSize = 100
+       }
+
+       // cachedPage is the cached form of one page. Entries written in the
+       // older bare-map format fail to decode or decode with Page == 0, which
+       // never matches, so they are ignored.
+       type cachedPage struct {
+               Page     int               `json:"page"`
+               PageSize int               `json:"page_size"`
+               Items    map[string]string `json:"items"`
+       }
+
+       cacheKey := kv.getCacheKey(group, "")
+       if raw, exist, err := kv.data.Cache.GetString(ctx, cacheKey); err == nil && exist {
+               var cached cachedPage
+               if err := json.Unmarshal([]byte(raw), &cached); err == nil &&
+                       cached.Page == page && cached.PageSize == pageSize && cached.Items != nil {
+                       return cached.Items, nil
+               }
+       }
+
+       session, release := kv.getSession(ctx)
+       // getSession may return a nil release func when the operator is bound to
+       // a transaction session; deferring a nil func panics on return.
+       if release != nil {
+               defer release()
+       }
+
+       var items []entity.PluginKVStorage
+       err := session.Where(builder.Eq{"plugin_slug_name": kv.pluginSlugName, "`group`": group}).
+               Limit(pageSize, (page-1)*pageSize).
+               OrderBy("id ASC").
+               Find(&items)
+       if err != nil {
+               return nil, err
+       }
+
+       result := make(map[string]string, len(items))
+       for _, item := range items {
+               result[item.Key] = item.Value
+               if err := kv.data.Cache.SetString(ctx, kv.getCacheKey(group, item.Key), item.Value, kv.getCacheTTL()); err != nil {
+                       log.Warnf("Failed to set cache for group %s, key %s: %v", group, item.Key, err)
+               }
+       }
+
+       entry := cachedPage{Page: page, PageSize: pageSize, Items: result}
+       if entryJSON, err := json.Marshal(entry); err == nil {
+               // Best-effort: a failed cache write only costs a later database read.
+               _ = kv.data.Cache.SetString(ctx, cacheKey, string(entryJSON), kv.getCacheTTL())
+       }
+
+       return result, nil
+}
+
+func (kv *KVOperator) Tx(ctx context.Context, fn func(ctx context.Context, kv 
*KVOperator) error) error {
+       if err := kv.checkDB(); err != nil {
+               return fmt.Errorf("%w: %v", ErrKVTransactionFailed, err)
+       }
+
+       var (
+               txKv         = kv
+               shouldCommit bool
+       )
+
+       if kv.session == nil {
+               session := kv.data.DB.NewSession().Context(ctx)
+               if err := session.Begin(); err != nil {
+                       session.Close()
+                       return fmt.Errorf("%w: begin transaction failed: %v", 
ErrKVTransactionFailed, err)
+               }
+
+               defer func() {
+                       if !shouldCommit {
+                               if rollbackErr := session.Rollback(); 
rollbackErr != nil {
+                                       log.Errorf("rollback failed: %v", 
rollbackErr)
+                               }
+                       }
+                       session.Close()
+               }()
+
+               txKv = &KVOperator{
+                       session:        session,
+                       data:           kv.data,
+                       pluginSlugName: kv.pluginSlugName,
+               }
+               shouldCommit = true
+       }
+
+       if err := fn(ctx, txKv); err != nil {
+               return fmt.Errorf("%w: %v", ErrKVTransactionFailed, err)
+       }
+
+       if shouldCommit {
+               if err := txKv.session.Commit(); err != nil {
+                       return fmt.Errorf("%w: commit failed: %v", 
ErrKVTransactionFailed, err)
+               }
+       }
+       return nil
+}
+
+// KVStorage defines the interface for plugins that need key-value storage
+// capabilities
+type KVStorage interface {
+       Info() Info // plugin metadata; SetKVStorageDB reads Info().SlugName to scope the operator
+       SetOperator(operator *KVOperator) // called by SetKVStorageDB with the operator created for this plugin
+}
+
+var (
+       _, // the call function returned below is discarded
+       registerPluginKVStorage = func() (CallFn[KVStorage], 
RegisterFn[KVStorage]) {
+               callFn, registerFn := MakePlugin[KVStorage](false) // NOTE(review): meaning of the false argument is defined by MakePlugin, not visible here
+               return callFn, func(p KVStorage) {
+                       registerFn(p) // register through the function MakePlugin returned
+                       kvStoragePluginStack.plugins = 
append(kvStoragePluginStack.plugins, p) // also remember p so SetKVStorageDB can reach it
+               }
+       }()
+       kvStoragePluginStack = &Stack[KVStorage]{} // registered KVStorage plugins, iterated by SetKVStorageDB
+)
+
+// SetKVStorageDB hands every registered KVStorage plugin a KVOperator that
+// uses data and is scoped to that plugin's slug name.
+func SetKVStorageDB(data *Data) {
+       registered := kvStoragePluginStack.plugins
+       for i := range registered {
+               p := registered[i]
+               operator := &KVOperator{
+                       data:           data,
+                       pluginSlugName: p.Info().SlugName,
+               }
+               p.SetOperator(operator)
+       }
+}
diff --git a/plugin/plugin.go b/plugin/plugin.go
index 36087c54..dc8c35ea 100644
--- a/plugin/plugin.go
+++ b/plugin/plugin.go
@@ -23,13 +23,21 @@ import (
        "encoding/json"
        "sync"
 
+       "github.com/segmentfault/pacman/cache"
        "github.com/segmentfault/pacman/i18n"
+       "xorm.io/xorm"
 
        "github.com/apache/answer/internal/base/handler"
        "github.com/apache/answer/internal/base/translator"
        "github.com/gin-gonic/gin"
 )
 
+// Data is defined here to avoid circular dependency with internal/base/data
+type Data struct {
+       DB    *xorm.Engine // database engine; KVOperator opens its sessions from it
+       Cache cache.Cache // cache that KVOperator reads and writes alongside the database
+}
+
 // GinContext is a wrapper of gin.Context
 // We export it to make it easy to use in plugins
 type GinContext = gin.Context
@@ -114,6 +122,10 @@ func Register(p Base) {
        if _, ok := p.(Importer); ok {
                registerImporter(p.(Importer))
        }
+
+       if _, ok := p.(KVStorage); ok {
+               registerPluginKVStorage(p.(KVStorage))
+       }
 }
 
 type Stack[T Base] struct {

Reply via email to