jiangzzwy opened a new issue, #4597:
URL: https://github.com/apache/incubator-seatunnel/issues/4597

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
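   When the job's parallelism is greater than 2 and Kafka is used as the sink, an error is logged in the background while the service starts. The error does not affect the subsequent run, but it is confusing for users.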
   
   ### SeaTunnel Version
   
   2.3.1
   
   ### SeaTunnel Config
   
   ```conf
   env {
     execution.parallelism = 3
     job.mode = "STREAMING"
     checkpoint.interval = 2000
     #execution.checkpoint.interval = 10000
     #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
   }
   
   source {
      Fmq {
       parallelism = 2
       format = "JSON"
       address = "xxxx"
       app = "xxx"
       password = "****"
       topic = "xxx"
       batch_size = 10
       batch_window = 1000
       wait_interval = 1000
     }
   }
   
   sink {
   
       Kafka {
         topic = "t_count"
         bootstrap.servers = "localhost:9092"
     }
   }
   ```
   
   
   ### Running Command
   
   ```shell
   % ./bin/seatunnel.sh --config ./config/fmq_kafka2.template -e local
   ```
   
   
   ### Error Exception
   
   ```log
   2023-04-17 20:46:13,136 ERROR org.apache.kafka.common.metrics.Metrics - Error when registering metric on org.apache.kafka.common.metrics.JmxReporter
   org.apache.kafka.common.KafkaException: Error registering mbean kafka.producer:type=kafka-metrics-count,client-id=producer-1
        at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:229) ~[connector-kafka-2.3.1.jar:2.3.1]
        at org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:144) ~[connector-kafka-2.3.1.jar:2.3.1]
        at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:573) ~[connector-kafka-2.3.1.jar:2.3.1]
        at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:512) ~[connector-kafka-2.3.1.jar:2.3.1]
        at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:495) ~[connector-kafka-2.3.1.jar:2.3.1]
        at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:480) ~[connector-kafka-2.3.1.jar:2.3.1]
        at org.apache.kafka.common.metrics.Metrics.<init>(Metrics.java:180) ~[connector-kafka-2.3.1.jar:2.3.1]
        at org.apache.kafka.common.metrics.Metrics.<init>(Metrics.java:134) ~[connector-kafka-2.3.1.jar:2.3.1]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:357) ~[connector-kafka-2.3.1.jar:2.3.1]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289) ~[connector-kafka-2.3.1.jar:2.3.1]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316) ~[connector-kafka-2.3.1.jar:2.3.1]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301) ~[connector-kafka-2.3.1.jar:2.3.1]
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaNoTransactionSender.<init>(KafkaNoTransactionSender.java:43) ~[connector-kafka-2.3.1.jar:2.3.1]
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.<init>(KafkaSinkWriter.java:104) ~[connector-kafka-2.3.1.jar:2.3.1]
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSink.createWriter(KafkaSink.java:102) ~[connector-kafka-2.3.1.jar:2.3.1]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:246) ~[seatunnel-starter.jar:2.3.1]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$14(SeaTunnelTask.java:383) ~[seatunnel-starter.jar:2.3.1]
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_311]
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_311]
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) ~[?:1.8.0_311]
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) ~[?:1.8.0_311]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_311]
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_311]
        at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[?:1.8.0_311]
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[?:1.8.0_311]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_311]
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_311]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:380) ~[seatunnel-starter.jar:2.3.1]
        at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$run$0(NotifyTaskRestoreOperation.java:96) ~[seatunnel-starter.jar:2.3.1]
        at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) ~[seatunnel-starter.jar:2.3.1]
        at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.run(NotifyTaskRestoreOperation.java:85) ~[seatunnel-starter.jar:2.3.1]
        at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.1]
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.1]
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.1]
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[seatunnel-starter.jar:2.3.1]
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:411) ~[seatunnel-starter.jar:2.3.1]
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:438) ~[seatunnel-starter.jar:2.3.1]
        at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:601) ~[seatunnel-starter.jar:2.3.1]
        at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:580) ~[seatunnel-starter.jar:2.3.1]
        at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:541) ~[seatunnel-starter.jar:2.3.1]
        at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:241) ~[seatunnel-starter.jar:2.3.1]
        at com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:61) ~[seatunnel-starter.jar:2.3.1]
        at org.apache.seatunnel.engine.server.utils.NodeEngineUtil.sendOperationToMemberNode(NodeEngineUtil.java:51) ~[seatunnel-starter.jar:2.3.1]
        at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.sendOperationToMemberNode(CheckpointManager.java:272) ~[seatunnel-starter.jar:2.3.1]
        at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.restoreTaskState(CheckpointCoordinator.java:249) ~[seatunnel-starter.jar:2.3.1]
        at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$reportedTask$1(CheckpointCoordinator.java:189) ~[seatunnel-starter.jar:2.3.1]
        at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) [?:1.8.0_311]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_311]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_311]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_311]
   Caused by: javax.management.InstanceAlreadyExistsException: kafka.producer:type=kafka-metrics-count,client-id=producer-1
        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_311]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[?:1.8.0_311]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[?:1.8.0_311]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[?:1.8.0_311]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[?:1.8.0_311]
        at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:1.8.0_311]
        at org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:227) ~[connector-kafka-2.3.1.jar:2.3.1]
        ... 49 more
   ```
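
The root `InstanceAlreadyExistsException` in the trace above says that an MBean named `kafka.producer:type=kafka-metrics-count,client-id=producer-1` was already registered when a second registration was attempted. One plausible reading (an assumption, not confirmed in this report) is that several sink writers in the same JVM each create a producer with the same `client.id`. The sketch below reproduces only the JMX part of that behaviour with the JDK; it does not use Kafka or SeaTunnel classes. The class `MBeanCollisionDemo`, the `Counter` bean, the `demo.producer` domain, and the `producer-1-subtask-1` id are all hypothetical names chosen for illustration.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class MBeanCollisionDemo {

    // Standard MBean convention: the interface is named <ImplClass>MBean.
    public interface CounterMBean {
        int getCount();
    }

    // Hypothetical stand-in for a producer's metrics bean.
    public static class Counter implements CounterMBean {
        @Override
        public int getCount() {
            return 0;
        }
    }

    /**
     * Tries to register a bean whose ObjectName embeds the given client id.
     * Returns true on success and false if that name is already registered.
     */
    public static boolean tryRegister(MBeanServer server, String clientId) {
        try {
            ObjectName name = new ObjectName(
                    "demo.producer:type=kafka-metrics-count,client-id=" + clientId);
            server.registerMBean(new Counter(), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            // Same failure type as the "Caused by" line in the log above.
            return false;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // First writer registers its bean.
        System.out.println(tryRegister(server, "producer-1"));
        // Second writer reuses the same client id, so the name collides.
        System.out.println(tryRegister(server, "producer-1"));
        // A distinct id per writer (hypothetical naming) avoids the collision.
        System.out.println(tryRegister(server, "producer-1-subtask-1"));
    }
}
```

If the assumption holds, giving each writer's producer a distinct `client.id` would be one way to avoid the duplicate registration; whether the connector exposes a per-writer setting for this is not established here.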
   
   
   ### Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   java
   
   ### Screenshots
   
   
![image](https://user-images.githubusercontent.com/23492991/232499897-dd08d4ef-159b-4620-97b1-3a60309fc6b2.png)
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

