Pietro created FLINK-34722:
------------------------------

Summary: Support conditional upserts with Postgres JDBC sink
Key: FLINK-34722
URL: https://issues.apache.org/jira/browse/FLINK-34722
Project: Flink
Issue Type: Improvement
Components: Connectors / JDBC
Affects Versions: jdbc-3.1.2
Reporter: Pietro
The Postgres dialect used by the JDBC sink does not currently support custom _WHERE_ conditions in upsert statements. The upsert statements returned by the {{[getUpsertStatement()|https://github.com/apache/flink-connector-jdbc/blob/95294ffbc57c93c2af34cda94c27fc5781e06177/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java#L61]}} method always end with a fixed conflict clause, where the conflict target is the key columns and every column is overwritten:
{code:sql}
ON CONFLICT (key1, ..., keyM) DO UPDATE SET col1=EXCLUDED.col1, ..., colN=EXCLUDED.colN
{code}

PostgreSQL allows finer-grained control over upserts through an optional _WHERE_ clause on the {{DO UPDATE}} action (see [ON CONFLICT Clause|https://www.postgresql.org/docs/current/sql-insert.html#SQL-ON-CONFLICT]); only rows for which the condition is true are updated. For instance:
{code:sql}
ON CONFLICT (key1, ..., keyM) DO UPDATE SET col1=EXCLUDED.col1, ..., colN=EXCLUDED.colN WHERE tbl.colN < EXCLUDED.colN
{code}
Here {{tbl}} is the target table: the existing row must be referenced through the table name (or an alias), because an unqualified column name would be ambiguous between the target table and {{EXCLUDED}}.

This would be useful in many scenarios. For instance, in a CDC pipeline where a batch reconciliation process has already written records to the destination, those records risk being overwritten by late-arriving, stale records from the streaming pipeline; a condition on the operation timestamp would protect against such events.

My proposal is to extend the functionality of {{[AbstractPostgresCompatibleDialect|https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java]}} so that the upsert query supports user-provided _WHERE_ clauses. I can think of two possible approaches, and I'd love to hear your opinion on them:
# provide the condition through an option of the JDBC sink connector;
# allow users to plug in custom dialects without having to rewrite the whole JDBC sink (I'll open a separate issue about this soon).

Thanks for your consideration
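
A minimal sketch of approach 1, assuming the condition arrives as a plain string from the sink options: a standalone helper that builds the same shape of upsert statement and appends the condition to the {{DO UPDATE}} action. The class and method names ({{ConditionalUpsertSketch}}, {{upsertStatement}}) and the {{orders}} table are hypothetical and not part of flink-connector-jdbc; placeholders are simplified to {{?}} and identifiers are left unquoted.

{code:java}
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of flink-connector-jdbc.
final class ConditionalUpsertSketch {

    private ConditionalUpsertSketch() {}

    /**
     * Builds "INSERT ... ON CONFLICT (keys) DO UPDATE SET col=EXCLUDED.col, ..." and, if
     * condition is non-null and non-blank, appends " WHERE <condition>".
     * Simplifications: identifiers are unquoted and placeholders are plain "?".
     */
    static String upsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields, String condition) {
        String columns = String.join(", ", fieldNames);
        // One positional JDBC placeholder per column.
        String placeholders =
                Arrays.stream(fieldNames).map(f -> "?").collect(Collectors.joining(", "));
        String conflictColumns = String.join(", ", uniqueKeyFields);
        // Every column is overwritten with the value proposed for insertion.
        String updateClause =
                Arrays.stream(fieldNames)
                        .map(f -> f + "=EXCLUDED." + f)
                        .collect(Collectors.joining(", "));
        String statement =
                "INSERT INTO " + tableName + "(" + columns + ") VALUES (" + placeholders + ")"
                        + " ON CONFLICT (" + conflictColumns + ")"
                        + " DO UPDATE SET " + updateClause;
        // The condition is appended verbatim, so it must come from trusted configuration.
        if (condition == null || condition.isBlank()) {
            return statement;
        }
        return statement + " WHERE " + condition;
    }

    public static void main(String[] args) {
        // CDC guard: only overwrite an existing row with a newer operation timestamp.
        String sql =
                upsertStatement(
                        "orders",
                        new String[] {"id", "status", "op_ts"},
                        new String[] {"id"},
                        "orders.op_ts < EXCLUDED.op_ts");
        System.out.println(sql);
    }
}
{code}

Running {{main}} prints a statement ending in {{WHERE orders.op_ts < EXCLUDED.op_ts}}, so a conflicting row is only overwritten when the incoming record carries a strictly newer operation timestamp. Treating the condition as an opaque string keeps the dialect change small, at the cost of no validation before the statement reaches PostgreSQL.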