djhexe opened a new issue, #5505: URL: https://github.com/apache/seatunnel/issues/5505
### Search before asking - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues. ### What happened 应用场景:从一个kafka集群(0.10.2)同步主题数据到另一个kafka集群(3.5.1) 引擎:zeta(2.3.3),flink(1.17.0)都试过,报相同错误 其他方式:尝试过写flink程序做相同的数据,可以正常读取和写入 ### SeaTunnel Version 2.3.3 ### SeaTunnel Config ```conf env { job.mode = "STREAMING" } source { Kafka { parallelism = 64 topic = "xxx" bootstrap.servers = "xxx" format = "text" field_delimiter = "\n" commit_on_checkpoint = true kafka.config = { group.id = "xxx" max.poll.records = 2000 receive.buffer.bytes = 262144 auto.offset.reset = "latest" enable.auto.commit = false allow.auto.create.topics = false key.deserializer = "org.apache.kafka.common.serialization.StringDeserializer" value.deserializer = "org.apache.kafka.common.serialization.StringDeserializer" } } } sink { kafka { parallelism = 64 topic = "xxx" bootstrap.servers = "xxx" format = "text" field_delimiter = "\n" kafka.request.timeout.ms = 60000 semantics = EXACTLY_ONCE kafka.config = { acks = "all" request.timeout.ms = 60000 buffer.memory = 134217728 batch.size = 131072 max.request.size = 10485760 linger.ms = 100 max.in.flight.requests.per.connection = 5 compression.type = snappy transactional.id = "xxx" key.deserializer = "org.apache.kafka.common.serialization.StringDeserializer" value.deserializer = "org.apache.kafka.common.serialization.StringDeserializer" } } } ``` ### Running Command ```shell sh bin/seatunnel.sh --config config/customer/flink-kafka-to-kafka-us_packet.conf ``` ### Error Exception ```log 2023-09-15 23:32:17,534 ERROR org.apache.seatunnel.core.starter.SeaTunnel - =============================================================================== Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191) at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40) at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34) Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: MetadataRequest versions older than 4 don't support the allowAutoTopicCreation field at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.listOffsets(KafkaSourceSplitEnumerator.java:342) at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.getTopicInfo(KafkaSourceSplitEnumerator.java:279) at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.fetchPendingPartitionSplit(KafkaSourceSplitEnumerator.java:372) at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.run(KafkaSourceSplitEnumerator.java:130) at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.stateProcess(SourceSplitEnumeratorTask.java:303) at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.call(SourceSplitEnumeratorTask.java:134) at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:613) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: MetadataRequest versions older than 4 don't support the allowAutoTopicCreation field at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122) at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:184) ... 2 more 2023-09-15 23:32:17,535 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal ``` ### Zeta or Flink or Spark Version flink:1.17.0 ### Java or Scala Version java1.8 ### Screenshots _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
