This is an automated email from the ASF dual-hosted git repository.
justxuewei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/main by this push:
new 1e78aaf53 Implement meta cache (#2371)
1e78aaf53 is described below
commit 1e78aaf53cf29285a878a99ddeb46585e680944f
Author: finalt <[email protected]>
AuthorDate: Fri Aug 11 17:06:09 2023 +0800
Implement meta cache (#2371)
* implement cache manager
* add meta cache
* update go mod
* improve code
* improve code
* improve init time
* update cache
* fix bug
* improve code
* improve code
* improve code
---
common/constant/key.go | 7 +
go.mod | 1 +
go.sum | 1 +
.../service_instances_changed_listener_impl.go | 36 +++-
registry/servicediscovery/store/cache_manager.go | 182 +++++++++++++++++++++
.../servicediscovery/store/cache_manager_test.go | 150 +++++++++++++++++
6 files changed, 374 insertions(+), 3 deletions(-)
diff --git a/common/constant/key.go b/common/constant/key.go
index b786b9951..383b57e57 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -416,3 +416,10 @@ const (
MetricsMetadata = "dubbo.metrics.metadata"
MetricApp = "dubbo.metrics.app"
)
+
+// Default meta cache config, consumed by the service discovery listener when
+// it builds its package-level metadata cache.
+const (
+ // DefaultMetaCacheName is the name given to the metadata cache manager.
+ DefaultMetaCacheName = "dubbo.meta"
+ // DefaultMetaFileName is the file the metadata cache is persisted to.
+ DefaultMetaFileName = "dubbo.metadata"
+ // DefaultEntrySize is the size passed to the metadata cache as its capacity.
+ DefaultEntrySize = 100
+)
diff --git a/go.mod b/go.mod
index d66112570..89764c71f 100644
--- a/go.mod
+++ b/go.mod
@@ -30,6 +30,7 @@ require (
github.com/google/uuid v1.3.0
github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99 //
indirect
github.com/grpc-ecosystem/grpc-opentracing
v0.0.0-20180507213350-8e809c8a8645
+ github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/vault/sdk v0.7.0
github.com/influxdata/tdigest v0.0.1
github.com/jinzhu/copier v0.3.5
diff --git a/go.sum b/go.sum
index 5390019e4..3c47c9d14 100644
--- a/go.sum
+++ b/go.sum
@@ -801,6 +801,7 @@ github.com/hashicorp/go-version v1.2.0/go.mod
h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09
github.com/hashicorp/go.net v0.0.1/go.mod
h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod
h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod
h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/golang-lru v0.5.4
h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod
h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod
h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
diff --git
a/registry/servicediscovery/service_instances_changed_listener_impl.go
b/registry/servicediscovery/service_instances_changed_listener_impl.go
index a0039bdb2..e0714591a 100644
--- a/registry/servicediscovery/service_instances_changed_listener_impl.go
+++ b/registry/servicediscovery/service_instances_changed_listener_impl.go
@@ -19,6 +19,7 @@ package servicediscovery
import (
"reflect"
+ "time"
)
import (
@@ -32,9 +33,22 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/registry"
+ "dubbo.apache.org/dubbo-go/v3/registry/servicediscovery/store"
"dubbo.apache.org/dubbo-go/v3/remoting"
)
+// metaCache is the package-level cache of metadata info keyed by revision.
+// It is assigned once by init and consulted before metadata is fetched.
+var (
+ metaCache *store.CacheManager
+)
+
+// init builds the package-level metadata cache at package initialization.
+// The cache is named constant.DefaultMetaCacheName, persisted to
+// constant.DefaultMetaFileName, dumped every ten minutes and sized by
+// constant.DefaultEntrySize. Failing to create it is fatal.
+func init() {
+	cache, err := store.NewCacheManager(constant.DefaultMetaCacheName, constant.DefaultMetaFileName, time.Minute*10, constant.DefaultEntrySize)
+	if err != nil {
+		// Fatalf, not Fatal: the message carries format verbs that must be expanded.
+		logger.Fatalf("Failed to create cache [%s],the err is %v", constant.DefaultMetaCacheName, err)
+	}
+	metaCache = cache
+}
+
// ServiceInstancesChangedListenerImpl The Service Discovery Changed Event
Listener
type ServiceInstancesChangedListenerImpl struct {
serviceNames *gxset.HashSet
@@ -88,9 +102,14 @@ func (lstn *ServiceInstancesChangedListenerImpl) OnEvent(e
observer.Event) error
revisionToInstances[revision] = append(subInstances,
instance)
metadataInfo := lstn.revisionToMetadata[revision]
if metadataInfo == nil {
- metadataInfo, err = GetMetadataInfo(instance,
revision)
- if err != nil {
- return err
+ if val, ok := metaCache.Get(revision); ok {
+ metadataInfo =
val.(*common.MetadataInfo)
+ } else {
+ metadataInfo, err =
GetMetadataInfo(instance, revision)
+ if err != nil {
+ return err
+ }
+ metaCache.Set(revision, metadataInfo)
}
}
instance.SetServiceMetadata(metadataInfo)
@@ -104,6 +123,9 @@ func (lstn *ServiceInstancesChangedListenerImpl) OnEvent(e
observer.Event) error
newRevisionToMetadata[revision] = metadataInfo
}
lstn.revisionToMetadata = newRevisionToMetadata
+ for revision, metadataInfo := range newRevisionToMetadata {
+ metaCache.Set(revision, metadataInfo)
+ }
for serviceInfo, revisions := range localServiceToRevisions {
revisionsToUrls :=
protocolRevisionsToUrls[serviceInfo.Protocol]
@@ -187,6 +209,11 @@ func (lstn *ServiceInstancesChangedListenerImpl)
GetEventType() reflect.Type {
// GetMetadataInfo get metadata info when MetadataStorageTypePropertyName is
null
func GetMetadataInfo(instance registry.ServiceInstance, revision string)
(*common.MetadataInfo, error) {
+
+ if metadataInfo, ok := metaCache.Get(revision); ok {
+ return metadataInfo.(*common.MetadataInfo), nil
+ }
+
var metadataStorageType string
var metadataInfo *common.MetadataInfo
if instance.GetMetadata() == nil {
@@ -212,5 +239,8 @@ func GetMetadataInfo(instance registry.ServiceInstance,
revision string) (*commo
return nil, err
}
}
+
+ metaCache.Set(revision, metadataInfo)
+
return metadataInfo, nil
}
diff --git a/registry/servicediscovery/store/cache_manager.go
b/registry/servicediscovery/store/cache_manager.go
new file mode 100644
index 000000000..861e51dc4
--- /dev/null
+++ b/registry/servicediscovery/store/cache_manager.go
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package store
+
+import (
+	"encoding/gob"
+	"errors"
+	"io"
+	"os"
+	"sync"
+	"time"
+)
+
+import (
+ "github.com/dubbogo/gost/log/logger"
+
+ "github.com/hashicorp/golang-lru"
+)
+
+// CacheManager is an LRU cache whose content is periodically dumped to a file
+// by a background routine and reloaded from that file on creation.
+type CacheManager struct {
+	name         string        // The name of the cache manager, used in log messages
+	cacheFile    string        // The file path where the cache is stored
+	dumpInterval time.Duration // The interval between two cache dumps
+	stop         chan struct{} // Channel used to stop the cache dump routine
+	cache        *lru.Cache    // The LRU cache implementation
+	lock         sync.Mutex    // Held by dumpCache while the cache file is written
+}
+
+// Item is one cache entry as stored in the cache file: dumpCache gob-encodes
+// one Item per entry and loadCache decodes them back.
+type Item struct {
+ // Key is the cache key of the entry.
+ Key string
+ // Value is the cached value; dumpCache calls gob.Register on it before encoding.
+ Value interface{}
+}
+
+// NewCacheManager creates a new CacheManager instance.
+// It initializes the cache manager with the provided parameters and starts a
routine for cache expiration.
+func NewCacheManager(name, cacheFile string, dumpInterval time.Duration,
maxCacheSize int) (*CacheManager, error) {
+ cm := &CacheManager{
+ name: name,
+ cacheFile: cacheFile,
+ dumpInterval: dumpInterval,
+ stop: make(chan struct{}),
+ }
+ cache, err := lru.New(maxCacheSize)
+ if err != nil {
+ return nil, err
+ }
+ cm.cache = cache
+
+ // Check if the cache file exists and load the cache if it does
+ if _, err := os.Stat(cacheFile); err == nil {
+ if err = cm.loadCache(); err != nil {
+ logger.Warnf("Failed to load the cache file:[%s].The
err is %v", cm.cacheFile, err)
+ }
+ }
+
+ go cm.RunDumpTask()
+
+ return cm, nil
+}
+
+// Get retrieves the value associated with the given key from the cache.
+// It delegates to the underlying LRU cache and returns the stored value
+// together with a flag reporting whether the key was found.
+func (cm *CacheManager) Get(key string) (interface{}, bool) {
+ return cm.cache.Get(key)
+}
+
+// Set sets the value associated with the given key in the cache.
+// It only updates the in-memory LRU cache; the cache file is written
+// separately by dumpCache.
+func (cm *CacheManager) Set(key string, value interface{}) {
+ cm.cache.Add(key, value)
+}
+
+// Delete removes the value associated with the given key from the cache.
+// It only updates the in-memory LRU cache; the cache file is written
+// separately by dumpCache.
+func (cm *CacheManager) Delete(key string) {
+ cm.cache.Remove(key)
+}
+
+// GetAll returns a snapshot of all the key-value pairs in the cache.
+//
+// Keys are listed first and each value is then read with Peek. A key that is
+// no longer present when it is read (evicted or deleted in between) is left
+// out of the result rather than reported with a nil value.
+func (cm *CacheManager) GetAll() map[string]interface{} {
+	keys := cm.cache.Keys()
+
+	result := make(map[string]interface{}, len(keys))
+	for _, k := range keys {
+		// The entry may have disappeared since Keys() was taken.
+		if v, ok := cm.cache.Peek(k); ok {
+			result[k.(string)] = v
+		}
+	}
+
+	return result
+}
+
+// loadCache loads the cache from the cache file.
+//
+// The file is read as a stream of gob-encoded Item records; each record is
+// added to the LRU cache until the decoder reports io.EOF. Any other open or
+// decode error is returned, and entries decoded before it stay in the cache.
+//
+// NOTE(review): values are decoded into an interface field, and the only
+// gob.Register call in this file is in dumpCache — confirm that the concrete
+// value types are registered before a cache file is loaded in a fresh process.
+func (cm *CacheManager) loadCache() error {
+	cf, err := os.Open(cm.cacheFile)
+	if err != nil {
+		return err
+	}
+	// Close on every path, including decode failures. The file is only read,
+	// so a Close error carries nothing worth reporting.
+	defer cf.Close()
+
+	decoder := gob.NewDecoder(cf)
+	for {
+		var it Item
+		if err := decoder.Decode(&it); err != nil {
+			if errors.Is(err, io.EOF) {
+				return nil // Reached end of file
+			}
+			return err
+		}
+		// Add the loaded entry to the LRU cache
+		cm.cache.Add(it.Key, it.Value)
+	}
+}
+
+// dumpCache dumps the cache to the cache file.
+//
+// It takes the manager lock, snapshots the cache with GetAll, (re)creates the
+// cache file and writes one gob-encoded Item per entry. The file is closed on
+// every path; the first error from creating, encoding or closing is returned.
+func (cm *CacheManager) dumpCache() (err error) {
+	cm.lock.Lock()
+	defer cm.lock.Unlock()
+
+	items := cm.GetAll()
+
+	file, err := os.Create(cm.cacheFile)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// A failed Close can mean lost data, so report it unless an earlier
+		// error is already being returned.
+		if closeErr := file.Close(); err == nil {
+			err = closeErr
+		}
+	}()
+
+	encoder := gob.NewEncoder(file)
+	for k, v := range items {
+		if v == nil {
+			// gob.Register cannot handle a nil value; there is nothing to persist.
+			continue
+		}
+		// Values travel inside an interface field, so their concrete type
+		// must be registered with gob before encoding.
+		gob.Register(v)
+		if err = encoder.Encode(&Item{Key: k, Value: v}); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (cm *CacheManager) RunDumpTask() {
+ ticker := time.NewTicker(cm.dumpInterval)
+ for {
+ select {
+ case <-ticker.C:
+ // Dump the cache to the file
+ if err := cm.dumpCache(); err != nil {
+ // Handle error
+ logger.Warnf("Failed to dump cache,the err is
%v", err)
+ } else {
+ logger.Infof("Dumping [%s] caches, latest
entries %d", cm.name, cm.cache.Len())
+ }
+ case <-cm.stop:
+ ticker.Stop()
+ return
+ }
+ }
+}
+
+// destroy stops the cache dump routine, clears the cache and removes the
+// cache file.
+//
+// The send on the stop channel is unbuffered, so it returns only once the
+// dump routine has received it.
+// NOTE(review): if no dump routine is receiving (for example on a second call
+// to destroy) that send would block forever — confirm callers invoke it once.
+func (cm *CacheManager) destroy() {
+	cm.stop <- struct{}{} // Stop the cache dump routine
+	cm.cache.Purge()      // Clear the cache
+
+	// Remove the file directly; a missing file is fine, any other failure is
+	// logged instead of being silently dropped.
+	switch err := os.Remove(cm.cacheFile); {
+	case err == nil:
+		logger.Infof("The cacheFile [%s] was cleared", cm.cacheFile)
+	case !os.IsNotExist(err):
+		logger.Warnf("Failed to remove the cacheFile [%s],the err is %v", cm.cacheFile, err)
+	}
+}
diff --git a/registry/servicediscovery/store/cache_manager_test.go
b/registry/servicediscovery/store/cache_manager_test.go
new file mode 100644
index 000000000..d02675959
--- /dev/null
+++ b/registry/servicediscovery/store/cache_manager_test.go
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package store
+
+import (
+ "testing"
+ "time"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+func TestCacheManager(t *testing.T) {
+ cm, err := NewCacheManager("test", "test_cache", time.Second, 10)
+ if err != nil {
+ t.Fatalf("failed to create cache manager: %v", err)
+ }
+
+ // Test Set and Get
+ cm.Set("key1", "value1")
+ cm.Set("key2", "value2")
+ value, ok := cm.Get("key1")
+ if !ok {
+ t.Errorf("failed to get key1: %v", err)
+ }
+ if value != "value1" {
+ t.Errorf("unexpected Value for key1: got %v, want %v", value,
"value1")
+ }
+
+ // Test Delete
+ cm.Delete("key2")
+ _, ok = cm.Get("key2")
+ if ok {
+ t.Errorf("key2 was not removed from cache")
+ }
+
+ // Test GetAll
+ cm.Set("key3", "value3")
+ all := cm.GetAll()
+ if len(all) != 2 {
+ t.Errorf("unexpected number of items in cache: got %d, want
%d", len(all), 2)
+ }
+
+ // Test cache file creation and loading
+ cm2, err := NewCacheManager("test2", "nonexistent_cache", time.Second,
10)
+ if err != nil {
+ t.Fatalf("failed to create cache manager: %v", err)
+ }
+ cm2.Set("key4", "value4")
+ cm2.dumpCache()
+ time.Sleep(time.Second * 4)
+ cm3, err := NewCacheManager("test3", "nonexistent_cache", time.Second,
10)
+ if err != nil {
+ t.Fatalf("failed to create cache manager: %v", err)
+ }
+ all3 := cm3.GetAll()
+ if len(all3) != 1 {
+ t.Errorf("unexpected number of items in cache: got %d, want
%d", len(all3), 1)
+ }
+ _, ok = cm3.Get("key4")
+ if !ok {
+ t.Errorf("failed to get key4: %v", err)
+ }
+ cm3.destroy()
+ cm2.destroy()
+ cm.destroy() // clear cache file
+}
+
+func TestMetaInfoCacheManager(t *testing.T) {
+
+ serverInfo := make(map[string]*common.ServiceInfo)
+ serverInfo["1"] = common.NewServiceInfo("1", "1", "1", "1", "1",
make(map[string]string))
+ serverInfo["2"] = common.NewServiceInfo("2", "2", "2", "2", "2",
make(map[string]string))
+ serverInfo["3"] = common.NewServiceInfo("3", "3", "3", "3", "3",
make(map[string]string))
+
+ metadataInfo1 := common.NewMetadataInfo("1", "1", serverInfo)
+ metadataInfo2 := common.NewMetadataInfo("2", "2", serverInfo)
+ metadataInfo3 := common.NewMetadataInfo("3", "3", serverInfo)
+ metadataInfo4 := common.NewMetadataInfo("4", "4", serverInfo)
+
+ cm, err := NewCacheManager("metaTest1", "test_meta_cache", time.Second,
10)
+ if err != nil {
+ t.Fatalf("failed to create cache manager: %v", err)
+ }
+
+ // Test Set and Get
+ cm.Set("key1", metadataInfo1)
+ cm.Set("key2", metadataInfo2)
+ value, ok := cm.Get("key1")
+ if !ok {
+ t.Errorf("failed to get key1: %v", err)
+ }
+ if value != metadataInfo1 {
+ t.Errorf("unexpected Value for key1: got %v, want %v", value,
"value1")
+ }
+
+ // Test Delete
+ cm.Delete("key2")
+ _, ok = cm.Get("key2")
+ if ok {
+ t.Errorf("key2 was not removed from cache")
+ }
+
+ // Test GetAll
+ cm.Set("key3", metadataInfo3)
+ all := cm.GetAll()
+ if len(all) != 2 {
+ t.Errorf("unexpected number of items in cache: got %d, want
%d", len(all), 2)
+ }
+
+ // Test cache file creation and loading
+ cm2, err := NewCacheManager("metaTest2", "nonexistent_meta_cache",
time.Second, 10)
+ if err != nil {
+ t.Fatalf("failed to create cache manager: %v", err)
+ }
+ cm2.Set("key4", metadataInfo4)
+ cm2.dumpCache()
+ time.Sleep(time.Second * 4)
+ cm3, err := NewCacheManager("test3", "nonexistent_meta_cache",
time.Second, 10)
+ if err != nil {
+ t.Fatalf("failed to create cache manager: %v", err)
+ }
+ all3 := cm3.GetAll()
+ if len(all3) != 1 {
+ t.Errorf("unexpected number of items in cache: got %d, want
%d", len(all3), 1)
+ }
+ _, ok = cm3.Get("key4")
+ if !ok {
+ t.Errorf("failed to get key4: %v", err)
+ }
+ cm3.destroy()
+ cm2.destroy()
+ cm.destroy() // clear cache file
+}