jiangzzwy opened a new issue, #4597: URL: https://github.com/apache/incubator-seatunnel/issues/4597
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

When a job's parallelism is greater than 2 and Kafka is used as the sink, the backend logs an error while the service is starting. The error does not affect subsequent execution, but it is confusing for users.

### SeaTunnel Version

2.3.1

### SeaTunnel Config

```conf
env {
  execution.parallelism = 3
  job.mode = "STREAMING"
  checkpoint.interval = 2000
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  Fmq {
    parallelism = 2
    format = "JSON"
    address = "xxxx"
    app = "xxx"
    password = "****"
    topic = "xxx"
    batch_size = 10
    batch_window = 1000
    wait_interval = 1000
  }
}

sink {
  Kafka {
    topic = "t_count"
    bootstrap.servers = "localhost:9092"
  }
}
```

### Running Command

```shell
% ./bin/seatunnel.sh --config ./config/fmq_kafka2.template -e local
```

### Error Exception

```log
2023-04-17 20:46:13,136 ERROR org.apache.kafka.common.metrics.Metrics - Error when registering metric on org.apache.kafka.common.metrics.JmxReporter
org.apache.kafka.common.KafkaException: Error registering mbean kafka.producer:type=kafka-metrics-count,client-id=producer-1
    at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:229) ~[connector-kafka-2.3.1.jar:2.3.1]
    at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:144) ~[connector-kafka-2.3.1.jar:2.3.1]
    at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:573) ~[connector-kafka-2.3.1.jar:2.3.1]
    at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:512) ~[connector-kafka-2.3.1.jar:2.3.1]
    at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:495) ~[connector-kafka-2.3.1.jar:2.3.1]
    at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:480) ~[connector-kafka-2.3.1.jar:2.3.1]
    at org.apache.kafka.common.metrics.Metrics.<init>(Metrics.java:180) ~[connector-kafka-2.3.1.jar:2.3.1]
    at org.apache.kafka.common.metrics.Metrics.<init>(Metrics.java:134) ~[connector-kafka-2.3.1.jar:2.3.1]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:357) ~[connector-kafka-2.3.1.jar:2.3.1]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289) ~[connector-kafka-2.3.1.jar:2.3.1]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316) ~[connector-kafka-2.3.1.jar:2.3.1]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301) ~[connector-kafka-2.3.1.jar:2.3.1]
    at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaNoTransactionSender.<init>(KafkaNoTransactionSender.java:43) ~[connector-kafka-2.3.1.jar:2.3.1]
    at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.<init>(KafkaSinkWriter.java:104) ~[connector-kafka-2.3.1.jar:2.3.1]
    at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSink.createWriter(KafkaSink.java:102) ~[connector-kafka-2.3.1.jar:2.3.1]
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:246) ~[seatunnel-starter.jar:2.3.1]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$14(SeaTunnelTask.java:383) ~[seatunnel-starter.jar:2.3.1]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_311]
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_311]
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) ~[?:1.8.0_311]
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) ~[?:1.8.0_311]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_311]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_311]
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[?:1.8.0_311]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[?:1.8.0_311]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_311]
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_311]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:380) ~[seatunnel-starter.jar:2.3.1]
    at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$run$0(NotifyTaskRestoreOperation.java:96) ~[seatunnel-starter.jar:2.3.1]
    at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) ~[seatunnel-starter.jar:2.3.1]
    at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.run(NotifyTaskRestoreOperation.java:85) ~[seatunnel-starter.jar:2.3.1]
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.1]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.1]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.1]
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[seatunnel-starter.jar:2.3.1]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:411) ~[seatunnel-starter.jar:2.3.1]
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:438) ~[seatunnel-starter.jar:2.3.1]
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:601) ~[seatunnel-starter.jar:2.3.1]
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:580) ~[seatunnel-starter.jar:2.3.1]
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:541) ~[seatunnel-starter.jar:2.3.1]
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:241) ~[seatunnel-starter.jar:2.3.1]
    at com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:61) ~[seatunnel-starter.jar:2.3.1]
    at org.apache.seatunnel.engine.server.utils.NodeEngineUtil.sendOperationToMemberNode(NodeEngineUtil.java:51) ~[seatunnel-starter.jar:2.3.1]
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.sendOperationToMemberNode(CheckpointManager.java:272) ~[seatunnel-starter.jar:2.3.1]
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.restoreTaskState(CheckpointCoordinator.java:249) ~[seatunnel-starter.jar:2.3.1]
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$reportedTask$1(CheckpointCoordinator.java:189) ~[seatunnel-starter.jar:2.3.1]
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) [?:1.8.0_311]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_311]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_311]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_311]
Caused by: javax.management.InstanceAlreadyExistsException: kafka.producer:type=kafka-metrics-count,client-id=producer-1
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_311]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[?:1.8.0_311]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[?:1.8.0_311]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[?:1.8.0_311]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[?:1.8.0_311]
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:1.8.0_311]
    at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:227) ~[connector-kafka-2.3.1.jar:2.3.1]
    ... 49 more
```

### Flink or Spark Version

_No response_

### Java or Scala Version

java

### Screenshots

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
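
One plausible reading of the trace above, not confirmed in this report: the `Caused by` line shows that an MBean named `kafka.producer:type=kafka-metrics-count,client-id=producer-1` was already registered when a second producer tried to register the same name. This would happen if two sink writers in the same JVM ended up with the same auto-generated `client-id`, which is an assumption here. The sketch below reproduces only that JMX behavior using the JDK's `javax.management` API. `DuplicateMBeanDemo`, `Counter`, and `registerTwice` are hypothetical names, and no Kafka or SeaTunnel code is involved.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DuplicateMBeanDemo {
    // Standard MBean convention: the interface is named <ClassName>MBean.
    public interface CounterMBean {
        int getCount();
    }

    // Hypothetical stand-in for a producer's metrics bean.
    public static class Counter implements CounterMBean {
        @Override
        public int getCount() {
            return 0;
        }
    }

    /**
     * Registers two beans under the same ObjectName on the JVM-wide
     * platform MBean server. Returns true if the second registration
     * is rejected with InstanceAlreadyExistsException.
     */
    public static boolean registerTwice(String name) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName objectName = new ObjectName(name);
        server.registerMBean(new Counter(), objectName);
        try {
            // Same name, different instance: the server refuses it.
            server.registerMBean(new Counter(), objectName);
            return false;
        } catch (InstanceAlreadyExistsException e) {
            return true;
        } finally {
            // Clean up so the demo can be run repeatedly in one JVM.
            server.unregisterMBean(objectName);
        }
    }

    public static void main(String[] args) throws Exception {
        String name = "kafka.producer:type=kafka-metrics-count,client-id=producer-1";
        System.out.println("second registration rejected: " + registerTwice(name));
    }
}
```

The platform MBean server is shared by the whole JVM, so a name collision of this kind only affects the second registration of the metrics bean. That would be consistent with the report that the job keeps running after the error is logged.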
