Hi everyone,
We are running a Flink 1.20 streaming job that consumes events from Kafka
and upserts them into PostgreSQL using the JDBC connector
(flink-connector-jdbc 3.3.0-1.20). We use INSERT ... ON CONFLICT (pk) DO
UPDATE SET ... statements with the at-least-once sink built via JdbcSink.
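For context, the sink is wired up roughly as in the sketch below. Table,
column, and POJO names are illustrative only, and the sketch uses the older
`JdbcSink.sink(...)` factory rather than the newer builder:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class UpsertSinkWiring {
    // Illustrative event type; our real job uses a richer schema.
    public static class MyEvent {
        public long id;
        public String v;
    }

    public static SinkFunction<MyEvent> buildSink() {
        return JdbcSink.sink(
            // Upsert against an illustrative table t(id BIGINT PRIMARY KEY, v TEXT)
            "INSERT INTO t (id, v) VALUES (?, ?) "
                + "ON CONFLICT (id) DO UPDATE SET v = EXCLUDED.v",
            (ps, e) -> {
                ps.setLong(1, e.id);
                ps.setString(2, e.v);
            },
            JdbcExecutionOptions.builder()
                .withBatchSize(500)
                .withBatchIntervalMs(200)
                .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:postgresql://db:5432/app")
                .withDriverName("org.postgresql.Driver")
                .build());
    }
}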
We recently enabled the PostgreSQL JDBC driver flag
`reWriteBatchedInserts=true` on the connection URL to improve write
throughput. This causes the driver to rewrite individual batched statements
into a single multi-row INSERT:
INSERT INTO t (id, ...) VALUES (1, ...), (2, ...), (1, ...)
ON CONFLICT (id) DO UPDATE SET ...
PostgreSQL rejects this with:

> "ON CONFLICT DO UPDATE command cannot affect row a second time"
This happens because PostgreSQL does not allow a single `INSERT ... ON
CONFLICT DO UPDATE` command to affect the same row more than once: duplicate
keys within one statement are a hard error, not a last-write-wins update. The
root cause on the Flink side is that `SimpleBatchStatementExecutor` buffers
records in a plain `List<T>`, so the same primary key can appear multiple
times within a single `executeBatch()` call. Without `reWriteBatchedInserts`,
each row is sent as a separate statement and there is no problem; with it
enabled, the driver merges the rows into one statement, exposing the
duplicate keys.
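The failure is easy to reproduce with plain JDBC, without Flink in the
picture. Connection details and the table `t(id BIGINT PRIMARY KEY, v TEXT)`
are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class OnConflictBatchRepro {
    public static void main(String[] args) throws SQLException {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/app?reWriteBatchedInserts=true",
                "app", "secret");
             PreparedStatement ps = conn.prepareStatement(
                 "INSERT INTO t (id, v) VALUES (?, ?) "
                     + "ON CONFLICT (id) DO UPDATE SET v = EXCLUDED.v")) {
            ps.setLong(1, 1L); ps.setString(2, "a"); ps.addBatch();
            ps.setLong(1, 1L); ps.setString(2, "b"); ps.addBatch(); // same key twice in one batch
            // The driver rewrites both entries into a single multi-row INSERT,
            // which PostgreSQL rejects with the error above. Without the URL
            // flag, the two statements run separately and succeed.
            ps.executeBatch();
        }
    }
}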
The existing `KeyedBatchStatementExecutor` does track seen keys via a
`Set<K>`, but it only stores the keys, not the full records - it is
designed for DELETE statements and cannot be used for upserts.
As a workaround we wrote a `DeduplicatingBatchStatementExecutor` that
replaces the `List<T>` with a `LinkedHashMap<K, T>` keyed by the primary key,
so that at each flush only the latest record per key is written. This
guarantees each key appears at most once per `executeBatch()` call, which
makes `reWriteBatchedInserts=true` work correctly.
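Condensed, the executor looks roughly like the sketch below. The names are
ours, job-specific details are elided, and the `JdbcBatchStatementExecutor`
interface shape is the internal one from flink-connector-jdbc 3.3.0-1.20:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;

public class DeduplicatingBatchStatementExecutor<T, K>
        implements JdbcBatchStatementExecutor<T> {

    private final String sql;
    private final JdbcStatementBuilder<T> parameterSetter;
    private final Function<T, K> keyExtractor;
    // Latest record per key; insertion order keeps statement order stable.
    private final Map<K, T> batch = new LinkedHashMap<>();
    private transient PreparedStatement st;

    public DeduplicatingBatchStatementExecutor(
            String sql,
            JdbcStatementBuilder<T> parameterSetter,
            Function<T, K> keyExtractor) {
        this.sql = sql;
        this.parameterSetter = parameterSetter;
        this.keyExtractor = keyExtractor;
    }

    @Override
    public void prepareStatements(Connection connection) throws SQLException {
        st = connection.prepareStatement(sql);
    }

    @Override
    public void addToBatch(T record) {
        // Latest-wins: a newer record for the same key replaces the older one.
        batch.put(keyExtractor.apply(record), record);
    }

    @Override
    public void executeBatch() throws SQLException {
        if (batch.isEmpty()) {
            return;
        }
        for (T record : batch.values()) {
            parameterSetter.accept(st, record);
            st.addBatch();
        }
        st.executeBatch(); // each key appears at most once here
        batch.clear();
    }

    @Override
    public void closeStatements() throws SQLException {
        if (st != null) {
            st.close();
            st = null;
        }
    }
}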
However, to plug this custom executor into the pipeline, we had to bypass
`JdbcSink` entirely and instantiate `JdbcOutputFormat` directly with a custom
`StatementExecutorFactory`. This meant duplicating a fair amount of wiring
code from the `JdbcSink` internals, and it leaves us depending on
`@Internal`-annotated classes (`JdbcOutputFormat`,
`JdbcBatchStatementExecutor`, `JdbcOutputSerializer`) that are not part of
the public API.
Proposal / Question
Would it make sense to make the JDBC sink more extensible in this area? Two
ideas:
1. Expose a way to provide a custom `JdbcBatchStatementExecutor` (or a
factory for one) through the public `JdbcSink` builder API. This would allow
users to plug in alternative batching strategies without bypassing the
builder and depending on internal classes; a hypothetical sketch follows
this list.
2. Provide a built-in key-aware deduplicating executor for upsert use cases,
similar to the existing `KeyedBatchStatementExecutor` but retaining the full
record (latest-wins) rather than just the key.
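To make idea 1 concrete, the hook could look something like the following.
This is purely hypothetical: `withStatementExecutorFactory` does not exist
today, and the surrounding builder calls only approximate the current API:

JdbcSink.<MyEvent>builder()
    .withQueryStatement(upsertSql, statementBuilder)
    .withExecutionOptions(executionOptions)
    // proposed extension point (not a real method today)
    .withStatementExecutorFactory(
        () -> new DeduplicatingBatchStatementExecutor<>(
            upsertSql, statementBuilder, e -> e.id))
    .buildAtLeastOnce(connectionOptions);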
Either of these would have made our workaround significantly simpler and
would avoid depending on `@Internal` classes. We are aware of
https://github.com/apache/flink-connector-jdbc/pull/188 (UNNEST-based
inserts for PostgreSQL), which takes a different approach to the
performance problem, but the deduplication issue exists independently of
the insert strategy. We have a minimal PoC reproducing the problem and
demonstrating our fix. We'd be happy to contribute a patch if the community
thinks this is a reasonable direction.
Thanks for your time.
Best regards,
Christian