This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new 8e093c0 bugfix list - adjust test code in WindowTrigger for windows
whose size unit is minute - adjust register logic in session window
operator - fix issue where window values can't be deleted after being fired
new 23bb048 Merge pull request #125 from speak2me/bugfix_20220113
8e093c0 is described below
commit 8e093c0d3a0ac44ab8d5f04b78e7b26d2c6719f1
Author: write2me <[email protected]>
AuthorDate: Thu Jan 13 21:12:29 2022 +0800
bugfix list
- adjust test code in WindowTrigger for windows whose size unit is
minute
- adjust register logic in session window operator
- fix issue where window values can't be deleted after being fired
---
.../rocketmq/streams/common/utils/SQLUtil.java | 2 +-
.../window/offset/WindowMaxValueProcessor.java | 2 +-
.../window/operator/impl/SessionOperator.java | 33 ++++++++--------------
.../streams/window/trigger/WindowTrigger.java | 10 ++++---
4 files changed, 20 insertions(+), 27 deletions(-)
diff --git
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java
index b1676b9..7bce3cb 100644
---
a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java
+++
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java
@@ -577,7 +577,7 @@ public class SQLUtil {
buffer.append(" ");
for (int index = 0; index < keywordList.size(); index++) {
Pair<String, String> pair = keywordList.get(index);
- buffer.append(pair.getKey() + " like '" + pair.getValue() + "'");
+ buffer.append(pair.getKey() + " like '" + pair.getValue() + "%'");
if (index != (keywordList.size() - 1)) {
buffer.append(" or ");
}
diff --git
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
index 4aa86ae..cf09bf4 100644
---
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
+++
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
@@ -127,7 +127,7 @@ public class WindowMaxValueProcessor {
}
String keyPrefix = MapKeyUtil.createKey(name, splitId);
- String sql = "select * from " +
ORMUtil.getTableName(WindowMaxValue.class) + " where configure_name like '%" +
name + "%' and partition like '%" + splitId + "%'";
+ String sql="select * from "+
ORMUtil.getTableName(WindowMaxValue.class)+ " where msg_key like
'"+keyPrefix+"%'";
List<WindowMaxValue> windowMaxValues = ORMUtil.queryForList(sql, null,
WindowMaxValue.class);
if (windowMaxValues == null || windowMaxValues.size() == 0) {
return result;
diff --git
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
index 7b27e12..e9aa617 100644
---
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
+++
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
@@ -386,13 +386,6 @@ public class SessionOperator extends WindowOperator {
//get iterator sorted by fire time
WindowBaseValueIterator<WindowValue> it =
storage.loadWindowInstanceSplitData(getOrderBypPrefix(), queueId,
windowInstance.createWindowInstanceId(), null, getWindowBaseValueClass());
//
- if (queueId2Offset != null) {
- String offset = queueId2Offset.get(queueId);
- if (StringUtil.isNotEmpty(offset)) {
- it.setPartitionNum(Long.valueOf(offset));
- }
- }
- //
Long currentFireTime =
DateUtil.parse(windowInstance.getFireTime(),
SESSION_DATETIME_PATTERN).getTime();
Long nextFireTime = currentFireTime + 1000 * 60 * 1;
List<WindowValue> toFireValueList = new ArrayList<>();
@@ -412,8 +405,18 @@ public class SessionOperator extends WindowOperator {
}
}
}
- doFire(queueId, windowInstance, toFireValueList, currentFireTime,
nextFireTime);
+ doFire(queueId, windowInstance, toFireValueList);
//
+ if (!nextFireTime.equals(currentFireTime)) {
+ String instanceId = windowInstance.createWindowInstanceId();
+ WindowInstance existedWindowInstance =
searchWindowInstance(instanceId);
+ if (existedWindowInstance != null) {
+ existedWindowInstance.setFireTime(DateUtil.format(new
Date(nextFireTime)));
+
windowFireSource.registFireWindowInstanceIfNotExist(windowInstance, this);
+ } else {
+ LOG.error("window instance lost, queueId: " + queueId + "
,fire time" + windowInstance.getFireTime());
+ }
+ }
return toFireValueList.size();
}
@@ -432,25 +435,13 @@ public class SessionOperator extends WindowOperator {
return false;
}
- private void doFire(String queueId, WindowInstance instance,
List<WindowValue> valueList, Long currentFireTime,
- Long nextFireTime) {
+ private void doFire(String queueId, WindowInstance instance,
List<WindowValue> valueList) {
if (CollectionUtil.isEmpty(valueList)) {
return;
}
valueList.sort(Comparator.comparingLong(WindowBaseValue::getPartitionNum));
sendFireMessage(valueList, queueId);
clearWindowValues(valueList, queueId, instance);
- //
- if (!nextFireTime.equals(currentFireTime)) {
- String instanceId = instance.createWindowInstanceId();
- WindowInstance existedWindowInstance =
searchWindowInstance(instanceId);
- if (existedWindowInstance != null) {
- existedWindowInstance.setFireTime(DateUtil.format(new
Date(nextFireTime)));
- windowFireSource.registFireWindowInstanceIfNotExist(instance,
this);
- } else {
- LOG.error("window instance lost, queueId: " + queueId + "
,fire time" + instance.getFireTime());
- }
- }
}
/**
diff --git
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java
index 3b9410d..a872f3a 100644
---
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java
+++
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java
@@ -248,10 +248,12 @@ public class WindowTrigger extends
AbstractSupportShuffleSource implements IStre
if (eventTimeLastUpdateTime == null) {
return new FireResult();
}
- int gap = (int) (System.currentTimeMillis() - eventTimeLastUpdateTime);
- if (window.getMsgMaxGapSecond() != null && gap >
window.getMsgMaxGapSecond() * 1000) {
- LOG.warn("the fire reason is exceed the gap " + gap + " window
instance id is " + windowInstanceTriggerId);
- return new FireResult(true, 1);
+ if (isTest) {
+ int gap = (int) (System.currentTimeMillis() -
eventTimeLastUpdateTime);
+ if (window.getMsgMaxGapSecond() != null && gap >
window.getMsgMaxGapSecond() * 1000) {
+ LOG.warn("the fire reason is exceed the gap " + gap + " window
instance id is " + windowInstanceTriggerId);
+ return new FireResult(true, 1);
+ }
}
return new FireResult();
}