djhexe opened a new issue, #5505:
URL: https://github.com/apache/seatunnel/issues/5505

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
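   Use case: sync topic data from one Kafka cluster (0.10.2) to another Kafka cluster (3.5.1).
   Engine: tried both Zeta (2.3.3) and Flink (1.17.0); both report the same error.
   Other approach: I wrote a Flink program to do the same data sync, and it reads and writes normally.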
   
   ### SeaTunnel Version
   
   2.3.3
   
   ### SeaTunnel Config
   
   ```conf
   env {
     job.mode = "STREAMING"
   }
   
   source {
     Kafka {
       parallelism = 64
       topic = "xxx"
       bootstrap.servers = "xxx"
       format = "text"
       field_delimiter = "\n"
       commit_on_checkpoint = true
       kafka.config = {
         group.id = "xxx"
         max.poll.records = 2000
         receive.buffer.bytes = 262144
         auto.offset.reset = "latest"
         enable.auto.commit = false
         allow.auto.create.topics = false
         key.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
         value.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
       }
     }
   }
   
   sink {
     kafka {
       parallelism = 64
       topic = "xxx"
       bootstrap.servers = "xxx"
       format = "text"
       field_delimiter = "\n"
       kafka.request.timeout.ms = 60000
       semantics = EXACTLY_ONCE
       kafka.config = {
         acks = "all"
         request.timeout.ms = 60000
         buffer.memory = 134217728
         batch.size = 131072
         max.request.size = 10485760
         linger.ms = 100
         max.in.flight.requests.per.connection = 5
         compression.type = snappy
         transactional.id = "xxx"
         key.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
         value.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
       }
     }
   }
   ```
   
   
   ### Running Command
   
   ```shell
   sh bin/seatunnel.sh --config config/customer/flink-kafka-to-kafka-us_packet.conf
   ```
   
   
   ### Error Exception
   
   ```log
   2023-09-15 23:32:17,534 ERROR org.apache.seatunnel.core.starter.SeaTunnel - 
   
   ===============================================================================
   
   
   
   Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
           at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
           at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
           at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
   Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: MetadataRequest versions older than 4 don't support the allowAutoTopicCreation field
           at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
           at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
           at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
           at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.listOffsets(KafkaSourceSplitEnumerator.java:342)
           at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.getTopicInfo(KafkaSourceSplitEnumerator.java:279)
           at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.fetchPendingPartitionSplit(KafkaSourceSplitEnumerator.java:372)
           at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.run(KafkaSourceSplitEnumerator.java:130)
           at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.stateProcess(SourceSplitEnumeratorTask.java:303)
           at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.call(SourceSplitEnumeratorTask.java:134)
           at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:613)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: MetadataRequest versions older than 4 don't support the allowAutoTopicCreation field
   
           at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
           at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:184)
           ... 2 more
   2023-09-15 23:32:17,535 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal
   ```
   
   
   ### Zeta or Flink or Spark Version
   
   flink:1.17.0
   
   ### Java or Scala Version
   
   java1.8
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

