GaoService opened a new issue, #5210:
URL: https://github.com/apache/eventmesh/issues/5210

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/eventmesh/issues?q=is%3Aissue) and found no similar issues.
   
   
   ### Environment
   
   Linux
   
   ### EventMesh version
   
   master
   
   ### What happened
   
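   v1.11.0, with `eventMesh.storage.plugin.type` set to kafka. When the consumer client is closed via `eventMeshClient.close()` (where `eventMeshClient` is an `EventMeshTCPClient<EventMeshMessage>`), a Kafka exception is reported.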
   
   ### How to reproduce
   
       @PostConstruct
       public void startListener() {
           try {
               UserAgent userAgent = MessageUtils.generateSubClient(
                       EventMeshUtils.buildUserAgent(eventMeshConfig,
                               Integer.parseInt(RandomUtil.randomNumbers(5)),
                               eventMeshConfig.getClient().getConsumerGroup()));
               EventMeshTCPClientConfig config = EventMeshTCPClientConfig.builder()
                       .host(eventMeshConfig.getServer().getHost())
                       .port(eventMeshConfig.getServer().getTcpPort())
                       .userAgent(userAgent)
                       .build();
               eventMeshClient = EventMeshTCPClientFactory.createEventMeshTCPClient(config, EventMeshMessage.class);
               eventMeshClient.init();
               // Subscribe to each topic individually
               List<String> topicList = Arrays.asList(eventMeshConfig.getClient().getTopics().split(","));
               for (String topic : topicList) {
                   String trimmedTopic = topic.trim();
                   if (!trimmedTopic.isEmpty()) {
                       eventMeshClient.subscribe(trimmedTopic, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
                       log.info("Subscribed to topic: {}", trimmedTopic);
                   }
               }
               eventMeshClient.registerSubBusiHandler(this::processMessage);
               eventMeshClient.listen();
           } catch (Exception e) {
               log.error("Failed to start EventMesh TCP listener", e);
           }
       }
   
       @PreDestroy
       public void stopListener() {
           try {
               if (eventMeshClient != null) {
                   try {
                       // Then close the client
                       eventMeshClient.close();
                       log.info("EventMesh TCP Listener stopped successfully");
                   } catch (Exception e) {
                       log.warn("Error during close: {}", e.getMessage());
                   }
               }
           } catch (Exception e) {
               log.error("Error stopping EventMesh TCP listener", e);
           }
       }
   
   An exception is reported when the client is closed.
   
   ### Debug logs
   
   ```Java
   2025-10-11 17:15:52,628 ERROR [eventMesh-tcp-worker-2] ConsumerImpl(ConsumerImpl.java:126) - Error while unsubscribing the Kafka consumer:
   java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: eventMesh-tcp-worker-2, id: 50) otherThread(id: 55)
           at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.acquire(ClassicKafkaConsumer.java:1232) ~[kafka-clients-3.9.0.jar:?]
           at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.acquireAndEnsureOpen(ClassicKafkaConsumer.java:1213) ~[kafka-clients-3.9.0.jar:?]
           at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.unsubscribe(ClassicKafkaConsumer.java:544) ~[kafka-clients-3.9.0.jar:?]
           at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:764) ~[kafka-clients-3.9.0.jar:?]
           at org.apache.eventmesh.storage.kafka.consumer.ConsumerImpl.unsubscribe(ConsumerImpl.java:121) [eventmesh-storage-kafka-1.11.0-release.jar:1.11.0-release]
           at org.apache.eventmesh.storage.kafka.consumer.KafkaConsumerImpl.unsubscribe(KafkaConsumerImpl.java:81) [eventmesh-storage-kafka-1.11.0-release.jar:1.11.0-release]
           at org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper.unsubscribe(MQConsumerWrapper.java:50) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
           at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper.unsubscribe(ClientGroupWrapper.java:623) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
           at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.cleanSubscriptionInSession(ClientSessionGroupMapping.java:304) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
           at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.cleanClientGroupWrapperByCloseSub(ClientSessionGroupMapping.java:281) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
           at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.closeSession(ClientSessionGroupMapping.java:175) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
           at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.closeSession(ClientSessionGroupMapping.java:144) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
           at org.apache.eventmesh.runtime.boot.AbstractTCPServer$TcpConnectionHandler.channelInactive(AbstractTCPServer.java:436) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
           at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
           at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:412) [netty-codec-4.1.112.Final.jar:4.1.112.Final]
           at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:377) [netty-codec-4.1.112.Final.jar:4.1.112.Final]
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
           at io.netty.channel.AbstractChannelHandlerContext.access$300(AbstractChannelHandlerContext.java:61) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
           at io.netty.channel.AbstractChannelHandlerContext$4.run(AbstractChannelHandlerContext.java:286) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
           at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) [netty-common-4.1.112.Final.jar:4.1.112.Final]
           at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) [netty-common-4.1.112.Final.jar:4.1.112.Final]
           at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) [netty-common-4.1.112.Final.jar:4.1.112.Final]
           at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
           at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) [netty-common-4.1.112.Final.jar:4.1.112.Final]
           at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.112.Final.jar:4.1.112.Final]
           at java.base/java.lang.Thread.run(Thread.java:834) [?:?]
   ```
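
The trace above shows `unsubscribe` being called from the Netty worker `eventMesh-tcp-worker-2` (id 50) while another thread (id 55) is using the same consumer. One common way to avoid this class of error is to route every consumer call through a single owner thread. The sketch below illustrates that idea in plain Java under stated assumptions: `GuardedConsumer` and `ConfinedConsumerDemo` are hypothetical stand-ins, not EventMesh or Kafka classes; the guard only approximates the ownership check behind the exception; and the guess that thread 55 is a polling thread is not confirmed by the log. It is not a proposed patch.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in: rejects any call that overlaps another call,
// roughly like the guard that raised the exception in the log.
class GuardedConsumer {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);

    private void acquire() {
        long me = Thread.currentThread().getId();
        if (!owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                    "consumer is not safe for multi-threaded access");
        }
    }

    private void release() {
        owner.set(NO_OWNER);
    }

    // Simulates a blocking poll: holds the guard until mayReturn is released.
    void poll(CountDownLatch entered, CountDownLatch mayReturn) throws InterruptedException {
        acquire();
        try {
            entered.countDown();
            mayReturn.await();
        } finally {
            release();
        }
    }

    void unsubscribe() {
        acquire();
        try {
            // a real consumer would drop its subscriptions here
        } finally {
            release();
        }
    }
}

class ConfinedConsumerDemo {
    // Unconfined: a second thread calls unsubscribe while poll holds the guard.
    static boolean unconfinedUnsubscribeFails() {
        GuardedConsumer consumer = new GuardedConsumer();
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch mayReturn = new CountDownLatch(1);
        Thread poller = new Thread(() -> {
            try {
                consumer.poll(entered, mayReturn);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();
        try {
            entered.await();
            consumer.unsubscribe();
            return false;
        } catch (ConcurrentModificationException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            mayReturn.countDown(); // let the poller thread finish
        }
    }

    // Confined: both calls are queued on one owner thread, so they never overlap.
    static boolean confinedUnsubscribeSucceeds() {
        GuardedConsumer consumer = new GuardedConsumer();
        ExecutorService ownerThread = Executors.newSingleThreadExecutor();
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch mayReturn = new CountDownLatch(1);
        try {
            Future<?> poll = ownerThread.submit(() -> {
                consumer.poll(entered, mayReturn);
                return null;
            });
            entered.await();
            // Queued behind the running poll; runs only after poll releases the guard.
            Future<?> unsub = ownerThread.submit(() -> consumer.unsubscribe());
            mayReturn.countDown();
            poll.get();
            unsub.get();
            return true;
        } catch (ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            mayReturn.countDown();
            ownerThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("unconfined unsubscribe fails: " + unconfinedUnsubscribeFails());
        System.out.println("confined unsubscribe succeeds: " + confinedUnsubscribeSucceeds());
    }
}
```

In this sketch both helper methods return `true`: the direct cross-thread call is rejected, while the call submitted to the owner thread completes once the in-flight poll has returned.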
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) *


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

