https://docs.google.com/document/d/1p2K1tYiLz7nvaW4akTGdHDPzG_fMjXzhtzVKpSfcNC0/edit?usp=sharing
On Mon, May 11, 2026 at 1:30 PM FeatZhang <[email protected]> wrote:

Hi devs,

I'd like to open discussion on `Async MongoDB Sink with Error
Classification and Shared Connection Pool`, a proposal to modernize the
sink path of flink-connector-mongodb by bringing it in line with the
AsyncSink standard, adding a structured error-handling model, and
introducing a TM-shared MongoClient pool.

Motivation

While reviewing flink-connector-mongodb at HEAD, three recurring issues
showed up in production workloads:

1. MongoSink still extends SinkV2 directly, so it does not benefit from
   AsyncSinkBase's in-flight rate limiting, automatic batching, or the
   standard connector metrics that Kinesis / DynamoDB / Firehose already
   expose. prepareCommit also blocks on flush, so barrier alignment
   latency is amplified by MongoDB write latency.

2. MongoWriter.doBulkWrite catches the top-level MongoException and
   retries a fixed number of times with LINEAR backoff. That causes
   (a) permanent errors such as E11000 / E10334 / invalid-doc to be
   retried uselessly, (b) thundering-herd patterns after transient
   overload, and (c) no dead-letter path for unrecoverable records.
   This overlaps with the recent thread "Optional configuration
   parameters for better performance" and with FLINK-39398 (PR #66 by
   Savonitar, which only covers duplicate keys).

3. Every MongoWriter / MongoSourceReader / MongoLookupFunction instance
   opens its own MongoClient. A job with sink=4 + lookup=4 + source=4
   against the same cluster opens 3 x 4 x poolSize connections per TM,
   plus the corresponding SDAM and DNS monitor threads.

Proposal

Three coordinated changes, shipped as sub-tasks under a single FLIP:

1. MongoAsyncSink on top of AsyncSinkBase (FLIP-171). Public API:

   MongoAsyncSink.builder()
       .setConnectionOptions(...)
       .setSerializationSchema(...)
       .setErrorClassifier(...)   // see (2)
       .setDeadLetterSink(...)    // see (2)
       .setClientProvider(...)    // see (3)
       // AsyncSinkBase options inherited as-is:
       //   maxBatchSize / maxInFlightRequests / maxBufferedRequests
       //   maxBatchSizeInBytes / maxTimeInBufferMS
       .build();

   The existing MongoSink / MongoSinkBuilder are kept for one minor
   release as @Deprecated delegates to MongoAsyncSink.

2. A pluggable MongoErrorClassifier mapping MongoDB server error codes
   to one of:

   RETRY_WITH_BACKOFF | DEAD_LETTER | IGNORE | UPSERT | FAIL_JOB

   Default mapping (configurable):

   6,7,89,91,189,13435,13436    -> RETRY_WITH_BACKOFF (transient)
   50                           -> RETRY_WITH_BACKOFF (timeout)
   11000,11001 (dup key)        -> DEAD_LETTER | IGNORE | UPSERT
   10334,2,14,121 (invalid doc) -> DEAD_LETTER
   others                       -> FAIL_JOB

   Retries use exponential backoff with jitter (via AsyncSink's
   ResultHandler.retryForEntries). DEAD_LETTER is routed to a
   user-supplied Sink with full metadata.

3. A TM-shared MongoClientProvider (ref-counted, keyed by canonical
   URI + credentials digest + TLS + compressors + appName). Sink,
   Source and Lookup all go through:

   MongoClient c = provider.acquire(options);
   ...
   provider.release(c); // underlying close() when refCount == 0

   Default provider: sharedPerTaskManager(). Legacy behavior is kept
   via perInstance() and a sink.client-provider=PER_INSTANCE option.

Backward compatibility

- MongoSink / MongoSinkBuilder: @Deprecated, internally delegating.
- Legacy ConfigOptions (sink.bulk-flush.max-actions,
  sink.bulk-flush.interval) continue to work; they are mapped to
  maxBatchSize / maxTimeInBufferMS at runtime with a WARN log.
- Table DDL parameters: legacy keys kept for one minor version.
- State schema is not binary compatible; a MongoStateMigrator tool is
  provided to replay unacked entries from an old savepoint into the
  new sink.
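To make the classifier contract in (2) concrete for reviewers, here is a minimal sketch of the default mapping. The class and enum names (`DefaultMongoErrorClassifier`, `ErrorAction`) are illustrative placeholders, not the final connector API; the code-to-action table simply transcribes the default mapping above, with the duplicate-key action left to user configuration.

```java
import java.util.Set;

// Illustrative sketch only; names are hypothetical, not the connector API.
public class DefaultMongoErrorClassifier {

    public enum ErrorAction { RETRY_WITH_BACKOFF, DEAD_LETTER, IGNORE, UPSERT, FAIL_JOB }

    // Transient topology/election errors plus 50 (ExceededTimeLimit):
    // safe to retry with exponential backoff.
    private static final Set<Integer> TRANSIENT =
            Set.of(6, 7, 89, 91, 189, 13435, 13436, 50);

    // Duplicate-key errors: the user chooses DEAD_LETTER, IGNORE, or UPSERT.
    private static final Set<Integer> DUP_KEY = Set.of(11000, 11001);

    // Structurally invalid documents: retrying can never succeed.
    private static final Set<Integer> INVALID_DOC = Set.of(10334, 2, 14, 121);

    private final ErrorAction dupKeyAction;

    public DefaultMongoErrorClassifier(ErrorAction dupKeyAction) {
        this.dupKeyAction = dupKeyAction;
    }

    public ErrorAction classify(int serverErrorCode) {
        if (TRANSIENT.contains(serverErrorCode)) {
            return ErrorAction.RETRY_WITH_BACKOFF;
        }
        if (DUP_KEY.contains(serverErrorCode)) {
            return dupKeyAction;
        }
        if (INVALID_DOC.contains(serverErrorCode)) {
            return ErrorAction.DEAD_LETTER;
        }
        return ErrorAction.FAIL_JOB; // unknown codes fail fast by default
    }
}
```

The point of keeping this a pure code-to-action function is that it is trivially unit-testable branch by branch, which is what DefaultMongoErrorClassifierTest in the test plan would cover.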
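The acquire/release contract in (3) boils down to ref-counting, which can be sketched as below. A generic `AutoCloseable` stands in for `MongoClient` so the example stays self-contained, and a plain string stands in for the real pool key (canonical URI + credentials digest + TLS + compressors + appName); `SharedClientProvider` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative sketch of a TM-shared, ref-counted client provider.
public class SharedClientProvider<C extends AutoCloseable> {

    private static final class Entry<C extends AutoCloseable> {
        final C client;
        int refCount;
        Entry(C client) { this.client = client; }
    }

    private final Map<String, Entry<C>> pool = new HashMap<>();

    /** Returns the shared client for this key, creating it on first acquire. */
    public synchronized C acquire(String key, Supplier<C> factory) {
        Entry<C> e = pool.computeIfAbsent(key, k -> new Entry<>(factory.get()));
        e.refCount++;
        return e.client;
    }

    /** Closes the underlying client only when the last holder releases it. */
    public synchronized void release(String key) {
        Entry<C> e = pool.get(key);
        if (e == null) {
            return;
        }
        if (--e.refCount == 0) {
            pool.remove(key);
            try {
                e.client.close();
            } catch (Exception ex) {
                throw new RuntimeException("Failed to close shared client", ex);
            }
        }
    }
}
```

With this shape, a second sink or lookup instance acquiring the same key gets the already-open client back and the factory is never invoked, which is where the per-TM connection and monitor-thread savings come from.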
Deprecation timeline:

flink-connector-mongodb 2.0 : both exist, legacy is default
flink-connector-mongodb 2.1 : MongoAsyncSink becomes default
flink-connector-mongodb 3.0 : legacy MongoSink removed

Test plan

- Unit tests: DefaultMongoErrorClassifierTest (all branches),
  SharedMongoClientProviderTest (concurrency / ref-count),
  MongoSinkWriterTest (standard AsyncSink behaviors).
- Integration tests with testcontainers MongoDB 6.0 / 7.0 covering
  network partition, primary step-down, oversized doc, duplicate key,
  and savepoint migration from the legacy sink.
- JMH benchmark: 1 KB docs, parallelism 100, 5 min. Target:
  throughput >= 1.2x legacy, p99 <= legacy, TM heap -30% (from the
  shared client).
- CI matrix expanded to Flink 1.20 LTS / 2.0 / 2.1.

Note on FLINK-36228

An earlier attempt (FLINK-36228) was resolved as Fixed but later
withdrawn by the reporter -- no AsyncSink migration actually landed.
This FLIP is not a duplicate and is scoped wider (error classifier
and shared client pool). I'm happy to link the reasoning from that
ticket in a dedicated "history" section of the cwiki page if that
helps reviewers.

Best,
featzhang
