zhangshenghang opened a new issue, #6986:
URL: https://github.com/apache/seatunnel/issues/6986

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   Described in the documentation:
   <img width="1129" alt="image" 
src="https://github.com/apache/seatunnel/assets/29418975/474cc691-f414-4e4c-a154-9241f862bcec">
   
   I submitted the Flink job to YARN with the following command:
   ```
   bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/test.config 
--master yarn-per-job
   ```
   `test.config`:
   ```
   env {
     parallelism = 2
     job.mode = "STREAMING"
   #   flink.execution.checkpointing.interval=5000
   }
   source {
     Kafka {
       schema = {
         fields {
           comment_num = string
           insert_time = string
           user_info = {
               username = string,
               age = string
           }
         }
       }
       topic = "test-topic"
       consumer.group   = "test-group"
       bootstrap.servers = "xxxx"
       kafka.config = {
         client.id = client_1
         max.poll.records = 500
         auto.offset.reset = "earliest"
         enable.auto.commit = "false"
       }
       result_table_name = "kafka_table"
     }
   }
   
   
   
   sink {
       Elasticsearch {
           source_table_name = "kafka_table"
           hosts = ["xxxx"]
           index = "test-588"
       }
   }
   ```
   I did not set the `flink.execution.checkpointing.interval` parameter in the 
job configuration file, and the `checkpoint.interval` parameter is not set in 
the Flink configuration file either.
   In this situation, data written to Elasticsearch is lost.
   
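   The Elasticsearch sink submits records in batches of `maxBatchSize`. Records 
that have not yet filled a batch are flushed only in `prepareCommit`, which is 
invoked when a checkpoint is taken. Without checkpoints, those remaining 
records are never written: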
   ```
       @Override
       public Optional<ElasticsearchCommitInfo> prepareCommit() {
           bulkEsWithRetry(this.esRestClient, this.requestEsList);
           return Optional.empty();
       }
   ```
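
The batching behaviour described above can be modelled with a small, self-contained sketch. `BatchingWriterSketch` and its methods are hypothetical names used only for illustration; this is not SeaTunnel's actual writer, and the `>=` flush threshold is an assumption about how `maxBatchSize` is applied.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of a batching sink writer; not SeaTunnel's class. */
class BatchingWriterSketch {
    private final int maxBatchSize;
    // Plays the role of requestEsList: records waiting for a bulk request.
    private final List<String> buffer = new ArrayList<>();
    // Stands in for the Elasticsearch index: records that were actually written.
    private final List<String> flushed = new ArrayList<>();

    BatchingWriterSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /** Buffers one record and flushes only once the batch is full (assumed threshold). */
    void write(String record) {
        buffer.add(record);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Invoked at checkpoint time; flushes whatever is still buffered. */
    void prepareCommit() {
        flush();
    }

    private void flush() {
        flushed.addAll(buffer);
        buffer.clear();
    }

    int pendingCount() {
        return buffer.size();
    }

    int flushedCount() {
        return flushed.size();
    }

    public static void main(String[] args) {
        BatchingWriterSketch writer = new BatchingWriterSketch(10);
        for (int i = 0; i < 13; i++) {
            writer.write("record-" + i);
        }
        // One full batch of ten was flushed; three records are still buffered.
        System.out.println("flushed=" + writer.flushedCount()
                + ", pending=" + writer.pendingCount());
        // Only a checkpoint-driven prepareCommit() call writes the remaining three.
        writer.prepareCommit();
        System.out.println("flushed=" + writer.flushedCount()
                + ", pending=" + writer.pendingCount());
    }
}
```

In this model, a job that never checkpoints never calls `prepareCommit()`, so the trailing partial batch stays in memory indefinitely, which matches the symptom reported here.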
   
   The likely cause is that the code does not apply a default 
`checkpoint.interval` when the job runs in Flink STREAMING mode, so 
checkpointing stays disabled:
   
![image](https://github.com/apache/seatunnel/assets/29418975/8d589fec-0c96-4eab-862c-ef4aba5d38b4)
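
One possible shape for a fix is to fall back to a default interval when the job is streaming and no interval is configured. The sketch below is a guess at that logic only: `CheckpointIntervalSketch`, `resolveInterval`, and the 10000 ms fallback are hypothetical and not taken from the SeaTunnel code base. The actual default should be whatever the documentation specifies.

```java
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical helper; names and fallback value are illustrative assumptions. */
class CheckpointIntervalSketch {
    // Key as it appears (commented out) in the env block of test.config.
    static final String INTERVAL_KEY = "flink.execution.checkpointing.interval";
    // Placeholder fallback in milliseconds, NOT a confirmed SeaTunnel default.
    static final long ASSUMED_DEFAULT_INTERVAL_MS = 10_000L;

    /**
     * Returns the checkpoint interval to apply:
     * the configured value if present, otherwise the assumed default for
     * streaming jobs, otherwise empty (checkpointing left disabled).
     */
    static OptionalLong resolveInterval(Map<String, String> env, boolean streaming) {
        String configured = env.get(INTERVAL_KEY);
        if (configured != null) {
            return OptionalLong.of(Long.parseLong(configured.trim()));
        }
        if (streaming) {
            return OptionalLong.of(ASSUMED_DEFAULT_INTERVAL_MS);
        }
        return OptionalLong.empty();
    }
}
```

With such a fallback, a streaming job with an empty `env` block would still checkpoint periodically, so `prepareCommit` would be called and partial batches flushed.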
   
   If this is confirmed as a bug, please assign it to me.
   
   
   
   ### SeaTunnel Version
   
   2.3.5
   
   ### SeaTunnel Config
   
   ```conf
   # seatunnel.yaml
   
   seatunnel:
     engine:
       history-job-expire-minutes: 1440
       backup-count: 1
       queue-type: blockingqueue
       print-execution-info-interval: 60
       print-job-metrics-info-interval: 60
       slot-service:
         dynamic-slot: true
       checkpoint:
         interval: 10000
         timeout: 60000
         storage:
           type: hdfs
           max-retained: 3
           plugin-config:
             namespace: /tmp/seatunnel/checkpoint_snapshot
             storage.type: hdfs
   ```
   
   
   ### Running Command
   
   ```shell
   bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/test.config 
--master yarn-per-job
   ```
   
   
   ### Error Exception
   
   ```log
   When the buffered data does not reach maxBatchSize, the writer does not 
write it, so the data is never flushed.
   ```
   
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

