This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0c51893e fix(bug) npe when process data
     new 4bd58c82 Merge pull request #302 from ni-ze/develop
0c51893e is described below

commit 0c51893e1d79276876e9021999e2c7af73e5e245
Author: karp <[email protected]>
AuthorDate: Sat Jul 1 19:02:47 2023 +0800

    fix(bug) npe when process data
---
 .../core/running/MessageQueueListenerWrapper.java       | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/running/MessageQueueListenerWrapper.java b/core/src/main/java/org/apache/rocketmq/streams/core/running/MessageQueueListenerWrapper.java
index a9db7115..fbac1fef 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/MessageQueueListenerWrapper.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/running/MessageQueueListenerWrapper.java
@@ -16,7 +16,6 @@ package org.apache.rocketmq.streams.core.running;
  * limitations under the License.
  */
 
-import com.google.common.collect.Sets;
 import org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.streams.core.common.Constant;
@@ -51,16 +50,15 @@ class MessageQueueListenerWrapper implements MessageQueueListener {
     public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
         Set<MessageQueue> ownedQueues = ownedMapping.computeIfAbsent(topic, s -> new HashSet<>());
 
-        Set<MessageQueue> unchangedQueue = Sets.intersection(mqDivided, ownedQueues);
-        Set<MessageQueue> addQueue = Sets.difference(mqDivided, unchangedQueue);
-        Set<MessageQueue> removeQueue = Sets.difference(ownedQueues, unchangedQueue);
+        HashSet<MessageQueue> addQueue = new HashSet<>(mqDivided);
+        addQueue.removeAll(ownedQueues);
+
+        HashSet<MessageQueue> removeQueue = new HashSet<>(ownedQueues);
+        removeQueue.removeAll(mqDivided);
 
         ownedQueues.addAll(new HashSet<>(addQueue));
         ownedQueues.removeAll(new HashSet<>(removeQueue));
 
-        // First step, remove the removeQueue from listener to avoid inflight data in between setting up the state.
-        originListener.messageQueueChanged(topic, mqAll, unchangedQueue);
-
         //从shuffle topic中读出的数据才能进行有状态计算。
         if (topic.endsWith(Constant.SHUFFLE_TOPIC_SUFFIX)) {
             Throwable throwable = this.recoverHandler.apply(addQueue, removeQueue);
@@ -71,9 +69,10 @@ class MessageQueueListenerWrapper implements MessageQueueListener {
         }
 
         buildTask(addQueue);
-        removeTask(removeQueue);
-        // Last step, add the addQueue to the listener after the state setup.
+        //设计的不太好,移除q,添加消费任务之前,应该加一个状态移除函数;目前这样写的问题是:状态提前移除/加载了,consumer其实仍然在从某个将要移除的q中拉取数据,但是状态却被移除了。
+        //也不能把originListener.messageQueueChanged放在loadState/removeState之前,那样会已经在拉取数据了,但是状态没有加载好。
         originListener.messageQueueChanged(topic, mqAll, mqDivided);
+        removeTask(removeQueue);
     }
 
 

Reply via email to