[ 
https://issues.apache.org/jira/browse/FLINK-38734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikola Stanisavljevic updated FLINK-38734:
------------------------------------------
    Description: 
The idea is to make the update set expression used in upserts customizable.
At the moment the update set expression always covers the complete row and
cannot be customized. If we want to update a particular column conditionally,
or do a partial update, we have to keep the previous values in state and use
stateful processing in order to upsert correctly. If we could customize the
update set expression, we would not need to keep anything in state and could
rely on the database instead.

The option could look like this:
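{code:java}
// ConfigOption and ConfigOptions come from org.apache.flink.configuration.
// When the option is unset, the sink keeps the default whole-row upsert.
public static final ConfigOption<String> SINK_UPSERT_UPDATE_SET_EXPRESSION =
        ConfigOptions.key("sink.upsert.update-set-expression")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "Update set expression to be used in the upsert statement. "
                                + "If not specified, the default is used, which upserts the whole row.");{code}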
Usage in SQL could look like this:
{code:sql}
INSERT INTO catalog.db.table
/*+ OPTIONS( 'sink.upsert.update-set-expression' =
'load_time = VALUES(load_time),
epoch = LEAST(epoch, VALUES(epoch)),
last_event_time = GREATEST(last_event_time, VALUES(last_event_time))'
) */
...{code}
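To make the intended effect concrete, here is a minimal sketch of how the option could feed into a MySQL-style upsert statement. The class UpsertStatementSketch and the method buildUpsert are hypothetical names used only for illustration; this is not the connector's actual code, and identifier quoting is left out for brevity.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only, not part of the JDBC connector.
final class UpsertStatementSketch {

    /**
     * Builds a MySQL-style upsert statement. A null or blank updateSetExpression
     * falls back to the whole-row update (col = VALUES(col) for every column).
     */
    static String buildUpsert(String table, List<String> columns, String updateSetExpression) {
        String columnList = String.join(", ", columns);
        String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
        String updateClause =
                (updateSetExpression == null || updateSetExpression.trim().isEmpty())
                        ? columns.stream()
                                .map(c -> c + " = VALUES(" + c + ")")
                                .collect(Collectors.joining(", "))
                        : updateSetExpression.trim();
        return "INSERT INTO " + table + " (" + columnList + ") VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + updateClause;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("id", "load_time", "epoch");
        // Option unset: every column is overwritten with the incoming value.
        System.out.println(buildUpsert("t", columns, null));
        // Option set: the custom expression replaces the generated update clause.
        System.out.println(buildUpsert("t", columns, "epoch = LEAST(epoch, VALUES(epoch))"));
    }
}
```

In this sketch the custom expression is passed to the database verbatim, so it would be dialect-specific and the user would be responsible for its correctness.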
I would appreciate input on this proposal.
Regarding implementation, I already have working code for MySQL in my fork.

  was:
The idea is to make the update set expression used in upserts customizable.
At the moment the update set expression always covers the complete row and
cannot be customized. If we want to update a particular column conditionally,
or do a partial update, we have to keep the previous values in state and use
stateful processing in order to upsert correctly. If we could customize the
update set expression, we would not need to keep anything in state and could
rely on the database instead.

The option could look like this:
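{code:java}
// ConfigOption and ConfigOptions come from org.apache.flink.configuration.
// When the option is unset, the sink keeps the default whole-row upsert.
public static final ConfigOption<String> SINK_UPSERT_UPDATE_SET_EXPRESSION =
        ConfigOptions.key("sink.upsert.update-set-expression")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "Update set expression to be used in the upsert statement. "
                                + "If not specified, the default is used, which upserts the whole row.");{code}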
Usage in SQL could look like this:
{code:sql}
INSERT INTO catalog.db.table
/*+ OPTIONS( 'sink.upsert.update-set-expression' =
'load_time = VALUES(load_time),
epoch = LEAST(epoch, VALUES(epoch)),
last_event_time = GREATEST(last_event_time, VALUES(last_event_time))'
) */
...{code}
I would appreciate input on this proposal.
Regarding implementation, I already have working code in my fork.


> Add option to customize update set expression when doing upserts with table 
> sink
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-38734
>                 URL: https://issues.apache.org/jira/browse/FLINK-38734
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / JDBC
>            Reporter: Nikola Stanisavljevic
>            Priority: Not a Priority
>
> The idea is to make the update set expression used in upserts customizable.
> At the moment the update set expression always covers the complete row and
> cannot be customized. If we want to update a particular column conditionally,
> or do a partial update, we have to keep the previous values in state and use
> stateful processing in order to upsert correctly. If we could customize the
> update set expression, we would not need to keep anything in state and could
> rely on the database instead.
> The option could look like this:
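> {code:java}
> // ConfigOption and ConfigOptions come from org.apache.flink.configuration.
> // When the option is unset, the sink keeps the default whole-row upsert.
> public static final ConfigOption<String> SINK_UPSERT_UPDATE_SET_EXPRESSION =
>         ConfigOptions.key("sink.upsert.update-set-expression")
>                 .stringType()
>                 .noDefaultValue()
>                 .withDescription(
>                         "Update set expression to be used in the upsert statement. "
>                                 + "If not specified, the default is used, which upserts the whole row.");{code}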
> Usage in SQL could look like this:
> {code:sql}
> INSERT INTO catalog.db.table
> /*+ OPTIONS( 'sink.upsert.update-set-expression' =
> 'load_time = VALUES(load_time),
> epoch = LEAST(epoch, VALUES(epoch)),
> last_event_time = GREATEST(last_event_time, VALUES(last_event_time))'
> ) */
> ...{code}
> I would appreciate input on this proposal.
> Regarding implementation, I already have working code for MySQL in my fork.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
