This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit 9c898ecf0c5301f1ae1d0c144f15f96d2151ba96 Author: muyang <[email protected]> AuthorDate: Mon Aug 2 12:21:28 2021 +0800 add module db-operator、transport-minio --- rocketmq-streams-db-operator/pom.xml | 34 ++ .../rocketmq-streams-db-operator.iml | 16 + .../streams/db/configuable/DBConfigureService.java | 282 ++++++++++++ .../DBSupportParentConfigureService.java | 37 ++ .../rocketmq/streams/db/driver/DriverBuilder.java | 111 +++++ .../rocketmq/streams/db/driver/IDriverBudiler.java | 36 ++ .../rocketmq/streams/db/driver/JDBCDriver.java | 277 ++++++++++++ .../db/driver/batchloader/BatchRowLoader.java | 179 ++++++++ .../db/driver/batchloader/IRowOperator.java | 33 ++ .../rocketmq/streams/db/driver/orm/ORMUtil.java | 490 +++++++++++++++++++++ .../rocketmq/streams/db/operator/SQLOperator.java | 178 ++++++++ .../org/apache/rocketmq/streams/db/Person.java | 110 +++++ .../DBSupportParentConfigureServiceTest.java | 74 ++++ .../streams/db/driver/orm/ORMUtilTest.java | 86 ++++ rocketmq-streams-transport-minio/pom.xml | 25 ++ .../rocketmq-streams-transport-minio.iml | 17 + .../transport/minio/MinioFileTransport.java | 141 ++++++ .../yundun/dipper/configurable/DataTpyeTest.java | 70 +++ .../streams/configuable/model/DataTpyeTest.java | 68 +++ .../rocketmq/streams/configuable/model/Person.java | 97 ++++ .../streams/configurable/model/Person.java | 97 ++++ .../component/ConfigurableComponent.properties | 7 + .../src/test/resources/log4j.xml | 20 + .../src/test/resources/pro-function.txt | 11 + .../src/test/resources/python_script.py | 22 + 25 files changed, 2518 insertions(+) diff --git a/rocketmq-streams-db-operator/pom.xml b/rocketmq-streams-db-operator/pom.xml new file mode 100755 index 0000000..9a9b17b --- /dev/null +++ b/rocketmq-streams-db-operator/pom.xml @@ -0,0 +1,34 @@ +<?xml version="1.0" encoding="utf-8"?> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-streams</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + <artifactId>rocketmq-streams-db-operator</artifactId> + <name>ROCKETMQ STREAMS :: db-operator</name> + <packaging>jar</packaging> + <dependencies> + + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-streams-configurable</artifactId> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-jdbc</artifactId> + </dependency> + + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + </dependency> + + + </dependencies> +</project> diff --git a/rocketmq-streams-db-operator/rocketmq-streams-db-operator.iml b/rocketmq-streams-db-operator/rocketmq-streams-db-operator.iml new file mode 100644 index 0000000..38ffb14 --- /dev/null +++ b/rocketmq-streams-db-operator/rocketmq-streams-db-operator.iml @@ -0,0 +1,16 @@ +<?xml version="1.0" encoding="UTF-8"?> +<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4"> + <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5"> + <output url="file://$MODULE_DIR$/${project.build.directory}/classes" /> + <output-test url="file://$MODULE_DIR$/${project.build.directory}/test-classes" /> + <content url="file://$MODULE_DIR$"> + <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" /> + <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" /> + <excludeFolder url="file://$MODULE_DIR$/${project.build.directory}/classes" /> + <excludeFolder url="file://$MODULE_DIR$/${project.build.directory}/test-classes" /> + <excludeFolder url="file://$MODULE_DIR$/target" /> + </content> + <orderEntry type="inheritedJdk" /> + <orderEntry type="sourceFolder" forTests="false" /> + </component> +</module> \ No newline at end of file diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java new file mode 100644 index 0000000..ef319eb --- /dev/null +++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBConfigureService.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.configuable; + +import com.alibaba.fastjson.JSONObject; +import org.apache.rocketmq.streams.common.component.AbstractComponent; +import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable; +import org.apache.rocketmq.streams.common.configurable.IConfigurable; +import org.apache.rocketmq.streams.configurable.service.AbstractConfigurableService; +import org.apache.rocketmq.streams.configurable.model.Configure; + +import org.apache.rocketmq.streams.common.configure.ConfigureFileKey; +import org.apache.rocketmq.streams.common.interfaces.IPropertyEnable; +import org.apache.rocketmq.streams.common.utils.AESUtil; +import org.apache.rocketmq.streams.common.utils.MapKeyUtil; +import org.apache.rocketmq.streams.common.utils.SQLUtil; +import org.apache.rocketmq.streams.common.utils.StringUtil; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.rocketmq.streams.db.driver.JDBCDriver; +import org.apache.rocketmq.streams.db.driver.DriverBuilder; + +import java.util.*; + +/** + * Configuable对象存储在db中,是生成环境常用的一种模式 数据库参数可以配置在配置文件中,ConfiguableComponent在启动时,会把参数封装在Properties中,调用DBConfigureService(Properties properties) 构造方法完成实例创建 + */ + +public class DBConfigureService extends AbstractConfigurableService implements IPropertyEnable { + + private static final Log LOG = LogFactory.getLog(DBConfigureService.class); + private String jdbcdriver; + private String url; + private String userName; + private String password; + private String tableName = "dipper_configure"; + @Deprecated + private boolean isCompatibilityOldRuleEngine = false;//兼容老规则引擎使用,正常场景不需要理会 + + public DBConfigureService(String jdbcdriver, String url, String userName, String password) { + this(jdbcdriver, url, userName, password, null); + } + + public DBConfigureService(String jdbcdriver, String url, String userName, String password, String tableName) { + this.url = url; + this.jdbcdriver = jdbcdriver; + this.userName = userName; + this.password = password; + this.tableName = tableName; + LOG.info("DBConfigureService resource ,the info is: driver:" + this.jdbcdriver + ",url:" + this.url + + ",username:" + userName + ",password:" + password); + regJdbcDriver(jdbcdriver); + } + + public DBConfigureService() { + } + + /** + * @param properties + */ + public DBConfigureService(Properties properties) { + super(properties); + initProperty(properties); + } + + @Override + protected GetConfigureResult loadConfigurable(String namespace) { + GetConfigureResult result = new GetConfigureResult(); + try { + List<Configure> configures = selectOpening(namespace); + List<IConfigurable> configurables = convert(configures); + result.setConfigurables(configurables); + result.setQuerySuccess(true);// 该字段标示查询是否成功,若不成功则不会更新配置 + } catch (Exception e) { + result.setQuerySuccess(false); + LOG.error("load configurable error ", e); + } + return result; + } + + protected List<Configure> selectOpening(String namespace) { + return queryConfigureByNamespace(namespace); + } + + protected List<Configure> queryConfigureByNamespace(String... namespaces) { + return queryConfigureByNamespaceInner(null, namespaces); + } + + protected List<Configure> queryConfigureByNamespaceInner(String type, String... namespaces) { + JDBCDriver resource = createResouce(); + try { + String namespace = "namespace"; + if (isCompatibilityOldRuleEngine && AbstractComponent.JDBC_COMPATIBILITY_RULEENGINE_TABLE_NAME.equals(tableName)) { + namespace = "name_space"; + } + String sql = "SELECT * FROM `" + tableName + "` WHERE " + namespace + " in (" + SQLUtil.createInSql(namespaces) + ") and status =1"; + if (StringUtil.isNotEmpty(type)) { + sql = sql + " and type='" + type + "'"; + } + JSONObject jsonObject = new JSONObject(); + jsonObject.put("namespace", MapKeyUtil.createKeyBySign(",", namespaces)); + sql = SQLUtil.parseIbatisSQL(jsonObject, sql); + // String builder = "SELECT * FROM `" + tableName + "` WHERE namespace ='" + namespace + "' and status =1"; + List<Map<String, Object>> result = resource.queryForList(sql); + if (result == null) { + return new ArrayList<Configure>(); + } + // LOG.info("load configurable's count is " + result.size()); + return convert2Configure(result); + } finally { + if (resource != null) { + resource.destroy(); + } + } + } + + @Override + public List<IConfigurable> queryConfiguableByNamespace(String... namespaces) { + List<Configure> configures = queryConfigureByNamespace(namespaces); + List<IConfigurable> configurables = convert(configures); + return configurables; + } + + public static void main(String[] args) { + String[] namespaces = new String[] {"rule1", null}; + String sql = "SELECT * FROM `dipper_configure` WHERE namespace in (" + SQLUtil.createInSql(namespaces) + ") and status =1"; + JSONObject jsonObject = new JSONObject(); + jsonObject.put("namespace", MapKeyUtil.createKeyBySign(",", namespaces)); + sql = SQLUtil.parseIbatisSQL(jsonObject, sql); + System.out.println(sql); + } + + protected void saveOrUpdate(IConfigurable configure) { + JDBCDriver jdbcDataSource = createResouce(); + String sql = AbstractConfigurable.createSQL(configure, this.tableName); + try { + jdbcDataSource.executeInsert(sql); + } catch (Exception e) { + LOG.error("DBConfigureService saveOrUpdate error,sqlnode:" + sql); + throw new RuntimeException(e); + } finally { + if (jdbcDataSource != null) { + jdbcDataSource.destroy(); + } + } + } + + protected List<Configure> convert2Configure(List<Map<String, Object>> rows) { + List<Configure> configures = new ArrayList<Configure>(); + for (Map<String, Object> row : rows) { + Configure configure = new Configure(); + Long id = getColumnValue(row, "id"); + configure.setId(id); + Date create = getColumnValue(row, "gmt_create"); + configure.setGmtCreate(create); + Date modify = getColumnValue(row, "gmt_modified"); + configure.setGmtModified(modify); + String namespace = getColumnValue(row, "namespace"); + if (StringUtil.isEmpty(namespace)) { + namespace = getColumnValue(row, "name_space"); + } + configure.setNameSpace(namespace); + String type = getColumnValue(row, "type"); + configure.setType(type); + String name = getColumnValue(row, "name"); + configure.setName(name); + String jsonValue = getColumnValue(row, "json_value"); + try { + jsonValue = AESUtil.aesDecrypt(jsonValue, ConfigureFileKey.SECRECY); + } catch (Exception e) { + LOG.error("can't decrypt the value, reason:\t" + e.getCause()); + throw new RuntimeException(e); + } + configure.setJsonValue(jsonValue); + configures.add(configure); + } + return configures; + } + + @SuppressWarnings("unchecked") + protected <T> T getColumnValue(Map<String, Object> row, String columnName) { + Object value = row.get(columnName); + if (value == null) { + return null; + } + if (java.math.BigInteger.class.isInstance(value)) { + return (T)Long.valueOf(value.toString()); + } + return (T)value; + + } + + protected JDBCDriver createResouce() { + JDBCDriver resource = DriverBuilder.createDriver(this.jdbcdriver, this.url, this.userName, this.password); + return resource; + } + + public void setJdbcdriver(String jdbcdriver) { + this.jdbcdriver = jdbcdriver; + } + + public void setUrl(String url) { + this.url = url; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public void setPassword(String password) { + this.password = password; + } + + private void regJdbcDriver(String jdbcdriver) { + try { + if (StringUtil.isEmpty(jdbcdriver)) { + jdbcdriver = AbstractComponent.DEFAULT_JDBC_DRIVER; + } + Class.forName(jdbcdriver); + } catch (ClassNotFoundException e) { + LOG.error("DBConfigureService regJdbcDriver ClassNotFoundException error", e); + } catch (Exception e) { + LOG.error("DBConfigureService regJdbcDriver error", e); + } + } + + @Override + public void initProperty(Properties properties) { + this.jdbcdriver = properties.getProperty(AbstractComponent.JDBC_DRIVER); + regJdbcDriver(jdbcdriver); + this.url = properties.getProperty(AbstractComponent.JDBC_URL); + this.userName = properties.getProperty(AbstractComponent.JDBC_USERNAME); + this.password = properties.getProperty(AbstractComponent.JDBC_PASSWORD); + String tableName = properties.getProperty(AbstractComponent.JDBC_TABLE_NAME); + String isCompatibilityOldRuleEngine = properties.getProperty(AbstractComponent.JDBC_COMPATIBILITY_OLD_RULEENGINE); + if (StringUtil.isNotEmpty(isCompatibilityOldRuleEngine)) { + this.isCompatibilityOldRuleEngine = true; + } + if (StringUtil.isNotEmpty(tableName)) { + this.tableName = tableName; + } + LOG.info( + "Properties resource ,the info is: driver:" + this.jdbcdriver + ",url:" + this.url + ",username:" + userName + + ",password:" + password); + } + + @Override + protected void insertConfigurable(IConfigurable configurable) { + saveOrUpdate(configurable); + } + + @Override + protected void updateConfigurable(IConfigurable configurable) { + saveOrUpdate(configurable); + } + + @Override + public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) { + + List<Configure> configures = queryConfigureByNamespaceInner(type, namespace); + List<IConfigurable> configurables = convert(configures); + List<T> result = new ArrayList<>(); + for (IConfigurable configurable : configurables) { + result.add((T)configurable); + } + return result; + } +} diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureService.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureService.java new file mode 100644 index 0000000..77b82e0 --- /dev/null +++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureService.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.configuable; + +import org.apache.rocketmq.streams.common.configurable.IConfigurableService; +import org.apache.rocketmq.streams.configurable.service.ConfigurableServcieType; +import org.apache.rocketmq.streams.common.model.ServiceName; +import com.google.auto.service.AutoService; +import org.apache.rocketmq.streams.configurable.service.AbstractSupportParentConfigureService; + +import java.util.Properties; + +@AutoService(IConfigurableService.class) +@ServiceName(ConfigurableServcieType.DEFAULT_SERVICE_NAME) +public class DBSupportParentConfigureService extends AbstractSupportParentConfigureService { + + @Override + protected void initBeforeInitConfigurable(Properties property) { + this.parentConfigureService = new DBConfigureService(property); + this.configureService = new DBConfigureService(property); + + } +} diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/DriverBuilder.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/DriverBuilder.java new file mode 100644 index 0000000..c0e9f53 --- /dev/null +++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/DriverBuilder.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.driver; + +import java.lang.reflect.Constructor; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.rocketmq.streams.common.component.AbstractComponent; +import org.apache.rocketmq.streams.common.component.ComponentCreator; +import org.apache.rocketmq.streams.common.utils.ReflectUtil; +import org.apache.rocketmq.streams.common.utils.StringUtil; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * 创建JDBCDriver,如果没有 + */ +public class DriverBuilder { + + private static final Log LOG = LogFactory.getLog(DriverBuilder.class); + + public static final String DEFALUT_JDBC_DRIVER = "com.mysql.jdbc.Driver"; + + private static final Map<String, JDBCDriver> dataSourceMap = new ConcurrentHashMap<>(); + + private static AtomicInteger count = new AtomicInteger(0); + + /** + * 使用ConfiguableComponent在属性文件配置的jdbc信息,dipper默认都是使用这个数据库连接 如果需要连接其他库,需要使用带参数的createDriver + * + * @return + */ + public static JDBCDriver createDriver() { + String driver = ComponentCreator.getProperties().getProperty(AbstractComponent.JDBC_DRIVER); + String url = ComponentCreator.getProperties().getProperty(AbstractComponent.JDBC_URL); + String userName = ComponentCreator.getProperties().getProperty(AbstractComponent.JDBC_USERNAME); + String password = ComponentCreator.getProperties().getProperty(AbstractComponent.JDBC_PASSWORD); + return createDriver(driver, url, userName, password); + } + + /** + * 根据数据库连接信息创建连接,并返回JDBCDriver + * + * @param driver 数据库驱动,如果为null,默认为mysql + * @param url 数据库连接url + * @param userName 用户名 + * @param password 密码 + * @return JDBCDriver + */ + public static JDBCDriver createDriver(String driver, final String url, final String userName, + final String password) { + if (StringUtil.isEmpty(driver)) { + driver = DEFALUT_JDBC_DRIVER; + } + String className = ComponentCreator.getDBProxyClassName(); + if (StringUtil.isNotEmpty(className)) { + Class clazz = ReflectUtil.forClass(className); + try { + Constructor constructor = clazz.getConstructor( + new Class[] {String.class, String.class, String.class, String.class}); + JDBCDriver abstractDBDataSource = (JDBCDriver)constructor.newInstance(url, userName, password, + driver); + abstractDBDataSource.init(); + return abstractDBDataSource; + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + final String jdbcdriver = driver; + ReflectUtil.forClass(jdbcdriver); + JDBCDriver resource = new JDBCDriver(); + LOG.debug("jdbcdriver=" + jdbcdriver + ",url=" + url); + resource.setJdbcDriver(jdbcdriver); + resource.setUrl(url); + resource.setUserName(userName); + resource.setPassword(password); + resource.init(); + return resource; + } + + /** + * 生成拼接字符串 + * + * @param url + * @param userName + * @param password + * @return + */ + private static String genereateKey(String url, String userName, String password) { + return url + "_" + userName + "_" + password; + } + +} + diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/IDriverBudiler.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/IDriverBudiler.java new file mode 100644 index 0000000..6be77eb --- /dev/null +++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/IDriverBudiler.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.driver; + +import org.apache.rocketmq.streams.common.dboperator.IDBDriver; + +/** + * 返回操作数据库的driver对象,并且提供方法,判断driver是否有效,以及销毁的方法 + */ +public interface IDriverBudiler { + + /** + * 和dipper系统同数据源 + * + * @return + */ + IDBDriver createDBDriver(); + + boolean isValidate(); + + void destroy(); +} diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/JDBCDriver.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/JDBCDriver.java new file mode 100644 index 0000000..356bce6 --- /dev/null +++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/JDBCDriver.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.driver; + +import org.apache.rocketmq.streams.common.channel.sink.ISink; +import org.apache.rocketmq.streams.common.configurable.BasedConfigurable; +import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence; +import org.apache.rocketmq.streams.common.dboperator.IDBDriver; +import org.apache.rocketmq.streams.common.utils.StringUtil; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.SingleConnectionDataSource; +import org.springframework.jdbc.support.GeneratedKeyHolder; +import org.springframework.jdbc.support.KeyHolder; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * 数据库常用操作的封装,核心实现的接口是IJdbcTemplate 这个对象实现了IConfigurable接口,可以序列化存储和网络传输 数据库参数,可以配置成名字,实际值在配置文件配置 + * <p> + */ +public class JDBCDriver extends BasedConfigurable implements IDriverBudiler, IDBDriver { + private String jdbcDriver = DriverBuilder.DEFALUT_JDBC_DRIVER; + @ENVDependence + protected String url; + @ENVDependence + protected String userName; + @ENVDependence + protected String password; + + protected transient javax.sql.DataSource dataSource; + private transient IDBDriver dbDriver = null; + + public JDBCDriver(String url, String userName, String password, + String driver) { + setType(ISink.TYPE); + this.url = url; + this.userName = userName; + this.password = password; + if (StringUtil.isNotEmpty(driver)) { + this.jdbcDriver = driver; + } + } + + public JDBCDriver() { + setType(ISink.TYPE); + } + + protected IDBDriver createOrGetDriver() { + if (dbDriver == null) { + synchronized (this) { + if (dbDriver == null) { + dbDriver = createDBDriver(); + if (dataSource == null) { + dataSource = createDBDataSource(); + } + } + } + } + return dbDriver; + } + + @Override + public IDBDriver createDBDriver() { + javax.sql.DataSource dataSource = createDBDataSource(); + return new IDBDriver() { + private final JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); + + @Override + public int update(String sql) { + return jdbcTemplate.update(sql); + } + + @Override + public void execute(String sql) { + jdbcTemplate.execute(sql); + } + + @Override + public List<Map<String, Object>> queryForList(String sql) { + return jdbcTemplate.queryForList(sql); + } + + @Override + public Map<String, Object> queryOneRow(String sql) { + return jdbcTemplate.queryForMap(sql); + } + + @Override + public long executeInsert(String sql) { + try { + KeyHolder keyHolder = new GeneratedKeyHolder(); + jdbcTemplate.update(con -> con.prepareStatement(sql, PreparedStatement.RETURN_GENERATED_KEYS), keyHolder); + if (keyHolder.getKeyList() == null || keyHolder.getKeyList().size() > 1 || keyHolder.getKey() == null) { + return 0; + } + return keyHolder.getKey().longValue(); + } catch (Exception e) { + String errorMsg = "execute builder error ,the builder is " + sql + ". the error msg is " + e.getMessage(); + throw new RuntimeException(errorMsg, e); + } + } + + @Override + public void executSqls(String... sqls) { + jdbcTemplate.batchUpdate(sqls); + } + + @Override + public void executSqls(Collection<String> sqlCollection) { + if (sqlCollection == null || sqlCollection.size() == 0) { + return; + } + String[] sqls = new String[sqlCollection.size()]; + int i = 0; + Iterator<String> it = sqlCollection.iterator(); + while (it.hasNext()) { + String sql = it.next(); + sqls[i] = sql; + i++; + } + executSqls(sqls); + } + + /** + * 分批获取数据,最终获取全量数据 + * @param sql 可执行的SQL + * @return 结果数据 + */ + @Override + public List<Map<String, Object>> batchQueryBySql(String sql, int batchSize) { + List<Map<String, Object>> rows = new ArrayList<>(); + int startBatch; + String baseSql = sql; + if (sql.contains(";")) { + baseSql = sql.substring(0, sql.indexOf(";")); + } + String batchSQL = baseSql + " limit 0," + batchSize; + List<Map<String, Object>> batchResult = queryForList(batchSQL); + int index = 1; + while (batchResult.size() >= batchSize) { + rows.addAll(batchResult); + startBatch = batchSize * index; + batchSQL = baseSql + " limit " + startBatch + "," + batchSize; + batchResult = queryForList(batchSQL); + index++; + } + rows.addAll(batchResult); + + return rows; + } + }; + } + + protected javax.sql.DataSource createDBDataSource() { + + SingleConnectionDataSource dataSource = new SingleConnectionDataSource(url, userName, password, true); + + dataSource.setDriverClassName(jdbcDriver);// add by 林行0221 + // 专有云落地运维中心联调时发现独立打包时必须加这句话,否则会报找不到驱动。(MYSQL驱动包虽然已经打进去了但还要在这里显示指定) + dataSource.setSuppressClose(true); + this.dataSource = dataSource; + return dataSource; + } + + @Override + public boolean isValidate() { + try { + if (dataSource == null) { + dataSource = createDBDataSource(); + } + dataSource.getConnection(); + } catch (SQLException e) { + return false; + } + return true; + } + + @Override + public void destroy() { + if (dataSource instanceof SingleConnectionDataSource) { + SingleConnectionDataSource data = (SingleConnectionDataSource)dataSource; + data.destroy(); + } + } + + public String getJdbcDriver() { + return jdbcDriver; + } + + public void setJdbcDriver(String jdbcDriver) { + this.jdbcDriver = jdbcDriver; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + @Override + public int update(String sql) { + return createOrGetDriver().update(sql); + } + + @Override + public void execute(String sql) { + createOrGetDriver().execute(sql); + } + + @Override + public List<Map<String, Object>> queryForList(String sql) { + return createOrGetDriver().queryForList(sql); + } + + @Override + public Map<String, Object> queryOneRow(String sql) { + return createOrGetDriver().queryOneRow(sql); + } + + @Override + public long executeInsert(String sql) { + return createOrGetDriver().executeInsert(sql); + } + + @Override + public void executSqls(String... sqls) { + createOrGetDriver().executSqls(sqls); + } + + @Override + public void executSqls(Collection<String> sqls) { + createOrGetDriver().executSqls(sqls); + } + + @Override + public List<Map<String, Object>> batchQueryBySql(String sql, int batchSize) { + return createOrGetDriver().batchQueryBySql(sql, batchSize); + } +} diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/BatchRowLoader.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/BatchRowLoader.java new file mode 100644 index 0000000..3e4548c --- /dev/null +++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/BatchRowLoader.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.driver.batchloader; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import com.alibaba.fastjson.JSONObject; + +import org.apache.rocketmq.streams.common.cache.compress.impl.IntValueKV; +import org.apache.rocketmq.streams.common.utils.SQLUtil; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.rocketmq.streams.db.driver.JDBCDriver; +import org.apache.rocketmq.streams.db.driver.DriverBuilder; + +/** + * 多线程批量加载数据,每加载一批数据后,通过IRowOperator回调接口处理数据 需要有递增的字段,这个字段有索引,不重复,如id字段 + */ +public class BatchRowLoader { + private static final Log LOG = LogFactory.getLog(BatchRowLoader.class); + protected static final int MAX_LINE = 5000;//每个批次最大行数,根据这个值划分并行任务 + protected static ExecutorService executorService = null; + protected String idFieldName;//配置字段名称,这个字段的值是数字的,且是递增的 + protected String sql;//查询的sql语句,类似select * from table where idFieldName>#{idFieldName=0} order by idFieldName.不要加limit,系统会自动添加 + protected int batchSize = 1000;//每批从数据库加载的数据量 + protected IRowOperator dataRowProcessor;//加载的数据由这个回调接口处理 + private JDBCDriver jdbcDriver; + + public BatchRowLoader(String idFieldName, String sql, IRowOperator dataRowProcessor) { + this.idFieldName = idFieldName; + this.sql = sql; + this.dataRowProcessor = dataRowProcessor; + this.jdbcDriver = DriverBuilder.createDriver(); + executorService = new ThreadPoolExecutor(20, 20, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(1000)); + } + + public void startLoadData() { + try { + String statisticalSQL = sql; + int startIndex = sql.toLowerCase().indexOf("from"); + statisticalSQL = "select count(1) as c, min(" + idFieldName + ") as min, max(" + idFieldName + ") as max " + + sql.substring(startIndex); + List<Map<String, Object>> rows = jdbcDriver.queryForList(statisticalSQL); + Map<String, Object> row = rows.get(0); + int count = Integer.valueOf(row.get("c").toString()); + if (count == 0) { + LOG.warn("there is no data during execute sql: " + statisticalSQL); + return; + } + + IntValueKV intValueKV = new IntValueKV(count); + //int maxBatch=count/maxSyncCount;//每1w条数据,一个并发。如果数据量比较大,为了提高性能,并行执行 + + long min = Long.valueOf(row.get("min").toString()); + long max = Long.valueOf(row.get("max").toString()); + int maxSyncCount = count / MAX_LINE + 1; + long step = (max - min + 1) / maxSyncCount; + CountDownLatch countDownLatch = new CountDownLatch(maxSyncCount + 1); + AtomicInteger finishedCount = new AtomicInteger(0); + String taskSQL = null; + if (sql.indexOf(" where ") != -1) { + taskSQL = sql + " and " + idFieldName + ">#{startIndex} and " + idFieldName + "<=#{endIndex} order by " + + idFieldName + " limit " + batchSize; + } else { + taskSQL = sql + " where " + idFieldName + ">#{startIndex} and " + idFieldName + + "<=#{endIndex} order by " + idFieldName + " limit " + batchSize; + } + + int i = 0; + for (; i < maxSyncCount; i++) { + FetchDataTask + fetchDataTask = new FetchDataTask(taskSQL, (min - 1) + step * i, + (min - 1) + step * (i + 1), countDownLatch, finishedCount, jdbcDriver, count); + executorService.execute(fetchDataTask); + } + FetchDataTask + fetchDataTask = new FetchDataTask(taskSQL, (min - 1) + step * i, (min - 1) + step * (i + 1), + countDownLatch, finishedCount, jdbcDriver, count); + executorService.execute(fetchDataTask); + + countDownLatch.await(); + + LOG.info(getClass().getSimpleName() + " load data finish, load data line size is " + count); + } catch (Exception e) { + LOG.error("failed loading data batch!", e); + } finally { + jdbcDriver.destroy(); + } + } + + protected class FetchDataTask implements Runnable { + long startIndex; + long endIndex; + String sql; + CountDownLatch countDownLatch; + JDBCDriver resource; + AtomicInteger finishedCount;//完成了多少条 + int totalSize;//一共有多少条数据 + + public FetchDataTask(String sql, long startIndex, long endIndex, CountDownLatch countDownLatch, + AtomicInteger finishedCount, JDBCDriver resource, int totalSize) { + this.startIndex = startIndex; + this.endIndex = endIndex; + this.countDownLatch = countDownLatch; + this.sql = sql; + this.finishedCount = finishedCount; + this.resource = resource; + this.totalSize = totalSize; + } + + @Override + public void run() { + long currentIndex = startIndex; + JSONObject msg = new JSONObject(); + msg.put("endIndex", endIndex); + while (true) { + try { + + msg.put("startIndex", currentIndex); + + String sql = SQLUtil.parseIbatisSQL(msg, this.sql); + List<Map<String, Object>> rows = resource.queryForList(sql); + if (rows == null || rows.size() == 0) { + break; + } + currentIndex = Long.valueOf(rows.get(rows.size() - 1).get(idFieldName).toString()); + + int size = rows.size(); + int count = finishedCount.addAndGet(size); + double progress = (double)count / (double)totalSize; + progress = progress * 100; + System.out.println(" finished count is " + count + " the total count is " + totalSize + ", the progress is " + String.format("%.2f", progress) + "%"); + if (size < batchSize) { + if (size > 0) { + + doProcess(rows); + } + break; + } + doProcess(rows); + } catch (Exception e) { + throw new RuntimeException("put data error ", e); + } + } + + countDownLatch.countDown(); + } + } + + private void doProcess(List<Map<String, Object>> rows) { + for (Map<String, Object> row : rows) { + dataRowProcessor.doProcess(row); + } + } +} + diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/IRowOperator.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/IRowOperator.java new file mode 100644 index 0000000..f67393f --- /dev/null +++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/batchloader/IRowOperator.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.driver.batchloader; + +import java.util.Map; + +/** + * 操作一行数据 + */ +public interface IRowOperator { + + /** + * 处理一行数据 + * + * @param row + */ + void doProcess(Map<String, Object> row); + +} diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java new file mode 100644 index 0000000..20529b0 --- /dev/null +++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtil.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.driver.orm; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.rocketmq.streams.common.component.ComponentCreator; +import org.apache.rocketmq.streams.common.configure.ConfigureFileKey; +import org.apache.rocketmq.streams.common.model.Entity; +import org.apache.rocketmq.streams.common.configurable.IFieldProcessor; +import org.apache.rocketmq.streams.common.datatype.DataType; +import org.apache.rocketmq.streams.common.metadata.MetaData; +import org.apache.rocketmq.streams.common.utils.CollectionUtil; +import org.apache.rocketmq.streams.common.utils.DataTypeUtil; +import org.apache.rocketmq.streams.common.utils.DateUtil; +import org.apache.rocketmq.streams.common.utils.ReflectUtil; +import org.apache.rocketmq.streams.common.utils.SQLUtil; +import org.apache.rocketmq.streams.common.utils.StringUtil; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.rocketmq.streams.db.driver.DriverBuilder; +import org.apache.rocketmq.streams.db.driver.JDBCDriver; + +/** + * 轻量级的orm框架,如果pojo和table 符合驼峰的命名和下划线的命名规范,可以自动实现对象的orm + */ +public class ORMUtil { + private static final Log LOG = LogFactory.getLog(ORMUtil.class); + + public static <T> T queryForObject(String sql, Object paras, Class<T> convertClass) { + return queryForObject(sql, paras, convertClass, null, null, null); + } + + /** + * 通过sql查询一个唯一的对象出来,返回数据多于一条会报错 + * + * @param sql 查询语句 + * @param paras 如果有变参,这里面有变参的参数,可以是map,json或对象,只要key名或字段名和sql的参数名相同即可 + * @param convertClass,如果有变参,这里面有变参的参数,可以是map,json或对象,只要key名或字段名和sql的参数名相同即可 + * @param url 数据库连接url + * @param userName 用户名 + * @param password 密码 + * @param <T> + * @return 转换后的对象 + */ + public static <T> T queryForObject(String sql, Object paras, Class<T> convertClass, String url, String userName, + String password) { + List<T> result = queryForList(sql, paras, convertClass, url, userName, password); + if (result == null || result.size() == 0) { + return null; + } + if (result.size() > 1) { + throw new RuntimeException("expect only one row, actual " + result.size() + ". the builder is " + sql); + } + return result.get(0); + } + + /** + * 根据sql查询一批对象 + * + * @param sql 查询语句 + * @param paras 如果有变参,这里面有变参的参数,可以是map,json或对象,只要key名或字段名和sql的参数名相同即可 + * @param convertClass 如果有变参,这里面有变参的参数,可以是map,json或对象,只要key名或字段名和sql的参数名相同即可 + * @param <T> + * @return 返回对象列表 + */ + public static <T> List<T> queryForList(String sql, Object paras, Class<T> convertClass) { + return queryForList(sql, paras, convertClass, null, null, null); + } + + public static boolean hasConfigueDB() { + return ComponentCreator.getProperties().getProperty(ConfigureFileKey.JDBC_URL) != null; + } + + /** + * @param sql 查询语句 + * @param paras 如果有变参,这里面有变参的参数,可以是map,json或对象,只要key名或字段名和sql的参数名相同即可 + * @param convertClass 需要转换成的对象类。类的字段应该符合列名的命名规范,下划线换成驼峰形式 + * @param url 数据库连接url + * @param userName 用户名 + * @param password 密码 + * @param <T> + * @return 返回对象列表 + */ + public static <T> List<T> queryForList(String sql, Object paras, Class<T> convertClass, String url, String userName, + String password) { + sql = SQLUtil.parseIbatisSQL(paras, sql); + JDBCDriver dataSource = null; + try { + if (StringUtil.isEmpty(url) || StringUtil.isEmpty(userName)) { + dataSource = DriverBuilder.createDriver(); + } else { + dataSource = DriverBuilder.createDriver(DriverBuilder.DEFALUT_JDBC_DRIVER, url, userName, password); + } + + List<Map<String, Object>> rows = dataSource.queryForList(sql); + List<T> result = new ArrayList(); + for (Map<String, Object> row : rows) { + T t = convert(row, convertClass); + result.add(t); + } + return result; + } catch (Exception e) { + String errorMsg = ("query for list error ,the builder is " + sql + ". the error msg is " + e.getMessage()); + LOG.error(errorMsg); + e.printStackTrace(); + throw new RuntimeException(errorMsg, e); + } finally { + if (dataSource != null) { + dataSource.destroy(); + } + } + } + + /** + * 执行sql,sql中可以有mybatis的参数#{name} + * + * @param sql insert语句 + * @param paras 可以是map,json或对象,只要key名或字段名和sql的参数名相同即可 + * @return + */ + public static boolean executeSQL(String sql, Object paras) { + if (paras != null) { + sql = SQLUtil.parseIbatisSQL(paras, sql); + } + JDBCDriver dataSource = null; + try { + dataSource = DriverBuilder.createDriver(); + dataSource.execute(sql); + return true; + } catch (Exception e) { + String errorMsg = ("execute sql error ,the sql is " + sql + ". the error msg is " + e.getMessage()); + LOG.error(errorMsg); + e.printStackTrace(); + throw new RuntimeException(errorMsg, e); + } finally { + if (dataSource != null) { + dataSource.destroy(); + } + } + } + + /** + * 把一个对象的字段拼接成where条件,如果字段值为null,不拼接 + * + * @param object 带拼接的对象 + * @param fieldNames 需要拼接的字段名,如果为null,返回null + * @return where 部分的sql + */ + public static String createQueryWhereSql(Object object, String... fieldNames) { + if (fieldNames == null || fieldNames.length == 0) { + return ""; + } + StringBuilder stringBuilder = new StringBuilder(); + boolean isFirst = true; + for (String fieldName : fieldNames) { + Object value = ReflectUtil.getBeanFieldValue(object, fieldName); + if (object != null && value == null) { + continue; + } + if (isFirst) { + isFirst = false; + } else { + stringBuilder.append(" and "); + } + String columnName = getColumnNameFromFieldName(fieldName); + stringBuilder.append(" " + columnName + "=#{" + fieldName + "} "); + } + return stringBuilder.toString(); + } + + /** + * 把一行数据转换成一个对象,符合驼峰的命名和下划线的命名规范 + * + * @param row 一行数据 + * @param clazz 待转化的对象类型 + * @param <T> + * @return 转化对象 + */ + public static <T> T convert(Map<String, Object> row, Class<T> clazz) { + T t = ReflectUtil.forInstance(clazz); + Iterator<Map.Entry<String, Object>> it = row.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<String, Object> entry = it.next(); + String columnName = entry.getKey(); + Object value = entry.getValue(); + if (value == null) { + continue; + } + String fieldName = getFieldNameFromColumnName(columnName); + DataType datatype = DataTypeUtil.createFieldDataType(clazz, fieldName); + Object columnValue = datatype.convert(value); + ReflectUtil.setBeanFieldValue(t, fieldName, columnValue); + } + return t; + } + + /** + * 把列名转换成字段名称,把下划线转化成驼峰 + * + * @param columnName + * @return + */ + protected static String getFieldNameFromColumnName(String columnName) { + String[] values = columnName.split("_"); + if (values.length == 1) { + return columnName; + } + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(values[0]); + for (int i = 1; i < values.length; i++) { + String value = values[i]; + value = value.substring(0, 1).toUpperCase() + value.substring(1); + stringBuilder.append(value); + } + return stringBuilder.toString(); + } + + /** + * 对象批量替换,会生成replace into语句,多个对象会拼接成一个sql,提升效率 + * + * @param values 待插入对象 + */ + public static void batchReplaceInto(Collection values) { + List list = new ArrayList<>(); + list.addAll(values); + batchReplaceInto(list); + } + + public static void batchReplaceInto(Object... valueArray) { + List values = new ArrayList(); + if (valueArray != null) { + for (Object value : valueArray) { + values.add(value); + } + } + batchReplaceInto(values); + } + + public static void batchReplaceInto(List<?> values) { + batchIntoByFlag(values, 1); + } + + /** + * 对象批量插入,如果主键冲突会忽略,会生成insert ignore into语句,多个对象会拼接成一个sql,提升效率 + * + * @param values 待插入对象 + */ + public static void batchIgnoreInto(List<?> values) { + batchIntoByFlag(values, -1); + } + + /** + * 对象批量插入,如果主键冲突会忽略,会生成insert ignore into语句,多个对象会拼接成一个sql,提升效率 + * + * @param values 待插入对象 + */ + public static void batchInsertInto(List<?> values) { + batchIntoByFlag(values, 0); + } + + /** + * 批量插入对象,多个对象会拼接成一个sql flag==1 then replace into flag=-1 then insert ignore int flag=0 then insert int + * + * @param values + * @param flag + */ + protected static void batchIntoByFlag(List<?> values, int flag) { + if (CollectionUtil.isEmpty(values)) { + return; + } + Object object = values.get(0); + Map<String, Object> paras = new HashMap<>(16); + MetaData metaData = createMetaDate(object, paras); + boolean containsIdField = false; + if (metaData.getIdFieldName() != null) { + for (Object o : values) { + Object id = ReflectUtil.getDeclaredField(o, metaData.getIdFieldName()); + if (id == null) { + containsIdField = false; + break; + } + if (id instanceof Number) { + if (Long.valueOf(id.toString()) == 0) { + containsIdField = false; + break; + } + } + if (id instanceof String) { + String idStr = (String)id; + if (StringUtil.isEmpty(idStr)) { + containsIdField = false; + break; + } + } + } + } + + String sql = null; + if (flag == 0) { + sql = SQLUtil.createInsertSql(metaData, paras, containsIdField); + } else if (flag == 1) { + sql = SQLUtil.createInsertSql(metaData, paras, containsIdField); + } else if (flag == -1) { + sql = SQLUtil.createIgnoreInsertSql(metaData, paras, containsIdField); + } else { + throw new RuntimeException("the flag is not valdate " + flag); + } + + List<Map<String, Object>> rows = new ArrayList<>(); + for (int i = 1; i < values.size(); i++) { + Map<String, Object> row = createRow(metaData, values.get(i)); + rows.add(row); + } + String valuesSQL = SQLUtil.createInsertValuesSQL(metaData, rows); + sql = sql + valuesSQL + " ON DUPLICATE KEY UPDATE " + SQLUtil.createDuplicateKeyUpdateSQL(metaData); + ; + JDBCDriver dataSource = DriverBuilder.createDriver(); + try { + dataSource.execute(sql); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } finally { + if (dataSource != null) { + dataSource.destroy(); + } + } + } + + private static Map<String, Object> createRow(MetaData metaData, Object object) { + Map<String, Object> row = new HashMap<>(); + ReflectUtil.scanFields(object, new IFieldProcessor() { + @Override + public void doProcess(Object o, Field field) { + String fieldName = field.getName(); + String columnName = getColumnNameFromFieldName(fieldName); + Object value = ReflectUtil.getBeanFieldValue(o, fieldName); + row.put(columnName, value); + + } + }); + return row; + } + + public static void replaceInto(Object object) { + replaceInto(object, null, null, null); + } + + /** + * 把一个对象插入到数据库,对象符合插入规范,表名是对象名转小写后加下划线。如果有重复到会被替换成最新的 + * + * @param object + * @param url + * @param userName + * @param password + */ + public static void replaceInto(Object object, String url, String userName, String password) { + Map<String, Object> paras = new HashMap<>(); + if (Entity.class.isInstance(object)) { + Entity newEntity = (Entity)object; + newEntity.setGmtModified(new Date()); + if (newEntity.getGmtCreate() == null) { + newEntity.setGmtCreate(new Date()); + } + } + MetaData metaData = createMetaDate(object, paras); + String sql = SQLUtil.createReplacesInsertSql(metaData, paras, metaData.getIdFieldName() != null); + JDBCDriver dataSource = null; + try { + if (StringUtil.isEmpty(url) || StringUtil.isEmpty(userName)) { + dataSource = DriverBuilder.createDriver(); + } else { + dataSource = DriverBuilder.createDriver(DriverBuilder.DEFALUT_JDBC_DRIVER, url, userName, password); + } + long id = dataSource.executeInsert(sql); + if (Entity.class.isInstance(object)) { + Entity newEntity = (Entity)object; + newEntity.setId(id); + } + } catch (Exception e) { + String errorMsg = ("replace into error ,the builder is " + sql + ". the error msg is " + e.getMessage()); + LOG.error(errorMsg); + throw new RuntimeException(errorMsg, e); + } finally { + if (dataSource != null) { + dataSource.destroy(); + } + } + } + + public static void insertInto(Object object, boolean ignoreRepeateRow) { + insertInto(object, ignoreRepeateRow, null, null, null); + } + + /** + * 把一个对象插入到数据库,对象符合插入规范,表名是对象名转小写后加下划线 + * + * @param object + * @param ignoreRepeateRow,如果是重复数据,则不插入。基于唯一建做判断 + * @param url + * @param userName + * @param password + */ + public static void insertInto(Object object, boolean ignoreRepeateRow, String url, String userName, + String password) { + Map<String, Object> paras = new HashMap<>(); + if (Entity.class.isInstance(object)) { + Entity newEntity = (Entity)object; + newEntity.setGmtCreate(DateUtil.getCurrentTime()); + newEntity.setGmtModified(DateUtil.getCurrentTime()); + } + MetaData metaData = createMetaDate(object, paras); + String sql = null; + if (ignoreRepeateRow) { + sql = SQLUtil.createIgnoreInsertSql(metaData, paras, metaData.getIdFieldName() != null); + } else { + sql = SQLUtil.createInsertSql(metaData, paras, metaData.getIdFieldName() != null); + } + JDBCDriver dataSource = null; + try { + if (StringUtil.isEmpty(url) || StringUtil.isEmpty(userName)) { + dataSource = DriverBuilder.createDriver(); + } else { + dataSource = DriverBuilder.createDriver(DriverBuilder.DEFALUT_JDBC_DRIVER, url, userName, password); + } + long id = dataSource.executeInsert(sql); + if (Entity.class.isInstance(object)) { + Entity newEntity = (Entity)object; + newEntity.setId(id); + } + } catch (Exception e) { + String errorMsg = ("insert into error ,the builder is " + sql + ". the error msg is " + e.getMessage()); + LOG.error(errorMsg); + throw new RuntimeException(errorMsg, e); + } finally { + if (dataSource != null) { + dataSource.destroy(); + } + } + } + + /** + * 创建meta信息 + * + * @param object + * @param paras + * @return + */ + public static MetaData createMetaDate(Object object, Map<String, Object> paras) { + MetaData metaData = MetaData.createMetaDate(object, paras); + return metaData; + } + + public static String getTableName(Class clazz) { + return getColumnNameFromFieldName(clazz.getSimpleName()); + } + + /** + * 把驼峰转换成下划线的形式 + * + * @param para + * @return + */ + protected static String getColumnNameFromFieldName(String para) { + return MetaData.getColumnNameFromFieldName(para); + } + +} diff --git a/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/operator/SQLOperator.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/operator/SQLOperator.java new file mode 100644 index 0000000..1f42470 --- /dev/null +++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/operator/SQLOperator.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.operator; + +import java.util.List; +import java.util.Map; + +import org.apache.rocketmq.streams.common.component.AbstractComponent; +import org.apache.rocketmq.streams.db.driver.JDBCDriver; +import org.apache.rocketmq.streams.common.configurable.BasedConfigurable; +import org.apache.rocketmq.streams.common.configurable.annotation.Changeable; +import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence; +import org.apache.rocketmq.streams.common.interfaces.IStreamOperator; +import org.apache.rocketmq.streams.common.context.AbstractContext; +import org.apache.rocketmq.streams.common.context.IMessage; +import org.apache.rocketmq.streams.common.topology.ChainStage; +import org.apache.rocketmq.streams.common.topology.builder.IStageBuilder; +import org.apache.rocketmq.streams.common.topology.stages.NewSQLChainStage; +import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder; +import org.apache.rocketmq.streams.common.utils.SQLUtil; +import org.apache.rocketmq.streams.common.utils.StringUtil; +import org.apache.rocketmq.streams.db.driver.DriverBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * sql算法,执行一个sql,sql中可以有变量,会用message的值做替换。 + */ +public class SQLOperator extends BasedConfigurable implements IStreamOperator<IMessage, IMessage>, IStageBuilder<ChainStage> { + private static final Log LOG = LogFactory.getLog(SQLOperator.class); + public static final String DEFALUT_DATA_KEY = "data"; + + @ENVDependence + protected String jdbcDriver = AbstractComponent.DEFAULT_JDBC_DRIVER; + @ENVDependence + protected String url; + @ENVDependence + protected String userName; + @ENVDependence + protected String password; + + @Changeable + protected String sql;//查询的sql,支持ibatis的语法和变量.因为会被替换,所以不自动感知。select * from table where name=#{name=chris} + + public SQLOperator() { + setType(IStreamOperator.TYPE); + } + + public SQLOperator(String sql, String url, String userName, String password) { + this(); + this.sql = sql; + this.url = url; + this.password = password; + this.userName = userName; + } + + /** + * db串多数是名字,可以取个名字前缀,如果值为空,默认为此类的name,name为空,默认为简单类名 + * + * @param sql + * @param dbInfoNamePrex + */ + public SQLOperator(String sql, String dbInfoNamePrex) { + this(); + if (StringUtil.isEmpty(dbInfoNamePrex)) { + dbInfoNamePrex = getConfigureName(); + } + if (StringUtil.isEmpty(dbInfoNamePrex)) { + dbInfoNamePrex = this.getClass().getSimpleName(); + } + this.sql = sql; + this.url = dbInfoNamePrex + ".url"; + this.password = dbInfoNamePrex + ".password"; + ; + this.userName = dbInfoNamePrex + ".userName"; + } + + @Override + public IMessage doMessage(IMessage message, AbstractContext context) { + String querySQL = SQLUtil.parseIbatisSQL(message.getMessageBody(), sql); + List<Map<String, Object>> result = query(querySQL); + message.getMessageBody().put(DEFALUT_DATA_KEY, result); + return message; + } + + /** + * 查询数据库数据 + * + * @return + */ + protected List<Map<String, Object>> query(String querySQL) { + + JDBCDriver dataSource = null; + try { + dataSource = createDBDataSource(); + List<Map<String, Object>> result = null; + result = dataSource.queryForList(sql); + + return result; + } finally { + if (dataSource != null) { + dataSource.destroy(); + } + } + + } + + public JDBCDriver createDBDataSource() { + return DriverBuilder.createDriver(jdbcDriver, url, userName, password); + } + + @Override + public void addConfigurables(PipelineBuilder pipelineBuilder) { + pipelineBuilder.addConfigurables(this); + } + + @Override + public ChainStage createStageChain(PipelineBuilder pipelineBuilder) { + NewSQLChainStage sqlChainStage = new NewSQLChainStage(); + sqlChainStage.setMessageProcessor(this); + return sqlChainStage; + } + + public String getSql() { + return sql; + } + + public void setSql(String sql) { + this.sql = sql; + } + + public String getJdbcDriver() { + return jdbcDriver; + } + + public void setJdbcDriver(String jdbcDriver) { + this.jdbcDriver = jdbcDriver; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + +} diff --git a/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/Person.java b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/Person.java new file mode 100644 index 0000000..3d5e51b --- /dev/null +++ b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/Person.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.rocketmq.streams.common.configurable.BasedConfigurable; +import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence; + +public class Person extends BasedConfigurable { + @ENVDependence + private String name; + private int age; + private Boolean isMale; + private List<String> addresses; + private Map<String, Integer> childName2Age; + + public static Person createPerson(String namespace) { + Person person = new Person(); + person.setNameSpace(namespace); + person.setType("person"); + person.setConfigureName("Chris"); + person.setName("Chris"); + List<String> addresses = new ArrayList<>(); + addresses.add("huilongguan"); + addresses.add("shangdi"); + person.setAddresses(addresses); + Map<String, Integer> childName2Age = new HashMap<>(); + childName2Age.put("yuanyahan", 8); + childName2Age.put("yuanruxi", 4); + person.setChildName2Age(childName2Age); + person.setMale(true); + person.setAge(18); + return person; + } + + @Override + public String toString() { + return "Person{" + "name='" + name + '\'' + ", age=" + age + ", isMale=" + isMale + ", addresses=" + addresses + + ", childName2Age=" + childName2Age + '}'; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + + public Boolean getMale() { + return isMale; + } + + public void setMale(Boolean male) { + isMale = male; + } + + public List<String> getAddresses() { + return addresses; + } + + public void setAddresses(List<String> addresses) { + this.addresses = addresses; + } + + public Map<String, Integer> getChildName2Age() { + return childName2Age; + } + + public void setChildName2Age(Map<String, Integer> childName2Age) { + this.childName2Age = childName2Age; + } + + @Override + public Object clone() { + Person person = null; + try { + person = (Person)super.clone(); + } catch (CloneNotSupportedException e) { + System.out.println("clone error " + e); + } + return person; + } +} diff --git a/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureServiceTest.java b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureServiceTest.java new file mode 100644 index 0000000..3baa65d --- /dev/null +++ b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/configuable/DBSupportParentConfigureServiceTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.configuable; + +import org.apache.rocketmq.streams.common.component.ComponentCreator; +import org.apache.rocketmq.streams.common.configure.ConfigureFileKey; +import org.apache.rocketmq.streams.configurable.ConfigurableComponent; +import org.apache.rocketmq.streams.configurable.model.Configure; +import org.apache.rocketmq.streams.db.Person; +import org.apache.rocketmq.streams.db.driver.DriverBuilder; +import org.junit.Test; + +import static junit.framework.TestCase.assertTrue; + +/** + * 数据库的存储,需要配置存储的连接参数,请先完成配置,后执行单元用例 如果未建表,可以通过Configure.createTableSQL() 获取建表语句,创建表后,测试 + */ +public class DBSupportParentConfigureServiceTest { + private String URL = ""; + protected String USER_NAME = ""; + protected String PASSWORD = ""; + protected String TABLE_NAME = "dipper_configure_source"; + + @Test + public void testDBConfigurableService() { + String namespace = "streams.db.configuable"; + + //正式使用时,在配置文件配置 + ComponentCreator.getProperties().put(ConfigureFileKey.CONNECT_TYPE, "DB"); + ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_URL, URL);//数据库连接url + ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_USERNAME, USER_NAME);//用户名 + ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_PASSWORD, PASSWORD);//password + ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_TABLE_NAME, TABLE_NAME); + + //如果表不存在,创建表 + String sql = (Configure.createTableSQL(TABLE_NAME)); + DriverBuilder.createDriver().execute(sql); + ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance(namespace); + configurableComponent.insert(createPerson(namespace)); + configurableComponent.refreshConfigurable(namespace); + Person person = configurableComponent.queryConfigurable("person", "peronName"); + assertTrue(person != null); + } + + /** + * 创建configuable对象 + * + * @param namespace + * @return + */ + protected Person createPerson(String namespace) { + Person person = new Person(); + person.setName("chris"); + person.setAge(18); + person.setNameSpace(namespace); + person.setConfigureName("peronName"); + person.setType("person"); + return person; + } +} diff --git a/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtilTest.java b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtilTest.java new file mode 100644 index 0000000..e9b1fa2 --- /dev/null +++ b/rocketmq-streams-db-operator/src/test/java/org/apache/rocketmq/streams/db/driver/orm/ORMUtilTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.driver.orm; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.rocketmq.streams.common.component.ComponentCreator; +import org.apache.rocketmq.streams.common.configure.ConfigureFileKey; +import org.apache.rocketmq.streams.db.Person; +import org.junit.Test; + +public class ORMUtilTest { + private String URL = ""; + protected String USER_NAME = ""; + protected String PASSWORD = ""; + + public ORMUtilTest() { + //正式使用时,在配置文件配置 + ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_URL, URL);//数据库连接url + ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_USERNAME, USER_NAME);//用户名 + ComponentCreator.getProperties().put(ConfigureFileKey.JDBC_PASSWORD, PASSWORD);//password + } + + @Test + public void testInsert() { + String namespace = "org.apache.configuable.test"; + List<Person> personList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + personList.add(createPerson(namespace, "chris" + i)); + } + /** + * 不带数据库连接信息(url,userName,Password),使用ConfiguableComponet的连接信息 + */ + ORMUtil.batchIgnoreInto(personList);//批量插入,如果有唯一键冲突,替换 + ORMUtil.batchIgnoreInto(personList);//批量插入,如果有唯一键冲突,忽略 + ORMUtil.batchIntoByFlag(personList, 0);////批量插入,如果有唯一键冲突,跑错 + } + + @Test + public void testQueryList() { + Map<String, Integer> paras = new HashMap<>(); + paras.put("age", 18); + List<Person> personList = ORMUtil.queryForList("select * from person where age >${age} limit 100", paras, Person.class); + } + + @Test + public void testQueryOneRow() { + Person personPara = new Person(); + personPara.setAge(18); + personPara.setName("chris1"); + Person person = ORMUtil.queryForObject("select * from person where age =${age} and name='${name}' ", personPara, Person.class, URL, USER_NAME, PASSWORD); + } + + /** + * 创建configuable对象 + * + * @param namespace + * @return + */ + protected Person createPerson(String namespace, String name) { + Person person = new Person(); + person.setName(name); + person.setAge(18); + person.setNameSpace(namespace); + person.setConfigureName("peronName"); + person.setType("person"); + return person; + } +} diff --git a/rocketmq-streams-transport-minio/pom.xml b/rocketmq-streams-transport-minio/pom.xml new file mode 100755 index 0000000..510b8cc --- /dev/null +++ b/rocketmq-streams-transport-minio/pom.xml @@ -0,0 +1,25 @@ +<?xml version="1.0" encoding="utf-8"?> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-streams</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + <artifactId>rocketmq-streams-transport-minio</artifactId> + <packaging>jar</packaging> + <name>ROCKETMQ STREAMS :: transport-minio</name> + <dependencies> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-streams-serviceloader</artifactId> + </dependency> + <dependency> + <groupId>io.minio</groupId> + <artifactId>minio</artifactId> + </dependency> + </dependencies> +</project> diff --git a/rocketmq-streams-transport-minio/rocketmq-streams-transport-minio.iml b/rocketmq-streams-transport-minio/rocketmq-streams-transport-minio.iml new file mode 100644 index 0000000..af11f77 --- /dev/null +++ b/rocketmq-streams-transport-minio/rocketmq-streams-transport-minio.iml @@ -0,0 +1,17 @@ +<?xml version="1.0" encoding="UTF-8"?> +<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4"> + <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5"> + <output url="file://$MODULE_DIR$/${project.build.directory}/classes" /> + <output-test url="file://$MODULE_DIR$/${project.build.directory}/test-classes" /> + <content url="file://$MODULE_DIR$"> + <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" /> + <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" /> + <sourceFolder url="file://$MODULE_DIR$/src/test/resources" type="java-test-resource" /> + <excludeFolder url="file://$MODULE_DIR$/${project.build.directory}/classes" /> + <excludeFolder url="file://$MODULE_DIR$/${project.build.directory}/test-classes" /> + <excludeFolder url="file://$MODULE_DIR$/target" /> + </content> + <orderEntry type="inheritedJdk" /> + <orderEntry type="sourceFolder" forTests="false" /> + </component> +</module> \ No newline at end of file diff --git a/rocketmq-streams-transport-minio/src/main/java/org/apache/rocketmq/streams/transport/minio/MinioFileTransport.java b/rocketmq-streams-transport-minio/src/main/java/org/apache/rocketmq/streams/transport/minio/MinioFileTransport.java new file mode 100644 index 0000000..c2a6884 --- /dev/null +++ b/rocketmq-streams-transport-minio/src/main/java/org/apache/rocketmq/streams/transport/minio/MinioFileTransport.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.transport.minio; + +import org.apache.rocketmq.streams.common.component.ComponentCreator; +import org.apache.rocketmq.streams.common.configure.ConfigureFileKey; +import org.apache.rocketmq.streams.common.model.ServiceName; +import org.apache.rocketmq.streams.common.transport.AbstractFileTransport; +import org.apache.rocketmq.streams.common.transport.IFileTransport; +import org.apache.rocketmq.streams.common.utils.FileUtil; +import org.apache.rocketmq.streams.common.utils.StringUtil; +import com.google.auto.service.AutoService; +import io.minio.MinioClient; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +@AutoService(IFileTransport.class) +@ServiceName(MinioFileTransport.NAME) +public class MinioFileTransport extends AbstractFileTransport { + public static final String NAME = "minio"; + protected String ak; + protected String sk; + protected String endpoint; + protected String dirpperDir; + protected MinioClient minioClient; + + public MinioFileTransport() { + this.ak = ComponentCreator.getProperties().getProperty(ConfigureFileKey.FILE_TRANSPORT_AK); + this.sk = ComponentCreator.getProperties().getProperty(ConfigureFileKey.FILE_TRANSPORT_SK); + this.endpoint = ComponentCreator.getProperties().getProperty(ConfigureFileKey.FILE_TRANSPORT_ENDPOINT); + this.dirpperDir = ComponentCreator.getProperties().getProperty(ConfigureFileKey.FILE_TRANSPORT_DIPPER_DIR); + if (StringUtil.isEmpty(this.dirpperDir)) { + this.dirpperDir = "dipper_files"; + } + } + + @Override + public File download(String remoteFileName, String localDir, String localFileName) { + MinioClient minioClient = getOrCreateMinioClient(); + BufferedWriter bw = null; + BufferedReader br = null; + try { + InputStream input = minioClient.getObject(dirpperDir, remoteFileName); + File file = new File(FileUtil.concatFilePath(localDir, localFileName)); + bw = new BufferedWriter(new FileWriter(file)); + br = new BufferedReader(new InputStreamReader(input)); + String line = br.readLine(); + while (line != null) { + bw.write(line); + line = br.readLine(); + } + bw.flush(); + return file; + } catch (Exception e) { + throw new RuntimeException("dowload file error " + dirpperDir + "/" + remoteFileName, e); + + } finally { + if (bw != null) { + try { + bw.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + if (br != null) { + try { + br.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + } + + @Override + public Boolean upload(File file, String remoteFileName) { + MinioClient minioClient = getOrCreateMinioClient(); + try { + minioClient.putObject(dirpperDir, remoteFileName, file.getAbsolutePath()); + } catch (Exception e) { + throw new RuntimeException("upload file error " + dirpperDir + "/" + remoteFileName, e); + } + return true; + } + + @Override + public boolean delete(String remoteFileName) { + MinioClient minioClient = getOrCreateMinioClient(); + try { + minioClient.removeObject(dirpperDir, remoteFileName); + } catch (Exception e) { + throw new RuntimeException("delete file error " + dirpperDir + "/" + remoteFileName, e); + } + return true; + } + + protected MinioClient getOrCreateMinioClient() { + if (this.minioClient == null) { + synchronized (this) { + if (minioClient == null) { + try { + MinioClient minioClient = new MinioClient(endpoint, ak, sk); + boolean existDir = minioClient.bucketExists(dirpperDir); + + if (!existDir) { + minioClient.makeBucket(dirpperDir); + } + this.minioClient = minioClient; + } catch (Exception e) { + throw new RuntimeException("create minio client error", e); + } + + } + } + } + return this.minioClient; + } +} + + diff --git a/rocketmq-streams-transport-minio/src/test/java/com/aliyun/yundun/dipper/configurable/DataTpyeTest.java b/rocketmq-streams-transport-minio/src/test/java/com/aliyun/yundun/dipper/configurable/DataTpyeTest.java new file mode 100644 index 0000000..a3ebf6b --- /dev/null +++ b/rocketmq-streams-transport-minio/src/test/java/com/aliyun/yundun/dipper/configurable/DataTpyeTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.aliyun.yundun.dipper.configurable; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.junit.Test; + +import org.apache.rocketmq.streams.common.utils.FileUtil; +import org.apache.rocketmq.streams.configurable.model.Person; + +public class DataTpyeTest { + @Test + public void testDataType() { + Person person = Person.createPerson("com.dipper.test"); + String jsonValue = person.toJson(); + + Person person1 = new Person(); + person1.toObject(jsonValue); + System.out.println(person1); + } + + @Test + public void testV2() { + Set<String> set = new HashSet<>(); + set.add("北斗"); + set.add("福建jz"); + set.add("甘肃jz"); + set.add("广东省气象micaps云"); + set.add("贵州公安科信"); + set.add("贵州警务云"); + set.add("杭州税友"); + set.add("江西公安大数据平台"); + set.add("昆仑项目"); + set.add("新华网"); + set.add("浙江气象高时空分辨率气象预报专有云"); + List<String> v2 = FileUtil.loadFileLine("/Users/yuanxiaodong/Documents/workdir/项目名称.txt"); + List<String> zyy = FileUtil.loadFileLine("/Users/yuanxiaodong/Documents/workdir/专有云.txt"); + int count = 0; + for (String v2Line : v2) { + boolean match = false; + for (String zyyLine : zyy) { + if (zyyLine.indexOf(v2Line) != -1 || v2Line.indexOf(zyyLine) != -1) { + match = true; + count++; + } + } + if (match == false) { + System.out.println(v2Line); + } + } + System.out.println(count); + } +} diff --git a/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/DataTpyeTest.java b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/DataTpyeTest.java new file mode 100644 index 0000000..a1570c3 --- /dev/null +++ b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/DataTpyeTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.streams.configuable.model; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.rocketmq.streams.common.utils.FileUtil; +import org.junit.Test; + +public class DataTpyeTest { + @Test + public void testDataType() { + Person person = Person.createPerson("com.dipper.test"); + String jsonValue = person.toJson(); + + Person person1 = new Person(); + person1.toObject(jsonValue); + System.out.println(person1); + } + + @Test + public void testV2() { + Set<String> set = new HashSet<>(); + set.add("北斗"); + set.add("福建jz"); + set.add("甘肃jz"); + set.add("广东省气象micaps云"); + set.add("贵州公安科信"); + set.add("贵州警务云"); + set.add("杭州税友"); + set.add("江西公安大数据平台"); + set.add("昆仑项目"); + set.add("新华网"); + set.add("浙江气象高时空分辨率气象预报专有云"); + List<String> v2 = FileUtil.loadFileLine("/Users/yuanxiaodong/Documents/workdir/项目名称.txt"); + List<String> zyy = FileUtil.loadFileLine("/Users/yuanxiaodong/Documents/workdir/专有云.txt"); + int count = 0; + for (String v2Line : v2) { + boolean match = false; + for (String zyyLine : zyy) { + if (zyyLine.indexOf(v2Line) != -1 || v2Line.indexOf(zyyLine) != -1) { + match = true; + count++; + } + } + if (match == false) { + System.out.println(v2Line); + } + } + System.out.println(count); + } +} diff --git a/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/Person.java b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/Person.java new file mode 100644 index 0000000..04b99bb --- /dev/null +++ b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configuable/model/Person.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.streams.configuable.model; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.rocketmq.streams.common.configurable.BasedConfigurable; + +public class Person extends BasedConfigurable { + private String name; + private int age; + private Boolean isMale; + private List<String> addresses; + private Map<String, Integer> childName2Age; + + public static Person createPerson(String namespace) { + Person person = new Person(); + person.setNameSpace(namespace); + person.setType("person"); + person.setConfigureName("Chris"); + person.setName("Chris"); + List<String> addresses = new ArrayList<>(); + addresses.add("huilongguan"); + addresses.add("shangdi"); + person.setAddresses(addresses); + Map<String, Integer> childName2Age = new HashMap<>(); + childName2Age.put("yuanyahan", 8); + childName2Age.put("yuanruxi", 4); + person.setChildName2Age(childName2Age); + person.setMale(true); + person.setAge(18); + return person; + } + + @Override + public String toString() { + return "Person{" + "name='" + name + '\'' + ", age=" + age + ", isMale=" + isMale + ", addresses=" + addresses + + ", childName2Age=" + childName2Age + '}'; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + + public Boolean getMale() { + return isMale; + } + + public void setMale(Boolean male) { + isMale = male; + } + + public List<String> getAddresses() { + return addresses; + } + + public void setAddresses(List<String> addresses) { + this.addresses = addresses; + } + + public Map<String, Integer> getChildName2Age() { + return childName2Age; + } + + public void setChildName2Age(Map<String, Integer> childName2Age) { + this.childName2Age = childName2Age; + } +} diff --git a/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configurable/model/Person.java b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configurable/model/Person.java new file mode 100644 index 0000000..709978a --- /dev/null +++ b/rocketmq-streams-transport-minio/src/test/java/org/apache/rocketmq/streams/configurable/model/Person.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.configurable.model; + +import org.apache.rocketmq.streams.common.configurable.BasedConfigurable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class Person extends BasedConfigurable { + private String name; + private int age; + private Boolean isMale; + private List<String> addresses; + private Map<String, Integer> childName2Age; + + public static Person createPerson(String namespace) { + Person person = new Person(); + person.setNameSpace(namespace); + person.setType("person"); + person.setConfigureName("Chris"); + person.setName("Chris"); + List<String> addresses = new ArrayList<>(); + addresses.add("huilongguan"); + addresses.add("shangdi"); + person.setAddresses(addresses); + Map<String, Integer> childName2Age = new HashMap<>(); + childName2Age.put("yuanyahan", 8); + childName2Age.put("yuanruxi", 4); + person.setChildName2Age(childName2Age); + person.setMale(true); + person.setAge(18); + return person; + } + + @Override + public String toString() { + return "Person{" + "name='" + name + '\'' + ", age=" + age + ", isMale=" + isMale + ", addresses=" + addresses + + ", childName2Age=" + childName2Age + '}'; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + + public Boolean getMale() { + return isMale; + } + + public void setMale(Boolean male) { + isMale = male; + } + + public List<String> getAddresses() { + return addresses; + } + + public void setAddresses(List<String> addresses) { + this.addresses = addresses; + } + + public Map<String, Integer> getChildName2Age() { + return childName2Age; + } + + public void setChildName2Age(Map<String, Integer> childName2Age) { + this.childName2Age = childName2Age; + } +} diff --git a/rocketmq-streams-transport-minio/src/test/resources/component/ConfigurableComponent.properties b/rocketmq-streams-transport-minio/src/test/resources/component/ConfigurableComponent.properties new file mode 100644 index 0000000..598511e --- /dev/null +++ b/rocketmq-streams-transport-minio/src/test/resources/component/ConfigurableComponent.properties @@ -0,0 +1,7 @@ +dipper.configurable.service.type=resource_support_parent +dipper.channle.ak=xxxxxx +dipper.channle.sk=xxxxxx +dipper.rds.jdbc.driver=com.mysql.jdbc.Driver +dipper.rds.jdbc.url=xxxxxxx +dipper.rds.jdbc.username=xxxxxx +dipper.rds.jdbc.password=xxxxx \ No newline at end of file diff --git a/rocketmq-streams-transport-minio/src/test/resources/log4j.xml b/rocketmq-streams-transport-minio/src/test/resources/log4j.xml new file mode 100755 index 0000000..7812fe7 --- /dev/null +++ b/rocketmq-streams-transport-minio/src/test/resources/log4j.xml @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!DOCTYPE log4j:configuration SYSTEM "http://toolkit.alibaba-inc.com/dtd/log4j/log4j.dtd"> +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + + <appender name="Console" class="org.apache.log4j.ConsoleAppender"> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d{ISO8601} %l [%t] %-5p - %m%n%n"/> + </layout> + <filter class="org.apache.log4j.varia.LevelRangeFilter"> + <param name="LevelMin" value="INFO"/> + <param name="LevelMax" value="ERROR"/> + </filter> + </appender> + + <root> + <priority value="INFO"/> + <appender-ref ref="Console"/> + </root> + +</log4j:configuration> \ No newline at end of file diff --git a/rocketmq-streams-transport-minio/src/test/resources/pro-function.txt b/rocketmq-streams-transport-minio/src/test/resources/pro-function.txt new file mode 100644 index 0000000..34a186f --- /dev/null +++ b/rocketmq-streams-transport-minio/src/test/resources/pro-function.txt @@ -0,0 +1,11 @@ +paserBySplit(@,uuid,file_path,pid,ppid,pfile_path,group_name,group_id,user_name,uid,euid,egroup_id,time,cmd_line,index,perm,tty,pcmdline,sid,cwd,filename); +addRandom(messageId,10); +rename(groupname,group_name); +rename(username,user_name); +rename(seq,index); +rename(egourpid,egroup_id); +rename(filepath,file_path); +rename(groupid,group_id); +rename(pfilename,pfile_name); +rename(safe_mode,perm); +rename(cmdline,cmd_line); \ No newline at end of file diff --git a/rocketmq-streams-transport-minio/src/test/resources/python_script.py b/rocketmq-streams-transport-minio/src/test/resources/python_script.py new file mode 100644 index 0000000..f4e7252 --- /dev/null +++ b/rocketmq-streams-transport-minio/src/test/resources/python_script.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python +# coding=utf-8 +import json; +import re; +import time; +regex = '^/(.*)/\w+' +pattern = re.compile(regex) + +def pythonTest(*processLine): + try: + jsonObject = json.loads(processLine[0]) + + if jsonObject.has_key('filepath'): + filePath = jsonObject['filepath'] + match = pattern.search(filePath) + if match: + return match.group(1) + else: + pass # print "does not has key filepath" + except BaseException as e: + pass # print "process one line cause exception %s" %e + return "does not match"
