longbowen commented on issue #287:
URL: https://github.com/apache/rocketmq-streams/issues/287#issuecomment-1537877668

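   Sorry, I have been busy lately and have not had time to look into this issue.
   
   Service B is in a different consumer group from service A, so service B does consume message1-10 when it starts, because message1-10 have never been consumed by service B's group.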
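
The per-group behavior described here can be illustrated with a toy model. This is only a sketch: `GroupOffsetDemo`, `send`, and `poll` are hypothetical names and not part of the RocketMQ client API, and it assumes that a group with no recorded offset starts reading from the earliest retained message, as this thread describes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of one topic with a separate consume offset per group (not RocketMQ code). */
final class GroupOffsetDemo {
    private final List<String> log = new ArrayList<>();            // messages retained in the topic
    private final Map<String, Integer> offsets = new HashMap<>();  // next offset to read, per group

    void send(String message) {
        log.add(message);
    }

    /** Returns everything the group has not consumed yet and advances that group's offset. */
    List<String> poll(String group) {
        // Assumption: a group that has never consumed starts at offset 0.
        int from = offsets.getOrDefault(group, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(group, log.size());
        return batch;
    }

    public static void main(String[] args) {
        GroupOffsetDemo topic = new GroupOffsetDemo();
        for (int i = 1; i <= 10; i++) {
            topic.send("message" + i);
        }
        System.out.println("groupA first poll:  " + topic.poll("groupA").size());
        System.out.println("groupA second poll: " + topic.poll("groupA").size());
        System.out.println("groupB first poll:  " + topic.poll("groupB").size());
    }
}
```

In this model group A draining the topic has no effect on group B, which still receives all ten messages on its first poll.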
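   I looked at your example again; it should be adjusted as follows:
   1. Start RocketMQ 5.0.
   2. Write data to the source topic; call this data message A.
   3. Run the wordcount example.
   At this point wordcount outputs data related to message A; what it consumes is the data written to MQ in step 2 (message A).
   
   4. Write data to the source topic; call this data message B (this step is not needed for the actual test).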
   
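   My understanding is that because message A was never consumed by any service, it stays in MQ,
   so when rocketmq-streams starts, message A gets processed by rocketmq-streams. I am not sure whether this is the expected behavior.
   In my application, I want message A not to be processed by rocketmq-streams; rocketmq-streams should only process messages sent to RocketMQ after it has started.
   My current workaround is to add a sendTime field when sending MQ messages and to use filter in rocketmq-streams to drop the older data.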
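
A minimal sketch of the check such a filter could perform, assuming sendTime is an epoch-millisecond value set by the producer. `StartupTimeFilter` is a hypothetical helper, not part of rocketmq-streams; in a real job the same comparison would go inside the filter step, and the startup instant used below is made up for illustration.

```java
import java.util.function.Predicate;

/** Keeps only records whose sendTime is at or after the recorded startup instant. */
final class StartupTimeFilter implements Predicate<Long> {
    private final long startupMillis;

    StartupTimeFilter(long startupMillis) {
        this.startupMillis = startupMillis;
    }

    /** Returns true when the record should be processed; a missing sendTime is dropped. */
    @Override
    public boolean test(Long sendTimeMillis) {
        return sendTimeMillis != null && sendTimeMillis >= startupMillis;
    }

    public static void main(String[] args) {
        long startup = 1_683_529_200_000L; // hypothetical startup instant (epoch millis)
        StartupTimeFilter filter = new StartupTimeFilter(startup);
        long[] sendTimes = {1_683_529_142_772L, 1_683_529_260_000L};
        for (long t : sendTimes) {
            System.out.println(t + " -> " + (filter.test(t) ? "process" : "skip"));
        }
    }
}
```

This comparison relies on the producer and the stream application having reasonably synchronized clocks; a message sent just before startup by a host whose clock runs ahead would still pass.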
   
   Below is the output after wordcount starts in step 3:
   2023-05-08 15:11:48.653 [ROCKETMQ_STREAMS_testCount_0] INFO  
org.apache.rocketmq.streams.core.util.RocketMQUtil - 
topic[testCount-ROCKETMQ-COUNT-00004-shuffleTopic] already exist.
   2023-05-08 15:11:49.136 [ROCKETMQ_STREAMS_testCount_0] INFO  
o.a.rocketmq.streams.core.running.WorkerThread - worker 
thread=[ROCKETMQ_STREAMS_testCount_0], start task success, jobId:testCount
   2023-05-08 15:11:50.710 [RebalanceService] INFO  
org.apache.rocketmq.streams.core.util.RocketMQUtil - 
topic[testCount-ROCKETMQ-COUNT-00004-shuffleTopic-stateTopic] already exist.
   2023-05-08 15:11:51.834 [RebalanceService] INFO  
o.a.r.s.core.running.MessageQueueListenerWrapper - recover messageQueue finish, 
addQueue: [[MessageQueue [topic=testCount-ROCKETMQ-COUNT-00004-shuffleTopic, 
brokerName=__syslo__global__, queueId=0], MessageQueue 
[topic=testCount-ROCKETMQ-COUNT-00004-shuffleTopic, 
brokerName=__syslo__global__, queueId=3], MessageQueue 
[topic=testCount-ROCKETMQ-COUNT-00004-shuffleTopic, 
brokerName=__syslo__global__, queueId=4], MessageQueue 
[topic=testCount-ROCKETMQ-COUNT-00004-shuffleTopic, 
brokerName=__syslo__global__, queueId=1], MessageQueue 
[topic=testCount-ROCKETMQ-COUNT-00004-shuffleTopic, 
brokerName=__syslo__global__, queueId=2], MessageQueue 
[topic=testCount-ROCKETMQ-COUNT-00004-shuffleTopic, 
brokerName=__syslo__global__, queueId=7], MessageQueue 
[topic=testCount-ROCKETMQ-COUNT-00004-shuffleTopic, 
brokerName=__syslo__global__, queueId=5], MessageQueue 
[topic=testCount-ROCKETMQ-COUNT-00004-shuffleTopic, 
brokerName=__syslo__global__, queueId=6]
 ]], removeQueue:[[]].
   (key=timestamp, value=1)
   (key=sendtime, value=1)
   (key=messageid, value=1)
   (key=true, value=1)
   (key=1231, value=4)
   (key=deviceid, value=4)
   (key=1683529142772, value=4)
   (key=deviceuuid, value=4)
   (key=rn010eab3c2e6817, value=4)
   (key=849024bed3bb44d, value=4)
   (key=productid, value=4)
   (key=msgdata, value=4)
   (key=switch, value=4)
   (key=, value=4)
   (key=1683529142758, value=4)
   (key=xxxxx, value=4)
   (key=1650393962322264064, value=4)
   (key=tejgjacue4f3cb, value=4)
   (key=productname, value=4)
   (key=tenantid, value=4)
   (key=devicename, value=4)
   2023-05-08 15:11:52.909 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:9876] result: 
true
   2023-05-08 15:11:52.910 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   2023-05-08 15:11:52.911 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   2023-05-08 15:11:52.911 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   2023-05-08 15:11:52.911 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   2023-05-08 15:11:52.911 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   2023-05-08 15:11:52.911 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   2023-05-08 15:11:52.911 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   2023-05-08 15:11:52.911 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   2023-05-08 15:11:52.911 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   
   
   
   Full output after running step 4:
   D:\tool\Java\jdk1.8.0_291\bin\java.exe "-javaagent:D:\tool\IntelliJ IDEA 
2022.2.3\lib\idea_rt.jar=60568:D:\tool\IntelliJ IDEA 2022.2.3\bin" 
-Dfile.encoding=UTF-8 -classpath 
C:\Users\77135\AppData\Local\Temp\classpath1419387445.jar 
com.rinoiot.provider.rule.engine.rocketmq.stream.DpStreamRule
   2023-05-08 15:11:48.653 [ROCKETMQ_STREAMS_testCount_0] INFO  
org.apache.rocketmq.streams.core.util.RocketMQUtil - 
topic[testCount-ROCKETMQ-COUNT-00004-shuffleTopic] already exist.
   2023-05-08 15:11:49.136 [ROCKETMQ_STREAMS_testCount_0] INFO  
o.a.rocketmq.streams.core.running.WorkerThread - worker 
thread=[ROCKETMQ_STREAMS_testCount_0], start task success, jobId:testCount
   2023-05-08 15:11:50.710 [RebalanceService] INFO  
org.apache.rocketmq.streams.core.util.RocketMQUtil - 
topic[testCount-ROCKETMQ-COUNT-00004-shuffleTopic-stateTopic] already exist.
   2023-05-08 15:11:51.834 [RebalanceService] INFO  
o.a.r.s.core.running.MessageQueueListenerWrapper - recover messageQueue finish, 
addQueue: [[MessageQueue [topic=testCount-ROCKETMQ-COUNT-00004-shuffleTopic, 
brokerName=__syslo__global__, queueId=0], MessageQueue 
[topic=testCount-ROCKETMQ-COUNT-00004-shuffleTopic, 
brokerName=__syslo__global__, queueId=3], MessageQueue 
[topic=testCount-ROCKETMQ-COUNT-00004-shuffleTopic, 
brokerName=__syslo__global__, queueId=4], MessageQueue 
[topic=testCount-ROCKETMQ-COUNT-00004-shuffleTopic, 
brokerName=__syslo__global__, queueId=1], MessageQueue 
[topic=testCount-ROCKETMQ-COUNT-00004-shuffleTopic, 
brokerName=__syslo__global__, queueId=2], MessageQueue 
[topic=testCount-ROCKETMQ-COUNT-00004-shuffleTopic, 
brokerName=__syslo__global__, queueId=7], MessageQueue 
[topic=testCount-ROCKETMQ-COUNT-00004-shuffleTopic, 
brokerName=__syslo__global__, queueId=5], MessageQueue 
[topic=testCount-ROCKETMQ-COUNT-00004-shuffleTopic, 
brokerName=__syslo__global__, queueId=6]
 ]], removeQueue:[[]].
   (key=timestamp, value=1)
   (key=sendtime, value=1)
   (key=messageid, value=1)
   (key=true, value=1)
   (key=1231, value=4)
   (key=deviceid, value=4)
   (key=1683529142772, value=4)
   (key=deviceuuid, value=4)
   (key=rn010eab3c2e6817, value=4)
   (key=849024bed3bb44d, value=4)
   (key=productid, value=4)
   (key=msgdata, value=4)
   (key=switch, value=4)
   (key=, value=4)
   (key=1683529142758, value=4)
   (key=xxxxx, value=4)
   (key=1650393962322264064, value=4)
   (key=tejgjacue4f3cb, value=4)
   (key=productname, value=4)
   (key=tenantid, value=4)
   (key=devicename, value=4)
   2023-05-08 15:11:52.909 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:9876] result: 
true
   2023-05-08 15:11:52.910 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   2023-05-08 15:11:52.911 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   2023-05-08 15:11:52.911 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   2023-05-08 15:11:52.911 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   2023-05-08 15:11:52.911 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   2023-05-08 15:11:52.911 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   2023-05-08 15:11:52.911 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   2023-05-08 15:11:52.911 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   2023-05-08 15:11:52.911 [NettyClientSelector_1] INFO  RocketmqRemoting - 
closeChannel: close the connection to remote address[43.142.45.99:10911] 
result: true
   (key=, value=5)
   (key=timestamp, value=2)
   (key=1683529142758, value=5)
   (key=sendtime, value=2)
   (key=1683529142772, value=5)
   (key=tenantid, value=5)
   (key=1231, value=5)
   (key=deviceid, value=5)
   (key=1650393962322264064, value=5)
   (key=deviceuuid, value=5)
   (key=rn010eab3c2e6817, value=5)
   (key=devicename, value=5)
   (key=xxxxx, value=5)
   (key=productid, value=5)
   (key=tejgjacue4f3cb, value=5)
   (key=messageid, value=2)
   (key=msgdata, value=5)
   (key=switch, value=5)
   (key=productname, value=5)
   (key=true, value=2)
   (key=849024bed3bb44d, value=5)
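
In the output above every key's value rises by one after message B arrives (4 to 5, 1 to 2), which is what a running count that keeps its totals across messages would emit. A minimal sketch of that pattern, assuming whitespace tokenization and lowercasing; the tokenization used by the actual wordcount job is not shown in this thread, and `RunningWordCount` is a hypothetical class, not rocketmq-streams code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Running word count: totals persist across messages, and each update is emitted. */
final class RunningWordCount {
    private final Map<String, Integer> counts = new HashMap<>();

    /** Splits on whitespace, increments each word's total, and returns the emitted lines. */
    List<String> accept(String message) {
        List<String> emitted = new ArrayList<>();
        for (String word : message.trim().split("\\s+")) {
            if (word.isEmpty()) {
                continue; // an empty message yields a single empty token
            }
            String key = word.toLowerCase();
            int total = counts.merge(key, 1, Integer::sum);
            emitted.add("(key=" + key + ", value=" + total + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        RunningWordCount wc = new RunningWordCount();
        wc.accept("deviceid switch true").forEach(System.out::println);
        wc.accept("deviceid switch").forEach(System.out::println);
    }
}
```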
   
   


