This is an automated email from the ASF dual-hosted git repository.

justxuewei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e78aaf53 Implement meta cache (#2371)
1e78aaf53 is described below

commit 1e78aaf53cf29285a878a99ddeb46585e680944f
Author: finalt <[email protected]>
AuthorDate: Fri Aug 11 17:06:09 2023 +0800

    Implement meta cache (#2371)
    
    * implement cache manager
    
    * add meta cache
    
    * update go mod
    
    * improve code
    
    * improve code
    
    * improve init time
    
    * update cache
    
    * fix bug
    
    * improve code
    
    * improve code
    
    * improve code
---
 common/constant/key.go                             |   7 +
 go.mod                                             |   1 +
 go.sum                                             |   1 +
 .../service_instances_changed_listener_impl.go     |  36 +++-
 registry/servicediscovery/store/cache_manager.go   | 182 +++++++++++++++++++++
 .../servicediscovery/store/cache_manager_test.go   | 150 +++++++++++++++++
 6 files changed, 374 insertions(+), 3 deletions(-)

diff --git a/common/constant/key.go b/common/constant/key.go
index b786b9951..383b57e57 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -416,3 +416,10 @@ const (
        MetricsMetadata = "dubbo.metrics.metadata"
        MetricApp       = "dubbo.metrics.app"
 )
+
+// default meta cache config
+const (
+       DefaultMetaCacheName = "dubbo.meta"
+       DefaultMetaFileName  = "dubbo.metadata"
+       DefaultEntrySize     = 100
+)
diff --git a/go.mod b/go.mod
index d66112570..89764c71f 100644
--- a/go.mod
+++ b/go.mod
@@ -30,6 +30,7 @@ require (
        github.com/google/uuid v1.3.0
        github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99 // 
indirect
        github.com/grpc-ecosystem/grpc-opentracing 
v0.0.0-20180507213350-8e809c8a8645
+       github.com/hashicorp/golang-lru v0.5.4
        github.com/hashicorp/vault/sdk v0.7.0
        github.com/influxdata/tdigest v0.0.1
        github.com/jinzhu/copier v0.3.5
diff --git a/go.sum b/go.sum
index 5390019e4..3c47c9d14 100644
--- a/go.sum
+++ b/go.sum
@@ -801,6 +801,7 @@ github.com/hashicorp/go-version v1.2.0/go.mod 
h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09
 github.com/hashicorp/go.net v0.0.1/go.mod 
h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
 github.com/hashicorp/golang-lru v0.5.0/go.mod 
h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/golang-lru v0.5.1/go.mod 
h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/golang-lru v0.5.4 
h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
 github.com/hashicorp/golang-lru v0.5.4/go.mod 
h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
 github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
 github.com/hashicorp/hcl v1.0.0/go.mod 
h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
diff --git 
a/registry/servicediscovery/service_instances_changed_listener_impl.go 
b/registry/servicediscovery/service_instances_changed_listener_impl.go
index a0039bdb2..e0714591a 100644
--- a/registry/servicediscovery/service_instances_changed_listener_impl.go
+++ b/registry/servicediscovery/service_instances_changed_listener_impl.go
@@ -19,6 +19,7 @@ package servicediscovery
 
 import (
        "reflect"
+       "time"
 )
 
 import (
@@ -32,9 +33,22 @@ import (
        "dubbo.apache.org/dubbo-go/v3/common/constant"
        "dubbo.apache.org/dubbo-go/v3/common/extension"
        "dubbo.apache.org/dubbo-go/v3/registry"
+       "dubbo.apache.org/dubbo-go/v3/registry/servicediscovery/store"
        "dubbo.apache.org/dubbo-go/v3/remoting"
 )
 
+var (
+       metaCache *store.CacheManager
+)
+
+func init() {
+       cache, err := store.NewCacheManager(constant.DefaultMetaCacheName, 
constant.DefaultMetaFileName, time.Minute*10, constant.DefaultEntrySize)
+       if err != nil {
+               logger.Fatal("Failed to create cache [%s],the err is %v", 
constant.DefaultMetaCacheName, err)
+       }
+       metaCache = cache
+}
+
 // ServiceInstancesChangedListenerImpl The Service Discovery Changed  Event 
Listener
 type ServiceInstancesChangedListenerImpl struct {
        serviceNames       *gxset.HashSet
@@ -88,9 +102,14 @@ func (lstn *ServiceInstancesChangedListenerImpl) OnEvent(e 
observer.Event) error
                        revisionToInstances[revision] = append(subInstances, 
instance)
                        metadataInfo := lstn.revisionToMetadata[revision]
                        if metadataInfo == nil {
-                               metadataInfo, err = GetMetadataInfo(instance, 
revision)
-                               if err != nil {
-                                       return err
+                               if val, ok := metaCache.Get(revision); ok {
+                                       metadataInfo = 
val.(*common.MetadataInfo)
+                               } else {
+                                       metadataInfo, err = 
GetMetadataInfo(instance, revision)
+                                       if err != nil {
+                                               return err
+                                       }
+                                       metaCache.Set(revision, metadataInfo)
                                }
                        }
                        instance.SetServiceMetadata(metadataInfo)
@@ -104,6 +123,9 @@ func (lstn *ServiceInstancesChangedListenerImpl) OnEvent(e 
observer.Event) error
                        newRevisionToMetadata[revision] = metadataInfo
                }
                lstn.revisionToMetadata = newRevisionToMetadata
+               for revision, metadataInfo := range newRevisionToMetadata {
+                       metaCache.Set(revision, metadataInfo)
+               }
 
                for serviceInfo, revisions := range localServiceToRevisions {
                        revisionsToUrls := 
protocolRevisionsToUrls[serviceInfo.Protocol]
@@ -187,6 +209,11 @@ func (lstn *ServiceInstancesChangedListenerImpl) 
GetEventType() reflect.Type {
 
 // GetMetadataInfo get metadata info when MetadataStorageTypePropertyName is 
null
 func GetMetadataInfo(instance registry.ServiceInstance, revision string) 
(*common.MetadataInfo, error) {
+
+       if metadataInfo, ok := metaCache.Get(revision); ok {
+               return metadataInfo.(*common.MetadataInfo), nil
+       }
+
        var metadataStorageType string
        var metadataInfo *common.MetadataInfo
        if instance.GetMetadata() == nil {
@@ -212,5 +239,8 @@ func GetMetadataInfo(instance registry.ServiceInstance, 
revision string) (*commo
                        return nil, err
                }
        }
+
+       metaCache.Set(revision, metadataInfo)
+
        return metadataInfo, nil
 }
diff --git a/registry/servicediscovery/store/cache_manager.go 
b/registry/servicediscovery/store/cache_manager.go
new file mode 100644
index 000000000..861e51dc4
--- /dev/null
+++ b/registry/servicediscovery/store/cache_manager.go
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package store
+
+import (
+       "encoding/gob"
+       "os"
+       "sync"
+       "time"
+)
+
+import (
+       "github.com/dubbogo/gost/log/logger"
+
+       "github.com/hashicorp/golang-lru"
+)
+
+type CacheManager struct {
+       name         string        // The name of the cache manager
+       cacheFile    string        // The file path where the cache is stored
+       dumpInterval time.Duration // The duration after which the cache dump
+       stop         chan struct{} // Channel used to stop the cache expiration 
routine
+       cache        *lru.Cache    // The LRU cache implementation
+       lock         sync.Mutex
+}
+
+type Item struct {
+       Key   string
+       Value interface{}
+}
+
+// NewCacheManager creates a new CacheManager instance.
+// It initializes the cache manager with the provided parameters, loads the 
cache file if one already exists, and starts a routine that periodically dumps the cache to that file.
+func NewCacheManager(name, cacheFile string, dumpInterval time.Duration, 
maxCacheSize int) (*CacheManager, error) {
+       cm := &CacheManager{
+               name:         name,
+               cacheFile:    cacheFile,
+               dumpInterval: dumpInterval,
+               stop:         make(chan struct{}),
+       }
+       cache, err := lru.New(maxCacheSize)
+       if err != nil {
+               return nil, err
+       }
+       cm.cache = cache
+
+       // Check if the cache file exists and load the cache if it does
+       if _, err := os.Stat(cacheFile); err == nil {
+               if err = cm.loadCache(); err != nil {
+                       logger.Warnf("Failed to load the cache file:[%s].The 
err is %v", cm.cacheFile, err)
+               }
+       }
+
+       go cm.RunDumpTask()
+
+       return cm, nil
+}
+
+// Get retrieves the value associated with the given key from the cache.
+func (cm *CacheManager) Get(key string) (interface{}, bool) {
+       return cm.cache.Get(key)
+}
+
+// Set sets the value associated with the given key in the cache.
+func (cm *CacheManager) Set(key string, value interface{}) {
+       cm.cache.Add(key, value)
+}
+
+// Delete removes the value associated with the given key from the cache.
+func (cm *CacheManager) Delete(key string) {
+       cm.cache.Remove(key)
+}
+
+// GetAll returns all the key-value pairs in the cache.
+func (cm *CacheManager) GetAll() map[string]interface{} {
+       keys := cm.cache.Keys()
+
+       result := make(map[string]interface{})
+       for _, k := range keys {
+               result[k.(string)], _ = cm.cache.Get(k)
+       }
+
+       return result
+}
+
+// loadCache loads the cache from the cache file.
+func (cm *CacheManager) loadCache() error {
+       cf, err := os.Open(cm.cacheFile)
+       if err != nil {
+               return err
+       }
+
+       decoder := gob.NewDecoder(cf)
+       for {
+               var it Item
+               err = decoder.Decode(&it)
+               if err != nil {
+                       if err.Error() == "EOF" {
+                               break // Reached end of file
+                       }
+                       return err
+               }
+               // Add the loaded keys to the front of the LRU list
+               cm.cache.Add(it.Key, it.Value)
+       }
+
+       return cf.Close()
+}
+
+// dumpCache dumps the cache to the cache file.
+func (cm *CacheManager) dumpCache() error {
+
+       cm.lock.Lock()
+       defer cm.lock.Unlock()
+
+       items := cm.GetAll()
+
+       file, err := os.Create(cm.cacheFile)
+       if err != nil {
+               return err
+       }
+
+       encoder := gob.NewEncoder(file)
+       for k, v := range items {
+               gob.Register(v)
+               err = encoder.Encode(&Item{
+                       Key:   k,
+                       Value: v,
+               })
+               if err != nil {
+                       return err
+               }
+       }
+       return file.Close()
+}
+
+func (cm *CacheManager) RunDumpTask() {
+       ticker := time.NewTicker(cm.dumpInterval)
+       for {
+               select {
+               case <-ticker.C:
+                       // Dump the cache to the file
+                       if err := cm.dumpCache(); err != nil {
+                               // Handle error
+                               logger.Warnf("Failed to dump cache,the err is 
%v", err)
+                       } else {
+                               logger.Infof("Dumping [%s] caches, latest 
entries %d", cm.name, cm.cache.Len())
+                       }
+               case <-cm.stop:
+                       ticker.Stop()
+                       return
+               }
+       }
+}
+
+// destroy stops the cache dump routine, clears the cache and removes the 
cache file.
+func (cm *CacheManager) destroy() {
+       cm.stop <- struct{}{} // Stop the cache dump routine
+       cm.cache.Purge()      // Clear the cache
+
+       // Delete the cache file if it exists
+       if _, err := os.Stat(cm.cacheFile); err == nil {
+               if err := os.Remove(cm.cacheFile); err == nil {
+                       logger.Infof("The cacheFile [%s] was cleared", 
cm.cacheFile)
+               }
+       }
+}
diff --git a/registry/servicediscovery/store/cache_manager_test.go 
b/registry/servicediscovery/store/cache_manager_test.go
new file mode 100644
index 000000000..d02675959
--- /dev/null
+++ b/registry/servicediscovery/store/cache_manager_test.go
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package store
+
+import (
+       "testing"
+       "time"
+)
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+func TestCacheManager(t *testing.T) {
+       cm, err := NewCacheManager("test", "test_cache", time.Second, 10)
+       if err != nil {
+               t.Fatalf("failed to create cache manager: %v", err)
+       }
+
+       // Test Set and Get
+       cm.Set("key1", "value1")
+       cm.Set("key2", "value2")
+       value, ok := cm.Get("key1")
+       if !ok {
+               t.Errorf("failed to get key1: %v", err)
+       }
+       if value != "value1" {
+               t.Errorf("unexpected Value for key1: got %v, want %v", value, 
"value1")
+       }
+
+       // Test Delete
+       cm.Delete("key2")
+       _, ok = cm.Get("key2")
+       if ok {
+               t.Errorf("key2 was not removed from cache")
+       }
+
+       // Test GetAll
+       cm.Set("key3", "value3")
+       all := cm.GetAll()
+       if len(all) != 2 {
+               t.Errorf("unexpected number of items in cache: got %d, want 
%d", len(all), 2)
+       }
+
+       // Test cache file creation and loading
+       cm2, err := NewCacheManager("test2", "nonexistent_cache", time.Second, 
10)
+       if err != nil {
+               t.Fatalf("failed to create cache manager: %v", err)
+       }
+       cm2.Set("key4", "value4")
+       cm2.dumpCache()
+       time.Sleep(time.Second * 4)
+       cm3, err := NewCacheManager("test3", "nonexistent_cache", time.Second, 
10)
+       if err != nil {
+               t.Fatalf("failed to create cache manager: %v", err)
+       }
+       all3 := cm3.GetAll()
+       if len(all3) != 1 {
+               t.Errorf("unexpected number of items in cache: got %d, want 
%d", len(all3), 1)
+       }
+       _, ok = cm3.Get("key4")
+       if !ok {
+               t.Errorf("failed to get key4: %v", err)
+       }
+       cm3.destroy()
+       cm2.destroy()
+       cm.destroy() // clear cache file
+}
+
+func TestMetaInfoCacheManager(t *testing.T) {
+
+       serverInfo := make(map[string]*common.ServiceInfo)
+       serverInfo["1"] = common.NewServiceInfo("1", "1", "1", "1", "1", 
make(map[string]string))
+       serverInfo["2"] = common.NewServiceInfo("2", "2", "2", "2", "2", 
make(map[string]string))
+       serverInfo["3"] = common.NewServiceInfo("3", "3", "3", "3", "3", 
make(map[string]string))
+
+       metadataInfo1 := common.NewMetadataInfo("1", "1", serverInfo)
+       metadataInfo2 := common.NewMetadataInfo("2", "2", serverInfo)
+       metadataInfo3 := common.NewMetadataInfo("3", "3", serverInfo)
+       metadataInfo4 := common.NewMetadataInfo("4", "4", serverInfo)
+
+       cm, err := NewCacheManager("metaTest1", "test_meta_cache", time.Second, 
10)
+       if err != nil {
+               t.Fatalf("failed to create cache manager: %v", err)
+       }
+
+       // Test Set and Get
+       cm.Set("key1", metadataInfo1)
+       cm.Set("key2", metadataInfo2)
+       value, ok := cm.Get("key1")
+       if !ok {
+               t.Errorf("failed to get key1: %v", err)
+       }
+       if value != metadataInfo1 {
+               t.Errorf("unexpected Value for key1: got %v, want %v", value, 
"value1")
+       }
+
+       // Test Delete
+       cm.Delete("key2")
+       _, ok = cm.Get("key2")
+       if ok {
+               t.Errorf("key2 was not removed from cache")
+       }
+
+       // Test GetAll
+       cm.Set("key3", metadataInfo3)
+       all := cm.GetAll()
+       if len(all) != 2 {
+               t.Errorf("unexpected number of items in cache: got %d, want 
%d", len(all), 2)
+       }
+
+       // Test cache file creation and loading
+       cm2, err := NewCacheManager("metaTest2", "nonexistent_meta_cache", 
time.Second, 10)
+       if err != nil {
+               t.Fatalf("failed to create cache manager: %v", err)
+       }
+       cm2.Set("key4", metadataInfo4)
+       cm2.dumpCache()
+       time.Sleep(time.Second * 4)
+       cm3, err := NewCacheManager("test3", "nonexistent_meta_cache", 
time.Second, 10)
+       if err != nil {
+               t.Fatalf("failed to create cache manager: %v", err)
+       }
+       all3 := cm3.GetAll()
+       if len(all3) != 1 {
+               t.Errorf("unexpected number of items in cache: got %d, want 
%d", len(all3), 1)
+       }
+       _, ok = cm3.Get("key4")
+       if !ok {
+               t.Errorf("failed to get key4: %v", err)
+       }
+       cm3.destroy()
+       cm2.destroy()
+       cm.destroy() // clear cache file
+}

Reply via email to