This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new 54426def [ISSUE #199]Support rsqldb (#200)
54426def is described below

commit 54426def8ca6b2e13c967e3b8e1fc261731c3003
Author: Ni Ze <[email protected]>
AuthorDate: Tue Aug 30 21:04:05 2022 +0800

    [ISSUE #199]Support rsqldb (#200)
    
    * feat(nest join) support nest join
    
    * maintain(example) remove MqttSourceExample
---
 .../channel/sinkcache/impl/MessageCache.java       |  1 +
 .../streams/common/context/MessageHeader.java      | 10 ++++++
 .../streams/common/topology/ChainPipeline.java     |  1 +
 .../common/topology/stages/JoinChainStage.java     | 10 ++++--
 .../streams/examples/source/MqttSourceExample.java | 42 ----------------------
 .../streams/window/operator/join/JoinWindow.java   | 17 ++++++++-
 .../streams/window/shuffle/ShuffleChannel.java     |  5 +++
 7 files changed, 40 insertions(+), 46 deletions(-)

diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
index dcb7abbb..7586d3e6 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.streams.common.channel.sinkcache.DataSourceAutoFlushTask;
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
+import org.apache.rocketmq.streams.common.context.Message;
 import org.apache.rocketmq.streams.common.schedule.ScheduleManager;
 import org.apache.rocketmq.streams.common.schedule.ScheduleTask;
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
index 0b105430..70d93106 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
@@ -99,6 +99,8 @@ public class MessageHeader {
 
     protected String msgRouteFromLable;//消息从哪里来的标签,标记上游节点的标记,主要是通过build table name来标记
 
+    private String originTable;
+
     protected String logFingerprintValue;//日志指纹的值
 
     public MessageHeader copy() {
@@ -359,4 +361,12 @@ public class MessageHeader {
     public void setPipelineName(String pipelineName) {
         this.pipelineName = pipelineName;
     }
+
+    public String getOriginTable() {
+        return originTable;
+    }
+
+    public void setOriginTable(String originTable) {
+        this.originTable = originTable;
+    }
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
index 96416044..87743696 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
@@ -290,6 +290,7 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
             //boolean needFlush = needFlush(msg);
             if (StringUtil.isNotEmpty(oriMsgPrewSourceName)) {
                 msg.getHeader().setMsgRouteFromLable(oriMsgPrewSourceName);
+                msg.getHeader().setOriginTable(oriMsgPrewSourceName);
             }
             boolean isContinue = executeStage(stage, msg, copyContext);
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java
index 9cb9df35..cf4b9e9f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java
@@ -44,8 +44,14 @@ public class JoinChainStage<T extends IMessage> extends AbstractWindowStage<T> {
         @Override
         protected IMessage doProcess(IMessage message, AbstractContext context) {
             String lable = message.getHeader().getMsgRouteFromLable();
+            String originTable = message.getHeader().getOriginTable();
+
             String joinFlag = null;
             if (lable != null) {
+                if ((lable.equals("left") || lable.equals("right")) && originTable != null) {
+                    lable = originTable;
+                }
+
                 if (lable.equals(rightDependentTableName)) {
                     joinFlag = MessageHeader.JOIN_RIGHT;
                 } else {
@@ -61,9 +67,7 @@ public class JoinChainStage<T extends IMessage> extends AbstractWindowStage<T> {
             } else {
                 rightPipeline.doMessage(message, context);
             }
-            //if(!MessageGloableTrace.existFinshBranch(message)){
-            //    context.setBreak(true);
-            //}
+
             context.breakExecute();
             return message;
         }
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java
deleted file mode 100644
index 33b8b0a4..00000000
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.examples.source;
-
-import com.alibaba.fastjson.JSONObject;
-import org.apache.rocketmq.streams.client.StreamBuilder;
-import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.client.strategy.ShuffleStrategy;
-import org.apache.rocketmq.streams.client.transform.window.Time;
-import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
-
-public class MqttSourceExample {
-
-    public static void main(String[] args) {
-        DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline");
-        dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "")
-            .flatMap(message -> ((JSONObject) message).getJSONArray("Data"))
-            .window(TumblingWindow.of(Time.minutes(1)))
-            .groupBy("AttributeCode")
-            .setLocalStorageOnly(true)
-            .avg("Value", "avg_value")
-            .toDataStream()
-            .toPrint()
-            .with(ShuffleStrategy.shuffleWithMemory())
-            .start();
-    }
-
-}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
index b16002a3..eea5986a 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
@@ -102,7 +102,6 @@ public class JoinWindow extends AbstractShuffleWindow {
 
             if (WindowJoinType.left.name().equalsIgnoreCase(routeLabel)) {
                 storage.putWindowBaseValue(queueId, windowInstanceId, WindowType.JOIN_WINDOW, WindowJoinType.left, temp);
-
             } else if (WindowJoinType.right.name().equalsIgnoreCase(routeLabel)) {
                 storage.putWindowBaseValue(queueId, windowInstanceId, WindowType.JOIN_WINDOW, WindowJoinType.right, temp);
             } else {
@@ -490,6 +489,22 @@ public class JoinWindow extends AbstractShuffleWindow {
         if (needFlush) {
             nextMessage.getHeader().setNeedFlush(true);
         }
+
+        String routeLabel = nextMessage.getHeader().getMsgRouteFromLable();
+        if (routeLabel == null) {
+            //嵌套join,内部join后没有routeLabel,需要设置结果的routeLabel
+            String configureName = this.getConfigureName();
+            String[] tempList = configureName.split("_");
+            for (int i = tempList.length -1; i > 0; i--) {
+                if ("left".equalsIgnoreCase(tempList[i]) || "right".equalsIgnoreCase(tempList[i])) {
+                    routeLabel = tempList[i];
+                    System.out.println("nested join, routeLabel=" + routeLabel);
+                    break;
+                }
+            }
+            nextMessage.getHeader().setMsgRouteFromLable(routeLabel);
+        }
+
         AbstractContext context = new Context(nextMessage);
         boolean isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest");
         if (isWindowTest) {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index bad436fd..7b8175c4 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -229,6 +229,11 @@ public class ShuffleChannel extends AbstractSystemChannel {
         for (String splitId : splitIds) {
              this.loadResult.put(splitId, future);
         }
+
+        if (message.getHeader().isSystemMessage() && window.getFireReceiver() == null) {
+            return;
+        }
+
         window.getFireReceiver().doMessage(message, context);
     }
 

Reply via email to