Pietro created FLINK-34722:
------------------------------

             Summary: Support conditional upserts with Postgres JDBC sink
                 Key: FLINK-34722
                 URL: https://issues.apache.org/jira/browse/FLINK-34722
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / JDBC
    Affects Versions: jdbc-3.1.2
            Reporter: Pietro


The Postgres dialect that the JDBC sink uses for PostgreSQL databases does not 
currently support custom _WHERE_ conditions inside upsert statements.

Indeed, upsert statements returned by the 
{{[getUpsertStatement()|https://github.com/apache/flink-connector-jdbc/blob/95294ffbc57c93c2af34cda94c27fc5781e06177/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java#L61]}}
 method are limited to:
{code:sql}
ON CONFLICT (col1, ..., colN) 
DO UPDATE SET col1=EXCLUDED.col1, ..., colN=EXCLUDED.colN
{code}
PostgreSQL allows finer-grained control over upsert statements through an 
optional _WHERE_ clause on the conflict action, so that only rows satisfying 
the condition are updated (see [ON CONFLICT 
Clause|https://www.postgresql.org/docs/current/sql-insert.html#SQL-ON-CONFLICT]),
 for instance:
{code:sql}
ON CONFLICT (col1, ..., colN) 
DO UPDATE SET col1=EXCLUDED.col1, ..., colN=EXCLUDED.colN 
WHERE colN < EXCLUDED.colN
{code}
 
This would be useful in many cases. For instance, in a CDC scenario a batch 
reconciliation process may have written records to the destination, and those 
records then risk being overwritten by late-arriving, stale records from the 
streaming pipeline. A condition on the operation timestamp would protect 
against such overwrites.

My proposal is to extend the 
{{[AbstractPostgresCompatibleDialect|https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java]}}
 functionality so that the generated upsert query can include a _WHERE_ clause 
provided by users.
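
As a rough illustration of the idea, the sketch below builds an upsert statement with an optional, user-provided condition. It is standalone Java rather than the connector's code: the class {{ConditionalUpsertSketch}}, the method {{upsertStatement}} and the table and column names are all hypothetical, and identifier quoting is left out for brevity.
{code:java}
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical, standalone sketch; not the AbstractPostgresCompatibleDialect implementation.
public class ConditionalUpsertSketch {

    // Builds "INSERT ... ON CONFLICT (...) DO UPDATE SET ..." and appends
    // " WHERE <condition>" only when a condition is supplied.
    public static String upsertStatement(
            String table, String[] fields, String[] keyFields, Optional<String> condition) {
        String columns = String.join(", ", fields);
        String placeholders =
                Arrays.stream(fields).map(f -> "?").collect(Collectors.joining(", "));
        String conflictTarget = String.join(", ", keyFields);
        // Every field is updated from the EXCLUDED (incoming) row, as in the issue's example.
        String updates =
                Arrays.stream(fields)
                        .map(f -> f + "=EXCLUDED." + f)
                        .collect(Collectors.joining(", "));
        String sql =
                "INSERT INTO " + table + "(" + columns + ") VALUES (" + placeholders + ")"
                        + " ON CONFLICT (" + conflictTarget + ")"
                        + " DO UPDATE SET " + updates;
        return condition.map(c -> sql + " WHERE " + c).orElse(sql);
    }

    public static void main(String[] args) {
        String[] fields = {"id", "amount", "op_ts"};
        String[] keys = {"id"};
        // Without a condition: the unconditional upsert.
        System.out.println(upsertStatement("orders", fields, keys, Optional.empty()));
        // With a condition: only overwrite rows that are older than the incoming record.
        System.out.println(
                upsertStatement(
                        "orders", fields, keys, Optional.of("orders.op_ts < EXCLUDED.op_ts")));
    }
}
{code}
With an empty condition the statement keeps the unconditional form, so existing pipelines would be unaffected. With the {{op_ts}} condition, a stale record arriving late leaves the stored row untouched.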

I can think of two possible approaches, but I'd love to hear your opinion on 
this:
 # provide the _WHERE_ condition through an option of the JDBC sink connector.
 # allow users to plug in custom dialects without having to rewrite the whole 
JDBC sink (I'll open a separate issue about this soon).
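
To make the first approach more concrete, here is a minimal sketch of how a condition could be read from the sink options and appended to an upsert statement. The option key {{sink.upsert.condition}} is purely hypothetical (no such connector option exists today), as are the class and method names. The code is plain Java with no Flink dependency.
{code:java}
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of approach 1; none of these names exist in the connector.
public class UpsertConditionOptionSketch {

    // Assumed option key, chosen only for illustration.
    static final String UPSERT_CONDITION_KEY = "sink.upsert.condition";

    // Returns the trimmed condition, or empty when the option is absent or blank.
    public static Optional<String> readCondition(Map<String, String> options) {
        String raw = options.get(UPSERT_CONDITION_KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(raw.trim());
    }

    // Appends " WHERE <condition>" to an already built upsert statement when configured;
    // otherwise returns the statement unchanged.
    public static String applyCondition(String baseUpsert, Map<String, String> options) {
        return readCondition(options)
                .map(condition -> baseUpsert + " WHERE " + condition)
                .orElse(baseUpsert);
    }
}
{code}
Since the condition would be passed to the database verbatim, a real implementation would also need to decide how much validation to apply to it.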

Thanks for your consideration



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
