This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new d891105 Remove the migrate and failover history from the storage
(#121)
d891105 is described below
commit d891105260314252b763612174c2ff6cd1e11245
Author: hulk <[email protected]>
AuthorDate: Mon Oct 9 19:30:20 2023 +0800
Remove the migrate and failover history from the storage (#121)
Remove the migrate and failover history from the storage
Currently, all failover/migrate operations are recorded in storage,
which may slow down the service if users frequently migrate or fail over.
So it'd be better to remove the history records until we have thought through
how to use and retain them.
---
controller/failover/cluster.go | 1 -
controller/failover/failover.go | 2 --
controller/migrate/migrator.go | 2 --
storage/failover.go | 26 --------------------------
storage/helper.go | 16 ----------------
storage/migrate.go | 35 -----------------------------------
6 files changed, 82 deletions(-)
diff --git a/controller/failover/cluster.go b/controller/failover/cluster.go
index f08d640..28b0697 100644
--- a/controller/failover/cluster.go
+++ b/controller/failover/cluster.go
@@ -207,5 +207,4 @@ func (c *Cluster) promoteMaster(ctx context.Context, task
*storage.FailoverTask)
}
task.FinishTime = time.Now().Unix()
- _ = c.storage.AddFailOverHistory(ctx, task)
}
diff --git a/controller/failover/failover.go b/controller/failover/failover.go
index fa64266..aa06d33 100644
--- a/controller/failover/failover.go
+++ b/controller/failover/failover.go
@@ -140,8 +140,6 @@ func (f *FailOver) GetTasks(ctx context.Context, ns,
cluster string, queryType s
return nil, nil
}
return f.clusters[clusterKey].GetTasks()
- case "history":
- return f.storage.GetFailOverHistory(ctx, ns, cluster)
default:
return nil, errors.New("unknown query type")
}
diff --git a/controller/migrate/migrator.go b/controller/migrate/migrator.go
index 8fe7e06..854285e 100644
--- a/controller/migrate/migrator.go
+++ b/controller/migrate/migrator.go
@@ -229,7 +229,6 @@ func (m *Migrator) abortMigratingTask(ctx context.Context,
task *storage.Migrati
task.ErrorDetail = err.Error()
task.FinishTime = time.Now().Unix()
_ = m.removeMigratingTask(ctx, task)
- _ = m.storage.AddMigrateHistory(ctx, task)
logger.Get().With(
zap.Error(err),
zap.Any("task", task),
@@ -240,7 +239,6 @@ func (m *Migrator) finishMigratingTask(ctx context.Context,
task *storage.Migrat
task.Status = TaskStatusSuccess
task.FinishTime = time.Now().Unix()
_ = m.removeMigratingTask(ctx, task)
- _ = m.storage.AddMigrateHistory(ctx, task)
logger.Get().With(
zap.Any("task", task),
).Info("Success to migrate the slot")
diff --git a/storage/failover.go b/storage/failover.go
index d04a069..8b293e9 100644
--- a/storage/failover.go
+++ b/storage/failover.go
@@ -66,29 +66,3 @@ func (s *Storage) GetFailOverTask(ctx context.Context, ns,
cluster string) (*Fai
}
return &task, nil
}
-
-func (s *Storage) AddFailOverHistory(ctx context.Context, task *FailoverTask)
error {
- taskKey := buildFailOverHistoryKey(task.Namespace, task.Cluster,
task.Node.ID, task.QueuedTime)
- taskData, err := json.Marshal(task)
- if err != nil {
- return err
- }
- return s.persist.Set(ctx, taskKey, taskData)
-}
-
-func (s *Storage) GetFailOverHistory(ctx context.Context, ns, cluster string)
([]*FailoverTask, error) {
- prefixKey := buildFailOverHistoryPrefix(ns, cluster)
- entries, err := s.persist.List(ctx, prefixKey)
- if err != nil {
- return nil, err
- }
- tasks := make([]*FailoverTask, 0)
- for _, entry := range entries {
- var task FailoverTask
- if err = json.Unmarshal(entry.Value, &task); err != nil {
- return nil, err
- }
- tasks = append(tasks, &task)
- }
- return tasks, nil
-}
diff --git a/storage/helper.go b/storage/helper.go
index f1591f9..fe7a3b5 100644
--- a/storage/helper.go
+++ b/storage/helper.go
@@ -41,22 +41,6 @@ func buildMigratingKeyPrefix(ns, cluster string) string {
return fmt.Sprintf("%s/%s/migrate/doing", buildClusterPrefix(ns),
cluster)
}
-func buildMigrateHistoryKey(ns, cluster, taskID string) string {
- return fmt.Sprintf("%s/%s/migrate/history/%s", buildClusterPrefix(ns),
cluster, taskID)
-}
-
-func buildMigrateHistoryPrefix(ns, cluster string) string {
- return fmt.Sprintf("%s/%s/migrate/history/", buildClusterPrefix(ns),
cluster)
-}
-
func buildFailOverKey(ns, cluster string) string {
return fmt.Sprintf("%s/%s/failover/queue", buildClusterPrefix(ns),
cluster)
}
-
-func buildFailOverHistoryKey(ns, cluster, node string, ts int64) string {
- return fmt.Sprintf("%s/%s/failover/history/%d/%s",
buildClusterPrefix(ns), cluster, ts, node)
-}
-
-func buildFailOverHistoryPrefix(ns, cluster string) string {
- return fmt.Sprintf("%s/%s/failover/history/", buildClusterPrefix(ns),
cluster)
-}
diff --git a/storage/migrate.go b/storage/migrate.go
index 80523af..c04bd83 100644
--- a/storage/migrate.go
+++ b/storage/migrate.go
@@ -77,38 +77,3 @@ func (s *Storage) RemoveMigratingTask(ctx context.Context,
ns, cluster string) e
taskKey := buildMigratingKeyPrefix(ns, cluster)
return s.persist.Delete(ctx, taskKey)
}
-
-func (s *Storage) AddMigrateHistory(ctx context.Context, task *MigrationTask)
error {
- taskKey := buildMigrateHistoryKey(task.Namespace, task.Cluster,
task.TaskID)
- taskData, err := json.Marshal(task)
- if err != nil {
- return err
- }
- return s.persist.Set(ctx, taskKey, taskData)
-}
-
-func (s *Storage) GetMigrateHistory(ctx context.Context, ns, cluster string)
([]*MigrationTask, error) {
- prefixKey := buildMigrateHistoryPrefix(ns, cluster)
- entries, err := s.persist.List(ctx, prefixKey)
- if err != nil {
- return nil, err
- }
- var tasks []*MigrationTask
- for _, entry := range entries {
- var task MigrationTask
- if err = json.Unmarshal(entry.Value, &task); err != nil {
- return nil, err
- }
- tasks = append(tasks, &task)
- }
- return tasks, nil
-}
-
-func (s *Storage) IsMigrateHistoryExists(ctx context.Context, task
*MigrationTask) (bool, error) {
- taskKey := buildMigrateHistoryKey(task.Namespace, task.Cluster,
task.TaskID)
- value, err := s.persist.Get(ctx, taskKey)
- if err != nil {
- return false, err
- }
- return len(value) != 0, nil
-}