This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 293067ab3fa227eb6a38ef5d1255fa4641da142c
Author: Rui Fan <1996fan...@gmail.com>
AuthorDate: Fri Dec 15 18:50:39 2023 +0800

    [FLINK-33450][autoscaler] Support the JDBCAutoScalerStateStore
---
 .github/workflows/ci.yml                           |   5 +
 flink-autoscaler-plugin-jdbc/pom.xml               | 146 ++++++++++
 .../jdbc/state/JdbcAutoScalerStateStore.java       | 253 +++++++++++++++++
 .../autoscaler/jdbc/state/JdbcStateInteractor.java | 119 ++++++++
 .../autoscaler/jdbc/state/JdbcStateStore.java      |  92 +++++++
 .../flink/autoscaler/jdbc/state/JobStateView.java  | 262 ++++++++++++++++++
 .../flink/autoscaler/jdbc/state/StateType.java     |  58 ++++
 .../src/main/resources/schema/derby_schema.sql     |  26 ++
 .../src/main/resources/schema/mysql_schema.sql     |  32 +++
 .../src/main/resources/schema/postgres_schema.sql  |  44 +++
 .../state/AbstractJdbcStateInteractorITCase.java   |  93 +++++++
 .../jdbc/state/AbstractJdbcStateStoreITCase.java   | 304 +++++++++++++++++++++
 .../jdbc/state/CountableJdbcStateInteractor.java   |  81 ++++++
 .../jdbc/state/JdbcAutoScalerStateStoreTest.java   | 102 +++++++
 .../autoscaler/jdbc/state/JobStateViewTest.java    | 201 ++++++++++++++
 .../flink/autoscaler/jdbc/state/StateTypeTest.java |  45 +++
 .../jdbc/testutils/databases/DatabaseTest.java     |  26 ++
 .../testutils/databases/derby/DerbyExtension.java  |  88 ++++++
 .../testutils/databases/derby/DerbyTestBase.java   |  35 +++
 .../testutils/databases/mysql/MySQL56TestBase.java |  34 +++
 .../testutils/databases/mysql/MySQL57TestBase.java |  34 +++
 .../testutils/databases/mysql/MySQL8TestBase.java  |  34 +++
 .../testutils/databases/mysql/MySQLExtension.java  |  76 ++++++
 .../databases/postgres/PostgreSQLExtension.java    |  75 +++++
 .../databases/postgres/PostgreSQLTestBase.java     |  34 +++
 .../src/test/resources/log4j2.properties           |  27 ++
 .../test/resources/test_schema/mysql_schema.sql    |  28 ++
 .../test/resources/test_schema/postgres_schema.sql |  27 ++
 pom.xml                                            |   1 +
 29 files changed, 2382 insertions(+)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 40bdfc64..3734b59b 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -66,6 +66,11 @@ jobs:
           cd flink-kubernetes-webhook
           mvn verify -Dit.skip=false
           cd ..
+      - name: Tests in flink-autoscaler-plugin-jdbc
+        run: |
+          cd flink-autoscaler-plugin-jdbc
+          mvn verify -Dit.skip=false
+          cd ..
   e2e_ci:
     runs-on: ubuntu-latest
     strategy:
diff --git a/flink-autoscaler-plugin-jdbc/pom.xml 
b/flink-autoscaler-plugin-jdbc/pom.xml
new file mode 100644
index 00000000..e009e3f0
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/pom.xml
@@ -0,0 +1,146 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-kubernetes-operator-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.8-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-autoscaler-plugin-jdbc</artifactId>
+    <name>Flink Autoscaler Plugin JDBC</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <testcontainers.version>1.18.2</testcontainers.version>
+        <derby.version>10.15.2.0</derby.version>
+        <postgres.version>42.5.4</postgres.version>
+        <mysql.version>8.0.33</mysql.version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>testcontainers-bom</artifactId>
+                <version>${testcontainers.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+
+    </dependencyManagement>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-autoscaler</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- TODO FLINK-33098: These jackson dependencies can be replaced with 
flink shaded jackson. It can be done
+        after the flink-1.18.1 is released. -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+        </dependency>
+
+        <!-- Test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-autoscaler</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils-junit</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-api</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Derby tests -->
+        <dependency>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derby</artifactId>
+            <version>${derby.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Postgres tests -->
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${postgres.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- MySQL tests -->
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>${mysql.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
new file mode 100644
index 00000000..8a78782b
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.ScalingTracking;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static 
org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
+import static 
org.apache.flink.autoscaler.jdbc.state.StateType.PARALLELISM_OVERRIDES;
+import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY;
+import static 
org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_TRACKING;
+
+/**
+ * The state store which persists its state in JDBC related database.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> The job autoscaler context.
+ */
+@Experimental
+public class JdbcAutoScalerStateStore<KEY, Context extends 
JobAutoScalerContext<KEY>>
+        implements AutoScalerStateStore<KEY, Context> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcAutoScalerStateStore.class);
+
+    private final JdbcStateStore jdbcStateStore;
+
+    protected static final ObjectMapper YAML_MAPPER =
+            new ObjectMapper()
+                    .registerModule(new JavaTimeModule())
+                    .registerModule(new AutoScalerSerDeModule());
+
+    public JdbcAutoScalerStateStore(JdbcStateStore jdbcStateStore) {
+        this.jdbcStateStore = jdbcStateStore;
+    }
+
+    @Override
+    public void storeScalingHistory(
+            Context jobContext, Map<JobVertexID, SortedMap<Instant, 
ScalingSummary>> scalingHistory)
+            throws Exception {
+        jdbcStateStore.putSerializedState(
+                getSerializeKey(jobContext),
+                SCALING_HISTORY,
+                serializeScalingHistory(scalingHistory));
+    }
+
+    @Nonnull
+    @Override
+    public Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
getScalingHistory(
+            Context jobContext) {
+        Optional<String> serializedScalingHistory =
+                jdbcStateStore.getSerializedState(getSerializeKey(jobContext), 
SCALING_HISTORY);
+        if (serializedScalingHistory.isEmpty()) {
+            return new HashMap<>();
+        }
+        try {
+            return deserializeScalingHistory(serializedScalingHistory.get());
+        } catch (JacksonException e) {
+            LOG.error(
+                    "Could not deserialize scaling history, possibly the 
format changed. Discarding...",
+                    e);
+            jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), 
SCALING_HISTORY);
+            return new HashMap<>();
+        }
+    }
+
+    @Override
+    public void removeScalingHistory(Context jobContext) {
+        jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), 
SCALING_HISTORY);
+    }
+
+    @Override
+    public void storeScalingTracking(Context jobContext, ScalingTracking 
scalingTrack)
+            throws Exception {
+        jdbcStateStore.putSerializedState(
+                getSerializeKey(jobContext),
+                SCALING_TRACKING,
+                serializeScalingTracking(scalingTrack));
+    }
+
+    @Override
+    public ScalingTracking getScalingTracking(Context jobContext) {
+        Optional<String> serializedRescalingHistory =
+                jdbcStateStore.getSerializedState(getSerializeKey(jobContext), 
SCALING_TRACKING);
+        if (serializedRescalingHistory.isEmpty()) {
+            return new ScalingTracking();
+        }
+        try {
+            return 
deserializeScalingTracking(serializedRescalingHistory.get());
+        } catch (JacksonException e) {
+            LOG.error(
+                    "Could not deserialize rescaling history, possibly the 
format changed. Discarding...",
+                    e);
+            jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), 
SCALING_TRACKING);
+            return new ScalingTracking();
+        }
+    }
+
+    @Override
+    public void storeCollectedMetrics(
+            Context jobContext, SortedMap<Instant, CollectedMetrics> metrics) 
throws Exception {
+        jdbcStateStore.putSerializedState(
+                getSerializeKey(jobContext), COLLECTED_METRICS, 
serializeEvaluatedMetrics(metrics));
+    }
+
+    @Nonnull
+    @Override
+    public SortedMap<Instant, CollectedMetrics> getCollectedMetrics(Context 
jobContext) {
+        Optional<String> serializedEvaluatedMetricsOpt =
+                jdbcStateStore.getSerializedState(getSerializeKey(jobContext), 
COLLECTED_METRICS);
+        if (serializedEvaluatedMetricsOpt.isEmpty()) {
+            return new TreeMap<>();
+        }
+        try {
+            return 
deserializeEvaluatedMetrics(serializedEvaluatedMetricsOpt.get());
+        } catch (JacksonException e) {
+            LOG.error(
+                    "Could not deserialize metric history, possibly the format 
changed. Discarding...",
+                    e);
+            jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), 
COLLECTED_METRICS);
+            return new TreeMap<>();
+        }
+    }
+
+    @Override
+    public void removeCollectedMetrics(Context jobContext) {
+        jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), 
COLLECTED_METRICS);
+    }
+
+    @Override
+    public void storeParallelismOverrides(
+            Context jobContext, Map<String, String> parallelismOverrides) {
+        jdbcStateStore.putSerializedState(
+                getSerializeKey(jobContext),
+                PARALLELISM_OVERRIDES,
+                serializeParallelismOverrides(parallelismOverrides));
+    }
+
+    @Nonnull
+    @Override
+    public Map<String, String> getParallelismOverrides(Context jobContext) {
+        return jdbcStateStore
+                .getSerializedState(getSerializeKey(jobContext), 
PARALLELISM_OVERRIDES)
+                .map(JdbcAutoScalerStateStore::deserializeParallelismOverrides)
+                .orElse(new HashMap<>());
+    }
+
+    @Override
+    public void removeParallelismOverrides(Context jobContext) {
+        jdbcStateStore.removeSerializedState(getSerializeKey(jobContext), 
PARALLELISM_OVERRIDES);
+    }
+
+    @Override
+    public void clearAll(Context jobContext) {
+        jdbcStateStore.clearAll(getSerializeKey(jobContext));
+    }
+
+    @Override
+    public void flush(Context jobContext) throws Exception {
+        jdbcStateStore.flush(getSerializeKey(jobContext));
+    }
+
+    @Override
+    public void removeInfoFromCache(KEY jobKey) {
+        jdbcStateStore.removeInfoFromCache(getSerializeKey(jobKey));
+    }
+
+    private String getSerializeKey(Context jobContext) {
+        return getSerializeKey(jobContext.getJobKey());
+    }
+
+    private String getSerializeKey(KEY jobKey) {
+        return jobKey.toString();
+    }
+
+    // The serialization and deserialization are similar to 
KubernetesAutoScalerStateStore
+    protected static String serializeScalingHistory(
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory) throws Exception {
+        return YAML_MAPPER.writeValueAsString(scalingHistory);
+    }
+
+    private static Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
deserializeScalingHistory(
+            String scalingHistory) throws JacksonException {
+        return YAML_MAPPER.readValue(scalingHistory, new TypeReference<>() {});
+    }
+
+    protected static String serializeScalingTracking(ScalingTracking 
scalingTracking)
+            throws Exception {
+        return YAML_MAPPER.writeValueAsString(scalingTracking);
+    }
+
+    private static ScalingTracking deserializeScalingTracking(String 
scalingTracking)
+            throws JacksonException {
+        return YAML_MAPPER.readValue(scalingTracking, new TypeReference<>() 
{});
+    }
+
+    @VisibleForTesting
+    protected static String serializeEvaluatedMetrics(
+            SortedMap<Instant, CollectedMetrics> evaluatedMetrics) throws 
Exception {
+        return YAML_MAPPER.writeValueAsString(evaluatedMetrics);
+    }
+
+    private static SortedMap<Instant, CollectedMetrics> 
deserializeEvaluatedMetrics(
+            String evaluatedMetrics) throws JacksonException {
+        return YAML_MAPPER.readValue(evaluatedMetrics, new TypeReference<>() 
{});
+    }
+
+    private static String serializeParallelismOverrides(Map<String, String> 
overrides) {
+        return ConfigurationUtils.convertValue(overrides, String.class);
+    }
+
+    private static Map<String, String> deserializeParallelismOverrides(String 
overrides) {
+        return ConfigurationUtils.convertValue(overrides, Map.class);
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
new file mode 100644
index 00000000..4c2f9a0f
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Responsible for interacting with the database. */
+public class JdbcStateInteractor {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcStateInteractor.class);
+
+    private final Connection conn;
+
+    public JdbcStateInteractor(Connection conn) {
+        this.conn = conn;
+    }
+
+    public Map<StateType, String> queryData(String jobKey) throws Exception {
+        var query =
+                "select state_type, state_value from 
t_flink_autoscaler_state_store where job_key = ?";
+        var data = new HashMap<StateType, String>();
+        try (var pstmt = conn.prepareStatement(query)) {
+            pstmt.setString(1, jobKey);
+            var rs = pstmt.executeQuery();
+            while (rs.next()) {
+                var stateTypeIdentifier = rs.getString("state_type");
+                var stateType = 
StateType.createFromIdentifier(stateTypeIdentifier);
+                var stateValue = rs.getString("state_value");
+                data.put(stateType, stateValue);
+            }
+        }
+        return data;
+    }
+
+    public void deleteData(String jobKey, List<StateType> deletedStateTypes) 
throws Exception {
+        var query =
+                String.format(
+                        "DELETE FROM t_flink_autoscaler_state_store where 
job_key = ? and state_type in (%s)",
+                        String.join(",", 
Collections.nCopies(deletedStateTypes.size(), "?")));
+        try (var pstmt = conn.prepareStatement(query)) {
+            pstmt.setString(1, jobKey);
+            int i = 2;
+            for (var stateType : deletedStateTypes) {
+                pstmt.setString(i++, stateType.getIdentifier());
+            }
+            pstmt.execute();
+        }
+        LOG.info("Delete jobKey: {} stateTypes: {} from database.", jobKey, 
deletedStateTypes);
+    }
+
+    public void createData(
+            String jobKey, List<StateType> createdStateTypes, Map<StateType, 
String> data)
+            throws Exception {
+        var query =
+                "INSERT INTO t_flink_autoscaler_state_store (job_key, 
state_type, state_value) values (?, ?, ?)";
+        try (var pstmt = conn.prepareStatement(query)) {
+            for (var stateType : createdStateTypes) {
+                pstmt.setString(1, jobKey);
+                pstmt.setString(2, stateType.getIdentifier());
+
+                String stateValue = data.get(stateType);
+                checkState(
+                        stateValue != null,
+                        "The state value shouldn't be null during inserting. "
+                                + "It may be a bug, please raise a JIRA to 
Flink Community.");
+                pstmt.setString(3, stateValue);
+                pstmt.addBatch();
+            }
+            pstmt.executeBatch();
+        }
+        LOG.info("Insert jobKey: {} stateTypes: {} from database.", jobKey, 
createdStateTypes);
+    }
+
+    public void updateData(
+            String jobKey, List<StateType> updatedStateTypes, Map<StateType, 
String> data)
+            throws Exception {
+        var query =
+                "UPDATE t_flink_autoscaler_state_store set state_value = ? 
where job_key = ? and state_type = ?";
+
+        try (var pstmt = conn.prepareStatement(query)) {
+            for (var stateType : updatedStateTypes) {
+                String stateValue = data.get(stateType);
+                checkState(
+                        stateValue != null,
+                        "The state value shouldn't be null during inserting. "
+                                + "It may be a bug, please raise a JIRA to 
Flink Community.");
+                pstmt.setString(1, stateValue);
+                pstmt.setString(2, jobKey);
+                pstmt.setString(3, stateType.getIdentifier());
+                pstmt.addBatch();
+            }
+            pstmt.executeBatch();
+        }
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java
 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java
new file mode 100644
index 00000000..1e237a47
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The jdbc state store. */
+public class JdbcStateStore {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcStateStore.class);
+
+    private final ConcurrentHashMap<String, JobStateView> cache = new 
ConcurrentHashMap<>();
+
+    private final JdbcStateInteractor jdbcStateInteractor;
+
+    public JdbcStateStore(JdbcStateInteractor jdbcStateInteractor) {
+        this.jdbcStateInteractor = jdbcStateInteractor;
+    }
+
+    protected void putSerializedState(String jobKey, StateType stateType, 
String value) {
+        getJobStateView(jobKey).put(stateType, value);
+    }
+
+    protected Optional<String> getSerializedState(String jobKey, StateType 
stateType) {
+        return Optional.ofNullable(getJobStateView(jobKey).get(stateType));
+    }
+
+    protected void removeSerializedState(String jobKey, StateType stateType) {
+        getJobStateView(jobKey).remove(stateType);
+    }
+
+    public void flush(String jobKey) throws Exception {
+        JobStateView jobStateView = cache.get(jobKey);
+        if (jobStateView == null) {
+            LOG.debug("The JobStateView doesn't exist, so skip the flush.");
+            return;
+        }
+        try {
+            jobStateView.flush();
+        } catch (Exception e) {
+            LOG.error(
+                    "Error while flush autoscaler info to database, 
invalidating to clear the cache",
+                    e);
+            removeInfoFromCache(jobKey);
+            throw e;
+        }
+    }
+
+    public void removeInfoFromCache(String jobKey) {
+        cache.remove(jobKey);
+    }
+
+    public void clearAll(String jobKey) {
+        getJobStateView(jobKey).clear();
+    }
+
+    private JobStateView getJobStateView(String jobKey) {
+        return cache.computeIfAbsent(
+                jobKey,
+                (id) -> {
+                    try {
+                        return createJobStateView(jobKey);
+                    } catch (Exception exception) {
+                        throw new RuntimeException(
+                                "Meet exception during create job state 
view.", exception);
+                    }
+                });
+    }
+
+    private JobStateView createJobStateView(String jobKey) throws Exception {
+        return new JobStateView(jdbcStateInteractor, jobKey);
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java
 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java
new file mode 100644
index 00000000..f8d52417
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_CREATE;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_DELETE;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_UPDATE;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NOT_NEEDED;
+import static 
org.apache.flink.autoscaler.jdbc.state.JobStateView.State.UP_TO_DATE;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The view of job state. */
+@NotThreadSafe
+public class JobStateView {
+
+    /**
+     * The state of state type about the cache and database.
+     *
+     * <p>Note: {@link #inLocally} and {@link #inDatabase} are only for 
understand, we don't use
+     * them.
+     */
+    @SuppressWarnings("unused")
+    enum State {
+
+        /** State doesn't exist at database, and it's not used so far, so it's 
not needed. */
+        NOT_NEEDED(false, false, false),
+        /** State is only stored locally, not created in JDBC database yet. */
+        NEEDS_CREATE(true, false, true),
+        /** State exists in JDBC database but there are newer local changes. */
+        NEEDS_UPDATE(true, true, true),
+        /** State is stored locally and in database, and they are same. */
+        UP_TO_DATE(true, true, false),
+        /** State is stored in database, but it's deleted in local. */
+        NEEDS_DELETE(false, true, true);
+
+        /** The data of this state type is stored in locally when it is true. 
*/
+        private final boolean inLocally;
+
+        /** The data of this state type is stored in database when it is true. 
*/
+        private final boolean inDatabase;
+
+        /** The data of this state type is stored in database when it is true. 
*/
+        private final boolean needFlush;
+
+        State(boolean inLocally, boolean inDatabase, boolean needFlush) {
+            this.inLocally = inLocally;
+            this.inDatabase = inDatabase;
+            this.needFlush = needFlush;
+        }
+
+        public boolean isNeedFlush() {
+            return needFlush;
+        }
+    }
+
+    /** Transition old state to the new state when some operations happen to 
cache or database. */
+    private static class StateTransitioner {
+
+        /** The transition when put data to cache. */
+        @Nonnull
+        public State putTransition(@Nonnull State oldState) {
+            switch (oldState) {
+                case NOT_NEEDED:
+                case NEEDS_CREATE:
+                    return NEEDS_CREATE;
+                case NEEDS_UPDATE:
+                case UP_TO_DATE:
+                case NEEDS_DELETE:
+                    return NEEDS_UPDATE;
+                default:
+                    throw new IllegalArgumentException(
+                            String.format("Unknown state : %s.", oldState));
+            }
+        }
+
+        /** The transition when delete data from cache. */
+        @Nonnull
+        public State deleteTransition(@Nonnull State oldState) {
+            switch (oldState) {
+                case NOT_NEEDED:
+                case NEEDS_CREATE:
+                    return NOT_NEEDED;
+                case NEEDS_UPDATE:
+                case UP_TO_DATE:
+                case NEEDS_DELETE:
+                    return NEEDS_DELETE;
+                default:
+                    throw new IllegalArgumentException(
+                            String.format("Unknown state : %s.", oldState));
+            }
+        }
+
+        /** The transition when flush data from cache to database. */
+        @Nonnull
+        public State flushTransition(@Nonnull State oldState) {
+            switch (oldState) {
+                case NOT_NEEDED:
+                case NEEDS_DELETE:
+                    return NOT_NEEDED;
+                case NEEDS_CREATE:
+                case NEEDS_UPDATE:
+                case UP_TO_DATE:
+                    return UP_TO_DATE;
+                default:
+                    throw new IllegalArgumentException(
+                            String.format("Unknown state : %s.", oldState));
+            }
+        }
+    }
+
+    private static final StateTransitioner STATE_TRANSITIONER = new 
StateTransitioner();
+
+    private final JdbcStateInteractor jdbcStateInteractor;
+    private final String jobKey;
+    private final Map<StateType, String> data;
+
+    /**
+     * The state is maintained for each state type, which means that part of 
state types of current
+     * job are stored in the database, but the rest of the state types may 
have been created in the
+     * database.
+     */
+    private final Map<StateType, State> states;
+
+    public JobStateView(JdbcStateInteractor jdbcStateInteractor, String 
jobKey) throws Exception {
+        this.jdbcStateInteractor = jdbcStateInteractor;
+        this.jobKey = jobKey;
+        this.data = jdbcStateInteractor.queryData(jobKey);
+        this.states = generateStates(this.data);
+    }
+
+    private Map<StateType, State> generateStates(Map<StateType, String> data) {
+        final var states = new HashMap<StateType, State>();
+        for (StateType stateType : StateType.values()) {
+            if (data.containsKey(stateType)) {
+                states.put(stateType, UP_TO_DATE);
+            } else {
+                states.put(stateType, NOT_NEEDED);
+            }
+        }
+        return states;
+    }
+
+    public String get(StateType stateType) {
+        return data.get(stateType);
+    }
+
+    public void put(StateType stateType, String value) {
+        data.put(stateType, value);
+        updateState(stateType, STATE_TRANSITIONER::putTransition);
+    }
+
+    public void remove(StateType stateType) {
+        var oldKey = data.remove(stateType);
+        if (oldKey == null) {
+            return;
+        }
+        updateState(stateType, STATE_TRANSITIONER::deleteTransition);
+    }
+
+    public void clear() {
+        if (data.isEmpty()) {
+            return;
+        }
+        var iterator = data.keySet().iterator();
+        while (iterator.hasNext()) {
+            var stateType = iterator.next();
+            iterator.remove();
+            updateState(stateType, STATE_TRANSITIONER::deleteTransition);
+        }
+    }
+
+    public void flush() throws Exception {
+        if (states.values().stream().noneMatch(State::isNeedFlush)) {
+            // No any state needs to be flushed.
+            return;
+        }
+
+        // Build the data that need to be flushed.
+        var flushData = new HashMap<State, List<StateType>>(3);
+        for (Map.Entry<StateType, State> stateEntry : states.entrySet()) {
+            State state = stateEntry.getValue();
+            if (!state.isNeedFlush()) {
+                continue;
+            }
+            StateType stateType = stateEntry.getKey();
+            flushData.compute(
+                    state,
+                    (st, list) -> {
+                        if (list == null) {
+                            list = new LinkedList<>();
+                        }
+                        list.add(stateType);
+                        return list;
+                    });
+        }
+
+        for (var entry : flushData.entrySet()) {
+            State state = entry.getKey();
+            List<StateType> stateTypes = entry.getValue();
+            switch (state) {
+                case NEEDS_CREATE:
+                    jdbcStateInteractor.createData(jobKey, stateTypes, data);
+                    break;
+                case NEEDS_DELETE:
+                    jdbcStateInteractor.deleteData(jobKey, stateTypes);
+                    break;
+                case NEEDS_UPDATE:
+                    jdbcStateInteractor.updateData(jobKey, stateTypes, data);
+                    break;
+                default:
+                    throw new IllegalStateException(String.format("Unknown 
state : %s", state));
+            }
+            for (var stateType : stateTypes) {
+                updateState(stateType, STATE_TRANSITIONER::flushTransition);
+            }
+        }
+    }
+
+    private void updateState(StateType stateType, Function<State, State> 
stateTransitioner) {
+        states.compute(
+                stateType,
+                (type, oldState) -> {
+                    checkState(
+                            oldState != null,
+                            "The state of each state type should be maintained 
in states. "
+                                    + "It may be a bug, please raise a JIRA to 
Flink Community.");
+                    return stateTransitioner.apply(oldState);
+                });
+    }
+
+    @VisibleForTesting
+    public Map<StateType, String> getDataReadOnly() {
+        return Collections.unmodifiableMap(data);
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
new file mode 100644
index 00000000..728afaa2
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.apache.flink.util.StringUtils;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The state type. */
+public enum StateType {
+    SCALING_HISTORY("scalingHistory"),
+    SCALING_TRACKING("scalingTracking"),
+    COLLECTED_METRICS("collectedMetrics"),
+    PARALLELISM_OVERRIDES("parallelismOverrides");
+
+    /**
+     * The identifier of each state type, it will be used to store. Please 
ensure the identifier is
+     * different for each state type.
+     */
+    private final String identifier;
+
+    StateType(String identifier) {
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(identifier) && 
identifier.length() < 30,
+                "Ensure the identifier length is less than 30 to reduce the 
storage cost.");
+        this.identifier = identifier;
+    }
+
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    public static StateType createFromIdentifier(String identifier) {
+        for (StateType stateType : StateType.values()) {
+            if (stateType.getIdentifier().equals(identifier)) {
+                return stateType;
+            }
+        }
+
+        throw new IllegalArgumentException(
+                String.format("Unknown StateType identifier : %s.", 
identifier));
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql 
b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql
new file mode 100644
index 00000000..c8defefe
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE t_flink_autoscaler_state_store
+(
+    id            BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, 
INCREMENT BY 1),
+    update_time   TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    job_key       VARCHAR(191) NOT NULL,
+    state_type    VARCHAR(100) NOT NULL,
+    state_value   CLOB NOT NULL,
+    PRIMARY KEY (id)
+);
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql 
b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql
new file mode 100644
index 00000000..7a6c2d04
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+create database if not exists `flink_autoscaler` character set utf8mb4 collate 
utf8mb4_general_ci;
+
+use `flink_autoscaler`;
+
+create table `t_flink_autoscaler_state_store`
+(
+    `id`            bigint       not null auto_increment,
+    `update_time`   datetime     not null default current_timestamp on update 
current_timestamp comment 'update time',
+    `job_key`       varchar(191) not null comment 'The job key',
+    `state_type`    varchar(100) not null comment 'The state type',
+    `state_value`   longtext     not null comment 'The real state',
+    primary key (`id`) using btree,
+    unique key `un_job_state_type_inx` (`job_key`,`state_type`) using btree
+) engine=innodb default charset=utf8mb4 collate=utf8mb4_general_ci;
+
diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql 
b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql
new file mode 100644
index 00000000..d9fadfc0
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE DATABASE flink_autoscaler;
+\c flink_autoscaler;
+
+CREATE TABLE t_flink_autoscaler_state_store
+(
+    id            BIGSERIAL     NOT NULL,
+    update_time   TIMESTAMP     NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    job_key       TEXT          NOT NULL,
+    state_type    TEXT          NOT NULL,
+    state_value   TEXT          NOT NULL,
+    PRIMARY KEY (id),
+    UNIQUE (job_key, state_type)
+);
+
+CREATE OR REPLACE FUNCTION update_flink_autoscaler_update_time_column()
+RETURNS TRIGGER AS $$
+BEGIN
+   NEW.update_time = CURRENT_TIMESTAMP;
+RETURN NEW;
+END;
+$$ language 'plpgsql';
+
+CREATE TRIGGER update_t_flink_autoscaler_state_store_modtime
+    BEFORE UPDATE ON t_flink_autoscaler_state_store
+    FOR EACH ROW
+    EXECUTE FUNCTION update_flink_autoscaler_update_time_column();
+
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java
new file mode 100644
index 00000000..da737a54
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL56TestBase;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL57TestBase;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL8TestBase;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.postgres.PostgreSQLTestBase;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
+import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The abstract IT case for {@link JdbcStateInteractor}. */
+abstract class AbstractJdbcStateInteractorITCase implements DatabaseTest {
+
+    @Test
+    void testAllOperations() throws Exception {
+        var jobKey = "jobKey";
+        var value1 = "value1";
+        var value2 = "value2";
+        var value3 = "value3";
+        try (var conn = getConnection()) {
+            var jdbcStateInteractor = new JdbcStateInteractor(conn);
+            assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty();
+
+            // Test for creating data.
+            jdbcStateInteractor.createData(
+                    jobKey,
+                    List.of(COLLECTED_METRICS, SCALING_HISTORY),
+                    Map.of(COLLECTED_METRICS, value1, SCALING_HISTORY, 
value2));
+            assertThat(jdbcStateInteractor.queryData(jobKey))
+                    .isEqualTo(Map.of(COLLECTED_METRICS, value1, 
SCALING_HISTORY, value2));
+
+            // Test for updating data.
+            jdbcStateInteractor.updateData(
+                    jobKey,
+                    List.of(COLLECTED_METRICS),
+                    Map.of(COLLECTED_METRICS, value3, SCALING_HISTORY, 
value2));
+            assertThat(jdbcStateInteractor.queryData(jobKey))
+                    .isEqualTo(Map.of(COLLECTED_METRICS, value3, 
SCALING_HISTORY, value2));
+
+            // Test for deleting data.
+            jdbcStateInteractor.deleteData(jobKey, List.of(COLLECTED_METRICS));
+            assertThat(jdbcStateInteractor.queryData(jobKey))
+                    .isEqualTo(Map.of(SCALING_HISTORY, value2));
+            jdbcStateInteractor.deleteData(jobKey, List.of(SCALING_HISTORY));
+            assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty();
+        }
+    }
+}
+
+/** Test {@link JdbcStateInteractor} via Derby database. */
+class DerbyJdbcStateInteractorITCase extends AbstractJdbcStateInteractorITCase
+        implements DerbyTestBase {}
+
+/** Test {@link JdbcStateInteractor} via MySQL 5.6.x. */
+class MySQL56JdbcStateInteractorITCase extends 
AbstractJdbcStateInteractorITCase
+        implements MySQL56TestBase {}
+
+/** Test {@link JdbcStateInteractor} via MySQL 5.7.x. */
+class MySQL57JdbcStateInteractorITCase extends 
AbstractJdbcStateInteractorITCase
+        implements MySQL57TestBase {}
+
+/** Test {@link JdbcStateInteractor} via MySQL 8.x. */
+class MySQL8JdbcStateInteractorITCase extends AbstractJdbcStateInteractorITCase
+        implements MySQL8TestBase {}
+
+/** Test {@link JdbcStateInteractor} via Postgre SQL. */
+class PostgreSQLJdbcStateInteractorITCase extends 
AbstractJdbcStateInteractorITCase
+        implements PostgreSQLTestBase {}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java
new file mode 100644
index 00000000..5ab47137
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL56TestBase;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL57TestBase;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL8TestBase;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.postgres.PostgreSQLTestBase;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
+import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY;
+import static 
org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_TRACKING;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** The abstract IT case for {@link JdbcStateStore}. */
+abstract class AbstractJdbcStateStoreITCase implements DatabaseTest {
+
+    private static final String DEFAULT_JOB_KEY = "jobKey";
+    private Connection conn;
+    private CountableJdbcStateInteractor jdbcStateInteractor;
+    private JdbcStateStore jdbcStateStore;
+
+    @BeforeEach
+    void beforeEach() throws Exception {
+        this.conn = getConnection();
+        this.jdbcStateInteractor = new CountableJdbcStateInteractor(conn);
+        this.jdbcStateStore = new JdbcStateStore(jdbcStateInteractor);
+    }
+
+    @AfterEach
+    void afterEach() throws SQLException {
+        if (conn != null) {
+            conn.close();
+        }
+    }
+
+    @Test
+    void testCaching() throws Exception {
+        var value1 = "value1";
+        var value2 = "value2";
+        var value3 = "value3";
+
+        jdbcStateInteractor.assertCountableJdbcInteractor(0, 0, 0, 0);
+
+        // Query from database.
+        jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS);
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0);
+
+        // The rest of state types of same job key shouldn't query database.
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
SCALING_HISTORY)).isEmpty();
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
SCALING_TRACKING)).isEmpty();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0);
+
+        //  Putting does not go to database, unless flushing.
+        jdbcStateStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, 
value1);
+        jdbcStateStore.putSerializedState(DEFAULT_JOB_KEY, SCALING_HISTORY, 
value2);
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0);
+
+        // Flush together! Create counter is one.
+        jdbcStateStore.flush(DEFAULT_JOB_KEY);
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1);
+
+        // Get
+        assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value1);
+        assertStateValueForCacheAndDatabase(SCALING_HISTORY, value2);
+        var job2 = "job2";
+        assertThat(jdbcStateStore.getSerializedState(job2, 
COLLECTED_METRICS)).isEmpty();
+        jdbcStateInteractor.assertCountableJdbcInteractor(2, 0, 0, 1);
+        assertThat(jdbcStateStore.getSerializedState(job2, 
SCALING_HISTORY)).isEmpty();
+
+        // Put and flush state for job2
+        jdbcStateStore.putSerializedState(job2, SCALING_TRACKING, value3);
+        jdbcStateStore.flush(job2);
+        jdbcStateInteractor.assertCountableJdbcInteractor(2, 0, 0, 2);
+
+        // Build the new JdbcStateStore
+        var newJdbcStore = new JdbcStateStore(jdbcStateInteractor);
+        assertThat(newJdbcStore.getSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS))
+                .hasValue(value1);
+        assertThat(newJdbcStore.getSerializedState(DEFAULT_JOB_KEY, 
SCALING_HISTORY))
+                .hasValue(value2);
+        jdbcStateInteractor.assertCountableJdbcInteractor(3, 0, 0, 2);
+
+        assertThat(newJdbcStore.getSerializedState(job2, 
SCALING_TRACKING)).hasValue(value3);
+        jdbcStateInteractor.assertCountableJdbcInteractor(4, 0, 0, 2);
+
+        // Removing the data from cache and query from database again.
+        newJdbcStore.removeInfoFromCache(job2);
+        assertThat(newJdbcStore.getSerializedState(job2, 
SCALING_TRACKING)).hasValue(value3);
+        jdbcStateInteractor.assertCountableJdbcInteractor(5, 0, 0, 2);
+    }
+
+    @Test
+    void testDeleting() throws Exception {
+        var value1 = "value1";
+
+        jdbcStateInteractor.assertCountableJdbcInteractor(0, 0, 0, 0);
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).isEmpty();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0);
+
+        // Get from cache, and it shouldn't exist in database.
+        jdbcStateStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, 
value1);
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS))
+                .hasValue(value1);
+        assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).isEmpty();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0);
+
+        // Deleting before flushing
+        jdbcStateStore.removeSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS);
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).isEmpty();
+        assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).isEmpty();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0);
+
+        // Flush method shouldn't flush any data.
+        jdbcStateStore.flush(DEFAULT_JOB_KEY);
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).isEmpty();
+        assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).isEmpty();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0);
+
+        // Put and flush data to database.
+        var value2 = "value2";
+        jdbcStateStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, 
value2);
+        jdbcStateStore.flush(DEFAULT_JOB_KEY);
+        assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value2);
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1);
+
+        // Deleting after flushing, data is deleted in cache, but it still 
exists in database.
+        jdbcStateStore.removeSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS);
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).isEmpty();
+        assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).hasValue(value2);
+
+        // Flushing
+        jdbcStateStore.flush(DEFAULT_JOB_KEY);
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).isEmpty();
+        assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).isEmpty();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 1, 0, 1);
+
+        // Get from database for a new JdbcStateStore.
+        var newJdbcStore = new JdbcStateStore(jdbcStateInteractor);
+        assertThat(newJdbcStore.getSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).isEmpty();
+        assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).isEmpty();
+    }
+
+    @Test
+    void testErrorHandlingDuringFlush() throws Exception {
+        var value1 = "value1";
+        var value2 = "value2";
+        jdbcStateInteractor.assertCountableJdbcInteractor(0, 0, 0, 0);
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).isEmpty();
+
+        // Modify the database directly.
+        var tmpJdbcInteractor = new JdbcStateInteractor(conn);
+        tmpJdbcInteractor.createData(
+                DEFAULT_JOB_KEY, List.of(COLLECTED_METRICS), 
Map.of(COLLECTED_METRICS, value1));
+        assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).hasValue(value1);
+
+        // Cache cannot read data of database.
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).isEmpty();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0);
+
+        // Create it with SQLException due to the data has already existed in 
database.
+        jdbcStateStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, 
value2);
+        assertThatThrownBy(() -> jdbcStateStore.flush(DEFAULT_JOB_KEY))
+                .hasCauseInstanceOf(SQLException.class);
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1);
+
+        // Get normally.
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS))
+                .hasValue(value1);
+        jdbcStateInteractor.assertCountableJdbcInteractor(2, 0, 0, 1);
+    }
+
+    @Test
+    void testErrorHandlingDuringQuery() throws Exception {
+        var value1 = "value1";
+        final var expectedException = new RuntimeException("Database isn't 
stable.");
+
+        var exceptionableJdbcStateInteractor =
+                new CountableJdbcStateInteractor(conn) {
+                    private final AtomicBoolean isFirst = new 
AtomicBoolean(true);
+
+                    @Override
+                    public Map<StateType, String> queryData(String jobKey) 
throws Exception {
+                        if (isFirst.get()) {
+                            isFirst.set(false);
+                            throw expectedException;
+                        }
+                        return super.queryData(jobKey);
+                    }
+                };
+
+        var exceptionableJdbcStore = new 
JdbcStateStore(exceptionableJdbcStateInteractor);
+
+        // First get will fail.
+        jdbcStateInteractor.assertCountableJdbcInteractor(0, 0, 0, 0);
+        assertThatThrownBy(
+                        () ->
+                                exceptionableJdbcStore.getSerializedState(
+                                        DEFAULT_JOB_KEY, COLLECTED_METRICS))
+                .rootCause()
+                .isSameAs(expectedException);
+
+        // It's recovered.
+        assertThat(exceptionableJdbcStore.getSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS))
+                .isEmpty();
+        exceptionableJdbcStore.putSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS, value1);
+        exceptionableJdbcStore.flush(DEFAULT_JOB_KEY);
+        assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value1);
+    }
+
+    @Test
+    void testDiscardAllState() throws Exception {
+        var value1 = "value1";
+        var value2 = "value2";
+        var value3 = "value3";
+
+        // Put and flush all state types first.
+        jdbcStateStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, 
value1);
+        jdbcStateStore.putSerializedState(DEFAULT_JOB_KEY, SCALING_HISTORY, 
value2);
+        jdbcStateStore.putSerializedState(DEFAULT_JOB_KEY, SCALING_TRACKING, 
value3);
+        jdbcStateStore.flush(DEFAULT_JOB_KEY);
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1);
+
+        assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value1);
+        assertStateValueForCacheAndDatabase(SCALING_HISTORY, value2);
+        assertStateValueForCacheAndDatabase(SCALING_TRACKING, value3);
+
+        // Clear all in cache.
+        jdbcStateStore.clearAll(DEFAULT_JOB_KEY);
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).isEmpty();
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
SCALING_HISTORY)).isEmpty();
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
SCALING_TRACKING)).isEmpty();
+        assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).hasValue(value1);
+        assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, 
SCALING_HISTORY)).hasValue(value2);
+        assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, 
SCALING_TRACKING)).hasValue(value3);
+
+        // Flush!
+        jdbcStateStore.flush(DEFAULT_JOB_KEY);
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).isEmpty();
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
SCALING_HISTORY)).isEmpty();
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
SCALING_TRACKING)).isEmpty();
+        assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, 
COLLECTED_METRICS)).isEmpty();
+        assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, 
SCALING_HISTORY)).isEmpty();
+        assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, 
SCALING_TRACKING)).isEmpty();
+    }
+
+    private void assertStateValueForCacheAndDatabase(StateType stateType, 
String expectedValue)
+            throws Exception {
+        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, 
stateType))
+                .hasValue(expectedValue);
+        assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, 
stateType)).hasValue(expectedValue);
+    }
+
+    private Optional<String> getValueFromDatabase(String jobKey, StateType 
stateType)
+            throws Exception {
+        var jdbcInteractor = new JdbcStateInteractor(conn);
+        return 
Optional.ofNullable(jdbcInteractor.queryData(jobKey).get(stateType));
+    }
+}
+
+/** Test {@link JdbcStateStore} via Derby database. */
+class DerbyJdbcStateStoreITCase extends AbstractJdbcStateStoreITCase 
implements DerbyTestBase {}
+
+/** Test {@link JdbcStateStore} via MySQL 5.6.x. */
+class MySQL56JdbcStateStoreITCase extends AbstractJdbcStateStoreITCase 
implements MySQL56TestBase {}
+
+/** Test {@link JdbcStateStore} via MySQL 5.7.x. */
+class MySQL57JdbcStateStoreITCase extends AbstractJdbcStateStoreITCase 
implements MySQL57TestBase {}
+
+/** Test {@link JdbcStateStore} via MySQL 8. */
+class MySQL8JdbcStateStoreITCase extends AbstractJdbcStateStoreITCase 
implements MySQL8TestBase {}
+
+/** Test {@link JdbcStateStore} via Postgre SQL. */
+class PostgreSQLJdbcStoreITCase extends AbstractJdbcStateStoreITCase
+        implements PostgreSQLTestBase {}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java
new file mode 100644
index 00000000..8fafc95d
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import java.sql.Connection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Countable {@link JdbcStateInteractor}. */
+public class CountableJdbcStateInteractor extends JdbcStateInteractor {
+
+    private final AtomicLong queryCounter;
+    private final AtomicLong deleteCounter;
+    private final AtomicLong createCounter;
+    private final AtomicLong updateCounter;
+
+    public CountableJdbcStateInteractor(Connection conn) {
+        super(conn);
+        queryCounter = new AtomicLong();
+        deleteCounter = new AtomicLong();
+        createCounter = new AtomicLong();
+        updateCounter = new AtomicLong();
+    }
+
+    @Override
+    public Map<StateType, String> queryData(String jobKey) throws Exception {
+        queryCounter.incrementAndGet();
+        return super.queryData(jobKey);
+    }
+
+    @Override
+    public void deleteData(String jobKey, List<StateType> deletedStateTypes) 
throws Exception {
+        deleteCounter.incrementAndGet();
+        super.deleteData(jobKey, deletedStateTypes);
+    }
+
+    @Override
+    public void createData(
+            String jobKey, List<StateType> createdStateTypes, Map<StateType, 
String> data)
+            throws Exception {
+        createCounter.incrementAndGet();
+        super.createData(jobKey, createdStateTypes, data);
+    }
+
+    @Override
+    public void updateData(
+            String jobKey, List<StateType> updatedStateTypes, Map<StateType, 
String> data)
+            throws Exception {
+        updateCounter.incrementAndGet();
+        super.updateData(jobKey, updatedStateTypes, data);
+    }
+
+    public void assertCountableJdbcInteractor(
+            long expectedQueryCounter,
+            long expectedDeleteCounter,
+            long expectedUpdateCounter,
+            long expectedCreateCounter) {
+        assertThat(queryCounter).hasValue(expectedQueryCounter);
+        assertThat(deleteCounter).hasValue(expectedDeleteCounter);
+        assertThat(updateCounter).hasValue(expectedUpdateCounter);
+        assertThat(createCounter).hasValue(expectedCreateCounter);
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java
new file mode 100644
index 00000000..a79c07bc
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase;
+import org.apache.flink.autoscaler.state.AbstractAutoScalerStateStoreTest;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.TreeMap;
+
+import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
+import static 
org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link JdbcAutoScalerStateStore}.
+ *
+ * <p>Note: {@link #testDiscardInvalidHistory} is referred from {@link
+ * KubernetesAutoScalerStateStoreTest}.
+ */
+class JdbcAutoScalerStateStoreTest
+        extends AbstractAutoScalerStateStoreTest<JobID, 
JobAutoScalerContext<JobID>>
+        implements DerbyTestBase {
+
+    private JdbcStateStore jdbcStateStore;
+    private JdbcAutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> 
cachedStateStore;
+
+    @Override
+    protected void preSetup() throws Exception {
+        jdbcStateStore = new JdbcStateStore(new 
JdbcStateInteractor(getConnection()));
+        cachedStateStore = new JdbcAutoScalerStateStore<>(jdbcStateStore);
+    }
+
+    @Override
+    protected AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>>
+            createPhysicalAutoScalerStateStore() throws Exception {
+        return new JdbcAutoScalerStateStore<>(
+                new JdbcStateStore(new JdbcStateInteractor(getConnection())));
+    }
+
+    @Override
+    protected AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>>
+            createCachedAutoScalerStateStore() {
+        return cachedStateStore;
+    }
+
+    @Override
+    protected JobAutoScalerContext<JobID> createJobContext() {
+        return createDefaultJobAutoScalerContext();
+    }
+
+    @Test
+    void testDiscardInvalidHistory() throws Exception {
+        jdbcStateStore.putSerializedState(
+                ctx.getJobKey().toString(), StateType.COLLECTED_METRICS, 
"invalid");
+        jdbcStateStore.putSerializedState(
+                ctx.getJobKey().toString(), StateType.SCALING_HISTORY, 
"invalid2");
+
+        var now = Instant.now();
+
+        assertThat(
+                        jdbcStateStore.getSerializedState(
+                                ctx.getJobKey().toString(), 
StateType.COLLECTED_METRICS))
+                .isPresent();
+        assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty();
+        assertThat(
+                        jdbcStateStore.getSerializedState(
+                                ctx.getJobKey().toString(), 
StateType.COLLECTED_METRICS))
+                .isEmpty();
+
+        assertThat(
+                        jdbcStateStore.getSerializedState(
+                                ctx.getJobKey().toString(), 
StateType.SCALING_HISTORY))
+                .isPresent();
+        Assertions.assertEquals(new TreeMap<>(), 
getTrimmedScalingHistory(stateStore, ctx, now));
+        assertThat(
+                        jdbcStateStore.getSerializedState(
+                                ctx.getJobKey().toString(), 
StateType.SCALING_HISTORY))
+                .isEmpty();
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java
new file mode 100644
index 00000000..3989dbff
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import 
org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Optional;
+
+import static 
org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
+import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY;
+import static 
org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_TRACKING;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link JobStateView}. */
+class JobStateViewTest implements DerbyTestBase {
+
+    private static final String DEFAULT_JOB_KEY = "jobKey";
+    private Connection conn;
+    private CountableJdbcStateInteractor jdbcStateInteractor;
+    private JobStateView jobStateView;
+
+    @BeforeEach
+    void beforeEach() throws Exception {
+        this.conn = getConnection();
+        this.jdbcStateInteractor = new CountableJdbcStateInteractor(conn);
+        this.jobStateView = new JobStateView(jdbcStateInteractor, 
DEFAULT_JOB_KEY);
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0);
+    }
+
+    @AfterEach
+    void afterEach() throws SQLException {
+        if (conn != null) {
+            conn.close();
+        }
+    }
+
+    @Test
+    void testAllOperations() throws Exception {
+        // All state types should be get together to avoid query database 
frequently.
+        assertThat(jobStateView.get(COLLECTED_METRICS)).isNull();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0);
+        assertThat(jobStateView.get(SCALING_HISTORY)).isNull();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0);
+
+        // Put data to cache, and it shouldn't exist in database.
+        var value1 = "value1";
+        jobStateView.put(COLLECTED_METRICS, value1);
+        assertThat(jobStateView.get(COLLECTED_METRICS)).isEqualTo(value1);
+        assertThat(getValueFromDatabase(COLLECTED_METRICS)).isEmpty();
+
+        var value2 = "value2";
+        jobStateView.put(SCALING_HISTORY, value2);
+        assertThat(jobStateView.get(SCALING_HISTORY)).isEqualTo(value2);
+        assertThat(getValueFromDatabase(SCALING_HISTORY)).isEmpty();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0);
+
+        // Test creating together.
+        jobStateView.flush();
+        assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value1);
+        assertStateValueForCacheAndDatabase(SCALING_HISTORY, value2);
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1);
+
+        // Test updating data to cache, and they aren't updated in database.
+        var value3 = "value3";
+        jobStateView.put(COLLECTED_METRICS, value3);
+        assertThat(jobStateView.get(COLLECTED_METRICS)).isEqualTo(value3);
+        assertThat(getValueFromDatabase(COLLECTED_METRICS)).hasValue(value1);
+
+        var value4 = "value4";
+        jobStateView.put(SCALING_HISTORY, value4);
+        assertThat(jobStateView.get(SCALING_HISTORY)).isEqualTo(value4);
+        assertThat(getValueFromDatabase(SCALING_HISTORY)).hasValue(value2);
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1);
+
+        // Test updating together.
+        jobStateView.flush();
+        assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value3);
+        assertStateValueForCacheAndDatabase(SCALING_HISTORY, value4);
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 1, 1);
+
+        // Test deleting data from cache, and they aren't deleted in database.
+        jobStateView.remove(COLLECTED_METRICS);
+        assertThat(jobStateView.get(COLLECTED_METRICS)).isNull();
+        assertThat(getValueFromDatabase(COLLECTED_METRICS)).hasValue(value3);
+
+        jobStateView.remove(SCALING_HISTORY);
+        assertThat(jobStateView.get(SCALING_HISTORY)).isNull();
+        assertThat(getValueFromDatabase(SCALING_HISTORY)).hasValue(value4);
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 1, 1);
+
+        // Test updating together.
+        jobStateView.flush();
+        assertThat(jobStateView.get(COLLECTED_METRICS)).isNull();
+        assertThat(getValueFromDatabase(COLLECTED_METRICS)).isEmpty();
+        assertThat(jobStateView.get(SCALING_HISTORY)).isNull();
+        assertThat(getValueFromDatabase(SCALING_HISTORY)).isEmpty();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 1, 1, 1);
+    }
+
+    @Test
+    void testAvoidUnnecessaryFlushes() throws Exception {
+        var value1 = "value1";
+        jobStateView.put(COLLECTED_METRICS, value1);
+        jobStateView.flush();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1);
+
+        // Avoid unnecessary flush for creating.
+        jobStateView.flush();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1);
+
+        // Avoid unnecessary flush for deleting.
+        jobStateView.clear();
+        jobStateView.flush();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 1, 0, 1);
+        jobStateView.flush();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 1, 0, 1);
+
+        // Avoid unnecessary flush even if clear is called..
+        jobStateView.clear();
+        jobStateView.flush();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 1, 0, 1);
+    }
+
+    @Test
+    void testCreateDeleteAndUpdateWorkTogether() throws Exception {
+        var value1 = "value1";
+        var value2 = "value2";
+        var value3 = "value3";
+        var value4 = "value4";
+        // Create 2 state types first.
+        jobStateView.put(COLLECTED_METRICS, value1);
+        jobStateView.put(SCALING_HISTORY, value2);
+        jobStateView.flush();
+        assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value1);
+        assertStateValueForCacheAndDatabase(SCALING_HISTORY, value2);
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1);
+
+        // Delete one, update one and create one.
+        jobStateView.remove(COLLECTED_METRICS);
+        jobStateView.put(SCALING_HISTORY, value3);
+        jobStateView.put(SCALING_TRACKING, value4);
+
+        assertThat(jobStateView.get(COLLECTED_METRICS)).isNull();
+        assertThat(getValueFromDatabase(COLLECTED_METRICS)).hasValue(value1);
+        assertThat(jobStateView.get(SCALING_HISTORY)).isEqualTo(value3);
+        assertThat(getValueFromDatabase(SCALING_HISTORY)).hasValue(value2);
+        assertThat(jobStateView.get(SCALING_TRACKING)).isEqualTo(value4);
+        assertThat(getValueFromDatabase(SCALING_TRACKING)).isEmpty();
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 1);
+
+        // Flush!
+        jobStateView.flush();
+        assertThat(jobStateView.get(COLLECTED_METRICS)).isNull();
+        assertThat(getValueFromDatabase(COLLECTED_METRICS)).isEmpty();
+        assertStateValueForCacheAndDatabase(SCALING_HISTORY, value3);
+        assertStateValueForCacheAndDatabase(SCALING_TRACKING, value4);
+        jdbcStateInteractor.assertCountableJdbcInteractor(1, 1, 1, 2);
+
+        // Build the new JobStateView
+        var newJobStateView = new JobStateView(jdbcStateInteractor, 
DEFAULT_JOB_KEY);
+        assertThat(newJobStateView.get(COLLECTED_METRICS)).isNull();
+        assertThat(getValueFromDatabase(COLLECTED_METRICS)).isEmpty();
+        assertThat(newJobStateView.get(SCALING_HISTORY)).isEqualTo(value3);
+        assertThat(getValueFromDatabase(SCALING_HISTORY)).hasValue(value3);
+        assertThat(newJobStateView.get(SCALING_TRACKING)).isEqualTo(value4);
+        assertThat(getValueFromDatabase(SCALING_TRACKING)).hasValue(value4);
+        jdbcStateInteractor.assertCountableJdbcInteractor(2, 1, 1, 2);
+    }
+
+    private void assertStateValueForCacheAndDatabase(StateType stateType, 
String expectedValue)
+            throws Exception {
+        assertThat(jobStateView.get(stateType)).isEqualTo(expectedValue);
+        assertThat(getValueFromDatabase(stateType)).hasValue(expectedValue);
+    }
+
+    private Optional<String> getValueFromDatabase(StateType stateType) throws 
Exception {
+        var jdbcInteractor = new JdbcStateInteractor(conn);
+        return 
Optional.ofNullable(jdbcInteractor.queryData(DEFAULT_JOB_KEY).get(stateType));
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/StateTypeTest.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/StateTypeTest.java
new file mode 100644
index 00000000..ab92458c
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/StateTypeTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link StateType}. */
+class StateTypeTest {
+
+    @Test
+    void testIdentifierAreDifferent() {
+        var identifierSet = new HashSet<String>();
+        for (StateType stateType : StateType.values()) {
+            
assertThat(identifierSet).doesNotContain(stateType.getIdentifier());
+            identifierSet.add(stateType.getIdentifier());
+        }
+    }
+
+    @Test
+    void testIdentifierCanBeParsed() {
+        for (StateType stateType : StateType.values()) {
+            
assertThat(StateType.createFromIdentifier(stateType.getIdentifier()))
+                    .isSameAs(stateType);
+        }
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java
new file mode 100644
index 00000000..261578b3
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases;
+
+import java.sql.Connection;
+
+/** Database testing. */
+public interface DatabaseTest {
+
+    Connection getConnection() throws Exception;
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
new file mode 100644
index 00000000..b04c4df7
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases.derby;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+
+/** The extension of Derby. */
+public class DerbyExtension implements BeforeAllCallback, AfterAllCallback, 
AfterEachCallback {
+
+    private static final List<String> TABLES = 
List.of("t_flink_autoscaler_state_store");
+    private static final String JDBC_URL = "jdbc:derby:memory:test";
+
+    public Connection getConnection() throws Exception {
+        return DriverManager.getConnection(JDBC_URL);
+    }
+
+    @Override
+    public void beforeAll(ExtensionContext extensionContext) throws Exception {
+        DriverManager.getConnection(String.format("%s;create=true", 
JDBC_URL)).close();
+
+        var stateStoreDDL =
+                "CREATE TABLE t_flink_autoscaler_state_store\n"
+                        + "(\n"
+                        + "    id            BIGINT NOT NULL GENERATED ALWAYS 
AS IDENTITY (START WITH 1, INCREMENT BY 1),\n"
+                        + "    update_time   TIMESTAMP NOT NULL DEFAULT 
CURRENT_TIMESTAMP,\n"
+                        + "    job_key       VARCHAR(191) NOT NULL,\n"
+                        + "    state_type    VARCHAR(100) NOT NULL,\n"
+                        + "    state_value   CLOB NOT NULL,\n"
+                        + "    PRIMARY KEY (id)\n"
+                        + ")\n";
+
+        var createIndex =
+                "CREATE UNIQUE INDEX un_job_state_type_inx ON 
t_flink_autoscaler_state_store (job_key, state_type)";
+        try (var conn = getConnection();
+                var st = conn.createStatement()) {
+            st.execute(stateStoreDDL);
+            st.execute(createIndex);
+        }
+    }
+
+    @Override
+    public void afterAll(ExtensionContext extensionContext) throws Exception {
+        try (var conn = getConnection();
+                var st = conn.createStatement()) {
+            for (var tableName : TABLES) {
+                st.executeUpdate(String.format("DROP TABLE %s", tableName));
+            }
+        }
+        try {
+            DriverManager.getConnection(String.format("%s;shutdown=true", 
JDBC_URL)).close();
+        } catch (SQLException ignored) {
+        }
+    }
+
+    @Override
+    public void afterEach(ExtensionContext extensionContext) throws Exception {
+        // Clean up all data
+        try (var conn = getConnection();
+                var st = conn.createStatement()) {
+            for (var tableName : TABLES) {
+                st.executeUpdate(String.format("DELETE from %s", tableName));
+            }
+        }
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java
new file mode 100644
index 00000000..e0ac3f76
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases.derby;
+
+import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
+
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.Connection;
+
+/** Derby database for testing. */
+public interface DerbyTestBase extends DatabaseTest {
+
+    @RegisterExtension DerbyExtension DERBY_EXTENSION = new DerbyExtension();
+
+    @Override
+    default Connection getConnection() throws Exception {
+        return DERBY_EXTENSION.getConnection();
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java
new file mode 100644
index 00000000..f2b1858c
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases.mysql;
+
+import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
+
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.Connection;
+
+/** MySQL 5.6.x database for testing. */
+public interface MySQL56TestBase extends DatabaseTest {
+
+    @RegisterExtension MySQLExtension MYSQL_EXTENSION = new 
MySQLExtension("5.6.51");
+
+    default Connection getConnection() throws Exception {
+        return MYSQL_EXTENSION.getConnection();
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java
new file mode 100644
index 00000000..0b8a6968
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases.mysql;
+
+import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
+
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.Connection;
+
+/** MySQL 5.7.x database for testing. */
+public interface MySQL57TestBase extends DatabaseTest {
+
+    @RegisterExtension MySQLExtension MYSQL_EXTENSION = new 
MySQLExtension("5.7.41");
+
+    default Connection getConnection() throws Exception {
+        return MYSQL_EXTENSION.getConnection();
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java
new file mode 100644
index 00000000..daf77885
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases.mysql;
+
+import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
+
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.Connection;
+
+/** MySQL 8.x database for testing. */
+public interface MySQL8TestBase extends DatabaseTest {
+
+    @RegisterExtension MySQLExtension MYSQL_EXTENSION = new 
MySQLExtension("8.0.32");
+
+    default Connection getConnection() throws Exception {
+        return MYSQL_EXTENSION.getConnection();
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java
new file mode 100644
index 00000000..40c67a06
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases.mysql;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.testcontainers.containers.MySQLContainer;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+
+/** The extension of MySQL. */
+class MySQLExtension implements BeforeAllCallback, AfterAllCallback, 
AfterEachCallback {
+
+    private static final String MYSQL_INIT_SCRIPT = 
"test_schema/mysql_schema.sql";
+    private static final String DATABASE_NAME = "flink_autoscaler";
+    private static final String USER_NAME = "root";
+    private static final String PASSWORD = "123456";
+    private static final List<String> TABLES = 
List.of("t_flink_autoscaler_state_store");
+
+    private final MySQLContainer<?> container;
+
+    public MySQLExtension(String mysqlVersion) {
+        this.container =
+                new MySQLContainer<>(String.format("mysql:%s", mysqlVersion))
+                        .withCommand("--character-set-server=utf8")
+                        .withDatabaseName(DATABASE_NAME)
+                        .withUsername(USER_NAME)
+                        .withPassword(PASSWORD)
+                        .withInitScript(MYSQL_INIT_SCRIPT)
+                        .withEnv("MYSQL_ROOT_HOST", "%");
+    }
+
+    public Connection getConnection() throws Exception {
+        return DriverManager.getConnection(
+                container.getJdbcUrl(), container.getUsername(), 
container.getPassword());
+    }
+
+    @Override
+    public void beforeAll(ExtensionContext extensionContext) {
+        container.start();
+    }
+
+    @Override
+    public void afterAll(ExtensionContext extensionContext) {
+        container.stop();
+    }
+
+    @Override
+    public void afterEach(ExtensionContext extensionContext) throws Exception {
+        try (var conn = getConnection();
+                var st = conn.createStatement()) {
+            for (var tableName : TABLES) {
+                st.executeUpdate(String.format("DELETE from %s", tableName));
+            }
+        }
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java
new file mode 100644
index 00000000..de4d6e37
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases.postgres;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.testcontainers.containers.PostgreSQLContainer;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+
+/** The extension of PostgreSQL. */
+class PostgreSQLExtension implements BeforeAllCallback, AfterAllCallback, 
AfterEachCallback {
+
+    private static final String INIT_SCRIPT = 
"test_schema/postgres_schema.sql";
+    private static final String DATABASE_NAME = "flink_autoscaler";
+    private static final String USER_NAME = "root";
+    private static final String PASSWORD = "123456";
+    private static final List<String> TABLES = 
List.of("t_flink_autoscaler_state_store");
+
+    private final PostgreSQLContainer<?> container;
+
+    public PostgreSQLExtension(String postgresqlVersion) {
+        this.container =
+                new PostgreSQLContainer<>(String.format("postgres:%s", 
postgresqlVersion))
+                        .withDatabaseName(DATABASE_NAME)
+                        .withUsername(USER_NAME)
+                        .withPassword(PASSWORD)
+                        .withInitScript(INIT_SCRIPT)
+                        .withEnv("POSTGRES_MAX_CONNECTIONS", "10");
+    }
+
+    public Connection getConnection() throws Exception {
+        return DriverManager.getConnection(
+                container.getJdbcUrl(), container.getUsername(), 
container.getPassword());
+    }
+
+    @Override
+    public void beforeAll(ExtensionContext extensionContext) {
+        container.start();
+    }
+
+    @Override
+    public void afterAll(ExtensionContext extensionContext) {
+        container.stop();
+    }
+
+    @Override
+    public void afterEach(ExtensionContext extensionContext) throws Exception {
+        try (var conn = getConnection();
+                var st = conn.createStatement()) {
+            for (var tableName : TABLES) {
+                st.executeUpdate(String.format("DELETE from %s", tableName));
+            }
+        }
+    }
+}
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java
 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java
new file mode 100644
index 00000000..1e27a6e1
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases.postgres;
+
+import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
+
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.Connection;
+
+/** PostgreSQL database for testing. */
+public interface PostgreSQLTestBase extends DatabaseTest {
+
+    @RegisterExtension PostgreSQLExtension POSTGRE_SQL_EXTENSION = new 
PostgreSQLExtension("15.1");
+
+    default Connection getConnection() throws Exception {
+        return POSTGRE_SQL_EXTENSION.getConnection();
+    }
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/resources/log4j2.properties 
b/flink-autoscaler-plugin-jdbc/src/test/resources/log4j2.properties
new file mode 100644
index 00000000..5866179a
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/resources/log4j2.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+# Log all infos to the console
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} 
%highlight{[%-5level] %msg%n%throwable}
+
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql 
b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql
new file mode 100644
index 00000000..825038d5
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+create table `t_flink_autoscaler_state_store`
+(
+    `id`            bigint       not null auto_increment,
+    `update_time`   datetime     not null default current_timestamp on update 
current_timestamp comment 'update time',
+    `job_key`       varchar(191) not null comment 'The job key',
+    `state_type`    varchar(100) not null comment 'The state type',
+    `state_value`   longtext     not null comment 'The real state',
+    primary key (`id`) using btree,
+    unique key `un_job_state_type_inx` (`job_key`,`state_type`) using btree
+) engine=innodb default charset=utf8mb4 collate=utf8mb4_general_ci;
+
diff --git 
a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql
 
b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql
new file mode 100644
index 00000000..beb90a75
--- /dev/null
+++ 
b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE t_flink_autoscaler_state_store
+(
+    id            BIGSERIAL     NOT NULL,
+    update_time   TIMESTAMP     NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    job_key       TEXT          NOT NULL,
+    state_type    TEXT          NOT NULL,
+    state_value   TEXT          NOT NULL,
+    PRIMARY KEY (id),
+    UNIQUE (job_key, state_type)
+);
diff --git a/pom.xml b/pom.xml
index 42adf488..716fb9ce 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,6 +58,7 @@ under the License.
         <module>flink-kubernetes-docs</module>
         <module>flink-autoscaler</module>
         <module>flink-autoscaler-standalone</module>
+        <module>flink-autoscaler-plugin-jdbc</module>
         <module>examples/flink-sql-runner-example</module>
         <module>examples/flink-beam-example</module>
         <module>examples/kubernetes-client-examples</module>

Reply via email to