This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new 0742cf4 Joinwindow bug fix (#61)
0742cf4 is described below
commit 0742cf44bd5cf3903461f85fa0c9c4eef2d3c447
Author: YUDA <[email protected]>
AuthorDate: Wed Sep 15 11:37:25 2021 +0800
Joinwindow bug fix (#61)
* fix bugs
* fix joinwindow message remove bugs
---
pom.xml | 2 +-
.../streams/common/topology/ChainStage.java | 4 +-
.../common/topology/model/AbstractStage.java | 7 +-
.../impl/string/SubStringIndexFunction.java | 6 +
.../streams/window/model/WindowInstance.java | 4 +
.../streams/window/operator/impl/OverWindow.java | 2 +-
.../streams/window/operator/join/JoinWindow.java | 157 ++++++++++++++-------
.../streams/window/shuffle/ShuffleChannel.java | 3 +-
.../streams/window/state/impl/WindowValue.java | 2 +-
9 files changed, 129 insertions(+), 58 deletions(-)
diff --git a/pom.xml b/pom.xml
index ab8825d..7bbe1f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
<spring.version>3.2.13.RELEASE</spring.version>
<auto-service.version>1.0-rc5</auto-service.version>
<mysql-connector.version>5.1.40</mysql-connector.version>
- <fastjson.version>1.2.27</fastjson.version>
+ <fastjson.version>1.2.78</fastjson.version>
<quartz.version>2.2.1</quartz.version>
<httpclient.version>4.5.2</httpclient.version>
<commons-io.version>2.5</commons-io.version>
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java
index 1916291..d1b7ab6 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java
@@ -152,7 +152,9 @@ public abstract class ChainStage<T extends IMessage>
extends AbstractStage<T> {
}
Set<ChainPipeline> set = new HashSet<>();
for (Pipeline pipeline : pipelines) {
- set.add((ChainPipeline)pipeline);
+ if (pipeline != null) {
+ set.add((ChainPipeline)pipeline);
+ }
}
sendSystem(message, context, set);
}
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java
index 9860c51..1a2d8b1 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java
@@ -84,7 +84,12 @@ public abstract class AbstractStage<T extends IMessage>
extends BasedConfigurabl
context.breakExecute();
return null;
}
- TraceUtil.debug(t.getHeader().getTraceId(), "AbstractStage", label,
t.getMessageBody().toJSONString());
+ try {
+
+ TraceUtil.debug(t.getHeader().getTraceId(), "AbstractStage",
label, t.getMessageBody().toJSONString());
+ } catch (Exception e) {
+ LOG.error("t.getMessageBody() parse error", e);
+ }
IStageHandle handle = selectHandle(t, context);
if (handle == null) {
return t;
diff --git
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/string/SubStringIndexFunction.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/string/SubStringIndexFunction.java
index 705a5fc..3d50f01 100644
---
a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/string/SubStringIndexFunction.java
+++
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/string/SubStringIndexFunction.java
@@ -85,6 +85,12 @@ public class SubStringIndexFunction {
@FunctionParamter(comment =
"指定用于拆分原始字段的字符代表列名称或常量值", value = "string") Integer startIndex,
@FunctionParamter(comment =
"指定用于拆分原始字段的字符代表列名称或常量值", value = "string") Integer endIndex) {
oriMsg = FunctionUtils.getValueString(message, context, oriMsg);
+ int msgLength = oriMsg.length();
+ if (startIndex >= msgLength) {
+ return "";
+ } else if (endIndex > msgLength) {
+ endIndex = msgLength;
+ }
return oriMsg.substring(startIndex, endIndex);
}
diff --git
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
index ebcc0fe..c575976 100644
---
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
+++
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
@@ -109,6 +109,10 @@ public class WindowInstance extends Entity implements
Serializable {
return MapKeyUtil.createKey(splitId, windowNameSpace, windowName,
windowInstanceName, startTime, endTime);
}
+ public String createWindowInstanceIdWithoutSplitid() { // key built from namespace, window name, instance name, start time and end time; splitId is not part of it
+ return MapKeyUtil.createKey(windowNameSpace, windowName,
windowInstanceName, startTime, endTime);
+ }
+
public String createWindowInstanceTriggerId(){ // key built from splitId, namespace, window name, instance name, start time, end time and fireTime
return MapKeyUtil.createKey(splitId, windowNameSpace, windowName,
windowInstanceName, startTime, endTime,fireTime);
}
diff --git
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/OverWindow.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/OverWindow.java
index b8b74f8..86fa99b 100644
---
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/OverWindow.java
+++
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/OverWindow.java
@@ -128,7 +128,7 @@ public class OverWindow extends AbstractWindow {
@Override
protected boolean initConfigurable() {
- return true;
+ return super.initConfigurable(); // report the parent class's initialisation result instead of an unconditional true
}
@Override
diff --git
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
index 6579647..920d525 100644
---
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
+++
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rocketmq.streams.common.utils.TraceUtil;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.Context;
import org.apache.rocketmq.streams.common.context.IMessage;
@@ -40,6 +41,7 @@ import
org.apache.rocketmq.streams.window.state.WindowBaseValue;
import org.apache.rocketmq.streams.window.state.impl.JoinLeftState;
import org.apache.rocketmq.streams.window.state.impl.JoinRightState;
import org.apache.rocketmq.streams.window.state.impl.JoinState;
+import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;
public class JoinWindow extends AbstractShuffleWindow {
@@ -73,6 +75,7 @@ public class JoinWindow extends AbstractShuffleWindow {
//
// }
+
@Override
protected int fireWindowInstance(WindowInstance instance, String
shuffleId, Map<String, String> queueId2Offsets) {
clearFire(instance);
@@ -81,7 +84,8 @@ public class JoinWindow extends AbstractShuffleWindow {
@Override
public void clearCache(String queueId) {
-
+
getStorage().clearCache(shuffleChannel.getChannelQueue(queueId),getWindowBaseValueClass()); // clear the storage cache for the channel queue looked up from queueId, passing this window's value class
+ ShufflePartitionManager.getInstance().clearSplit(queueId); // also clear this split in the shared ShufflePartitionManager
}
@Override
@@ -110,8 +114,7 @@ public class JoinWindow extends AbstractShuffleWindow {
}
for (IMessage msg : messages) {
- MessageHeader header = JSONObject.parseObject(msg.getMessageBody().
- getString(WindowCache.ORIGIN_MESSAGE_HEADER),
MessageHeader.class);
+ MessageHeader header = msg.getHeader();
String routeLabel = header.getMsgRouteFromLable();
// Map<String,WindowBaseValue> joinMessages = new
HashMap<>();
String storeKeyPrefix = "";
@@ -128,7 +131,11 @@ public class JoinWindow extends AbstractShuffleWindow {
List<WindowBaseValue> tmpMessages = new ArrayList<>();
int count = 0;
while (iterator.hasNext()) {
- tmpMessages.add(iterator.next());
+ WindowBaseValue windowBaseValue = iterator.next();
+ if (windowBaseValue == null) {
+ continue;
+ }
+ tmpMessages.add(windowBaseValue);
count++;
if (count == 100) {
sendMessage(msg, tmpMessages);
@@ -146,7 +153,9 @@ public class JoinWindow extends AbstractShuffleWindow {
List<WindowInstance> instances = new ArrayList<>();
for (Map.Entry<String, WindowInstance> entry :
this.windowInstanceMap.entrySet()) {
- instances.add(entry.getValue());
+ if (queueId.equalsIgnoreCase(entry.getValue().getSplitId())) {
+ instances.add(entry.getValue());
+ }
}
Iterator<WindowInstance> windowInstanceIter = instances.iterator();
return new Iterator<WindowBaseValue>() {
@@ -159,9 +168,9 @@ public class JoinWindow extends AbstractShuffleWindow {
if (iterator != null && iterator.hasNext()) {
return true;
}
- while (windowInstanceIter.hasNext()) {
+ if (windowInstanceIter.hasNext()) {
WindowInstance instance = windowInstanceIter.next();
- iterator = storage.loadWindowInstanceSplitData(null,
queueId,
+ iterator = storage.loadWindowInstanceSplitData(null, null,
instance.createWindowInstanceId(),
keyPrefix,
clazz);
@@ -217,17 +226,64 @@ public class JoinWindow extends AbstractShuffleWindow {
}
- public List<JSONObject> connectJoin(IMessage message, List<Map<String,
Object>> rows, String joinType, String rightAsName) {
+ /**
+ * Combines one incoming message with the rows matched from the opposite side of the join.
+ *
+ * @param message the message that just arrived; its header route label tells which side it came from
+ * @param rows rows matched from the other side; {@code null} is treated like an empty list
+ * @param joinType "inner" or "left" (case-insensitive); any other value yields an empty list
+ * @param rightAsName alias passed on to addAsName
+ * @return the joined rows, never {@code null}
+ */
+ public List<JSONObject> connectJoin(IMessage message, List<Map<String, Object>> rows, String joinType,
+ String rightAsName) {
List<JSONObject> result = new ArrayList<>();
- if (rows.size() <= 0) {
+ if ("left".equalsIgnoreCase(joinType)) {
+ // Dispatch before the empty-rows shortcut below: a left join still has to
+ // emit a left-side message that matched nothing.
+ return connectLeftJoin(message, rows, rightAsName);
+ }
+ if (rows == null || rows.isEmpty()) {
return result;
}
if ("inner".equalsIgnoreCase(joinType)) {
result = connectInnerJoin(message, rows, rightAsName);
}
- // else if ("left".equalsIgnoreCase(joinType)) {
- // result = connectLeftJoin(message, rows, rightAsName);
- // }
return result;
}
+
+ /**
+ * Left-join variant of {@link #connectInnerJoin}.
+ *
+ * <p>A message routed from the left side produces one output per matched row, or exactly one
+ * pass-through copy of itself when nothing matched. A message from the other side produces one
+ * output per matched row and nothing when there is no match. Every output is a clone of the
+ * message body and carries a trace id of the form {@code traceId-index}, index starting at 1.
+ *
+ * @param message the message that just arrived
+ * @param rows rows matched from the other side; may be {@code null} or empty
+ * @param rightAsName alias passed on to addAsName
+ * @return the joined rows, never {@code null}
+ */
+ private List<JSONObject> connectLeftJoin(IMessage message, List<Map<String, Object>> rows, String rightAsName) {
+ List<JSONObject> result = new ArrayList<>();
+ String routeLabel = message.getHeader().getMsgRouteFromLable();
+ JSONObject messageBody = message.getMessageBody();
+ String traceId = message.getHeader().getTraceId();
+ boolean hasRows = rows != null && !rows.isEmpty();
+ int index = 1;
+ if (LABEL_LEFT.equalsIgnoreCase(routeLabel)) {
+ if (!hasRows) {
+ // unmatched left message: emit it once, unchanged apart from the trace id
+ JSONObject object = (JSONObject) messageBody.clone();
+ object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
+ result.add(object);
+ return result;
+ }
+ for (Map<String, Object> raw : rows) {
+ JSONObject object = (JSONObject) messageBody.clone();
+ object.fluentPutAll(addAsName(raw, rightAsName));
+ object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
+ index++;
+ result.add(object);
+ }
+ } else if (hasRows) {
+ // message from the other side: alias its own columns first, then merge each matched row into a clone
+ messageBody = addAsName(messageBody, rightAsName);
+ for (Map<String, Object> raw : rows) {
+ JSONObject object = (JSONObject) messageBody.clone();
+ object.fluentPutAll(raw);
+ object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
+ index++;
+ result.add(object);
+ }
+ }
+ // Each row is appended exactly once; no second pass over rows follows.
+ return result;
+ }
@@ -241,12 +297,16 @@ public class JoinWindow extends AbstractShuffleWindow {
public List<JSONObject> connectInnerJoin(IMessage message,
List<Map<String, Object>> rows, String rightAsName) {
List<JSONObject> result = new ArrayList<>();
String routeLabel = message.getHeader().getMsgRouteFromLable();
+ String traceId = message.getHeader().getTraceId();
+ int index = 1;
if (LABEL_LEFT.equalsIgnoreCase(routeLabel)) {
JSONObject messageBody = message.getMessageBody();
for (Map<String, Object> raw : rows) {
// addAsName(raw, rightAsName);
JSONObject object = (JSONObject)messageBody.clone();
object.fluentPutAll(addAsName(raw, rightAsName));
+ object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
+ index++;
result.add(object);
}
} else {
@@ -255,6 +315,8 @@ public class JoinWindow extends AbstractShuffleWindow {
for (Map<String, Object> raw : rows) {
JSONObject object = (JSONObject)messageBody.clone();
object.fluentPutAll(raw);
+ object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
+ index++;
result.add(object);
}
}
@@ -284,14 +346,9 @@ public class JoinWindow extends AbstractShuffleWindow {
*/
protected String createStoreKey(IMessage message, String routeLabel,
WindowInstance windowInstance) {
String shuffleKey = // shuffle key, origin queue id and origin offset are all read from the message body
message.getMessageBody().getString(WindowCache.SHUFFLE_KEY);
- String shuffleId =
shuffleChannel.getChannelQueue(shuffleKey).getQueueId();
String orginQueueId =
message.getMessageBody().getString(WindowCache.ORIGIN_QUEUE_ID);
String originOffset =
message.getMessageBody().getString(WindowCache.ORIGIN_OFFSET);
- String windowNamespace = getNameSpace();
- String windowName = getConfigureName();
- String startTime = windowInstance.getStartTime();
- String endTime = windowInstance.getEndTime();
- String storeKey = MapKeyUtil.createKey(shuffleId, windowNamespace,
windowName, startTime, endTime, shuffleKey, routeLabel, orginQueueId,
originOffset);
+ String storeKey = // the window-instance id replaces the prefix previously assembled from shuffleId, namespace, name, start and end time
MapKeyUtil.createKey(windowInstance.createWindowInstanceId(), shuffleKey,
routeLabel, orginQueueId, originOffset);
return storeKey;
}
@@ -327,6 +384,8 @@ public class JoinWindow extends AbstractShuffleWindow {
JSONObject messageBody = (JSONObject)message.getMessageBody().clone();
messageBody.remove("WindowInstance");
messageBody.remove("AbstractWindow");
+ messageBody.remove(WindowCache.ORIGIN_MESSAGE_HEADER);
+ messageBody.remove("MessageHeader");
JoinState state = null;
if ("left".equalsIgnoreCase(routeLabel)) {
@@ -396,47 +455,41 @@ public class JoinWindow extends AbstractShuffleWindow {
return JoinState.class;
}
- // @Override
- // public void finishWindowProcessAndSend2Receiver(List<IMessage>
messageList,WindowInstance windowInstance) {
- // for (IMessage message : messageList) {
- // List<Map<String, Object>> result =
joinOperator.dealJoin(message);
- // List<Map<String,Object>> rows =
matchRows(message.getMessageBody(), result);
- // String rightAsName =
message.getMessageBody().getString("rightAsName");
- // String joinType =
message.getMessageBody().getString("joinType");
- // List<JSONObject> connectMsgs =
joinOperator.connectJoin(message, rows, joinType, rightAsName);
- // for (int i=0; i < connectMsgs.size(); i++) {
- // if (i == connectMsgs.size() -1) {
- // sendMessage(connectMsgs.get(i), true);
- // } else {
- // sendMessage(connectMsgs.get(i), false);
- // }
- //
- // }
- //
- // }
- // //todo 完成处理
- // //todo 发送消息到下一个节点 sendFireMessage();
- // }
/**
* Cleanup work after a window has fired.
- * @param windowInstances
- */
- /**
- * Delete the data that has already been fired.
- *
- * @param instance
+ *
+ * <p>Every instance in {@code windowInstanceMap} for which
+ * {@code DateUtil.dateDiff(clearTime, startTime) >= 0} holds is removed from the map and its
+ * stored state is deleted. {@code clearTime} is the fired instance's start time shifted by
+ * {@code -sizeInterval * (retainWindowCount - 1) * 60} seconds.
+ *
+ * @param windowInstance the instance that just fired; {@code null} is ignored
*/
@Override
- public void clearFireWindowInstance(WindowInstance instance) {
- if(instance==null){
- return;
+ public void clearFireWindowInstance(WindowInstance windowInstance) {
+ if (windowInstance == null) {
+ // the previous implementation tolerated null, so callers may rely on it
+ return;
+ }
+ List<WindowInstance> removeInstances = new ArrayList<>();
+ // the "* 60" presumably converts a sizeInterval expressed in minutes into seconds -- TODO confirm
+ Date clearTime = DateUtil.addSecond(DateUtil.parse(windowInstance.getStartTime()),
+ -sizeInterval * (retainWindowCount - 1) * 60);
+ Iterator<String> keys = this.windowInstanceMap.keySet().iterator();
+ while (keys.hasNext()) {
+ WindowInstance instance = this.windowInstanceMap.get(keys.next());
+ if (instance == null) {
+ // nothing to inspect for this key; leave the entry alone
+ continue;
+ }
+ Date startTime = DateUtil.parse(instance.getStartTime());
+ if (DateUtil.dateDiff(clearTime, startTime) >= 0) {
+ removeInstances.add(instance);
+ // removing through the key-set iterator also drops the map entry
+ keys.remove();
+ }
}
- WindowInstance.clearInstance(instance);
- joinOperator.cleanMessage(instance.getWindowNameSpace(),
instance.getWindowName(), this.getRetainWindowCount(),
- this.getSizeInterval(), instance.getStartTime());
- //todo windowinstace
- //todo left+right
+
+ for (WindowInstance instance : removeInstances) {
+ windowMaxValueManager.deleteSplitNum(instance, instance.getSplitId());
+ ShufflePartitionManager.getInstance().clearWindowInstance(instance.createWindowInstanceId());
+ storage.delete(instance.createWindowInstanceId(), null, WindowBaseValue.class, sqlCache);
+ if (!isLocalStorageOnly) {
+ // these two calls only run when isLocalStorageOnly is false
+ WindowInstance.clearInstance(instance, sqlCache);
+ // note: the start time passed here is the fired instance's, not the removed instance's
+ joinOperator.cleanMessage(instance.getWindowNameSpace(), instance.getWindowName(),
+ this.getRetainWindowCount(), this.getSizeInterval(), windowInstance.getStartTime());
+ }
+ }
}
protected List<Map<String, Object>> matchRows(JSONObject msg,
List<Map<String, Object>> rows) {
diff --git
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index cb6cb4e..ba17a44 100644
---
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -67,7 +67,7 @@ public class ShuffleChannel extends AbstractSystemChannel {
protected static final Log LOG = LogFactory.getLog(ShuffleChannel.class);
protected static final String SHUFFLE_QUEUE_ID = "SHUFFLE_QUEUE_ID";
-
+ protected static final String SHUFFLE_OFFSET = "SHUFFLE_OFFSET";
protected static final String SHUFFLE_MESSAGES = "SHUFFLE_MESSAGES";
protected String MSG_OWNER = "MSG_OWNER";// the window that a message belongs to
@@ -158,6 +158,7 @@ public class ShuffleChannel extends AbstractSystemChannel {
for (Object obj : messages) {
IMessage message = new Message((JSONObject) obj);
message.getHeader().setQueueId(queueId);
+ message.getMessageBody().put(SHUFFLE_OFFSET,
oriMessage.getHeader().getOffset());
window.updateMaxEventTime(message);
if (isRepeateMessage(message, queueId)) {
continue;
diff --git
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
index b4ae904..e04bd16 100644
---
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
+++
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
@@ -264,7 +264,7 @@ public class WindowValue extends WindowBaseValue implements
Serializable {
calProjectColumn(window, message);
String traceId =
message.getMessageBody().getString(WindowCache.ORIGIN_MESSAGE_TRACE_ID);
if (!StringUtil.isEmpty(traceId)) {
- TraceUtil.debug(traceId, "window value result",
getComputedColumnResult());
+ TraceUtil.debug(traceId, "window value result",
decodeSQLContent(getComputedColumnResult()));
}
} catch (Exception e) {
LOG.error("failed in calculating the message", e);