Aleksandr Savonin created FLINK-39398:
-----------------------------------------

             Summary: Handle duplicate key errors in MongoWriter bulk write 
retry loop
                 Key: FLINK-39398
                 URL: https://issues.apache.org/jira/browse/FLINK-39398
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / MongoDB
            Reporter: Aleksandr Savonin


MongoWriter.doBulkWrite() catches MongoException broadly without inspecting 
MongoBulkWriteException.getWriteErrors(). This causes two problems when bulk 
writes encounter duplicate key errors (E11000):

*Problem 1: Infinite restart loop*

When using InsertOneModel with a unique index and AT_LEAST_ONCE delivery 
guarantee, an unplanned failure (TaskManager kill, OOM, crash) causes Flink to 
replay records from the last checkpoint. Some of these records may already 
exist in MongoDB because they were written before the crash but the source 
offset was not yet committed. The retry loop resubmits the identical batch, 
every attempt fails with the same E11000 errors, and once maxRetries is 
exhausted an IOException is thrown. Flink restarts from the last checkpoint, 
replays the same records, and the cycle repeats indefinitely.

*Problem 2: Un-attempted operations dropped with ordered=true*

With ordered=true (the default), MongoDB stops at the first error. Operations 
after the error index are never attempted by the server. The current code 
retries the entire batch unchanged without re-queuing un-attempted operations, 
which can lead to data loss.
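The ordered=true gap can be sketched in isolation: given the first error index 
reported by the server, everything after it was never attempted. The class and 
method names below are illustrative only; real connector code would read the 
index from the driver's BulkWriteError.getIndex().

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public final class OrderedBulkRetrySketch {

    // With ordered=true the server stops at the first failing operation,
    // so every operation strictly after that index was never attempted.
    public static List<Integer> unattemptedIndices(int batchSize, int firstErrorIndex) {
        return IntStream.range(firstErrorIndex + 1, batchSize)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Batch of 5 operations, first error at index 2:
        // indices 3 and 4 were never attempted and must be re-queued.
        System.out.println(unattemptedIndices(5, 2)); // prints [3, 4]
    }
}
```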

 

*Solution*

Add a configurable sink.duplicate-key-handling option with two strategies:

- fail (default): *Preserves* existing retry behavior. Improves the error 
message to suggest skip-duplicates or upsert mode when E11000 is detected.
- skip-duplicates: Inspects MongoBulkWriteException.getWriteErrors() 
per-operation and skips duplicate key errors. For ordered=true, re-queues the 
un-attempted operations after the error index. For ordered=false, every 
operation was already attempted, so operations without a duplicate key error 
are treated as succeeded (no re-queuing).
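One way to sketch the skip-duplicates decision, assuming each write error is 
reduced to its operation index and server error code (in the real driver these 
come from BulkWriteError.getIndex() and getCode()). The class and method names 
are hypothetical, not the connector's actual API:

```java
import java.util.ArrayList;
import java.util.List;

public final class SkipDuplicatesSketch {

    static final int DUPLICATE_KEY_CODE = 11000; // MongoDB E11000

    // Stand-in for one entry of MongoBulkWriteException.getWriteErrors():
    // the failing operation's index in the batch plus its server error code.
    static final class WriteError {
        final int index;
        final int code;
        WriteError(int index, int code) { this.index = index; this.code = code; }
    }

    /**
     * Decides which batch indices to re-queue under skip-duplicates.
     * Non-duplicate errors still fail. ordered=true: duplicate errors are
     * skipped and operations after the first error index, which the server
     * never attempted, are re-queued. ordered=false: every operation was
     * attempted, so nothing is re-queued.
     */
    static List<Integer> indicesToRequeue(int batchSize, boolean ordered, List<WriteError> errors) {
        int firstErrorIndex = batchSize;
        for (WriteError e : errors) {
            if (e.code != DUPLICATE_KEY_CODE) {
                throw new IllegalStateException("non-duplicate write error at index " + e.index);
            }
            firstErrorIndex = Math.min(firstErrorIndex, e.index);
        }
        List<Integer> requeue = new ArrayList<>();
        if (ordered) {
            for (int i = firstErrorIndex + 1; i < batchSize; i++) {
                requeue.add(i);
            }
        }
        return requeue;
    }

    public static void main(String[] args) {
        List<WriteError> dup = List.of(new WriteError(2, DUPLICATE_KEY_CODE));
        System.out.println(indicesToRequeue(5, true, dup));  // prints [3, 4]
        System.out.println(indicesToRequeue(5, false, dup)); // prints []
    }
}
```

With ordered=true the server reports at most one error per batch, so the loop 
over errors effectively finds that single error's index; the loop form also 
covers the ordered=false case, where several duplicates can be reported at once.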



--
This message was sent by Atlassian Jira
(v8.20.10#820010)