zhangshenghang opened a new issue, #6986: URL: https://github.com/apache/seatunnel/issues/6986
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

Described in the documentation:

<img width="1129" alt="image" src="https://github.com/apache/seatunnel/assets/29418975/474cc691-f414-4e4c-a154-9241f862bcec">

I submit the Flink job to YARN with the following command:

```
bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/test.config --master yarn-per-job
```

test.config:

```
env {
  parallelism = 2
  job.mode = "STREAMING"
  # flink.execution.checkpointing.interval=5000
}

source {
  Kafka {
    schema = {
      fields {
        comment_num = string
        insert_time = string
        user_info = {
          username = string,
          age = string
        }
      }
    }
    topic = "test-topic"
    consumer.group = "test-group"
    bootstrap.servers = "xxxx"
    kafka.config = {
      client.id = client_1
      max.poll.records = 500
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    }
    result_table_name = "kafka_table"
  }
}

sink {
  Elasticsearch {
    source_table_name = "kafka_table"
    hosts = ["xxxx"]
    index = "test-588"
  }
}
```

I did not set `flink.execution.checkpointing.interval` in the job configuration file, and the `checkpoint.interval` parameter is not set in the Flink configuration file either. In this situation, writing to Elasticsearch loses data: the Elasticsearch sink buffers requests and submits them in batches of `maxBatchSize`, and any remaining uncommitted requests are only flushed by `prepareCommit`, which is driven by checkpoints:

```
@Override
public Optional<ElasticsearchCommitInfo> prepareCommit() {
    bulkEsWithRetry(this.esRestClient, this.requestEsList);
    return Optional.empty();
}
```

The likely cause is that the code does not set a default value for `checkpoint.interval` when Flink runs in STREAMING mode. If this is confirmed to be a problem, please assign it to me.
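To illustrate why the tail of the stream never reaches Elasticsearch, here is a minimal, hypothetical sketch (the class and method names below are illustrative, not SeaTunnel's actual implementation): a writer that bulk-flushes only when its buffer reaches `maxBatchSize` or when `prepareCommit()` fires at a checkpoint. With no checkpoint interval configured, `prepareCommit()` is never invoked, so the final partial batch stays in the buffer indefinitely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the batching behavior described above; this is
// NOT SeaTunnel's actual Elasticsearch sink code.
public class BatchWriterSketch {
    private final int maxBatchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushedBatches = new ArrayList<>();

    public BatchWriterSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    public void write(String request) {
        buffer.add(request);
        if (buffer.size() >= maxBatchSize) {
            flush(); // size-triggered bulk write
        }
    }

    // Driven by checkpoints; with no checkpoint interval configured,
    // this never runs and the tail of the buffer is never written.
    public void prepareCommit() {
        flush();
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            flushedBatches.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public int flushedCount() {
        return flushedBatches.stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        BatchWriterSketch writer = new BatchWriterSketch(3);
        for (int i = 0; i < 7; i++) {
            writer.write("doc-" + i);
        }
        // Two full batches (6 docs) were written; 1 doc is stuck buffered.
        System.out.println("flushed without checkpoint: " + writer.flushedCount());
        writer.prepareCommit(); // simulate a checkpoint finally firing
        System.out.println("flushed after checkpoint: " + writer.flushedCount());
    }
}
```

In this sketch, 7 records with `maxBatchSize = 3` leave one record unwritten until a checkpoint triggers `prepareCommit()`, which matches the data-loss symptom reported above.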
### SeaTunnel Version

2.3.5

### SeaTunnel Config

seatunnel.yaml:

```conf
seatunnel:
  engine:
    history-job-expire-minutes: 1440
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /tmp/seatunnel/checkpoint_snapshot
          storage.type: hdfs
```

### Running Command

```shell
bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/test.config --master yarn-per-job
```

### Error Exception

```log
When the data does not meet the maxBatchSize, the writer will not write the data. This will cause the data to not be flushed.
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
