This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit 320904471e1b4c975404598dcf045beaa284f6cf Author: xstorm1 <[email protected]> AuthorDate: Tue Aug 3 14:42:10 2021 +0800 add dbinit module, it will create tables into user's database, the first version can support MySql only --- pom.xml | 2 +- .../streams/common/configure/ConfigureFileKey.java | 1 + .../rocketmq/streams/db/driver/orm/ORMUtil.java | 23 +++ rocketmq-streams-dbinit/pom.xml | 38 ++++ .../streams/dbinit/mysql/delegate/DBDelegate.java | 9 + .../dbinit/mysql/delegate/DBDelegateFactory.java | 27 +++ .../streams/dbinit/mysql/delegate/DBType.java | 6 + .../dbinit/mysql/delegate/MysqlDelegate.java | 48 +++++ .../src/main/resources/tables_mysql_innodb.sql | 199 +++++++++++++++++++++ 9 files changed, 352 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 3675792..2969816 100644 --- a/pom.xml +++ b/pom.xml @@ -31,13 +31,13 @@ <module>rocketmq-streams-dim</module> <module>rocketmq-streams-transport-minio</module> <module>rocketmq-streams-script</module> - <module>rocketmq-streams-script-python</module> <module>rocketmq-streams-configurable</module> <module>rocketmq-streams-serviceloader</module> <module>rocketmq-streams-filter</module> <module>rocketmq-streams-schedule</module> <module>rocketmq-streams-lease</module> <module>rocketmq-streams-db-operator</module> + <module>rocketmq-streams-dbinit</module> <module>rocketmq-streams-window</module> <module>rocketmq-streams-clients</module> <module>rocketmq-streams-channel-rocketmq</module> diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java index 316a3b2..e6a796d 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java @@ -27,6 +27,7 @@ public interface ConfigureFileKey { /** * 数据库url */ + String DB_TYPE = "dipper.rds.jdbc.type"; String JDBC_URL = "dipper.rds.jdbc.url"; String JDBC_USERNAME = "dipper.rds.jdbc.username"; String JDBC_PASSWORD = "dipper.rds.jdbc.password"; diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java index 20529b0..670cb76 100644 --- a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java +++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java @@ -161,6 +161,29 @@ public class ORMUtil { } } + + public static boolean executeSQL(String sql, Object paras, String driver, final String url, final String userName, + final String password) { + if (paras != null) { + sql = SQLUtil.parseIbatisSQL(paras, sql); + } + JDBCDriver dataSource = null; + try { + dataSource = DriverBuilder.createDriver(driver, url, userName, password); + dataSource.execute(sql); + return true; + } catch (Exception e) { + String errorMsg = ("execute sql error ,the sql is " + sql + ". the error msg is " + e.getMessage()); + LOG.error(errorMsg); + e.printStackTrace(); + throw new RuntimeException(errorMsg, e); + } finally { + if (dataSource != null) { + dataSource.destroy(); + } + } + } + /** * 把一个对象的字段拼接成where条件,如果字段值为null,不拼接 * diff --git a/rocketmq-streams-dbinit/pom.xml b/rocketmq-streams-dbinit/pom.xml new file mode 100644 index 0000000..3434616 --- /dev/null +++ b/rocketmq-streams-dbinit/pom.xml @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>rocketmq-streams</artifactId> + <groupId>org.apache.rocketmq</groupId> + <version>2.0.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>rocketmq-streams-dbinit</artifactId> + <name>ROCKETMQ STREAMS :: dbinit</name> + + <build> + <resources> + <resource> + <directory>src/main/resources</directory> + + <includes> + <include>**/*.sql</include> + <include>**/*.properties</include> + </includes> + + + <filtering>true</filtering> + </resource> + </resources> + </build> + + + <dependencies> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-streams-db-operator</artifactId> + </dependency> + </dependencies> + +</project> \ No newline at end of file diff --git a/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegate.java b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegate.java new file mode 100644 index 0000000..2737fb5 --- /dev/null +++ b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegate.java @@ -0,0 +1,9 @@ +package org.apache.rocketmq.streams.dbinit.mysql.delegate; + +public interface DBDelegate { + + public void init(String driver, String url, String userName, + String password); + + public void init(); +} diff --git a/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegateFactory.java b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegateFactory.java new file mode 100644 index 0000000..3b02516 --- /dev/null +++ b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBDelegateFactory.java @@ -0,0 +1,27 @@ +package org.apache.rocketmq.streams.dbinit.mysql.delegate; + +import org.apache.rocketmq.streams.common.component.ComponentCreator; +import org.apache.rocketmq.streams.common.configure.ConfigureFileKey; + +public class DBDelegateFactory { + + public static DBDelegate getDelegate() { + String dbType = ComponentCreator.getProperties().getProperty(ConfigureFileKey.DB_TYPE); + if (dbType == null || "".equalsIgnoreCase(dbType)) { + dbType = DBType.DB_MYSQL; + } + if (DBType.DB_MYSQL.equalsIgnoreCase(dbType)) { + return new MysqlDelegate(); + } + + return new MysqlDelegate(); + } + + public static DBDelegate getDelegate(String dbType) { + if (DBType.DB_MYSQL.equalsIgnoreCase(dbType)) { + return new MysqlDelegate(); + } + + return new MysqlDelegate(); + } +} diff --git a/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBType.java b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBType.java new file mode 100644 index 0000000..d8908f8 --- /dev/null +++ b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/DBType.java @@ -0,0 +1,6 @@ +package org.apache.rocketmq.streams.dbinit.mysql.delegate; + +public class DBType { + + public static final String DB_MYSQL = "MYSQL"; +} diff --git a/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/MysqlDelegate.java b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/MysqlDelegate.java new file mode 100644 index 0000000..1295368 --- /dev/null +++ b/rocketmq-streams-dbinit/src/main/java/org/apache/rocketmq/streams/dbinit/mysql/delegate/MysqlDelegate.java @@ -0,0 +1,48 @@ +package org.apache.rocketmq.streams.dbinit.mysql.delegate; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.rocketmq.streams.common.utils.FileUtil; +import org.apache.rocketmq.streams.db.driver.orm.ORMUtil; + +import java.io.IOException; +import java.net.URL; + +public class MysqlDelegate implements DBDelegate { + + public static final Log LOG = LogFactory.getLog(MysqlDelegate.class); + + + @Override + public void init(String driver, final String url, final String userName, + final String password) { + String[] sqls = loadSqls(); + for (String sql : sqls) { + ORMUtil.executeSQL(sql, null, driver, url, userName, password); + } + } + + @Override + public void init() { + String[] sqls = loadSqls(); + for (String sql : sqls) { + ORMUtil.executeSQL(sql, null); + } + } + + private String[] loadSqls() { + String[] sqls = null; + URL url = this.getClass().getClassLoader().getResource("tables_mysql_innodb.sql"); + try { + String tables = FileUtil.loadFileContent(url.openStream()); + sqls = tables.split(";"); + if (LOG.isDebugEnabled()) { + LOG.debug("Init db sqls : " + tables); + } + } catch (IOException e) { + e.printStackTrace(); + } + return sqls; + } + +} diff --git a/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql b/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql new file mode 100644 index 0000000..dc5771f --- /dev/null +++ b/rocketmq-streams-dbinit/src/main/resources/tables_mysql_innodb.sql @@ -0,0 +1,199 @@ +CREATE TABLE IF NOT EXISTS `window_max_value` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `gmt_create` datetime NOT NULL, + `gmt_modified` datetime NOT NULL, + `max_value` bigint(20) unsigned NOT NULL, + `max_event_time` bigint(20) unsigned NOT NULL, + `msg_key` varchar(256) NOT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `uk__ket` (`msg_key`(250)), + KEY `idx_modifytime` (`gmt_modified`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +CREATE TABLE IF NOT EXISTS `window_value` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `gmt_create` datetime NOT NULL, + `gmt_modified` datetime NOT NULL, + `start_time` varchar(20) NOT NULL, + `end_time` varchar(20) NOT NULL, + `max_offset` longtext, + `group_by` text, + `agg_column_result` longtext, + `computed_column_result` longtext, + `version` varchar(64) DEFAULT NULL, + `name_space` varchar(256) DEFAULT NULL, + `configure_name` varchar(256) DEFAULT NULL, + `msg_key` varchar(64) NOT NULL, + `window_instance_id` varchar(64) NOT NULL, + `partition` varchar(512) DEFAULT NULL, + `partition_num` bigint(20) DEFAULT NULL, + `fire_time` varchar(20) DEFAULT NULL, + `update_version` bigint(20) unsigned DEFAULT NULL, + `update_flag` bigint(20) DEFAULT NULL, + `window_instance_partition_id` varchar(64) DEFAULT NULL, + `type` varchar(64) DEFAULT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `uk_window_state` (`msg_key`), + KEY `idx_window_instance_shuffle` (`window_instance_partition_id`,`partition_num`), + KEY `idx_window_instance_firetime` (`window_instance_partition_id`,`fire_time`), + KEY `idx_window` (`name_space`(128),`configure_name`(128),`partition`(128)) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +CREATE TABLE IF NOT EXISTS `window_task` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, + `gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, + `task_id` varchar(64) NOT NULL, + `untreated_flag` int(11) NOT NULL DEFAULT '0', + `group_by_value` varchar(1024) NOT NULL, + `task_owner` varchar(256) NOT NULL, + `task_send_time` datetime DEFAULT NULL, + `send_task_msg` text NOT NULL, + `msg_send_time` bigint(20) DEFAULT NULL, + `name` varchar(128) NOT NULL, + `start_time` varchar(20) NOT NULL, + `end_time` varchar(20) NOT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `uk_taskid` (`task_id`), + KEY `idx_flag_modifytime` (`name`,`untreated_flag`,`gmt_modified`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +CREATE TABLE IF NOT EXISTS `window_instance` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `gmt_create` datetime NOT NULL, + `gmt_modified` datetime NOT NULL, + `start_time` varchar(20) NOT NULL, + `end_time` varchar(20) NOT NULL, + `fire_time` varchar(20) NOT NULL, + `window_name` varchar(128) NOT NULL, + `window_name_space` varchar(128) NOT NULL, + `status` tinyint(4) NOT NULL DEFAULT '0', + `version` int(11) DEFAULT '0', + `window_instance_key` varchar(128) DEFAULT NULL, + `window_instance_name` varchar(128) DEFAULT NULL, + `window_Instance_split_name` varchar(128) DEFAULT NULL, + `split_id` varchar(128) DEFAULT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `uk_window_instance_uniq_index` (`window_instance_key`), + KEY `idx_gmt_modified` (`fire_time`,`window_name`,`window_name_space`,`status`), + KEY `idx_windowinstance_name` (`window_instance_name`), + KEY `idx_windowinstance_split_name` (`window_Instance_split_name`), + KEY `idx_windowinstance_split_name_firetime` (`window_Instance_split_name`,`fire_time`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +CREATE TABLE IF NOT EXISTS `lease_info` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `gmt_create` datetime NOT NULL, + `gmt_modified` datetime NOT NULL, + `lease_name` varchar(255) NOT NULL, + `lease_user_ip` varchar(255) NOT NULL, + `lease_end_time` varchar(255) NOT NULL, + `status` int(11) NOT NULL DEFAULT '1', + `version` bigint(20) NOT NULL, + `candidate_lease_ip` varchar(255) DEFAULT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `uk_name` (`lease_name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +CREATE TABLE IF NOT EXISTS `dipper_sql_configure` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `gmt_create` datetime NOT NULL, + `gmt_modified` datetime NOT NULL, + `namespace` varchar(32) NOT NULL, + `type` varchar(32) NOT NULL, + `name` varchar(128) NOT NULL, + `json_value` longtext NOT NULL, + `request_id` varchar(128) NOT NULL, + `account_id` varchar(32) NOT NULL, + `account_name` varchar(32) NOT NULL, + `account_nickname` varchar(32) NOT NULL, + `client_ip` varchar(64) NOT NULL, + `status` tinyint(3) unsigned NOT NULL DEFAULT '0', + `is_publish` int(11) NOT NULL DEFAULT '0', + PRIMARY KEY (`id`), + UNIQUE KEY `uk_namespace_type_name` (`namespace`,`type`,`name`), + KEY `idx_namespace` (`namespace`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +CREATE TABLE IF NOT EXISTS `dipper_configure` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `gmt_create` datetime NOT NULL, + `gmt_modified` datetime NOT NULL, + `namespace` varchar(32) NOT NULL, + `type` varchar(32) NOT NULL, + `name` varchar(128) NOT NULL, + `json_value` text NOT NULL, + `request_id` varchar(128) NOT NULL, + `account_id` varchar(32) NOT NULL, + `account_name` varchar(32) NOT NULL, + `account_nickname` varchar(32) NOT NULL, + `client_ip` varchar(64) NOT NULL, + `status` tinyint(3) unsigned NOT NULL DEFAULT '0', + `isPublish` int(1) NOT NULL DEFAULT '0', + PRIMARY KEY (`id`), + UNIQUE KEY `uk_namespace_type_name` (`namespace`,`type`,`name`), + KEY `idx_namespace` (`namespace`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +CREATE TABLE IF NOT EXISTS `join_right_state` ( + `id` bigint(20) NOT NULL AUTO_INCREMENT, + `gmt_create` datetime DEFAULT NULL, + `gmt_modified` datetime DEFAULT NULL, + `window_id` bigint(20) DEFAULT NULL, + `window_name` varchar(200) DEFAULT NULL, + `window_name_space` varchar(45) DEFAULT NULL, + `message_id` varchar(200) DEFAULT NULL, + `message_key` varchar(32) DEFAULT NULL, + `message_time` datetime DEFAULT NULL, + `message_body` longtext, + `msg_key` varchar(400) DEFAULT NULL, + `window_instance_id` varchar(200) DEFAULT NULL, + `partition` varchar(200) DEFAULT NULL, + `partition_num` bigint(20) DEFAULT NULL, + `window_instance_partition_id` varchar(200) DEFAULT NULL, + `version` varchar(64) DEFAULT NULL, + `update_flag` bigint(20) DEFAULT NULL, + `name_space` varchar(256) DEFAULT NULL, + `configure_name` varchar(256) DEFAULT NULL, + `type` varchar(64) DEFAULT NULL, + `name` varchar(64) DEFAULT NULL, + `update_version` bigint(20) unsigned DEFAULT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `uk_message_id_unique` (`message_id`), + KEY `idx_message_key_index` (`message_key`), + KEY `idx_gmt_create_index` (`gmt_create`), + KEY `idx_window_name_index` (`window_name`(70)), + KEY `idx_message_key_gmt_create_index` (`message_key`,`gmt_create`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +CREATE TABLE IF NOT EXISTS `join_left_state` ( + `id` bigint(20) NOT NULL AUTO_INCREMENT, + `gmt_create` datetime DEFAULT NULL, + `gmt_modified` datetime DEFAULT NULL, + `window_id` bigint(20) DEFAULT NULL, + `window_name` varchar(200) DEFAULT NULL, + `window_name_space` varchar(45) DEFAULT NULL, + `message_id` varchar(200) DEFAULT NULL, + `message_key` varchar(32) DEFAULT NULL, + `message_time` datetime DEFAULT NULL, + `message_body` longtext, + `msg_key` varchar(400) DEFAULT NULL, + `window_instance_id` varchar(200) DEFAULT NULL, + `partition` varchar(200) DEFAULT NULL, + `partition_num` bigint(20) DEFAULT NULL, + `window_instance_partition_id` varchar(200) DEFAULT NULL, + `version` varchar(64) DEFAULT NULL, + `update_flag` bigint(20) DEFAULT NULL, + `name_space` varchar(256) DEFAULT NULL, + `configure_name` varchar(256) DEFAULT NULL, + `type` varchar(64) DEFAULT NULL, + `name` varchar(64) DEFAULT NULL, + `update_version` bigint(20) unsigned DEFAULT NULL, + PRIMARY KEY (`id`), + UNIQUE KEY `uk_message_id_unique` (`message_id`), + KEY `idx_message_key_index` (`message_key`), + KEY `idx_gmt_create_index` (`gmt_create`), + KEY `idx_window_name_index` (`window_name`(70)), + KEY `idx_message_key_gmt_create_index` (`message_key`,`gmt_create`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; +
