This is an automated email from the ASF dual-hosted git repository.
robocanic pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-admin.git
The following commit(s) were added to refs/heads/develop by this push:
new 5d2946d0 feat: implement distributed lock by gorm (#1432)
5d2946d0 is described below
commit 5d2946d05cdaa8a662e683f68daf55d598de82de
Author: EVERFID <[email protected]>
AuthorDate: Sun Mar 15 15:19:41 2026 +0800
feat: implement distributed lock by gorm (#1432)
---
app/dubbo-admin/cmd/run.go | 1 +
pkg/common/bizerror/error.go | 2 +
pkg/common/constants/lock.go | 57 +++++
pkg/console/context/context.go | 10 +
pkg/console/service/condition_rule.go | 35 +++
pkg/console/service/configurator_rule.go | 35 +++
pkg/console/service/tag_rule.go | 39 ++++
pkg/core/bootstrap/bootstrap.go | 2 +
pkg/core/lock/component.go | 137 ++++++++++++
pkg/core/lock/factory.go | 100 +++++++++
pkg/core/lock/key.go | 48 ++++
pkg/core/lock/lock.go | 50 +++++
pkg/lock/gorm/factory.go | 61 ++++++
pkg/lock/gorm/lock.go | 278 +++++++++++++++++++++++
pkg/lock/gorm/lock_test.go | 364 +++++++++++++++++++++++++++++++
pkg/lock/gorm/model.go | 37 ++++
pkg/store/dbcommon/connection_pool.go | 18 ++
17 files changed, 1274 insertions(+)
diff --git a/app/dubbo-admin/cmd/run.go b/app/dubbo-admin/cmd/run.go
index 23ac9c50..a6bd5acb 100644
--- a/app/dubbo-admin/cmd/run.go
+++ b/app/dubbo-admin/cmd/run.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/dubbo-admin/pkg/config/app"
"github.com/apache/dubbo-admin/pkg/core/bootstrap"
"github.com/apache/dubbo-admin/pkg/core/logger"
+ _ "github.com/apache/dubbo-admin/pkg/lock/gorm"
dubboversion "github.com/apache/dubbo-admin/pkg/version"
)
diff --git a/pkg/common/bizerror/error.go b/pkg/common/bizerror/error.go
index b8cfc35d..e989d34c 100644
--- a/pkg/common/bizerror/error.go
+++ b/pkg/common/bizerror/error.go
@@ -46,6 +46,8 @@ const (
YamlError ErrorCode = "YamlError"
NotFoundError ErrorCode = "NotFoundError"
NetWorkError ErrorCode = "NetWorkError"
+ LockNotHeld ErrorCode = "LockNotHeld"
+ LockExpired ErrorCode = "LockExpired"
)
type bizError struct {
diff --git a/pkg/common/constants/lock.go b/pkg/common/constants/lock.go
new file mode 100644
index 00000000..30a9f404
--- /dev/null
+++ b/pkg/common/constants/lock.go
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package constants
+
+import "time"
+
+const (
+ // DefaultLockTimeout is the default timeout for distributed lock
operations
+ // This timeout applies to lock acquisition, renewal, and release
operations
+ DefaultLockTimeout = 30 * time.Second
+
+ // DefaultAutoRenewThreshold is the TTL threshold above which
auto-renewal is enabled
+ // Locks with TTL longer than this value will be automatically renewed
+ DefaultAutoRenewThreshold = 10 * time.Second
+
+ // DefaultUnlockTimeout is the timeout for unlock operations in
deferred cleanup
+ DefaultUnlockTimeout = 5 * time.Second
+
+ // DefaultRenewTimeout is the timeout for lock renewal operations
+ DefaultRenewTimeout = 5 * time.Second
+
+ // DefaultLockRetryInterval is the interval between lock acquisition
retry attempts
+ DefaultLockRetryInterval = 100 * time.Millisecond
+
+ // DefaultCleanupInterval is the interval for periodic expired lock
cleanup
+ DefaultCleanupInterval = 5 * time.Minute
+
+ // DefaultCleanupTimeout is the timeout for cleanup operations
+ DefaultCleanupTimeout = 30 * time.Second
+)
+
+// Lock key prefixes for different resource types
+const (
+ // TagRouteKeyPrefix is the prefix for tag route lock keys
+ TagRouteKeyPrefix = "tag_route"
+
+ // ConfiguratorRuleKeyPrefix is the prefix for configurator rule lock
keys
+ ConfiguratorRuleKeyPrefix = "configurator_rule"
+
+ // ConditionRuleKeyPrefix is the prefix for condition rule lock keys
+ ConditionRuleKeyPrefix = "condition_rule"
+)
diff --git a/pkg/console/context/context.go b/pkg/console/context/context.go
index 9133c4f9..f4c64ce5 100644
--- a/pkg/console/context/context.go
+++ b/pkg/console/context/context.go
@@ -19,6 +19,7 @@ package context
import (
ctx "context"
+ "github.com/apache/dubbo-admin/pkg/core/lock"
"github.com/apache/dubbo-admin/pkg/config/app"
"github.com/apache/dubbo-admin/pkg/console/counter"
@@ -33,6 +34,7 @@ type Context interface {
Config() app.AdminConfig
AppContext() ctx.Context
+ LockManager() lock.Lock
}
var _ Context = &context{}
@@ -71,3 +73,11 @@ func (c *context) CounterManager() counter.CounterManager {
}
return managerComp.CounterManager()
}
+
+func (c *context) LockManager() lock.Lock {
+ distributedLock, err := lock.GetLockFromRuntime(c.coreRt)
+ if err != nil {
+ return nil
+ }
+ return distributedLock
+}
diff --git a/pkg/console/service/condition_rule.go
b/pkg/console/service/condition_rule.go
index 7878928c..de0ebfc8 100644
--- a/pkg/console/service/condition_rule.go
+++ b/pkg/console/service/condition_rule.go
@@ -18,6 +18,8 @@
package service
import (
+ "github.com/apache/dubbo-admin/pkg/common/constants"
+ "github.com/apache/dubbo-admin/pkg/core/lock"
"github.com/duke-git/lancet/v2/slice"
"github.com/duke-git/lancet/v2/strutil"
@@ -106,6 +108,17 @@ func GetConditionRule(ctx context.Context, name string,
mesh string) (*meshresou
}
func UpdateConditionRule(ctx context.Context, res
*meshresource.ConditionRouteResource) error {
+ lockMgr := ctx.LockManager()
+ if lockMgr == nil {
+ return updateConditionRuleUnsafe(ctx, res)
+ }
+ lockKey := lock.BuildConditionRuleLockKey(res.Mesh, res.Name)
+ return lockMgr.WithLock(ctx.AppContext(), lockKey,
constants.DefaultLockTimeout, func() error {
+ return updateConditionRuleUnsafe(ctx, res)
+ })
+}
+
+func updateConditionRuleUnsafe(ctx context.Context, res
*meshresource.ConditionRouteResource) error {
if err := ctx.ResourceManager().Update(res); err != nil {
logger.Warnf("update %s condition failed with error: %s",
res.Name, err.Error())
return err
@@ -114,6 +127,17 @@ func UpdateConditionRule(ctx context.Context, res
*meshresource.ConditionRouteRe
}
func CreateConditionRule(ctx context.Context, res
*meshresource.ConditionRouteResource) error {
+ lockMgr := ctx.LockManager()
+ if lockMgr == nil {
+ return createConditionRuleUnsafe(ctx, res)
+ }
+ lockKey := lock.BuildConditionRuleLockKey(res.Mesh, res.Name)
+ return lockMgr.WithLock(ctx.AppContext(), lockKey,
constants.DefaultLockTimeout, func() error {
+ return createConditionRuleUnsafe(ctx, res)
+ })
+}
+
+func createConditionRuleUnsafe(ctx context.Context, res
*meshresource.ConditionRouteResource) error {
if err := ctx.ResourceManager().Add(res); err != nil {
logger.Warnf("create %s condition failed with error: %s",
res.Name, err.Error())
return err
@@ -122,6 +146,17 @@ func CreateConditionRule(ctx context.Context, res
*meshresource.ConditionRouteRe
}
func DeleteConditionRule(ctx context.Context, name string, mesh string) error {
+ lockMgr := ctx.LockManager()
+ if lockMgr == nil {
+ return deleteConditionRuleUnsafe(ctx, name, mesh)
+ }
+ lockKey := lock.BuildConditionRuleLockKey(mesh, name)
+ return lockMgr.WithLock(ctx.AppContext(), lockKey,
constants.DefaultLockTimeout, func() error {
+ return deleteConditionRuleUnsafe(ctx, name, mesh)
+ })
+}
+
+func deleteConditionRuleUnsafe(ctx context.Context, name string, mesh string)
error {
if err :=
ctx.ResourceManager().DeleteByKey(meshresource.ConditionRouteKind, mesh,
coremodel.BuildResourceKey(mesh, name)); err != nil {
return err
}
diff --git a/pkg/console/service/configurator_rule.go
b/pkg/console/service/configurator_rule.go
index 7bdf9794..16aa59e4 100644
--- a/pkg/console/service/configurator_rule.go
+++ b/pkg/console/service/configurator_rule.go
@@ -18,6 +18,8 @@
package service
import (
+ "github.com/apache/dubbo-admin/pkg/common/constants"
+ "github.com/apache/dubbo-admin/pkg/core/lock"
"github.com/duke-git/lancet/v2/slice"
"github.com/apache/dubbo-admin/pkg/common/bizerror"
@@ -114,6 +116,17 @@ func GetConfigurator(ctx consolectx.Context, name string,
mesh string) (*meshres
}
func UpdateConfigurator(ctx consolectx.Context, res
*meshresource.DynamicConfigResource) error {
+ lockMgr := ctx.LockManager()
+ if lockMgr == nil {
+ return updateConfiguratorUnsafe(ctx, res)
+ }
+ lockKey := lock.BuildConfiguratorRuleLockKey(res.Mesh, res.Name)
+ return lockMgr.WithLock(ctx.AppContext(), lockKey,
constants.DefaultLockTimeout, func() error {
+ return updateConfiguratorUnsafe(ctx, res)
+ })
+}
+
+func updateConfiguratorUnsafe(ctx consolectx.Context, res
*meshresource.DynamicConfigResource) error {
if err := ctx.ResourceManager().Update(res); err != nil {
logger.Warnf("update %s configurator failed with error: %s",
res.Name, err.Error())
return err
@@ -122,6 +135,17 @@ func UpdateConfigurator(ctx consolectx.Context, res
*meshresource.DynamicConfigR
}
func CreateConfigurator(ctx consolectx.Context, res
*meshresource.DynamicConfigResource) error {
+ lockMgr := ctx.LockManager()
+ if lockMgr == nil {
+ return createConfiguratorUnsafe(ctx, res)
+ }
+ lockKey := lock.BuildConfiguratorRuleLockKey(res.Mesh, res.Name)
+ return lockMgr.WithLock(ctx.AppContext(), lockKey,
constants.DefaultLockTimeout, func() error {
+ return createConfiguratorUnsafe(ctx, res)
+ })
+}
+
+func createConfiguratorUnsafe(ctx consolectx.Context, res
*meshresource.DynamicConfigResource) error {
if err := ctx.ResourceManager().Add(res); err != nil {
logger.Warnf("create %s configurator failed with error: %s",
res.Name, err.Error())
return err
@@ -130,6 +154,17 @@ func CreateConfigurator(ctx consolectx.Context, res
*meshresource.DynamicConfigR
}
func DeleteConfigurator(ctx consolectx.Context, name string, mesh string)
error {
+ lockMgr := ctx.LockManager()
+ if lockMgr == nil {
+ return deleteConfiguratorUnsafe(ctx, name, mesh)
+ }
+ lockKey := lock.BuildConfiguratorRuleLockKey(mesh, name)
+ return lockMgr.WithLock(ctx.AppContext(), lockKey,
constants.DefaultLockTimeout, func() error {
+ return deleteConfiguratorUnsafe(ctx, name, mesh)
+ })
+}
+
+func deleteConfiguratorUnsafe(ctx consolectx.Context, name string, mesh
string) error {
if err :=
ctx.ResourceManager().DeleteByKey(meshresource.DynamicConfigKind, mesh,
coremodel.BuildResourceKey(mesh, name)); err != nil {
logger.Warnf("delete %s configurator failed with error: %s",
name, err.Error())
return err
diff --git a/pkg/console/service/tag_rule.go b/pkg/console/service/tag_rule.go
index 0ab9e81e..972cbf10 100644
--- a/pkg/console/service/tag_rule.go
+++ b/pkg/console/service/tag_rule.go
@@ -18,6 +18,8 @@
package service
import (
+ "github.com/apache/dubbo-admin/pkg/common/constants"
+ "github.com/apache/dubbo-admin/pkg/core/lock"
"github.com/duke-git/lancet/v2/slice"
"github.com/apache/dubbo-admin/pkg/common/bizerror"
@@ -112,6 +114,19 @@ func GetTagRule(ctx consolectx.Context, name string, mesh
string) (*meshresource
}
func UpdateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource)
error {
+ lockMgr := ctx.LockManager()
+ if lockMgr == nil {
+ return updateTagRuleUnsafe(ctx, res)
+ }
+
+ lockKey := lock.BuildTagRouteLockKey(res.Mesh, res.Name)
+
+ return lockMgr.WithLock(ctx.AppContext(), lockKey,
constants.DefaultLockTimeout, func() error {
+ return updateTagRuleUnsafe(ctx, res)
+ })
+}
+
+func updateTagRuleUnsafe(ctx consolectx.Context, res
*meshresource.TagRouteResource) error {
err := ctx.ResourceManager().Update(res)
if err != nil {
logger.Warnf("update tag rule %s error: %v", res.Name, err)
@@ -121,6 +136,19 @@ func UpdateTagRule(ctx consolectx.Context, res
*meshresource.TagRouteResource) e
}
func CreateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource)
error {
+ lockMgr := ctx.LockManager()
+ if lockMgr == nil {
+ return createTagRuleUnsafe(ctx, res)
+ }
+
+ lockKey := lock.BuildTagRouteLockKey(res.Mesh, res.Name)
+
+ return lockMgr.WithLock(ctx.AppContext(), lockKey,
constants.DefaultLockTimeout, func() error {
+ return createTagRuleUnsafe(ctx, res)
+ })
+}
+
+func createTagRuleUnsafe(ctx consolectx.Context, res
*meshresource.TagRouteResource) error {
err := ctx.ResourceManager().Add(res)
if err != nil {
logger.Warnf("create tag rule %s error: %v", res.Name, err)
@@ -130,6 +158,17 @@ func CreateTagRule(ctx consolectx.Context, res
*meshresource.TagRouteResource) e
}
func DeleteTagRule(ctx consolectx.Context, name string, mesh string) error {
+ lockMgr := ctx.LockManager()
+ if lockMgr == nil {
+ return deleteTagRuleUnsafe(ctx, name, mesh)
+ }
+ lockKey := lock.BuildTagRouteLockKey(mesh, name)
+ return lockMgr.WithLock(ctx.AppContext(), lockKey,
constants.DefaultLockTimeout, func() error {
+ return deleteTagRuleUnsafe(ctx, name, mesh)
+ })
+}
+
+func deleteTagRuleUnsafe(ctx consolectx.Context, name string, mesh string)
error {
err := ctx.ResourceManager().DeleteByKey(meshresource.TagRouteKind,
mesh, coremodel.BuildResourceKey(mesh, name))
if err != nil {
logger.Warnf("delete tag rule %s error: %v", name, err)
diff --git a/pkg/core/bootstrap/bootstrap.go b/pkg/core/bootstrap/bootstrap.go
index b07376ed..d1ee2c0d 100644
--- a/pkg/core/bootstrap/bootstrap.go
+++ b/pkg/core/bootstrap/bootstrap.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/dubbo-admin/pkg/common/bizerror"
"github.com/apache/dubbo-admin/pkg/config/app"
"github.com/apache/dubbo-admin/pkg/console/counter"
+ "github.com/apache/dubbo-admin/pkg/core/lock"
"github.com/apache/dubbo-admin/pkg/core/logger"
"github.com/apache/dubbo-admin/pkg/core/runtime"
"github.com/apache/dubbo-admin/pkg/diagnostics"
@@ -128,6 +129,7 @@ func (sb *SmartBootstrapper) gatherComponents()
([]runtime.Component, error) {
}{
{"CounterManager", counter.ComponentType},
{"DiagnosticsServer", diagnostics.DiagnosticsServer},
+ {"DistributedLock", lock.DistributedLockComponent},
}
for _, comp := range optionalComps {
diff --git a/pkg/core/lock/component.go b/pkg/core/lock/component.go
new file mode 100644
index 00000000..11814d28
--- /dev/null
+++ b/pkg/core/lock/component.go
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package lock
+
+import (
+ "context"
+ "math"
+ "time"
+
+ "github.com/pkg/errors"
+
+ "github.com/apache/dubbo-admin/pkg/common/constants"
+ "github.com/apache/dubbo-admin/pkg/core/logger"
+ "github.com/apache/dubbo-admin/pkg/core/runtime"
+)
+
+func init() {
+ runtime.RegisterComponent(NewComponent())
+}
+
+const (
+ // DistributedLockComponent is the component type for distributed lock
+ DistributedLockComponent runtime.ComponentType = "distributed lock"
+)
+
+// Component implements the runtime.Component interface for distributed lock
+type Component struct {
+ lock Lock
+}
+
+// NewComponent creates a new distributed lock component
+func NewComponent() *Component {
+ return &Component{}
+}
+
+// Type returns the component type
+func (c *Component) Type() runtime.ComponentType {
+ return DistributedLockComponent
+}
+
+// Order indicates the initialization order
+// Lock should be initialized after Store (Order math.MaxInt - 1)
+// Higher order values are initialized first, so we use math.MaxInt - 2
+func (c *Component) Order() int {
+ return math.MaxInt - 2 // After Store, before other services
+}
+
+// Init initializes the distributed lock component
+func (c *Component) Init(ctx runtime.BuilderContext) error {
+ factory, err := LockFactoryRegistry().GetSupportedFactory(ctx)
+ if err != nil {
+ // No supporting factory found
+ logger.Warnf("No supported lock factory found: %v", err)
+ logger.Warn("Distributed lock will not be available")
+ return nil
+ }
+
+ // Lock created using a factory
+ lock, err := factory.NewLock(ctx)
+ if err != nil {
+ return errors.Wrap(err, "failed to create distributed lock")
+ }
+
+ c.lock = lock
+ logger.Info("Distributed lock component initialized successfully")
+ return nil
+}
+
+// Start starts the distributed lock component
+func (c *Component) Start(rt runtime.Runtime, stop <-chan struct{}) error {
+ if c.lock == nil {
+ logger.Warn("Distributed lock not available, skipping")
+ return nil
+ }
+
+ // Start background cleanup task
+ ticker := time.NewTicker(constants.DefaultCleanupInterval) // Cleanup
every 5 minutes
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-stop:
+ return nil
+ case <-ticker.C:
+ ctx, cancel :=
context.WithTimeout(context.Background(), constants.DefaultCleanupTimeout)
+ if err := c.lock.CleanupExpiredLocks(ctx); err != nil {
+ logger.Errorf("Failed to cleanup expired locks:
%v", err)
+ }
+ cancel()
+ }
+ }
+}
+
+// GetLock returns the lock instance
+func (c *Component) GetLock() Lock {
+ return c.lock
+}
+
+// GetLockFromRuntime extracts the lock instance from runtime
+func GetLockFromRuntime(rt runtime.Runtime) (Lock, error) {
+ comp, err := rt.GetComponent(DistributedLockComponent)
+ if err != nil {
+ return nil, err
+ }
+
+ lockComp, ok := comp.(*Component)
+ if !ok {
+ return nil, errors.Errorf("component %s is not a valid lock
component", DistributedLockComponent)
+ }
+
+ if lockComp.lock == nil {
+ return nil, errors.New("distributed lock is not available
(possibly using memory store)")
+ }
+
+ return lockComp.GetLock(), nil
+}
+
+func (c *Component) RequiredDependencies() []runtime.ComponentType {
+ return []runtime.ComponentType{
+ runtime.ResourceStore,
+ }
+}
diff --git a/pkg/core/lock/factory.go b/pkg/core/lock/factory.go
new file mode 100644
index 00000000..afa38d8d
--- /dev/null
+++ b/pkg/core/lock/factory.go
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package lock
+
+import (
+ "fmt"
+ "github.com/apache/dubbo-admin/pkg/core/runtime"
+)
+
+var registry = newLockFactoryRegistry()
+
+// RegisterLockFactory registers a Lock factory
+func RegisterLockFactory(f Factory) {
+ registry.Register(f)
+}
+
+// LockFactoryRegistry returns the global factory registry
+func LockFactoryRegistry() Registry {
+ return registry
+}
+
+// Factory defines the factory interface for creating Locks
+type Factory interface {
+ // Support determines whether the factory supports creating a Lock from
a given context
+ // Determines this by inspecting the components in the context
+ Support(ctx runtime.BuilderContext) bool
+
+ // NewLock creates a Lock instance from the BuilderContext
+ // The factory decides for itself how to extract dependencies from the
context
+ NewLock(ctx runtime.BuilderContext) (Lock, error)
+}
+
+// Registry defines the query interface for the factory registry
+type Registry interface {
+ // GetSupportedFactory returns the first supported factory
+ GetSupportedFactory(ctx runtime.BuilderContext) (Factory, error)
+ // GetAllSupportedFactories returns all supported factories
+ GetAllSupportedFactories(ctx runtime.BuilderContext) []Factory
+}
+
+// RegistryMutator defines the interface for modifying the factory registry
+type RegistryMutator interface {
+ Register(Factory)
+}
+
+// MutableRegistry combines query and modification interfaces
+type MutableRegistry interface {
+ Registry
+ RegistryMutator
+}
+
+var _ MutableRegistry = &lockFactoryRegistry{}
+
+type lockFactoryRegistry struct {
+ factories []Factory
+}
+
+func newLockFactoryRegistry() MutableRegistry {
+ return &lockFactoryRegistry{
+ factories: make([]Factory, 0),
+ }
+}
+
+func (r *lockFactoryRegistry) GetSupportedFactory(ctx runtime.BuilderContext)
(Factory, error) {
+ for _, factory := range r.factories {
+ if factory.Support(ctx) {
+ return factory, nil
+ }
+ }
+ return nil, fmt.Errorf("no supported lock factory found")
+}
+
+func (r *lockFactoryRegistry) GetAllSupportedFactories(ctx
runtime.BuilderContext) []Factory {
+ supported := make([]Factory, 0)
+ for _, factory := range r.factories {
+ if factory.Support(ctx) {
+ supported = append(supported, factory)
+ }
+ }
+ return supported
+}
+
+func (r *lockFactoryRegistry) Register(factory Factory) {
+ r.factories = append(r.factories, factory)
+}
diff --git a/pkg/core/lock/key.go b/pkg/core/lock/key.go
new file mode 100644
index 00000000..29994e0b
--- /dev/null
+++ b/pkg/core/lock/key.go
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package lock
+
+import (
+ "fmt"
+
+ "github.com/apache/dubbo-admin/pkg/common/constants"
+)
+
+// BuildLockKey constructs a lock key from a prefix and parts
+func BuildLockKey(prefix string, parts ...string) string {
+ key := prefix
+ for _, part := range parts {
+ key += ":" + part
+ }
+ return key
+}
+
+// BuildTagRouteLockKey constructs a lock key for tag route operations
+func BuildTagRouteLockKey(mesh, name string) string {
+ return fmt.Sprintf("%s:%s:%s", constants.TagRouteKeyPrefix, mesh, name)
+}
+
+// BuildConfiguratorRuleLockKey constructs a lock key for configurator rule
operations
+func BuildConfiguratorRuleLockKey(mesh, name string) string {
+ return fmt.Sprintf("%s:%s:%s", constants.ConfiguratorRuleKeyPrefix,
mesh, name)
+}
+
+// BuildConditionRuleLockKey constructs a lock key for condition rule
operations
+func BuildConditionRuleLockKey(mesh, name string) string {
+ return fmt.Sprintf("%s:%s:%s", constants.ConditionRuleKeyPrefix, mesh,
name)
+}
diff --git a/pkg/core/lock/lock.go b/pkg/core/lock/lock.go
new file mode 100644
index 00000000..4b3bd781
--- /dev/null
+++ b/pkg/core/lock/lock.go
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package lock
+
+import (
+ "context"
+ "time"
+)
+
+// Lock defines the distributed lock interface
+// This abstraction allows for multiple implementations (GORM, Redis, etcd,
etc.)
+type Lock interface {
+ // Lock acquires a distributed lock, blocking until successful or
context cancelled
+ Lock(ctx context.Context, key string, ttl time.Duration) error
+
+ // TryLock attempts to acquire a lock without blocking
+ // Returns true if lock was acquired, false otherwise
+ TryLock(ctx context.Context, key string, ttl time.Duration) (bool,
error)
+
+ // Unlock releases a lock held by this instance
+ Unlock(ctx context.Context, key string) error
+
+ // Renew extends the TTL of a lock held by this instance
+ Renew(ctx context.Context, key string, ttl time.Duration) error
+
+ // IsLocked checks if a lock is currently held by anyone
+ IsLocked(ctx context.Context, key string) (bool, error)
+
+ // WithLock executes a function while holding a lock
+ // Automatically acquires the lock, executes the function, and releases
the lock
+ WithLock(ctx context.Context, key string, ttl time.Duration, fn func()
error) error
+
+ // CleanupExpiredLocks removes expired locks (maintenance task)
+ CleanupExpiredLocks(ctx context.Context) error
+}
diff --git a/pkg/lock/gorm/factory.go b/pkg/lock/gorm/factory.go
new file mode 100644
index 00000000..ff9f81bd
--- /dev/null
+++ b/pkg/lock/gorm/factory.go
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package gorm
+
+import (
+ "fmt"
+
+ "github.com/apache/dubbo-admin/pkg/core/lock"
+ "github.com/apache/dubbo-admin/pkg/core/logger"
+ "github.com/apache/dubbo-admin/pkg/core/runtime"
+ "github.com/apache/dubbo-admin/pkg/store/dbcommon"
+)
+
+func init() {
+ lock.RegisterLockFactory(&gormLockFactory{})
+}
+
+type gormLockFactory struct{}
+
+// Support checks if GORM-based lock is supported based on store configuration
+func (f *gormLockFactory) Support(ctx runtime.BuilderContext) bool {
+ cfg := ctx.Config().Store
+ // GORM lock is supported for database-backed stores (mysql, postgres)
+ return cfg.Type == "mysql" || cfg.Type == "postgres"
+}
+
+// NewLock creates a GORM Lock instance by obtaining DB from dbcommon package
+func (f *gormLockFactory) NewLock(ctx runtime.BuilderContext) (lock.Lock,
error) {
+ cfg := ctx.Config().Store
+
+ // Get the database connection from dbcommon's global connection pool
+ // This reuses the existing connection pool created by the store
+ // but accesses it through the dbcommon package instead of
StoreComponent
+ db := dbcommon.GetGlobalDB(cfg.Type)
+ if db == nil {
+ return nil, fmt.Errorf("no database connection found for store
type: %s", cfg.Type)
+ }
+
+ // Auto-migrate lock table
+ if err := db.AutoMigrate(&LockRecord{}); err != nil {
+ return nil, fmt.Errorf("failed to migrate lock table: %w", err)
+ }
+
+ logger.Info("Creating GORM-based distributed lock using existing
database connection")
+ return NewGormLockFromDB(db), nil
+}
diff --git a/pkg/lock/gorm/lock.go b/pkg/lock/gorm/lock.go
new file mode 100644
index 00000000..83e1e658
--- /dev/null
+++ b/pkg/lock/gorm/lock.go
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package gorm
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/google/uuid"
+ "gorm.io/gorm"
+ "gorm.io/gorm/clause"
+
+ "github.com/apache/dubbo-admin/pkg/common/bizerror"
+ "github.com/apache/dubbo-admin/pkg/common/constants"
+ "github.com/apache/dubbo-admin/pkg/core/lock"
+ "github.com/apache/dubbo-admin/pkg/core/logger"
+ "github.com/apache/dubbo-admin/pkg/store/dbcommon"
+)
+
+// Ensure GormLock implements Lock interface
+var _ lock.Lock = (*GormLock)(nil)
+
+// GormLock provides distributed locking using database as backend
+// It uses GORM for database operations and supports MySQL, PostgreSQL, etc.
+type GormLock struct {
+ pool *dbcommon.ConnectionPool
+ db *gorm.DB // Direct DB reference to avoid circular dependency
+ owner string // Unique identifier for this lock instance
+}
+
+// NewGormLock creates a new GORM-based distributed lock instance
+// Deprecated: Use NewGormLockFromDB to avoid circular dependencies
+func NewGormLock(pool *dbcommon.ConnectionPool) lock.Lock {
+ return &GormLock{
+ pool: pool,
+ db: pool.GetDB(),
+ owner: uuid.New().String(),
+ }
+}
+
+// NewGormLockFromDB creates a new GORM-based distributed lock instance from a
DB connection
+// This is the preferred constructor to avoid circular dependencies
+func NewGormLockFromDB(db *gorm.DB) lock.Lock {
+ return &GormLock{
+ db: db,
+ owner: uuid.New().String(),
+ }
+}
+
+// getDB returns the database instance, to prefer direct DB to pool
+func (g *GormLock) getDB() *gorm.DB {
+ if g.db != nil {
+ return g.db
+ }
+ if g.pool != nil {
+ return g.pool.GetDB()
+ }
+ return nil
+}
+
+// Lock acquires a lock with the specified key and TTL
+// It blocks until the lock is acquired or context is cancelled
+func (g *GormLock) Lock(ctx context.Context, key string, ttl time.Duration)
error {
+ ticker := time.NewTicker(constants.DefaultLockRetryInterval)
+ defer ticker.Stop()
+
+ for {
+ acquired, err := g.TryLock(ctx, key, ttl)
+ if err != nil {
+ return fmt.Errorf("failed to try lock: %w", err)
+ }
+ if acquired {
+ return nil
+ }
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-ticker.C:
+ }
+ }
+}
+
+// TryLock attempts to acquire a lock without blocking
+// Returns true if lock was acquired, false otherwise
+func (g *GormLock) TryLock(ctx context.Context, key string, ttl time.Duration)
(bool, error) {
+ db := g.getDB().WithContext(ctx)
+ expireAt := time.Now().Add(ttl)
+
+ var acquired bool
+ err := db.Transaction(func(tx *gorm.DB) error {
+ // Clean up only this key's expired lock to improve performance
+ now := time.Now()
+ if err := tx.Where("lock_key = ? AND expire_at < ?", key, now).
+ Delete(&LockRecord{}).Error; err != nil {
+ return fmt.Errorf("failed to clean expired lock for key
%s: %w", key, err)
+ }
+
+ // Try to acquire lock using INSERT ... ON CONFLICT
+ lock := &LockRecord{
+ LockKey: key,
+ Owner: g.owner,
+ ExpireAt: expireAt,
+ }
+
+ // Try to insert the lock record
+ result := tx.Clauses(clause.OnConflict{
+ Columns: []clause.Column{{Name: "lock_key"}},
+ DoNothing: true, // If conflict, do nothing
+ }).Create(lock)
+
+ if result.Error != nil {
+ return fmt.Errorf("failed to insert lock record: %w",
result.Error)
+ }
+
+ // Check if the insertion was successful
+ if result.RowsAffected == 0 {
+ // The lock already exists
+ acquired = false
+ return nil
+ }
+
+ // New row inserted successfully, lock acquired successfully
+ acquired = true
+ return nil
+ })
+
+ if err != nil {
+ return false, err
+ }
+
+ return acquired, nil
+}
+
+// Unlock releases a lock held by this instance
+func (g *GormLock) Unlock(ctx context.Context, key string) error {
+ db := g.getDB().WithContext(ctx)
+
+ result := db.Where("lock_key = ? AND owner = ?", key, g.owner).
+ Delete(&LockRecord{})
+
+ if result.Error != nil {
+ return fmt.Errorf("failed to release lock: %w", result.Error)
+ }
+
+ if result.RowsAffected == 0 {
+ return bizerror.New(bizerror.LockNotHeld, "lock not held by
this owner")
+ }
+
+ return nil
+}
+
+// Renew extends the TTL of a lock held by this instance
+func (g *GormLock) Renew(ctx context.Context, key string, ttl time.Duration)
error {
+ db := g.getDB().WithContext(ctx)
+ newExpireAt := time.Now().Add(ttl)
+
+ result := db.Model(&LockRecord{}).
+ Where("lock_key = ? AND owner = ?", key, g.owner).
+ Update("expire_at", newExpireAt)
+
+ if result.Error != nil {
+ return fmt.Errorf("failed to renew lock: %w", result.Error)
+ }
+
+ if result.RowsAffected == 0 {
+ return bizerror.New(bizerror.LockNotHeld, "lock not held by
this owner")
+ }
+
+ return nil
+}
+
+// IsLocked checks if a lock is currently held (by anyone)
+func (g *GormLock) IsLocked(ctx context.Context, key string) (bool, error) {
+ db := g.getDB().WithContext(ctx)
+
+ var count int64
+ err := db.Model(&LockRecord{}).
+ Where("lock_key = ? AND expire_at > ?", key, time.Now()).
+ Count(&count).Error
+
+ if err != nil {
+ return false, fmt.Errorf("failed to check lock status: %w", err)
+ }
+
+ return count > 0, nil
+}
+
+// WithLock executes a function while holding a lock
+func (g *GormLock) WithLock(ctx context.Context, key string, ttl
time.Duration, fn func() error) error {
+ // Acquire lock
+ if err := g.Lock(ctx, key, ttl); err != nil {
+ return fmt.Errorf("failed to acquire lock: %w", err)
+ }
+
+ // Ensure lock is released
+ defer func() {
+ // Use background context for unlock to ensure it completes
even if ctx is cancelled
+ unlockCtx, cancel := context.WithTimeout(context.Background(),
constants.DefaultUnlockTimeout)
+ defer cancel()
+
+ if err := g.Unlock(unlockCtx, key); err != nil {
+ logger.Errorf("Failed to release lock %s: %v", key, err)
+ }
+ }()
+
+ // Start auto-renewal if TTL is long enough
+ var renewDone chan struct{}
+ if ttl > constants.DefaultAutoRenewThreshold {
+ renewDone = make(chan struct{})
+ go g.autoRenew(ctx, key, ttl, renewDone)
+ defer close(renewDone)
+ }
+
+ // Execute the function
+ return fn()
+}
+
+// autoRenew periodically renews the lock until done channel is closed
+func (g *GormLock) autoRenew(ctx context.Context, key string, ttl
time.Duration, done <-chan struct{}) {
+ // Renew at 1/3 of TTL to ensure lock doesn't expire
+ renewInterval := ttl / 3
+ ticker := time.NewTicker(renewInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-done:
+ return
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ // Double-check done channel before renewing to avoid
unnecessary renewal
+ select {
+ case <-done:
+ return
+ default:
+ }
+
+ renewCtx, cancel :=
context.WithTimeout(context.Background(), constants.DefaultRenewTimeout)
+ if err := g.Renew(renewCtx, key, ttl); err != nil {
+ logger.Warnf("Failed to renew lock %s: %v",
key, err)
+ cancel()
+ return
+ }
+ cancel()
+ }
+ }
+}
+
+// CleanupExpiredLocks removes all expired locks from the database
+// This should be called periodically as a maintenance task
+func (g *GormLock) CleanupExpiredLocks(ctx context.Context) error {
+ db := g.getDB().WithContext(ctx)
+
+ result := db.Where("expire_at < ?", time.Now()).Delete(&LockRecord{})
+ if result.Error != nil {
+ return fmt.Errorf("failed to cleanup expired locks: %w",
result.Error)
+ }
+
+ return nil
+}
diff --git a/pkg/lock/gorm/lock_test.go b/pkg/lock/gorm/lock_test.go
new file mode 100644
index 00000000..48057994
--- /dev/null
+++ b/pkg/lock/gorm/lock_test.go
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package gorm_test
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gorm.io/driver/sqlite"
+ "gorm.io/gorm"
+
+ "github.com/apache/dubbo-admin/pkg/common/bizerror"
+ gormlock "github.com/apache/dubbo-admin/pkg/lock/gorm"
+)
+
+func setupTestDB(t *testing.T) *gorm.DB {
+ db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"),
&gorm.Config{
+ PrepareStmt: false,
+ })
+ require.NoError(t, err, "failed to create test database")
+
+ sqlDB, err := db.DB()
+ require.NoError(t, err)
+
+ sqlDB.SetMaxOpenConns(1)
+
+ err = db.Exec("PRAGMA journal_mode=WAL;").Error
+ require.NoError(t, err, "failed to set WAL mode")
+
+ err = db.Exec("PRAGMA busy_timeout=5000;").Error
+ require.NoError(t, err, "failed to set busy timeout")
+
+ err = db.AutoMigrate(&gormlock.LockRecord{})
+ require.NoError(t, err, "failed to migrate lock table")
+
+ return db
+}
+
+func TestBasicLockUnlock(t *testing.T) {
+ db := setupTestDB(t)
+ lockInstance := gormlock.NewGormLockFromDB(db)
+ ctx := context.Background()
+
+ err := lockInstance.Lock(ctx, "test-key", 5*time.Second)
+ assert.NoError(t, err, "should acquire lock successfully")
+
+ isLocked, err := lockInstance.IsLocked(ctx, "test-key")
+ assert.NoError(t, err)
+ assert.True(t, isLocked, "lock should be held")
+
+ err = lockInstance.Unlock(ctx, "test-key")
+ assert.NoError(t, err, "should release lock successfully")
+
+ isLocked, err = lockInstance.IsLocked(ctx, "test-key")
+ assert.NoError(t, err)
+ assert.False(t, isLocked, "lock should be released")
+}
+
+func TestTryLock(t *testing.T) {
+ db := setupTestDB(t)
+ lock1 := gormlock.NewGormLockFromDB(db)
+ lock2 := gormlock.NewGormLockFromDB(db)
+ ctx := context.Background()
+
+ acquired, err := lock1.TryLock(ctx, "test-key", 5*time.Second)
+ assert.NoError(t, err)
+ assert.True(t, acquired, "first lock should be acquired")
+
+ acquired, err = lock2.TryLock(ctx, "test-key", 5*time.Second)
+ assert.NoError(t, err)
+ assert.False(t, acquired, "second lock should not be acquired")
+
+ err = lock1.Unlock(ctx, "test-key")
+ assert.NoError(t, err)
+
+ acquired, err = lock2.TryLock(ctx, "test-key", 5*time.Second)
+ assert.NoError(t, err)
+ assert.True(t, acquired, "second lock should be acquired after first is
released")
+
+ _ = lock2.Unlock(ctx, "test-key")
+}
+
+func TestConcurrentLockAttempts(t *testing.T) {
+ db := setupTestDB(t)
+ ctx := context.Background()
+
+ const numGoroutines = 10
+ var successCount atomic.Int32
+ var wg sync.WaitGroup
+ wg.Add(numGoroutines)
+
+ for i := 0; i < numGoroutines; i++ {
+ go func() {
+ defer wg.Done()
+ lockInstance := gormlock.NewGormLockFromDB(db)
+ acquired, err := lockInstance.TryLock(ctx,
"concurrent-key", 1*time.Second)
+ if err == nil && acquired {
+ successCount.Add(1)
+ time.Sleep(100 * time.Millisecond) // Hold lock
briefly
+ _ = lockInstance.Unlock(ctx, "concurrent-key")
+ }
+ }()
+ }
+
+ wg.Wait()
+
+ assert.Equal(t, int32(1), successCount.Load(), "only one goroutine
should acquire the lock")
+}
+
+func TestLockExpiration(t *testing.T) {
+ db := setupTestDB(t)
+ lock1 := gormlock.NewGormLockFromDB(db)
+ lock2 := gormlock.NewGormLockFromDB(db)
+ ctx := context.Background()
+
+ acquired, err := lock1.TryLock(ctx, "expire-key", 100*time.Millisecond)
+ assert.NoError(t, err)
+ assert.True(t, acquired)
+
+ acquired, err = lock2.TryLock(ctx, "expire-key", 1*time.Second)
+ assert.NoError(t, err)
+ assert.False(t, acquired, "lock should still be held")
+
+ time.Sleep(200 * time.Millisecond)
+
+ acquired, err = lock2.TryLock(ctx, "expire-key", 1*time.Second)
+ assert.NoError(t, err)
+ assert.True(t, acquired, "lock should be acquired after expiration")
+
+ _ = lock2.Unlock(ctx, "expire-key")
+}
+
+func TestLockRenewal(t *testing.T) {
+ db := setupTestDB(t)
+ lockInstance := gormlock.NewGormLockFromDB(db)
+ ctx := context.Background()
+
+ err := lockInstance.Lock(ctx, "renew-key", 1*time.Second)
+ require.NoError(t, err)
+
+ time.Sleep(500 * time.Millisecond)
+
+ err = lockInstance.Renew(ctx, "renew-key", 2*time.Second)
+ assert.NoError(t, err, "should renew lock successfully")
+
+ isLocked, err := lockInstance.IsLocked(ctx, "renew-key")
+ assert.NoError(t, err)
+ assert.True(t, isLocked, "lock should still be held after renewal")
+
+ _ = lockInstance.Unlock(ctx, "renew-key")
+}
+
+func TestUnlockNotHeld(t *testing.T) {
+ db := setupTestDB(t)
+ lock1 := gormlock.NewGormLockFromDB(db)
+ lock2 := gormlock.NewGormLockFromDB(db)
+ ctx := context.Background()
+
+ err := lock1.Lock(ctx, "test-key", 5*time.Second)
+ require.NoError(t, err)
+
+ err = lock2.Unlock(ctx, "test-key")
+ assert.Error(t, err, "should return error")
+
+ // 检查错误类型和错误码
+ var bizErr bizerror.Error
+ if assert.ErrorAs(t, err, &bizErr) {
+ assert.Equal(t, bizerror.LockNotHeld, bizErr.Code(), "should
return LockNotHeld error code")
+ }
+
+ _ = lock1.Unlock(ctx, "test-key")
+}
+
+func TestRenewNotHeld(t *testing.T) {
+ db := setupTestDB(t)
+ lock1 := gormlock.NewGormLockFromDB(db)
+ lock2 := gormlock.NewGormLockFromDB(db)
+ ctx := context.Background()
+
+ err := lock1.Lock(ctx, "test-key", 5*time.Second)
+ require.NoError(t, err)
+
+ err = lock2.Renew(ctx, "test-key", 10*time.Second)
+ assert.Error(t, err, "should return error")
+
+ var bizErr bizerror.Error
+ if assert.ErrorAs(t, err, &bizErr) {
+ assert.Equal(t, bizerror.LockNotHeld, bizErr.Code(), "should
return LockNotHeld error code")
+ }
+
+ _ = lock1.Unlock(ctx, "test-key")
+}
+
+func TestWithLock(t *testing.T) {
+ db := setupTestDB(t)
+ lockInstance := gormlock.NewGormLockFromDB(db)
+ ctx := context.Background()
+
+ executed := false
+ err := lockInstance.WithLock(ctx, "with-lock-key", 2*time.Second,
func() error {
+ executed = true
+ isLocked, err := lockInstance.IsLocked(ctx, "with-lock-key")
+ assert.NoError(t, err)
+ assert.True(t, isLocked)
+ return nil
+ })
+
+ assert.NoError(t, err)
+ assert.True(t, executed, "function should be executed")
+
+ time.Sleep(100 * time.Millisecond)
+ isLocked, err := lockInstance.IsLocked(ctx, "with-lock-key")
+ assert.NoError(t, err)
+ assert.False(t, isLocked, "lock should be released after WithLock")
+}
+
+func TestWithLockAutoRenewal(t *testing.T) {
+ db := setupTestDB(t)
+ lockInstance := gormlock.NewGormLockFromDB(db)
+ ctx := context.Background()
+
+ executed := false
+ err := lockInstance.WithLock(ctx, "auto-renew-key", 15*time.Second,
func() error {
+ time.Sleep(6 * time.Second)
+ executed = true
+ return nil
+ })
+
+ assert.NoError(t, err)
+ assert.True(t, executed, "function should be executed")
+
+ time.Sleep(100 * time.Millisecond)
+ isLocked, err := lockInstance.IsLocked(ctx, "auto-renew-key")
+ assert.NoError(t, err)
+ assert.False(t, isLocked, "lock should be released after WithLock")
+}
+
+func TestWithLockContextCancellation(t *testing.T) {
+ db := setupTestDB(t)
+ lockInstance := gormlock.NewGormLockFromDB(db)
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ started := make(chan struct{})
+ err := lockInstance.WithLock(ctx, "cancel-key", 5*time.Second, func()
error {
+ close(started)
+ cancel()
+ time.Sleep(100 * time.Millisecond)
+ return nil
+ })
+
+ <-started
+
+ assert.NoError(t, err, "function should complete even if context is
cancelled during execution")
+
+ time.Sleep(100 * time.Millisecond)
+ isLocked, err := lockInstance.IsLocked(context.Background(),
"cancel-key")
+ assert.NoError(t, err)
+ assert.False(t, isLocked, "lock should be released even after context
cancellation")
+}
+
+func TestCleanupExpiredLocks(t *testing.T) {
+ db := setupTestDB(t)
+ lock1 := gormlock.NewGormLockFromDB(db)
+ lock2 := gormlock.NewGormLockFromDB(db)
+ ctx := context.Background()
+
+ _, _ = lock1.TryLock(ctx, "cleanup-key-1", 100*time.Millisecond)
+ _, _ = lock2.TryLock(ctx, "cleanup-key-2", 100*time.Millisecond)
+
+ time.Sleep(200 * time.Millisecond)
+
+ err := lock1.CleanupExpiredLocks(ctx)
+ assert.NoError(t, err)
+
+ var count int64
+ db.Model(&gormlock.LockRecord{}).Count(&count)
+ assert.Equal(t, int64(0), count, "all expired locks should be cleaned
up")
+}
+
+func TestMultipleDifferentLocks(t *testing.T) {
+ db := setupTestDB(t)
+ lockInstance := gormlock.NewGormLockFromDB(db)
+ ctx := context.Background()
+
+ err1 := lockInstance.Lock(ctx, "key-1", 5*time.Second)
+ err2 := lockInstance.Lock(ctx, "key-2", 5*time.Second)
+ err3 := lockInstance.Lock(ctx, "key-3", 5*time.Second)
+
+ assert.NoError(t, err1)
+ assert.NoError(t, err2)
+ assert.NoError(t, err3)
+
+ isLocked1, _ := lockInstance.IsLocked(ctx, "key-1")
+ isLocked2, _ := lockInstance.IsLocked(ctx, "key-2")
+ isLocked3, _ := lockInstance.IsLocked(ctx, "key-3")
+
+ assert.True(t, isLocked1)
+ assert.True(t, isLocked2)
+ assert.True(t, isLocked3)
+
+ _ = lockInstance.Unlock(ctx, "key-1")
+ _ = lockInstance.Unlock(ctx, "key-2")
+ _ = lockInstance.Unlock(ctx, "key-3")
+}
+
+func TestLockBlockingBehavior(t *testing.T) {
+ db := setupTestDB(t)
+ lock1 := gormlock.NewGormLockFromDB(db)
+ lock2 := gormlock.NewGormLockFromDB(db)
+ ctx := context.Background()
+
+ err := lock1.Lock(ctx, "blocking-key", 10*time.Second)
+ require.NoError(t, err)
+
+ isLocked, err := lock1.IsLocked(ctx, "blocking-key")
+ require.NoError(t, err)
+ require.True(t, isLocked)
+
+ acquiredTime := time.Now()
+ done := make(chan time.Time)
+
+ go func() {
+ _ = lock2.Lock(ctx, "blocking-key", 10*time.Second)
+ done <- time.Now()
+ }()
+
+ time.Sleep(500 * time.Millisecond)
+
+ unlockErr := lock1.Unlock(ctx, "blocking-key")
+ require.NoError(t, unlockErr, "unlock should succeed")
+
+ isLocked, err = lock1.IsLocked(ctx, "blocking-key")
+ require.NoError(t, err)
+
+ lock2AcquiredTime := <-done
+
+ duration := lock2AcquiredTime.Sub(acquiredTime)
+
+ assert.GreaterOrEqual(t, duration, 500*time.Millisecond, "lock2 should
acquire after lock1 releases")
+ assert.Less(t, duration, 1500*time.Millisecond, "lock2 should acquire
shortly after lock1 releases")
+
+ _ = lock2.Unlock(ctx, "blocking-key")
+}
diff --git a/pkg/lock/gorm/model.go b/pkg/lock/gorm/model.go
new file mode 100644
index 00000000..0becabb8
--- /dev/null
+++ b/pkg/lock/gorm/model.go
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package gorm
+
+import (
+ "time"
+)
+
+// LockRecord represents a distributed lock record in the database
+type LockRecord struct {
+ ID uint `gorm:"primarykey"`
+ LockKey string `gorm:"uniqueIndex;size:255;not null"` // Unique
lock identifier
+ Owner string `gorm:"size:255;not null"` // UUID of
the lock holder
+ ExpireAt time.Time `gorm:"index;not null"` // Lock
expiration time
+ CreatedAt time.Time `gorm:"autoCreateTime"` // Lock
creation time
+ UpdatedAt time.Time `gorm:"autoUpdateTime"` // Last
renewal time
+}
+
+// TableName returns the table name for LockRecord
+func (LockRecord) TableName() string {
+ return "distributed_locks"
+}
diff --git a/pkg/store/dbcommon/connection_pool.go
b/pkg/store/dbcommon/connection_pool.go
index c9487825..5a866262 100644
--- a/pkg/store/dbcommon/connection_pool.go
+++ b/pkg/store/dbcommon/connection_pool.go
@@ -224,3 +224,21 @@ func (p *ConnectionPool) Stats() sql.DBStats {
}
return sql.DBStats{}
}
+
+// GetGlobalDB returns the first available gorm.DB instance from the global
pool registry
+// This is used by components that need database access without going through
StoreComponent
+// Returns nil if no database connection is available
+func GetGlobalDB(storeType storecfg.Type) *gorm.DB {
+ poolsMutex.RLock()
+ defer poolsMutex.RUnlock()
+
+ // Find the first pool matching the store type
+ prefix := string(storeType) + ":"
+ for key, pool := range pools {
+ if len(key) >= len(prefix) && key[:len(prefix)] == prefix {
+ return pool.GetDB()
+ }
+ }
+
+ return nil
+}