afedulov commented on a change in pull request #18666:
URL: https://github.com/apache/flink/pull/18666#discussion_r802735376



##########
File path: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
##########
@@ -125,15 +124,14 @@ public void write(IN element, Context context) throws 
IOException, InterruptedEx
     }
 
     @Override
-    public List<Void> prepareCommit(boolean flush) throws IOException, 
InterruptedException {
+    public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
         checkpointInProgress = true;
-        while (pendingActions != 0 && (flushOnCheckpoint || flush)) {
+        while (pendingActions != 0 && flushOnCheckpoint) {

Review comment:
       I am not sure there is anything that can be done differently.  My 
understanding is that it would be only useful, if there were cases, where we 
would not want to flush, unless this is an end-of-input-situation.  For 
instance, `KafkaWriter` does not flush if the `DeliveryGuaranty` is `NONE`:  
https://github.com/apache/flink/blob/5846d8d61b4b2aa10f925e9f63885cb7f6686303/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L195
 
   
   ES connector does not make such distinction and seems to always operate in 
at-least-once mode.  
   The only difference I could think of is that maybe `checkpointInProgress` 
should not be set if the `flush()` call was not caused by the checkpoint, as 
the docs for it state:
   ```
   Called on checkpoint or end of input so that the writer to flush all pending 
data for at-least-once.
   ```
   but the only place where the `checkpointInProgress`  is used talks about 
simply waiting for the flush to finish, regardless of the triggering reason:
   
https://github.com/apache/flink/blob/4d4b1fa4851ce8564c319a2976c1cf0f48281b2a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java#L118-L122




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to