GaoService opened a new issue, #5210: URL: https://github.com/apache/eventmesh/issues/5210
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/eventmesh/issues?q=is%3Aissue) and found no similar issues.

### Environment

Linux

### EventMesh version

master

### What happened

On v1.11.0, with `eventMesh.storage.plugin.type` set to `kafka`, a Kafka exception is reported when the consumer client is closed via `EventMeshTCPClient<EventMeshMessage> eventMeshClient.close()`.

### How to reproduce

    @PostConstruct
    public void startListener() {
        try {
            UserAgent userAgent = MessageUtils.generateSubClient(EventMeshUtils.buildUserAgent(eventMeshConfig,
                Integer.parseInt(RandomUtil.randomNumbers(5)), eventMeshConfig.getClient().getConsumerGroup()));
            EventMeshTCPClientConfig config = EventMeshTCPClientConfig.builder()
                .host(eventMeshConfig.getServer().getHost())
                .port(eventMeshConfig.getServer().getTcpPort())
                .userAgent(userAgent)
                .build();
            eventMeshClient = EventMeshTCPClientFactory.createEventMeshTCPClient(config, EventMeshMessage.class);
            eventMeshClient.init();

            // Subscribe to each topic separately
            List<String> topicList = Arrays.asList(eventMeshConfig.getClient().getTopics().split(","));
            for (String topic : topicList) {
                String trimmedTopic = topic.trim();
                if (!trimmedTopic.isEmpty()) {
                    eventMeshClient.subscribe(trimmedTopic, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
                    log.info("Subscribed to topic: {}", trimmedTopic);
                }
            }
            eventMeshClient.registerSubBusiHandler(this::processMessage);
            eventMeshClient.listen();
        } catch (Exception e) {
            log.error("Failed to start EventMesh TCP listener", e);
        }
    }

    @PreDestroy
    public void stopListener() {
        try {
            if (eventMeshClient != null) {
                try {
                    // Then close the client
                    eventMeshClient.close();
                    log.info("EventMesh TCP Listener stopped successfully");
                } catch (Exception e) {
                    log.warn("Error during close: {}", e.getMessage());
                }
            }
        } catch (Exception e) {
            log.error("Error stopping EventMesh TCP listener", e);
        }
    }

The exception is reported when the client is closed.

### Debug logs

```Java
2025-10-11 17:15:52,628 ERROR [eventMesh-tcp-worker-2] ConsumerImpl(ConsumerImpl.java:126) - Error while unsubscribing the Kafka consumer:
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: eventMesh-tcp-worker-2, id: 50) otherThread(id: 55)
    at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.acquire(ClassicKafkaConsumer.java:1232) ~[kafka-clients-3.9.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.acquireAndEnsureOpen(ClassicKafkaConsumer.java:1213) ~[kafka-clients-3.9.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.unsubscribe(ClassicKafkaConsumer.java:544) ~[kafka-clients-3.9.0.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:764) ~[kafka-clients-3.9.0.jar:?]
    at org.apache.eventmesh.storage.kafka.consumer.ConsumerImpl.unsubscribe(ConsumerImpl.java:121) [eventmesh-storage-kafka-1.11.0-release.jar:1.11.0-release]
    at org.apache.eventmesh.storage.kafka.consumer.KafkaConsumerImpl.unsubscribe(KafkaConsumerImpl.java:81) [eventmesh-storage-kafka-1.11.0-release.jar:1.11.0-release]
    at org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper.unsubscribe(MQConsumerWrapper.java:50) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
    at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper.unsubscribe(ClientGroupWrapper.java:623) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
    at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.cleanSubscriptionInSession(ClientSessionGroupMapping.java:304) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
    at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.cleanClientGroupWrapperByCloseSub(ClientSessionGroupMapping.java:281) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
    at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.closeSession(ClientSessionGroupMapping.java:175) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
    at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.closeSession(ClientSessionGroupMapping.java:144) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
    at org.apache.eventmesh.runtime.boot.AbstractTCPServer$TcpConnectionHandler.channelInactive(AbstractTCPServer.java:436) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:412) [netty-codec-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:377) [netty-codec-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.access$300(AbstractChannelHandlerContext.java:61) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext$4.run(AbstractChannelHandlerContext.java:286) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) [netty-common-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) [netty-common-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) [netty-common-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) [netty-common-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.112.Final.jar:4.1.112.Final]
    at java.base/java.lang.Thread.run(Thread.java:834) [?:?]
```

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
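
### Illustration of the threading error

The trace shows `unsubscribe` running on `eventMesh-tcp-worker-2` (id 50) while a different thread (id 55) was inside the same `KafkaConsumer`, which the consumer's `acquire` check rejects. The log does not say what thread 55 was doing. The sketch below is not EventMesh code and not a proposed patch: `SingleThreadedClient` is a hypothetical, simplified stand-in for a client with that kind of ownership check, and the method names are made up for illustration. It first reproduces the collision deterministically, then shows one common way to avoid it, assuming all calls can be routed through a single owner thread.

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class ConfinedConsumerSketch {

    /** Hypothetical stand-in: rejects overlapping calls from different threads. */
    static final class SingleThreadedClient {
        private static final long NO_OWNER = -1L;
        private final AtomicLong owner = new AtomicLong(NO_OWNER);
        private final Set<String> topics = new HashSet<>();

        // Simplified (non-reentrant) ownership check: fail fast if another call is in progress.
        private void acquire() {
            long me = Thread.currentThread().getId();
            if (!owner.compareAndSet(NO_OWNER, me)) {
                throw new ConcurrentModificationException(
                        "client is not safe for multi-threaded access");
            }
        }

        void subscribe(String topic) {
            acquire();
            try { topics.add(topic); } finally { owner.set(NO_OWNER); }
        }

        void unsubscribe() {
            acquire();
            try { topics.clear(); } finally { owner.set(NO_OWNER); }
        }

        /** Holds the guard while the callback runs, like a long-running poll. */
        void poll(Runnable whileHeld) {
            acquire();
            try { whileHeld.run(); } finally { owner.set(NO_OWNER); }
        }

        Set<String> subscription() {
            acquire();
            try { return new HashSet<>(topics); } finally { owner.set(NO_OWNER); }
        }
    }

    /** A second thread calls unsubscribe() while the caller is still inside poll(). */
    static Throwable unconfinedFailure() {
        SingleThreadedClient client = new SingleThreadedClient();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        client.poll(() -> {
            try {
                // runAsync executes on a different thread; join() rethrows its failure wrapped.
                CompletableFuture.runAsync(client::unsubscribe).join();
            } catch (CompletionException e) {
                failure.set(e.getCause());
            }
        });
        return failure.get();
    }

    /** Every call is submitted to one owner thread, so calls queue instead of colliding. */
    static Set<String> confinedSubscriptionAfterUnsubscribe() {
        SingleThreadedClient client = new SingleThreadedClient();
        ExecutorService ownerThread = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture.runAsync(() -> client.subscribe("topic-a"), ownerThread).join();
            // A slow poll occupies the owner thread for up to about 50 ms...
            CompletableFuture.runAsync(
                    () -> client.poll(() -> LockSupport.parkNanos(50_000_000L)), ownerThread);
            // ...so this unsubscribe waits in the queue behind it.
            CompletableFuture.runAsync(client::unsubscribe, ownerThread).join();
            return CompletableFuture.supplyAsync(client::subscription, ownerThread).join();
        } finally {
            ownerThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("unconfined: " + unconfinedFailure());
        System.out.println("confined, remaining topics: " + confinedSubscriptionAfterUnsubscribe());
    }
}
```

Confining the client to one thread is only one option. Guarding every call with a shared lock would also prevent the overlap, at the cost of blocking the caller (here, a Netty worker thread) for as long as another call holds the lock. Which approach suits the Kafka storage plugin depends on code not shown in this report.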
