This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git


The following commit(s) were added to refs/heads/unstable by this push:
     new fb1863c  Add support for PostgreSQL as store engine (#255)
fb1863c is described below

commit fb1863cbaf28743a4181965c6432503c9f09f92b
Author: Boris Martinovic <[email protected]>
AuthorDate: Mon Feb 3 05:08:34 2025 +0100

    Add support for PostgreSQL as store engine (#255)
---
 go.mod                                     |   1 +
 go.sum                                     |   5 +
 scripts/docker/docker-compose.yml          |  12 ++
 scripts/docker/pg-dockerfile/Dockerfile    |  33 ++++
 scripts/docker/pg-init-scripts/init.sql    |  52 +++++
 store/engine/postgresql/postgresql.go      | 300 +++++++++++++++++++++++++++++
 store/engine/postgresql/postgresql_test.go | 124 ++++++++++++
 7 files changed, 527 insertions(+)

diff --git a/go.mod b/go.mod
index 0bfdc29..b739a8f 100644
--- a/go.mod
+++ b/go.mod
@@ -61,6 +61,7 @@ require (
        github.com/klauspost/compress v1.17.11 // indirect
        github.com/klauspost/cpuid/v2 v2.2.9 // indirect
        github.com/leodido/go-urn v1.4.0 // indirect
+       github.com/lib/pq v1.10.9
        github.com/mattn/go-colorable v0.1.14 // indirect
        github.com/mattn/go-isatty v0.0.20 // indirect
        github.com/mattn/go-runewidth v0.0.16 // indirect
diff --git a/go.sum b/go.sum
index 5548735..696dbfe 100644
--- a/go.sum
+++ b/go.sum
@@ -183,6 +183,11 @@ github.com/kylelemons/godebug v1.1.0 
h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0
 github.com/kylelemons/godebug v1.1.0/go.mod 
h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
 github.com/leodido/go-urn v1.4.0 
h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
 github.com/leodido/go-urn v1.4.0/go.mod 
h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
+github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
+github.com/lib/pq v1.10.9/go.mod 
h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
+github.com/mattn/go-colorable v0.0.9/go.mod 
h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
+github.com/mattn/go-colorable v0.1.4/go.mod 
h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
+github.com/mattn/go-colorable v0.1.6/go.mod 
h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
 github.com/mattn/go-colorable v0.1.9/go.mod 
h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
 github.com/mattn/go-colorable v0.1.12/go.mod 
h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
 github.com/mattn/go-colorable v0.1.14 
h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
diff --git a/scripts/docker/docker-compose.yml 
b/scripts/docker/docker-compose.yml
index 7a7450b..587aa5d 100644
--- a/scripts/docker/docker-compose.yml
+++ b/scripts/docker/docker-compose.yml
@@ -76,3 +76,15 @@ services:
     ports:
       - "8500:8500"
     command: "agent -dev -client=0.0.0.0"
+
+  postgres0:
+    build: ./pg-dockerfile
+    container_name: postgres0
+    environment:
+      POSTGRES_USER: postgres
+      POSTGRES_PASSWORD: postgres
+      POSTGRES_DB: testdb
+    ports:
+      - "5432:5432"
+    volumes:
+      - ./pg-init-scripts:/docker-entrypoint-initdb.d
diff --git a/scripts/docker/pg-dockerfile/Dockerfile 
b/scripts/docker/pg-dockerfile/Dockerfile
new file mode 100644
index 0000000..bf9c45d
--- /dev/null
+++ b/scripts/docker/pg-dockerfile/Dockerfile
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+FROM postgres:16
+
+RUN apt-get update && apt-get install -y \
+    postgresql-server-dev-16 \
+    build-essential \
+    libpq-dev \
+    wget
+
+RUN wget https://github.com/citusdata/pg_cron/archive/refs/tags/v1.6.5.tar.gz \
+    && tar -xvzf v1.6.5.tar.gz \
+    && cd pg_cron-1.6.5 \
+    && make && make install \
+    && cd .. && rm -rf v1.6.5.tar.gz pg_cron-1.6.5
+
+RUN echo "shared_preload_libraries = 'pg_cron'" >> 
/usr/share/postgresql/postgresql.conf.sample \
+    && echo "cron.database_name = 'testdb'" >> 
/usr/share/postgresql/postgresql.conf.sample
\ No newline at end of file
diff --git a/scripts/docker/pg-init-scripts/init.sql 
b/scripts/docker/pg-init-scripts/init.sql
new file mode 100644
index 0000000..db51808
--- /dev/null
+++ b/scripts/docker/pg-init-scripts/init.sql
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+CREATE TABLE locks (
+    name TEXT PRIMARY KEY,
+    leaderID TEXT NOT NULL
+);
+
+CREATE TABLE kv (
+    key TEXT PRIMARY KEY,
+    value BYTEA
+);
+
+CREATE OR REPLACE FUNCTION notify_changes()
+RETURNS TRIGGER AS $$
+BEGIN
+    IF TG_OP = 'INSERT' THEN
+        PERFORM cron.schedule('delete_' || NEW.name, '6 seconds', 
FORMAT('DELETE FROM locks WHERE name = %L', NEW.name));
+        PERFORM pg_notify('lock_change', 'INSERT:' || NEW.leaderID::text);
+    END IF;
+
+    IF TG_OP = 'DELETE' THEN
+        PERFORM cron.unschedule('delete_' || OLD.name);
+        PERFORM pg_notify('lock_change', 'DELETE:' || OLD.leaderID::text);
+    END IF;
+
+    RETURN NULL;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE TRIGGER lock_change_trigger
+AFTER INSERT OR DELETE ON locks
+FOR EACH ROW EXECUTE FUNCTION notify_changes();
+
+CREATE EXTENSION IF NOT EXISTS pg_cron;
\ No newline at end of file
diff --git a/store/engine/postgresql/postgresql.go 
b/store/engine/postgresql/postgresql.go
new file mode 100644
index 0000000..8caadd5
--- /dev/null
+++ b/store/engine/postgresql/postgresql.go
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package postgresql
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+       "fmt"
+       "strings"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/kvrocks-controller/consts"
+       "github.com/apache/kvrocks-controller/logger"
+       "github.com/apache/kvrocks-controller/store/engine"
+       "github.com/lib/pq"
+       "go.uber.org/zap"
+)
+
+const (
+       // Need to modify the cron schedule timeout accordingly in init.sql 
before changing the lockTTL
+       lockTTL                      = 6 * time.Second
+       listenerMinReconnectInterval = 10 * time.Second
+       listenerMaxReconnectInterval = 1 * time.Minute
+       defaultElectPath             = "/kvrocks/controller/leader"
+)
+
+type Config struct {
+       Addrs         []string `yaml:"addrs"`
+       Username      string   `yaml:"username"`
+       Password      string   `yaml:"password"`
+       DBName        string   `yaml:"db_name"`
+       NotifyChannel string   `yaml:"notify_channel"`
+       ElectPath     string   `yaml:"elect_path"`
+}
+
+type Postgresql struct {
+       db       *sql.DB
+       listener *pq.Listener
+
+       leaderMu  sync.Mutex
+       leaderID  string
+       myID      string
+       electPath string
+       isReady   atomic.Bool
+
+       quitCh         chan struct{}
+       wg             sync.WaitGroup
+       lockReleaseCh  chan bool
+       leaderChangeCh chan bool
+}
+
+func New(id string, cfg *Config) (*Postgresql, error) {
+       if len(id) == 0 {
+               return nil, errors.New("id must NOT be a empty string")
+       }
+
+       connStr := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", 
cfg.Username, cfg.Password, cfg.Addrs[0], cfg.DBName)
+       db, err := sql.Open("postgres", connStr)
+       if err != nil {
+               return nil, err
+       }
+
+       listener := pq.NewListener(connStr, listenerMinReconnectInterval, 
listenerMaxReconnectInterval, nil)
+       err = listener.Listen(cfg.NotifyChannel)
+       if err != nil {
+               return nil, err
+       }
+
+       electPath := defaultElectPath
+       if cfg.ElectPath != "" {
+               electPath = defaultElectPath
+       }
+
+       p := &Postgresql{
+               myID:           id,
+               electPath:      electPath,
+               db:             db,
+               listener:       listener,
+               quitCh:         make(chan struct{}),
+               lockReleaseCh:  make(chan bool),
+               leaderChangeCh: make(chan bool),
+       }
+       err = p.initLeaderId()
+       if err != nil {
+               return nil, err
+       }
+       p.isReady.Store(false)
+       p.wg.Add(2)
+       go p.electLoop()
+       go p.observeLeaderEvent()
+       return p, nil
+}
+
+func (p *Postgresql) ID() string {
+       return p.myID
+}
+
+func (p *Postgresql) Leader() string {
+       p.leaderMu.Lock()
+       defer p.leaderMu.Unlock()
+       return p.leaderID
+}
+
+func (p *Postgresql) LeaderChange() <-chan bool {
+       return p.leaderChangeCh
+}
+
+func (p *Postgresql) IsReady(ctx context.Context) bool {
+       for {
+               select {
+               case <-p.quitCh:
+                       return false
+               case <-time.After(100 * time.Millisecond):
+                       if p.isReady.Load() {
+                               return true
+                       }
+               case <-ctx.Done():
+                       return p.isReady.Load()
+               }
+       }
+}
+
+func (p *Postgresql) Get(ctx context.Context, key string) ([]byte, error) {
+       var value []byte
+       query := "SELECT value FROM kv WHERE key = $1"
+
+       row := p.db.QueryRow(query, key)
+       err := row.Scan(&value)
+       if errors.Is(err, sql.ErrNoRows) {
+               return nil, consts.ErrNotFound
+       }
+       if err != nil {
+               return nil, err
+       }
+       return value, nil
+}
+
+func (p *Postgresql) Exists(ctx context.Context, key string) (bool, error) {
+       _, err := p.Get(ctx, key)
+       if err != nil {
+               if errors.Is(err, consts.ErrNotFound) {
+                       return false, nil
+               }
+               return false, err
+       }
+       return true, nil
+}
+
+func (p *Postgresql) Set(ctx context.Context, key string, value []byte) error {
+       query := "INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) 
DO UPDATE SET value = EXCLUDED.value"
+       _, err := p.db.Exec(query, key, value)
+       return err
+}
+
+func (p *Postgresql) Delete(ctx context.Context, key string) error {
+       query := "DELETE FROM kv WHERE key = $1"
+       _, err := p.db.Exec(query, key)
+       return err
+}
+
+func (p *Postgresql) List(ctx context.Context, prefix string) ([]engine.Entry, 
error) {
+       prefixWithWildcard := prefix + "%"
+       query := "SELECT key, value from kv WHERE key LIKE $1"
+       rows, err := p.db.Query(query, prefixWithWildcard)
+       if err != nil {
+               return nil, err
+       }
+       defer rows.Close()
+
+       prefixLen := len(prefix)
+       entries := make([]engine.Entry, 0)
+       for rows.Next() {
+               var key string
+               var value []byte
+
+               err := rows.Scan(&key, &value)
+               if err != nil {
+                       return nil, err
+               }
+
+               if key == prefix {
+                       continue
+               }
+
+               key = strings.TrimLeft(key[prefixLen+1:], "/")
+               if strings.ContainsRune(key, '/') {
+                       continue
+               }
+               entries = append(entries, engine.Entry{
+                       Key:   key,
+                       Value: value,
+               })
+       }
+
+       if err := rows.Err(); err != nil {
+               return nil, err
+       }
+
+       return entries, nil
+}
+
+func (p *Postgresql) electLoop() {
+       defer p.wg.Done()
+       for {
+               select {
+               case <-p.quitCh:
+                       return
+               default:
+               }
+
+               query := "INSERT INTO locks (name, leaderID) VALUES ($1, $2) ON 
CONFLICT DO NOTHING"
+               _, err := p.db.Exec(query, p.electPath, p.myID)
+               if err != nil {
+                       time.Sleep(lockTTL / 3)
+                       continue
+               }
+
+               select {
+               case <-p.lockReleaseCh:
+                       continue
+               case <-p.quitCh:
+                       return
+               }
+       }
+}
+
+func (p *Postgresql) observeLeaderEvent() {
+       defer p.wg.Done()
+
+       for {
+               select {
+               case <-p.quitCh:
+                       return
+               case notification := <-p.listener.Notify:
+                       p.isReady.Store(true)
+
+                       data := strings.SplitN(notification.Extra, ":", 2)
+                       if len(data) != 2 {
+                               logger.Get().With(
+                                       zap.Error(fmt.Errorf("failed to parse 
notification data: expected two parts separated by a colon")),
+                               ).Error("Failed to parse notification data")
+                       }
+
+                       operation := data[0]
+                       leaderID := data[1]
+
+                       if operation == "INSERT" {
+                               p.leaderMu.Lock()
+                               p.leaderID = leaderID
+                               p.leaderMu.Unlock()
+                               p.leaderChangeCh <- true
+                       } else {
+                               p.lockReleaseCh <- true
+                       }
+               }
+       }
+}
+
+func (p *Postgresql) initLeaderId() error {
+       var leaderId string
+       query := "SELECT leaderID FROM locks WHERE name = $1"
+       row := p.db.QueryRow(query, p.electPath)
+       err := row.Scan(&leaderId)
+       if errors.Is(err, sql.ErrNoRows) {
+               p.leaderID = ""
+               return nil
+       }
+       if err != nil {
+               return err
+       }
+       p.leaderID = leaderId
+       return nil
+}
+
+func (p *Postgresql) Close() error {
+       close(p.quitCh)
+       p.wg.Wait()
+       p.listener.Close()
+       return p.db.Close()
+}
diff --git a/store/engine/postgresql/postgresql_test.go 
b/store/engine/postgresql/postgresql_test.go
new file mode 100644
index 0000000..57617f4
--- /dev/null
+++ b/store/engine/postgresql/postgresql_test.go
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package postgresql
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       "github.com/apache/kvrocks-controller/util"
+
+       "github.com/stretchr/testify/require"
+)
+
+const (
+       addr          = "127.0.0.1:5432"
+       notifyChannel = "lock_change"
+       username      = "postgres"
+       password      = "postgres"
+       dbName        = "testdb"
+)
+
+func TestBasicOperations(t *testing.T) {
+       id := util.RandString(40)
+       testElectPath := util.RandString(32)
+       persist, err := New(id, &Config{
+               Username:      username,
+               Password:      password,
+               DBName:        dbName,
+               NotifyChannel: notifyChannel,
+               ElectPath:     testElectPath,
+               Addrs:         []string{addr},
+       })
+       require.NoError(t, err)
+       defer persist.Close()
+       go func() {
+               for range persist.LeaderChange() {
+                       // do nothing
+               }
+       }()
+
+       ctx := context.Background()
+       keys := []string{"/a/b/c0", "/a/b/c1", "/a/b/c2"}
+       value := []byte("v")
+       for _, key := range keys {
+               require.NoError(t, persist.Set(ctx, key, value))
+               gotValue, err := persist.Get(ctx, key)
+               require.NoError(t, err)
+               require.Equal(t, value, gotValue)
+       }
+       entries, err := persist.List(ctx, "/a/b")
+       require.NoError(t, err)
+       require.Equal(t, len(keys), len(entries))
+       for _, key := range keys {
+               require.NoError(t, persist.Delete(ctx, key))
+       }
+}
+
+func TestElect(t *testing.T) {
+       endpoints := []string{addr}
+
+       testElectPath := util.RandString(32)
+       id0 := util.RandString(40)
+       node0, err := New(id0, &Config{
+               Username:      username,
+               Password:      password,
+               DBName:        dbName,
+               NotifyChannel: notifyChannel,
+               ElectPath:     testElectPath,
+               Addrs:         endpoints,
+       })
+       require.NoError(t, err)
+       require.Eventuallyf(t, func() bool {
+               return node0.Leader() == node0.myID
+       }, 10*time.Second, 100*time.Millisecond, "node0 should be the leader")
+
+       id1 := util.RandString(40)
+       node1, err := New(id1, &Config{
+               Username:      username,
+               Password:      password,
+               DBName:        dbName,
+               NotifyChannel: notifyChannel,
+               ElectPath:     testElectPath,
+               Addrs:         endpoints,
+       })
+       require.NoError(t, err)
+       require.Eventuallyf(t, func() bool {
+               return node1.Leader() == node0.myID
+       }, 10*time.Second, 100*time.Millisecond, "node1's leader should be the 
node0")
+
+       go func() {
+               for {
+                       select {
+                       case <-node0.LeaderChange():
+                               // do nothing
+                       case <-node1.LeaderChange():
+                               // do nothing
+                       }
+               }
+       }()
+
+       require.NoError(t, node0.Close())
+
+       require.Eventuallyf(t, func() bool {
+               return node1.Leader() == node1.myID
+       }, 15*time.Second, 100*time.Millisecond, "node1 should be the leader")
+}

Reply via email to