This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new 54426def [ISSUE #199]Support rsqldb (#200)
54426def is described below
commit 54426def8ca6b2e13c967e3b8e1fc261731c3003
Author: Ni Ze <[email protected]>
AuthorDate: Tue Aug 30 21:04:05 2022 +0800
[ISSUE #199]Support rsqldb (#200)
* feat(nest join) support nest join
* maintain(example) remove MqttSourceExample
---
.../channel/sinkcache/impl/MessageCache.java | 1 +
.../streams/common/context/MessageHeader.java | 10 ++++++
.../streams/common/topology/ChainPipeline.java | 1 +
.../common/topology/stages/JoinChainStage.java | 10 ++++--
.../streams/examples/source/MqttSourceExample.java | 42 ----------------------
.../streams/window/operator/join/JoinWindow.java | 17 ++++++++-
.../streams/window/shuffle/ShuffleChannel.java | 5 +++
7 files changed, 40 insertions(+), 46 deletions(-)
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
index dcb7abbb..7586d3e6 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import
org.apache.rocketmq.streams.common.channel.sinkcache.DataSourceAutoFlushTask;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
import
org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
+import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.schedule.ScheduleManager;
import org.apache.rocketmq.streams.common.schedule.ScheduleTask;
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
index 0b105430..70d93106 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
@@ -99,6 +99,8 @@ public class MessageHeader {
protected String msgRouteFromLable;//消息从哪里来的标签,标记上游节点的标记,主要是通过build table
name来标记
+ private String originTable;
+
protected String logFingerprintValue;//日志指纹的值
public MessageHeader copy() {
@@ -359,4 +361,12 @@ public class MessageHeader {
public void setPipelineName(String pipelineName) {
this.pipelineName = pipelineName;
}
+
+ public String getOriginTable() {
+ return originTable;
+ }
+
+ public void setOriginTable(String originTable) {
+ this.originTable = originTable;
+ }
}
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
index 96416044..87743696 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
@@ -290,6 +290,7 @@ public class ChainPipeline<T extends IMessage> extends
Pipeline<T> implements IA
//boolean needFlush = needFlush(msg);
if (StringUtil.isNotEmpty(oriMsgPrewSourceName)) {
msg.getHeader().setMsgRouteFromLable(oriMsgPrewSourceName);
+ msg.getHeader().setOriginTable(oriMsgPrewSourceName);
}
boolean isContinue = executeStage(stage, msg, copyContext);
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java
index 9cb9df35..cf4b9e9f 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java
@@ -44,8 +44,14 @@ public class JoinChainStage<T extends IMessage> extends
AbstractWindowStage<T> {
@Override
protected IMessage doProcess(IMessage message, AbstractContext
context) {
String lable = message.getHeader().getMsgRouteFromLable();
+ String originTable = message.getHeader().getOriginTable();
+
String joinFlag = null;
if (lable != null) {
+ if ((lable.equals("left") || lable.equals("right")) &&
originTable != null) {
+ lable = originTable;
+ }
+
if (lable.equals(rightDependentTableName)) {
joinFlag = MessageHeader.JOIN_RIGHT;
} else {
@@ -61,9 +67,7 @@ public class JoinChainStage<T extends IMessage> extends
AbstractWindowStage<T> {
} else {
rightPipeline.doMessage(message, context);
}
- //if(!MessageGloableTrace.existFinshBranch(message)){
- // context.setBreak(true);
- //}
+
context.breakExecute();
return message;
}
diff --git
a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java
b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java
deleted file mode 100644
index 33b8b0a4..00000000
---
a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.examples.source;
-
-import com.alibaba.fastjson.JSONObject;
-import org.apache.rocketmq.streams.client.StreamBuilder;
-import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.client.strategy.ShuffleStrategy;
-import org.apache.rocketmq.streams.client.transform.window.Time;
-import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
-
-public class MqttSourceExample {
-
- public static void main(String[] args) {
- DataStreamSource dataStream =
StreamBuilder.dataStream("test_namespace", "graph_pipeline");
- dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "")
- .flatMap(message -> ((JSONObject) message).getJSONArray("Data"))
- .window(TumblingWindow.of(Time.minutes(1)))
- .groupBy("AttributeCode")
- .setLocalStorageOnly(true)
- .avg("Value", "avg_value")
- .toDataStream()
- .toPrint()
- .with(ShuffleStrategy.shuffleWithMemory())
- .start();
- }
-
-}
diff --git
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
index b16002a3..eea5986a 100644
---
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
+++
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
@@ -102,7 +102,6 @@ public class JoinWindow extends AbstractShuffleWindow {
if (WindowJoinType.left.name().equalsIgnoreCase(routeLabel)) {
storage.putWindowBaseValue(queueId, windowInstanceId,
WindowType.JOIN_WINDOW, WindowJoinType.left, temp);
-
} else if
(WindowJoinType.right.name().equalsIgnoreCase(routeLabel)) {
storage.putWindowBaseValue(queueId, windowInstanceId,
WindowType.JOIN_WINDOW, WindowJoinType.right, temp);
} else {
@@ -490,6 +489,22 @@ public class JoinWindow extends AbstractShuffleWindow {
if (needFlush) {
nextMessage.getHeader().setNeedFlush(true);
}
+
+ String routeLabel = nextMessage.getHeader().getMsgRouteFromLable();
+ if (routeLabel == null) {
+ //嵌套join,内部join后没有routeLabel,需要设置结果的routeLabel
+ String configureName = this.getConfigureName();
+ String[] tempList = configureName.split("_");
+ for (int i = tempList.length -1; i > 0; i--) {
+ if ("left".equalsIgnoreCase(tempList[i]) ||
"right".equalsIgnoreCase(tempList[i])) {
+ routeLabel = tempList[i];
+ System.out.println("nested join, routeLabel=" +
routeLabel);
+ break;
+ }
+ }
+ nextMessage.getHeader().setMsgRouteFromLable(routeLabel);
+ }
+
AbstractContext context = new Context(nextMessage);
boolean isWindowTest =
ComponentCreator.getPropertyBooleanValue("window.fire.isTest");
if (isWindowTest) {
diff --git
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index bad436fd..7b8175c4 100644
---
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -229,6 +229,11 @@ public class ShuffleChannel extends AbstractSystemChannel {
for (String splitId : splitIds) {
this.loadResult.put(splitId, future);
}
+
+ if (message.getHeader().isSystemMessage() && window.getFireReceiver()
== null) {
+ return;
+ }
+
window.getFireReceiver().doMessage(message, context);
}