This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/develop by this push:
new 0c51893e fix(bug) npe when process data
new 4bd58c82 Merge pull request #302 from ni-ze/develop
0c51893e is described below
commit 0c51893e1d79276876e9021999e2c7af73e5e245
Author: karp <[email protected]>
AuthorDate: Sat Jul 1 19:02:47 2023 +0800
fix(bug) npe when process data
---
.../core/running/MessageQueueListenerWrapper.java | 17 ++++++++---------
1 file changed, 8 insertions(+), 9 deletions(-)
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/running/MessageQueueListenerWrapper.java b/core/src/main/java/org/apache/rocketmq/streams/core/running/MessageQueueListenerWrapper.java
index a9db7115..fbac1fef 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/MessageQueueListenerWrapper.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/running/MessageQueueListenerWrapper.java
@@ -16,7 +16,6 @@ package org.apache.rocketmq.streams.core.running;
* limitations under the License.
*/
-import com.google.common.collect.Sets;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
@@ -51,16 +50,15 @@ class MessageQueueListenerWrapper implements MessageQueueListener {
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,
Set<MessageQueue> mqDivided) {
Set<MessageQueue> ownedQueues = ownedMapping.computeIfAbsent(topic, s
-> new HashSet<>());
- Set<MessageQueue> unchangedQueue = Sets.intersection(mqDivided,
ownedQueues);
- Set<MessageQueue> addQueue = Sets.difference(mqDivided,
unchangedQueue);
- Set<MessageQueue> removeQueue = Sets.difference(ownedQueues,
unchangedQueue);
+ HashSet<MessageQueue> addQueue = new HashSet<>(mqDivided);
+ addQueue.removeAll(ownedQueues);
+
+ HashSet<MessageQueue> removeQueue = new HashSet<>(ownedQueues);
+ removeQueue.removeAll(mqDivided);
ownedQueues.addAll(new HashSet<>(addQueue));
ownedQueues.removeAll(new HashSet<>(removeQueue));
- // First step, remove the removeQueue from listener to avoid inflight
data in between setting up the state.
- originListener.messageQueueChanged(topic, mqAll, unchangedQueue);
-
//从shuffle topic中读出的数据才能进行有状态计算。
if (topic.endsWith(Constant.SHUFFLE_TOPIC_SUFFIX)) {
Throwable throwable = this.recoverHandler.apply(addQueue,
removeQueue);
@@ -71,9 +69,10 @@ class MessageQueueListenerWrapper implements MessageQueueListener {
}
buildTask(addQueue);
- removeTask(removeQueue);
- // Last step, add the addQueue to the listener after the state setup.
+
//设计的不太好,移除q,添加消费任务之前,应该加一个状态移除函数;目前这样写的问题是:状态提前移除/加载了,consumer其实仍然在从某个将要移除的q中拉取数据,但是状态却被移除了。
+
//也不能把originListener.messageQueueChanged放在loadState/removeState之前,那样会已经在拉取数据了,但是状态没有加载好。
originListener.messageQueueChanged(topic, mqAll, mqDivided);
+ removeTask(removeQueue);
}