This is an automated email from the ASF dual-hosted git repository.
seraph pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new ed76a95 SDK添加去重机制和指纹机制
new 6a3ea18 Merge pull request #77 from programer-0/develop
ed76a95 is described below
commit ed76a955795a0edcbb46de35eb36cb16205d09b1
Author: junjie.cheng <[email protected]>
AuthorDate: Mon Sep 27 17:25:33 2021 +0800
SDK添加去重机制和指纹机制
---
.../streams/client/source/DataStreamSource.java | 18 +++-
.../streams/client/strategy/WindowStrategy.java | 4 +-
.../streams/client/transform/DataStream.java | 32 ++++++-
.../rocketmq/streams/client/DataStreamTest.java | 94 ++++++++++---------
.../streams/common/context/MessageHeader.java | 2 +-
.../streams/common/functions/FilterFunction.java | 1 +
.../common/optimization/HyperscanRegex.java | 74 ++++++++++-----
.../streams/common/topology/ChainPipeline.java | 12 +--
.../streams/common/topology/ChainStage.java | 4 +-
.../common/topology/builder/PipelineBuilder.java | 3 +-
.../common/topology/model/AbstractStage.java | 74 ++++++++++++++-
.../common/topology/stages/FilterChainStage.java | 102 +++------------------
.../common/topology/stages/udf/StageBuilder.java | 1 +
.../common/topology/stages/udf/UDFChainStage.java | 28 +++++-
.../rocketmq/streams/common/utils/TraceUtil.java | 10 +-
.../configurable/ConfigurableComponent.java | 4 +-
16 files changed, 277 insertions(+), 186 deletions(-)
diff --git
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
index e30acdb..1f564f6 100644
---
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
+++
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
@@ -17,9 +17,12 @@
package org.apache.rocketmq.streams.client.source;
+import java.util.Properties;
+import javax.sql.DataSource;
import org.apache.rocketmq.streams.client.transform.DataStream;
import org.apache.rocketmq.streams.common.channel.impl.file.FileSource;
import org.apache.rocketmq.streams.common.channel.source.ISource;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
import org.apache.rocketmq.streams.source.RocketMQSource;
@@ -32,10 +35,23 @@ public class DataStreamSource implements Serializable {
this.mainPipelineBuilder = new PipelineBuilder(namespace,
pipelineName);
}
+ public DataStreamSource(String namespace, String pipelineName, String[]
duplicateKeys, Long windowSize) {
+ this.mainPipelineBuilder = new PipelineBuilder(namespace,
pipelineName);
+ Properties properties = new Properties();
+ properties.setProperty(pipelineName + ".duplicate.fields.names",
String.join(";", duplicateKeys));
+ properties.setProperty(pipelineName + ".duplicate.expiration.time",
String.valueOf(windowSize));
+ ComponentCreator.createProperties(properties);
+ }
+
public static DataStreamSource create(String namespace, String
pipelineName) {
return new DataStreamSource(namespace, pipelineName);
}
+ public static DataStreamSource create(String namespace, String
pipelineName, String[] duplicateKeys,
+ Long expirationTime) {
+ return new DataStreamSource(namespace, pipelineName, duplicateKeys,
expirationTime);
+ }
+
public DataStream fromFile(String filePath) {
return fromFile(filePath, true);
}
@@ -68,7 +84,7 @@ public class DataStreamSource implements Serializable {
public DataStream from(ISource<?> source) {
this.mainPipelineBuilder.setSource(source);
- return new DataStream(this.mainPipelineBuilder,null);
+ return new DataStream(this.mainPipelineBuilder, null);
}
}
diff --git
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
index 5b99131..ce42385 100644
---
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
+++
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
@@ -53,8 +53,8 @@ public class WindowStrategy implements Strategy {
return new WindowStrategy();
}
- public static Strategy windowDefaultSiZe(int defualtSize){
-
ComponentCreator.getProperties().put(ConfigureFileKey.DIPPER_WINDOW_DEFAULT_INERVAL_SIZE,defualtSize);
+ public static Strategy windowDefaultSiZe(int defualtSize) {
+
ComponentCreator.getProperties().put(ConfigureFileKey.DIPPER_WINDOW_DEFAULT_INERVAL_SIZE,
defualtSize);
return null;
}
diff --git
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
index ece41f6..8049968 100644
---
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
+++
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
@@ -143,7 +143,6 @@ public class DataStream implements Serializable {
splitMessages.add(subMessage);
}
context.openSplitModel();
- ;
context.setSplitMessages(splitMessages);
return null;
} catch (Exception e) {
@@ -158,13 +157,17 @@ public class DataStream implements Serializable {
}
public <O> DataStream filter(final FilterFunction<O> filterFunction) {
- StageBuilder mapUDFOperator = new StageBuilder() {
+ return filter(filterFunction, new String[] {});
+ }
+ public <O> DataStream filter(final FilterFunction<O> filterFunction,
String... fingerprints) {
+ StageBuilder mapUDFOperator = new StageBuilder() {
@Override
protected <T> T operate(IMessage message, AbstractContext context)
{
try {
- boolean isFilter = filterFunction.filter((O)
message.getMessageValue());
- if (isFilter) {
+ boolean tag = filterFunction.filter((O)
message.getMessageValue());
+ if (!tag) {
+ context.put("NEED_USE_FINGER_PRINT", true);
context.breakExecute();
}
} catch (Exception e) {
@@ -175,6 +178,24 @@ public class DataStream implements Serializable {
};
ChainStage stage =
this.mainPipelineBuilder.createStage(mapUDFOperator);
this.mainPipelineBuilder.setTopologyStages(currentChainStage, stage);
+
+ if (fingerprints.length > 0) {
+ ChainPipeline<?> pipeline = this.mainPipelineBuilder.getPipeline();
+ String filterName = stage.getLabel();
+ if (!pipeline.isTopology()) {
+ List<?> stages = pipeline.getStages();
+ int i = 0;
+ for (Object st : stages) {
+ if (st == stage) {
+ break;
+ }
+ i++;
+ }
+ filterName = i + "";
+ }
+ String key = MapKeyUtil.createKeyBySign(".",
pipeline.getNameSpace(), pipeline.getConfigureName(), filterName);
+ ComponentCreator.getProperties().setProperty(key, String.join(",",
fingerprints));
+ }
return new DataStream(this.mainPipelineBuilder,
this.otherPipelineBuilders, stage);
}
@@ -463,7 +484,8 @@ public class DataStream implements Serializable {
return toRocketmq(topic, tags, groupName, -1, nameServerAddress,
clusterName, order);
}
- public DataStreamAction toRocketmq(String topic, String tags, String
groupName, int batchSize, String nameServerAddress,
+ public DataStreamAction toRocketmq(String topic, String tags, String
groupName, int batchSize,
+ String nameServerAddress,
String clusterName, boolean order) {
RocketMQSink rocketMQSink = new RocketMQSink();
if (StringUtils.isNotBlank(topic)) {
diff --git
a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
index 2684c18..39aec16 100644
---
a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
+++
b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
@@ -46,74 +46,86 @@ public class DataStreamTest implements Serializable {
@Test
public void testFromFile() {
dataStream
- .fromFile("/Users/junjie.cheng/text.txt", false)
- .map(message -> message + "--")
- .toPrint(1)
- .start();
+ .fromFile("/Users/junjie.cheng/text.txt", false)
+ .map(message -> message + "--")
+ .toPrint(1)
+ .start();
}
@Test
public void testRocketmq() {
DataStreamSource dataStream =
StreamBuilder.dataStream("test_namespace", "graph_pipeline");
dataStream
- .fromRocketmq("topic_xxxx01", "consumer_xxxx01",
"127.0.0.1:9876")
- .map(message -> message + "--")
- .toPrint(1)
- .start();
+ .fromRocketmq("topic_xxxx01", "consumer_xxxx01", "127.0.0.1:9876")
+ .map(message -> message + "--")
+ .toPrint(1)
+ .start();
}
@Test
public void testDBCheckPoint() {
dataStream
- .fromRocketmq("topic_xxxx02", "consumer_xxxx02",
"127.0.0.1:9876")
- .map(message -> message + "--")
- .toPrint(1)
- .with(WindowStrategy.exactlyOnce("", "", ""))
- .start();
+ .fromRocketmq("topic_xxxx02", "consumer_xxxx02", "127.0.0.1:9876")
+ .map(message -> message + "--")
+ .toPrint(1)
+ .with(WindowStrategy.exactlyOnce("", "", ""))
+ .start();
}
@Test
public void testFileCheckPoint() {
dataStream
- .fromFile("/Users/junjie.cheng/text.txt", false)
- .map(message -> message + "--")
- .toPrint(1)
- .with(WindowStrategy.highPerformance())
- .start();
+ .fromFile("/Users/junjie.cheng/text.txt", false)
+ .map(message -> message + "--")
+ .toPrint(1)
+ .with(WindowStrategy.highPerformance())
+ .start();
}
-
@Test
public void testWindow() {
DataStreamSource dataStream =
StreamBuilder.dataStream("test_namespace", "graph_pipeline");
dataStream
- .fromRocketmq("topic_xxxx03", "consumer_xxxx03",
"127.0.0.1:9876")
- .map(new MapFunction<JSONObject, String>() {
-
- @Override
- public JSONObject map(String message) throws Exception {
- JSONObject msg = JSONObject.parseObject(message);
- return msg;
- }
- })
- .window(TumblingWindow.of(Time.seconds(5)))
- .groupBy("name", "age")
- .count("c")
- .sum("score", "scoreValue")
- .toDataSteam()
- .toPrint(1)
- .with(WindowStrategy.exactlyOnce("", "", ""))
- .start();
+ .fromRocketmq("topic_xxxx03", "consumer_xxxx03", "127.0.0.1:9876")
+ .map(new MapFunction<JSONObject, String>() {
+
+ @Override
+ public JSONObject map(String message) throws Exception {
+ JSONObject msg = JSONObject.parseObject(message);
+ return msg;
+ }
+ })
+ .window(TumblingWindow.of(Time.seconds(5)))
+ .groupBy("name", "age")
+ .count("c")
+ .sum("score", "scoreValue")
+ .toDataSteam()
+ .toPrint(1)
+ .with(WindowStrategy.exactlyOnce("", "", ""))
+ .start();
+ }
+
+ @Test
+ public void testFingerPrintStrategy() {
+ dataStream
+ .fromFile("/Users/junjie.cheng/text.txt", false)
+ .map(message -> message + "--")
+ .toPrint(1)
+ .start();
+
}
@Test
public void testBothStrategy() {
dataStream
- .fromRocketmq("topic_xxxx04", "consumer_xxxx04",
"127.0.0.1:9876")
- .map(message -> message + "--")
- .toPrint(1)
- .with()
- .start();
+ .fromRocketmq("topic_xxxx04", "consumer_xxxx04", "127.0.0.1:9876")
+ .map(message -> message + "--")
+ .filter(message -> {
+ return true;
+ })
+ .toPrint(1)
+ .with()
+ .start();
}
@Test
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
index 4044b06..2613461 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
@@ -126,7 +126,7 @@ public class MessageHeader {
header.msgRouteFromLable = msgRouteFromLable;
header.logFingerprintValue = logFingerprintValue;
header.messageQueue = messageQueue;
- header.checkpointQueueIds=checkpointQueueIds;
+ header.checkpointQueueIds = checkpointQueueIds;
return header;
}
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FilterFunction.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FilterFunction.java
index 350b6fd..f7df80e 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FilterFunction.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FilterFunction.java
@@ -19,4 +19,5 @@ package org.apache.rocketmq.streams.common.functions;
public interface FilterFunction<T> extends Function {
boolean filter(T value) throws Exception;
+
}
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/HyperscanRegex.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/HyperscanRegex.java
index ed37e1c..736b0a6 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/HyperscanRegex.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/HyperscanRegex.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.streams.common.optimization;
+import com.gliwka.hyperscan.wrapper.CompileErrorException;
import com.gliwka.hyperscan.wrapper.Database;
import com.gliwka.hyperscan.wrapper.Expression;
import com.gliwka.hyperscan.wrapper.ExpressionFlag;
@@ -27,13 +28,18 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
public class HyperscanRegex<T> {
- protected List<Expression> regexs = new ArrayList<>();
+ protected List<Expression> allRegexes = new ArrayList<>();//all registe
regex
+
protected Database db;
protected Scanner scanner;
protected AtomicBoolean hasCompile = new AtomicBoolean(false);
- protected List<T> list = new ArrayList<>();
+ protected List<T> expressionContextList = new ArrayList<>();
+
+ protected List<Expression> notSupportCompileExpression = new
ArrayList<>();//can not comile expressions
+ protected List<Expression> supportCompileExpression = new
ArrayList<>();//all regex exclude not support compile
/**
* 把多个表达式放到库里
@@ -41,9 +47,10 @@ public class HyperscanRegex<T> {
* @param regex
*/
public void addRegex(String regex, T context) {
- list.add(context);
- Expression expression = new Expression(regex,
EnumSet.of(ExpressionFlag.UTF8, ExpressionFlag.CASELESS,
ExpressionFlag.SINGLEMATCH), list.size() - 1);
- regexs.add(expression);
+ expressionContextList.add(context);
+ Expression expression = new Expression(regex,
EnumSet.of(ExpressionFlag.UTF8, ExpressionFlag.CASELESS,
ExpressionFlag.SINGLEMATCH), expressionContextList.size() - 1);
+ allRegexes.add(expression);
+ supportCompileExpression.add(expression);
db = null;
scanner = null;
hasCompile.set(false);
@@ -53,17 +60,25 @@ public class HyperscanRegex<T> {
* 完成编译
*/
public void compile() {
- try {
- if (hasCompile.compareAndSet(false, true) && regexs.size() > 0) {
- Database db = Database.compile(regexs);
+ if (!hasCompile.compareAndSet(false, true) ||
supportCompileExpression.size() == 0) {
+ return;
+ }
+ while (true) {
+ try {
+ if (supportCompileExpression.size() == 0) {
+ break;
+ }
+ Database db = Database.compile(supportCompileExpression);
Scanner scanner = new Scanner();
scanner.allocScratch(db);
this.db = db;
this.scanner = scanner;
+ break;
+ } catch (CompileErrorException e) {
+ Expression expression = e.getFailedExpression();
+ this.supportCompileExpression.remove(expression);
+ this.notSupportCompileExpression.add(expression);
}
-
- } catch (Exception e) {
- System.out.println("can not support this regex " + e.getMessage());
}
}
@@ -75,15 +90,14 @@ public class HyperscanRegex<T> {
* @return
*/
public boolean match(String content) {
- if (scanner == null || db == null || hasCompile.get() == false) {
+ if (scanner == null || db == null || !hasCompile.get()) {
compile();
}
- List<Match> matches = scanner.scan(db, content);
- if (matches.size() > 0) {
- return true;
- } else {
+ if (content == null) {
return false;
}
+ List<Match> matches = scanner.scan(db, content);
+ return matches.size() > 0;
}
/**
@@ -93,18 +107,34 @@ public class HyperscanRegex<T> {
* @return
*/
public Set<T> matchExpression(String content) {
- if (scanner == null || db == null || hasCompile.get() == false) {
+ if (scanner == null || db == null || !hasCompile.get()) {
compile();
}
+ if (content == null) {
+ return new HashSet<>();
+ }
List<Match> matches = scanner.scan(db, content);
Set<T> fireExpressions = new HashSet<>();
- if (matches.size() == 0) {
- return fireExpressions;
+ if (this.notSupportCompileExpression.size() > 0) {
+ for (Expression expression : this.notSupportCompileExpression) {
+ String regex = expression.getExpression();
+ boolean isMatch =
StringUtil.matchRegexCaseInsensitive(content, regex);
+ if (isMatch) {
+ int index = expression.getId();
+ fireExpressions.add(expressionContextList.get(index));
+ }
+ }
}
- for (Match match : matches) {
- Integer index = match.getMatchedExpression().getId();
- fireExpressions.add(list.get(index));
+ if (matches.size() > 0) {
+ for (Match match : matches) {
+ Integer index = match.getMatchedExpression().getId();
+ fireExpressions.add(expressionContextList.get(index));
+ }
}
return fireExpressions;
}
+
+ public int size() {
+ return allRegexes.size();
+ }
}
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
index 244ed3b..6762e8e 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
@@ -152,9 +152,6 @@ public class ChainPipeline<T extends IMessage> extends
Pipeline<T> implements IA
return MapKeyUtil.createKeyBySign(".", getType(), getNameSpace(),
getConfigureName());
}
- private static AtomicInteger total = new AtomicInteger(0);
- private static AtomicInteger hitCache = new AtomicInteger(0);
-
/**
* 可以替换某个阶段的阶段,而不用配置的阶段
*
@@ -166,12 +163,10 @@ public class ChainPipeline<T extends IMessage> extends
Pipeline<T> implements IA
@Override
protected T doMessageInner(T t, AbstractContext context, AbstractStage...
replaceStage) {
if (this.duplicateCache != null && this.duplicateFields != null &&
!this.duplicateFields.isEmpty() && !t.getHeader().isSystemMessage()) {
- total.incrementAndGet();
String duplicateKey = createDuplicateKey(t);
Long cacheTime = this.duplicateCache.get(duplicateKey);
Long currentTime = System.currentTimeMillis();
if (cacheTime != null && currentTime - cacheTime <
this.duplicateCacheExpirationTime) {
- hitCache.incrementAndGet();
context.breakExecute();
return t;
} else {
@@ -180,9 +175,6 @@ public class ChainPipeline<T extends IMessage> extends
Pipeline<T> implements IA
this.duplicateCache = new
LongValueKV(this.duplicateCacheSize);
}
}
- if (total.get() % 5000 == 0) {
- System.out.printf("total: %s, hit: %s%n", total.get(),
hitCache.get());
- }
}
if (!t.getHeader().isSystemMessage()) {
@@ -450,9 +442,9 @@ public class ChainPipeline<T extends IMessage> extends
Pipeline<T> implements IA
public String toString() {
String LINE = PrintUtil.LINE;
StringBuilder sb = new StringBuilder();
- sb.append("###namespace=" + getNameSpace() + "###" + LINE);
+
sb.append("###namespace=").append(getNameSpace()).append("###").append(LINE);
if (source != null) {
- sb.append(source.toString() + LINE);
+ sb.append(source.toString()).append(LINE);
}
if (stages != null) {
for (AbstractStage stage : stages) {
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java
index d1b7ab6..264ac6b 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java
@@ -66,7 +66,7 @@ public abstract class ChainStage<T extends IMessage> extends
AbstractStage<T> {
* @return
*/
public PiplineRecieverAfterCurrentNode getReceiverAfterCurrentNode() {
- ChainPipeline pipeline = (ChainPipeline)getPipeline();
+ ChainPipeline pipeline = (ChainPipeline) getPipeline();
return new PiplineRecieverAfterCurrentNode(pipeline);
}
@@ -153,7 +153,7 @@ public abstract class ChainStage<T extends IMessage>
extends AbstractStage<T> {
Set<ChainPipeline> set = new HashSet<>();
for (Pipeline pipeline : pipelines) {
if (pipeline != null) {
- set.add((ChainPipeline)pipeline);
+ set.add((ChainPipeline) pipeline);
}
}
sendSystem(message, context, set);
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java
index 99e7ebe..c417a41 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java
@@ -101,13 +101,14 @@ public class PipelineBuilder implements Serializable {
chainStage.setLabel(createConfigurableName(chainStage.getType()));
}
this.pipeline.addChainStage(chainStage);
+
return chainStage;
}
public List<String> createSQL() {
List<String> sqls = new ArrayList<>();
for (IConfigurable configurable : configurables) {
- AbstractConfigurable abstractConfigurable =
(AbstractConfigurable)configurable;
+ AbstractConfigurable abstractConfigurable = (AbstractConfigurable)
configurable;
sqls.add(AbstractConfigurable.createSQL(configurable));
}
return sqls;
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java
index 1a2d8b1..f39a971 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java
@@ -19,8 +19,10 @@ package org.apache.rocketmq.streams.common.topology.model;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
@@ -28,14 +30,14 @@ import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.interfaces.ISystemMessageProcessor;
import org.apache.rocketmq.streams.common.optimization.SQLLogFingerprintFilter;
+import org.apache.rocketmq.streams.common.topology.ChainPipeline;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.common.utils.TraceUtil;
public abstract class AbstractStage<T extends IMessage> extends
BasedConfigurable
implements IStreamOperator<T, T>, ISystemMessageProcessor {
-
-
private static final Log LOG = LogFactory.getLog(AbstractStage.class);
public static final String TYPE = "stage";
@@ -50,6 +52,11 @@ public abstract class AbstractStage<T extends IMessage>
extends BasedConfigurabl
protected transient Pipeline pipeline;
/**
+ * 源头stage
+ */
+ protected transient AbstractStage sourceStage;
+
+ /**
* 设置路由label,当需要做路由选择时需要设置
*/
protected String label;
@@ -85,7 +92,6 @@ public abstract class AbstractStage<T extends IMessage>
extends BasedConfigurabl
return null;
}
try {
-
TraceUtil.debug(t.getHeader().getTraceId(), "AbstractStage",
label, t.getMessageBody().toJSONString());
} catch (Exception e) {
LOG.error("t.getMessageBody() parse error", e);
@@ -97,9 +103,9 @@ public abstract class AbstractStage<T extends IMessage>
extends BasedConfigurabl
Object result = handle.doMessage(t, context);
//
if (!context.isContinue() || result == null) {
- return (T)context.breakExecute();
+ return (T) context.breakExecute();
}
- return (T)result;
+ return (T) result;
}
/**
@@ -232,6 +238,64 @@ public abstract class AbstractStage<T extends IMessage>
extends BasedConfigurabl
}
}
+ /**
+ * 为最源头的stage加载指纹信息
+ */
+ protected void loadLogFinger() {
+ ChainPipeline<?> pipeline = (ChainPipeline<?>) getPipeline();
+ String filterName = getLabel();
+ if (!pipeline.isTopology()) {
+ List<?> stages = pipeline.getStages();
+ int i = 0;
+ for (Object stage : stages) {
+ if (stage == this) {
+ break;
+ }
+ i++;
+ }
+ filterName = i + "";
+ }
+ String key = MapKeyUtil.createKeyBySign(".", pipeline.getNameSpace(),
pipeline.getConfigureName(), filterName);
+ String logFingerFieldNames =
ComponentCreator.getProperties().getProperty(key);
+ if (logFingerFieldNames == null) {
+ return;
+ }
+ sourceStage = getSourceStage();
+ sourceStage.setLogFingerFieldNames(logFingerFieldNames);
+ sourceStage.setLogFingerFilterStageName(key);
+
sourceStage.setLogFingerprintFilter(SQLLogFingerprintFilter.getInstance());
+ }
+
+ /**
+ * 发现最源头的stage
+ *
+ * @return
+ */
+ protected AbstractStage getSourceStage() {
+ ChainPipeline pipline = (ChainPipeline) getPipeline();
+ if (pipline.isTopology()) {
+ Map<String, AbstractStage> stageMap = pipline.createStageMap();
+ AbstractStage currentStage = this;
+ List<String> prewLables = currentStage.getPrevStageLabels();
+ while (prewLables != null && prewLables.size() > 0) {
+ if (prewLables.size() > 1) {
+ return null;
+ }
+ String lable = prewLables.get(0);
+ AbstractStage stage = (AbstractStage) stageMap.get(lable);
+ if (stage != null) {
+ currentStage = stage;
+ } else {
+ return currentStage;
+ }
+ prewLables = currentStage.getPrevStageLabels();
+ }
+ return currentStage;
+ } else {
+ return (AbstractStage) pipline.getStages().get(0);
+ }
+ }
+
public List<String> getNextStageLabels() {
return nextStageLabels;
}
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/FilterChainStage.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/FilterChainStage.java
index ff5e6da..a040e3d 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/FilterChainStage.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/FilterChainStage.java
@@ -54,8 +54,6 @@ public class FilterChainStage<T extends IMessage, R extends
AbstractRule> extend
private transient Map<String, JSONObject> ruleName2JsonObject = new
HashMap<>();
public static transient Class componentClass =
ReflectUtil.forClass("org.apache.rocketmq.streams.filter.FilterComponent");
- protected transient AbstractStage sourceStage;
-
public FilterChainStage() {
setEntityName("filter");
}
@@ -84,7 +82,10 @@ public class FilterChainStage<T extends IMessage, R extends
AbstractRule> extend
List<R> fireRules = component.getService().executeRule(message,
context, rules);
//System.out.println("规则花费时间是:"+(System.currentTimeMillis()-start));
if (fireRules == null || fireRules.size() == 0) {
- addLogFingerprintFilter(message);
+ //增加日志指纹
+ if (sourceStage != null) {
+ sourceStage.addLogFingerprint(message);
+ }
TopologyFilterMonitor monitor =
message.getHeader().getPiplineExecutorMonitor();
if (monitor != null) {
if (monitor.getNotFireExpression2DependentFields() !=
null) {
@@ -142,17 +143,6 @@ public class FilterChainStage<T extends IMessage, R
extends AbstractRule> extend
};
/**
- * 增加日志指纹
- *
- * @param message
- */
- protected void addLogFingerprintFilter(IMessage message) {
- if (sourceStage != null) {
- sourceStage.addLogFingerprint(message);
- }
- }
-
- /**
* 如果是表达式,把表达式的值也提取出来
*
* @param expression
@@ -179,7 +169,7 @@ public class FilterChainStage<T extends IMessage, R extends
AbstractRule> extend
}
protected ScriptChainStage findScriptChainStage(AbstractStage stage) {
- ChainPipeline pipline = (ChainPipeline)stage.getPipeline();
+ ChainPipeline pipline = (ChainPipeline) stage.getPipeline();
if (pipline.isTopology()) {
List<String> lableNames = stage.getPrevStageLabels();
if (lableNames != null) {
@@ -187,7 +177,7 @@ public class FilterChainStage<T extends IMessage, R extends
AbstractRule> extend
Map<String, AbstractStage> stageMap =
pipline.getStageMap();
AbstractStage prewStage = stageMap.get(lableName);
if (prewStage != null &&
ScriptChainStage.class.isInstance(prewStage)) {
- return (ScriptChainStage)prewStage;
+ return (ScriptChainStage) prewStage;
}
if (prewStage != null) {
return findScriptChainStage(prewStage);
@@ -206,7 +196,7 @@ public class FilterChainStage<T extends IMessage, R extends
AbstractRule> extend
for (; i >= 0; i--) {
AbstractStage prewStage = stages.get(i);
if (ScriptChainStage.class.isInstance(prewStage)) {
- return (ScriptChainStage)prewStage;
+ return (ScriptChainStage) prewStage;
}
}
return null;
@@ -235,41 +225,40 @@ public class FilterChainStage<T extends IMessage, R
extends AbstractRule> extend
//rules[i]=(R)ruleList.get(i);
ruleName2JsonObject.put(ruleList.get(i).getConfigureName(),
ruleList.get(i).toOutputJson());
}
- rules = (R[])Array.newInstance(ruleList.get(0).getClass(),
matchedRules.size());
+ rules = (R[]) Array.newInstance(ruleList.get(0).getClass(),
matchedRules.size());
for (int i = 0; i < rules.length; i++) {
- rules[i] = (R)matchedRules.get(i);
+ rules[i] = (R) matchedRules.get(i);
}
}
} else {
if (names != null && names.size() > 0) {
AbstractRule rule =
configurableService.queryConfigurable(AbstractRule.TYPE, names.get(0));
- rules = (R[])Array.newInstance(rule.getClass(), names.size());
+ rules = (R[]) Array.newInstance(rule.getClass(), names.size());
}
int i = 0;
for (String name : names) {
AbstractRule rule =
configurableService.queryConfigurable(AbstractRule.TYPE, name);
- rules[i] = (R)rule;
+ rules[i] = (R) rule;
ruleName2JsonObject.put(rules[i].getConfigureName(),
rules[i].toOutputJson());
i++;
}
}
-
+ //加载指纹信息
loadLogFinger();
-
}
public void setRule(AbstractRule... rules) {
if (rules == null || rules.length == 0) {
return;
}
- this.rules = (R[])Array.newInstance(rules[0].getClass(), rules.length);
+ this.rules = (R[]) Array.newInstance(rules[0].getClass(),
rules.length);
if (names == null) {
names = new ArrayList<>();
}
int i = 0;
for (AbstractRule rule : rules) {
- this.rules[i] = (R)rules[i];
+ this.rules[i] = (R) rules[i];
names.add(rules[i].getConfigureName());
ruleName2JsonObject.put(rules[i].getConfigureName(),
rules[i].toOutputJson());
i++;
@@ -278,69 +267,6 @@ public class FilterChainStage<T extends IMessage, R
extends AbstractRule> extend
}
- /**
- * 从配置文件加载日志指纹信息,如果存在做指纹优化
- */
- protected void loadLogFinger() {
- ChainPipeline pipline = (ChainPipeline)getPipeline();
- String filterName = getLabel();
- if (pipline.isTopology() == false) {
- List<AbstractStage> stages = pipline.getStages();
- int i = 0;
- for (AbstractStage stage : stages) {
- if (stage == this) {
- break;
- }
- i++;
- }
- filterName = i + "";
- }
- String key = MapKeyUtil.createKeyBySign(".", pipline.getNameSpace(),
pipline.getConfigureName(), filterName);
- String logFingerFieldNames =
ComponentCreator.getProperties().getProperty(key);
- if (logFingerFieldNames == null) {
- return;
- }
- sourceStage = getSourceStage();
- int index = logFingerFieldNames.indexOf(";");
- if (index != -1) {
- String filterIndex = logFingerFieldNames.substring(0, index);
- logFingerFieldNames = logFingerFieldNames.substring(index + 1);
- }
- sourceStage.setLogFingerFieldNames(logFingerFieldNames);
- sourceStage.setLogFingerFilterStageName(key);
-
sourceStage.setLogFingerprintFilter(SQLLogFingerprintFilter.getInstance());
- }
-
- /**
- * 发现最源头的stage
- *
- * @return
- */
- protected AbstractStage getSourceStage() {
- ChainPipeline pipline = (ChainPipeline)getPipeline();
- if (pipline.isTopology()) {
- Map<String, AbstractStage> stageMap = pipline.createStageMap();
- AbstractStage currentStage = this;
- List<String> prewLables = currentStage.getPrevStageLabels();
- while (prewLables != null && prewLables.size() > 0) {
- if (prewLables.size() > 1) {
- return null;
- }
- String lable = prewLables.get(0);
- AbstractStage stage = (AbstractStage)stageMap.get(lable);
- if (stage != null) {
- currentStage = stage;
- } else {
- return currentStage;
- }
- prewLables = currentStage.getPrevStageLabels();
- }
- return currentStage;
- } else {
- return (AbstractStage)pipline.getStages().get(0);
- }
- }
-
public List<String> getNames() {
return names;
}
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java
index 95925e8..c152945 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java
@@ -64,6 +64,7 @@ public abstract class StageBuilder extends
AbstractStatelessChainStage<IMessage>
};
}
+
@Override
public boolean isAsyncNode() {
return false;
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java
index f6653ba..6c12753 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java
@@ -30,10 +30,12 @@ import
org.apache.rocketmq.streams.common.utils.InstantiationUtil;
* 所有给用户自定义代码的通用类,会转化成这个stage
*/
public class UDFChainStage extends AbstractStatelessChainStage implements
IAfterConfigurableRefreshListener {
+
protected String
udfOperatorClassSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码
protected transient StageBuilder selfChainStage;
- public UDFChainStage() {}
+ public UDFChainStage() {
+ }
public UDFChainStage(StageBuilder selfOperator) {
this.selfChainStage = selfOperator;
@@ -68,6 +70,30 @@ public class UDFChainStage extends
AbstractStatelessChainStage implements IAfter
public void doProcessAfterRefreshConfigurable(IConfigurableService
configurableService) {
byte[] bytes = Base64Utils.decode(udfOperatorClassSerializeValue);
selfChainStage = InstantiationUtil.deserializeObject(bytes);
+ loadLogFinger();
+ }
+
+ @Override public IMessage doMessage(IMessage t, AbstractContext context) {
+ if (filterByLogFingerprint(t)) {
+ context.breakExecute();
+ return null;
+ }
+ IStageHandle handle = selectHandle(t, context);
+ if (handle == null) {
+ return t;
+ }
+ IMessage result = handle.doMessage(t, context);
+ if (!context.isContinue() || result == null) {
+ if (context.get("NEED_USE_FINGER_PRINT") != null &&
Boolean.parseBoolean(context.get("NEED_USE_FINGER_PRINT").toString())) {
+ sourceStage.addLogFingerprint(t);
+ context.remove("NEED_USE_FINGER_PRINT");
+ }
+ return context.breakExecute();
+ }
+ if (context.get("NEED_USE_FINGER_PRINT") != null) {
+ context.remove("NEED_USE_FINGER_PRINT");
+ }
+ return result;
}
}
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/TraceUtil.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/TraceUtil.java
index 848e09a..7f89c8b 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/TraceUtil.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/TraceUtil.java
@@ -51,31 +51,31 @@ public class TraceUtil {
public static void debug(String traceId, String... messages) {
if (hit(traceId) && LOG.isDebugEnabled()) {
- LOG.debug(envelope(traceId, messages));
+// LOG.debug(envelope(traceId, messages));
}
}
public static void info(String traceId, String... messages) {
if (!skip(traceId) && LOG.isInfoEnabled()) {
- LOG.info(envelope(traceId, messages));
+// LOG.info(envelope(traceId, messages));
}
}
public static void warn(String traceId, String... messages) {
if (LOG.isWarnEnabled()) {
- LOG.warn(envelope(traceId, messages));
+// LOG.warn(envelope(traceId, messages));
}
}
public static void error(String traceId, String... messages) {
if (LOG.isErrorEnabled()) {
- LOG.error(envelope(traceId, messages));
+// LOG.error(envelope(traceId, messages));
}
}
public static void error(String traceId, Throwable throwable, String...
messages) {
if (LOG.isErrorEnabled()) {
- LOG.error(envelope(traceId, messages), throwable);
+// LOG.error(envelope(traceId, messages), throwable);
}
}
diff --git
a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
index 4edc20a..00dd6be 100644
---
a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
+++
b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
@@ -160,7 +160,7 @@ public class ConfigurableComponent extends
AbstractComponent<IConfigurableServic
@SuppressWarnings("unchecked")
@Override
public <T> T queryConfigurable(String configurableType, String name) {
- return (T)queryConfigurableByIdent(configurableType, name);
+ return (T) queryConfigurableByIdent(configurableType, name);
}
//protected void insertConfigurable(JSONObject message, IConfigurable
configurable) {
@@ -170,7 +170,7 @@ public class ConfigurableComponent extends
AbstractComponent<IConfigurableServic
@Override
public String getNamespace() {
if (AbstractConfigurableService.class.isInstance(configureService)) {
- return
((AbstractConfigurableService)configureService).getNamespace();
+ return ((AbstractConfigurableService)
configureService).getNamespace();
}
return namespace;
}