[ https://issues.apache.org/jira/browse/FLINK-33785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17995713#comment-17995713 ]
Dhwaneet Bhatt commented on FLINK-33785:
----------------------------------------
This bug is preventing us from using Flink for streaming ETL with a
postgres-cdc source and a Postgres JDBC sink.
> TableJdbcUpsertOutputFormat could not deal with DELETE record correctly when
> primary keys were set
> --------------------------------------------------------------------------------------------------
>
> Key: FLINK-33785
> URL: https://issues.apache.org/jira/browse/FLINK-33785
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: jdbc-3.1.1
> Environment: Flink: 1.17.1
> Jdbc connector: 3.1.1
> Postgresql: 16.1
> Reporter: Bodong Liu
> Priority: Major
> Attachments: image-2023-12-08-22-24-20-295.png,
> image-2023-12-08-22-24-26-493.png, image-2023-12-08-22-24-58-986.png,
> image-2023-12-08-22-28-44-948.png, image-2023-12-08-22-38-08-559.png,
> image-2023-12-08-22-40-35-530.png, image-2023-12-08-22-42-06-566.png
>
>
> h1. Issue Description
> When using the JDBC connector to DELETE records in a database, I found that it
> CANNOT delete records correctly.
> h1. Reproduction steps
> The steps are as follows:
> * Create a table with 5 fields and a composite primary key (id, name). DDL in Postgres:
>
> {code:sql}
> -- The id column defaults to this sequence, so it must exist before the table
> create sequence if not exists public.fake_id_seq;
>
> create table public.fake
> (
>     id       bigint                 not null default nextval('fake_id_seq'::regclass),
>     name     character varying(128) not null,
>     age      integer,
>     location character varying(256),
>     birthday timestamp without time zone default CURRENT_TIMESTAMP,
>     primary key (id, name)
> );
> {code}
> !image-2023-12-08-22-24-26-493.png!
>
> * Insert some data into the table:
> {code:sql}
> INSERT INTO public.fake (id, name, age, location, birthday) VALUES (1, 'Jack', 10, null, '2023-12-08 21:35:46.000000');
> INSERT INTO public.fake (id, name, age, location, birthday) VALUES (2, 'Jerry', 18, 'Fake Location', '2023-12-08 13:36:17.088295');
> INSERT INTO public.fake (id, name, age, location, birthday) VALUES (3, 'John', 20, null, null);
> INSERT INTO public.fake (id, name, age, location, birthday) VALUES (4, 'Marry', null, null, '2023-12-08 13:37:09.721785');
> {code}
> !image-2023-12-08-22-24-58-986.png!
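> * Run the Flink code:
> {code:java}
> // Imports assume the flink-connector-jdbc 3.1.x package layout; these are
> // internal classes and may move between connector releases.
> import java.sql.Timestamp;
> import java.sql.Types;
> import java.util.Collections;
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect;
> import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
> import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
> import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
> import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.types.Row;
> import org.apache.flink.types.RowKind;
> import org.postgresql.Driver;
>
> public class JdbcDeleteRepro {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         final String[] fieldNames = {"id", "name", "age", "location", "birthday"};
>         // JDBC type codes (java.sql.Types), in the same order as fieldNames
>         final int[] fieldTypes = {
>             Types.BIGINT, Types.VARCHAR, Types.INTEGER, Types.VARCHAR, Types.TIMESTAMP
>         };
>         // Composite primary key of public.fake
>         final String[] primaryKeys = {"id", "name"};
>
>         InternalJdbcConnectionOptions internalJdbcConnectionOptions =
>                 InternalJdbcConnectionOptions.builder()
>                         .setClassLoader(Thread.currentThread().getContextClassLoader())
>                         .setDriverName(Driver.class.getName())
>                         .setDBUrl("jdbc:postgresql://localhost:5432/postgres")
>                         .setUsername("postgres")
>                         .setPassword("postgres")
>                         .setTableName("fake")
>                         .setParallelism(1)
>                         .setConnectionCheckTimeoutSeconds(10)
>                         .setDialect(new PostgresDialect())
>                         .build();
>
>         // Because key fields are set, build() returns a TableJdbcUpsertOutputFormat
>         JdbcOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> jdbcOutputFormat =
>                 JdbcOutputFormat.builder()
>                         .setFieldNames(fieldNames)
>                         .setKeyFields(primaryKeys)
>                         .setFieldTypes(fieldTypes)
>                         .setOptions(internalJdbcConnectionOptions)
>                         .setFlushIntervalMills(1000)
>                         .setFlushMaxSize(10)
>                         .setMaxRetryTimes(3)
>                         .build();
>         GenericJdbcSinkFunction<Tuple2<Boolean, Row>> jdbcSinkFunction =
>                 new GenericJdbcSinkFunction<>(jdbcOutputFormat);
>
>         Timestamp timestamp = Timestamp.valueOf("2023-12-08 21:35:46.000000");
>         // Row to delete: the same values as the stored 'Jack' row
>         Row row = Row.ofKind(RowKind.DELETE, 1L, "Jack", 10, null, timestamp);
>         // In the Tuple2<Boolean, Row> format, f0 = false marks a delete
>         Tuple2<Boolean, Row> element = Tuple2.of(false, row);
>
>         env.fromCollection(Collections.singleton(element)).addSink(jdbcSinkFunction);
>         env.execute();
>     }
> }
> {code}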
> After the job completes successfully, the record with id=1 and name=Jack is
> still in the table; it was not deleted.
> h1. Cause Analysis
> In the build method of JdbcOutputFormat.Builder, if the 'keyFields' option is
> set in the JdbcDmlOptions, the method returns an
> 'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat'.
> !image-2023-12-08-22-28-44-948.png!
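> And in
> 'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat#createDeleteExecutor',
> the method uses all the fieldNames instead of the keyFields to build the DELETE
> SQL statement, so the WHERE clause compares every column and the DELETE may not
> match the intended row. For example, Jack's 'location' is NULL, and
> 'location = NULL' is never true in SQL, so the row is not deleted.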
> !image-2023-12-08-22-38-08-559.png!
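For illustration, the failure mode can be reproduced without Flink or a database in a small, self-contained Java sketch. It assumes the dialect builds the DELETE as one 'col = :col' condition per field joined with AND; 'deleteStatement', 'sqlEquals' and 'matches' are hypothetical helpers, not connector APIs. 'sqlEquals' models SQL's three-valued comparison (any comparison with NULL is UNKNOWN), which is why a WHERE clause over all five fields cannot select Jack's row while one over the key fields (id, name) can.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeleteWhereSketch {

    // Hypothetical, simplified DELETE builder: one "col = :col" condition per
    // field, joined with AND. Identifier quoting is omitted.
    static String deleteStatement(String table, String[] conditionFields) {
        List<String> conditions = new ArrayList<>();
        for (String field : conditionFields) {
            conditions.add(field + " = :" + field);
        }
        return "DELETE FROM " + table + " WHERE " + String.join(" AND ", conditions);
    }

    // SQL three-valued equality: any comparison involving NULL is UNKNOWN,
    // modelled here as a null Boolean.
    static Boolean sqlEquals(Object column, Object parameter) {
        if (column == null || parameter == null) {
            return null;
        }
        return column.equals(parameter);
    }

    // A row is deleted only if every AND-ed condition is TRUE;
    // FALSE and UNKNOWN both leave the row in place.
    static boolean matches(String[] fieldNames, Object[] storedRow,
                           Object[] deleteRecord, String[] conditionFields) {
        List<String> names = Arrays.asList(fieldNames);
        for (String field : conditionFields) {
            int i = names.indexOf(field);
            if (!Boolean.TRUE.equals(sqlEquals(storedRow[i], deleteRecord[i]))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String[] fieldNames = {"id", "name", "age", "location", "birthday"};
        String[] keyFields = {"id", "name"};
        // Jack's stored row; birthday is simplified to a String for this sketch
        Object[] storedJack = {1L, "Jack", 10, null, "2023-12-08 21:35:46"};
        // The DELETE record from the reproduction carries identical values
        Object[] deleteJack = {1L, "Jack", 10, null, "2023-12-08 21:35:46"};

        System.out.println(deleteStatement("fake", fieldNames));
        System.out.println(matches(fieldNames, storedJack, deleteJack, fieldNames)); // false
        System.out.println(deleteStatement("fake", keyFields));
        System.out.println(matches(fieldNames, storedJack, deleteJack, keyFields));  // true
    }
}
{code}

Running 'main' prints the five-condition statement followed by false, then the key-only statement followed by true.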
> h1. How to fix
> * Build the delete executor from the actual keyFields, and fall back to fieldNames only when no key fields are set.
> !image-2023-12-08-22-42-06-566.png!
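The idea behind the proposed fix can be sketched as follows. This is not the actual patch; 'deleteConditionFields', 'conditionIndexes' and 'conditionValues' are hypothetical names illustrating the three steps: prefer the configured key fields, map them to positions in the full row, and bind only those values to the DELETE statement.

{code:java}
import java.util.Arrays;
import java.util.List;

public class DeleteKeySketch {

    // Hypothetical helper: use the configured key fields for the DELETE
    // condition and fall back to all field names only when none are set.
    static String[] deleteConditionFields(String[] fieldNames, String[] keyFields) {
        return (keyFields != null && keyFields.length > 0) ? keyFields : fieldNames;
    }

    // Position of each condition field in the full row, so the executor knows
    // which row values to bind to the DELETE statement's parameters.
    static int[] conditionIndexes(String[] fieldNames, String[] conditionFields) {
        List<String> names = Arrays.asList(fieldNames);
        int[] indexes = new int[conditionFields.length];
        for (int i = 0; i < conditionFields.length; i++) {
            indexes[i] = names.indexOf(conditionFields[i]);
            if (indexes[i] < 0) {
                throw new IllegalArgumentException("Unknown key field: " + conditionFields[i]);
            }
        }
        return indexes;
    }

    // Extract only the values that the DELETE statement binds.
    static Object[] conditionValues(Object[] row, int[] indexes) {
        Object[] values = new Object[indexes.length];
        for (int i = 0; i < indexes.length; i++) {
            values[i] = row[indexes[i]];
        }
        return values;
    }

    public static void main(String[] args) {
        String[] fieldNames = {"id", "name", "age", "location", "birthday"};
        String[] keyFields = {"id", "name"};
        Object[] jack = {1L, "Jack", 10, null, "2023-12-08 21:35:46"};

        String[] fields = deleteConditionFields(fieldNames, keyFields);
        int[] indexes = conditionIndexes(fieldNames, fields);
        System.out.println(Arrays.toString(fields));                                  // [id, name]
        System.out.println(Arrays.toString(indexes));                                 // [0, 1]
        System.out.println(Arrays.toString(conditionValues(jack, indexes)));          // [1, Jack]
        System.out.println(Arrays.toString(deleteConditionFields(fieldNames, null))); // [id, name, age, location, birthday]
    }
}
{code}

Binding only the key values means NULLs in non-key columns such as 'location' no longer affect which row the DELETE selects.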
--
This message was sent by Atlassian Jira
(v8.20.10#820010)