This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 7385f1c73 [Subtask]: add "bucket_id" field to the table_runtime table
to support balanced table partitioning in master-slave mode. (#3783)
7385f1c73 is described below
commit 7385f1c73b42a8920a19d8e97d55b6121490565e
Author: can <[email protected]>
AuthorDate: Thu Sep 25 10:37:16 2025 +0800
[Subtask]: add "bucket_id" field to the table_runtime table to support
balanced table partitioning in master-slave mode. (#3783)
* [Subtask]: add "bucket_id" field to the table_identifier table to support
balanced table partitioning in master-slave mode. #3782
* [Subtask]: add "bucket_id" field to the table_identifier table to support
balanced table partitioning in master-slave mode. #3782
* [Subtask]: add "bucket_id" field to the table_identifier table to support
balanced table partitioning in master-slave mode. #3782
* [Subtask]: add "bucket_id" field to the table_identifier table to support
balanced table partitioning in master-slave mode. #3782
* [Subtask]: add "bucket_id" field to the table_identifier table to support
balanced table partitioning in master-slave mode. #3782
---------
Co-authored-by: wardli <[email protected]>
---
.../amoro/server/persistence/TableRuntimeMeta.java | 10 ++++++++++
.../server/persistence/mapper/TableRuntimeMapper.java | 19 +++++++++++--------
amoro-ams/src/main/resources/derby/ams-derby-init.sql | 3 ++-
amoro-ams/src/main/resources/mysql/ams-mysql-init.sql | 1 +
amoro-ams/src/main/resources/mysql/upgrade.sql | 2 ++
.../src/main/resources/postgres/ams-postgres-init.sql | 5 +++--
amoro-ams/src/main/resources/postgres/upgrade.sql | 3 +++
7 files changed, 32 insertions(+), 11 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java
index b994efce3..d5979c3a0 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java
@@ -32,6 +32,7 @@ public class TableRuntimeMeta {
private long statusCodeUpdateTime = System.currentTimeMillis();
private Map<String, String> tableConfig = new HashMap<>();
private TableSummary tableSummary = new TableSummary();
+ private String bucketId;
public TableRuntimeMeta copy() {
TableRuntimeMeta meta = new TableRuntimeMeta();
@@ -41,6 +42,7 @@ public class TableRuntimeMeta {
meta.setStatusCodeUpdateTime(this.statusCodeUpdateTime);
meta.setTableConfig(Maps.newHashMap(this.tableConfig));
meta.setTableSummary(this.tableSummary.copy());
+ meta.setBucketId(this.bucketId);
return meta;
}
@@ -68,6 +70,10 @@ public class TableRuntimeMeta {
return tableSummary == null ? new TableSummary() : tableSummary;
}
+ public String getBucketId() {
+ return bucketId;
+ }
+
public void setTableId(Long tableId) {
this.tableId = tableId;
}
@@ -92,4 +98,8 @@ public class TableRuntimeMeta {
public void setTableSummary(TableSummary tableSummary) {
this.tableSummary = tableSummary;
}
+
+ public void setBucketId(String bucketId) {
+ this.bucketId = bucketId;
+ }
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java
index d93599057..3cfb69f2e 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java
@@ -39,10 +39,11 @@ public interface TableRuntimeMapper {
@Insert(
"INSERT INTO "
+ TABLE_NAME
- + " (table_id, group_name, status_code, table_config, table_summary)
"
+ + " (table_id, group_name, status_code, table_config, table_summary,
bucket_id) "
+ "VALUES (#{tableId}, #{groupName}, #{statusCode}, "
+
"#{tableConfig,typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter},
"
- +
"#{tableSummary,typeHandler=org.apache.amoro.server.persistence.converter.JsonObjectConverter})")
+ +
"#{tableSummary,typeHandler=org.apache.amoro.server.persistence.converter.JsonObjectConverter},
"
+ + "#{bucketId, jdbcType=VARCHAR})")
int insertRuntime(TableRuntimeMeta meta);
/* ---------- update ---------- */
@@ -53,7 +54,8 @@ public interface TableRuntimeMapper {
+ " status_code = #{statusCode}, "
+ " status_code_update_time = #{statusCodeUpdateTime,
typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}, "
+ " table_config =
#{tableConfig,typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter},
"
- + " table_summary =
#{tableSummary,typeHandler=org.apache.amoro.server.persistence.converter.JsonObjectConverter}
"
+ + " table_summary =
#{tableSummary,typeHandler=org.apache.amoro.server.persistence.converter.JsonObjectConverter},
"
+ + " bucket_id = #{bucketId, jdbcType=VARCHAR} "
+ " WHERE table_id = #{tableId}")
int updateRuntime(TableRuntimeMeta meta);
@@ -64,7 +66,7 @@ public interface TableRuntimeMapper {
/* ---------- select ---------- */
@Select(
"SELECT table_id, group_name, status_code, status_code_update_time, "
- + " table_config, table_summary "
+ + " table_config, table_summary, bucket_id "
+ "FROM "
+ TABLE_NAME
+ " WHERE table_id = #{tableId}")
@@ -73,7 +75,6 @@ public interface TableRuntimeMapper {
value = {
@Result(column = "group_name", property = "groupName"),
@Result(column = "status_code", property = "statusCode"),
- @Result(column = "table_id", property = "tableId"),
@Result(
column = "status_code_update_time",
property = "statusCodeUpdateTime",
@@ -88,12 +89,14 @@ public interface TableRuntimeMapper {
column = "table_summary",
property = "tableSummary",
typeHandler =
org.apache.amoro.server.persistence.converter.JsonObjectConverter.class,
- jdbcType = JdbcType.VARCHAR)
+ jdbcType = JdbcType.VARCHAR),
+ @Result(column = "table_id", property = "tableId"),
+ @Result(column = "bucket_id", property = "bucketId")
})
TableRuntimeMeta selectRuntime(@Param("tableId") Long tableId);
String SELECT_COLS =
- " table_id, group_name, status_code, status_code_update_time,
table_config, table_summary ";
+ " table_id, group_name, status_code, status_code_update_time,
table_config, table_summary, bucket_id ";
@Select("SELECT " + SELECT_COLS + "FROM " + TABLE_NAME)
@ResultMap("tableRuntimeMeta")
@@ -104,7 +107,7 @@ public interface TableRuntimeMapper {
+ "<bind name=\"isMySQL\" value=\"_databaseId == 'mysql'\" />"
+ "<bind name=\"isPostgreSQL\" value=\"_databaseId == 'postgres'\"
/>"
+ "<bind name=\"isDerby\" value=\"_databaseId == 'derby'\" />"
- + "SELECT r.table_id, group_name, status_code,
status_code_update_time, table_config, table_summary FROM "
+ + "SELECT r.table_id, group_name, status_code,
status_code_update_time, table_config, table_summary, bucket_id FROM "
+ TABLE_NAME
+ " r JOIN table_identifier i "
+ " ON r.table_id = i.table_id "
diff --git a/amoro-ams/src/main/resources/derby/ams-derby-init.sql
b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
index 84c490ba0..9cbd133e4 100644
--- a/amoro-ams/src/main/resources/derby/ams-derby-init.sql
+++ b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
@@ -106,7 +106,8 @@ CREATE TABLE table_runtime (
status_code INTEGER NOT NULL DEFAULT 700,
status_code_update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
table_config CLOB,
- table_summary CLOB
+ table_summary CLOB,
+ bucket_id VARCHAR(4)
);
CREATE INDEX idx_status_time_desc
diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
index c916b93f0..3f4c9031e 100644
--- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
+++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
@@ -117,6 +117,7 @@ CREATE TABLE `table_runtime`
`status_code_update_time` timestamp(3) default CURRENT_TIMESTAMP(3)
COMMENT 'Table runtime status code update time',
`table_config` mediumtext COMMENT 'table configuration
cached from table.properties',
`table_summary` mediumtext COMMENT 'table summary for ams',
+ `bucket_id` VARCHAR(4) DEFAULT NULL COMMENT 'Bucket id to which
the record table belongs',
PRIMARY KEY (`table_id`),
INDEX idx_status_and_time (status_code, status_code_update_time DESC)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'Table running information of
each table' ROW_FORMAT=DYNAMIC;
diff --git a/amoro-ams/src/main/resources/mysql/upgrade.sql
b/amoro-ams/src/main/resources/mysql/upgrade.sql
index 634ba9696..266f43045 100644
--- a/amoro-ams/src/main/resources/mysql/upgrade.sql
+++ b/amoro-ams/src/main/resources/mysql/upgrade.sql
@@ -19,6 +19,8 @@
-- Update the precision from s level to ms.
ALTER TABLE `table_runtime` MODIFY COLUMN `optimizing_status_start_time`
TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'Table optimize status start
time';
+ALTER TABLE `table_runtime` ADD COLUMN `bucket_id` VARCHAR(4) DEFAULT NULL
COMMENT 'Bucket number to which the record table belongs';
+
-- Update processId to SnowflakeId
UPDATE `table_optimizing_process` SET `process_id` = `process_id` /10 << 13;
UPDATE `task_runtime` SET `process_id` = `process_id` /10 << 13;
diff --git a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
index 20eca96fa..bc4375cd8 100644
--- a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
+++ b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
@@ -176,7 +176,8 @@ create table if not exists table_runtime (
status_code int not null default 700,
status_code_update_time timestamptz not null default now(),
table_config text,
- table_summary text
+ table_summary text,
+ bucket_id varchar(4)
);
create index if not exists idx_status_and_time
@@ -187,7 +188,7 @@ comment on column table_runtime.status_code is 'Table
runtime status code.';
comment on column table_runtime.status_code_update_time is 'Table runtime
status code update time';
comment on column table_runtime.table_config is 'table configuration cached
from table.properties';
comment on column table_runtime.table_summary is 'table summary for ams';
-
+comment on column table_runtime.bucket_id is 'Bucket number to which the
record table belongs';
create table if not exists table_runtime_state (
state_id bigserial primary key,
diff --git a/amoro-ams/src/main/resources/postgres/upgrade.sql
b/amoro-ams/src/main/resources/postgres/upgrade.sql
index 5fb79348e..401b62b2f 100644
--- a/amoro-ams/src/main/resources/postgres/upgrade.sql
+++ b/amoro-ams/src/main/resources/postgres/upgrade.sql
@@ -212,3 +212,6 @@ SELECT table_id,'optimizing_state',
FROM table_runtime_old;
DROP TABLE IF EXISTS table_runtime_old;
+
+-- ADD bucket_id to table_runtime
+ALTER TABLE table_runtime ADD COLUMN bucket_id varchar(4);
\ No newline at end of file