This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 733b1b54 [FLINK-34389][autoscaler] JdbcAutoscalerStateStore explicitly
writes update_time
733b1b54 is described below
commit 733b1b54e23c7486418bdca706f85a97b896f469
Author: Rui Fan <[email protected]>
AuthorDate: Tue Feb 6 18:37:59 2024 +0800
[FLINK-34389][autoscaler] JdbcAutoscalerStateStore explicitly writes
update_time
---
.../autoscaler/jdbc/state/JdbcStateInteractor.java | 23 ++++++++++++++--------
.../src/main/resources/schema/derby_schema.sql | 4 ++--
.../src/main/resources/schema/mysql_schema.sql | 2 +-
.../src/main/resources/schema/postgres_schema.sql | 16 +--------------
.../testutils/databases/derby/DerbyExtension.java | 4 ++--
.../test/resources/test_schema/mysql_schema.sql | 2 +-
.../test/resources/test_schema/postgres_schema.sql | 2 +-
7 files changed, 23 insertions(+), 30 deletions(-)
diff --git
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
index 4c2f9a0f..7c12bf83 100644
---
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
+++
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java
@@ -21,6 +21,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
+import java.sql.Timestamp;
+import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -76,18 +78,20 @@ public class JdbcStateInteractor {
String jobKey, List<StateType> createdStateTypes, Map<StateType,
String> data)
throws Exception {
var query =
- "INSERT INTO t_flink_autoscaler_state_store (job_key,
state_type, state_value) values (?, ?, ?)";
+ "INSERT INTO t_flink_autoscaler_state_store (update_time,
job_key, state_type, state_value) values (?, ?, ?, ?)";
+ var updateTime = Timestamp.from(Instant.now());
try (var pstmt = conn.prepareStatement(query)) {
for (var stateType : createdStateTypes) {
- pstmt.setString(1, jobKey);
- pstmt.setString(2, stateType.getIdentifier());
+ pstmt.setTimestamp(1, updateTime);
+ pstmt.setString(2, jobKey);
+ pstmt.setString(3, stateType.getIdentifier());
String stateValue = data.get(stateType);
checkState(
stateValue != null,
"The state value shouldn't be null during inserting. "
+ "It may be a bug, please raise a JIRA to
Flink Community.");
- pstmt.setString(3, stateValue);
+ pstmt.setString(4, stateValue);
pstmt.addBatch();
}
pstmt.executeBatch();
@@ -99,18 +103,21 @@ public class JdbcStateInteractor {
String jobKey, List<StateType> updatedStateTypes, Map<StateType,
String> data)
throws Exception {
var query =
- "UPDATE t_flink_autoscaler_state_store set state_value = ?
where job_key = ? and state_type = ?";
+ "UPDATE t_flink_autoscaler_state_store set update_time = ?,
state_value = ? where job_key = ? and state_type = ?";
+ var updateTime = Timestamp.from(Instant.now());
try (var pstmt = conn.prepareStatement(query)) {
for (var stateType : updatedStateTypes) {
+ pstmt.setTimestamp(1, updateTime);
+
String stateValue = data.get(stateType);
checkState(
stateValue != null,
"The state value shouldn't be null during inserting. "
+ "It may be a bug, please raise a JIRA to
Flink Community.");
- pstmt.setString(1, stateValue);
- pstmt.setString(2, jobKey);
- pstmt.setString(3, stateType.getIdentifier());
+ pstmt.setString(2, stateValue);
+ pstmt.setString(3, jobKey);
+ pstmt.setString(4, stateType.getIdentifier());
pstmt.addBatch();
}
pstmt.executeBatch();
diff --git
a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql
b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql
index c8defefe..eb5cab6a 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql
+++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/derby_schema.sql
@@ -17,8 +17,8 @@
CREATE TABLE t_flink_autoscaler_state_store
(
- id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1,
INCREMENT BY 1),
- update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START
WITH 1, INCREMENT BY 1),
+ update_time TIMESTAMP NOT NULL,
job_key VARCHAR(191) NOT NULL,
state_type VARCHAR(100) NOT NULL,
state_value CLOB NOT NULL,
diff --git
a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql
b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql
index 7a6c2d04..5e72512e 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql
+++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/mysql_schema.sql
@@ -22,7 +22,7 @@ use `flink_autoscaler`;
create table `t_flink_autoscaler_state_store`
(
`id` bigint not null auto_increment,
- `update_time` datetime not null default current_timestamp on update
current_timestamp comment 'update time',
+ `update_time` datetime not null comment 'The update time',
`job_key` varchar(191) not null comment 'The job key',
`state_type` varchar(100) not null comment 'The state type',
`state_value` longtext not null comment 'The real state',
diff --git
a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql
b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql
index d9fadfc0..4d8abc22 100644
--- a/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql
+++ b/flink-autoscaler-plugin-jdbc/src/main/resources/schema/postgres_schema.sql
@@ -21,24 +21,10 @@ CREATE DATABASE flink_autoscaler;
CREATE TABLE t_flink_autoscaler_state_store
(
id BIGSERIAL NOT NULL,
- update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_time TIMESTAMP NOT NULL,
job_key TEXT NOT NULL,
state_type TEXT NOT NULL,
state_value TEXT NOT NULL,
PRIMARY KEY (id),
UNIQUE (job_key, state_type)
);
-
-CREATE OR REPLACE FUNCTION update_flink_autoscaler_update_time_column()
-RETURNS TRIGGER AS $$
-BEGIN
- NEW.update_time = CURRENT_TIMESTAMP;
-RETURN NEW;
-END;
-$$ language 'plpgsql';
-
-CREATE TRIGGER update_t_flink_autoscaler_state_store_modtime
- BEFORE UPDATE ON t_flink_autoscaler_state_store
- FOR EACH ROW
- EXECUTE FUNCTION update_flink_autoscaler_update_time_column();
-
diff --git
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
index b04c4df7..0c78d978 100644
---
a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
+++
b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
@@ -44,8 +44,8 @@ public class DerbyExtension implements BeforeAllCallback,
AfterAllCallback, Afte
var stateStoreDDL =
"CREATE TABLE t_flink_autoscaler_state_store\n"
+ "(\n"
- + " id BIGINT NOT NULL GENERATED ALWAYS
AS IDENTITY (START WITH 1, INCREMENT BY 1),\n"
- + " update_time TIMESTAMP NOT NULL DEFAULT
CURRENT_TIMESTAMP,\n"
+ + " id BIGINT NOT NULL GENERATED
ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),\n"
+ + " update_time TIMESTAMP NOT NULL,\n"
+ " job_key VARCHAR(191) NOT NULL,\n"
+ " state_type VARCHAR(100) NOT NULL,\n"
+ " state_value CLOB NOT NULL,\n"
diff --git
a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql
b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql
index 825038d5..2fcd468c 100644
---
a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql
+++
b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/mysql_schema.sql
@@ -18,7 +18,7 @@
create table `t_flink_autoscaler_state_store`
(
`id` bigint not null auto_increment,
- `update_time` datetime not null default current_timestamp on update
current_timestamp comment 'update time',
+ `update_time` datetime not null comment 'The update time',
`job_key` varchar(191) not null comment 'The job key',
`state_type` varchar(100) not null comment 'The state type',
`state_value` longtext not null comment 'The real state',
diff --git
a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql
b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql
index beb90a75..0f405d10 100644
---
a/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql
+++
b/flink-autoscaler-plugin-jdbc/src/test/resources/test_schema/postgres_schema.sql
@@ -18,7 +18,7 @@
CREATE TABLE t_flink_autoscaler_state_store
(
id BIGSERIAL NOT NULL,
- update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_time TIMESTAMP NOT NULL,
job_key TEXT NOT NULL,
state_type TEXT NOT NULL,
state_value TEXT NOT NULL,