This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new fb1863c Add support for PostgreSQL as store engine (#255)
fb1863c is described below
commit fb1863cbaf28743a4181965c6432503c9f09f92b
Author: Boris Martinovic <[email protected]>
AuthorDate: Mon Feb 3 05:08:34 2025 +0100
Add support for PostgreSQL as store engine (#255)
---
go.mod | 1 +
go.sum | 5 +
scripts/docker/docker-compose.yml | 12 ++
scripts/docker/pg-dockerfile/Dockerfile | 33 ++++
scripts/docker/pg-init-scripts/init.sql | 52 +++++
store/engine/postgresql/postgresql.go | 300 +++++++++++++++++++++++++++++
store/engine/postgresql/postgresql_test.go | 124 ++++++++++++
7 files changed, 527 insertions(+)
diff --git a/go.mod b/go.mod
index 0bfdc29..b739a8f 100644
--- a/go.mod
+++ b/go.mod
@@ -61,6 +61,7 @@ require (
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
+ github.com/lib/pq v1.10.9
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
diff --git a/go.sum b/go.sum
index 5548735..696dbfe 100644
--- a/go.sum
+++ b/go.sum
@@ -183,6 +183,11 @@ github.com/kylelemons/godebug v1.1.0
h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0
github.com/kylelemons/godebug v1.1.0/go.mod
h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/leodido/go-urn v1.4.0
h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod
h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
+github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
+github.com/lib/pq v1.10.9/go.mod
h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
+github.com/mattn/go-colorable v0.0.9/go.mod
h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
+github.com/mattn/go-colorable v0.1.4/go.mod
h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
+github.com/mattn/go-colorable v0.1.6/go.mod
h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.9/go.mod
h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.12/go.mod
h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.14
h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
diff --git a/scripts/docker/docker-compose.yml
b/scripts/docker/docker-compose.yml
index 7a7450b..587aa5d 100644
--- a/scripts/docker/docker-compose.yml
+++ b/scripts/docker/docker-compose.yml
@@ -76,3 +76,15 @@ services:
ports:
- "8500:8500"
command: "agent -dev -client=0.0.0.0"
+
+ # PostgreSQL instance built from ./pg-dockerfile (postgres:16 plus pg_cron).
+ # User, password and database match the constants used by
+ # store/engine/postgresql/postgresql_test.go.
+ postgres0:
+ build: ./pg-dockerfile
+ container_name: postgres0
+ environment:
+ POSTGRES_USER: postgres
+ POSTGRES_PASSWORD: postgres
+ POSTGRES_DB: testdb
+ ports:
+ - "5432:5432"
+ volumes:
+ # Mounts the schema/trigger scripts into the image's init-script directory.
+ - ./pg-init-scripts:/docker-entrypoint-initdb.d
diff --git a/scripts/docker/pg-dockerfile/Dockerfile
b/scripts/docker/pg-dockerfile/Dockerfile
new file mode 100644
index 0000000..bf9c45d
--- /dev/null
+++ b/scripts/docker/pg-dockerfile/Dockerfile
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Base image: PostgreSQL 16.
+FROM postgres:16
+
+# Toolchain and headers needed to compile pg_cron from source.
+RUN apt-get update && apt-get install -y \
+ postgresql-server-dev-16 \
+ build-essential \
+ libpq-dev \
+ wget
+
+# Download, build and install pg_cron v1.6.5, then remove the sources.
+RUN wget https://github.com/citusdata/pg_cron/archive/refs/tags/v1.6.5.tar.gz \
+ && tar -xvzf v1.6.5.tar.gz \
+ && cd pg_cron-1.6.5 \
+ && make && make install \
+ && cd .. && rm -rf v1.6.5.tar.gz pg_cron-1.6.5
+
+# Append the pg_cron settings to the sample config: preload the library and
+# point it at the 'testdb' database (the POSTGRES_DB used in docker-compose.yml).
+RUN echo "shared_preload_libraries = 'pg_cron'" >>
/usr/share/postgresql/postgresql.conf.sample \
+ && echo "cron.database_name = 'testdb'" >>
/usr/share/postgresql/postgresql.conf.sample
\ No newline at end of file
diff --git a/scripts/docker/pg-init-scripts/init.sql
b/scripts/docker/pg-init-scripts/init.sql
new file mode 100644
index 0000000..db51808
--- /dev/null
+++ b/scripts/docker/pg-init-scripts/init.sql
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+-- Election locks: one row per lock name, holding the ID of the node that
+-- inserted it. The Go engine inserts with ON CONFLICT DO NOTHING, so the
+-- primary key decides which node wins.
+CREATE TABLE locks (
+ name TEXT PRIMARY KEY,
+ leaderID TEXT NOT NULL
+);
+
+-- Key/value storage used by the engine's Get/Set/Delete/List operations.
+CREATE TABLE kv (
+ key TEXT PRIMARY KEY,
+ value BYTEA
+);
+
+-- Row-level trigger function for the locks table.
+--   INSERT: registers a pg_cron job named 'delete_<lock name>' with the
+--           schedule '6 seconds' whose command deletes that lock row, then
+--           publishes 'INSERT:<leaderID>' on the 'lock_change' channel.
+--   DELETE: removes that pg_cron job and publishes 'DELETE:<leaderID>' on
+--           the 'lock_change' channel.
+-- The '6 seconds' schedule must be kept in sync with lockTTL in postgresql.go.
+-- The notification payload carries only the leader ID, not the lock name.
+CREATE OR REPLACE FUNCTION notify_changes()
+RETURNS TRIGGER AS $$
+BEGIN
+ IF TG_OP = 'INSERT' THEN
+ PERFORM cron.schedule('delete_' || NEW.name, '6 seconds',
FORMAT('DELETE FROM locks WHERE name = %L', NEW.name));
+ PERFORM pg_notify('lock_change', 'INSERT:' || NEW.leaderID::text);
+ END IF;
+
+ IF TG_OP = 'DELETE' THEN
+ PERFORM cron.unschedule('delete_' || OLD.name);
+ PERFORM pg_notify('lock_change', 'DELETE:' || OLD.leaderID::text);
+ END IF;
+
+ -- The function is attached as an AFTER trigger, so NULL is returned.
+ RETURN NULL;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Run notify_changes() once per inserted or deleted row of the locks table.
+CREATE TRIGGER lock_change_trigger
+AFTER INSERT OR DELETE ON locks
+FOR EACH ROW EXECUTE FUNCTION notify_changes();
+
+-- notify_changes() calls cron.schedule / cron.unschedule, so the pg_cron
+-- extension is enabled here.
+CREATE EXTENSION IF NOT EXISTS pg_cron;
\ No newline at end of file
diff --git a/store/engine/postgresql/postgresql.go
b/store/engine/postgresql/postgresql.go
new file mode 100644
index 0000000..8caadd5
--- /dev/null
+++ b/store/engine/postgresql/postgresql.go
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package postgresql
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "fmt"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/apache/kvrocks-controller/consts"
+ "github.com/apache/kvrocks-controller/logger"
+ "github.com/apache/kvrocks-controller/store/engine"
+ "github.com/lib/pq"
+ "go.uber.org/zap"
+)
+
+const (
+ // lockTTL must stay in sync with the '6 seconds' cron schedule in init.sql;
+ // update that schedule before changing this value. electLoop waits
+ // lockTTL/3 between attempts when the lock INSERT fails.
+ lockTTL = 6 * time.Second
+ // Minimum and maximum reconnect intervals handed to pq.NewListener.
+ listenerMinReconnectInterval = 10 * time.Second
+ listenerMaxReconnectInterval = 1 * time.Minute
+ // defaultElectPath is the name of the row in the locks table used for
+ // leader election.
+ defaultElectPath = "/kvrocks/controller/leader"
+)
+
+// Config holds the settings needed to connect to PostgreSQL and take part in
+// leader election.
+type Config struct {
+ // Addrs lists server addresses ("host:port"); New only uses Addrs[0].
+ Addrs []string `yaml:"addrs"`
+ // Username and Password are embedded in the connection URL.
+ Username string `yaml:"username"`
+ Password string `yaml:"password"`
+ // DBName is the database to connect to.
+ DBName string `yaml:"db_name"`
+ // NotifyChannel is the LISTEN/NOTIFY channel the listener subscribes to.
+ NotifyChannel string `yaml:"notify_channel"`
+ // ElectPath is meant to override the default election lock name.
+ ElectPath string `yaml:"elect_path"`
+}
+
+// Postgresql is a store engine backed by a PostgreSQL database. It stores
+// key/value pairs in the kv table and performs leader election through the
+// locks table plus LISTEN/NOTIFY events.
+type Postgresql struct {
+ // db runs the kv and locks queries; listener receives lock notifications.
+ db *sql.DB
+ listener *pq.Listener
+
+ // leaderMu is held by Leader and observeLeaderEvent when accessing leaderID.
+ leaderMu sync.Mutex
+ leaderID string
+ // myID is this node's ID; electPath is the lock name used in the locks table.
+ myID string
+ electPath string
+ // isReady becomes true once observeLeaderEvent has received a notification.
+ isReady atomic.Bool
+
+ // quitCh is closed by Close to stop the background goroutines tracked by wg.
+ quitCh chan struct{}
+ wg sync.WaitGroup
+ // lockReleaseCh is signalled by observeLeaderEvent for non-INSERT
+ // notifications and wakes electLoop up for another attempt.
+ lockReleaseCh chan bool
+ // leaderChangeCh is signalled for INSERT notifications and is exposed
+ // through LeaderChange.
+ leaderChangeCh chan bool
+}
+
+// New connects to the PostgreSQL server at cfg.Addrs[0], subscribes to
+// cfg.NotifyChannel, loads the current leader and starts the election and
+// notification goroutines.
+//
+// id is this node's identity in the election and must not be empty. cfg must
+// be non-nil and contain at least one address. When cfg.ElectPath is empty,
+// defaultElectPath is used as the lock name.
+//
+// On failure every resource opened so far is released before the error is
+// returned.
+func New(id string, cfg *Config) (*Postgresql, error) {
+	if len(id) == 0 {
+		return nil, errors.New("id must NOT be a empty string")
+	}
+	if cfg == nil {
+		return nil, errors.New("config must NOT be nil")
+	}
+	if len(cfg.Addrs) == 0 {
+		return nil, errors.New("addrs must NOT be empty")
+	}
+
+	connStr := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", cfg.Username, cfg.Password, cfg.Addrs[0], cfg.DBName)
+	db, err := sql.Open("postgres", connStr)
+	if err != nil {
+		return nil, err
+	}
+
+	listener := pq.NewListener(connStr, listenerMinReconnectInterval, listenerMaxReconnectInterval, nil)
+	// release frees the connections on an error path; the original error is
+	// the one worth reporting, so the close errors are intentionally dropped.
+	release := func() {
+		_ = listener.Close()
+		_ = db.Close()
+	}
+	if err = listener.Listen(cfg.NotifyChannel); err != nil {
+		release()
+		return nil, err
+	}
+
+	electPath := defaultElectPath
+	if cfg.ElectPath != "" {
+		electPath = cfg.ElectPath
+	}
+
+	p := &Postgresql{
+		myID:           id,
+		electPath:      electPath,
+		db:             db,
+		listener:       listener,
+		quitCh:         make(chan struct{}),
+		lockReleaseCh:  make(chan bool),
+		leaderChangeCh: make(chan bool),
+	}
+	// Runs before the goroutines start, so leaderID can be set without the lock.
+	if err = p.initLeaderId(); err != nil {
+		release()
+		return nil, err
+	}
+	p.isReady.Store(false)
+	p.wg.Add(2)
+	go p.electLoop()
+	go p.observeLeaderEvent()
+	return p, nil
+}
+
+// ID returns the node ID this engine was created with.
+func (p *Postgresql) ID() string {
+ return p.myID
+}
+
+// Leader returns the ID of the last known leader, or an empty string when no
+// leader has been recorded. The value is read under leaderMu.
+func (p *Postgresql) Leader() string {
+	p.leaderMu.Lock()
+	current := p.leaderID
+	p.leaderMu.Unlock()
+	return current
+}
+
+// LeaderChange returns the receive side of the channel that
+// observeLeaderEvent signals after it records a new leader. The channel is
+// unbuffered, so the caller is expected to keep receiving from it.
+func (p *Postgresql) LeaderChange() <-chan bool {
+ return p.leaderChangeCh
+}
+
+// IsReady polls every 100ms until the engine has received its first lock
+// notification and then returns true. It returns false as soon as the engine
+// is closed, and returns the current readiness state when ctx is done.
+func (p *Postgresql) IsReady(ctx context.Context) bool {
+	// One ticker for the whole wait instead of a new timer per iteration.
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-p.quitCh:
+			return false
+		case <-ticker.C:
+			if p.isReady.Load() {
+				return true
+			}
+		case <-ctx.Done():
+			return p.isReady.Load()
+		}
+	}
+}
+
+// Get returns the value stored under key in the kv table. It returns
+// consts.ErrNotFound when the key does not exist. The query is bound to ctx,
+// so cancellation and deadlines are honoured.
+func (p *Postgresql) Get(ctx context.Context, key string) ([]byte, error) {
+	var value []byte
+	query := "SELECT value FROM kv WHERE key = $1"
+
+	err := p.db.QueryRowContext(ctx, query, key).Scan(&value)
+	if errors.Is(err, sql.ErrNoRows) {
+		return nil, consts.ErrNotFound
+	}
+	if err != nil {
+		return nil, err
+	}
+	return value, nil
+}
+
+// Exists reports whether key is present in the kv table. A missing key is
+// reported as (false, nil); any other lookup failure is returned as an error.
+func (p *Postgresql) Exists(ctx context.Context, key string) (bool, error) {
+	switch _, err := p.Get(ctx, key); {
+	case err == nil:
+		return true, nil
+	case errors.Is(err, consts.ErrNotFound):
+		return false, nil
+	default:
+		return false, err
+	}
+}
+
+func (p *Postgresql) Set(ctx context.Context, key string, value []byte) error {
+ query := "INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key)
DO UPDATE SET value = EXCLUDED.value"
+ _, err := p.db.Exec(query, key, value)
+ return err
+}
+
+// Delete removes key from the kv table. Deleting a key that does not exist is
+// not an error. The statement is bound to ctx, so cancellation and deadlines
+// are honoured.
+func (p *Postgresql) Delete(ctx context.Context, key string) error {
+	query := "DELETE FROM kv WHERE key = $1"
+	_, err := p.db.ExecContext(ctx, query, key)
+	return err
+}
+
+// List returns the direct children of prefix: every key of the form
+// prefix + "/" + name where name contains no further '/'. The returned
+// entries carry only name as their Key. The key equal to prefix itself and
+// deeper descendants are skipped.
+//
+// The query is bound to ctx, so cancellation and deadlines are honoured.
+func (p *Postgresql) List(ctx context.Context, prefix string) ([]engine.Entry, error) {
+	// Escape LIKE metacharacters so the prefix is matched literally; backslash
+	// is PostgreSQL's default LIKE escape character.
+	pattern := strings.NewReplacer(`\`, `\\`, `%`, `\%`, `_`, `\_`).Replace(prefix) + "%"
+	query := "SELECT key, value from kv WHERE key LIKE $1"
+	rows, err := p.db.QueryContext(ctx, query, pattern)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	entries := make([]engine.Entry, 0)
+	for rows.Next() {
+		var key string
+		var value []byte
+
+		if err := rows.Scan(&key, &value); err != nil {
+			return nil, err
+		}
+
+		name, ok := directChildName(prefix, key)
+		if !ok {
+			continue
+		}
+		entries = append(entries, engine.Entry{
+			Key:   name,
+			Value: value,
+		})
+	}
+
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+
+	return entries, nil
+}
+
+// directChildName returns the child name of key relative to prefix and true
+// when key is a direct child of prefix. It returns false for the prefix
+// itself, for keys that merely share leading characters with prefix (such as
+// "/a/bc" under "/a/b"), and for deeper descendants.
+func directChildName(prefix, key string) (string, bool) {
+	if !strings.HasPrefix(key, prefix) {
+		return "", false
+	}
+	rest := key[len(prefix):]
+	if rest == "" {
+		return "", false
+	}
+	// Unless the prefix is empty or already ends with a separator, the
+	// remainder must start with one to be a real path child.
+	if prefix != "" && !strings.HasSuffix(prefix, "/") && rest[0] != '/' {
+		return "", false
+	}
+	name := strings.TrimLeft(rest, "/")
+	if name == "" || strings.ContainsRune(name, '/') {
+		return "", false
+	}
+	return name, true
+}
+
+func (p *Postgresql) electLoop() {
+ defer p.wg.Done()
+ for {
+ select {
+ case <-p.quitCh:
+ return
+ default:
+ }
+
+ query := "INSERT INTO locks (name, leaderID) VALUES ($1, $2) ON
CONFLICT DO NOTHING"
+ _, err := p.db.Exec(query, p.electPath, p.myID)
+ if err != nil {
+ time.Sleep(lockTTL / 3)
+ continue
+ }
+
+ select {
+ case <-p.lockReleaseCh:
+ continue
+ case <-p.quitCh:
+ return
+ }
+ }
+}
+
+func (p *Postgresql) observeLeaderEvent() {
+ defer p.wg.Done()
+
+ for {
+ select {
+ case <-p.quitCh:
+ return
+ case notification := <-p.listener.Notify:
+ p.isReady.Store(true)
+
+ data := strings.SplitN(notification.Extra, ":", 2)
+ if len(data) != 2 {
+ logger.Get().With(
+ zap.Error(fmt.Errorf("failed to parse
notification data: expected two parts separated by a colon")),
+ ).Error("Failed to parse notification data")
+ }
+
+ operation := data[0]
+ leaderID := data[1]
+
+ if operation == "INSERT" {
+ p.leaderMu.Lock()
+ p.leaderID = leaderID
+ p.leaderMu.Unlock()
+ p.leaderChangeCh <- true
+ } else {
+ p.lockReleaseCh <- true
+ }
+ }
+ }
+}
+
+// initLeaderId loads the current holder of the electPath lock from the locks
+// table into leaderID. When no lock row exists leaderID is set to the empty
+// string. Any other query error is returned and leaderID is left untouched.
+func (p *Postgresql) initLeaderId() error {
+	const query = "SELECT leaderID FROM locks WHERE name = $1"
+
+	var current string
+	err := p.db.QueryRow(query, p.electPath).Scan(&current)
+	switch {
+	case errors.Is(err, sql.ErrNoRows):
+		p.leaderID = ""
+	case err != nil:
+		return err
+	default:
+		p.leaderID = current
+	}
+	return nil
+}
+
+// Close stops the background goroutines, waits for them to exit, and then
+// closes the notification listener and the database handle. Both are always
+// closed; the database error takes precedence, otherwise the listener error
+// is returned. Close must be called at most once.
+func (p *Postgresql) Close() error {
+	close(p.quitCh)
+	p.wg.Wait()
+
+	listenerErr := p.listener.Close()
+	if err := p.db.Close(); err != nil {
+		return err
+	}
+	return listenerErr
+}
diff --git a/store/engine/postgresql/postgresql_test.go
b/store/engine/postgresql/postgresql_test.go
new file mode 100644
index 0000000..57617f4
--- /dev/null
+++ b/store/engine/postgresql/postgresql_test.go
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package postgresql
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/apache/kvrocks-controller/util"
+
+ "github.com/stretchr/testify/require"
+)
+
+// Connection settings for the test database. They match the postgres0
+// service in scripts/docker/docker-compose.yml, and notifyChannel matches the
+// channel name used by pg_notify in init.sql.
+const (
+ addr = "127.0.0.1:5432"
+ notifyChannel = "lock_change"
+ username = "postgres"
+ password = "postgres"
+ dbName = "testdb"
+)
+
+// TestBasicOperations checks the Set/Get round trip, that List returns every
+// direct child of a prefix, and that Delete succeeds, against a live
+// PostgreSQL instance.
+func TestBasicOperations(t *testing.T) {
+	nodeID := util.RandString(40)
+	cfg := &Config{
+		Username:      username,
+		Password:      password,
+		DBName:        dbName,
+		NotifyChannel: notifyChannel,
+		ElectPath:     util.RandString(32),
+		Addrs:         []string{addr},
+	}
+	store, err := New(nodeID, cfg)
+	require.NoError(t, err)
+	defer store.Close()
+
+	// Keep the unbuffered leader-change channel drained so the engine's
+	// notification goroutine never blocks on it.
+	go func() {
+		events := store.LeaderChange()
+		for {
+			if _, ok := <-events; !ok {
+				return
+			}
+		}
+	}()
+
+	ctx := context.Background()
+	payload := []byte("v")
+	keys := []string{"/a/b/c0", "/a/b/c1", "/a/b/c2"}
+
+	for i := range keys {
+		require.NoError(t, store.Set(ctx, keys[i], payload))
+		stored, err := store.Get(ctx, keys[i])
+		require.NoError(t, err)
+		require.Equal(t, payload, stored)
+	}
+
+	listed, err := store.List(ctx, "/a/b")
+	require.NoError(t, err)
+	require.Equal(t, len(keys), len(listed))
+
+	for i := range keys {
+		require.NoError(t, store.Delete(ctx, keys[i]))
+	}
+}
+
+func TestElect(t *testing.T) {
+ endpoints := []string{addr}
+
+ testElectPath := util.RandString(32)
+ id0 := util.RandString(40)
+ node0, err := New(id0, &Config{
+ Username: username,
+ Password: password,
+ DBName: dbName,
+ NotifyChannel: notifyChannel,
+ ElectPath: testElectPath,
+ Addrs: endpoints,
+ })
+ require.NoError(t, err)
+ require.Eventuallyf(t, func() bool {
+ return node0.Leader() == node0.myID
+ }, 10*time.Second, 100*time.Millisecond, "node0 should be the leader")
+
+ id1 := util.RandString(40)
+ node1, err := New(id1, &Config{
+ Username: username,
+ Password: password,
+ DBName: dbName,
+ NotifyChannel: notifyChannel,
+ ElectPath: testElectPath,
+ Addrs: endpoints,
+ })
+ require.NoError(t, err)
+ require.Eventuallyf(t, func() bool {
+ return node1.Leader() == node0.myID
+ }, 10*time.Second, 100*time.Millisecond, "node1's leader should be the
node0")
+
+ go func() {
+ for {
+ select {
+ case <-node0.LeaderChange():
+ // do nothing
+ case <-node1.LeaderChange():
+ // do nothing
+ }
+ }
+ }()
+
+ require.NoError(t, node0.Close())
+
+ require.Eventuallyf(t, func() bool {
+ return node1.Leader() == node1.myID
+ }, 15*time.Second, 100*time.Millisecond, "node1 should be the leader")
+}