tomma-a opened a new issue, #10276:
URL: https://github.com/apache/seatunnel/issues/10276

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   
   
   One of my colleagues found a data-loss problem related to the Kafka source option `commit_on_checkpoint`, whose default value is `true`. When a SeaTunnel job with a Kafka source restores from a checkpoint, it can lose data. We use the Flink engine, and we believe the Zeta engine has the same problem.
   
   The cause is that when `commit_on_checkpoint` is `true`, the code does more than commit Kafka offsets when a checkpoint finishes: it also sets `ENABLE_AUTO_COMMIT_CONFIG` to `true`. With auto-commit enabled, the Kafka consumer additionally commits offsets periodically on its own schedule, independent of checkpoints, so the committed offset can move ahead of what the last completed checkpoint covers. We think this logic is inverted: `ENABLE_AUTO_COMMIT_CONFIG` should be `true` only when `commit_on_checkpoint` is `false`, like this:
   
   ```java
   props.setProperty(
           ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
           String.valueOf(!kafkaSourceConfig.isCommitOnCheckpoint()));
   ```
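   
   A minimal sketch of the current and the proposed mapping side by side, so the intended truth table is explicit. It uses only `java.util.Properties` and the literal key `"enable.auto.commit"` (the string value of Kafka's `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`) so it runs without kafka-clients; the class and method names are hypothetical and are not SeaTunnel code.
   
   ```java
   import java.util.Properties;
   
   // Hypothetical illustration class, not part of SeaTunnel.
   public class AutoCommitMapping {
       // String value of Kafka's ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
       static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
   
       // Current 2.3.12 behaviour: auto-commit mirrors commit_on_checkpoint.
       static Properties currentMapping(boolean commitOnCheckpoint) {
           Properties props = new Properties();
           props.setProperty(ENABLE_AUTO_COMMIT, String.valueOf(commitOnCheckpoint));
           return props;
       }
   
       // Proposed behaviour: auto-commit only when offsets are NOT committed on checkpoint.
       static Properties proposedMapping(boolean commitOnCheckpoint) {
           Properties props = new Properties();
           props.setProperty(ENABLE_AUTO_COMMIT, String.valueOf(!commitOnCheckpoint));
           return props;
       }
   
       public static void main(String[] args) {
           for (boolean commitOnCheckpoint : new boolean[] {true, false}) {
               System.out.printf(
                       "commit_on_checkpoint=%b -> current: %s, proposed: %s%n",
                       commitOnCheckpoint,
                       currentMapping(commitOnCheckpoint).getProperty(ENABLE_AUTO_COMMIT),
                       proposedMapping(commitOnCheckpoint).getProperty(ENABLE_AUTO_COMMIT));
           }
       }
   }
   ```
   
   With the default `commit_on_checkpoint=true`, the proposed mapping leaves auto-commit off, so offsets should only be committed through the checkpoint-completion path.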
   
   
   
   The current code (2.3.12): https://github.com/apache/seatunnel/blob/2.3.12/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java#L342
   
   ```java
               }
               props.setProperty(
                       ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                       ByteArrayDeserializer.class.getName());
               props.setProperty(
                       ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                       ByteArrayDeserializer.class.getName());
               // Auto-commit is enabled exactly when commit_on_checkpoint is true
               props.setProperty(
                       ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
                       String.valueOf(kafkaSourceConfig.isCommitOnCheckpoint()));
   
               // Disable auto create topics feature
               props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
               return new KafkaConsumer<>(props);
           }
       }
   ```
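   
   To illustrate how the current mapping could lose data, here is a simplified, hypothetical model of a single partition (not SeaTunnel code). It assumes an exactly-once sink, so output produced after the last completed checkpoint is rolled back on failure, and it assumes that on restart the reader resumes from the consumer group's committed offset rather than from the offset stored in the checkpoint. Both are assumptions of the model, not confirmed details of SeaTunnel's restore path.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical model of one Kafka partition; not SeaTunnel code.
   public class AutoCommitLossModel {
   
       // Returns the offsets that never reach the sink after a failure and restart.
       static List<Long> lostOffsets(long checkpointedOffset, long autoCommittedOffset,
                                     boolean resumeFromGroupOffset) {
           // Assumption: with an exactly-once sink, only records below checkpointedOffset
           // survive the failure; everything from checkpointedOffset on must be re-read.
           long resumeAt = resumeFromGroupOffset ? autoCommittedOffset : checkpointedOffset;
           List<Long> lost = new ArrayList<>();
           // Any offset between the checkpoint and the resume position is skipped.
           for (long offset = checkpointedOffset; offset < resumeAt; offset++) {
               lost.add(offset);
           }
           return lost;
       }
   
       public static void main(String[] args) {
           long checkpointed = 100;  // next offset to read, stored by the last completed checkpoint
           long autoCommitted = 105; // offset committed by the client's periodic auto-commit
           System.out.println("resume from checkpoint state: lost "
                   + lostOffsets(checkpointed, autoCommitted, false));
           System.out.println("resume from group offset:     lost "
                   + lostOffsets(checkpointed, autoCommitted, true));
       }
   }
   ```
   
   In this model, resuming from the checkpointed offset loses nothing, while resuming from the auto-committed group offset skips offsets 100 to 104. With auto-commit disabled, the group offset only moves when a checkpoint completes, so the two offsets coincide and nothing is skipped.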
   
   
   
   ### SeaTunnel Version
   
   SeaTunnel 2.3.12
   
   ### SeaTunnel Config
   
   ```conf
   env {
     parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval = 60000
     flink.execution.checkpointing.mode = "EXACTLY_ONCE"
     flink.pipeline.max-parallelism = 64
     flink.execution.checkpointing.timeout = 600000
   }
   
   source {
     Kafka {
       parallelism = 2
       plugin_output = "fake2"
       topic = "info"
       consumer.group = "testr"
       bootstrap.servers = "tom-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
       commit_on_checkpoint = true
       format = json
     }
   }
   
   sink {
     .....
   ```
   
   ### Running Command
   
   ```shell
   # We use Flink as the underlying engine.
   ```
   
   ### Error Exception
   
   ```log
   No error is raised; data is silently lost when the job restores from a checkpoint.
   ```
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

