jinrongluo opened a new issue #768: URL: https://github.com/apache/incubator-eventmesh/issues/768
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/eventmesh/issues?q=is%3Aissue) and found no similar issues.

### Environment

Windows

### EventMesh version

master

### What happened

With HTTP subscriptions, a change to an existing subscription does not trigger the `ConsumerGroupManager` to refresh, so the update does not take effect. The `ConsumerGroupManager` keeps using the old subscription even after it has been updated.

### How to reproduce

Run the HTTP subscriber sample in the `eventmesh-examples -> http.demo.sub` package with the following changes:

```
public void afterPropertiesSet() throws Exception {
    ....
    eventMeshHttpConsumer.subscribe(topicList, url);

    // after 10s, update the subscription with new topic
    Thread.sleep(10000);
    List<SubscriptionItem> topicList = Lists.newArrayList(
        new SubscriptionItem("TEST2-TOPIC-HTTP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.SYNC));
    eventMeshHttpConsumer.subscribe(topicList, url);
    ....
}
```

After running this test, the logs show that no consumer is started for topic `TEST2-TOPIC-HTTP-ASYNC`.

EventMesh Runtime Log

```
2022-02-14 15:00:19,434 DEBUG [eventMesh-http-worker-1] http(AbstractHTTPServer.java:381) - httpCommand={REQ,POST/HTTP,requestCode=206,opaque=1,header=subscribeRequestHeader={code=206,language=JAVA,version=V1,env=P,idc=FT,sys=1234,pid=18328,ip=127.0.0.1:51386,username=,passwd=},body=subscribeBody{consumerGroup='EventMeshTest-consumerGroup', url='http://192.168.2.31:8088/sub/test', topics=[SubscriptionItem{topic=TEST-TOPIC-HTTP-ASYNC, mode=CLUSTERING, type=ASYNC}]}}
2022-02-14 15:00:19,589 INFO [pool-11-thread-1] ConsumerManager(ConsumerManager.java:78) - clientInfo check start.....
2022-02-14 15:00:19,608 INFO [eventMesh-clientManage-] http(SubscribeProcessor.java:78) - cmd=SUBSCRIBE|http|client2eventMesh|from=127.0.0.1:51386|to=192.168.2.31
2022-02-14 15:00:19,767 INFO [eventMesh-clientManage-] ConsumerManager(ConsumerManager.java:322) - onChange event:consumerGroupStateEvent={consumerGroup=EventMeshTest-consumerGroup,action=NEW}
2022-02-14 15:00:19,771 INFO [eventMesh-clientManage-] MetaInfExtensionClassLoader(MetaInfExtensionClassLoader.java:79) - load extension class success, extensionType: interface org.apache.eventmesh.api.consumer.Consumer, extensionClass: class org.apache.eventmesh.connector.standalone.consumer.StandaloneConsumerAdaptor
2022-02-14 15:00:19,771 INFO [eventMesh-clientManage-] EventMeshExtensionFactory(EventMeshExtensionFactory.java:101) - initialize extension instance success, extensionType: interface org.apache.eventmesh.api.consumer.Consumer, extensionName: standalone
2022-02-14 15:00:19,771 INFO [eventMesh-clientManage-] EventMeshExtensionFactory(EventMeshExtensionFactory.java:101) - initialize extension instance success, extensionType: interface org.apache.eventmesh.api.consumer.Consumer, extensionName: standalone
2022-02-14 15:00:19,778 INFO [eventMesh-clientManage-] EventMeshConsumer(EventMeshConsumer.java:105) - EventMeshConsumer [EventMeshTest-consumerGroup] inited.............
2022-02-14 15:00:19,783 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:19,786 DEBUG [eventMesh-clientManage-] http(AbstractHTTPServer.java:429) - httpCommand={RES,POST/HTTP,requestCode=206,opaque=1,cost=534,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@1679171f,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@47909e95}
2022-02-14 15:00:19,786 DEBUG [EventMesh-http-asyncContext-] http(SubscribeProcessor.java:257) - httpCommand={RES,POST/HTTP,requestCode=206,opaque=1,cost=534,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@1679171f,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@47909e95}
2022-02-14 15:00:20,585 INFO [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:108) - ===========================================SERVER METRICS==================================================
2022-02-14 15:00:20,585 INFO [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:110) - {"maxHTTPTPS":"1.0","avgHTTPTPS":"0.0","maxHTTPCOST":"562","avgHTTPCOST":"549.5","avgHTTPBodyDecodeCost":"6.0", "httpDiscard":"0"}
2022-02-14 15:00:20,585 INFO [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:119) - {"maxBatchSendMsgTPS":"0.0","avgBatchSendMsgTPS":"0.0", "sum":"0", "sumFail":"0", "sumFailRate":"0.00", "discard":"0"}
2022-02-14 15:00:20,586 INFO [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:129) - {"maxSendMsgTPS":"0.0","avgSendMsgTPS":"0.0", "sum":"0", "sumFail":"0", "sumFailRate":"0.00", "replyMsg":"0", "replyFail":"0"}
2022-02-14 15:00:20,586 INFO [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:140) - {"maxPushMsgTPS":"0.0","avgPushMsgTPS":"0.0", "sum":"0", "sumFail":"0", "sumFailRate":"0.0", "maxClientLatency":"0.0", "avgClientLatency":"0.0"}
2022-02-14 15:00:20,586 INFO [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:151) - {"batchMsgQ":"0","sendMsgQ":"0","pushMsgQ":"0","httpRetryQ":"0"}
2022-02-14 15:00:20,586 INFO [eventMesh-metrics-2] httpMonitor(HTTPMetricsServer.java:157) - {"batchAvgSend2MQCost":"0.0", "avgSend2MQCost":"0.0", "avgReply2MQCost":"0.0"}
2022-02-14 15:00:20,789 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:21,791 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:22,792 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:23,804 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:24,590 INFO [BatchSpanProcessor_WorkerThread-1] LogExporter(LogExporter.java:61) - 'HTTP POST' : 13edb08ae53aa8fd95ca799e9c93e690 63b98f38a3ad1255 SERVER [tracer: class org.apache.eventmesh.runtime.boot.EventMeshHTTPServer:] AttributesMap{data={http.status_code=206, http.flavor=HTTP, http.method=POST}, capacity=128, totalAddedValues=3}
2022-02-14 15:00:29,589 INFO [pool-11-thread-1] ConsumerManager(ConsumerManager.java:78) - clientInfo check start.....
2022-02-14 15:00:29,883 DEBUG [eventMesh-http-worker-1] http(AbstractHTTPServer.java:381) - httpCommand={REQ,POST/HTTP,requestCode=206,opaque=3,header=subscribeRequestHeader={code=206,language=JAVA,version=V1,env=P,idc=FT,sys=1234,pid=18328,ip=127.0.0.1:51386,username=,passwd=},body=subscribeBody{consumerGroup='EventMeshTest-consumerGroup', url='http://192.168.2.31:8088/sub/test', topics=[SubscriptionItem{topic=TEST2-TOPIC-HTTP-ASYNC, mode=CLUSTERING, type=SYNC}]}}
2022-02-14 15:00:29,884 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:30,094 INFO [eventMesh-clientManage-] http(SubscribeProcessor.java:78) - cmd=SUBSCRIBE|http|client2eventMesh|from=127.0.0.1:51386|to=192.168.2.31
2022-02-14 15:00:30,302 DEBUG [eventMesh-clientManage-] http(AbstractHTTPServer.java:429) - httpCommand={RES,POST/HTTP,requestCode=206,opaque=3,cost=420,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@4206e247,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@46396edf}
2022-02-14 15:00:30,302 DEBUG [EventMesh-http-asyncContext-] http(SubscribeProcessor.java:257) - httpCommand={RES,POST/HTTP,requestCode=206,opaque=3,cost=420,header=org.apache.eventmesh.common.protocol.http.header.BaseResponseHeader@4206e247,body=org.apache.eventmesh.common.protocol.http.body.BaseResponseBody@46396edf}
2022-02-14 15:00:30,900 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:31,912 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:32,925 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
2022-02-14 15:00:33,937 DEBUG [StandaloneConsumerThread] SubScribeTask(SubScribeTask.java:56) - execute subscribe task, topic: TEST-TOPIC-HTTP-ASYNC, offset: null
```

### Debug logs

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!
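
In the log above, the first SUBSCRIBE request is followed by an `onChange` event with `action=NEW`, while the second request (opaque=3) is followed by no `onChange` event at all. The kind of check that would detect the changed subscription can be sketched as below. This is only an illustration under assumptions: `SubscriptionChangeSketch`, `Item`, and `subscriptionChanged` are hypothetical names invented for this example and are not EventMesh classes or the actual fix.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;

public class SubscriptionChangeSketch {

    /** Hypothetical stand-in for a subscription entry (not EventMesh's SubscriptionItem). */
    static final class Item {
        final String topic;
        final String mode;
        final String type;

        Item(String topic, String mode, String type) {
            this.topic = topic;
            this.mode = mode;
            this.type = type;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Item)) {
                return false;
            }
            Item other = (Item) o;
            return topic.equals(other.topic) && mode.equals(other.mode) && type.equals(other.type);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, mode, type);
        }
    }

    /**
     * Returns true when the requested subscription differs from the current one.
     * Order is ignored; a changed topic, mode, or type counts as a difference.
     */
    static boolean subscriptionChanged(List<Item> current, List<Item> requested) {
        return !new HashSet<>(current).equals(new HashSet<>(requested));
    }

    public static void main(String[] args) {
        // The two subscriptions sent by the reproduction steps in this issue.
        List<Item> first = Collections.singletonList(
            new Item("TEST-TOPIC-HTTP-ASYNC", "CLUSTERING", "ASYNC"));
        List<Item> second = Collections.singletonList(
            new Item("TEST2-TOPIC-HTTP-ASYNC", "CLUSTERING", "SYNC"));

        System.out.println(subscriptionChanged(first, first));   // prints "false"
        System.out.println(subscriptionChanged(first, second));  // prints "true"
    }
}
```

When such a check returns true, the runtime would presumably need to notify the consumer group so that it restarts with the new topics; how that notification should be wired into `ConsumerGroupManager` is left open here.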
