This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new 79a836f format code according to alibaba coding guidelines
new c140341 Merge pull request #16 from speak2me/main
79a836f is described below
commit 79a836f3e85818a663eb26163358607431c7566f
Author: arthur.lyj <[email protected]>
AuthorDate: Tue Aug 10 19:28:20 2021 +0800
format code according to alibaba coding guidelines
---
.../rocketmq/streams/client/DataStreamAction.java | 2 +
.../common/compiler/CustomJavaCompiler.java | 11 ++--
.../rocketmq/streams/common/context/Message.java | 1 +
.../common/logger/MyDailyRollingFileAppender.java | 2 +
.../common/monitor/group/MonitorCommander.java | 70 +++++++++++-----------
.../common/monitor/impl/NothingMonitorItem.java | 3 +
.../streams/filter/context/RuleContext.java | 10 +++-
.../operator/expression/ExpressionPerformance.java | 16 +++--
.../function/impl/condition/EqualsFunction.java | 2 +-
.../script/function/impl/date/GetDateFunction.java | 2 +-
.../impl/field/AdditionalFiledFunction.java | 2 +-
.../script/service/impl/ScriptServiceImpl.java | 4 --
12 files changed, 68 insertions(+), 57 deletions(-)
diff --git
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
index baffba7..91f16fa 100644
---
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
+++
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
@@ -54,6 +54,7 @@ public class DataStreamAction extends DataStream {
/**
* 启动流任务
*/
+ @Override
public void start() {
start(false);
}
@@ -65,6 +66,7 @@ public class DataStreamAction extends DataStream {
start(true);
}
+ @Override
protected void start(boolean isAsync) {
if (this.mainPipelineBuilder == null) {
return;
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/compiler/CustomJavaCompiler.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/compiler/CustomJavaCompiler.java
index aa46b4a..d9d69d4 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/compiler/CustomJavaCompiler.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/compiler/CustomJavaCompiler.java
@@ -59,6 +59,10 @@ public class CustomJavaCompiler {
//运行耗时(单位ms)
private long runTakeTime;
+ private static Pattern pattern1 = Pattern.compile("package\\s+\\S+\\s*;");
+
+ private static Pattern pattern2 = Pattern.compile("class\\s+\\S+.*\\{");
+
/**
* @param sourceFile java 文件
*/
@@ -153,14 +157,11 @@ public class CustomJavaCompiler {
*/
public static String getFullClassName(String sourceCode) {
String className = "";
- Pattern pattern = Pattern.compile("package\\s+\\S+\\s*;");
- Matcher matcher = pattern.matcher(sourceCode);
+ Matcher matcher = pattern1.matcher(sourceCode);
if (matcher.find()) {
className = matcher.group().replaceFirst("package",
"").replace(";", "").trim() + ".";
}
-
- pattern = Pattern.compile("class\\s+\\S+.*\\{");
- matcher = pattern.matcher(sourceCode);
+ matcher = pattern2.matcher(sourceCode);
if (matcher.find()) {
className += matcher.group().replaceFirst("class",
"").replace("{", "").trim();
}
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/Message.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/Message.java
index 11ca6ea..9868cb5 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/Message.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/Message.java
@@ -111,6 +111,7 @@ public class Message implements IMessage {
isJsonMessage = jsonMessage;
}
+ @Override
public void setHeader(MessageHeader header) {
this.header = header;
}
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/logger/MyDailyRollingFileAppender.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/logger/MyDailyRollingFileAppender.java
index 9472d75..2933815 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/logger/MyDailyRollingFileAppender.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/logger/MyDailyRollingFileAppender.java
@@ -135,6 +135,7 @@ public class MyDailyRollingFileAppender extends
FileAppender {
return maxBackupIndex;
}
+ @Override
public void activateOptions() {
super.activateOptions();
@@ -308,6 +309,7 @@ public class MyDailyRollingFileAppender extends
FileAppender {
* <p>
* Before actually logging, this method will check whether it is time to
do a rollover. If it is, it will schedule the next rollover time and then
rollover.
*/
+ @Override
protected void subAppend(LoggingEvent event) {
// 根据文件大小roll over
rollOverBySize();
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/group/MonitorCommander.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/group/MonitorCommander.java
index f3ef147..87d4762 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/group/MonitorCommander.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/group/MonitorCommander.java
@@ -21,8 +21,10 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
@@ -41,8 +43,9 @@ public class MonitorCommander {
private static final Log logger =
LogFactory.getLog(MonitorCommander.class);
- private static MonitorCommander monitorManager = new MonitorCommander();
- private static MetaData metaData = new MetaData();
+ private final static MonitorCommander monitorManager = new
MonitorCommander();
+
+ private final static MetaData metaData = new MetaData();
static {
metaData.setTableName("mq_monitor_data");
@@ -57,7 +60,7 @@ public class MonitorCommander {
metaData.getMetaDataFields().add(createMetaDataField("slowCount", new
BooleanDataType()));
}
- private Object object = new Object();
+ private final Object object = new Object();
private Object initObject = new Object();
private boolean inited = false;
private Map<String, GroupedMonitorInfo> groupedMonitorInfoMap = new
HashMap<String, GroupedMonitorInfo>();
@@ -94,41 +97,38 @@ public class MonitorCommander {
outputDataSourceList.add(source);
}
}
- Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(new
Runnable() {
- @Override
- public void run() {
- Map<String, GroupedMonitorInfo> groupedMap = new
HashMap<>();
- synchronized (object) {
- groupedMap = groupedMonitorInfoMap;
- groupedMonitorInfoMap = new HashMap<String,
GroupedMonitorInfo>();
- }
-
- ISink loggerOutputDataSource =
MonitorFactory.createOrGetLogOutputDatasource(
- "dataprocess_info");
- for (String key : groupedMap.keySet()) {
- GroupedMonitorInfo gmi = groupedMap.get(key);
- JSONObject result = new JSONObject();
- result.put("name", gmi.getName());
- result.put("count", gmi.getCount());
- result.put("max", gmi.getMax());
- result.put("min", gmi.getMin());
- result.put("avg", gmi.getAvg());
- result.put("errorCount", gmi.getErrorCount());
- result.put("slowCount", gmi.getSlowCount());
- //本地打印
- loggerOutputDataSource.batchAdd(new Message(result));
- loggerOutputDataSource.flush();
- //远程输出
- for (ISink source : outputDataSourceList) {
- source.batchAdd(new Message(result));
- }
- }
- //flush出去
+ ScheduledExecutorService monitorService = new
ScheduledThreadPoolExecutor(1, new
BasicThreadFactory.Builder().namingPattern("monitor-schedule-pool-%d").build());
+ monitorService.scheduleWithFixedDelay(() -> {
+ Map<String, GroupedMonitorInfo> groupedMap;
+ synchronized (object) {
+ groupedMap = groupedMonitorInfoMap;
+ groupedMonitorInfoMap = new HashMap<>();
+ }
+ ISink loggerOutputDataSource =
MonitorFactory.createOrGetLogOutputDatasource(
+ "data_process_info");
+ for (String key : groupedMap.keySet()) {
+ GroupedMonitorInfo gmi = groupedMap.get(key);
+ JSONObject result = new JSONObject();
+ result.put("name", gmi.getName());
+ result.put("count", gmi.getCount());
+ result.put("max", gmi.getMax());
+ result.put("min", gmi.getMin());
+ result.put("avg", gmi.getAvg());
+ result.put("errorCount", gmi.getErrorCount());
+ result.put("slowCount", gmi.getSlowCount());
+ //本地打印
+ loggerOutputDataSource.batchAdd(new Message(result));
loggerOutputDataSource.flush();
+ //远程输出
for (ISink source : outputDataSourceList) {
- source.flush();
+ source.batchAdd(new Message(result));
}
}
+ //flush出去
+ loggerOutputDataSource.flush();
+ for (ISink source : outputDataSourceList) {
+ source.flush();
+ }
}, 0, 60, TimeUnit.SECONDS);
inited = true;
}
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/impl/NothingMonitorItem.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/impl/NothingMonitorItem.java
index fd3aaf8..314393d 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/impl/NothingMonitorItem.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/impl/NothingMonitorItem.java
@@ -30,6 +30,7 @@ public class NothingMonitorItem extends MonitorItem {
return this;
}
+ @Override
public NothingMonitorItem endMonitor() {
return this;
@@ -49,10 +50,12 @@ public class NothingMonitorItem extends MonitorItem {
return emptyJson;
}
+ @Override
public MonitorItem occureError(Exception e, String... messages) {
return this;
}
+ @Override
public void addMessage(String key, JSONObject value) {
}
diff --git
a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
index 2dc2a87..9080479 100644
---
a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
+++
b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.streams.filter.context;
import com.alibaba.fastjson.JSONObject;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Serializable;
import java.util.Properties;
import java.util.Vector;
@@ -24,6 +25,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
@@ -106,7 +111,10 @@ public class RuleContext extends AbstractContext<Message>
implements Serializabl
synchronized (RuleContext.class) {
if (!initflag) {
RuleContext staticRuleContext = new
RuleContext(DEFALUT_NAME_SPACE, contextConfigure);
- ExecutorService actionExecutor =
Executors.newFixedThreadPool(contextConfigure.getActionPoolSize());
+ ThreadFactory actionFactory = new
ThreadFactoryBuilder().setNameFormat("RuleContext-Action-Poo-%d").build();
+ int threadSize = contextConfigure.getActionPoolSize();
+ ExecutorService actionExecutor = new
ThreadPoolExecutor(threadSize, threadSize,0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(1024), actionFactory, new
ThreadPoolExecutor.AbortPolicy());
staticRuleContext.actionExecutor = actionExecutor;
staticRuleContext.functionService.scanePackage("org.apache.rocketmq.streams.filter.function");
superRuleContext = staticRuleContext;
diff --git
a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/expression/ExpressionPerformance.java
b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/expression/ExpressionPerformance.java
index b583cad..bdd3966 100644
---
a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/expression/ExpressionPerformance.java
+++
b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/expression/ExpressionPerformance.java
@@ -23,10 +23,11 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
public class ExpressionPerformance implements Runnable {
protected static List<ExpressionPerformance> expressionPerformances = new
ArrayList<>();
@@ -35,16 +36,13 @@ public class ExpressionPerformance implements Runnable {
protected transient Map<String, ExpressionStatistic>
expressionStatisticMap = new HashMap<>();
protected transient long lastTime = System.currentTimeMillis();//最后一次的优化时间
protected transient final List<String> values;
- private static ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(10);
- static {
+ private static ScheduledExecutorService scheduledExecutorService = new
ScheduledThreadPoolExecutor(10, new
BasicThreadFactory.Builder().namingPattern("ExpressionPerformance-Performance-%d").build());
- scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- for (ExpressionPerformance expressionPerformance :
expressionPerformances) {
- expressionPerformance.run();
- }
+ static {
+ scheduledExecutorService.scheduleWithFixedDelay(() -> {
+ for (ExpressionPerformance expressionPerformance :
expressionPerformances) {
+ expressionPerformance.run();
}
}, 10, 3, TimeUnit.SECONDS);
}
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/condition/EqualsFunction.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/condition/EqualsFunction.java
index 71af638..dc7e2b1 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/condition/EqualsFunction.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/condition/EqualsFunction.java
@@ -65,7 +65,7 @@ public class EqualsFunction {
} else if (FunctionUtils.isLong(value)) {
Long left = FunctionUtils.getLong(leftValue);
Long right = FunctionUtils.getLong(value);
- return left == right;
+ return left.equals(right);
} else if (FunctionUtils.isDouble(value)) {
Double left = FunctionUtils.getDouble(leftValue);
Double right = FunctionUtils.getDouble(value);
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/date/GetDateFunction.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/date/GetDateFunction.java
index 3706dc9..f6ec724 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/date/GetDateFunction.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/date/GetDateFunction.java
@@ -34,7 +34,7 @@ public class GetDateFunction {
*/
@FunctionMethod(value = "getdate", alias = "gettime", comment =
"获取当前时间的时间戳")
public Long getDate(IMessage message, FunctionContext context) {
- return new Date().getTime();
+ return System.currentTimeMillis();
}
}
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/field/AdditionalFiledFunction.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/field/AdditionalFiledFunction.java
index de27326..633da3b 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/field/AdditionalFiledFunction.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/field/AdditionalFiledFunction.java
@@ -57,7 +57,7 @@ public class AdditionalFiledFunction {
@FunctionMethod(value = "random", alias = "createRandom", comment =
"产生一个随机数")
public String random(IMessage message, FunctionContext context,
@FunctionParamter(value = "string", comment =
"代表字段长度的字段名,数字或常量") String strLength) {
- Long length = 10l;
+ Long length = 10L;
if (!StringUtil.isEmpty(strLength) && !"null".equals(strLength)) {
Object object = FunctionUtils.getValue(message, context,
strLength);
if (FunctionUtils.isNumberObject(object)) {
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/impl/ScriptServiceImpl.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/impl/ScriptServiceImpl.java
index 314f67c..f88fb5d 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/impl/ScriptServiceImpl.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/impl/ScriptServiceImpl.java
@@ -21,8 +21,6 @@ import com.alibaba.fastjson.JSONObject;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import org.apache.rocketmq.streams.common.cache.softreference.ICache;
import
org.apache.rocketmq.streams.common.cache.softreference.impl.SoftReferenceCache;
import org.apache.rocketmq.streams.common.classloader.FileClassLoader;
@@ -115,6 +113,4 @@ public class ScriptServiceImpl implements IScriptService {
functionService.scanClassDir(jarFile, packageName,
isolationClassLoader);
}
- private ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(3);
-
}