wuguihu created FLINK-26595:
-------------------------------
Summary: Improve the PostgresDialect method for getting upsert
statements.
Key: FLINK-26595
URL: https://issues.apache.org/jira/browse/FLINK-26595
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC
Affects Versions: 1.13.1
Reporter: wuguihu
I'm trying to use Flink CDC to synchronize MySQL data to MatrixDB in real time,
but I encountered an error.
The error message is as follows:
{quote}CIRCULAR REFERENCE:java.io.IOException: java.sql.BatchUpdateException:
Batch entry 0 INSERT INTO user_1(id, name, address, phone_number, email) VALUES
('110'::numeric, 'user_110', 'Shanghai', '123567891234', '[email protected]') ON
CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name,
address=EXCLUDED.address, phone_number=EXCLUDED.phone_number,
email=EXCLUDED.email was aborted: ERROR: modification of distribution columns
in OnConflictUpdate is not supported Call getNextException to see other errors
in the batch.
{quote}
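This exception is caused by the upsert statement that
PostgresDialect#getUpsertStatement generates: its DO UPDATE SET clause also
assigns the unique-key columns (here, id). MatrixDB does not allow distribution
columns to be modified in ON CONFLICT ... DO UPDATE, so the unique-key columns
should be excluded from the update clause.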
I verified this with the following experiment. I also rebuilt and repackaged
flink-connector-jdbc with the change shown further below; with the patched
connector, my job no longer reports the error.
{code:sql}
-- 1. Create a table in MatrixDB.
CREATE TABLE user_1 (
id int,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512),
email VARCHAR(255),
UNIQUE(id)
);
-- 2. Insert a record.
INSERT INTO user_1(id, name, address, phone_number, email)
VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234',
'[email protected]')
ON CONFLICT (id)
DO UPDATE SET
id=EXCLUDED.id,
name=EXCLUDED.name,
address=EXCLUDED.address,
phone_number=EXCLUDED.phone_number,
email=EXCLUDED.email;
-- 3. Executing the above INSERT statement fails with the following error:
-- ERROR: modification of distribution columns in OnConflictUpdate is not
-- supported
-- 4. With id removed from the DO UPDATE SET clause, the statement
-- executes successfully:
INSERT INTO user_1(id, name, address, phone_number, email)
VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234',
'[email protected]')
ON CONFLICT (id)
DO UPDATE SET
name=EXCLUDED.name,
address=EXCLUDED.address,
phone_number=EXCLUDED.phone_number,
email=EXCLUDED.email;
{code}
PostgresDialect currently builds the upsert statement as follows, putting every
field (including the unique-key columns) into the update clause:
{code:java}
// package org.apache.flink.connector.jdbc.dialect.psql
public Optional<String> getUpsertStatement(
        String tableName, String[] fieldNames, String[] uniqueKeyFields) {
    String uniqueColumns =
            Arrays.stream(uniqueKeyFields)
                    .map(this::quoteIdentifier)
                    .collect(Collectors.joining(", "));
    // Every field, including the unique-key columns, is assigned here.
    String updateClause =
            Arrays.stream(fieldNames)
                    .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                    .collect(Collectors.joining(", "));
    return Optional.of(
            getInsertIntoStatement(tableName, fieldNames)
                    + " ON CONFLICT ("
                    + uniqueColumns
                    + ")"
                    + " DO UPDATE SET "
                    + updateClause);
}
{code}
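To check that this logic produces the failing statement from the log, the standalone sketch below reproduces the current update-clause construction outside Flink. It is an approximation under stated assumptions: quoteIdentifier is treated as a no-op, and the hypothetical insertInto helper (a stand-in for the connector's getInsertIntoStatement) uses ? placeholders, which may not match the connector's real output.

{code:java}
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical standalone reproduction of the current update-clause logic.
// Assumptions: identifiers are not quoted, and insertInto() is a simplified
// stand-in for the connector's getInsertIntoStatement().
public class CurrentUpsertSketch {

    static String quoteIdentifier(String identifier) {
        return identifier; // assumed no-op for this sketch
    }

    static String insertInto(String tableName, String[] fieldNames) {
        String columns = Arrays.stream(fieldNames)
                .map(CurrentUpsertSketch::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String placeholders = Arrays.stream(fieldNames)
                .map(f -> "?")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quoteIdentifier(tableName)
                + "(" + columns + ") VALUES (" + placeholders + ")";
    }

    // Mirrors the current behaviour: every field, including the unique key,
    // ends up in the DO UPDATE SET clause.
    static String buildUpsert(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String uniqueColumns = Arrays.stream(uniqueKeyFields)
                .map(CurrentUpsertSketch::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String updateClause = Arrays.stream(fieldNames)
                .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                .collect(Collectors.joining(", "));
        return insertInto(tableName, fieldNames)
                + " ON CONFLICT (" + uniqueColumns + ")"
                + " DO UPDATE SET " + updateClause;
    }

    public static void main(String[] args) {
        String[] fields = {"id", "name", "address", "phone_number", "email"};
        // Prints a statement whose SET clause starts with id=EXCLUDED.id
        System.out.println(buildUpsert("user_1", fields, new String[] {"id"}));
    }
}
{code}

For the user_1 table this yields "... ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name, ...", which matches the statement MatrixDB rejects.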
To fix this problem, I propose the following change to PostgresDialect, which
excludes the unique-key columns from the update clause:
{code:java}
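// package org.apache.flink.connector.jdbc.dialect.psql
// Also needs: import java.util.Set;
public Optional<String> getUpsertStatement(
        String tableName, String[] fieldNames, String[] uniqueKeyFields) {
    String uniqueColumns =
            Arrays.stream(uniqueKeyFields)
                    .map(this::quoteIdentifier)
                    .collect(Collectors.joining(", "));
    // Do not assign the conflict-target (unique-key) columns: their values cannot
    // change on conflict, and MatrixDB rejects modifying distribution columns.
    Set<String> uniqueKeys = Arrays.stream(uniqueKeyFields).collect(Collectors.toSet());
    String updateClause =
            Arrays.stream(fieldNames)
                    .filter(f -> !uniqueKeys.contains(f))
                    .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                    .collect(Collectors.joining(", "));
    // If every field belongs to the unique key, nothing is left to update and an
    // empty SET clause would be invalid SQL, so fall back to DO NOTHING.
    String conflictAction =
            updateClause.isEmpty() ? " DO NOTHING" : " DO UPDATE SET " + updateClause;
    return Optional.of(
            getInsertIntoStatement(tableName, fieldNames)
                    + " ON CONFLICT ("
                    + uniqueColumns
                    + ")"
                    + conflictAction);
}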
{code}
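A standalone sketch of the proposed behaviour can be used to check the generated SQL without rebuilding the connector. It rests on the same assumptions as a minimal reproduction would: quoteIdentifier is a no-op and the hypothetical insertInto helper stands in for getInsertIntoStatement with ? placeholders. It also falls back to DO NOTHING when every field is part of the unique key, because an empty DO UPDATE SET clause is not valid SQL.

{code:java}
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical standalone sketch of the proposed fix.
// Assumptions: identifiers are not quoted, and insertInto() is a simplified
// stand-in for the connector's getInsertIntoStatement().
public class FixedUpsertSketch {

    static String quoteIdentifier(String identifier) {
        return identifier; // assumed no-op for this sketch
    }

    static String insertInto(String tableName, String[] fieldNames) {
        String columns = Arrays.stream(fieldNames)
                .map(FixedUpsertSketch::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String placeholders = Arrays.stream(fieldNames)
                .map(f -> "?")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quoteIdentifier(tableName)
                + "(" + columns + ") VALUES (" + placeholders + ")";
    }

    static String buildUpsert(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String uniqueColumns = Arrays.stream(uniqueKeyFields)
                .map(FixedUpsertSketch::quoteIdentifier)
                .collect(Collectors.joining(", "));
        // Unique-key columns are skipped in the update clause.
        Set<String> uniqueKeys = Arrays.stream(uniqueKeyFields).collect(Collectors.toSet());
        String updateClause = Arrays.stream(fieldNames)
                .filter(f -> !uniqueKeys.contains(f))
                .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                .collect(Collectors.joining(", "));
        // Nothing to update when all fields are unique keys.
        String conflictAction = updateClause.isEmpty()
                ? " DO NOTHING"
                : " DO UPDATE SET " + updateClause;
        return insertInto(tableName, fieldNames)
                + " ON CONFLICT (" + uniqueColumns + ")" + conflictAction;
    }

    public static void main(String[] args) {
        String[] fields = {"id", "name", "address", "phone_number", "email"};
        System.out.println(buildUpsert("user_1", fields, new String[] {"id"}));
    }
}
{code}

For user_1 the SET clause now starts with name=EXCLUDED.name, matching the statement that executed successfully in step 4 of the SQL experiment.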
--
This message was sent by Atlassian Jira
(v8.20.1#820001)