This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new 87a3222 Add Sqlmode (#63)
87a3222 is described below
commit 87a32224d2a874ba50dac4aee881237f0f62c8ae
Author: 零号程序 <[email protected]>
AuthorDate: Wed Sep 15 11:45:39 2021 +0800
Add Sqlmode (#63)
* add update logic for the DBSink and upgrade the concat_ws function
* Add the field level cache to reduce duplicate data entry #60
* Sqlmode can be used to write data to the database as specified #62
Co-authored-by: junjie.cheng <[email protected]>
---
.../apache/rocketmq/streams/db/sink/DBSink.java | 184 ++++++++++++---------
.../rocketmq/streams/db/sink/DBSinkBuilder.java | 24 +--
.../streams/client/transform/DataStream.java | 70 ++++----
.../streams/common/metadata/AbstractMetaData.java | 2 +-
.../rocketmq/streams/common/metadata/MetaData.java | 2 +-
.../streams/common/metadata/MetaDataField.java | 32 ++--
.../common/topology/stages/OutputChainStage.java | 5 +-
7 files changed, 167 insertions(+), 152 deletions(-)
diff --git
a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
index 1aab2ba..886e23a 100644
---
a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
+++
b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
@@ -18,40 +18,34 @@ package org.apache.rocketmq.streams.db.sink;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
import org.apache.rocketmq.streams.common.channel.IChannel;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
-import
org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
import org.apache.rocketmq.streams.common.component.AbstractComponent;
import
org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.metadata.MetaData;
+import org.apache.rocketmq.streams.common.metadata.MetaDataField;
import org.apache.rocketmq.streams.common.utils.SQLUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.db.driver.DriverBuilder;
import org.apache.rocketmq.streams.db.driver.JDBCDriver;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Set;
-
/**
* 主要用于写db,输入可以是一个insert/replace 模版,也可以是metadata对象,二者选一即可。都支持批量插入,提高吞吐 sql
模版:insert into
table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')
MetaData:主要是描述每个字段的类型,是否必须
二者选一个即可。sql模式,系统会把一批(batchSize)数据拼成一个大sql。metadata模式,基于字段描述,最终也是拼成一个大sql
*/
public class DBSink extends AbstractSink {
-
- protected String insertSQLTemplate;//完成插入部分的工作,和metadata二选一。insert into
table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')
-
- protected String duplicateSQLTemplate; //通过on duplicate key update
来对已经存在的信息进行更新
-
- protected MetaData metaData;//可以指定meta data,和insertSQL二选一
-
- protected String tableName; //指定要插入的数据表
+ public static final String SQL_MODE_DEFAULT = "default";
+ public static final String SQL_MODE_REPLACE = "replace";
+ public static final String SQL_MODE_IGNORE = "ignore";
@ENVDependence
protected String jdbcDriver = AbstractComponent.DEFAULT_JDBC_DRIVER;
@@ -60,7 +54,15 @@ public class DBSink extends AbstractSink {
@ENVDependence
protected String userName;
@ENVDependence
+ protected String tableName; //指定要插入的数据表
+ @ENVDependence
protected String password;
+ @ENVDependence
+ protected String sqlMode;
+
+ protected MetaData metaData;//可以指定meta data,和insertSQL二选一
+
+ protected String insertSQLTemplate;//完成插入部分的工作,和metadata二选一。insert into
table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')
protected boolean openSqlCache = true;
@@ -87,60 +89,83 @@ public class DBSink extends AbstractSink {
}
public DBSink() {
- setType(IChannel.TYPE);
+ this(null, null, null, null);
}
- public DBSink(String url, String userName, String password) {
- setType(IChannel.TYPE);
- this.url = url;
- this.userName = userName;
- this.password = password;
+ public DBSink(String url, String userName, String password, String
tableName) {
+ this(url, userName, password, tableName, SQL_MODE_DEFAULT);
}
- public DBSink(String insertSQL, String url, String userName, String
password) {
+ public DBSink(String url, String userName, String password, String
tableName, String sqlMode) {
+ this(url, userName, password, tableName, sqlMode, null);
+ }
+
+ public DBSink(String url, String userName, String password, String
tableName, String sqlMode, MetaData metaData) {
setType(IChannel.TYPE);
this.url = url;
this.userName = userName;
this.password = password;
- this.insertSQLTemplate = insertSQL;
+ this.tableName = tableName;
+ this.sqlMode = sqlMode;
+ this.metaData = metaData;
}
@Override
protected boolean initConfigurable() {
- try {
- Class.forName("com.mysql.jdbc.Driver");
- if (StringUtil.isNotEmpty(this.tableName)) {
- Connection connection = DriverManager.getConnection(url,
userName, password);
- DatabaseMetaData metaData = connection.getMetaData();
- ResultSet metaResult =
metaData.getColumns(connection.getCatalog(), "%", this.tableName, null);
- this.metaData = MetaData.createMetaData(metaResult);
- this.metaData.setTableName(this.tableName);
- }
- sqlCache = new MessageCache<>(new IMessageFlushCallBack<String>() {
- @Override
- public boolean flushMessage(List<String> sqls) {
- JDBCDriver dataSource =
DriverBuilder.createDriver(jdbcDriver, url, userName, password);
- try {
- dataSource.executSqls(sqls);
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- } finally {
- if (dataSource != null) {
- dataSource.destroy();
- }
- }
- return true;
+ if (this.metaData == null) {
+ try {
+ Class.forName("com.mysql.jdbc.Driver");
+ if (StringUtil.isNotEmpty(this.tableName)) {
+ Connection connection =
DriverManager.getConnection(this.url, this.userName, this.password);
+ DatabaseMetaData connectionMetaData =
connection.getMetaData();
+ ResultSet metaResult =
connectionMetaData.getColumns(connection.getCatalog(), "%", this.tableName,
null);
+ this.metaData = MetaData.createMetaData(metaResult);
+ this.metaData.setTableName(this.tableName);
}
- });
- ((MessageCache<String>) sqlCache).setAutoFlushTimeGap(100000);
- ((MessageCache<String>) sqlCache).setAutoFlushSize(50);
- sqlCache.openAutoFlush();
- return super.initConfigurable();
- } catch (ClassNotFoundException | SQLException e) {
- e.printStackTrace();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ List<MetaDataField> fieldList = this.metaData.getMetaDataFields();
+ List<String> insertFields = Lists.newArrayList();
+ List<String> insertValues = Lists.newArrayList();
+ List<String> duplicateKeys = Lists.newArrayList();
+ fieldList.forEach(field -> {
+ String fieldName = field.getFieldName();
+ insertFields.add(fieldName);
+ insertValues.add("'#{" + fieldName + "}'");
+ duplicateKeys.add(fieldName + " = VALUES(" + fieldName + ")");
+ });
+
+ String sql = "insert";
+ if (sqlMode == null || SQL_MODE_DEFAULT.equals(sqlMode)) {
+ sql = sql + " into ";
+ } else if (SQL_MODE_IGNORE.equals(sqlMode)) {
+ sql = sql + " ignore into ";
+ } else if (SQL_MODE_REPLACE.equals(sqlMode)) {
+ sql = sql + " into ";
}
- return false;
+ sql = sql + tableName + "(" + String.join(",", insertFields) + ")
values (" + String.join(",", insertValues) + ") ";
+ if (SQL_MODE_REPLACE.equals(sqlMode)) {
+ sql = sql + " on duplicate key update " + String.join(",",
duplicateKeys);
+ }
+ this.insertSQLTemplate = sql;
+ this.sqlCache = new MessageCache<>(sqls -> {
+ JDBCDriver dataSource = DriverBuilder.createDriver(jdbcDriver,
url, userName, password);
+ try {
+ dataSource.executSqls(sqls);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ } finally {
+ dataSource.destroy();
+ }
+ return true;
+ });
+ ((MessageCache<String>) this.sqlCache).setAutoFlushTimeGap(100000);
+ ((MessageCache<String>) this.sqlCache).setAutoFlushSize(50);
+ this.sqlCache.openAutoFlush();
+ return super.initConfigurable();
}
@Override
@@ -154,7 +179,6 @@ public class DBSink extends AbstractSink {
if (StringUtil.isEmpty(insertSQLTemplate) && metaData != null) {
String sql = SQLUtil.createInsertSql(metaData,
messages.get(0));
sql += SQLUtil.createInsertValuesSQL(metaData,
messages.subList(1, messages.size()));
- sql += this.duplicateSQLTemplate;
executeSQL(dbDataSource, sql);
return true;
}
@@ -162,7 +186,6 @@ public class DBSink extends AbstractSink {
if (StringUtil.isEmpty(insertValueSQL) ||
insertSQLTemplate.replace(insertValueSQL, "").contains("#{")) {
for (JSONObject message : messages) {
String sql = parseSQL(message, insertSQLTemplate);
- sql += this.duplicateSQLTemplate;
executeSQL(dbDataSource, sql);
}
return true;
@@ -172,7 +195,6 @@ public class DBSink extends AbstractSink {
subInsert.add(parseSQL(message, insertValueSQL));
}
String insertSQL =
this.insertSQLTemplate.replace(insertValueSQL, String.join(",", subInsert));
- insertSQL += this.duplicateSQLTemplate;
executeSQL(dbDataSource, insertSQL);
return true;
}
@@ -195,7 +217,16 @@ public class DBSink extends AbstractSink {
} else {
dbDataSource.execute(sql);
}
+ }
+ protected void executeSQL(JDBCDriver dbDataSource, List<String> sqls) {
+ if (isOpenSqlCache()) {
+ for (String sql : sqls) {
+ this.sqlCache.addCache(sql);
+ }
+ } else {
+ dbDataSource.executSqls(sqls);
+ }
}
/**
@@ -209,7 +240,7 @@ public class DBSink extends AbstractSink {
return null;
}
String valuesSQL = insertSQL.substring(start + VALUES_NAME.length());
- int end = valuesSQL.toLowerCase().lastIndexOf(")");
+ int end = valuesSQL.toLowerCase().indexOf(")");
if (end == -1) {
return null;
}
@@ -260,14 +291,6 @@ public class DBSink extends AbstractSink {
this.password = password;
}
- public MetaData getMetaData() {
- return metaData;
- }
-
- public void setMetaData(MetaData metaData) {
- this.metaData = metaData;
- }
-
public String getTableName() {
return tableName;
}
@@ -276,6 +299,22 @@ public class DBSink extends AbstractSink {
this.tableName = tableName;
}
+ public String getSqlMode() {
+ return sqlMode;
+ }
+
+ public void setSqlMode(String sqlMode) {
+ this.sqlMode = sqlMode;
+ }
+
+ public MetaData getMetaData() {
+ return metaData;
+ }
+
+ public void setMetaData(MetaData metaData) {
+ this.metaData = metaData;
+ }
+
public boolean isOpenSqlCache() {
return openSqlCache;
}
@@ -284,11 +323,4 @@ public class DBSink extends AbstractSink {
this.openSqlCache = openSqlCache;
}
- public String getDuplicateSQLTemplate() {
- return duplicateSQLTemplate;
- }
-
- public void setDuplicateSQLTemplate(String duplicateSQLTemplate) {
- this.duplicateSQLTemplate = duplicateSQLTemplate;
- }
}
diff --git
a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
index 87f5fb7..1743f62 100644
---
a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
+++
b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
@@ -17,17 +17,13 @@
package org.apache.rocketmq.streams.db.sink;
import com.google.auto.service.AutoService;
-import com.google.common.collect.Lists;
+import java.util.Properties;
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.metadata.MetaDataField;
import org.apache.rocketmq.streams.common.model.ServiceName;
-import java.util.List;
-import java.util.Properties;
-
@AutoService(IChannelBuilder.class)
@ServiceName(DBSinkBuilder.TYPE)
public class DBSinkBuilder implements IChannelBuilder {
@@ -39,21 +35,9 @@ public class DBSinkBuilder implements IChannelBuilder {
sink.setUrl(properties.getProperty("url"));
sink.setUserName(properties.getProperty("userName"));
sink.setPassword(properties.getProperty("password"));
- List<MetaDataField> fieldList = metaData.getMetaDataFields();
-
- List<String> insertFields = Lists.newArrayList();
- List<String> insertValues = Lists.newArrayList();
- List<String> duplicateKeys = Lists.newArrayList();
- fieldList.forEach(field -> {
- String fieldName = field.getFieldName();
- insertFields.add(fieldName);
- insertValues.add("'#{" + fieldName + "}'");
- duplicateKeys.add(fieldName + " = VALUES(" + fieldName + ")");
- });
-
- String sql = "insert into " + properties.getProperty("tableName") +
"(" + String.join(",", insertFields) + ") values (" + String.join(",",
insertValues) + ") ";
- sink.setInsertSQLTemplate(sql);
- sink.setDuplicateSQLTemplate(" on duplicate key update " +
String.join(",", duplicateKeys));
+ sink.setTableName(properties.getProperty("tableName"));
+ sink.setSqlMode(properties.getProperty("sqlMode"));
+ sink.setMetaData(metaData);
return sink;
}
diff --git
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
index f9260ea..08bfe7f 100644
---
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
+++
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
@@ -73,7 +73,8 @@ public class DataStream implements Serializable {
this.currentChainStage = currentChainStage;
}
- public DataStream(PipelineBuilder pipelineBuilder, Set<PipelineBuilder>
pipelineBuilders, ChainStage<?> currentChainStage) {
+ public DataStream(PipelineBuilder pipelineBuilder, Set<PipelineBuilder>
pipelineBuilders,
+ ChainStage<?> currentChainStage) {
this.mainPipelineBuilder = pipelineBuilder;
this.otherPipelineBuilders = pipelineBuilders;
this.currentChainStage = currentChainStage;
@@ -97,11 +98,11 @@ public class DataStream implements Serializable {
@Override
protected <T> T operate(IMessage message, AbstractContext context)
{
try {
- O o = (O)(message.getMessageValue());
- T result = (T)mapFunction.map(o);
+ O o = (O) (message.getMessageValue());
+ T result = (T) mapFunction.map(o);
if (result != message.getMessageValue()) {
if (result instanceof JSONObject) {
- message.setMessageBody((JSONObject)result);
+ message.setMessageBody((JSONObject) result);
} else {
message.setMessageBody(new
UserDefinedMessage(result));
}
@@ -118,28 +119,28 @@ public class DataStream implements Serializable {
return new DataStream(this.mainPipelineBuilder,
this.otherPipelineBuilders, stage);
}
-
public <T, O> DataStream flatMap(FlatMapFunction<T, O> mapFunction) {
StageBuilder stageBuilder = new StageBuilder() {
@Override
protected <T> T operate(IMessage message, AbstractContext context)
{
try {
- O o = (O)(message.getMessageValue());
- List<T> result =(List<T>)mapFunction.flatMap(o);
- if(result==null||result.size()==0){
+ O o = (O) (message.getMessageValue());
+ List<T> result = (List<T>) mapFunction.flatMap(o);
+ if (result == null || result.size() == 0) {
context.breakExecute();
}
- List<IMessage> splitMessages=new ArrayList<>();
- for(T t:result){
- Message subMessage=null;
+ List<IMessage> splitMessages = new ArrayList<>();
+ for (T t : result) {
+ Message subMessage = null;
if (result instanceof JSONObject) {
- subMessage=new Message((JSONObject)t);
+ subMessage = new Message((JSONObject) t);
} else {
- subMessage=new Message(new UserDefinedMessage(t));
+ subMessage = new Message(new
UserDefinedMessage(t));
}
splitMessages.add(subMessage);
}
- context.openSplitModel();;
+ context.openSplitModel();
+ ;
context.setSplitMessages(splitMessages);
return null;
} catch (Exception e) {
@@ -159,7 +160,7 @@ public class DataStream implements Serializable {
@Override
protected <T> T operate(IMessage message, AbstractContext context)
{
try {
- boolean isFilter =
filterFunction.filter((O)message.getMessageValue());
+ boolean isFilter = filterFunction.filter((O)
message.getMessageValue());
if (isFilter) {
context.breakExecute();
}
@@ -232,7 +233,7 @@ public class DataStream implements Serializable {
Union union = new Union();
//处理左流,做流的isMain设置成true
- UDFUnionChainStage chainStage =
(UDFUnionChainStage)this.mainPipelineBuilder.createStage(union);
+ UDFUnionChainStage chainStage = (UDFUnionChainStage)
this.mainPipelineBuilder.createStage(union);
chainStage.setMainStream(true);
this.mainPipelineBuilder.setTopologyStages(currentChainStage,
chainStage);
@@ -272,7 +273,8 @@ public class DataStream implements Serializable {
* @param sqlOrTableName
* @return
*/
- public JoinStream join(String url, String userName, String password,
String sqlOrTableName, long pollingTimeMintue) {
+ public JoinStream join(String url, String userName, String password,
String sqlOrTableName,
+ long pollingTimeMintue) {
return join(url, userName, password, sqlOrTableName, null,
pollingTimeMintue);
}
@@ -285,7 +287,8 @@ public class DataStream implements Serializable {
* @param sqlOrTableName
* @return
*/
- public JoinStream join(String url, String userName, String password,
String sqlOrTableName, String jdbcDriver, long pollingTimeMinute) {
+ public JoinStream join(String url, String userName, String password,
String sqlOrTableName, String jdbcDriver,
+ long pollingTimeMinute) {
DBDim dbDim = new DBDim();
dbDim.setUrl(url);
dbDim.setUserName(userName);
@@ -308,7 +311,7 @@ public class DataStream implements Serializable {
StageBuilder selfChainStage = new StageBuilder() {
@Override
protected <T> T operate(IMessage message, AbstractContext context)
{
- forEachFunction.foreach((O)message.getMessageValue());
+ forEachFunction.foreach((O) message.getMessageValue());
return null;
}
};
@@ -367,7 +370,6 @@ public class DataStream implements Serializable {
return;
}
-
ConfigurableComponent configurableComponent =
ComponentCreator.getComponent(mainPipelineBuilder.getPipelineNameSpace(),
ConfigurableComponent.class, ConfigureFileKey.CONNECT_TYPE + ":memory");
ChainPipeline pipeline =
this.mainPipelineBuilder.build(configurableComponent.getService());
pipeline.startChannel();
@@ -403,21 +405,23 @@ public class DataStream implements Serializable {
this.otherPipelineBuilders.addAll(rightSource.otherPipelineBuilders);
}
- public DataStreamAction toFile(String filePath,int batchSize,boolean
isAppend) {
- FileSink fileChannel = new FileSink(filePath,isAppend);
- if(batchSize>0){
+ public DataStreamAction toFile(String filePath, int batchSize, boolean
isAppend) {
+ FileSink fileChannel = new FileSink(filePath, isAppend);
+ if (batchSize > 0) {
fileChannel.setBatchSize(batchSize);
}
ChainStage<?> output = mainPipelineBuilder.createStage(fileChannel);
mainPipelineBuilder.setTopologyStages(currentChainStage, output);
return new DataStreamAction(this.mainPipelineBuilder,
this.otherPipelineBuilders, output);
}
- public DataStreamAction toFile(String filePath,boolean isAppend) {
- FileSink fileChannel = new FileSink(filePath,isAppend);
+
+ public DataStreamAction toFile(String filePath, boolean isAppend) {
+ FileSink fileChannel = new FileSink(filePath, isAppend);
ChainStage<?> output = mainPipelineBuilder.createStage(fileChannel);
mainPipelineBuilder.setTopologyStages(currentChainStage, output);
return new DataStreamAction(this.mainPipelineBuilder,
this.otherPipelineBuilders, output);
}
+
public DataStreamAction toFile(String filePath) {
FileSink fileChannel = new FileSink(filePath);
ChainStage<?> output = mainPipelineBuilder.createStage(fileChannel);
@@ -440,28 +444,26 @@ public class DataStream implements Serializable {
}
public DataStreamAction toDB(String url, String userName, String password,
String tableName) {
- DBSink dbChannel = new DBSink(url, userName, password);
- dbChannel.setTableName(tableName);
+ DBSink dbChannel = new DBSink(url, userName, password, tableName);
ChainStage<?> output = this.mainPipelineBuilder.createStage(dbChannel);
this.mainPipelineBuilder.setTopologyStages(currentChainStage, output);
return new DataStreamAction(this.mainPipelineBuilder,
this.otherPipelineBuilders, output);
}
public DataStreamAction toRocketmq(String topic) {
- return toRocketmq(topic, "*", null,-1, null);
+ return toRocketmq(topic, "*", null, -1, null);
}
-
- public DataStreamAction toRocketmq(String topic,String namesrvAddr) {
- return toRocketmq(topic, "*", null,-1, namesrvAddr);
+ public DataStreamAction toRocketmq(String topic, String namesrvAddr) {
+ return toRocketmq(topic, "*", null, -1, namesrvAddr);
}
public DataStreamAction toRocketmq(String topic, String tags,
- String namesrvAddr) {
- return toRocketmq(topic, tags,null,-1, namesrvAddr);
+ String namesrvAddr) {
+ return toRocketmq(topic, tags, null, -1, namesrvAddr);
}
- public DataStreamAction toRocketmq(String topic, String tags,String
groupName, int batchSize, String namesrvAddr) {
+ public DataStreamAction toRocketmq(String topic, String tags, String
groupName, int batchSize, String namesrvAddr) {
RocketMQSink rocketMQSink = new RocketMQSink();
rocketMQSink.setTopic(topic);
rocketMQSink.setTags(tags);
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/AbstractMetaData.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/AbstractMetaData.java
index 7efd6d3..59f7464 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/AbstractMetaData.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/AbstractMetaData.java
@@ -197,7 +197,7 @@ public abstract class AbstractMetaData<T> extends
BasedConfigurable
Iterator i$ = this.metaDataFields.iterator();
while (i$.hasNext()) {
- MetaDataField<T> field = (MetaDataField)i$.next();
+ MetaDataField<T> field = (MetaDataField) i$.next();
jsonArray.add(field.toJson());
}
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaData.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaData.java
index 3deec90..b4f04d3 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaData.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaData.java
@@ -164,7 +164,7 @@ public class MetaData extends AbstractMetaData {
}
public static String getTableName(Class clazz) {
- TableClassName tableClass =
(TableClassName)clazz.getAnnotation(TableClassName.class);
+ TableClassName tableClass = (TableClassName)
clazz.getAnnotation(TableClassName.class);
if (tableClass != null) {
String className = tableClass.value();
if (StringUtil.isNotEmpty(className)) {
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataField.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataField.java
index 9ec882b..5a035ae 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataField.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataField.java
@@ -116,7 +116,7 @@ public class MetaDataField<T> extends Entity implements
IJsonable {
JSONObject jsonObject = new JSONObject();
jsonObject.put("fieldName", fieldName);
if (dataType == null) {
- dataType = (DataType)new StringDataType();
+ dataType = (DataType) new StringDataType();
}
jsonObject.put("dataType", dataType.toJson());
jsonObject.put("isRequired", isRequired);
@@ -134,8 +134,8 @@ public class MetaDataField<T> extends Entity implements
IJsonable {
this.isPrimary = jsonObject.getBoolean("isPrimary");
}
- public static DataType getDataTypeByStr(String dataType) {
- DataType dt = null;
+ public static DataType<?> getDataTypeByStr(String dataType) {
+ DataType<?> dt = null;
if ("String".equals(dataType)) {
dt = new StringDataType();
} else if ("long".equals(dataType)) {
@@ -152,22 +152,20 @@ public class MetaDataField<T> extends Entity implements
IJsonable {
return dt;
}
- public static String getDataTypeStrByType(DataType dataType) {
- String dataTypestr = "";
- if (StringDataType.class.isInstance(dataType)) {
- dataTypestr = "String";
- } else if (LongDataType.class.isInstance(dataType)) {
- dataTypestr = "long";
- } else if (IntDataType.class.isInstance(dataType)) {
- dataTypestr = "int";
- } else if (FloatDataType.class.isInstance(dataType)) {
- dataTypestr = "float";
- } else if (Boolean.class.isInstance(dataType)) {
- dataTypestr = "boolean";
+ public static String getDataTypeStrByType(DataType<?> dataType) {
+ String dataTypeStr = "";
+ if (dataType instanceof StringDataType) {
+ dataTypeStr = "String";
+ } else if (dataType instanceof LongDataType) {
+ dataTypeStr = "long";
+ } else if (dataType instanceof IntDataType) {
+ dataTypeStr = "int";
+ } else if (dataType instanceof FloatDataType) {
+ dataTypeStr = "float";
} else {
- dataTypestr = "String";
+ dataTypeStr = "String";
}
- return dataTypestr;
+ return dataTypeStr;
}
}
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
index 5cbac0f..a715ca3 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
@@ -82,13 +82,12 @@ public class OutputChainStage<T extends IMessage> extends
ChainStage<T> implemen
*/
if (openMockChannel()) {
if (mockSink != null) {
- mockSink.batchAdd(message.deepCopy());
+ mockSink.batchAdd(message);
return message;
}
return message;
}
- sink.batchAdd(message.deepCopy());
-
+ sink.batchAdd(message);
return message;
}