[ 
https://issues.apache.org/jira/browse/FLINK-24626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenyore updated FLINK-24626:
----------------------------
    Description: 
The JDBC sink will lose some data when using TableBufferReducedStatementExecutor with a left join.

Here is a snippet of executeBatch:
{code:java}
    @Override
    public void executeBatch() throws SQLException {
        for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
            if (entry.getValue().f0) {
                upsertExecutor.addToBatch(entry.getValue().f1);
            } else {
                // delete by key
                deleteExecutor.addToBatch(entry.getKey());
            }
        }
        upsertExecutor.executeBatch();
        deleteExecutor.executeBatch();
        reduceBuffer.clear();
    }
{code}
A left join will generate a DELETE row before the upsert row, and executeBatch will execute them in the wrong order, which may cause data loss.
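To make the sequence concrete, here is a hypothetical sketch (schema and values invented for illustration) of the changelog a left join emits when the matching right-side row arrives late:
{code:java}
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

public class LeftJoinChangelogSketch {
    public static void main(String[] args) {
        // 1. The left row arrives first: the join pads the right side with NULL.
        RowData padded = GenericRowData.ofKind(RowKind.INSERT, 1, null);
        // 2. The right row arrives: the join retracts the padded row...
        RowData retract = GenericRowData.ofKind(RowKind.DELETE, 1, null);
        // 3. ...and emits the joined row.
        RowData joined = GenericRowData.ofKind(RowKind.INSERT, 1, StringData.fromString("x"));

        // If the DELETE and the final INSERT do not collapse into the same
        // reduceBuffer entry, executeBatch() upserts (1, "x") first and then
        // deletes id=1, removing the row that was just written.
    }
}
{code}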

  was:
The JDBC sink will lose some data when using TableBufferReducedStatementExecutor.

Here are some snippets.

{code:java}
    @Override
    public void addToBatch(RowData record) throws SQLException {
        RowData key = keyExtractor.apply(record);
        if (record.getRowKind() == RowKind.DELETE) {
            // XXX drop DELETE rows, because the retract stream would generate a DELETE before the matching upsert
            return;
        }
        boolean flag = changeFlag(record.getRowKind());
        RowData value = valueTransform.apply(record); // copy or not
        reduceBuffer.put(key, Tuple2.of(flag, value));
    }

    private boolean changeFlag(RowKind rowKind) {
        switch (rowKind) {
            case INSERT:
            case UPDATE_AFTER:
                return true;
            case DELETE:
            case UPDATE_BEFORE:
                return false;
            default:
                throw new UnsupportedOperationException(
                        String.format(
                                "Unknown row kind, the supported row kinds is: 
INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
                                        + " DELETE, but get: %s.",
                                rowKind));
        }
    }
{code}

The code above adds a change flag to the Tuple2 as a marker for upsert (true) or delete (false).
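As a hypothetical walk-through (class name, key, and values invented), three calls for the same key leave only the latest flag and row in the buffer:
{code:java}
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

import java.sql.SQLException;

class ReduceBufferWalkThrough {
    // 'executor' stands for the TableBufferReducedStatementExecutor above.
    static void walkThrough(JdbcBatchStatementExecutor<RowData> executor) throws SQLException {
        // Each put overwrites the previous entry for the same key, so only
        // the most recent change per key survives until executeBatch.
        executor.addToBatch(GenericRowData.ofKind(RowKind.INSERT, 1, StringData.fromString("a")));
        // buffer: key(1) -> (true,  [1, "a"])
        executor.addToBatch(GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 1, StringData.fromString("a")));
        // buffer: key(1) -> (false, [1, "a"])
        executor.addToBatch(GenericRowData.ofKind(RowKind.UPDATE_AFTER, 1, StringData.fromString("b")));
        // buffer: key(1) -> (true,  [1, "b"])
    }
}
{code}
executeBatch then replays the reduced buffer: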

{code:java}
    @Override
    public void executeBatch() throws SQLException {
        for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
            if (entry.getValue().f0) {
                upsertExecutor.addToBatch(entry.getValue().f1);
            } else {
                // delete by key
                deleteExecutor.addToBatch(entry.getKey());
            }
        }
        upsertExecutor.executeBatch();
        deleteExecutor.executeBatch();
        reduceBuffer.clear();
    }
{code}
executeBatch flushes all false-flag (delete) rows after the true-flag (upsert) rows.
This means an UPDATE_BEFORE could be executed after its corresponding UPDATE_AFTER, and we would lose data because of this.
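One possible direction, as a minimal sketch only (it assumes reduceBuffer is an insertion-ordered LinkedHashMap, introduces a hypothetical flushOpenBatch helper, and is not the actual fix): flush whichever batch is open whenever the operation kind changes, so deletes and upserts replay in buffered order:
{code:java}
    @Override
    public void executeBatch() throws SQLException {
        Boolean openBatchIsUpsert = null; // kind of the batch currently being filled
        for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
            boolean isUpsert = entry.getValue().f0;
            if (openBatchIsUpsert != null && openBatchIsUpsert != isUpsert) {
                // The operation kind changed: flush the previous run first.
                flushOpenBatch(openBatchIsUpsert);
            }
            if (isUpsert) {
                upsertExecutor.addToBatch(entry.getValue().f1);
            } else {
                deleteExecutor.addToBatch(entry.getKey()); // delete by key
            }
            openBatchIsUpsert = isUpsert;
        }
        if (openBatchIsUpsert != null) {
            flushOpenBatch(openBatchIsUpsert);
        }
        reduceBuffer.clear();
    }

    // Hypothetical helper, not part of the existing class.
    private void flushOpenBatch(boolean upsert) throws SQLException {
        if (upsert) {
            upsertExecutor.executeBatch();
        } else {
            deleteExecutor.executeBatch();
        }
    }
{code}
Note this only approximates stream order: a later change to an already-buffered key keeps that key's original position in a LinkedHashMap, so a complete fix would need to track arrival order explicitly.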


> Flink JDBC Sink may lose data in left join
> ------------------------------------------
>
>                 Key: FLINK-24626
>                 URL: https://issues.apache.org/jira/browse/FLINK-24626
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>            Reporter: Kenyore
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
