wuguihu created FLINK-26595:
-------------------------------

             Summary: Improve the PostgresDialect method for getting upsert statements.
                 Key: FLINK-26595
                 URL: https://issues.apache.org/jira/browse/FLINK-26595
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 1.13.1
            Reporter: wuguihu

I'm trying to use Flink CDC to synchronize MySQL data to MatrixDB in real time, but I encountered an error. The error message is as follows:

{quote}
CIRCULAR REFERENCE:java.io.IOException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO user_1(id, name, address, phone_number, email) VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_...@foo.com') ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name, address=EXCLUDED.address, phone_number=EXCLUDED.phone_number, email=EXCLUDED.email was aborted: ERROR: modification of distribution columns in OnConflictUpdate is not supported Call getNextException to see other errors in the batch.
{quote}

This exception is caused by the getUpsertStatement method of PostgresDialect. The upsert statement it generates assigns every column in the DO UPDATE SET clause, including the unique-key column (id), which MatrixDB rejects, as the error message indicates. The unique-key columns should be excluded from the DO UPDATE SET clause.

I ran the following experiment to test this change. I also rebuilt flink-connector-jdbc with the modification; with the rebuilt connector, my program no longer reports the error.

{code:sql}
-- 1. Create a table in MatrixDB.
CREATE TABLE user_1 (
  id int,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone_number VARCHAR(512),
  email VARCHAR(255),
  UNIQUE(id)
);

-- 2. Insert a record.
INSERT INTO user_1(id, name, address, phone_number, email)
VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_...@foo.com')
ON CONFLICT (id) DO UPDATE SET
  id=EXCLUDED.id,
  name=EXCLUDED.name,
  address=EXCLUDED.address,
  phone_number=EXCLUDED.phone_number,
  email=EXCLUDED.email;

-- 3. Executing the above insert statement results in the following error:
-- ERROR: modification of distribution columns in OnConflictUpdate is not supported

-- 4. If the statement is changed as follows (id removed from the SET clause),
--    it executes successfully.
INSERT INTO user_1(id, name, address, phone_number, email)
VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_...@foo.com')
ON CONFLICT (id) DO UPDATE SET
  name=EXCLUDED.name,
  address=EXCLUDED.address,
  phone_number=EXCLUDED.phone_number,
  email=EXCLUDED.email;
{code}

The PostgresDialect class currently builds upsert statements as follows:

{code:java}
// package org.apache.flink.connector.jdbc.dialect.psql
public Optional<String> getUpsertStatement(
        String tableName, String[] fieldNames, String[] uniqueKeyFields) {
    String uniqueColumns =
            Arrays.stream(uniqueKeyFields)
                    .map(this::quoteIdentifier)
                    .collect(Collectors.joining(", "));
    // Every field, including the unique-key fields, ends up in the SET clause.
    String updateClause =
            Arrays.stream(fieldNames)
                    .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                    .collect(Collectors.joining(", "));
    return Optional.of(
            getInsertIntoStatement(tableName, fieldNames)
                    + " ON CONFLICT ("
                    + uniqueColumns
                    + ")"
                    + " DO UPDATE SET "
                    + updateClause);
}
{code}

To fix this problem, make the following change to PostgresDialect:

{code:java}
// package org.apache.flink.connector.jdbc.dialect.psql
// Additionally requires: import java.util.List;
public Optional<String> getUpsertStatement(
        String tableName, String[] fieldNames, String[] uniqueKeyFields) {
    String uniqueColumns =
            Arrays.stream(uniqueKeyFields)
                    .map(this::quoteIdentifier)
                    .collect(Collectors.joining(", "));
    List<String> tempList = Arrays.asList(uniqueKeyFields);
    // Skip the unique-key fields so they are not assigned in the SET clause.
    String updateClause =
            Arrays.stream(fieldNames)
                    .filter(f -> !tempList.contains(f))
                    .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                    .collect(Collectors.joining(", "));
    return Optional.of(
            getInsertIntoStatement(tableName, fieldNames)
                    + " ON CONFLICT ("
                    + uniqueColumns
                    + ")"
                    + " DO UPDATE SET "
                    + updateClause);
}
{code}
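
The proposed filtering can also be tried outside Flink. The following is only a minimal standalone sketch, not the connector code: the class UpsertSketch and the helpers quoteIdentifier and insertInto are hypothetical stand-ins for the dialect's own methods (the stand-ins do not quote identifiers and use '?' placeholders). It also assumes one extra case not covered above: if every field is a unique-key field, the SET clause would be empty, so the sketch falls back to DO NOTHING.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class UpsertSketch {

    // Hypothetical stand-in for the dialect's quoteIdentifier; returns the name unchanged.
    public static String quoteIdentifier(String identifier) {
        return identifier;
    }

    // Hypothetical stand-in for getInsertIntoStatement, using '?' placeholders.
    public static String insertInto(String tableName, String[] fieldNames) {
        String columns =
                Arrays.stream(fieldNames)
                        .map(UpsertSketch::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String placeholders =
                Arrays.stream(fieldNames).map(f -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + quoteIdentifier(tableName)
                + "(" + columns + ") VALUES (" + placeholders + ")";
    }

    public static String buildUpsert(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        Set<String> uniqueKeys = new LinkedHashSet<>(Arrays.asList(uniqueKeyFields));
        String uniqueColumns =
                Arrays.stream(uniqueKeyFields)
                        .map(UpsertSketch::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        // Leave the unique-key fields out of the SET clause.
        String updateClause =
                Arrays.stream(fieldNames)
                        .filter(f -> !uniqueKeys.contains(f))
                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        // Assumption: with nothing left to update, DO NOTHING avoids an empty SET clause.
        String conflictAction =
                updateClause.isEmpty() ? " DO NOTHING" : " DO UPDATE SET " + updateClause;
        return insertInto(tableName, fieldNames)
                + " ON CONFLICT (" + uniqueColumns + ")" + conflictAction;
    }

    public static void main(String[] args) {
        String[] fields = {"id", "name", "address", "phone_number", "email"};
        System.out.println(buildUpsert("user_1", fields, new String[] {"id"}));
    }
}
```

With the user_1 table from the experiment, the generated statement no longer assigns id in the SET clause, which matches the statement in step 4 apart from the placeholders.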