This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit ed97ca3daee8bd3e71c4c5150a54381867a07756 Author: vv <[email protected]> AuthorDate: Mon Aug 2 12:03:59 2021 +0800 add channel-db module --- rocketmq-streams-channel-db/pom.xml | 21 ++ .../streams/db/sink/AbstractMultiTableSink.java | 150 +++++++++++++ .../apache/rocketmq/streams/db/sink/DBSink.java | 239 +++++++++++++++++++++ .../rocketmq/streams/db/sink/DBSinkBuilder.java | 76 +++++++ .../streams/db/sink/SelfMultiTableSink.java | 53 +++++ .../streams/db/sink/SplitBySerialNumber.java | 36 ++++ .../streams/db/sink/SplitByTimeMultiTableSink.java | 36 ++++ .../streams/db/sink/db/DBWriteOnlyChannelTest.java | 84 ++++++++ 8 files changed, 695 insertions(+) diff --git a/rocketmq-streams-channel-db/pom.xml b/rocketmq-streams-channel-db/pom.xml new file mode 100755 index 0000000..10d760e --- /dev/null +++ b/rocketmq-streams-channel-db/pom.xml @@ -0,0 +1,21 @@ +<?xml version="1.0" encoding="utf-8"?> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-streams</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + <artifactId>rocketmq-streams-channel-db</artifactId> + <name>ROCKETMQ STREAMS :: channel-db</name> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-streams-db-operator</artifactId> + </dependency> + </dependencies> +</project> diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/AbstractMultiTableSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/AbstractMultiTableSink.java new file mode 100644 index 0000000..6547615 --- /dev/null +++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/AbstractMultiTableSink.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.sink; + +import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack; +import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache; +import org.apache.rocketmq.streams.common.channel.split.ISplit; +import org.apache.rocketmq.streams.common.context.IMessage; +import org.apache.rocketmq.streams.common.functions.MultiTableSplitFunction; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +public abstract class AbstractMultiTableSink extends DBSink { + protected transient ConcurrentHashMap<String, DBSink> tableSinks = new ConcurrentHashMap(); + protected transient AtomicLong messageCount = new AtomicLong(0); + protected transient MultiTableSplitFunction<IMessage> multiTableSplitFunction; + + public AbstractMultiTableSink(String url, String userName, String password) { + this.url = url; + this.userName = userName; + this.password = password; + } + + @Override + public boolean batchAdd(IMessage message, ISplit split) { + + DBSink sink = getOrCreateDBSink(split.getQueueId()); + boolean success = sink.batchAdd(message, split); + long count = messageCount.incrementAndGet(); + if (count >= getBatchSize()) { + Set<String> queueIds = new HashSet<>(); + queueIds.add(split.getQueueId()); + flush(queueIds); + } + return success; + } + + @Override + public boolean batchAdd(IMessage message) { + ISplit split = getSplitFromMessage(message); + return batchAdd(message, split); + } + + @Override + public boolean batchSave(List<IMessage> messages) { + throw new RuntimeException("can not support this method"); + } + + @Override + public boolean flush(Set<String> splitIds) { + if (splitIds == null) { + return true; + } + for (String splitId : splitIds) { + DBSink sink = getOrCreateDBSink(splitId); + sink.flush(); + } + return true; + } + + @Override + public boolean flush() { + for (DBSink dbSink : tableSinks.values()) { + dbSink.flush(); + } + return true; + } + + @Override + public void openAutoFlush() { + for (DBSink dbSink : tableSinks.values()) { + dbSink.openAutoFlush(); + } + } + + @Override + public void closeAutoFlush() { + for (DBSink dbSink : tableSinks.values()) { + dbSink.closeAutoFlush(); + } + } + + protected DBSink getOrCreateDBSink(String splitId) { + DBSink sink = this.tableSinks.get(splitId); + if (sink != null) { + return sink; + } + sink = new DBSink(); + sink.setUrl(url); + sink.setPassword(password); + sink.setUserName(userName); + sink.setTableName(createTableName(splitId)); + sink.openAutoFlush(); + sink.setBatchSize(batchSize); + sink.setJdbcDriver(this.jdbcDriver); + sink.setMessageCache(new SingleDBSinkCache(sink)); + sink.init(); + DBSink existDBSink = this.tableSinks.putIfAbsent(splitId, sink); + if (existDBSink != null) { + return existDBSink; + } + + return sink; + } + + protected abstract String createTableName(String splitId); + + protected abstract ISplit getSplitFromMessage(IMessage message); + + protected class SingleDBSinkCache extends MessageCache<IMessage> { + + public SingleDBSinkCache( + IMessageFlushCallBack<IMessage> flushCallBack) { + super(flushCallBack); + } + + @Override + public int flush(Set<String> splitIds) { + int size = super.flush(splitIds); + messageCount.addAndGet(-size); + return size; + } + + @Override + public int flush() { + int size = super.flush(); + messageCount.addAndGet(-size); + return size; + } + } + +} diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java new file mode 100644 index 0000000..5732424 --- /dev/null +++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.sink; + +import com.alibaba.fastjson.JSONObject; + +import org.apache.rocketmq.streams.common.channel.IChannel; +import org.apache.rocketmq.streams.common.channel.sink.AbstractSink; +import org.apache.rocketmq.streams.common.component.AbstractComponent; +import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence; +import org.apache.rocketmq.streams.common.context.IMessage; +import org.apache.rocketmq.streams.common.metadata.MetaData; +import org.apache.rocketmq.streams.common.utils.SQLUtil; +import org.apache.rocketmq.streams.common.utils.StringUtil; +import org.apache.rocketmq.streams.db.driver.DriverBuilder; +import org.apache.rocketmq.streams.db.driver.JDBCDriver; + +import java.sql.*; +import java.util.List; + +/** + * 主要用于写db,输入可以是一个insert/replace 模版,也可以是metadata对象,二者选一即可。都支持批量插入,提高吞吐 sql 模版:insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}') MetaData:主要是描述每个字段的类型,是否必须 二者选一个即可。sql模式,系统会把一批(batchSize)数据拼成一个大sql。metadata模式,基于字段描述,最终也是拼成一个大sql + */ +public class DBSink extends AbstractSink { + + protected String insertSQLTemplate;//完成插入部分的工作,和metadata二选一。insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}') + + protected MetaData metaData;//可以指定meta data,和insertSQL二选一 + + protected String tableName; //指定要插入的数据表 + + @ENVDependence + protected String jdbcDriver = AbstractComponent.DEFAULT_JDBC_DRIVER; + @ENVDependence + protected String url; + @ENVDependence + protected String userName; + @ENVDependence + protected String password; + + /** + * db串多数是名字,可以取个名字前缀,如果值为空,默认为此类的name,name为空,默认为简单类名 + * + * @param insertSQL sql模版 + * @param dbInfoNamePrefix 参数可以是名字,这个是名字前缀.真实值可以配置在配置文件中 + */ + public DBSink(String insertSQL, String dbInfoNamePrefix) { + setType(IChannel.TYPE); + if (StringUtil.isEmpty(dbInfoNamePrefix)) { + dbInfoNamePrefix = getConfigureName(); + } + if (StringUtil.isEmpty(dbInfoNamePrefix)) { + dbInfoNamePrefix = this.getClass().getSimpleName(); + } + this.insertSQLTemplate = insertSQL; + this.url = dbInfoNamePrefix + ".url"; + this.password = dbInfoNamePrefix + ".password"; + this.userName = dbInfoNamePrefix + ".userName"; + } + + public DBSink() { + setType(IChannel.TYPE); + } + + public DBSink(String url, String userName, String password) { + setType(IChannel.TYPE); + this.url = url; + this.userName = userName; + this.password = password; + } + + public DBSink(String insertSQL, String url, String userName, String password) { + setType(IChannel.TYPE); + this.url = url; + this.userName = userName; + this.password = password; + this.insertSQLTemplate = insertSQL; + } + + @Override + protected boolean initConfigurable() { + try { + Class.forName("com.mysql.jdbc.Driver"); + if (StringUtil.isNotEmpty(this.tableName)) { + Connection connection = DriverManager.getConnection(url, userName, password); + DatabaseMetaData metaData = connection.getMetaData(); + ResultSet metaResult = metaData.getColumns(connection.getCatalog(), "%", this.tableName, null); + this.metaData = MetaData.createMetaData(metaResult); + this.metaData.setTableName(this.tableName); + } + return super.initConfigurable(); + } catch (ClassNotFoundException | SQLException e) { + e.printStackTrace(); + } + return false; + } + + @Override + protected boolean batchInsert(List<IMessage> messageList) { + JDBCDriver dbDataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password); + try { + if (messageList == null || messageList.size() == 0) { + return true; + } + List<JSONObject> messages = convertJsonObjectFromMessage(messageList); + if (StringUtil.isEmpty(insertSQLTemplate) && metaData != null) { + String sql = SQLUtil.createInsertSql(metaData, messages.get(0)); + sql = sql + SQLUtil.createInsertValuesSQL(metaData, messages.subList(1, messages.size())); + executeSQL(dbDataSource, sql); + return true; + } + String insertValueSQL = parseInsertValues(insertSQLTemplate); + if (StringUtil.isEmpty(insertValueSQL) || insertSQLTemplate.replace(insertValueSQL, "").contains("#{")) { + for (JSONObject message : messages) { + String sql = parseSQL(message, insertSQLTemplate); + executeSQL(dbDataSource, sql); + } + return true; + } else { + StringBuilder sb = new StringBuilder(); + String insertSQL; + boolean isFirst = true; + int i = 0; + for (JSONObject message : messages) { + insertSQL = parseSQL(message, insertValueSQL); + if (isFirst) { + isFirst = false; + } else { + sb.append(","); + } + i++; + + sb.append(insertSQL); + } + insertSQL = this.insertSQLTemplate.replace(insertValueSQL, sb.toString()); + executeSQL(dbDataSource, insertSQL); + return true; + } + } finally { + dbDataSource.destroy(); + } + } + + protected void executeSQL(JDBCDriver dbDataSource, String sql) { + dbDataSource.execute(sql); + } + + /** + * 解析出insert value数据部分,对于批量的插入,效果会更佳 + */ + private static final String VALUES_NAME = "values"; + + protected String parseInsertValues(String insertSQL) { + int start = insertSQL.toLowerCase().indexOf(VALUES_NAME); + if (start == -1) { + return null; + } + String valuesSQL = insertSQL.substring(start + VALUES_NAME.length()); + int end = valuesSQL.toLowerCase().lastIndexOf(")"); + if (end == -1) { + return null; + } + return valuesSQL.substring(0, end + 1); + } + + protected String parseSQL(JSONObject message, String sql) { + return SQLUtil.parseIbatisSQL(message, sql); + } + + public String getInsertSQLTemplate() { + return insertSQLTemplate; + } + + public void setInsertSQLTemplate(String insertSQLTemplate) { + this.insertSQLTemplate = insertSQLTemplate; + } + + public String getJdbcDriver() { + return jdbcDriver; + } + + public void setJdbcDriver(String jdbcDriver) { + this.jdbcDriver = jdbcDriver; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public MetaData getMetaData() { + return metaData; + } + + public void setMetaData(MetaData metaData) { + this.metaData = metaData; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } +} diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java new file mode 100644 index 0000000..ef7ae28 --- /dev/null +++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.sink; + +import java.util.List; +import java.util.Properties; + +import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder; +import org.apache.rocketmq.streams.common.channel.sink.ISink; +import org.apache.rocketmq.streams.common.channel.source.ISource; +import org.apache.rocketmq.streams.common.model.ServiceName; +import org.apache.rocketmq.streams.common.metadata.MetaData; +import org.apache.rocketmq.streams.common.metadata.MetaDataField; +import org.apache.rocketmq.streams.common.utils.DataTypeUtil; +import com.google.auto.service.AutoService; + +@AutoService(IChannelBuilder.class) +@ServiceName(DBSinkBuilder.TYPE) +public class DBSinkBuilder implements IChannelBuilder { + public static final String TYPE = "rds"; + + @Override + public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) { + DBSink sink = new DBSink(); + sink.setUrl(properties.getProperty("url")); + sink.setUserName("userName"); + sink.setPassword("password"); + List<MetaDataField> fieldList = metaData.getMetaDataFields(); + StringBuilder insertSQL = new StringBuilder(); + StringBuilder insertValueSQL = new StringBuilder(); + boolean isFirst = true; + for (MetaDataField field : fieldList) { + String fieldName = field.getFieldName(); + if (isFirst) { + isFirst = false; + } else { + insertSQL.append(","); + insertValueSQL.append(","); + } + insertSQL.append(fieldName); + if (DataTypeUtil.isNumber(field.getDataType())) { + insertValueSQL.append(fieldName); + } else { + insertValueSQL.append("'#{" + fieldName + "}'"); + } + } + String sql = "insert into " + properties.getProperty("tableName") + "(" + insertSQL.toString() + ")values(" + insertValueSQL.toString() + ")"; + sink.setInsertSQLTemplate(sql); + return sink; + } + + @Override + public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) { + throw new RuntimeException("can not support this method"); + } + + @Override + public String getType() { + return TYPE; + } + +} diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java new file mode 100644 index 0000000..96922e6 --- /dev/null +++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.sink; + +import org.apache.rocketmq.streams.common.channel.split.ISplit; +import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner; +import org.apache.rocketmq.streams.common.configurable.IConfigurableService; +import org.apache.rocketmq.streams.common.context.IMessage; +import org.apache.rocketmq.streams.common.functions.MultiTableSplitFunction; +import org.apache.rocketmq.streams.common.utils.Base64Utils; +import org.apache.rocketmq.streams.common.utils.InstantiationUtil; + +public class SelfMultiTableSink extends AbstractMultiTableSink implements IAfterConfiguableRefreshListerner { + protected String multiTableSplitFunctionSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码 + protected transient MultiTableSplitFunction<IMessage> multiTableSplitFunction; + + public SelfMultiTableSink(String url, String userName, String password, MultiTableSplitFunction<IMessage> multiTableSplitFunction) { + super(url, userName, password); + this.multiTableSplitFunction = multiTableSplitFunction; + byte[] bytes = InstantiationUtil.serializeObject(multiTableSplitFunction); + multiTableSplitFunctionSerializeValue = Base64Utils.encode(bytes); + } + + @Override + protected String createTableName(String splitId) { + return multiTableSplitFunction.createTableFromSplitId(splitId); + } + + @Override + protected ISplit getSplitFromMessage(IMessage message) { + return multiTableSplitFunction.createSplit(message); + } + + @Override + public void doProcessAfterRefreshConfigurable(IConfigurableService configurableService) { + byte[] bytes = Base64Utils.decode(multiTableSplitFunctionSerializeValue); + this.multiTableSplitFunction = InstantiationUtil.deserializeObject(bytes); + } +} diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitBySerialNumber.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitBySerialNumber.java new file mode 100644 index 0000000..c2a49b7 --- /dev/null +++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitBySerialNumber.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.sink; + +import org.apache.rocketmq.streams.common.channel.split.ISplit; +import org.apache.rocketmq.streams.common.context.IMessage; + +public class SplitBySerialNumber extends AbstractMultiTableSink { + public SplitBySerialNumber(String url, String userName, String password) { + super(url, userName, password); + } + + @Override + protected String createTableName(String splitId) { + return null; + } + + @Override + protected ISplit getSplitFromMessage(IMessage message) { + return null; + } +} diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitByTimeMultiTableSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitByTimeMultiTableSink.java new file mode 100644 index 0000000..87a2b3e --- /dev/null +++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SplitByTimeMultiTableSink.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.sink; + +import org.apache.rocketmq.streams.common.channel.split.ISplit; +import org.apache.rocketmq.streams.common.context.IMessage; + +public class SplitByTimeMultiTableSink extends AbstractMultiTableSink { + public SplitByTimeMultiTableSink(String url, String userName, String password) { + super(url, userName, password); + } + + @Override + protected String createTableName(String splitId) { + return null; + } + + @Override + protected ISplit getSplitFromMessage(IMessage message) { + return null; + } +} diff --git a/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/DBWriteOnlyChannelTest.java b/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/DBWriteOnlyChannelTest.java new file mode 100644 index 0000000..c14bbc3 --- /dev/null +++ b/rocketmq-streams-channel-db/src/test/java/org/apache/rocketmq/streams/db/sink/db/DBWriteOnlyChannelTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.streams.db.sink.db; + +import com.alibaba.fastjson.JSONObject; + +import org.apache.rocketmq.streams.common.context.IMessage; +import org.apache.rocketmq.streams.common.context.Message; +import org.apache.rocketmq.streams.common.metadata.MetaData; +import org.apache.rocketmq.streams.db.driver.JDBCDriver; +import org.apache.rocketmq.streams.db.sink.DBSink; +import org.junit.Test; + +public class DBWriteOnlyChannelTest { + + private String URL = "jdbc:mysql://XXXXX:3306/yundun_soc?useUnicode=true&characterEncoding=utf8&autoReconnect=true"; + protected String USER_NAME = "XXXX"; + protected String PASSWORD = "XXXX"; + + @Test + public void testOutputBySQL() { + String sql = "insert into table(name,age) values('#{name}',#{age})"; + DBSink sink = new DBSink(sql, URL, USER_NAME, PASSWORD) { + + /** + * 因为不是真实表,会报错,把执行sql,改成打印sql + */ + @Override + protected void executeSQL(JDBCDriver dbDataSource, String sql) { + System.out.println(sql); + } + }; + for (int i = 0; i < 10; i++) { + JSONObject msg = new JSONObject(); + msg.put("name", "chris" + i); + msg.put("age", i); + IMessage message = new Message(msg); + sink.batchAdd(message); + } + sink.flush(); + } + + @Test + public void testOutputByMetaData() { + DBSink sink = new DBSink() { + /** + * 因为不是真实表,会报错,把执行sql,改成打印sql + */ + @Override + protected void executeSQL(JDBCDriver dbDataSource, String sql) { + System.out.println(sql); + } + }; + JSONObject msg = new JSONObject(); + msg.put("name", "chris"); + msg.put("age", 18); + MetaData metaData = MetaData.createMetaData(msg); + metaData.setTableName("tableName"); + sink.setMetaData(metaData); + for (int i = 0; i < 10; i++) { + msg = new JSONObject(); + msg.put("name", "chris" + i); + msg.put("age", i); + IMessage message = new Message(msg); + sink.batchAdd(message); + } + sink.flush(); + } + +}
