This is an automated email from the ASF dual-hosted git repository.

seraph pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new ed76a95  SDK添加去重机制和指纹机制
     new 6a3ea18  Merge pull request #77 from programer-0/develop
ed76a95 is described below

commit ed76a955795a0edcbb46de35eb36cb16205d09b1
Author: junjie.cheng <[email protected]>
AuthorDate: Mon Sep 27 17:25:33 2021 +0800

    SDK添加去重机制和指纹机制
---
 .../streams/client/source/DataStreamSource.java    |  18 +++-
 .../streams/client/strategy/WindowStrategy.java    |   4 +-
 .../streams/client/transform/DataStream.java       |  32 ++++++-
 .../rocketmq/streams/client/DataStreamTest.java    |  94 ++++++++++---------
 .../streams/common/context/MessageHeader.java      |   2 +-
 .../streams/common/functions/FilterFunction.java   |   1 +
 .../common/optimization/HyperscanRegex.java        |  74 ++++++++++-----
 .../streams/common/topology/ChainPipeline.java     |  12 +--
 .../streams/common/topology/ChainStage.java        |   4 +-
 .../common/topology/builder/PipelineBuilder.java   |   3 +-
 .../common/topology/model/AbstractStage.java       |  74 ++++++++++++++-
 .../common/topology/stages/FilterChainStage.java   | 102 +++------------------
 .../common/topology/stages/udf/StageBuilder.java   |   1 +
 .../common/topology/stages/udf/UDFChainStage.java  |  28 +++++-
 .../rocketmq/streams/common/utils/TraceUtil.java   |  10 +-
 .../configurable/ConfigurableComponent.java        |   4 +-
 16 files changed, 277 insertions(+), 186 deletions(-)

diff --git 
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
 
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
index e30acdb..1f564f6 100644
--- 
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
+++ 
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
@@ -17,9 +17,12 @@
 
 package org.apache.rocketmq.streams.client.source;
 
+import java.util.Properties;
+import javax.sql.DataSource;
 import org.apache.rocketmq.streams.client.transform.DataStream;
 import org.apache.rocketmq.streams.common.channel.impl.file.FileSource;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
 import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
 import org.apache.rocketmq.streams.source.RocketMQSource;
 
@@ -32,10 +35,23 @@ public class DataStreamSource implements Serializable {
         this.mainPipelineBuilder = new PipelineBuilder(namespace, 
pipelineName);
     }
 
+    public DataStreamSource(String namespace, String pipelineName, String[] 
duplicateKeys, Long windowSize) {
+        this.mainPipelineBuilder = new PipelineBuilder(namespace, 
pipelineName);
+        Properties properties = new Properties();
+        properties.setProperty(pipelineName + ".duplicate.fields.names", 
String.join(";", duplicateKeys));
+        properties.setProperty(pipelineName + ".duplicate.expiration.time", 
String.valueOf(windowSize));
+        ComponentCreator.createProperties(properties);
+    }
+
     public static DataStreamSource create(String namespace, String 
pipelineName) {
         return new DataStreamSource(namespace, pipelineName);
     }
 
+    public static DataStreamSource create(String namespace, String 
pipelineName, String[] duplicateKeys,
+        Long expirationTime) {
+        return new DataStreamSource(namespace, pipelineName, duplicateKeys, 
expirationTime);
+    }
+
     public DataStream fromFile(String filePath) {
         return fromFile(filePath, true);
     }
@@ -68,7 +84,7 @@ public class DataStreamSource implements Serializable {
 
     public DataStream from(ISource<?> source) {
         this.mainPipelineBuilder.setSource(source);
-        return new DataStream(this.mainPipelineBuilder,null);
+        return new DataStream(this.mainPipelineBuilder, null);
     }
 
 }
diff --git 
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
 
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
index 5b99131..ce42385 100644
--- 
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
+++ 
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
@@ -53,8 +53,8 @@ public class WindowStrategy implements Strategy {
         return new WindowStrategy();
     }
 
-    public static Strategy windowDefaultSiZe(int defualtSize){
-        
ComponentCreator.getProperties().put(ConfigureFileKey.DIPPER_WINDOW_DEFAULT_INERVAL_SIZE,defualtSize);
+    public static Strategy windowDefaultSiZe(int defualtSize) {
+        
ComponentCreator.getProperties().put(ConfigureFileKey.DIPPER_WINDOW_DEFAULT_INERVAL_SIZE,
 defualtSize);
         return null;
     }
 
diff --git 
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
 
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
index ece41f6..8049968 100644
--- 
a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
+++ 
b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
@@ -143,7 +143,6 @@ public class DataStream implements Serializable {
                         splitMessages.add(subMessage);
                     }
                     context.openSplitModel();
-                    ;
                     context.setSplitMessages(splitMessages);
                     return null;
                 } catch (Exception e) {
@@ -158,13 +157,17 @@ public class DataStream implements Serializable {
     }
 
     public <O> DataStream filter(final FilterFunction<O> filterFunction) {
-        StageBuilder mapUDFOperator = new StageBuilder() {
+        return filter(filterFunction, new String[] {});
+    }
 
+    public <O> DataStream filter(final FilterFunction<O> filterFunction, 
String... fingerprints) {
+        StageBuilder mapUDFOperator = new StageBuilder() {
             @Override
             protected <T> T operate(IMessage message, AbstractContext context) 
{
                 try {
-                    boolean isFilter = filterFunction.filter((O) 
message.getMessageValue());
-                    if (isFilter) {
+                    boolean tag = filterFunction.filter((O) 
message.getMessageValue());
+                    if (!tag) {
+                        context.put("NEED_USE_FINGER_PRINT", true);
                         context.breakExecute();
                     }
                 } catch (Exception e) {
@@ -175,6 +178,24 @@ public class DataStream implements Serializable {
         };
         ChainStage stage = 
this.mainPipelineBuilder.createStage(mapUDFOperator);
         this.mainPipelineBuilder.setTopologyStages(currentChainStage, stage);
+
+        if (fingerprints.length > 0) {
+            ChainPipeline<?> pipeline = this.mainPipelineBuilder.getPipeline();
+            String filterName = stage.getLabel();
+            if (!pipeline.isTopology()) {
+                List<?> stages = pipeline.getStages();
+                int i = 0;
+                for (Object st : stages) {
+                    if (st == stage) {
+                        break;
+                    }
+                    i++;
+                }
+                filterName = i + "";
+            }
+            String key = MapKeyUtil.createKeyBySign(".", 
pipeline.getNameSpace(), pipeline.getConfigureName(), filterName);
+            ComponentCreator.getProperties().setProperty(key, String.join(",", 
fingerprints));
+        }
         return new DataStream(this.mainPipelineBuilder, 
this.otherPipelineBuilders, stage);
     }
 
@@ -463,7 +484,8 @@ public class DataStream implements Serializable {
         return toRocketmq(topic, tags, groupName, -1, nameServerAddress, 
clusterName, order);
     }
 
-    public DataStreamAction toRocketmq(String topic, String tags, String 
groupName, int batchSize, String nameServerAddress,
+    public DataStreamAction toRocketmq(String topic, String tags, String 
groupName, int batchSize,
+        String nameServerAddress,
         String clusterName, boolean order) {
         RocketMQSink rocketMQSink = new RocketMQSink();
         if (StringUtils.isNotBlank(topic)) {
diff --git 
a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
 
b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
index 2684c18..39aec16 100644
--- 
a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
+++ 
b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
@@ -46,74 +46,86 @@ public class DataStreamTest implements Serializable {
     @Test
     public void testFromFile() {
         dataStream
-                .fromFile("/Users/junjie.cheng/text.txt", false)
-                .map(message -> message + "--")
-                .toPrint(1)
-                .start();
+            .fromFile("/Users/junjie.cheng/text.txt", false)
+            .map(message -> message + "--")
+            .toPrint(1)
+            .start();
     }
 
     @Test
     public void testRocketmq() {
         DataStreamSource dataStream = 
StreamBuilder.dataStream("test_namespace", "graph_pipeline");
         dataStream
-                .fromRocketmq("topic_xxxx01", "consumer_xxxx01", 
"127.0.0.1:9876")
-                .map(message -> message + "--")
-                .toPrint(1)
-                .start();
+            .fromRocketmq("topic_xxxx01", "consumer_xxxx01", "127.0.0.1:9876")
+            .map(message -> message + "--")
+            .toPrint(1)
+            .start();
     }
 
     @Test
     public void testDBCheckPoint() {
         dataStream
-                .fromRocketmq("topic_xxxx02", "consumer_xxxx02", 
"127.0.0.1:9876")
-                .map(message -> message + "--")
-                .toPrint(1)
-                .with(WindowStrategy.exactlyOnce("", "", ""))
-                .start();
+            .fromRocketmq("topic_xxxx02", "consumer_xxxx02", "127.0.0.1:9876")
+            .map(message -> message + "--")
+            .toPrint(1)
+            .with(WindowStrategy.exactlyOnce("", "", ""))
+            .start();
     }
 
     @Test
     public void testFileCheckPoint() {
         dataStream
-                .fromFile("/Users/junjie.cheng/text.txt", false)
-                .map(message -> message + "--")
-                .toPrint(1)
-                .with(WindowStrategy.highPerformance())
-                .start();
+            .fromFile("/Users/junjie.cheng/text.txt", false)
+            .map(message -> message + "--")
+            .toPrint(1)
+            .with(WindowStrategy.highPerformance())
+            .start();
     }
 
-
     @Test
     public void testWindow() {
         DataStreamSource dataStream = 
StreamBuilder.dataStream("test_namespace", "graph_pipeline");
         dataStream
-                .fromRocketmq("topic_xxxx03", "consumer_xxxx03", 
"127.0.0.1:9876")
-                .map(new MapFunction<JSONObject, String>() {
-
-                    @Override
-                    public JSONObject map(String message) throws Exception {
-                        JSONObject msg = JSONObject.parseObject(message);
-                        return msg;
-                    }
-                })
-                .window(TumblingWindow.of(Time.seconds(5)))
-                .groupBy("name", "age")
-                .count("c")
-                .sum("score", "scoreValue")
-                .toDataSteam()
-                .toPrint(1)
-                .with(WindowStrategy.exactlyOnce("", "", ""))
-                .start();
+            .fromRocketmq("topic_xxxx03", "consumer_xxxx03", "127.0.0.1:9876")
+            .map(new MapFunction<JSONObject, String>() {
+
+                @Override
+                public JSONObject map(String message) throws Exception {
+                    JSONObject msg = JSONObject.parseObject(message);
+                    return msg;
+                }
+            })
+            .window(TumblingWindow.of(Time.seconds(5)))
+            .groupBy("name", "age")
+            .count("c")
+            .sum("score", "scoreValue")
+            .toDataSteam()
+            .toPrint(1)
+            .with(WindowStrategy.exactlyOnce("", "", ""))
+            .start();
+    }
+
+    @Test
+    public void testFingerPrintStrategy() {
+        dataStream
+            .fromFile("/Users/junjie.cheng/text.txt", false)
+            .map(message -> message + "--")
+            .toPrint(1)
+            .start();
+
     }
 
     @Test
     public void testBothStrategy() {
         dataStream
-                .fromRocketmq("topic_xxxx04", "consumer_xxxx04", 
"127.0.0.1:9876")
-                .map(message -> message + "--")
-                .toPrint(1)
-                .with()
-                .start();
+            .fromRocketmq("topic_xxxx04", "consumer_xxxx04", "127.0.0.1:9876")
+            .map(message -> message + "--")
+            .filter(message -> {
+                return true;
+            })
+            .toPrint(1)
+            .with()
+            .start();
     }
 
     @Test
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
index 4044b06..2613461 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
@@ -126,7 +126,7 @@ public class MessageHeader {
         header.msgRouteFromLable = msgRouteFromLable;
         header.logFingerprintValue = logFingerprintValue;
         header.messageQueue = messageQueue;
-        header.checkpointQueueIds=checkpointQueueIds;
+        header.checkpointQueueIds = checkpointQueueIds;
         return header;
     }
 
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FilterFunction.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FilterFunction.java
index 350b6fd..f7df80e 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FilterFunction.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FilterFunction.java
@@ -19,4 +19,5 @@ package org.apache.rocketmq.streams.common.functions;
 public interface FilterFunction<T> extends Function {
 
     boolean filter(T value) throws Exception;
+
 }
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/HyperscanRegex.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/HyperscanRegex.java
index ed37e1c..736b0a6 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/HyperscanRegex.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/HyperscanRegex.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.streams.common.optimization;
 
+import com.gliwka.hyperscan.wrapper.CompileErrorException;
 import com.gliwka.hyperscan.wrapper.Database;
 import com.gliwka.hyperscan.wrapper.Expression;
 import com.gliwka.hyperscan.wrapper.ExpressionFlag;
@@ -27,13 +28,18 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
 
 public class HyperscanRegex<T> {
-    protected List<Expression> regexs = new ArrayList<>();
+    protected List<Expression> allRegexes = new ArrayList<>();//all registe 
regex
+
     protected Database db;
     protected Scanner scanner;
     protected AtomicBoolean hasCompile = new AtomicBoolean(false);
-    protected List<T> list = new ArrayList<>();
+    protected List<T> expressionContextList = new ArrayList<>();
+
+    protected List<Expression> notSupportCompileExpression = new 
ArrayList<>();//can not comile expressions
+    protected List<Expression> supportCompileExpression = new 
ArrayList<>();//all regex exclude not support compile
 
     /**
      * 把多个表达式放到库里
@@ -41,9 +47,10 @@ public class HyperscanRegex<T> {
      * @param regex
      */
     public void addRegex(String regex, T context) {
-        list.add(context);
-        Expression expression = new Expression(regex, 
EnumSet.of(ExpressionFlag.UTF8, ExpressionFlag.CASELESS, 
ExpressionFlag.SINGLEMATCH), list.size() - 1);
-        regexs.add(expression);
+        expressionContextList.add(context);
+        Expression expression = new Expression(regex, 
EnumSet.of(ExpressionFlag.UTF8, ExpressionFlag.CASELESS, 
ExpressionFlag.SINGLEMATCH), expressionContextList.size() - 1);
+        allRegexes.add(expression);
+        supportCompileExpression.add(expression);
         db = null;
         scanner = null;
         hasCompile.set(false);
@@ -53,17 +60,25 @@ public class HyperscanRegex<T> {
      * 完成编译
      */
     public void compile() {
-        try {
-            if (hasCompile.compareAndSet(false, true) && regexs.size() > 0) {
-                Database db = Database.compile(regexs);
+        if (!hasCompile.compareAndSet(false, true) || 
supportCompileExpression.size() == 0) {
+            return;
+        }
+        while (true) {
+            try {
+                if (supportCompileExpression.size() == 0) {
+                    break;
+                }
+                Database db = Database.compile(supportCompileExpression);
                 Scanner scanner = new Scanner();
                 scanner.allocScratch(db);
                 this.db = db;
                 this.scanner = scanner;
+                break;
+            } catch (CompileErrorException e) {
+                Expression expression = e.getFailedExpression();
+                this.supportCompileExpression.remove(expression);
+                this.notSupportCompileExpression.add(expression);
             }
-
-        } catch (Exception e) {
-            System.out.println("can not support this regex " + e.getMessage());
         }
 
     }
@@ -75,15 +90,14 @@ public class HyperscanRegex<T> {
      * @return
      */
     public boolean match(String content) {
-        if (scanner == null || db == null || hasCompile.get() == false) {
+        if (scanner == null || db == null || !hasCompile.get()) {
             compile();
         }
-        List<Match> matches = scanner.scan(db, content);
-        if (matches.size() > 0) {
-            return true;
-        } else {
+        if (content == null) {
             return false;
         }
+        List<Match> matches = scanner.scan(db, content);
+        return matches.size() > 0;
     }
 
     /**
@@ -93,18 +107,34 @@ public class HyperscanRegex<T> {
      * @return
      */
     public Set<T> matchExpression(String content) {
-        if (scanner == null || db == null || hasCompile.get() == false) {
+        if (scanner == null || db == null || !hasCompile.get()) {
             compile();
         }
+        if (content == null) {
+            return new HashSet<>();
+        }
         List<Match> matches = scanner.scan(db, content);
         Set<T> fireExpressions = new HashSet<>();
-        if (matches.size() == 0) {
-            return fireExpressions;
+        if (this.notSupportCompileExpression.size() > 0) {
+            for (Expression expression : this.notSupportCompileExpression) {
+                String regex = expression.getExpression();
+                boolean isMatch = 
StringUtil.matchRegexCaseInsensitive(content, regex);
+                if (isMatch) {
+                    int index = expression.getId();
+                    fireExpressions.add(expressionContextList.get(index));
+                }
+            }
         }
-        for (Match match : matches) {
-            Integer index = match.getMatchedExpression().getId();
-            fireExpressions.add(list.get(index));
+        if (matches.size() > 0) {
+            for (Match match : matches) {
+                Integer index = match.getMatchedExpression().getId();
+                fireExpressions.add(expressionContextList.get(index));
+            }
         }
         return fireExpressions;
     }
+
+    public int size() {
+        return allRegexes.size();
+    }
 }
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
index 244ed3b..6762e8e 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
@@ -152,9 +152,6 @@ public class ChainPipeline<T extends IMessage> extends 
Pipeline<T> implements IA
         return MapKeyUtil.createKeyBySign(".", getType(), getNameSpace(), 
getConfigureName());
     }
 
-    private static AtomicInteger total = new AtomicInteger(0);
-    private static AtomicInteger hitCache = new AtomicInteger(0);
-
     /**
      * 可以替换某个阶段的阶段,而不用配置的阶段
      *
@@ -166,12 +163,10 @@ public class ChainPipeline<T extends IMessage> extends 
Pipeline<T> implements IA
     @Override
     protected T doMessageInner(T t, AbstractContext context, AbstractStage... 
replaceStage) {
         if (this.duplicateCache != null && this.duplicateFields != null && 
!this.duplicateFields.isEmpty() && !t.getHeader().isSystemMessage()) {
-            total.incrementAndGet();
             String duplicateKey = createDuplicateKey(t);
             Long cacheTime = this.duplicateCache.get(duplicateKey);
             Long currentTime = System.currentTimeMillis();
             if (cacheTime != null && currentTime - cacheTime < 
this.duplicateCacheExpirationTime) {
-                hitCache.incrementAndGet();
                 context.breakExecute();
                 return t;
             } else {
@@ -180,9 +175,6 @@ public class ChainPipeline<T extends IMessage> extends 
Pipeline<T> implements IA
                     this.duplicateCache = new 
LongValueKV(this.duplicateCacheSize);
                 }
             }
-            if (total.get() % 5000 == 0) {
-                System.out.printf("total: %s, hit: %s%n", total.get(), 
hitCache.get());
-            }
         }
 
         if (!t.getHeader().isSystemMessage()) {
@@ -450,9 +442,9 @@ public class ChainPipeline<T extends IMessage> extends 
Pipeline<T> implements IA
     public String toString() {
         String LINE = PrintUtil.LINE;
         StringBuilder sb = new StringBuilder();
-        sb.append("###namespace=" + getNameSpace() + "###" + LINE);
+        
sb.append("###namespace=").append(getNameSpace()).append("###").append(LINE);
         if (source != null) {
-            sb.append(source.toString() + LINE);
+            sb.append(source.toString()).append(LINE);
         }
         if (stages != null) {
             for (AbstractStage stage : stages) {
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java
index d1b7ab6..264ac6b 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java
@@ -66,7 +66,7 @@ public abstract class ChainStage<T extends IMessage> extends 
AbstractStage<T> {
      * @return
      */
     public PiplineRecieverAfterCurrentNode getReceiverAfterCurrentNode() {
-        ChainPipeline pipeline = (ChainPipeline)getPipeline();
+        ChainPipeline pipeline = (ChainPipeline) getPipeline();
 
         return new PiplineRecieverAfterCurrentNode(pipeline);
     }
@@ -153,7 +153,7 @@ public abstract class ChainStage<T extends IMessage> 
extends AbstractStage<T> {
         Set<ChainPipeline> set = new HashSet<>();
         for (Pipeline pipeline : pipelines) {
             if (pipeline != null) {
-                set.add((ChainPipeline)pipeline);
+                set.add((ChainPipeline) pipeline);
             }
         }
         sendSystem(message, context, set);
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java
index 99e7ebe..c417a41 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/builder/PipelineBuilder.java
@@ -101,13 +101,14 @@ public class PipelineBuilder implements Serializable {
             chainStage.setLabel(createConfigurableName(chainStage.getType()));
         }
         this.pipeline.addChainStage(chainStage);
+
         return chainStage;
     }
 
     public List<String> createSQL() {
         List<String> sqls = new ArrayList<>();
         for (IConfigurable configurable : configurables) {
-            AbstractConfigurable abstractConfigurable = 
(AbstractConfigurable)configurable;
+            AbstractConfigurable abstractConfigurable = (AbstractConfigurable) 
configurable;
             sqls.add(AbstractConfigurable.createSQL(configurable));
         }
         return sqls;
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java
index 1a2d8b1..f39a971 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java
@@ -19,8 +19,10 @@ package org.apache.rocketmq.streams.common.topology.model;
 import com.alibaba.fastjson.JSONObject;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
 import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
@@ -28,14 +30,14 @@ import org.apache.rocketmq.streams.common.context.Message;
 import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
 import org.apache.rocketmq.streams.common.interfaces.ISystemMessageProcessor;
 import org.apache.rocketmq.streams.common.optimization.SQLLogFingerprintFilter;
+import org.apache.rocketmq.streams.common.topology.ChainPipeline;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.common.utils.TraceUtil;
 
 public abstract class AbstractStage<T extends IMessage> extends 
BasedConfigurable
     implements IStreamOperator<T, T>, ISystemMessageProcessor {
 
-
-
     private static final Log LOG = LogFactory.getLog(AbstractStage.class);
 
     public static final String TYPE = "stage";
@@ -50,6 +52,11 @@ public abstract class AbstractStage<T extends IMessage> 
extends BasedConfigurabl
     protected transient Pipeline pipeline;
 
     /**
+     * 源头stage
+     */
+    protected transient AbstractStage sourceStage;
+
+    /**
      * 设置路由label,当需要做路由选择时需要设置
      */
     protected String label;
@@ -85,7 +92,6 @@ public abstract class AbstractStage<T extends IMessage> 
extends BasedConfigurabl
             return null;
         }
         try {
-
             TraceUtil.debug(t.getHeader().getTraceId(), "AbstractStage", 
label, t.getMessageBody().toJSONString());
         } catch (Exception e) {
             LOG.error("t.getMessageBody() parse error", e);
@@ -97,9 +103,9 @@ public abstract class AbstractStage<T extends IMessage> 
extends BasedConfigurabl
         Object result = handle.doMessage(t, context);
         //
         if (!context.isContinue() || result == null) {
-            return (T)context.breakExecute();
+            return (T) context.breakExecute();
         }
-        return (T)result;
+        return (T) result;
     }
 
     /**
@@ -232,6 +238,64 @@ public abstract class AbstractStage<T extends IMessage> 
extends BasedConfigurabl
         }
     }
 
+    /**
+     * 为最源头的stage加载指纹信息
+     */
+    protected void loadLogFinger() {
+        ChainPipeline<?> pipeline = (ChainPipeline<?>) getPipeline();
+        String filterName = getLabel();
+        if (!pipeline.isTopology()) {
+            List<?> stages = pipeline.getStages();
+            int i = 0;
+            for (Object stage : stages) {
+                if (stage == this) {
+                    break;
+                }
+                i++;
+            }
+            filterName = i + "";
+        }
+        String key = MapKeyUtil.createKeyBySign(".", pipeline.getNameSpace(), 
pipeline.getConfigureName(), filterName);
+        String logFingerFieldNames = 
ComponentCreator.getProperties().getProperty(key);
+        if (logFingerFieldNames == null) {
+            return;
+        }
+        sourceStage = getSourceStage();
+        sourceStage.setLogFingerFieldNames(logFingerFieldNames);
+        sourceStage.setLogFingerFilterStageName(key);
+        
sourceStage.setLogFingerprintFilter(SQLLogFingerprintFilter.getInstance());
+    }
+
+    /**
+     * 发现最源头的stage
+     *
+     * @return
+     */
+    protected AbstractStage getSourceStage() {
+        ChainPipeline pipline = (ChainPipeline) getPipeline();
+        if (pipline.isTopology()) {
+            Map<String, AbstractStage> stageMap = pipline.createStageMap();
+            AbstractStage currentStage = this;
+            List<String> prewLables = currentStage.getPrevStageLabels();
+            while (prewLables != null && prewLables.size() > 0) {
+                if (prewLables.size() > 1) {
+                    return null;
+                }
+                String lable = prewLables.get(0);
+                AbstractStage stage = (AbstractStage) stageMap.get(lable);
+                if (stage != null) {
+                    currentStage = stage;
+                } else {
+                    return currentStage;
+                }
+                prewLables = currentStage.getPrevStageLabels();
+            }
+            return currentStage;
+        } else {
+            return (AbstractStage) pipline.getStages().get(0);
+        }
+    }
+
     public List<String> getNextStageLabels() {
         return nextStageLabels;
     }
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/FilterChainStage.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/FilterChainStage.java
index ff5e6da..a040e3d 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/FilterChainStage.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/FilterChainStage.java
@@ -54,8 +54,6 @@ public class FilterChainStage<T extends IMessage, R extends 
AbstractRule> extend
     private transient Map<String, JSONObject> ruleName2JsonObject = new 
HashMap<>();
     public static transient Class componentClass = 
ReflectUtil.forClass("org.apache.rocketmq.streams.filter.FilterComponent");
 
-    protected transient AbstractStage sourceStage;
-
     public FilterChainStage() {
         setEntityName("filter");
     }
@@ -84,7 +82,10 @@ public class FilterChainStage<T extends IMessage, R extends 
AbstractRule> extend
             List<R> fireRules = component.getService().executeRule(message, 
context, rules);
             
//System.out.println("规则花费时间是:"+(System.currentTimeMillis()-start));
             if (fireRules == null || fireRules.size() == 0) {
-                addLogFingerprintFilter(message);
+                //增加日志指纹
+                if (sourceStage != null) {
+                    sourceStage.addLogFingerprint(message);
+                }
                 TopologyFilterMonitor monitor = 
message.getHeader().getPiplineExecutorMonitor();
                 if (monitor != null) {
                     if (monitor.getNotFireExpression2DependentFields() != 
null) {
@@ -142,17 +143,6 @@ public class FilterChainStage<T extends IMessage, R 
extends AbstractRule> extend
     };
 
     /**
-     * 增加日志指纹
-     *
-     * @param message
-     */
-    protected void addLogFingerprintFilter(IMessage message) {
-        if (sourceStage != null) {
-            sourceStage.addLogFingerprint(message);
-        }
-    }
-
-    /**
      * 如果是表达式,把表达式的值也提取出来
      *
      * @param expression
@@ -179,7 +169,7 @@ public class FilterChainStage<T extends IMessage, R extends 
AbstractRule> extend
     }
 
     protected ScriptChainStage findScriptChainStage(AbstractStage stage) {
-        ChainPipeline pipline = (ChainPipeline)stage.getPipeline();
+        ChainPipeline pipline = (ChainPipeline) stage.getPipeline();
         if (pipline.isTopology()) {
             List<String> lableNames = stage.getPrevStageLabels();
             if (lableNames != null) {
@@ -187,7 +177,7 @@ public class FilterChainStage<T extends IMessage, R extends 
AbstractRule> extend
                     Map<String, AbstractStage> stageMap = 
pipline.getStageMap();
                     AbstractStage prewStage = stageMap.get(lableName);
                     if (prewStage != null && 
ScriptChainStage.class.isInstance(prewStage)) {
-                        return (ScriptChainStage)prewStage;
+                        return (ScriptChainStage) prewStage;
                     }
                     if (prewStage != null) {
                         return findScriptChainStage(prewStage);
@@ -206,7 +196,7 @@ public class FilterChainStage<T extends IMessage, R extends 
AbstractRule> extend
             for (; i >= 0; i--) {
                 AbstractStage prewStage = stages.get(i);
                 if (ScriptChainStage.class.isInstance(prewStage)) {
-                    return (ScriptChainStage)prewStage;
+                    return (ScriptChainStage) prewStage;
                 }
             }
             return null;
@@ -235,41 +225,40 @@ public class FilterChainStage<T extends IMessage, R 
extends AbstractRule> extend
                     //rules[i]=(R)ruleList.get(i);
                     
ruleName2JsonObject.put(ruleList.get(i).getConfigureName(), 
ruleList.get(i).toOutputJson());
                 }
-                rules = (R[])Array.newInstance(ruleList.get(0).getClass(), 
matchedRules.size());
+                rules = (R[]) Array.newInstance(ruleList.get(0).getClass(), 
matchedRules.size());
                 for (int i = 0; i < rules.length; i++) {
-                    rules[i] = (R)matchedRules.get(i);
+                    rules[i] = (R) matchedRules.get(i);
                 }
             }
         } else {
             if (names != null && names.size() > 0) {
                 AbstractRule rule = 
configurableService.queryConfigurable(AbstractRule.TYPE, names.get(0));
-                rules = (R[])Array.newInstance(rule.getClass(), names.size());
+                rules = (R[]) Array.newInstance(rule.getClass(), names.size());
 
             }
             int i = 0;
             for (String name : names) {
                 AbstractRule rule = 
configurableService.queryConfigurable(AbstractRule.TYPE, name);
-                rules[i] = (R)rule;
+                rules[i] = (R) rule;
                 ruleName2JsonObject.put(rules[i].getConfigureName(), 
rules[i].toOutputJson());
                 i++;
             }
         }
-
+        //加载指纹信息
         loadLogFinger();
-
     }
 
     public void setRule(AbstractRule... rules) {
         if (rules == null || rules.length == 0) {
             return;
         }
-        this.rules = (R[])Array.newInstance(rules[0].getClass(), rules.length);
+        this.rules = (R[]) Array.newInstance(rules[0].getClass(), 
rules.length);
         if (names == null) {
             names = new ArrayList<>();
         }
         int i = 0;
         for (AbstractRule rule : rules) {
-            this.rules[i] = (R)rules[i];
+            this.rules[i] = (R) rules[i];
             names.add(rules[i].getConfigureName());
             ruleName2JsonObject.put(rules[i].getConfigureName(), 
rules[i].toOutputJson());
             i++;
@@ -278,69 +267,6 @@ public class FilterChainStage<T extends IMessage, R 
extends AbstractRule> extend
 
     }
 
-    /**
-     * 从配置文件加载日志指纹信息,如果存在做指纹优化
-     */
-    protected void loadLogFinger() {
-        ChainPipeline pipline = (ChainPipeline)getPipeline();
-        String filterName = getLabel();
-        if (pipline.isTopology() == false) {
-            List<AbstractStage> stages = pipline.getStages();
-            int i = 0;
-            for (AbstractStage stage : stages) {
-                if (stage == this) {
-                    break;
-                }
-                i++;
-            }
-            filterName = i + "";
-        }
-        String key = MapKeyUtil.createKeyBySign(".", pipline.getNameSpace(), 
pipline.getConfigureName(), filterName);
-        String logFingerFieldNames = 
ComponentCreator.getProperties().getProperty(key);
-        if (logFingerFieldNames == null) {
-            return;
-        }
-        sourceStage = getSourceStage();
-        int index = logFingerFieldNames.indexOf(";");
-        if (index != -1) {
-            String filterIndex = logFingerFieldNames.substring(0, index);
-            logFingerFieldNames = logFingerFieldNames.substring(index + 1);
-        }
-        sourceStage.setLogFingerFieldNames(logFingerFieldNames);
-        sourceStage.setLogFingerFilterStageName(key);
-        
sourceStage.setLogFingerprintFilter(SQLLogFingerprintFilter.getInstance());
-    }
-
-    /**
-     * 发现最源头的stage
-     *
-     * @return
-     */
-    protected AbstractStage getSourceStage() {
-        ChainPipeline pipline = (ChainPipeline)getPipeline();
-        if (pipline.isTopology()) {
-            Map<String, AbstractStage> stageMap = pipline.createStageMap();
-            AbstractStage currentStage = this;
-            List<String> prewLables = currentStage.getPrevStageLabels();
-            while (prewLables != null && prewLables.size() > 0) {
-                if (prewLables.size() > 1) {
-                    return null;
-                }
-                String lable = prewLables.get(0);
-                AbstractStage stage = (AbstractStage)stageMap.get(lable);
-                if (stage != null) {
-                    currentStage = stage;
-                } else {
-                    return currentStage;
-                }
-                prewLables = currentStage.getPrevStageLabels();
-            }
-            return currentStage;
-        } else {
-            return (AbstractStage)pipline.getStages().get(0);
-        }
-    }
-
     public List<String> getNames() {
         return names;
     }
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java
index 95925e8..c152945 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/StageBuilder.java
@@ -64,6 +64,7 @@ public abstract class StageBuilder extends 
AbstractStatelessChainStage<IMessage>
         };
     }
 
+
     @Override
     public boolean isAsyncNode() {
         return false;
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java
index f6653ba..6c12753 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/udf/UDFChainStage.java
@@ -30,10 +30,12 @@ import 
org.apache.rocketmq.streams.common.utils.InstantiationUtil;
  * 所有给用户自定义代码的通用类,会转化成这个stage
  */
 public class UDFChainStage extends AbstractStatelessChainStage implements 
IAfterConfigurableRefreshListener {
+
     protected String 
udfOperatorClassSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码
     protected transient StageBuilder selfChainStage;
 
-    public UDFChainStage() {}
+    public UDFChainStage() {
+    }
 
     public UDFChainStage(StageBuilder selfOperator) {
         this.selfChainStage = selfOperator;
@@ -68,6 +70,30 @@ public class UDFChainStage extends 
AbstractStatelessChainStage implements IAfter
     public void doProcessAfterRefreshConfigurable(IConfigurableService 
configurableService) {
         byte[] bytes = Base64Utils.decode(udfOperatorClassSerializeValue);
         selfChainStage = InstantiationUtil.deserializeObject(bytes);
+        loadLogFinger();
+    }
+
+    @Override public IMessage doMessage(IMessage t, AbstractContext context) {
+        if (filterByLogFingerprint(t)) {
+            context.breakExecute();
+            return null;
+        }
+        IStageHandle handle = selectHandle(t, context);
+        if (handle == null) {
+            return t;
+        }
+        IMessage result = handle.doMessage(t, context);
+        if (!context.isContinue() || result == null) {
+            if (context.get("NEED_USE_FINGER_PRINT") != null && 
Boolean.parseBoolean(context.get("NEED_USE_FINGER_PRINT").toString())) {
+                sourceStage.addLogFingerprint(t);
+                context.remove("NEED_USE_FINGER_PRINT");
+            }
+            return context.breakExecute();
+        }
+        if (context.get("NEED_USE_FINGER_PRINT") != null) {
+            context.remove("NEED_USE_FINGER_PRINT");
+        }
+        return result;
     }
 
 }
diff --git 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/TraceUtil.java
 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/TraceUtil.java
index 848e09a..7f89c8b 100644
--- 
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/TraceUtil.java
+++ 
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/TraceUtil.java
@@ -51,31 +51,31 @@ public class TraceUtil {
 
     public static void debug(String traceId, String... messages) {
         if (hit(traceId) && LOG.isDebugEnabled()) {
-            LOG.debug(envelope(traceId, messages));
+//            LOG.debug(envelope(traceId, messages));
         }
     }
 
     public static void info(String traceId, String... messages) {
         if (!skip(traceId) && LOG.isInfoEnabled()) {
-            LOG.info(envelope(traceId, messages));
+//            LOG.info(envelope(traceId, messages));
         }
     }
 
     public static void warn(String traceId, String... messages) {
         if (LOG.isWarnEnabled()) {
-            LOG.warn(envelope(traceId, messages));
+//            LOG.warn(envelope(traceId, messages));
         }
     }
 
     public static void error(String traceId, String... messages) {
         if (LOG.isErrorEnabled()) {
-            LOG.error(envelope(traceId, messages));
+//            LOG.error(envelope(traceId, messages));
         }
     }
 
     public static void error(String traceId, Throwable throwable, String... 
messages) {
         if (LOG.isErrorEnabled()) {
-            LOG.error(envelope(traceId, messages), throwable);
+//            LOG.error(envelope(traceId, messages), throwable);
         }
     }
 
diff --git 
a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
 
b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
index 4edc20a..00dd6be 100644
--- 
a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
+++ 
b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
@@ -160,7 +160,7 @@ public class ConfigurableComponent extends 
AbstractComponent<IConfigurableServic
     @SuppressWarnings("unchecked")
     @Override
     public <T> T queryConfigurable(String configurableType, String name) {
-        return (T)queryConfigurableByIdent(configurableType, name);
+        return (T) queryConfigurableByIdent(configurableType, name);
     }
 
     //protected void insertConfigurable(JSONObject message, IConfigurable 
configurable) {
@@ -170,7 +170,7 @@ public class ConfigurableComponent extends 
AbstractComponent<IConfigurableServic
     @Override
     public String getNamespace() {
         if (AbstractConfigurableService.class.isInstance(configureService)) {
-            return 
((AbstractConfigurableService)configureService).getNamespace();
+            return ((AbstractConfigurableService) 
configureService).getNamespace();
         }
         return namespace;
     }

Reply via email to