tomma-a opened a new issue, #10276: URL: https://github.com/apache/seatunnel/issues/10276
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

One of my colleagues found that the Kafka source option `commit_on_checkpoint` (default value `true`) can lose data when a SeaTunnel job whose source is Kafka is restored from a checkpoint. Our engine is Flink; we think the Zeta engine has the same problem.

When `commit_on_checkpoint` is `true`, besides committing the Kafka offsets when a checkpoint finishes, the code also sets `ENABLE_AUTO_COMMIT_CONFIG` to `true`. We think this logic is wrong.

We think `ENABLE_AUTO_COMMIT_CONFIG` should be set to `true` when `commit_on_checkpoint` is `false`, not when it is `true`, like this:

```java
props.setProperty(
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
        String.valueOf(!kafkaSourceConfig.isCommitOnCheckpoint()));
```

The current code:

https://github.com/apache/seatunnel/blob/2.3.12/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java#L342

```java
        }
        props.setProperty(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName());
        props.setProperty(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName());
        props.setProperty(
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
                String.valueOf(kafkaSourceConfig.isCommitOnCheckpoint()));

        // Disable auto create topics feature
        props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
        return new KafkaConsumer<>(props);
    }
}
```

### SeaTunnel Version

seatunnel 2.3.12

### SeaTunnel Config

```conf
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval=60000
  flink.execution.checkpointing.mode = "EXACTLY_ONCE"
  flink.pipeline.max-parallelism=64
  flink.execution.checkpointing.timeout = 600000
}

source {
  Kafka {
    parallelism =2
    plugin_output="fake2"
    topic = "info"
    consumer.group="testr"
    bootstrap.servers = "tom-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    commit_on_checkpoint=true
    format = json
  }
}

sink {
.....
```

### Running Command

```shell
we use flink as the underlying engine
```

### Error Exception

```log
No error here, just lost data when restore from a checkpoint
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

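The inversion proposed in this issue can be illustrated without a Kafka dependency. The sketch below is only an illustration, not SeaTunnel code: `AutoCommitSketch` and `consumerProps` are hypothetical names, and the string keys `enable.auto.commit` and `allow.auto.create.topics` are the values behind Kafka's `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG` and `ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG`. It assumes the intended behavior is that the consumer's own periodic auto commit is enabled only when offsets are not committed on checkpoint.

```java
import java.util.Properties;

/** Hypothetical helper for illustration only; not part of SeaTunnel. */
final class AutoCommitSketch {

    /**
     * Builds the consumer properties affected by the proposed change.
     * Auto commit is enabled only when offsets are NOT committed on checkpoint.
     */
    static Properties consumerProps(boolean commitOnCheckpoint) {
        Properties props = new Properties();
        // Same key as ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
        props.setProperty("enable.auto.commit", String.valueOf(!commitOnCheckpoint));
        // Same key as ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG
        props.setProperty("allow.auto.create.topics", "false");
        return props;
    }

    public static void main(String[] args) {
        // commit_on_checkpoint = true: auto commit is turned off
        System.out.println(consumerProps(true).getProperty("enable.auto.commit"));
        // commit_on_checkpoint = false: auto commit is turned on
        System.out.println(consumerProps(false).getProperty("enable.auto.commit"));
    }
}
```

With this mapping, offsets are committed by exactly one mechanism at a time: the checkpoint callback or the consumer's periodic auto commit.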