[ 
https://issues.apache.org/jira/browse/FLINK-26595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wuguihu updated FLINK-26595:
----------------------------
    Description: 
I'm using Flink CDC with the Flink JDBC connector to synchronize MySQL data to
MatrixDB in real time, and the sink fails with the following error:
{quote}CIRCULAR REFERENCE:java.io.IOException: java.sql.BatchUpdateException: 
Batch entry 0 INSERT INTO user_1(id, name, address, phone_number, email) VALUES 
('110'::numeric, 'user_110', 'Shanghai', '123567891234', '[email protected]') ON 
CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name, 
address=EXCLUDED.address, phone_number=EXCLUDED.phone_number, 
email=EXCLUDED.email was aborted: ERROR: modification of distribution columns 
in OnConflictUpdate is not supported Call getNextException to see other errors 
in the batch.
{quote}
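The failing statement is generated by PostgresDialect#getUpsertStatement. Its
DO UPDATE SET clause assigns EXCLUDED values to every column, including the
unique-key columns used as the ON CONFLICT target (here, id), and MatrixDB
rejects modifying those columns (compare steps 3 and 4 below). The unique-key
columns should therefore be omitted from the SET clause. Updating them is
redundant anyway, because the conflicting row already has the same key values.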

 

I verified the change with the experiment below. I also rebuilt
flink-connector-jdbc with the modified PostgresDialect; with the patched
connector, my job no longer reports the error.
{code:sql}
-- 1、Create a table in MatrixDB
CREATE TABLE user_1 (
  id int,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone_number VARCHAR(512),
  email VARCHAR(255),
  UNIQUE(id)
);


-- 2、Insert a record.
INSERT INTO user_1(id, name, address, phone_number, email) 
VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 
'[email protected]') 
ON CONFLICT (id) 
DO UPDATE SET 
id=EXCLUDED.id, 
name=EXCLUDED.name, 
address=EXCLUDED.address, 
phone_number=EXCLUDED.phone_number, 
email=EXCLUDED.email;

-- 3、Executing the above insert statement results in the following error.
ERROR:  modification of distribution columns in OnConflictUpdate is not 
supported


-- 4、If the statement is changed to the following (no id=EXCLUDED.id), it executes successfully.
INSERT INTO user_1(id, name, address, phone_number, email) 
VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 
'[email protected]') 
ON CONFLICT (id) 
DO UPDATE SET 
name=EXCLUDED.name, 
address=EXCLUDED.address, 
phone_number=EXCLUDED.phone_number, 
email=EXCLUDED.email;
{code}
 

 

PostgresDialect currently builds the upsert statement as follows:
{code:java}
// package org.apache.flink.connector.jdbc.dialect.psql
    /**
     * Builds the upsert statement:
     * {@code INSERT ... ON CONFLICT (<uniqueKeyFields>) DO UPDATE SET <col>=EXCLUDED.<col>, ...}.
     *
     * <p>NOTE(review): the SET clause is built from every entry of {@code fieldNames}, including
     * the unique-key (conflict target) columns. That assignment is what MatrixDB rejects with
     * "modification of distribution columns in OnConflictUpdate is not supported".
     */
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        // Quoted, comma-separated conflict target, e.g. "id".
        String uniqueColumns =
                Arrays.stream(uniqueKeyFields)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        // Assigns EXCLUDED.<col> to every field -- key columns included.
        String updateClause =
                Arrays.stream(fieldNames)
                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + 
quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        return Optional.of(
                getInsertIntoStatement(tableName, fieldNames)
                        + " ON CONFLICT ("
                        + uniqueColumns
                        + ")"
                        + " DO UPDATE SET "
                        + updateClause);
    }
{code}
 

 

To fix this, exclude the unique-key columns from the SET clause in PostgresDialect:
{code:java}
// package org.apache.flink.connector.jdbc.dialect.psql
    /**
     * Builds a PostgreSQL {@code INSERT ... ON CONFLICT} upsert statement.
     *
     * <p>The unique-key columns form the {@code ON CONFLICT (...)} target and are excluded from the
     * {@code DO UPDATE SET} clause. The conflicting row already holds equal key values, and some
     * PostgreSQL-compatible databases such as MatrixDB reject modifying them ("modification of
     * distribution columns in OnConflictUpdate is not supported"). If every field is a unique-key
     * field, nothing is left to update. In that case {@code DO NOTHING} is emitted instead of an
     * empty, syntactically invalid SET clause.
     *
     * @param tableName table to insert into
     * @param fieldNames all columns written by the INSERT
     * @param uniqueKeyFields columns forming the conflict target; presumably a subset of
     *     {@code fieldNames} (TODO confirm against callers)
     * @return the upsert statement (always present)
     */
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String uniqueColumns =
                Arrays.stream(uniqueKeyFields)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        // Raw (unquoted) names are compared on both sides, so the filter matches exactly the
        // columns listed in the conflict target.
        List<String> uniqueKeyList = Arrays.asList(uniqueKeyFields);
        String updateClause =
                Arrays.stream(fieldNames)
                        .filter(f -> !uniqueKeyList.contains(f))
                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        // An empty "DO UPDATE SET" is invalid SQL; with no non-key columns the existing row
        // already matches, so leave it untouched.
        String conflictAction =
                updateClause.isEmpty() ? " DO NOTHING" : " DO UPDATE SET " + updateClause;
        return Optional.of(
                getInsertIntoStatement(tableName, fieldNames)
                        + " ON CONFLICT ("
                        + uniqueColumns
                        + ")"
                        + conflictAction);
    }
{code}

  was:
I'm trying to use Flink CDC to synchronize mysql data to matrixDB in real time.
But I encountered an error.
The error message is as follows:
{quote}CIRCULAR REFERENCE:java.io.IOException: java.sql.BatchUpdateException: 
Batch entry 0 INSERT INTO user_1(id, name, address, phone_number, email) VALUES 
('110'::numeric, 'user_110', 'Shanghai', '123567891234', '[email protected]') ON 
CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name, 
address=EXCLUDED.address, phone_number=EXCLUDED.phone_number, 
email=EXCLUDED.email was aborted: ERROR: modification of distribution columns 
in OnConflictUpdate is not supported Call getNextException to see other errors 
in the batch.
{quote}
This exception is caused by the getUpsertStatement method of PostgresDialect.
There is something wrong with the upsert statement.
In the Update statement, unique-related columns should be deleted;

 

I did the following experiment to test my modifications.
At the same time, I recompiled and packaged flink-connector-JDBC. Using the 
modified flink-connector-JDBC, my program no longer reported errors.
{code:sql}
-- 1、Create a table in MatrixDB
CREATE TABLE user_1 (
  id int,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone_number VARCHAR(512),
  email VARCHAR(255),
  UNIQUE(id)
);


-- 2、Insert a record.
INSERT INTO user_1(id, name, address, phone_number, email) 
VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 
'[email protected]') 
ON CONFLICT (id) 
DO UPDATE SET 
id=EXCLUDED.id, 
name=EXCLUDED.name, 
address=EXCLUDED.address, 
phone_number=EXCLUDED.phone_number, 
email=EXCLUDED.email;

-- 3、Executing the above insert statement results in the following error.
ERROR:  modification of distribution columns in OnConflictUpdate is not 
supported


-- 4、If the statement is changed to the following, it executes successfully.
INSERT INTO user_1(id, name, address, phone_number, email) 
VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 
'[email protected]') 
ON CONFLICT (id) 
DO UPDATE SET 
name=EXCLUDED.name, 
address=EXCLUDED.address, 
phone_number=EXCLUDED.phone_number, 
email=EXCLUDED.email;
{code}
 

 

The PostgresDialect class handles upsert statements as follows:
{code:java}
// package org.apache.flink.connector.jdbc.dialect.psql
    /**
     * Builds the upsert statement:
     * {@code INSERT ... ON CONFLICT (<uniqueKeyFields>) DO UPDATE SET <col>=EXCLUDED.<col>, ...}.
     *
     * <p>NOTE(review): the SET clause is built from every entry of {@code fieldNames}, including
     * the unique-key (conflict target) columns. That assignment is what MatrixDB rejects with
     * "modification of distribution columns in OnConflictUpdate is not supported".
     */
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        // Quoted, comma-separated conflict target, e.g. "id".
        String uniqueColumns =
                Arrays.stream(uniqueKeyFields)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        // Assigns EXCLUDED.<col> to every field -- key columns included.
        String updateClause =
                Arrays.stream(fieldNames)
                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + 
quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        return Optional.of(
                getInsertIntoStatement(tableName, fieldNames)
                        + " ON CONFLICT ("
                        + uniqueColumns
                        + ")"
                        + " DO UPDATE SET "
                        + updateClause);
    }
{code}
 

 

To fix this problem, make the following changes to PostgresDialect:
{code:java}
// package org.apache.flink.connector.jdbc.dialect.psql
    /**
     * Builds a PostgreSQL {@code INSERT ... ON CONFLICT} upsert statement.
     *
     * <p>The unique-key columns form the {@code ON CONFLICT (...)} target and are excluded from the
     * {@code DO UPDATE SET} clause. The conflicting row already holds equal key values, and some
     * PostgreSQL-compatible databases such as MatrixDB reject modifying them ("modification of
     * distribution columns in OnConflictUpdate is not supported"). If every field is a unique-key
     * field, nothing is left to update. In that case {@code DO NOTHING} is emitted instead of an
     * empty, syntactically invalid SET clause.
     *
     * @param tableName table to insert into
     * @param fieldNames all columns written by the INSERT
     * @param uniqueKeyFields columns forming the conflict target; presumably a subset of
     *     {@code fieldNames} (TODO confirm against callers)
     * @return the upsert statement (always present)
     */
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String uniqueColumns =
                Arrays.stream(uniqueKeyFields)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        // Raw (unquoted) names are compared on both sides, so the filter matches exactly the
        // columns listed in the conflict target.
        List<String> uniqueKeyList = Arrays.asList(uniqueKeyFields);
        String updateClause =
                Arrays.stream(fieldNames)
                        .filter(f -> !uniqueKeyList.contains(f))
                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        // An empty "DO UPDATE SET" is invalid SQL; with no non-key columns the existing row
        // already matches, so leave it untouched.
        String conflictAction =
                updateClause.isEmpty() ? " DO NOTHING" : " DO UPDATE SET " + updateClause;
        return Optional.of(
                getInsertIntoStatement(tableName, fieldNames)
                        + " ON CONFLICT ("
                        + uniqueColumns
                        + ")"
                        + conflictAction);
    }
{code}


> Improve the PostgresDialect method for getting upsert statements.
> -----------------------------------------------------------------
>
>                 Key: FLINK-26595
>                 URL: https://issues.apache.org/jira/browse/FLINK-26595
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.13.1
>            Reporter: wuguihu
>            Priority: Critical
>         Attachments: image-20220311125613545.png, 
> image-20220311130744606.png, image-20220311141815540.png
>
>
> I'm trying to use Flink CDC to synchronize mysql data to matrixDB in real 
> time.
> But I encountered an error.
> The error message is as follows:
> {quote}CIRCULAR REFERENCE:java.io.IOException: java.sql.BatchUpdateException: 
> Batch entry 0 INSERT INTO user_1(id, name, address, phone_number, email) 
> VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 
> '[email protected]') ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, 
> name=EXCLUDED.name, address=EXCLUDED.address, 
> phone_number=EXCLUDED.phone_number, email=EXCLUDED.email was aborted: ERROR: 
> modification of distribution columns in OnConflictUpdate is not supported 
> Call getNextException to see other errors in the batch.
> {quote}
> This exception is caused by the getUpsertStatement method of PostgresDialect.
> There is something wrong with the upsert statement.
> In the Update statement, uniqueKey-related columns should be deleted;
>  
> I did the following experiment to test my modifications.
> At the same time, I recompiled and packaged flink-connector-JDBC. Using the 
> modified flink-connector-JDBC, my program no longer reported errors.
> {code:sql}
> -- 1、Create a table in MatrixDB
> CREATE TABLE user_1 (
>   id int,
>   name VARCHAR(255) NOT NULL DEFAULT 'flink',
>   address VARCHAR(1024),
>   phone_number VARCHAR(512),
>   email VARCHAR(255),
>   UNIQUE(id)
> );
> -- 2、Insert a record.
> INSERT INTO user_1(id, name, address, phone_number, email) 
> VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 
> '[email protected]') 
> ON CONFLICT (id) 
> DO UPDATE SET 
> id=EXCLUDED.id, 
> name=EXCLUDED.name, 
> address=EXCLUDED.address, 
> phone_number=EXCLUDED.phone_number, 
> email=EXCLUDED.email;
> -- 3、Executing the above insert statement results in the following error.
> ERROR:  modification of distribution columns in OnConflictUpdate is not 
> supported
> -- 4、If the statement is changed to the following, it executes successfully.
> INSERT INTO user_1(id, name, address, phone_number, email) 
> VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 
> '[email protected]') 
> ON CONFLICT (id) 
> DO UPDATE SET 
> name=EXCLUDED.name, 
> address=EXCLUDED.address, 
> phone_number=EXCLUDED.phone_number, 
> email=EXCLUDED.email;
> {code}
>  
>  
> The PostgresDialect class handles upsert statements as follows:
> {code:java}
> // package org.apache.flink.connector.jdbc.dialect.psql
>     public Optional<String> getUpsertStatement(
>             String tableName, String[] fieldNames, String[] uniqueKeyFields) {
>         String uniqueColumns =
>                 Arrays.stream(uniqueKeyFields)
>                         .map(this::quoteIdentifier)
>                         .collect(Collectors.joining(", "));
>         String updateClause =
>                 Arrays.stream(fieldNames)
>                         .map(f -> quoteIdentifier(f) + "=EXCLUDED." + 
> quoteIdentifier(f))
>                         .collect(Collectors.joining(", "));
>         return Optional.of(
>                 getInsertIntoStatement(tableName, fieldNames)
>                         + " ON CONFLICT ("
>                         + uniqueColumns
>                         + ")"
>                         + " DO UPDATE SET "
>                         + updateClause);
>     }
> {code}
>  
>  
> To fix this problem, make the following changes to PostgresDialect:
> {code:java}
> // package org.apache.flink.connector.jdbc.dialect.psql
>     public Optional<String> getUpsertStatement(
>             String tableName, String[] fieldNames, String[] uniqueKeyFields) {
>         String uniqueColumns =
>                 Arrays.stream(uniqueKeyFields)
>                         .map(this::quoteIdentifier)
>                         .collect(Collectors.joining(", "));
>         List tempList = Arrays.asList(uniqueKeyFields);
>         String updateClause =
>                 Arrays.stream(fieldNames)
>                         .filter(f->!tempList.contains(f))
>                         .map(f -> quoteIdentifier(f) + "=EXCLUDED." + 
> quoteIdentifier(f))
>                         .collect(Collectors.joining(", "));
>         return Optional.of(
>                 getInsertIntoStatement(tableName, fieldNames)
>                         + " ON CONFLICT ("
>                         + uniqueColumns
>                         + ")"
>                         + " DO UPDATE SET "
>                         + updateClause);
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to