This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 733b1b54 [FLINK-34389][autoscaler] JdbcAutoscalerStateStore explicitly writes update_time
733b1b54 is described below

commit 733b1b54e23c7486418bdca706f85a97b896f469
Author: Rui Fan <[email protected]>
AuthorDate: Tue Feb 6 18:37:59 2024 +0800

    [FLINK-34389][autoscaler] JdbcAutoscalerStateStore explicitly writes update_time
---
 .../autoscaler/jdbc/state/JdbcStateInteractor.java | 23 ++++++++++++++--------
 .../src/main/resources/schema/derby_schema.sql     |  4 ++--
 .../src/main/resources/schema/mysql_schema.sql     |  2 +-
 .../src/main/resources/schema/postgres_schema.sql  | 16 +--------------
 .../testutils/databases/derby/DerbyExtension.java  |  4 ++--
 .../test/resources/test_schema/mysql_schema.sql    |  2 +-
 .../test/resources/test_schema/postgres_schema.sql |  2 +-
 7 files changed, 23 insertions(+), 30 deletions(-)

diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
index 4c2f9a0f..7c12bf83 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
@@ -21,6 +21,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
+import java.sql.Timestamp;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -76,18 +78,20 @@ public class JdbcStateInteractor {
             String jobKey, List<StateType> createdStateTypes, Map<StateType, 
String> data)
             throws Exception {
         var query =
-                "INSERT INTO t_flink_autoscaler_state_store (job_key, 
state_type, state_value) values (?, ?, ?)";
+                "INSERT INTO t_flink_autoscaler_state_store (update_time, 
job_key, state_type, state_value) values (?, ?, ?, ?)";
+        var updateTime = Timestamp.from(Instant.now());
         try (var pstmt = conn.prepareStatement(query)) {
             for (var stateType : createdStateTypes) {
-                pstmt.setString(1, jobKey);
-                pstmt.setString(2, stateType.getIdentifier());
+                pstmt.setTimestamp(1, updateTime);
+                pstmt.setString(2, jobKey);
+                pstmt.setString(3, stateType.getIdentifier());
 
                 String stateValue = data.get(stateType);
                 checkState(
                         stateValue != null,
                         "The state value shouldn't be null during inserting. "
                                 + "It may be a bug, please raise a JIRA to 
Flink Community.");
-                pstmt.setString(3, stateValue);
+                pstmt.setString(4, stateValue);
                 pstmt.addBatch();
             }
             pstmt.executeBatch();
@@ -99,18 +103,21 @@ public class JdbcStateInteractor {
             String jobKey, List<StateType> updatedStateTypes, Map<StateType, 
String> data)
             throws Exception {
         var query =
-                "UPDATE t_flink_autoscaler_state_store set state_value = ? 
where job_key = ? and state_type = ?";
+                "UPDATE t_flink_autoscaler_state_store set update_time = ?, 
state_value = ? where job_key = ? and state_type = ?";
 
+        var updateTime = Timestamp.from(Instant.now());
         try (var pstmt = conn.prepareStatement(query)) {
             for (var stateType : updatedStateTypes) {
+                pstmt.setTimestamp(1, updateTime);
+
                 String stateValue = data.get(stateType);
                 checkState(
                         stateValue != null,
                         "The state value shouldn't be null during inserting. "
                                 + "It may be a bug, please raise a JIRA to 
Flink Community.");
-                pstmt.setString(1, stateValue);
-                pstmt.setString(2, jobKey);
-                pstmt.setString(3, stateType.getIdentifier());
+                pstmt.setString(2, stateValue);
+                pstmt.setString(3, jobKey);
+                pstmt.setString(4, stateType.getIdentifier());
                 pstmt.addBatch();
             }
             pstmt.executeBatch();
diff --git a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql
index c8defefe..eb5cab6a 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql
+++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql
@@ -17,8 +17,8 @@
 
 CREATE TABLE t_flink_autoscaler_state_store
 (
-    id            BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, 
INCREMENT BY 1),
-    update_time   TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    id            BIGINT       NOT NULL GENERATED ALWAYS AS IDENTITY (START 
WITH 1, INCREMENT BY 1),
+    update_time   TIMESTAMP    NOT NULL,
     job_key       VARCHAR(191) NOT NULL,
     state_type    VARCHAR(100) NOT NULL,
     state_value   CLOB NOT NULL,
diff --git a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql
index 7a6c2d04..5e72512e 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql
+++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql
@@ -22,7 +22,7 @@ use `flink_autoscaler`;
 create table `t_flink_autoscaler_state_store`
 (
     `id`            bigint       not null auto_increment,
-    `update_time`   datetime     not null default current_timestamp on update 
current_timestamp comment 'update time',
+    `update_time`   datetime     not null comment 'The update time',
     `job_key`       varchar(191) not null comment 'The job key',
     `state_type`    varchar(100) not null comment 'The state type',
     `state_value`   longtext     not null comment 'The real state',
diff --git a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql
index d9fadfc0..4d8abc22 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql
+++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql
@@ -21,24 +21,10 @@ CREATE DATABASE flink_autoscaler;
 CREATE TABLE t_flink_autoscaler_state_store
 (
     id            BIGSERIAL     NOT NULL,
-    update_time   TIMESTAMP     NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    update_time   TIMESTAMP     NOT NULL,
     job_key       TEXT          NOT NULL,
     state_type    TEXT          NOT NULL,
     state_value   TEXT          NOT NULL,
     PRIMARY KEY (id),
     UNIQUE (job_key, state_type)
 );
-
-CREATE OR REPLACE FUNCTION update_flink_autoscaler_update_time_column()
-RETURNS TRIGGER AS $$
-BEGIN
-   NEW.update_time = CURRENT_TIMESTAMP;
-RETURN NEW;
-END;
-$$ language 'plpgsql';
-
-CREATE TRIGGER update_t_flink_autoscaler_state_store_modtime
-    BEFORE UPDATE ON t_flink_autoscaler_state_store
-    FOR EACH ROW
-    EXECUTE FUNCTION update_flink_autoscaler_update_time_column();
-
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
index b04c4df7..0c78d978 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
@@ -44,8 +44,8 @@ public class DerbyExtension implements BeforeAllCallback, AfterAllCallback, Afte
         var stateStoreDDL =
                 "CREATE TABLE t_flink_autoscaler_state_store\n"
                         + "(\n"
-                        + "    id            BIGINT NOT NULL GENERATED ALWAYS 
AS IDENTITY (START WITH 1, INCREMENT BY 1),\n"
-                        + "    update_time   TIMESTAMP NOT NULL DEFAULT 
CURRENT_TIMESTAMP,\n"
+                        + "    id            BIGINT       NOT NULL GENERATED 
ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),\n"
+                        + "    update_time   TIMESTAMP    NOT NULL,\n"
                         + "    job_key       VARCHAR(191) NOT NULL,\n"
                         + "    state_type    VARCHAR(100) NOT NULL,\n"
                         + "    state_value   CLOB NOT NULL,\n"
diff --git a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql
index 825038d5..2fcd468c 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql
+++ b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql
@@ -18,7 +18,7 @@
 create table `t_flink_autoscaler_state_store`
 (
     `id`            bigint       not null auto_increment,
-    `update_time`   datetime     not null default current_timestamp on update 
current_timestamp comment 'update time',
+    `update_time`   datetime     not null comment 'The update time',
     `job_key`       varchar(191) not null comment 'The job key',
     `state_type`    varchar(100) not null comment 'The state type',
     `state_value`   longtext     not null comment 'The real state',
diff --git a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql
index beb90a75..0f405d10 100644
--- a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql
+++ b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql
@@ -18,7 +18,7 @@
 CREATE TABLE t_flink_autoscaler_state_store
 (
     id            BIGSERIAL     NOT NULL,
-    update_time   TIMESTAMP     NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    update_time   TIMESTAMP     NOT NULL,
     job_key       TEXT          NOT NULL,
     state_type    TEXT          NOT NULL,
     state_value   TEXT          NOT NULL,

Reply via email to