This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new 79a836f  format code accroding to alibaba coding guidlines
     new c140341  Merge pull request #16 from speak2me/main
79a836f is described below

commit 79a836f3e85818a663eb26163358607431c7566f
Author: arthur.lyj <[email protected]>
AuthorDate: Tue Aug 10 19:28:20 2021 +0800

    format code accroding to alibaba coding guidlines
---
 .../rocketmq/streams/client/DataStreamAction.java  |  2 +
 .../common/compiler/CustomJavaCompiler.java        | 11 ++--
 .../rocketmq/streams/common/context/Message.java   |  1 +
 .../common/logger/MyDailyRollingFileAppender.java  |  2 +
 .../common/monitor/group/MonitorCommander.java     | 70 +++++++++++-----------
 .../common/monitor/impl/NothingMonitorItem.java    |  3 +
 .../streams/filter/context/RuleContext.java        | 10 +++-
 .../operator/expression/ExpressionPerformance.java | 16 +++--
 .../function/impl/condition/EqualsFunction.java    |  2 +-
 .../script/function/impl/date/GetDateFunction.java |  2 +-
 .../impl/field/AdditionalFiledFunction.java        |  2 +-
 .../script/service/impl/ScriptServiceImpl.java     |  4 --
 12 files changed, 68 insertions(+), 57 deletions(-)

diff --git 
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
 
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
index baffba7..91f16fa 100644
--- 
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
+++ 
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
@@ -54,6 +54,7 @@ public class DataStreamAction extends DataStream {
     /**
      * 启动流任务
      */
+    @Override
     public void start() {
         start(false);
     }
@@ -65,6 +66,7 @@ public class DataStreamAction extends DataStream {
         start(true);
     }
 
+    @Override
     protected void start(boolean isAsync) {
         if (this.mainPipelineBuilder == null) {
             return;
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/compiler/CustomJavaCompiler.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/compiler/CustomJavaCompiler.java
index aa46b4a..d9d69d4 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/compiler/CustomJavaCompiler.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/compiler/CustomJavaCompiler.java
@@ -59,6 +59,10 @@ public class CustomJavaCompiler {
     //运行耗时(单位ms)
     private long runTakeTime;
 
+    private static Pattern pattern1 = Pattern.compile("package\\s+\\S+\\s*;");
+
+    private static Pattern pattern2 = Pattern.compile("class\\s+\\S+.*\\{");
+
     /**
      * @param sourceFile java 文件
      */
@@ -153,14 +157,11 @@ public class CustomJavaCompiler {
      */
     public static String getFullClassName(String sourceCode) {
         String className = "";
-        Pattern pattern = Pattern.compile("package\\s+\\S+\\s*;");
-        Matcher matcher = pattern.matcher(sourceCode);
+        Matcher matcher = pattern1.matcher(sourceCode);
         if (matcher.find()) {
             className = matcher.group().replaceFirst("package", 
"").replace(";", "").trim() + ".";
         }
-
-        pattern = Pattern.compile("class\\s+\\S+.*\\{");
-        matcher = pattern.matcher(sourceCode);
+        matcher = pattern2.matcher(sourceCode);
         if (matcher.find()) {
             className += matcher.group().replaceFirst("class", 
"").replace("{", "").trim();
         }
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/Message.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/Message.java
index 11ca6ea..9868cb5 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/Message.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/Message.java
@@ -111,6 +111,7 @@ public class Message implements IMessage {
         isJsonMessage = jsonMessage;
     }
 
+    @Override
     public void setHeader(MessageHeader header) {
         this.header = header;
     }
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/logger/MyDailyRollingFileAppender.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/logger/MyDailyRollingFileAppender.java
index 9472d75..2933815 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/logger/MyDailyRollingFileAppender.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/logger/MyDailyRollingFileAppender.java
@@ -135,6 +135,7 @@ public class MyDailyRollingFileAppender extends 
FileAppender {
         return maxBackupIndex;
     }
 
+    @Override
     public void activateOptions() {
         super.activateOptions();
 
@@ -308,6 +309,7 @@ public class MyDailyRollingFileAppender extends 
FileAppender {
      * <p>
      * Before actually logging, this method will check whether it is time to 
do a rollover. If it is, it will schedule the next rollover time and then 
rollover.
      */
+    @Override
     protected void subAppend(LoggingEvent event) {
         // 根据文件大小roll over
         rollOverBySize();
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/group/MonitorCommander.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/group/MonitorCommander.java
index f3ef147..87d4762 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/group/MonitorCommander.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/group/MonitorCommander.java
@@ -21,8 +21,10 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.channel.sink.ISink;
@@ -41,8 +43,9 @@ public class MonitorCommander {
 
     private static final Log logger = 
LogFactory.getLog(MonitorCommander.class);
 
-    private static MonitorCommander monitorManager = new MonitorCommander();
-    private static MetaData metaData = new MetaData();
+    private final static MonitorCommander monitorManager = new 
MonitorCommander();
+
+    private final static MetaData metaData = new MetaData();
 
     static {
         metaData.setTableName("mq_monitor_data");
@@ -57,7 +60,7 @@ public class MonitorCommander {
         metaData.getMetaDataFields().add(createMetaDataField("slowCount", new 
BooleanDataType()));
     }
 
-    private Object object = new Object();
+    private final Object object = new Object();
     private Object initObject = new Object();
     private boolean inited = false;
     private Map<String, GroupedMonitorInfo> groupedMonitorInfoMap = new 
HashMap<String, GroupedMonitorInfo>();
@@ -94,41 +97,38 @@ public class MonitorCommander {
                     outputDataSourceList.add(source);
                 }
             }
-            Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(new 
Runnable() {
-                @Override
-                public void run() {
-                    Map<String, GroupedMonitorInfo> groupedMap = new 
HashMap<>();
-                    synchronized (object) {
-                        groupedMap = groupedMonitorInfoMap;
-                        groupedMonitorInfoMap = new HashMap<String, 
GroupedMonitorInfo>();
-                    }
-
-                    ISink loggerOutputDataSource = 
MonitorFactory.createOrGetLogOutputDatasource(
-                        "dataprocess_info");
-                    for (String key : groupedMap.keySet()) {
-                        GroupedMonitorInfo gmi = groupedMap.get(key);
-                        JSONObject result = new JSONObject();
-                        result.put("name", gmi.getName());
-                        result.put("count", gmi.getCount());
-                        result.put("max", gmi.getMax());
-                        result.put("min", gmi.getMin());
-                        result.put("avg", gmi.getAvg());
-                        result.put("errorCount", gmi.getErrorCount());
-                        result.put("slowCount", gmi.getSlowCount());
-                        //本地打印
-                        loggerOutputDataSource.batchAdd(new Message(result));
-                        loggerOutputDataSource.flush();
-                        //远程输出
-                        for (ISink source : outputDataSourceList) {
-                            source.batchAdd(new Message(result));
-                        }
-                    }
-                    //flush出去
+            ScheduledExecutorService monitorService = new 
ScheduledThreadPoolExecutor(1, new 
BasicThreadFactory.Builder().namingPattern("monitor-schedule-pool-%d").build());
+            monitorService.scheduleWithFixedDelay(() -> {
+                Map<String, GroupedMonitorInfo> groupedMap;
+                synchronized (object) {
+                    groupedMap = groupedMonitorInfoMap;
+                    groupedMonitorInfoMap = new HashMap<>();
+                }
+                ISink loggerOutputDataSource = 
MonitorFactory.createOrGetLogOutputDatasource(
+                    "data_process_info");
+                for (String key : groupedMap.keySet()) {
+                    GroupedMonitorInfo gmi = groupedMap.get(key);
+                    JSONObject result = new JSONObject();
+                    result.put("name", gmi.getName());
+                    result.put("count", gmi.getCount());
+                    result.put("max", gmi.getMax());
+                    result.put("min", gmi.getMin());
+                    result.put("avg", gmi.getAvg());
+                    result.put("errorCount", gmi.getErrorCount());
+                    result.put("slowCount", gmi.getSlowCount());
+                    //本地打印
+                    loggerOutputDataSource.batchAdd(new Message(result));
                     loggerOutputDataSource.flush();
+                    //远程输出
                     for (ISink source : outputDataSourceList) {
-                        source.flush();
+                        source.batchAdd(new Message(result));
                     }
                 }
+                //flush出去
+                loggerOutputDataSource.flush();
+                for (ISink source : outputDataSourceList) {
+                    source.flush();
+                }
             }, 0, 60, TimeUnit.SECONDS);
             inited = true;
         }
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/impl/NothingMonitorItem.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/impl/NothingMonitorItem.java
index fd3aaf8..314393d 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/impl/NothingMonitorItem.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/monitor/impl/NothingMonitorItem.java
@@ -30,6 +30,7 @@ public class NothingMonitorItem extends MonitorItem {
         return this;
     }
 
+    @Override
     public NothingMonitorItem endMonitor() {
 
         return this;
@@ -49,10 +50,12 @@ public class NothingMonitorItem extends MonitorItem {
         return emptyJson;
     }
 
+    @Override
     public MonitorItem occureError(Exception e, String... messages) {
         return this;
     }
 
+    @Override
     public void addMessage(String key, JSONObject value) {
 
     }
diff --git 
a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
 
b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
index 2dc2a87..9080479 100644
--- 
a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
+++ 
b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.streams.filter.context;
 
 import com.alibaba.fastjson.JSONObject;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.Serializable;
 import java.util.Properties;
 import java.util.Vector;
@@ -24,6 +25,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
@@ -106,7 +111,10 @@ public class RuleContext extends AbstractContext<Message> 
implements Serializabl
             synchronized (RuleContext.class) {
                 if (!initflag) {
                     RuleContext staticRuleContext = new 
RuleContext(DEFALUT_NAME_SPACE, contextConfigure);
-                    ExecutorService actionExecutor = 
Executors.newFixedThreadPool(contextConfigure.getActionPoolSize());
+                    ThreadFactory actionFactory = new 
ThreadFactoryBuilder().setNameFormat("RuleContext-Action-Poo-%d").build();
+                    int threadSize = contextConfigure.getActionPoolSize();
+                    ExecutorService actionExecutor = new 
ThreadPoolExecutor(threadSize, threadSize,0L, TimeUnit.MILLISECONDS,
+                        new LinkedBlockingQueue<>(1024), actionFactory, new 
ThreadPoolExecutor.AbortPolicy());
                     staticRuleContext.actionExecutor = actionExecutor;
                     
staticRuleContext.functionService.scanePackage("org.apache.rocketmq.streams.filter.function");
                     superRuleContext = staticRuleContext;
diff --git 
a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/expression/ExpressionPerformance.java
 
b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/expression/ExpressionPerformance.java
index b583cad..bdd3966 100644
--- 
a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/expression/ExpressionPerformance.java
+++ 
b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/operator/expression/ExpressionPerformance.java
@@ -23,10 +23,11 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 
 public class ExpressionPerformance implements Runnable {
     protected static List<ExpressionPerformance> expressionPerformances = new 
ArrayList<>();
@@ -35,16 +36,13 @@ public class ExpressionPerformance implements Runnable {
     protected transient Map<String, ExpressionStatistic> 
expressionStatisticMap = new HashMap<>();
     protected transient long lastTime = System.currentTimeMillis();//最后一次的优化时间
     protected transient final List<String> values;
-    private static ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(10);
 
-    static {
+    private static ScheduledExecutorService scheduledExecutorService = new 
ScheduledThreadPoolExecutor(10, new 
BasicThreadFactory.Builder().namingPattern("ExpressionPerformance-Performance-%d").build());
 
-        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
-            @Override
-            public void run() {
-                for (ExpressionPerformance expressionPerformance : 
expressionPerformances) {
-                    expressionPerformance.run();
-                }
+    static {
+        scheduledExecutorService.scheduleWithFixedDelay(() -> {
+            for (ExpressionPerformance expressionPerformance : 
expressionPerformances) {
+                expressionPerformance.run();
             }
         }, 10, 3, TimeUnit.SECONDS);
     }
diff --git 
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/condition/EqualsFunction.java
 
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/condition/EqualsFunction.java
index 71af638..dc7e2b1 100644
--- 
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/condition/EqualsFunction.java
+++ 
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/condition/EqualsFunction.java
@@ -65,7 +65,7 @@ public class EqualsFunction {
         } else if (FunctionUtils.isLong(value)) {
             Long left = FunctionUtils.getLong(leftValue);
             Long right = FunctionUtils.getLong(value);
-            return left == right;
+            return left.equals(right);
         } else if (FunctionUtils.isDouble(value)) {
             Double left = FunctionUtils.getDouble(leftValue);
             Double right = FunctionUtils.getDouble(value);
diff --git 
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/date/GetDateFunction.java
 
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/date/GetDateFunction.java
index 3706dc9..f6ec724 100644
--- 
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/date/GetDateFunction.java
+++ 
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/date/GetDateFunction.java
@@ -34,7 +34,7 @@ public class GetDateFunction {
      */
     @FunctionMethod(value = "getdate", alias = "gettime", comment = 
"获取当前时间的时间戳")
     public Long getDate(IMessage message, FunctionContext context) {
-        return new Date().getTime();
+        return System.currentTimeMillis();
     }
 
 }
diff --git 
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/field/AdditionalFiledFunction.java
 
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/field/AdditionalFiledFunction.java
index de27326..633da3b 100644
--- 
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/field/AdditionalFiledFunction.java
+++ 
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/field/AdditionalFiledFunction.java
@@ -57,7 +57,7 @@ public class AdditionalFiledFunction {
     @FunctionMethod(value = "random", alias = "createRandom", comment = 
"产生一个随机数")
     public String random(IMessage message, FunctionContext context,
                          @FunctionParamter(value = "string", comment = 
"代表字段长度的字段名,数字或常量") String strLength) {
-        Long length = 10l;
+        Long length = 10L;
         if (!StringUtil.isEmpty(strLength) && !"null".equals(strLength)) {
             Object object = FunctionUtils.getValue(message, context, 
strLength);
             if (FunctionUtils.isNumberObject(object)) {
diff --git 
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/impl/ScriptServiceImpl.java
 
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/impl/ScriptServiceImpl.java
index 314f67c..f88fb5d 100644
--- 
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/impl/ScriptServiceImpl.java
+++ 
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/service/impl/ScriptServiceImpl.java
@@ -21,8 +21,6 @@ import com.alibaba.fastjson.JSONObject;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import org.apache.rocketmq.streams.common.cache.softreference.ICache;
 import 
org.apache.rocketmq.streams.common.cache.softreference.impl.SoftReferenceCache;
 import org.apache.rocketmq.streams.common.classloader.FileClassLoader;
@@ -115,6 +113,4 @@ public class ScriptServiceImpl implements IScriptService {
         functionService.scanClassDir(jarFile, packageName, 
isolationClassLoader);
     }
 
-    private ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(3);
-
 }

Reply via email to